Files
AIChatRoom/backend/routers/chatrooms.py
Claude Code edbddf855d feat: AI聊天室多Agent协作讨论平台
- 实现Agent管理,支持AI辅助生成系统提示词
- 支持多个AI提供商(OpenRouter、智谱、MiniMax等)
- 实现聊天室和讨论引擎
- WebSocket实时消息推送
- 前端使用React + Ant Design
- 后端使用FastAPI + MongoDB

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 19:20:02 +08:00

388 lines
12 KiB
Python

"""
聊天室管理路由
"""
from typing import List, Optional, Dict, Any
from fastapi import APIRouter, HTTPException, WebSocket, WebSocketDisconnect, status
from pydantic import BaseModel, Field
from loguru import logger
from services.chatroom_service import ChatRoomService
from services.discussion_engine import DiscussionEngine
from services.message_router import MessageRouter
# Router object that the chat-room endpoints in this module are registered on.
router = APIRouter()
# ============ 请求/响应模型 ============
class ChatRoomConfigModel(BaseModel):
    """Chat-room configuration payload; every field is optional and has a default.

    NOTE(review): the per-field remarks below are inferred from the field
    names only; confirm the exact semantics against the discussion engine.
    """

    # Presumably the cap on the number of discussion rounds.
    max_rounds: int = 50
    # Presumably how many past messages are kept as context.
    message_history_size: int = 20
    # Presumably a 0..1 ratio at which consensus is considered reached.
    consensus_threshold: float = 0.8
    # Presumably the pause between rounds (unit not visible here).
    round_interval: float = 1.0
    # Presumably toggles whether a user may interrupt a running discussion.
    allow_user_interrupt: bool = True
class ChatRoomCreateRequest(BaseModel):
    """Request body for creating a chat room.

    Only ``name`` is required. ``agents`` defaults to an empty list,
    ``description`` to an empty string, and both ``moderator_agent_id`` and
    ``config`` default to ``None``.
    """

    # Chat-room name (required).
    name: str = Field(..., description="聊天室名称")
    # Free-text description.
    description: str = Field(default="", description="描述")
    # IDs of the agents placed in the room.
    agents: List[str] = Field(default=[], description="Agent ID列表")
    # ID of the moderator agent, if any.
    moderator_agent_id: Optional[str] = Field(
        default=None, description="主持人Agent ID"
    )
    # Optional configuration overrides.
    config: Optional[ChatRoomConfigModel] = None

    class Config:
        # Example payload attached to the generated JSON schema.
        json_schema_extra = {
            "example": {
                "name": "产品设计讨论室",
                "description": "用于讨论新产品功能设计",
                "agents": ["agent-abc123", "agent-def456"],
                "moderator_agent_id": "agent-xyz789"
            }
        }
class ChatRoomUpdateRequest(BaseModel):
    """Request body for updating a chat room.

    Every field defaults to ``None``, so a client may send any subset of
    them.
    """

    name: Optional[str] = None
    description: Optional[str] = None
    # List of agent IDs (same element type as in the create request).
    agents: Optional[List[str]] = None
    moderator_agent_id: Optional[str] = None
    config: Optional[ChatRoomConfigModel] = None
class ChatRoomResponse(BaseModel):
    """Response body describing a chat room.

    The three timestamp fields are typed as strings (``completed_at`` may
    also be ``None``), and ``config`` is a plain dictionary.
    """

    # Identity and descriptive fields.
    room_id: str
    name: str
    description: str
    objective: str

    # Participants and configuration.
    agents: List[str]
    moderator_agent_id: Optional[str]
    config: Dict[str, Any]

    # Discussion progress.
    status: str
    current_round: int
    current_discussion_id: Optional[str]

    # Timestamps, carried as strings.
    created_at: str
    updated_at: str
    completed_at: Optional[str]
class MessageResponse(BaseModel):
    """Response body describing a single chat-room message."""

    # Identifiers.
    message_id: str
    room_id: str
    discussion_id: str
    # May be None; presumably for messages not authored by an agent (confirm).
    agent_id: Optional[str]

    # Payload.
    content: str
    message_type: str
    round: int

    # Creation time carried as a string.
    created_at: str
class StartDiscussionRequest(BaseModel):
    """Request body for starting a discussion; ``objective`` is required."""

    # Objective of the discussion.
    objective: str = Field(..., description="讨论目标")
class DiscussionStatusResponse(BaseModel):
    """Response body reporting the discussion state of a chat room.

    ``is_active``, ``room_id`` and ``status`` are required; ``discussion_id``
    defaults to ``None`` and ``current_round`` to ``0``.
    """

    is_active: bool
    room_id: str
    discussion_id: Optional[str] = None
    current_round: int = 0
    status: str
