"""Sync-seed ("tacit understanding") service.

Two friends answer the same question independently.  Once both answers are
stored, the pair is scored for similarity and the result is revealed to both.
"""

import hashlib
import re
import uuid
from datetime import datetime

from sqlalchemy import select, or_, func
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.sync_seed import SyncQuestion, SyncSeed
from app.models.friend import Friend
from app.models.user import User
from app.websocket.events import EventType
from app.websocket.manager import manager

# Built-in question pool; inserted when the question table is empty.
DEFAULT_QUESTIONS = [
    "用一个词形容你理想中的周末",
    "深夜最想吃的一道食物",
    "如果有一整天完全自由,你会做什么",
    "最近让你开心的瞬间是什么",
    "你最向往的旅行目的地",
    "用一种颜色形容你现在的状态",
    "你的快乐源泉是什么",
    "如果会一种超能力,你想要什么",
]

# Patterns are compiled once at import time instead of on every call.
_EN_WORD_RE = re.compile(r'[a-zA-Z]+')
_NON_CJK_RE = re.compile(r'[^一-鿿]')
_EMOJI_RE = re.compile(r'[\U0001F300-\U0001FAFF]')


def _tokenize(text: str) -> set[str]:
    """Split *text* into a set of comparison tokens.

    The token set contains:

    * lowercase ASCII words of at least two letters;
    * every CJK character in the range U+4E00-U+9FFF;
    * every pair of adjacent CJK characters (2-grams), taken after all
      non-CJK characters have been removed, so a 2-gram may span removed
      punctuation or whitespace.

    Returns an empty set for empty input.
    """
    if not text:
        return set()

    tokens = {w for w in _EN_WORD_RE.findall(text.lower()) if len(w) >= 2}

    cjk = _NON_CJK_RE.sub('', text)
    tokens.update(cjk)  # single characters
    tokens.update(cjk[i:i + 2] for i in range(len(cjk) - 1))  # 2-grams
    return tokens


def _jaccard(a: set[str], b: set[str]) -> float:
    """Return the Jaccard similarity |a & b| / |a | b|, or 0.0 if either set is empty."""
    if not a or not b:
        return 0.0
    union = len(a | b)
    return len(a & b) / union if union else 0.0


def _score_answers(a: str, b: str) -> int:
    """Return a 0-100 similarity score for two answers.

    The raw score is ``0.7 * jaccard + 0.2 * length_similarity`` plus a 0.1
    bonus when both answers share at least one emoji.  It is then scaled by
    130, given a flat +10 when any token overlaps, and clamped to [0, 100].
    Returns 0 when either answer is empty.
    """
    if not a or not b:
        return 0

    j = _jaccard(_tokenize(a), _tokenize(b))

    # Length similarity: 1.0 for equal lengths, approaching 0 as they diverge.
    len_diff = abs(len(a) - len(b)) / max(len(a), len(b), 1)
    len_sim = 1 - len_diff

    # Bonus only when the two answers have an emoji in common.
    emoji_a = set(_EMOJI_RE.findall(a))
    emoji_b = set(_EMOJI_RE.findall(b))
    emoji_bonus = 0.1 if emoji_a and emoji_a & emoji_b else 0

    raw = j * 0.7 + len_sim * 0.2 + emoji_bonus
    # Amplifying curve so that moderate overlap still yields a visible score.
    score = round(raw * 130 + (10 if j > 0 else 0))
    return max(0, min(100, score))


