Files
multiAgentTry/backend/app/routers/agents_control.py
Claude Code dc398d7c7b 完整实现 Swarm 多智能体协作系统
- 新增 CLIPluginAdapter 统一接口 (backend/app/core/agent_adapter.py)
- 新增 LLM 服务层,支持 Anthropic/OpenAI/DeepSeek/Ollama (backend/app/services/llm_service.py)
- 新增 Agent 执行引擎,支持文件锁自动管理 (backend/app/services/agent_executor.py)
- 新增 NativeLLMAgent 原生 LLM 适配器 (backend/app/adapters/native_llm_agent.py)
- 新增进程管理器 (backend/app/services/process_manager.py)
- 新增 Agent 控制 API (backend/app/routers/agents_control.py)
- 新增 WebSocket 实时通信 (backend/app/routers/websocket.py)
- 更新前端 AgentsPage,支持启动/停止 Agent
- 测试通过:Agent 启动、批量操作、栅栏同步

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 17:32:11 +08:00

392 lines
11 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Agent 控制 API 路由
提供 Agent 启动、停止、状态查询等控制接口
"""
import logging
import uuid
from typing import Dict, List, Optional, Any
from datetime import datetime
from fastapi import APIRouter, HTTPException, BackgroundTasks
from pydantic import BaseModel, Field
from ..services.process_manager import get_process_manager, AgentStatus
from ..services.agent_registry import get_agent_registry
from ..services.heartbeat import get_heartbeat_service
from ..adapters.native_llm_agent import NativeLLMAgent, NativeLLMAgentFactory
logger = logging.getLogger(__name__)
router = APIRouter(prefix="/api/agents/control", tags=["agents-control"])
# ========== 请求模型 ==========
class StartAgentRequest(BaseModel):
"""启动 Agent 请求"""
agent_id: str = Field(..., description="Agent 唯一标识")
name: Optional[str] = Field(None, description="Agent 显示名称")
role: str = Field("developer", description="Agent 角色")
model: str = Field("claude-sonnet-4.6", description="使用的模型")
agent_type: str = Field("native_llm", description="Agent 类型")
config: Dict[str, Any] = Field(default_factory=dict, description="额外配置")
class StopAgentRequest(BaseModel):
"""停止 Agent 请求"""
agent_id: str = Field(..., description="Agent ID")
graceful: bool = Field(True, description="是否优雅关闭")
class ExecuteTaskRequest(BaseModel):
"""执行任务请求"""
agent_id: str = Field(..., description="Agent ID")
task_description: str = Field(..., description="任务描述")
context: Dict[str, Any] = Field(default_factory=dict, description="任务上下文")
class CreateMeetingRequest(BaseModel):
"""创建会议请求"""
meeting_id: str = Field(..., description="会议 ID")
title: str = Field(..., description="会议标题")
attendees: List[str] = Field(..., description="参会 Agent ID 列表")
class JoinMeetingRequest(BaseModel):
"""加入会议请求"""
agent_id: str = Field(..., description="Agent ID")
meeting_id: str = Field(..., description="会议 ID")
timeout: int = Field(300, description="等待超时时间(秒)")
# ========== 响应模型 ==========
class AgentControlResponse(BaseModel):
"""Agent 控制响应"""
success: bool
agent_id: str
status: str
message: str
class AgentStatusResponse(BaseModel):
"""Agent 状态响应"""
agent_id: str
status: str
is_alive: bool
uptime: Optional[float] = None
restart_count: int = 0
class ProcessManagerSummary(BaseModel):
"""进程管理器摘要"""
total_agents: int
running_agents: int
running_agent_ids: List[str]
status_counts: Dict[str, int]
monitor_running: bool
# ========== API 端点 ==========
@router.post("/start", response_model=AgentControlResponse)
async def start_agent(request: StartAgentRequest, background_tasks: BackgroundTasks):
"""
启动 Agent
启动一个新的 Agent 实例,支持两种类型:
- native_llm: 原生 LLM Agent异步任务
- process_wrapper: 进程包装 Agent外部 CLI 工具)
"""
process_manager = get_process_manager()
# 检查是否已在运行
if request.agent_id in process_manager.get_all_agents():
existing = process_manager.get_agent_status(request.agent_id)
if existing != AgentStatus.STOPPED:
return AgentControlResponse(
success=False,
agent_id=request.agent_id,
status=existing.value,
message="Agent 已在运行"
)
# 准备配置
config = request.config.copy()
config["name"] = request.name or request.agent_id.replace("-", " ").title()
config["role"] = request.role
config["model"] = request.model
# 启动 Agent
success = await process_manager.start_agent(
agent_id=request.agent_id,
agent_type=request.agent_type,
config=config
)
if success:
return AgentControlResponse(
success=True,
agent_id=request.agent_id,
status=AgentStatus.RUNNING.value,
message=f"Agent {request.agent_id} 启动成功"
)
else:
raise HTTPException(status_code=500, detail="启动 Agent 失败")
@router.post("/stop", response_model=AgentControlResponse)
async def stop_agent(request: StopAgentRequest):
"""停止 Agent"""
process_manager = get_process_manager()
success = await process_manager.stop_agent(
agent_id=request.agent_id,
graceful=request.graceful
)
if success:
return AgentControlResponse(
success=True,
agent_id=request.agent_id,
status=AgentStatus.STOPPED.value,
message=f"Agent {request.agent_id} 已停止"
)
else:
return AgentControlResponse(
success=False,
agent_id=request.agent_id,
status=AgentStatus.UNKNOWN.value,
message=f"停止 Agent 失败或 Agent 未运行"
)
@router.post("/restart", response_model=AgentControlResponse)
async def restart_agent(agent_id: str):
"""重启 Agent"""
process_manager = get_process_manager()
success = await process_manager.restart_agent(agent_id)
if success:
return AgentControlResponse(
success=True,
agent_id=agent_id,
status=AgentStatus.RUNNING.value,
message=f"Agent {agent_id} 重启成功"
)
else:
raise HTTPException(status_code=500, detail="重启 Agent 失败")
@router.get("/status/{agent_id}", response_model=AgentStatusResponse)
async def get_agent_status(agent_id: str):
"""获取 Agent 状态"""
process_manager = get_process_manager()
status = process_manager.get_agent_status(agent_id)
all_agents = process_manager.get_all_agents()
if agent_id in all_agents:
process_info = all_agents[agent_id]
return AgentStatusResponse(
agent_id=agent_id,
status=status.value,
is_alive=process_info.is_alive,
uptime=process_info.uptime,
restart_count=process_info.restart_count
)
else:
raise HTTPException(status_code=404, detail="Agent 不存在")
@router.get("/list", response_model=List[AgentStatusResponse])
async def list_agents():
"""列出所有 Agent 状态"""
process_manager = get_process_manager()
heartbeat_service = get_heartbeat_service()
agents = []
for agent_id, process_info in process_manager.get_all_agents().items():
# 获取心跳信息
heartbeat = await heartbeat_service.get_heartbeat(agent_id)
agents.append(AgentStatusResponse(
agent_id=agent_id,
status=process_info.status.value,
is_alive=process_info.is_alive,
uptime=process_info.uptime,
restart_count=process_info.restart_count
))
return agents
@router.get("/summary", response_model=ProcessManagerSummary)
async def get_summary():
"""获取进程管理器摘要"""
process_manager = get_process_manager()
summary = process_manager.get_summary()
return ProcessManagerSummary(**summary)
@router.post("/execute")
async def execute_task(request: ExecuteTaskRequest):
"""
让 Agent 执行任务
Agent 会自动:
1. 分析任务,识别需要的文件
2. 获取文件锁
3. 调用 LLM 执行任务
4. 释放文件锁
"""
process_manager = get_process_manager()
# 检查 Agent 是否运行
all_agents = process_manager.get_all_agents()
if request.agent_id not in all_agents:
raise HTTPException(status_code=404, detail="Agent 未运行")
process_info = all_agents[request.agent_id]
if not process_info.is_alive:
raise HTTPException(status_code=400, detail="Agent 未运行")
# 获取 Agent 实例
agent = process_info.agent
if not agent:
raise HTTPException(status_code=500, detail="Agent 实例不可用")
# 创建任务
from ..core.agent_adapter import Task
task = Task(
task_id=f"task_{uuid.uuid4().hex[:12]}",
description=request.task_description,
context=request.context
)
# 执行任务
result = await agent.execute(task)
return {
"success": result.success,
"output": result.output,
"error": result.error,
"metadata": result.metadata,
"execution_time": result.execution_time
}
@router.post("/meeting/create")
async def create_meeting(request: CreateMeetingRequest):
"""创建协作会议"""
from ..services.meeting_scheduler import get_meeting_scheduler
scheduler = get_meeting_scheduler()
queue = await scheduler.create_meeting(
request.meeting_id,
request.title,
request.attendees
)
return {
"success": True,
"meeting_id": request.meeting_id,
"title": queue.title,
"expected_attendees": queue.expected_attendees,
"min_required": queue.min_required,
"status": queue.status
}
@router.post("/meeting/join")
async def join_meeting(request: JoinMeetingRequest):
"""让 Agent 加入会议(栅栏同步)"""
process_manager = get_process_manager()
# 检查 Agent 是否运行
all_agents = process_manager.get_all_agents()
if request.agent_id not in all_agents:
raise HTTPException(status_code=404, detail="Agent 未运行")
process_info = all_agents[request.agent_id]
agent = process_info.agent
if not agent:
raise HTTPException(status_code=500, detail="Agent 实例不可用")
# 加入会议
result = await agent.join_meeting(request.meeting_id, request.timeout)
return {
"agent_id": request.agent_id,
"meeting_id": request.meeting_id,
"result": result
}
@router.post("/shutdown-all")
async def shutdown_all_agents():
"""关闭所有 Agent"""
process_manager = get_process_manager()
await process_manager.shutdown_all()
return {
"success": True,
"message": "所有 Agent 已关闭"
}
@router.post("/batch-start")
async def batch_start_agents(agents: List[StartAgentRequest]):
"""
批量启动 Agent
用于快速创建团队
"""
results = []
process_manager = get_process_manager()
for agent_request in agents:
config = agent_request.config.copy()
config["name"] = agent_request.name or agent_request.agent_id.replace("-", " ").title()
config["role"] = agent_request.role
config["model"] = agent_request.model
success = await process_manager.start_agent(
agent_id=agent_request.agent_id,
agent_type=agent_request.agent_type,
config=config
)
results.append({
"agent_id": agent_request.agent_id,
"success": success,
"status": AgentStatus.RUNNING.value if success else AgentStatus.CRASHED.value
})
return {
"results": results,
"total": len(results),
"successful": sum(1 for r in results if r["success"])
}
# 健康检查端点
@router.get("/health")
async def health_check():
"""健康检查"""
process_manager = get_process_manager()
summary = process_manager.get_summary()
return {
"status": "healthy",
"running_agents": summary["running_agents"],
"monitor_running": summary["monitor_running"]
}