# ============ 路由处理 ============
@router.post("", response_model=ChatRoomResponse, status_code=status.HTTP_201_CREATED)
async def create_chatroom(request: ChatRoomCreateRequest):
    """Create a chat room from the request body and return it.

    Args:
        request: Validated creation payload.

    Returns:
        The created chat room converted by ``_to_response``.

    Raises:
        HTTPException: 400 when a ``ValueError`` is raised while creating or
            converting the room; 500 (after logging) for any other exception.
    """
    try:
        # A missing config is forwarded as None rather than as an empty dict.
        if request.config:
            config_payload = request.config.dict()
        else:
            config_payload = None

        created_room = await ChatRoomService.create_chatroom(
            name=request.name,
            description=request.description,
            agents=request.agents,
            moderator_agent_id=request.moderator_agent_id,
            config=config_payload,
        )
        return _to_response(created_room)
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
    except Exception as exc:
        logger.error(f"创建聊天室失败: {exc}")
        raise HTTPException(status_code=500, detail="创建失败")
@router.get("", response_model=List[ChatRoomResponse])
async def list_chatrooms():
    """Return every chat room known to ``ChatRoomService`` as response models."""
    all_rooms = await ChatRoomService.get_all_chatrooms()
    return list(map(_to_response, all_rooms))
@router.get("/{room_id}", response_model=ChatRoomResponse)
async def get_chatroom(room_id: str):
    """Return the chat room identified by ``room_id``.

    Raises:
        HTTPException: 404 when the service returns a falsy result.
    """
    room = await ChatRoomService.get_chatroom(room_id)
    if room:
        return _to_response(room)
    raise HTTPException(status_code=404, detail="聊天室不存在")
