- 新增 CLIPluginAdapter 统一接口 (backend/app/core/agent_adapter.py) - 新增 LLM 服务层,支持 Anthropic/OpenAI/DeepSeek/Ollama (backend/app/services/llm_service.py) - 新增 Agent 执行引擎,支持文件锁自动管理 (backend/app/services/agent_executor.py) - 新增 NativeLLMAgent 原生 LLM 适配器 (backend/app/adapters/native_llm_agent.py) - 新增进程管理器 (backend/app/services/process_manager.py) - 新增 Agent 控制 API (backend/app/routers/agents_control.py) - 新增 WebSocket 实时通信 (backend/app/routers/websocket.py) - 更新前端 AgentsPage,支持启动/停止 Agent - 测试通过:Agent 启动、批量操作、栅栏同步 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
392 lines
11 KiB
Python
392 lines
11 KiB
Python
"""
|
||
Agent 控制 API 路由
|
||
|
||
提供 Agent 启动、停止、状态查询等控制接口
|
||
"""
|
||
|
||
import logging
|
||
import uuid
|
||
from typing import Dict, List, Optional, Any
|
||
from datetime import datetime
|
||
|
||
from fastapi import APIRouter, HTTPException, BackgroundTasks
|
||
from pydantic import BaseModel, Field
|
||
|
||
from ..services.process_manager import get_process_manager, AgentStatus
|
||
from ..services.agent_registry import get_agent_registry
|
||
from ..services.heartbeat import get_heartbeat_service
|
||
from ..adapters.native_llm_agent import NativeLLMAgent, NativeLLMAgentFactory
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
router = APIRouter(prefix="/api/agents/control", tags=["agents-control"])
|
||
|
||
|
||
# ========== 请求模型 ==========
|
||
|
||
|
||
class StartAgentRequest(BaseModel):
|
||
"""启动 Agent 请求"""
|
||
agent_id: str = Field(..., description="Agent 唯一标识")
|
||
name: Optional[str] = Field(None, description="Agent 显示名称")
|
||
role: str = Field("developer", description="Agent 角色")
|
||
model: str = Field("claude-sonnet-4.6", description="使用的模型")
|
||
agent_type: str = Field("native_llm", description="Agent 类型")
|
||
config: Dict[str, Any] = Field(default_factory=dict, description="额外配置")
|
||
|
||
|
||
class StopAgentRequest(BaseModel):
|
||
"""停止 Agent 请求"""
|
||
agent_id: str = Field(..., description="Agent ID")
|
||
graceful: bool = Field(True, description="是否优雅关闭")
|
||
|
||
|
||
class ExecuteTaskRequest(BaseModel):
|
||
"""执行任务请求"""
|
||
agent_id: str = Field(..., description="Agent ID")
|
||
task_description: str = Field(..., description="任务描述")
|
||
context: Dict[str, Any] = Field(default_factory=dict, description="任务上下文")
|
||
|
||
|
||
class CreateMeetingRequest(BaseModel):
|
||
"""创建会议请求"""
|
||
meeting_id: str = Field(..., description="会议 ID")
|
||
title: str = Field(..., description="会议标题")
|
||
attendees: List[str] = Field(..., description="参会 Agent ID 列表")
|
||
|
||
|
||
class JoinMeetingRequest(BaseModel):
|
||
"""加入会议请求"""
|
||
agent_id: str = Field(..., description="Agent ID")
|
||
meeting_id: str = Field(..., description="会议 ID")
|
||
timeout: int = Field(300, description="等待超时时间(秒)")
|
||
|
||
|
||
# ========== 响应模型 ==========
|
||
|
||
|
||
class AgentControlResponse(BaseModel):
|
||
"""Agent 控制响应"""
|
||
success: bool
|
||
agent_id: str
|
||
status: str
|
||
message: str
|
||
|
||
|
||
class AgentStatusResponse(BaseModel):
|
||
"""Agent 状态响应"""
|
||
agent_id: str
|
||
status: str
|
||
is_alive: bool
|
||
uptime: Optional[float] = None
|
||
restart_count: int = 0
|
||
|
||
|
||
class ProcessManagerSummary(BaseModel):
|
||
"""进程管理器摘要"""
|
||
total_agents: int
|
||
running_agents: int
|
||
running_agent_ids: List[str]
|
||
status_counts: Dict[str, int]
|
||
monitor_running: bool
|
||
|
||
|
||
# ========== API 端点 ==========
|
||
|
||
|
||
@router.post("/start", response_model=AgentControlResponse)
|
||
async def start_agent(request: StartAgentRequest, background_tasks: BackgroundTasks):
|
||
"""
|
||
启动 Agent
|
||
|
||
启动一个新的 Agent 实例,支持两种类型:
|
||
- native_llm: 原生 LLM Agent(异步任务)
|
||
- process_wrapper: 进程包装 Agent(外部 CLI 工具)
|
||
"""
|
||
process_manager = get_process_manager()
|
||
|
||
# 检查是否已在运行
|
||
if request.agent_id in process_manager.get_all_agents():
|
||
existing = process_manager.get_agent_status(request.agent_id)
|
||
if existing != AgentStatus.STOPPED:
|
||
return AgentControlResponse(
|
||
success=False,
|
||
agent_id=request.agent_id,
|
||
status=existing.value,
|
||
message="Agent 已在运行"
|
||
)
|
||
|
||
# 准备配置
|
||
config = request.config.copy()
|
||
config["name"] = request.name or request.agent_id.replace("-", " ").title()
|
||
config["role"] = request.role
|
||
config["model"] = request.model
|
||
|
||
# 启动 Agent
|
||
success = await process_manager.start_agent(
|
||
agent_id=request.agent_id,
|
||
agent_type=request.agent_type,
|
||
config=config
|
||
)
|
||
|
||
if success:
|
||
return AgentControlResponse(
|
||
success=True,
|
||
agent_id=request.agent_id,
|
||
status=AgentStatus.RUNNING.value,
|
||
message=f"Agent {request.agent_id} 启动成功"
|
||
)
|
||
else:
|
||
raise HTTPException(status_code=500, detail="启动 Agent 失败")
|
||
|
||
|
||
@router.post("/stop", response_model=AgentControlResponse)
|
||
async def stop_agent(request: StopAgentRequest):
|
||
"""停止 Agent"""
|
||
process_manager = get_process_manager()
|
||
|
||
success = await process_manager.stop_agent(
|
||
agent_id=request.agent_id,
|
||
graceful=request.graceful
|
||
)
|
||
|
||
if success:
|
||
return AgentControlResponse(
|
||
success=True,
|
||
agent_id=request.agent_id,
|
||
status=AgentStatus.STOPPED.value,
|
||
message=f"Agent {request.agent_id} 已停止"
|
||
)
|
||
else:
|
||
return AgentControlResponse(
|
||
success=False,
|
||
agent_id=request.agent_id,
|
||
status=AgentStatus.UNKNOWN.value,
|
||
message=f"停止 Agent 失败或 Agent 未运行"
|
||
)
|
||
|
||
|
||
@router.post("/restart", response_model=AgentControlResponse)
|
||
async def restart_agent(agent_id: str):
|
||
"""重启 Agent"""
|
||
process_manager = get_process_manager()
|
||
|
||
success = await process_manager.restart_agent(agent_id)
|
||
|
||
if success:
|
||
return AgentControlResponse(
|
||
success=True,
|
||
agent_id=agent_id,
|
||
status=AgentStatus.RUNNING.value,
|
||
message=f"Agent {agent_id} 重启成功"
|
||
)
|
||
else:
|
||
raise HTTPException(status_code=500, detail="重启 Agent 失败")
|
||
|
||
|
||
@router.get("/status/{agent_id}", response_model=AgentStatusResponse)
|
||
async def get_agent_status(agent_id: str):
|
||
"""获取 Agent 状态"""
|
||
process_manager = get_process_manager()
|
||
|
||
status = process_manager.get_agent_status(agent_id)
|
||
all_agents = process_manager.get_all_agents()
|
||
|
||
if agent_id in all_agents:
|
||
process_info = all_agents[agent_id]
|
||
return AgentStatusResponse(
|
||
agent_id=agent_id,
|
||
status=status.value,
|
||
is_alive=process_info.is_alive,
|
||
uptime=process_info.uptime,
|
||
restart_count=process_info.restart_count
|
||
)
|
||
else:
|
||
raise HTTPException(status_code=404, detail="Agent 不存在")
|
||
|
||
|
||
@router.get("/list", response_model=List[AgentStatusResponse])
|
||
async def list_agents():
|
||
"""列出所有 Agent 状态"""
|
||
process_manager = get_process_manager()
|
||
heartbeat_service = get_heartbeat_service()
|
||
|
||
agents = []
|
||
for agent_id, process_info in process_manager.get_all_agents().items():
|
||
# 获取心跳信息
|
||
heartbeat = await heartbeat_service.get_heartbeat(agent_id)
|
||
|
||
agents.append(AgentStatusResponse(
|
||
agent_id=agent_id,
|
||
status=process_info.status.value,
|
||
is_alive=process_info.is_alive,
|
||
uptime=process_info.uptime,
|
||
restart_count=process_info.restart_count
|
||
))
|
||
|
||
return agents
|
||
|
||
|
||
@router.get("/summary", response_model=ProcessManagerSummary)
|
||
async def get_summary():
|
||
"""获取进程管理器摘要"""
|
||
process_manager = get_process_manager()
|
||
summary = process_manager.get_summary()
|
||
|
||
return ProcessManagerSummary(**summary)
|
||
|
||
|
||
@router.post("/execute")
|
||
async def execute_task(request: ExecuteTaskRequest):
|
||
"""
|
||
让 Agent 执行任务
|
||
|
||
Agent 会自动:
|
||
1. 分析任务,识别需要的文件
|
||
2. 获取文件锁
|
||
3. 调用 LLM 执行任务
|
||
4. 释放文件锁
|
||
"""
|
||
process_manager = get_process_manager()
|
||
|
||
# 检查 Agent 是否运行
|
||
all_agents = process_manager.get_all_agents()
|
||
if request.agent_id not in all_agents:
|
||
raise HTTPException(status_code=404, detail="Agent 未运行")
|
||
|
||
process_info = all_agents[request.agent_id]
|
||
if not process_info.is_alive:
|
||
raise HTTPException(status_code=400, detail="Agent 未运行")
|
||
|
||
# 获取 Agent 实例
|
||
agent = process_info.agent
|
||
if not agent:
|
||
raise HTTPException(status_code=500, detail="Agent 实例不可用")
|
||
|
||
# 创建任务
|
||
from ..core.agent_adapter import Task
|
||
task = Task(
|
||
task_id=f"task_{uuid.uuid4().hex[:12]}",
|
||
description=request.task_description,
|
||
context=request.context
|
||
)
|
||
|
||
# 执行任务
|
||
result = await agent.execute(task)
|
||
|
||
return {
|
||
"success": result.success,
|
||
"output": result.output,
|
||
"error": result.error,
|
||
"metadata": result.metadata,
|
||
"execution_time": result.execution_time
|
||
}
|
||
|
||
|
||
@router.post("/meeting/create")
|
||
async def create_meeting(request: CreateMeetingRequest):
|
||
"""创建协作会议"""
|
||
from ..services.meeting_scheduler import get_meeting_scheduler
|
||
scheduler = get_meeting_scheduler()
|
||
|
||
queue = await scheduler.create_meeting(
|
||
request.meeting_id,
|
||
request.title,
|
||
request.attendees
|
||
)
|
||
|
||
return {
|
||
"success": True,
|
||
"meeting_id": request.meeting_id,
|
||
"title": queue.title,
|
||
"expected_attendees": queue.expected_attendees,
|
||
"min_required": queue.min_required,
|
||
"status": queue.status
|
||
}
|
||
|
||
|
||
@router.post("/meeting/join")
|
||
async def join_meeting(request: JoinMeetingRequest):
|
||
"""让 Agent 加入会议(栅栏同步)"""
|
||
process_manager = get_process_manager()
|
||
|
||
# 检查 Agent 是否运行
|
||
all_agents = process_manager.get_all_agents()
|
||
if request.agent_id not in all_agents:
|
||
raise HTTPException(status_code=404, detail="Agent 未运行")
|
||
|
||
process_info = all_agents[request.agent_id]
|
||
agent = process_info.agent
|
||
if not agent:
|
||
raise HTTPException(status_code=500, detail="Agent 实例不可用")
|
||
|
||
# 加入会议
|
||
result = await agent.join_meeting(request.meeting_id, request.timeout)
|
||
|
||
return {
|
||
"agent_id": request.agent_id,
|
||
"meeting_id": request.meeting_id,
|
||
"result": result
|
||
}
|
||
|
||
|
||
@router.post("/shutdown-all")
|
||
async def shutdown_all_agents():
|
||
"""关闭所有 Agent"""
|
||
process_manager = get_process_manager()
|
||
await process_manager.shutdown_all()
|
||
|
||
return {
|
||
"success": True,
|
||
"message": "所有 Agent 已关闭"
|
||
}
|
||
|
||
|
||
@router.post("/batch-start")
|
||
async def batch_start_agents(agents: List[StartAgentRequest]):
|
||
"""
|
||
批量启动 Agent
|
||
|
||
用于快速创建团队
|
||
"""
|
||
results = []
|
||
process_manager = get_process_manager()
|
||
|
||
for agent_request in agents:
|
||
config = agent_request.config.copy()
|
||
config["name"] = agent_request.name or agent_request.agent_id.replace("-", " ").title()
|
||
config["role"] = agent_request.role
|
||
config["model"] = agent_request.model
|
||
|
||
success = await process_manager.start_agent(
|
||
agent_id=agent_request.agent_id,
|
||
agent_type=agent_request.agent_type,
|
||
config=config
|
||
)
|
||
|
||
results.append({
|
||
"agent_id": agent_request.agent_id,
|
||
"success": success,
|
||
"status": AgentStatus.RUNNING.value if success else AgentStatus.CRASHED.value
|
||
})
|
||
|
||
return {
|
||
"results": results,
|
||
"total": len(results),
|
||
"successful": sum(1 for r in results if r["success"])
|
||
}
|
||
|
||
|
||
# 健康检查端点
|
||
@router.get("/health")
|
||
async def health_check():
|
||
"""健康检查"""
|
||
process_manager = get_process_manager()
|
||
summary = process_manager.get_summary()
|
||
|
||
return {
|
||
"status": "healthy",
|
||
"running_agents": summary["running_agents"],
|
||
"monitor_running": summary["monitor_running"]
|
||
}
|