Files
chat/backend/app/services/capsule_service.py
T
2026-06-13 17:57:43 +08:00

106 lines
3.9 KiB
Python

"""时光胶囊服务(懒解锁)"""
import uuid
from datetime import datetime, timezone

from sqlalchemy import select, or_
from sqlalchemy.ext.asyncio import AsyncSession

from app.models.time_capsule import TimeCapsule
from app.models.friend import Friend
class CapsuleService:
    """Service for creating and reading time capsules with lazy unlocking.

    The locked/unlocked state is not stored; it is recomputed on every
    serialization by comparing ``unlock_at`` with the current time. Only the
    ``is_opened`` flag is persisted.
    """

    def __init__(self, db: AsyncSession):
        """Bind the service to the async session used for all queries."""
        self.db = db

    @staticmethod
    def _now_like(reference: datetime) -> datetime:
        """Return the current UTC time with the same tz-awareness as *reference*.

        Comparing a naive datetime with an aware one raises ``TypeError``, so
        the "now" value is shaped to match whatever it will be compared with.
        Naive references are treated as UTC.
        """
        now = datetime.now(timezone.utc)
        if reference.tzinfo is None or reference.utcoffset() is None:
            # Naive reference: drop tzinfo so the comparison stays naive-UTC.
            return now.replace(tzinfo=None)
        return now

    async def create_capsule(self, sender_id: str, recipient_id: str, title: str,
                             content: str, unlock_at: datetime,
                             mood: str | None = None) -> dict:
        """Create a time capsule and return its serialized form.

        Args:
            sender_id: Id of the user sending the capsule.
            recipient_id: Id of the receiving user; may equal ``sender_id``.
            title: Capsule title.
            content: Capsule body, hidden from the recipient until unlock.
            unlock_at: Moment the capsule unlocks; naive values are treated
                as UTC, aware values are compared in their own timezone.
            mood: Optional mood tag stored with the capsule.

        Returns:
            The capsule as a dict, serialized from the sender's point of view.

        Raises:
            ValueError: If ``unlock_at`` is not in the future, or the recipient
                is neither the sender nor found in the sender's Friend rows.
        """
        # The unlock time must lie strictly in the future.
        if unlock_at <= self._now_like(unlock_at):
            raise ValueError("解锁时间必须是未来时间")

        # The recipient must be the sender themselves or one of their friends.
        # NOTE(review): only the sender -> recipient direction is looked up and
        # no friendship status is filtered — confirm against the Friend model.
        if recipient_id != sender_id:
            friend_result = await self.db.execute(
                select(Friend).where(
                    Friend.user_id == sender_id,
                    Friend.friend_user_id == recipient_id,
                )
            )
            if not friend_result.scalars().first():
                raise ValueError("只能给好友或自己寄胶囊")

        # NOTE(review): unlock_at is stored exactly as received; if the column
        # is timezone-naive, callers should pass naive UTC values — confirm.
        capsule = TimeCapsule(
            id=str(uuid.uuid4()),
            sender_id=sender_id,
            recipient_id=recipient_id,
            title=title,
            content=content,
            unlock_at=unlock_at,
            mood=mood,
        )
        self.db.add(capsule)
        # Flush (not commit) so the row is sent to the database while the
        # transaction stays under the caller's control.
        await self.db.flush()
        return self._capsule_to_dict(capsule, sender_id)

    async def get_capsules(self, user_id: str) -> list[dict]:
        """Return every capsule the user sent or received, latest unlock first."""
        result = await self.db.execute(
            select(TimeCapsule).where(
                or_(
                    TimeCapsule.sender_id == user_id,
                    TimeCapsule.recipient_id == user_id,
                )
            ).order_by(TimeCapsule.unlock_at.desc())
        )
        return [self._capsule_to_dict(c, user_id) for c in result.scalars().all()]

    async def get_capsule(self, user_id: str, capsule_id: str) -> dict:
        """Return a single capsule, marking it opened on the recipient's first view.

        Args:
            user_id: Id of the viewing user; must be the sender or recipient.
            capsule_id: Id of the capsule to fetch.

        Returns:
            The capsule as a dict, serialized from the viewer's point of view.

        Raises:
            ValueError: If the capsule does not exist or the viewer is neither
                its sender nor its recipient.
        """
        result = await self.db.execute(
            select(TimeCapsule).where(TimeCapsule.id == capsule_id)
        )
        capsule = result.scalars().first()
        if not capsule:
            raise ValueError("胶囊不存在")
        if capsule.sender_id != user_id and capsule.recipient_id != user_id:
            raise ValueError("无权查看此胶囊")

        # Once unlocked, the recipient's first view flips is_opened.
        now = self._now_like(capsule.unlock_at)
        if now >= capsule.unlock_at and capsule.recipient_id == user_id and not capsule.is_opened:
            capsule.is_opened = True
            await self.db.flush()
        return self._capsule_to_dict(capsule, user_id)

    def _capsule_to_dict(self, capsule: TimeCapsule, viewer_id: str) -> dict:
        """Serialize a capsule for *viewer_id*, applying the lazy-unlock rule.

        The sender always sees the content of their own capsule; any other
        viewer gets ``content=None`` while the capsule is still locked.
        ``seconds_left`` is the whole number of seconds until unlock, or 0
        once unlocked.
        """
        now = self._now_like(capsule.unlock_at)
        locked = now < capsule.unlock_at
        seconds_left = int((capsule.unlock_at - now).total_seconds()) if locked else 0
        can_see_content = (
            capsule.sender_id == viewer_id  # the sender can always read their own capsule
            or not locked  # already unlocked
        )
        return {
            "id": capsule.id,
            "sender_id": capsule.sender_id,
            "recipient_id": capsule.recipient_id,
            "is_mine_sent": capsule.sender_id == viewer_id,
            "is_mine_received": capsule.recipient_id == viewer_id,
            "title": capsule.title,
            "content": capsule.content if can_see_content else None,
            "unlock_at": capsule.unlock_at.isoformat(),
            "mood": capsule.mood,
            "locked": locked,
            "seconds_left": seconds_left,
            "is_opened": capsule.is_opened,
            "created_at": capsule.created_at.isoformat(),
        }