# Commit message (captured from the repository web view):
# - Add CLIPluginAdapter unified interface (backend/app/core/agent_adapter.py)
# - Add LLM service layer supporting Anthropic/OpenAI/DeepSeek/Ollama (backend/app/services/llm_service.py)
# - Add Agent execution engine with automatic file-lock management (backend/app/services/agent_executor.py)
# - Add NativeLLMAgent native LLM adapter (backend/app/adapters/native_llm_agent.py)
# - Add process manager (backend/app/services/process_manager.py)
# - Add Agent control API (backend/app/routers/agents_control.py)
# - Add WebSocket real-time communication (backend/app/routers/websocket.py)
# - Update frontend AgentsPage to support starting/stopping Agents
# - Tests passed: Agent startup, batch operations, barrier synchronization
# Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
# File view: 437 lines, 14 KiB, Python
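"""
Agent process manager.

Responsible for starting, stopping and monitoring Agent processes/tasks.

Two kinds of Agent are supported:

1. NativeLLMAgent - an asyncio task; no external process is needed.
2. ProcessWrapperAgent - an external CLI tool that requires process management.
"""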

import asyncio
import logging
import signal
import subprocess
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Dict, List, Optional, Any

import psutil

from ..adapters.native_llm_agent import NativeLLMAgent, NativeLLMAgentFactory
from .heartbeat import get_heartbeat_service
from .agent_registry import get_agent_registry


# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)


class AgentStatus(Enum):
    """Lifecycle state of a managed Agent.

    The string values are what ``ProcessManager.get_summary`` reports as the
    keys of ``status_counts``.
    """

    # Not running; the default state of a freshly built AgentProcess record.
    STOPPED = "stopped"
    # Assigned while start_agent is launching the Agent.
    STARTING = "starting"
    # Assigned once the launch has succeeded.
    RUNNING = "running"
    # Assigned while stop_agent is tearing the Agent down.
    STOPPING = "stopping"
    # Assigned on failures, e.g. a failed start/stop or an exhausted restart budget.
    CRASHED = "crashed"
    # Returned by get_agent_status for an agent_id that is not tracked.
    UNKNOWN = "unknown"


@dataclass
class AgentProcess:
    """Bookkeeping record for one Agent tracked by ProcessManager.

    A ``native_llm`` Agent keeps its NativeLLMAgent in ``agent``. A
    ``process_wrapper`` Agent keeps its child process in ``process``. Either
    kind may have a background asyncio ``task`` attached.
    """

    agent_id: str
    agent_type: str  # "native_llm" or "process_wrapper"
    status: AgentStatus = AgentStatus.STOPPED
    process: Optional[subprocess.Popen] = None
    task: Optional[asyncio.Task] = None
    agent: Optional[NativeLLMAgent] = None
    started_at: Optional[datetime] = None
    stopped_at: Optional[datetime] = None
    # Number of restarts performed via ProcessManager.restart_agent.
    restart_count: int = 0
    config: Dict[str, Any] = field(default_factory=dict)

    @property
    def uptime(self) -> Optional[float]:
        """Run time in seconds, or None if the Agent was never started.

        Measured up to ``stopped_at`` when set, otherwise up to now.
        """
        if self.started_at is None:
            return None
        end = self.stopped_at or datetime.now()
        return (end - self.started_at).total_seconds()

    @property
    def is_alive(self) -> bool:
        """Whether the Agent is currently running.

        native_llm: status is RUNNING and its background task has not finished.
        Other types: the child process exists and has not exited.
        Always returns a real bool. The previous ``x and ...`` form leaked
        ``None`` when ``task``/``process`` was unset.
        """
        if self.agent_type == "native_llm":
            return (
                self.status == AgentStatus.RUNNING
                and self.task is not None
                and not self.task.done()
            )
        return self.process is not None and self.process.poll() is None


class ProcessManager:
    """
    Agent process manager.

    Responsibilities:
    1. Start/stop Agents.
    2. Monitor Agent health.
    3. Automatically restart unhealthy Agents whose config sets
       ``auto_restart`` (bounded by ``max_restarts``, default 3).
    4. Manage the Agent lifecycle (starting -> running -> stopping -> stopped).
    """

    def __init__(self):
        # agent_id -> bookkeeping record for every Agent this manager tracks.
        self.processes: Dict[str, AgentProcess] = {}
        self.heartbeat_service = get_heartbeat_service()
        self.registry = get_agent_registry()
        self._monitor_task: Optional[asyncio.Task] = None
        # True while the background health-monitor loop should keep running.
        self._running = False

    async def start_agent(
        self,
        agent_id: str,
        agent_type: str = "native_llm",
        config: Optional[Dict[str, Any]] = None
    ) -> bool:
        """
        Start an Agent.

        Args:
            agent_id: Unique Agent identifier.
            agent_type: Agent type, "native_llm" or "process_wrapper".
            config: Agent configuration; None means an empty dict.

        Returns:
            True if the Agent was started. False if it is already running,
            the type is unsupported, or launching failed.
        """
        existing = self.processes.get(agent_id)
        if existing is not None:
            if existing.is_alive:
                logger.warning(f"Agent {agent_id} 已在运行")
                return False
            # The old record is dead (exited child, finished task, crashed),
            # but it may still hold an agent that was never shut down or a
            # child that was never reaped. Release it before overwriting it.
            await self.stop_agent(agent_id)

        logger.info(f"启动 Agent: {agent_id} (类型: {agent_type})")

        process_info = AgentProcess(
            agent_id=agent_id,
            agent_type=agent_type,
            status=AgentStatus.STARTING,
            config=config or {}
        )

        try:
            if agent_type == "native_llm":
                success = await self._start_native_agent(process_info)
            elif agent_type == "process_wrapper":
                success = await self._start_process_wrapper(process_info)
            else:
                logger.error(f"不支持的 Agent 类型: {agent_type}")
                return False

            if not success:
                return False

            process_info.status = AgentStatus.RUNNING
            process_info.started_at = datetime.now()
            self.processes[agent_id] = process_info

            # The health monitor is started lazily with the first running Agent.
            if not self._running:
                await self.start_monitor()

            return True

        except Exception as e:
            logger.error(f"启动 Agent 失败: {agent_id}: {e}", exc_info=True)
            process_info.status = AgentStatus.CRASHED
            return False

    async def _start_native_agent(self, process_info: AgentProcess) -> bool:
        """Create a NativeLLMAgent and a background heartbeat task for it.

        Every 30 seconds the task calls ``health_check()``. While the check
        passes, it reports an "idle" heartbeat. It exits on the first failed check.

        Returns:
            True on success, False if creating the agent failed.
        """
        try:
            agent = await NativeLLMAgentFactory.create(
                agent_id=process_info.agent_id,
                name=process_info.config.get("name"),
                role=process_info.config.get("role", "developer"),
                model=process_info.config.get("model", "claude-sonnet-4.6"),
                config=process_info.config
            )
            process_info.agent = agent

            async def agent_loop():
                try:
                    while True:
                        await asyncio.sleep(30)
                        if await agent.health_check():
                            await agent.update_heartbeat("idle")
                        else:
                            logger.warning(f"Agent {process_info.agent_id} 健康检查失败")
                            break
                except asyncio.CancelledError:
                    logger.info(f"Agent {process_info.agent_id} 任务被取消")
                    # Let the cancellation propagate so the task ends up cancelled.
                    raise
                except Exception as e:
                    logger.error(f"Agent {process_info.agent_id} 循环出错: {e}", exc_info=True)

            process_info.task = asyncio.create_task(agent_loop())

            logger.info(f"原生 LLM Agent 启动成功: {process_info.agent_id}")
            return True

        except Exception as e:
            logger.error(f"启动原生 Agent 失败: {e}")
            return False

    async def _start_process_wrapper(self, process_info: AgentProcess) -> bool:
        """Spawn the external CLI given by ``config["command"]`` and ``config["args"]``.

        A background task polls the child every 5 seconds and logs when it
        exits. Terminating the child is left to ``stop_agent``.

        Returns:
            True if the child was spawned. False if no command is configured
            or spawning failed.
        """
        command = process_info.config.get("command")
        args = process_info.config.get("args", [])

        if not command:
            logger.error("进程包装 Agent 需要指定 command")
            return False

        try:
            # NOTE(review): stdout/stderr are pipes that nothing in this module
            # reads. A child that fills the OS pipe buffer will block. Confirm
            # another component drains them, or redirect to DEVNULL or a file.
            proc = subprocess.Popen(
                [command] + args,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True
            )
            process_info.process = proc

            async def process_monitor():
                # Watch only. On cancellation the CancelledError propagates
                # unchanged. stop_agent terminates the child according to its
                # `graceful` flag, without blocking the event loop.
                while proc.poll() is None:
                    await asyncio.sleep(5)
                logger.warning(f"进程 {process_info.agent_id} 已退出: {proc.returncode}")

            process_info.task = asyncio.create_task(process_monitor())

            logger.info(f"进程包装 Agent 启动成功: {process_info.agent_id} (PID: {proc.pid})")
            return True

        except Exception as e:
            logger.error(f"启动进程失败: {e}")
            return False

    @staticmethod
    async def _cancel_task(task: Optional[asyncio.Task]) -> None:
        """Cancel *task* (if any) and wait until it has finished."""
        if task is None:
            return
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass

    @staticmethod
    async def _terminate_process(
        proc: subprocess.Popen, graceful: bool, timeout: float = 5.0
    ) -> None:
        """Stop *proc* without blocking the event loop.

        With *graceful*, calls ``terminate()`` and waits up to *timeout*
        seconds before escalating to ``kill()``. Otherwise it kills at once.
        The blocking ``wait`` runs in the default executor, and the child is
        reaped afterwards.
        """
        loop = asyncio.get_running_loop()

        def _wait() -> None:
            proc.wait(timeout=timeout)

        if graceful:
            proc.terminate()
            try:
                await loop.run_in_executor(None, _wait)
                return
            except subprocess.TimeoutExpired:
                pass

        proc.kill()
        try:
            await loop.run_in_executor(None, _wait)
        except subprocess.TimeoutExpired:
            logger.warning(f"进程 {proc.pid} kill 后未在 {timeout}s 内退出")

    async def stop_agent(self, agent_id: str, graceful: bool = True) -> bool:
        """
        Stop an Agent and stop tracking it.

        Args:
            agent_id: Agent ID.
            graceful: If True, terminate the child and give it up to 5 seconds
                before killing it. If False, kill it immediately.

        Returns:
            True if stopped. False if the Agent is not tracked, or if teardown
            raised; in that case the record is kept and marked CRASHED.
        """
        process_info = self.processes.get(agent_id)
        if process_info is None:
            logger.warning(f"Agent {agent_id} 未运行")
            return False

        logger.info(f"停止 Agent: {agent_id} (优雅: {graceful})")

        process_info.status = AgentStatus.STOPPING

        try:
            try:
                if process_info.agent:
                    await process_info.agent.shutdown()
            finally:
                # Release the task and the child even if shutdown() raised.
                await self._cancel_task(process_info.task)
                if process_info.process:
                    await self._terminate_process(process_info.process, graceful)
        except Exception as e:
            logger.error(f"停止 Agent 失败: {agent_id}: {e}")
            process_info.status = AgentStatus.CRASHED
            return False

        process_info.status = AgentStatus.STOPPED
        process_info.stopped_at = datetime.now()

        # Remove only our own record. A concurrent start may already have
        # replaced it during the awaits above.
        if self.processes.get(agent_id) is process_info:
            del self.processes[agent_id]

        return True

    async def restart_agent(self, agent_id: str) -> bool:
        """Stop an Agent and start it again with its previous type and config.

        The restart counter is carried over to the new record so that the
        ``max_restarts`` limit enforced by the monitor can actually be reached.

        Returns:
            False if the Agent is not tracked or could not be started again.
        """
        logger.info(f"重启 Agent: {agent_id}")

        process_info = self.processes.get(agent_id)
        if process_info is None:
            return False

        config = process_info.config
        agent_type = process_info.agent_type
        restart_count = process_info.restart_count + 1

        await self.stop_agent(agent_id)
        process_info.restart_count = restart_count

        started = await self.start_agent(agent_id, agent_type, config)
        if started:
            # start_agent builds a fresh AgentProcess (restart_count=0).
            new_info = self.processes.get(agent_id)
            if new_info is not None:
                new_info.restart_count = restart_count
        return started

    def get_agent_status(self, agent_id: str) -> Optional[AgentStatus]:
        """Return the Agent's status, or AgentStatus.UNKNOWN if it is not tracked."""
        if agent_id in self.processes:
            return self.processes[agent_id].status
        return AgentStatus.UNKNOWN

    def get_all_agents(self) -> Dict[str, AgentProcess]:
        """Return a shallow copy of the agent_id -> AgentProcess mapping."""
        return self.processes.copy()

    def get_running_agents(self) -> List[str]:
        """Return the IDs of tracked Agents whose ``is_alive`` is true."""
        return [
            agent_id for agent_id, info in self.processes.items()
            if info.is_alive
        ]

    async def monitor_agent_health(self) -> Dict[str, bool]:
        """
        Check the health of every tracked Agent.

        A native_llm Agent is healthy if its heartbeat task is alive and
        ``health_check()`` passes. An exception from ``health_check()``
        counts as unhealthy. Other Agents are healthy if ``is_alive``.

        Returns:
            {agent_id: is_healthy}
        """
        results: Dict[str, bool] = {}

        # Iterate over a snapshot: start/stop may mutate self.processes while
        # health_check() is being awaited.
        for agent_id, process_info in list(self.processes.items()):
            if process_info.agent_type == "native_llm" and process_info.agent:
                if not process_info.is_alive:
                    results[agent_id] = False
                    continue
                try:
                    results[agent_id] = bool(await process_info.agent.health_check())
                except Exception as e:
                    logger.error(f"Agent {agent_id} 健康检查出错: {e}", exc_info=True)
                    results[agent_id] = False
            else:
                results[agent_id] = bool(process_info.is_alive)

        return results

    async def start_monitor(self, interval: int = 30):
        """Start the background loop that runs ``_check_agents`` every *interval* seconds."""
        if self._running:
            return

        self._running = True

        async def monitor_loop():
            while self._running:
                try:
                    await self._check_agents()
                    await asyncio.sleep(interval)
                except asyncio.CancelledError:
                    break
                except Exception as e:
                    logger.error(f"监控循环出错: {e}", exc_info=True)
                    await asyncio.sleep(interval)

        self._monitor_task = asyncio.create_task(monitor_loop())
        logger.info("监控任务已启动")

    async def stop_monitor(self):
        """Stop the background monitor loop and wait for it to finish."""
        self._running = False

        task, self._monitor_task = self._monitor_task, None
        if task is not None:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass

        logger.info("监控任务已停止")

    async def _check_agents(self):
        """Act on unhealthy Agents: auto-restart them if configured, else mark them CRASHED."""
        health_results = await self.monitor_agent_health()

        for agent_id, is_healthy in health_results.items():
            if is_healthy:
                continue

            process_info = self.processes.get(agent_id)
            # Only RUNNING Agents can newly fail. Skip records that were removed,
            # are being stopped, or were already marked CRASHED.
            if process_info is None or process_info.status != AgentStatus.RUNNING:
                continue

            logger.warning(f"Agent {agent_id} 不健康")

            if not process_info.config.get("auto_restart", False):
                # Without auto-restart, stop reporting a dead Agent as running.
                process_info.status = AgentStatus.CRASHED
                continue

            if process_info.restart_count < process_info.config.get("max_restarts", 3):
                logger.info(f"自动重启 Agent: {agent_id}")
                await self.restart_agent(agent_id)
            else:
                logger.error(f"Agent {agent_id} 重启次数超限,标记为崩溃")
                process_info.status = AgentStatus.CRASHED

    async def shutdown_all(self):
        """Stop the monitor, then stop every tracked Agent."""
        logger.info("关闭所有 Agent...")

        # Stop the monitor first so it cannot auto-restart Agents mid-shutdown.
        await self.stop_monitor()

        for agent_id in list(self.processes.keys()):
            await self.stop_agent(agent_id)

        logger.info("所有 Agent 已关闭")

    def get_summary(self) -> Dict[str, Any]:
        """Return totals, running IDs, per-status counts and monitor state."""
        running = self.get_running_agents()

        status_counts: Dict[str, int] = {}
        for info in self.processes.values():
            key = info.status.value
            status_counts[key] = status_counts.get(key, 0) + 1

        return {
            "total_agents": len(self.processes),
            "running_agents": len(running),
            "running_agent_ids": running,
            "status_counts": status_counts,
            "monitor_running": self._running
        }


# Module-wide singleton, created lazily by get_process_manager().
_process_manager: Optional[ProcessManager] = None


def get_process_manager() -> ProcessManager:
    """Return the shared ProcessManager, constructing it on the first call."""
    global _process_manager
    manager = _process_manager
    if manager is None:
        manager = ProcessManager()
        _process_manager = manager
    return manager


def reset_process_manager():
    """Drop the cached ProcessManager so the next get_process_manager() builds a new one.

    Intended mainly for tests. The discarded instance's Agents and monitor
    task are not stopped here.
    """
    global _process_manager
    _process_manager = None
|