"""认证服务""" import uuid from sqlalchemy import select from sqlalchemy.ext.asyncio import AsyncSession from app.models.user import User from app.utils.security import hash_password, verify_password, create_access_token, create_refresh_token class AuthService: def __init__(self, db: AsyncSession): self.db = db async def register(self, username: str, email: str, password: str) -> dict: """用户注册""" # 检查用户名是否已存在 result = await self.db.execute(select(User).where(User.username == username)) if result.scalars().first(): raise ValueError("用户名已存在") # 检查邮箱是否已存在 result = await self.db.execute(select(User).where(User.email == email)) if result.scalars().first(): raise ValueError("邮箱已被注册") # 创建用户 user = User( id=str(uuid.uuid4()), username=username, email=email, password_hash=hash_password(password), ) self.db.add(user) await self.db.flush() # 生成 Token tokens = self._generate_tokens(user) return {**tokens, "user": user} async def login(self, username: str, password: str) -> dict: """用户登录""" result = await self.db.execute( select(User).where( (User.username == username) | (User.email == username) ) ) user = result.scalars().first() if not user or not verify_password(password, user.password_hash): raise ValueError("用户名或密码错误") if user.is_banned: raise ValueError("账号已被封禁") # 更新在线状态 user.status = "online" from datetime import datetime, timezone user.last_seen_at = datetime.utcnow() tokens = self._generate_tokens(user) return {**tokens, "user": user} async def refresh_token(self, user_id: str) -> dict: """刷新 Token""" result = await self.db.execute(select(User).where(User.id == user_id)) user = result.scalars().first() if not user: raise ValueError("用户不存在") return self._generate_tokens(user) def _generate_tokens(self, user: User) -> dict: """生成 JWT Token 对""" data = {"sub": user.id, "username": user.username} return { "access_token": create_access_token(data), "refresh_token": create_refresh_token(data), }