"""萤火虫时刻服务:全服协作型随机掉落""" import hashlib import uuid from datetime import datetime, timedelta from sqlalchemy import select, func from sqlalchemy.ext.asyncio import AsyncSession from app.models.flash_event import FlashEvent, FlashParticipation from app.websocket.events import EventType from app.websocket.manager import manager # Redis(用于原子计数 + 单日限频) import redis.asyncio as aioredis from app.config import settings class FlashService: def __init__(self, db: AsyncSession): self.db = db self._redis = None async def _get_redis(self): if self._redis is None: self._redis = aioredis.from_url(settings.REDIS_URL, decode_responses=True) return self._redis async def get_active_event(self) -> dict | None: """获取当前进行中且尚未达标的萤火虫事件""" now = datetime.utcnow() result = await self.db.execute( select(FlashEvent).where( FlashEvent.start_at <= now, FlashEvent.end_at > now, FlashEvent.reached == False, # 已达标的不再算作可参与 ).order_by(FlashEvent.start_at.desc()) ) event = result.scalars().first() if not event: return None return self._event_to_dict(event) async def try_spawn(self) -> dict | None: """尝试触发新事件(单日限频,由前端定时调用或后台触发)""" r = await self._get_redis() today = datetime.utcnow().strftime("%Y%m%d") spawn_key = f"flash:spawned:{today}" # 单日最多 1 次(用 SETNX 抢占),让萤火虫保持稀有惊喜 count = await r.get(spawn_key) if count and int(count) >= 1: return None now = datetime.utcnow() # 事件持续 60-90 秒 duration = 60 + (hashlib.md5(today.encode()).hexdigest()[:2] is not None) * 0 duration = 75 # 固定 75 秒便于体验 end_at = now + timedelta(seconds=duration) variant = f"firefly-{today}-{count or '0'}" seed = hashlib.md5(variant.encode()).hexdigest()[:16] target = 30 # 目标点击数(小规模便于达标) event = FlashEvent( id=str(uuid.uuid4()), type="firefly", start_at=now, end_at=end_at, target_clicks=target, leaf_seed=seed, leaf_variant=variant, ) self.db.add(event) await self.db.flush() await r.incr(spawn_key) await r.expire(spawn_key, 86400) payload = self._event_to_dict(event) # 全服广播 await manager.broadcast(EventType.FLASH_SPAWN, payload) return payload async def click(self, user_id: str, event_id: str) -> dict: """用户点击集气""" r = await self._get_redis() result = await self.db.execute( select(FlashEvent).where(FlashEvent.id == event_id) ) event = result.scalars().first() if not event: raise ValueError("事件不存在") now = datetime.utcnow() if now < event.start_at or now > event.end_at: raise ValueError("事件已结束") # 原子计数:全服总点击 + 个人点击 total_key = f"flash:total:{event_id}" user_key = f"flash:user:{event_id}:{user_id}" new_total = await r.incr(total_key) await r.expire(total_key, 300) user_clicks = await r.incr(user_key) await r.expire(user_key, 300) # 同步到 DB(用于持久化和查询) event.total_clicks = new_total await self.db.flush() reached = new_total >= event.target_clicks if reached and not event.reached: event.reached = True # 广播达标 await manager.broadcast(EventType.FLASH_RESULT, { "event_id": event_id, "reached": True, "leaf_seed": event.leaf_seed, "leaf_variant": event.leaf_variant, "message": "萤火虫被点亮了!限定纪念叶降临 🌟", }) # 广播进度(节流:每 5 次或达标时) if new_total % 5 == 0 or reached: await manager.broadcast(EventType.FLASH_PROGRESS, { "event_id": event_id, "total_clicks": new_total, "target": event.target_clicks, "progress": min(1, new_total / event.target_clicks), }) return { "event_id": event_id, "total_clicks": new_total, "target": event.target_clicks, "progress": min(1, new_total / event.target_clicks), "my_clicks": user_clicks, "reached": reached, } async def claim_reward(self, user_id: str, event_id: str) -> dict: """达标后领取限定纪念叶""" result = await self.db.execute( select(FlashEvent).where(FlashEvent.id == event_id) ) event = result.scalars().first() if not event: raise ValueError("事件不存在") if not event.reached: raise ValueError("尚未达标") # 记录参与 p_result = await self.db.execute( select(FlashParticipation).where( FlashParticipation.event_id == event_id, FlashParticipation.user_id == user_id, ) ) p = p_result.scalars().first() r = await self._get_redis() user_clicks = int(await r.get(f"flash:user:{event_id}:{user_id}") or 0) if p: p.clicks = user_clicks p.earned = True else: p = FlashParticipation( id=str(uuid.uuid4()), event_id=event_id, user_id=user_id, clicks=user_clicks, earned=True, ) self.db.add(p) await self.db.flush() return { "earned": True, "leaf_seed": event.leaf_seed, "leaf_variant": event.leaf_variant, "type": event.type, "my_clicks": user_clicks, } async def get_my_album(self, user_id: str) -> list[dict]: """获取我获得的限定纪念叶图鉴""" result = await self.db.execute( select(FlashParticipation).where( FlashParticipation.user_id == user_id, FlashParticipation.earned == True, ).order_by(FlashParticipation.created_at.desc()) ) album = [] for p in result.scalars().all(): ev = await self.db.execute(select(FlashEvent).where(FlashEvent.id == p.event_id)) e = ev.scalars().first() if e: album.append({ "leaf_seed": e.leaf_seed, "leaf_variant": e.leaf_variant, "type": e.type, "my_clicks": p.clicks, "earned_at": p.created_at.isoformat(), }) return album def _event_to_dict(self, e: FlashEvent) -> dict: return { "id": e.id, "type": e.type, "start_at": e.start_at.isoformat(), "end_at": e.end_at.isoformat(), "target_clicks": e.target_clicks, "total_clicks": e.total_clicks, "reached": e.reached, "leaf_seed": e.leaf_seed, "leaf_variant": e.leaf_variant, "progress": min(1, (e.total_clicks or 0) / e.target_clicks) if e.target_clicks else 0, }