dict-server/app/api/search_dict/service.py


import asyncio
import re
from typing import Any, Dict, List, Literal, Optional, Tuple, Type

from fastapi import HTTPException
from redis.asyncio import Redis
from tortoise import Tortoise, Model
from tortoise.expressions import Q

from app.models import KangjiMapping
from app.utils.all_kana import all_in_kana
from settings import TORTOISE_ORM


async def search_time_updates(redis: Redis) -> None:
    """Increment the global search counter in Redis."""
    key = "search_time"
    await redis.incr(key, 1)


async def detect_language(text: str) -> Tuple[str, str, bool]:
    """
    Auto-detect the input language:
    - zh: Simplified Chinese
    - jp: Japanese (contains kana or old-form kanji)
    - fr: Latin alphabet (French, etc.)
    - other: anything else
    Returns:
        (mapped or original text, language code,
         whether the input contains kanji/hanzi that hit the mapping table)
    """
    JAPANESE_HIRAGANA = r"[\u3040-\u309F]"
    JAPANESE_KATAKANA = r"[\u30A0-\u30FF\u31F0-\u31FF]"
    text = text.strip()
    if not text:
        return "", "other", False
    # ✅ Step 1: all kana (no kanji)
    if re.fullmatch(f"(?:{JAPANESE_HIRAGANA}|{JAPANESE_KATAKANA})+", text):
        return text, "jp", False
    # ✅ Step 2: CJK ideograph detection
    if re.search(r"[\u4e00-\u9fff]", text):
        # Check the Japanese kanji column first
        jp_match = await KangjiMapping.get_or_none(kangji=text).only("kangji")
        if jp_match:
            return text, "jp", True  # contains kanji and hit the Japanese column
        # Then check the Chinese hanzi column
        zh_match = await KangjiMapping.get_or_none(hanzi=text).only("hanzi", "kangji")
        if zh_match:
            return zh_match.kangji, "zh", True  # contains hanzi and hit the Chinese column
        # In neither column of the mapping table: treat as unmapped Chinese
        return text, "zh", False
    # ✅ Step 3: Latin alphabet detection (e.g. French)
    if re.search(r"[À-ÿ]", text):
        return text, "fr", True  # True → contains extended Latin (non-English)
    # All plain ASCII letters
    elif re.fullmatch(r"[a-zA-Z]+", text):
        return text, "fr", False  # False → English word
    # ✅ Step 4: everything else (symbols, whitespace, ...)
    return text, "other", False


async def accurate_idiom_proverb(
    search_id: int, model: Type[Model], only_fields: Optional[List[str]] = None
):
    # Copy the caller's field list (or start fresh) and make sure `freq`
    # is fetched, since it is incremented below.
    only_fields = list(only_fields) if only_fields else []
    if "freq" not in only_fields:
        only_fields.append("freq")
    result = await model.get_or_none(id=search_id).only(*only_fields)
    if not result:
        raise HTTPException(status_code=404, detail="Target not found")
    result.freq = result.freq + 1
    await result.save(update_fields=["freq"])
    return result
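

# Usage sketch for accurate_idiom_proverb. `Idiom` is a hypothetical model
# name standing in for whatever the router passes; any Tortoise model with
# `id`, `text` and `freq` fields fits the signature.
#
# async def _demo_accurate_lookup() -> None:
#     from app.models import Idiom  # hypothetical import
#     row = await accurate_idiom_proverb(search_id=1, model=Idiom, only_fields=["text"])
#     print(row.text, row.freq)  # freq was incremented and persisted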


async def suggest_proverb(
    query: str,
    lang: Literal["fr", "zh", "jp"],
    model: Type[Model],
    search_field: str = "search_text",
    target_field: str = "text",
    chi_exp_field: str = "chi_exp",
    limit: int = 10,
) -> List[Dict[str, str]]:
    keyword = query.strip()
    if not keyword:
        return []
    # ✅ Search condition: match the Chinese explanation for zh queries,
    # otherwise match the search-text field
    if lang == "zh":
        start_condition = Q(**{f"{chi_exp_field}__istartswith": keyword})
        contain_condition = Q(**{f"{chi_exp_field}__icontains": keyword})
    else:
        start_condition = Q(**{f"{search_field}__istartswith": keyword})
        contain_condition = Q(**{f"{search_field}__icontains": keyword})
    # ✅ 1. Prefix matches
    start_matches = await (
        model.filter(start_condition)
        .order_by("-freq", "id")
        .limit(limit)
        .values("id", target_field, chi_exp_field, "search_text")
    )
    # ✅ 2. Substring matches (excluding prefix matches)
    contain_matches = await (
        model.filter(contain_condition & ~start_condition)
        .order_by("-freq", "id")
        .limit(limit)
        .values("id", target_field, chi_exp_field, "search_text")
    )
    # ✅ 3. Merge and de-duplicate while preserving order
    results = []
    seen_ids = set()
    for row in start_matches + contain_matches:
        if row["id"] not in seen_ids:
            seen_ids.add(row["id"])
            results.append({
                "id": row["id"],
                "proverb": row[target_field],
                "search_text": row["search_text"],
                "chi_exp": row[chi_exp_field],
            })
    # ✅ Truncate to the requested number of results
    return results[:limit]
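

# Usage sketch for suggest_proverb. `Proverb` is a hypothetical model name;
# the real model (with `search_text`, `text`, `chi_exp` and `freq` fields)
# is injected by the caller.
#
# async def _demo_suggest_proverb() -> None:
#     from app.models import Proverb  # hypothetical import
#     rows = await suggest_proverb(query="棋逢", lang="zh", model=Proverb)
#     # Prefix matches rank first, then substring matches, at most 10 rows:
#     print([r["proverb"] for r in rows])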


async def suggest_autocomplete(
    query: str,
    dict_lang: Literal["fr", "jp"],
    model: Type[Model],
    search_field: str = "search_text",
    text_field: str = "text",
    hira_field: str = "hiragana",
    freq_field: str = "freq",
    limit: int = 10,
) -> List[Dict[str, str]]:
    """
    Generic autocomplete suggestion helper:
    - French: search by search_text / text, then back-fill English/Chinese
      meanings from DefinitionFr
    - Japanese: match the original text first, then the kana reading, then
      back-fill Chinese meanings from DefinitionJp
    Unified return structure:
    [
        {
            "word": "étudier",
            "hiragana": None,
            "meanings": ["学习", "研究"],
            "english": ["to study", "to learn"]
        }
    ]
    """
    keyword = query.strip()
    if not keyword:
        return []
    # ========== French branch ==========
    if dict_lang == "fr":
        start_condition = (
            Q(**{f"{search_field}__istartswith": keyword})
            | Q(**{f"{text_field}__istartswith": keyword})
        )
        contain_condition = (
            Q(**{f"{search_field}__icontains": keyword})
            | Q(**{f"{text_field}__icontains": keyword})
        )
        value_fields = ["id", text_field, freq_field, search_field]
    # ========== Japanese branch ==========
    elif dict_lang == "jp":
        kana_word = all_in_kana(keyword)
        start_condition = Q(**{f"{text_field}__startswith": keyword})
        contain_condition = Q(**{f"{text_field}__icontains": keyword})
        kana_start = Q(**{f"{hira_field}__startswith": kana_word})
        kana_contain = Q(**{f"{hira_field}__icontains": kana_word})
        start_condition |= kana_start
        contain_condition |= kana_contain
        value_fields = ["id", text_field, hira_field, freq_field]
    else:
        return []
    # ✅ Fetch matching words
    start_matches = await (
        model.filter(start_condition)
        .order_by(f"-{freq_field}", "id")
        .limit(limit)
        .values(*value_fields)
    )
    contain_matches = await (
        model.filter(contain_condition & ~start_condition)
        .order_by(f"-{freq_field}", "id")
        .limit(limit)
        .values(*value_fields)
    )
    results = []
    seen_ids = set()
    for row in start_matches + contain_matches:
        if row["id"] not in seen_ids:
            seen_ids.add(row["id"])
            results.append({
                "id": row["id"],
                "word": row[text_field],
                "hiragana": row.get(hira_field) if dict_lang == "jp" else None,
                "meanings": [],
                "english": [],
            })
    # ✅ Batch reverse-lookup of the Definition tables to avoid N+1 queries
    if dict_lang == "fr":
        from app.models import DefinitionFr  # local import avoids a circular import
        word_ids = [r["id"] for r in results]
        defs = await DefinitionFr.filter(word_id__in=word_ids).values("word_id", "meaning", "eng_explanation")
        meaning_map: Dict[int, Dict[str, List[str]]] = {}
        for d in defs:
            meaning_map.setdefault(d["word_id"], {"meanings": [], "english": []})
            if d["meaning"]:
                meaning_map[d["word_id"]]["meanings"].append(d["meaning"].strip())
            if d["eng_explanation"]:
                meaning_map[d["word_id"]]["english"].append(d["eng_explanation"].strip())
        for r in results:
            if r["id"] in meaning_map:
                r["meanings"] = list(set(meaning_map[r["id"]]["meanings"]))
                r["english"] = list(set(meaning_map[r["id"]]["english"]))
    elif dict_lang == "jp":
        from app.models import DefinitionJp
        word_ids = [r["id"] for r in results]
        defs = await DefinitionJp.filter(word_id__in=word_ids).values("word_id", "meaning")
        meaning_map: Dict[int, List[str]] = {}
        for d in defs:
            if d["meaning"]:
                meaning_map.setdefault(d["word_id"], []).append(d["meaning"].strip())
        for r in results:
            if r["id"] in meaning_map:
                r["meanings"] = list(set(meaning_map[r["id"]]))
    # ✅ Drop the internal id, keeping only the fields the client needs
    for r in results:
        r.pop("id", None)
    return results[:limit]
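

# Usage sketch for suggest_autocomplete. `WordFr` is a hypothetical word-model
# name; DefinitionFr / DefinitionJp back-fill the meanings as shown above.
#
# async def _demo_suggest_autocomplete() -> None:
#     from app.models import WordFr  # hypothetical import
#     rows = await suggest_autocomplete(query="étud", dict_lang="fr", model=WordFr)
#     # e.g. [{"word": "étudier", "hiragana": None,
#     #        "meanings": ["学习"], "english": ["to study"]}, ...]
#     print(rows)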


# ===================================================
# ✅ Reverse lookup by meaning (unified return structure)
# ===================================================
async def search_definition_by_meaning(
    query: str,
    model: Type[Model],
    meaning_field: str = "meaning",
    eng_field: str = "eng_explanation",
    hira_field: str = "hiragana",
    limit: int = 20,
    lang: Literal["zh", "en"] = "zh",
) -> List[Dict[str, str]]:
    """
    Bilingual reverse lookup by meaning (Chinese/English):
    Unified return structure:
    [
        {
            "word": "étudier",
            "hiragana": None,
            "meanings": ["学习", "研究"],
            "english": ["to study"]
        }
    ]
    """
    keyword = query.strip()
    if not keyword:
        return []
    if lang == "zh":
        search_field = meaning_field
    elif lang == "en":
        search_field = eng_field
    else:
        raise ValueError("lang must be 'zh' or 'en'")
    contain_condition = Q(**{f"{search_field}__icontains": keyword})
    matches = (
        await model.filter(contain_condition)
        .prefetch_related("word")
        .order_by("id")
    )
    word_to_data: Dict[str, Dict[str, List[str] | str | None]] = {}
    for entry in matches:
        word_obj = await entry.word
        word_text = getattr(word_obj, "text", None)
        if not word_text:
            continue
        # Guard against NULL columns before stripping
        chi_mean = (getattr(entry, meaning_field, "") or "").strip() or None
        eng_mean = (getattr(entry, eng_field, "") or "").strip() or None
        hira_text = getattr(word_obj, hira_field, None) if hasattr(word_obj, hira_field) else None
        if word_text not in word_to_data:
            word_to_data[word_text] = {"hiragana": hira_text, "meanings": [], "english": []}
        if chi_mean:
            word_to_data[word_text]["meanings"].append(chi_mean)
        if eng_mean:
            word_to_data[word_text]["english"].append(eng_mean)
    results = []
    for word, data in word_to_data.items():
        results.append({
            "word": word,
            "hiragana": data["hiragana"],
            "meanings": list(set(data["meanings"])),
            "english": list(set(data["english"]))
        })
    return results[:limit]
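

# Usage sketch for search_definition_by_meaning, using the DefinitionJp model
# referenced above (the query string is an arbitrary example):
#
# async def _demo_search_by_meaning() -> None:
#     from app.models import DefinitionJp
#     rows = await search_definition_by_meaning(query="学习", model=DefinitionJp, lang="zh")
#     # Each row groups all meanings/english strings under its parent word
#     print(rows)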


def merge_word_results(*lists: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Merge several result lists and de-duplicate:
    - uniqueness is keyed on word + hiragana
    - meanings / english are merged and de-duplicated
    - earliest-seen order is preserved
    """
    merged: Dict[str, Dict[str, Any]] = {}
    order: List[str] = []
    for lst in lists:
        for item in lst:
            word = item.get("word")
            hira = item.get("hiragana")
            key = f"{word}:{hira or ''}"  # word + hiragana as the unique key
            if key not in merged:
                # First occurrence: add it to the result set
                merged[key] = {
                    "word": word,
                    "hiragana": hira,
                    "meanings": list(item.get("meanings", [])),
                    "english": list(item.get("english", []))
                }
                order.append(key)
            else:
                # Already present → merge meanings and English explanations
                merged[key]["meanings"] = list(set(
                    list(merged[key].get("meanings", [])) +
                    list(item.get("meanings", []) or [])
                ))
                merged[key]["english"] = list(set(
                    list(merged[key].get("english", [])) +
                    list(item.get("english", []) or [])
                ))
    # Emit in insertion order
    return [merged[k] for k in order]
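

# Runnable sketch for merge_word_results; the payloads are made-up examples,
# not database fixtures.
def _demo_merge_word_results() -> None:
    a = [{"word": "étudier", "hiragana": None, "meanings": ["学习"], "english": ["to study"]}]
    b = [{"word": "étudier", "hiragana": None, "meanings": ["研究"], "english": ["to study"]}]
    merged = merge_word_results(a, b)
    assert len(merged) == 1  # same word+hiragana key → one entry
    assert set(merged[0]["meanings"]) == {"学习", "研究"}
    assert merged[0]["english"] == ["to study"]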


# async def __test():
#     query_word: str = '棋逢'
#     return await (
#         suggest_proverb(
#             query=query_word,  # suggest_proverb takes a plain string
#             lang='zh',
#             model=...,  # pass the proverb model here
#         )
#     )


async def __main():
    await Tortoise.init(config=TORTOISE_ORM)
    # __test above is commented out; run the language detector instead
    print(await detect_language(text="ahsjdasd"))


if __name__ == '__main__':
    asyncio.run(__main())