Files
chat/backend/app/websocket/handlers.py
T
2026-06-13 10:40:59 +08:00

125 lines
4.9 KiB
Python

"""WebSocket 事件处理器"""
import json
from datetime import datetime, timezone
from fastapi import WebSocket
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.websocket.events import EventType
from app.websocket.manager import manager
async def handle_chat_send(ws: WebSocket, user_id: str, data: dict, db: AsyncSession):
"""处理发送消息事件"""
from app.services.message_service import MessageService
service = MessageService(db)
try:
message = await service.send_message(
conversation_id=data["conversation_id"],
sender_id=user_id,
content=data["content"],
msg_type=data.get("type", "text"),
reply_to_id=data.get("reply_to_id"),
)
await db.commit()
# 获取发送者信息
from app.services.user_service import UserService
user_service = UserService(db)
sender = await user_service.get_by_id(user_id)
# 获取会话成员列表
from app.services.conversation_service import ConversationService
conv_service = ConversationService(db)
detail = await conv_service.get_conversation_detail(data["conversation_id"], user_id)
# 获取被引用消息的信息
reply_to_content = None
reply_to_sender_name = None
if message.reply_to_id:
from app.models.message import Message
reply_msg_result = await db.execute(
select(Message).where(Message.id == message.reply_to_id)
)
reply_msg = reply_msg_result.scalars().first()
if reply_msg:
reply_to_content = reply_msg.content[:200] if reply_msg.content else None
reply_sender = await user_service.get_by_id(reply_msg.sender_id)
reply_to_sender_name = reply_sender.username if reply_sender else None
msg_data = {
"id": message.id,
"conversation_id": message.conversation_id,
"sender_id": user_id,
"sender_name": sender.username if sender else "未知",
"sender_avatar": sender.avatar_url if sender else None,
"type": message.type,
"content": message.content,
"reply_to_id": message.reply_to_id,
"reply_to_content": reply_to_content,
"reply_to_sender_name": reply_to_sender_name,
"created_at": message.created_at.isoformat(),
}
# 广播给会话中的所有成员
if detail and "members" in detail:
member_ids = [m["user_id"] for m in detail["members"]]
await manager.broadcast_to_conversation(
member_ids, EventType.CHAT_MESSAGE, msg_data
)
except Exception as e:
await manager.send_to_user(user_id, EventType.ERROR, {"message": str(e)})
async def handle_chat_typing(ws: WebSocket, user_id: str, data: dict, db: AsyncSession):
"""处理输入中事件"""
from app.services.user_service import UserService
user_service = UserService(db)
user = await user_service.get_by_id(user_id)
from app.services.conversation_service import ConversationService
conv_service = ConversationService(db)
detail = await conv_service.get_conversation_detail(data["conversation_id"], user_id)
if detail and "members" in detail:
member_ids = [m["user_id"] for m in detail["members"]]
await manager.broadcast_to_conversation(
member_ids, EventType.CHAT_TYPING_INDICATOR,
{"conversation_id": data["conversation_id"], "user_id": user_id,
"username": user.username if user else "未知"},
exclude_user=user_id,
)
async def handle_chat_read(ws: WebSocket, user_id: str, data: dict, db: AsyncSession):
"""处理已读事件"""
from app.services.message_service import MessageService
service = MessageService(db)
await service.mark_as_read(data["conversation_id"], user_id, data["message_id"])
await db.commit()
from app.services.conversation_service import ConversationService
conv_service = ConversationService(db)
detail = await conv_service.get_conversation_detail(data["conversation_id"], user_id)
if detail and "members" in detail:
member_ids = [m["user_id"] for m in detail["members"]]
await manager.broadcast_to_conversation(
member_ids, EventType.CHAT_READ_RECEIPT,
{"conversation_id": data["conversation_id"], "user_id": user_id,
"read_up_to": data["message_id"]},
)
async def handle_presence_update(ws: WebSocket, user_id: str, data: dict, db: AsyncSession):
"""处理在线状态更新"""
from app.services.user_service import UserService
user_service = UserService(db)
await user_service.update_status(user_id, data["status"])
await db.commit()
event = EventType.PRESENCE_ONLINE if data["status"] == "online" else EventType.PRESENCE_OFFLINE
await manager.broadcast(event, {"user_id": user_id})