@router.put("/{room_id}", response_model=ChatRoomResponse)
async def update_chatroom(room_id: str, request: ChatRoomUpdateRequest):
    """Apply a partial update to a chat room and return the updated room.

    Args:
        room_id: Identifier of the chat room to update.
        request: Update payload; only the fields the client actually sent are
            forwarded to the service.

    Raises:
        HTTPException: 404 when the service returns a falsy result; 400 when
            a ``ValueError`` is raised while updating or converting the room.
    """
    # ``exclude_unset`` drops fields the client did not send. ``dict()``
    # already converts the nested config model into a plain dict, so no
    # further conversion of ``update_data["config"]`` is required.
    update_data = request.dict(exclude_unset=True)
    try:
        chatroom = await ChatRoomService.update_chatroom(room_id, **update_data)
        if not chatroom:
            raise HTTPException(status_code=404, detail="聊天室不存在")
        return _to_response(chatroom)
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
@router.delete("/{room_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_chatroom(room_id: str):
    """Delete the chat room identified by ``room_id``.

    Responds with 204 and no body on success.

    Raises:
        HTTPException: 404 when the service reports that nothing was deleted.
    """
    deleted = await ChatRoomService.delete_chatroom(room_id)
    if deleted:
        return
    raise HTTPException(status_code=404, detail="聊天室不存在")
@router.post("/{room_id}/agents/{agent_id}", response_model=ChatRoomResponse)
async def add_agent_to_chatroom(room_id: str, agent_id: str):
    """Add the agent ``agent_id`` to the chat room ``room_id``.

    Returns:
        The chat room as returned by the service, converted by
        ``_to_response``.

    Raises:
        HTTPException: 404 when the service returns a falsy result; 400 when
            a ``ValueError`` is raised while adding or converting.
    """
    try:
        updated_room = await ChatRoomService.add_agent(room_id, agent_id)
        if updated_room:
            return _to_response(updated_room)
        raise HTTPException(status_code=404, detail="聊天室不存在")
    except ValueError as exc:
        raise HTTPException(status_code=400, detail=str(exc))
@router.delete("/{room_id}/agents/{agent_id}", response_model=ChatRoomResponse)
async def remove_agent_from_chatroom(room_id: str, agent_id: str):
    """Remove the agent ``agent_id`` from the chat room ``room_id``.

    Returns:
        The chat room as returned by the service, converted by
        ``_to_response``.

    Raises:
        HTTPException: 404 when the service returns a falsy result.
    """
    updated_room = await ChatRoomService.remove_agent(room_id, agent_id)
    if updated_room:
        return _to_response(updated_room)
    raise HTTPException(status_code=404, detail="聊天室不存在")
@router.get("/{room_id}/messages", response_model=List[MessageResponse])
async def get_chatroom_messages(
    room_id: str,
    limit: int = 50,
    skip: int = 0,
    discussion_id: Optional[str] = None
):
    """Return message history for a chat room.

    Args:
        room_id: Identifier of the chat room.
        limit: Forwarded to the service unchanged (default 50).
        skip: Forwarded to the service unchanged (default 0).
        discussion_id: Optional value forwarded to the service; presumably
            narrows the result to one discussion (confirm in the service).

    Returns:
        The service's messages, each converted by ``_message_to_response``.
    """
    # Arguments are passed positionally, in the order the service is called
    # with elsewhere in this handler's original form.
    history = await ChatRoomService.get_messages(room_id, limit, skip, discussion_id)
    return list(map(_message_to_response, history))
# Strong references to in-flight discussion tasks. The event loop only keeps
# weak references to tasks, so a task nobody references may be garbage
# collected before it finishes.
_discussion_tasks = set()


def _finalize_discussion_task(task) -> None:
    """Forget a finished discussion task and log the exception it ended with."""
    _discussion_tasks.discard(task)
    if task.cancelled():
        return
    error = task.exception()
    if error is not None:
        logger.error(f"讨论任务异常结束: {error}")


@router.post("/{room_id}/start", response_model=DiscussionStatusResponse)
async def start_discussion(room_id: str, request: StartDiscussionRequest):
    """Start a discussion in the background and report the room's state.

    The discussion runs as a background task and is not awaited to
    completion. The handler waits up to 0.5 seconds so the discussion can
    initialise (returning earlier if the task has already finished), then
    reads the chat room back and reports its discussion id, round and status.

    Args:
        room_id: Identifier of the chat room.
        request: Payload carrying the discussion objective.

    Raises:
        HTTPException: 400 when the background task has already failed with
            ``ValueError`` by the time the wait ends, or when reading the
            room raises ``ValueError``.
    """
    import asyncio

    try:
        task = asyncio.create_task(
            DiscussionEngine.start_discussion(room_id, request.objective)
        )
        # Keep the task alive until it completes and make sure a failure is
        # logged instead of being silently dropped.
        _discussion_tasks.add(task)
        task.add_done_callback(_finalize_discussion_task)

        # Give the discussion a moment to initialise. asyncio.wait neither
        # cancels the task on timeout nor re-raises the task's exception.
        await asyncio.wait({task}, timeout=0.5)

        # A ValueError raised inside the task cannot reach the ``except``
        # below on its own; re-raise it here so the client gets the 400.
        if task.done() and not task.cancelled():
            startup_error = task.exception()
            if isinstance(startup_error, ValueError):
                raise startup_error

        chatroom = await ChatRoomService.get_chatroom(room_id)
        return DiscussionStatusResponse(
            is_active=True,
            room_id=room_id,
            discussion_id=chatroom.current_discussion_id if chatroom else None,
            current_round=chatroom.current_round if chatroom else 0,
            status=chatroom.status if chatroom else "unknown"
        )
    except ValueError as e:
        raise HTTPException(status_code=400, detail=str(e))
@router.post("/{room_id}/pause", response_model=DiscussionStatusResponse)
async def pause_discussion(room_id: str):
    """Pause the discussion of a chat room.

    Returns:
        A status response with ``is_active=False`` and ``status="paused"``;
        the discussion id and round come from the room when it can be read,
        otherwise ``None`` and ``0``.

    Raises:
        HTTPException: 400 when the engine reports that it did not pause
            anything.
    """
    paused = await DiscussionEngine.pause_discussion(room_id)
    if not paused:
        raise HTTPException(status_code=400, detail="没有进行中的讨论")

    room = await ChatRoomService.get_chatroom(room_id)
    if room:
        discussion_id = room.current_discussion_id
        round_number = room.current_round
    else:
        discussion_id = None
        round_number = 0

    return DiscussionStatusResponse(
        is_active=False,
        room_id=room_id,
        discussion_id=discussion_id,
        current_round=round_number,
        status="paused",
    )
@router.post("/{room_id}/resume", response_model=DiscussionStatusResponse)
async def resume_discussion(room_id: str):
    """Resume the discussion of a chat room.

    Returns:
        A status response with ``is_active=True`` and ``status="active"``;
        the discussion id and round come from the room when it can be read,
        otherwise ``None`` and ``0``.

    Raises:
        HTTPException: 400 when the engine reports that it did not resume
            anything.
    """
    resumed = await DiscussionEngine.resume_discussion(room_id)
    if not resumed:
        raise HTTPException(status_code=400, detail="聊天室不在暂停状态")

    room = await ChatRoomService.get_chatroom(room_id)
    if room:
        discussion_id = room.current_discussion_id
        round_number = room.current_round
    else:
        discussion_id = None
        round_number = 0

    return DiscussionStatusResponse(
        is_active=True,
        room_id=room_id,
        discussion_id=discussion_id,
        current_round=round_number,
        status="active",
    )
@router.post("/{room_id}/stop", response_model=DiscussionStatusResponse)
async def stop_discussion(room_id: str):
    """Ask the engine to stop the discussion of a chat room.

    Never raises for an unknown room or a failed stop; the outcome is
    expressed through the reported ``status``:

    * ``"stopping"`` when the engine reports success,
    * otherwise the room's own status when the room can be read,
    * otherwise ``"unknown"``.

    ``is_active`` is always ``False`` in the response.
    """
    stopped = await DiscussionEngine.stop_discussion(room_id)
    room = await ChatRoomService.get_chatroom(room_id)

    if room:
        discussion_id = room.current_discussion_id
        round_number = room.current_round
    else:
        discussion_id = None
        round_number = 0

    if stopped:
        reported_status = "stopping"
    elif room:
        reported_status = room.status
    else:
        reported_status = "unknown"

    return DiscussionStatusResponse(
        is_active=False,
        room_id=room_id,
        discussion_id=discussion_id,
        current_round=round_number,
        status=reported_status,
    )
@router.get("/{room_id}/status", response_model=DiscussionStatusResponse)
async def get_discussion_status(room_id: str):
    """Report the current discussion state of a chat room.

    ``is_active`` comes from ``DiscussionEngine.is_discussion_active``; the
    discussion id, round and status are read from the stored chat room.

    Raises:
        HTTPException: 404 when the service returns a falsy result.
    """
    room = await ChatRoomService.get_chatroom(room_id)
    if not room:
        raise HTTPException(status_code=404, detail="聊天室不存在")

    return DiscussionStatusResponse(
        is_active=DiscussionEngine.is_discussion_active(room_id),
        room_id=room_id,
        discussion_id=room.current_discussion_id,
        current_round=room.current_round,
        status=room.status,
    )
# ============ WebSocket端点 ============
@router.websocket("/ws/{room_id}")
async def chatroom_websocket(websocket: WebSocket, room_id: str):
    """Serve a WebSocket connection for a chat room.

    The socket is closed immediately when the room cannot be found.
    Otherwise it is registered with ``MessageRouter`` and the handler loops
    on incoming text frames, answering ``"ping"`` with ``"pong"`` and
    ignoring everything else. The socket is always unregistered from
    ``MessageRouter`` when the loop ends, whatever the reason.

    Args:
        websocket: The incoming WebSocket connection.
        room_id: Identifier of the chat room to attach to.
    """
    # Refuse connections to rooms that do not exist.
    chatroom = await ChatRoomService.get_chatroom(room_id)
    if not chatroom:
        # NOTE(review): this close() happens before the socket is accepted
        # in this handler; per the ASGI WebSocket spec such a close rejects
        # the handshake with HTTP 403, so the client may never see code 4004
        # or the reason text. Confirm whether accept-then-close is wanted.
        await websocket.close(code=4004, reason="聊天室不存在")
        return

    await MessageRouter.connect(room_id, websocket)
    try:
        while True:
            # Keep the connection open and read client frames (e.g. heartbeats).
            data = await websocket.receive_text()
            if data == "ping":
                await websocket.send_text("pong")
    except WebSocketDisconnect:
        # Normal client disconnect; cleanup happens in ``finally``.
        pass
    except Exception as e:
        logger.error(f"WebSocket错误: {e}")
    finally:
        # Runs for cancellation as well, so the socket is never left
        # registered after the handler exits.
        await MessageRouter.disconnect(room_id, websocket)
# ============ 辅助函数 ============
def _to_response(chatroom) -> ChatRoomResponse:
    """Build a ``ChatRoomResponse`` from a chat-room object.

    Plain attributes are copied as they are. ``created_at`` and
    ``updated_at`` are rendered through their ``isoformat()`` method, and
    ``completed_at`` likewise when it is truthy, otherwise it becomes
    ``None``.
    """
    copied_attributes = (
        "room_id",
        "name",
        "description",
        "objective",
        "agents",
        "moderator_agent_id",
        "config",
        "status",
        "current_round",
        "current_discussion_id",
    )
    fields = {attr: getattr(chatroom, attr) for attr in copied_attributes}

    fields["created_at"] = chatroom.created_at.isoformat()
    fields["updated_at"] = chatroom.updated_at.isoformat()
    if chatroom.completed_at:
        fields["completed_at"] = chatroom.completed_at.isoformat()
    else:
        fields["completed_at"] = None

    return ChatRoomResponse(**fields)
def _message_to_response(message) -> MessageResponse:
    """Build a ``MessageResponse`` from a message object.

    Plain attributes are copied as they are; ``created_at`` is rendered
    through its ``isoformat()`` method.
    """
    copied_attributes = (
        "message_id",
        "room_id",
        "discussion_id",
        "agent_id",
        "content",
        "message_type",
        "round",
    )
    fields = {attr: getattr(message, attr) for attr in copied_attributes}
    fields["created_at"] = message.created_at.isoformat()
    return MessageResponse(**fields)