80 lines
2.5 KiB
Python
80 lines
2.5 KiB
Python
"""认证服务"""
|
|
|
|
import uuid
|
|
from sqlalchemy import select
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
|
|
from app.models.user import User
|
|
from app.utils.security import hash_password, verify_password, create_access_token, create_refresh_token
|
|
|
|
|
|
class AuthService:
|
|
def __init__(self, db: AsyncSession):
|
|
self.db = db
|
|
|
|
async def register(self, username: str, email: str, password: str) -> dict:
|
|
"""用户注册"""
|
|
# 检查用户名是否已存在
|
|
result = await self.db.execute(select(User).where(User.username == username))
|
|
if result.scalars().first():
|
|
raise ValueError("用户名已存在")
|
|
|
|
# 检查邮箱是否已存在
|
|
result = await self.db.execute(select(User).where(User.email == email))
|
|
if result.scalars().first():
|
|
raise ValueError("邮箱已被注册")
|
|
|
|
# 创建用户
|
|
user = User(
|
|
id=str(uuid.uuid4()),
|
|
username=username,
|
|
email=email,
|
|
password_hash=hash_password(password),
|
|
)
|
|
self.db.add(user)
|
|
await self.db.flush()
|
|
|
|
# 生成 Token
|
|
tokens = self._generate_tokens(user)
|
|
return {**tokens, "user": user}
|
|
|
|
async def login(self, username: str, password: str) -> dict:
|
|
"""用户登录"""
|
|
result = await self.db.execute(
|
|
select(User).where(
|
|
(User.username == username) | (User.email == username)
|
|
)
|
|
)
|
|
user = result.scalars().first()
|
|
|
|
if not user or not verify_password(password, user.password_hash):
|
|
raise ValueError("用户名或密码错误")
|
|
|
|
if user.is_banned:
|
|
raise ValueError("账号已被封禁")
|
|
|
|
# 更新在线状态
|
|
user.status = "online"
|
|
from datetime import datetime, timezone
|
|
user.last_seen_at = datetime.utcnow()
|
|
|
|
tokens = self._generate_tokens(user)
|
|
return {**tokens, "user": user}
|
|
|
|
async def refresh_token(self, user_id: str) -> dict:
|
|
"""刷新 Token"""
|
|
result = await self.db.execute(select(User).where(User.id == user_id))
|
|
user = result.scalars().first()
|
|
if not user:
|
|
raise ValueError("用户不存在")
|
|
|
|
return self._generate_tokens(user)
|
|
|
|
def _generate_tokens(self, user: User) -> dict:
|
|
"""生成 JWT Token 对"""
|
|
data = {"sub": user.id, "username": user.username}
|
|
return {
|
|
"access_token": create_access_token(data),
|
|
"refresh_token": create_refresh_token(data),
|
|
}
|