class SyncService:
    """Database-backed operations for sync questions and seeds."""

    def __init__(self, db: AsyncSession):
        self.db = db

    async def _ensure_questions(self):
        """Insert ``DEFAULT_QUESTIONS`` when the question table is empty."""
        result = await self.db.execute(select(func.count(SyncQuestion.id)))
        if (result.scalar() or 0) == 0:
            for q in DEFAULT_QUESTIONS:
                self.db.add(SyncQuestion(id=str(uuid.uuid4()), content=q))
            await self.db.flush()

    async def get_today_question(self) -> dict:
        """Return today's question as ``{"id", "content"}``.

        The question is chosen by hashing the current UTC date (``YYYYMMDD``)
        and indexing into the question list, so the choice is the same for
        every caller on a given day.

        Raises:
            ValueError: if no questions exist even after seeding defaults.
        """
        await self._ensure_questions()
        # An explicit ORDER BY is required: without it the database may return
        # rows in a different order per query, and the same index would then
        # map to different questions for different callers.
        result = await self.db.execute(
            select(SyncQuestion).order_by(SyncQuestion.id)
        )
        questions = result.scalars().all()
        if not questions:
            raise ValueError("没有题目")
        today = datetime.utcnow().strftime("%Y%m%d")
        idx = int(hashlib.md5(today.encode()).hexdigest(), 16) % len(questions)
        q = questions[idx]
        return {"id": q.id, "content": q.content}

    async def _get_or_create_seed(self, question_id: str, user_a: str, user_b: str) -> SyncSeed:
        """Fetch the seed for this question and user pair, creating it if absent.

        The two user ids are stored in sorted order so that either argument
        order resolves to the same row.
        """
        a, b = (user_a, user_b) if user_a < user_b else (user_b, user_a)
        result = await self.db.execute(
            select(SyncSeed).where(
                SyncSeed.question_id == question_id,
                SyncSeed.user_a == a,
                SyncSeed.user_b == b,
            )
        )
        seed = result.scalars().first()
        if not seed:
            seed = SyncSeed(
                id=str(uuid.uuid4()),
                question_id=question_id,
                user_a=a,
                user_b=b,
            )
            self.db.add(seed)
            await self.db.flush()
        return seed

    async def submit_answer(self, user_id: str, question_id: str, friend_id: str, answer: str) -> dict:
        """Store the caller's answer and reveal the result once both sides answered.

        Returns a dict with ``id``, ``question_id``, ``my_answer`` and
        ``status``.  When the seed is revealed (by this call or earlier) the
        dict also carries ``score``, ``leaf_seed`` and ``partner_answer``.

        A seed that is already revealed is not modified: the stored answer is
        kept and the existing revealed state is returned, so a retried or
        late submission cannot make the stored score disagree with the
        stored answers.

        Raises:
            ValueError: if ``friend_id`` is not a friend of ``user_id``.
        """
        # Only friends may play together.
        fr = await self.db.execute(
            select(Friend).where(Friend.user_id == user_id, Friend.friend_user_id == friend_id)
        )
        if not fr.scalars().first():
            raise ValueError("只能和好友默契")

        seed = await self._get_or_create_seed(question_id, user_id, friend_id)
        caller_is_a = seed.user_a == user_id

        # Write the caller's side unless the result has already been revealed.
        if seed.status != "revealed":
            if caller_is_a:
                seed.answer_a = answer
            else:
                seed.answer_b = answer

        newly_revealed = False
        # Both sides answered -> score and reveal.
        if seed.answer_a and seed.answer_b and seed.status == "draft":
            seed.score = _score_answers(seed.answer_a, seed.answer_b)
            seed.leaf_seed = hashlib.md5(
                f"sync:{question_id}:{seed.user_a}:{seed.user_b}".encode()
            ).hexdigest()[:16]
            seed.status = "revealed"
            seed.revealed_at = datetime.utcnow()
            newly_revealed = True

        # Build the response after the state transition so that ``status``
        # reflects the reveal instead of the stale pre-reveal value.
        result: dict = {
            "id": seed.id,
            "question_id": question_id,
            "my_answer": seed.answer_a if caller_is_a else seed.answer_b,
            "status": seed.status,
        }
        if seed.status == "revealed":
            result["score"] = seed.score
            result["leaf_seed"] = seed.leaf_seed
            result["partner_answer"] = seed.answer_b if caller_is_a else seed.answer_a

        # Flush before notifying, so the partner is never told about a reveal
        # that subsequently fails to be written.
        await self.db.flush()

        if newly_revealed:
            # Push the reveal to the partner.
            await manager.send_to_user(friend_id, "sync.revealed", {
                "seed_id": seed.id,
                "question_id": question_id,
                "score": seed.score,
                "leaf_seed": seed.leaf_seed,
            })
        return result

    async def get_seed(self, user_id: str, seed_id: str) -> dict:
        """Return the seed as seen by ``user_id``.

        The partner's answer is included only once the seed is revealed;
        before that ``partner_answer`` is ``None``.

        Raises:
            ValueError: if the seed does not exist, or ``user_id`` is not one
                of its two participants.
        """
        result = await self.db.execute(select(SyncSeed).where(SyncSeed.id == seed_id))
        seed = result.scalars().first()
        if not seed:
            raise ValueError("种子不存在")
        if user_id not in (seed.user_a, seed.user_b):
            raise ValueError("无权查看")

        caller_is_a = seed.user_a == user_id
        my_answer = seed.answer_a if caller_is_a else seed.answer_b
        partner_answer = None
        if seed.status == "revealed":
            partner_answer = seed.answer_b if caller_is_a else seed.answer_a

        return {
            "id": seed.id,
            "question_id": seed.question_id,
            "my_answer": my_answer,
            "partner_answer": partner_answer,
            "score": seed.score,
            "leaf_seed": seed.leaf_seed,
            "status": seed.status,
        }