Files
2026-06-14 09:25:59 +08:00

192 lines
6.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""默契种子服务"""
import hashlib
import re
import uuid
from datetime import datetime
from sqlalchemy import select, or_, func
from sqlalchemy.ext.asyncio import AsyncSession
from app.models.sync_seed import SyncQuestion, SyncSeed
from app.models.friend import Friend
from app.models.user import User
from app.websocket.events import EventType
from app.websocket.manager import manager
# 预置题目池
DEFAULT_QUESTIONS = [
"用一个词形容你理想中的周末",
"深夜最想吃的一道食物",
"如果有一整天完全自由,你会做什么",
"最近让你开心的瞬间是什么",
"你最向往的旅行目的地",
"用一种颜色形容你现在的状态",
"你的快乐源泉是什么",
"如果会一种超能力,你想要什么",
]
def _tokenize(text: str) -> set[str]:
"""中文分词:按 2-gram + 标点切分;英文按单词"""
if not text:
return set()
tokens = set()
# 英文单词
for w in re.findall(r'[a-zA-Z]+', text.lower()):
if len(w) >= 2:
tokens.add(w)
# 中文 2-gram
cn = re.sub(r'[^一-鿿]', '', text)
for i in range(len(cn) - 1):
tokens.add(cn[i:i + 2])
# 单字
for ch in cn:
tokens.add(ch)
return tokens
def _jaccard(a: set[str], b: set[str]) -> float:
if not a or not b:
return 0.0
inter = len(a & b)
union = len(a | b)
return inter / union if union else 0.0
def _score_answers(a: str, b: str) -> int:
"""0-100 默契分:Jaccard 为主 + 句长相似 + emoji/标点加分"""
if not a or not b:
return 0
ta, tb = _tokenize(a), _tokenize(b)
j = _jaccard(ta, tb)
# 句长相似度
len_diff = abs(len(a) - len(b)) / max(len(a), len(b), 1)
len_sim = 1 - len_diff
# emoji / 相同标点
emoji_a = set(re.findall(r'[\U0001F300-\U0001FAFF]', a))
emoji_b = set(re.findall(r'[\U0001F300-\U0001FAFF]', b))
emoji_bonus = 0.1 if emoji_a and emoji_a & emoji_b else 0
raw = j * 0.7 + len_sim * 0.2 + emoji_bonus
# 放大曲线:让中等重合也能有可见分数
score = round(min(100, raw * 130 + (10 if j > 0 else 0)))
return max(0, min(100, score))
class SyncService:
def __init__(self, db: AsyncSession):
self.db = db
async def _ensure_questions(self):
"""确保题目池存在"""
result = await self.db.execute(select(func.count(SyncQuestion.id)))
if (result.scalar() or 0) == 0:
for q in DEFAULT_QUESTIONS:
self.db.add(SyncQuestion(id=str(uuid.uuid4()), content=q))
await self.db.flush()
async def get_today_question(self) -> dict:
"""获取今日题目(按日期确定性选取)"""
await self._ensure_questions()
result = await self.db.execute(select(SyncQuestion))
questions = result.scalars().all()
if not questions:
raise ValueError("没有题目")
today = datetime.utcnow().strftime("%Y%m%d")
idx = int(hashlib.md5(today.encode()).hexdigest(), 16) % len(questions)
q = questions[idx]
return {"id": q.id, "content": q.content}
async def _get_or_create_seed(self, question_id: str, user_a: str, user_b: str) -> SyncSeed:
a, b = (user_a, user_b) if user_a < user_b else (user_b, user_a)
result = await self.db.execute(
select(SyncSeed).where(
SyncSeed.question_id == question_id,
SyncSeed.user_a == a,
SyncSeed.user_b == b,
)
)
seed = result.scalars().first()
if not seed:
seed = SyncSeed(
id=str(uuid.uuid4()),
question_id=question_id,
user_a=a,
user_b=b,
)
self.db.add(seed)
await self.db.flush()
return seed
async def submit_answer(self, user_id: str, question_id: str, friend_id: str,
answer: str) -> dict:
"""提交自己的答案"""
# 校验是好友
fr = await self.db.execute(
select(Friend).where(Friend.user_id == user_id, Friend.friend_user_id == friend_id)
)
if not fr.scalars().first():
raise ValueError("只能和好友默契")
seed = await self._get_or_create_seed(question_id, user_id, friend_id)
# 写入自己的答案(a 或 b
if seed.user_a == user_id:
seed.answer_a = answer
else:
seed.answer_b = answer
result: dict = {
"id": seed.id,
"question_id": question_id,
"my_answer": answer,
"status": seed.status,
}
# 双方都答了 → 揭晓
if seed.answer_a and seed.answer_b and seed.status == "draft":
score = _score_answers(seed.answer_a, seed.answer_b)
seed.score = score
leaf = hashlib.md5(
f"sync:{question_id}:{seed.user_a}:{seed.user_b}".encode()
).hexdigest()[:16]
seed.leaf_seed = leaf
seed.status = "revealed"
seed.revealed_at = datetime.utcnow()
result["score"] = score
result["leaf_seed"] = leaf
result["partner_answer"] = seed.answer_b if seed.user_a == user_id else seed.answer_a
# 推送给搭档
await manager.send_to_user(friend_id, "sync.revealed", {
"seed_id": seed.id,
"question_id": question_id,
"score": score,
"leaf_seed": leaf,
})
await self.db.flush()
return result
async def get_seed(self, user_id: str, seed_id: str) -> dict:
result = await self.db.execute(select(SyncSeed).where(SyncSeed.id == seed_id))
seed = result.scalars().first()
if not seed:
raise ValueError("种子不存在")
if user_id not in (seed.user_a, seed.user_b):
raise ValueError("无权查看")
my_answer = seed.answer_a if seed.user_a == user_id else seed.answer_b
partner_answer = None
if seed.status == "revealed":
partner_answer = seed.answer_b if seed.user_a == user_id else seed.answer_a
return {
"id": seed.id,
"question_id": seed.question_id,
"my_answer": my_answer,
"partner_answer": partner_answer,
"score": seed.score,
"leaf_seed": seed.leaf_seed,
"status": seed.status,
}