- 新增 CLIPluginAdapter 统一接口 (backend/app/core/agent_adapter.py) - 新增 LLM 服务层,支持 Anthropic/OpenAI/DeepSeek/Ollama (backend/app/services/llm_service.py) - 新增 Agent 执行引擎,支持文件锁自动管理 (backend/app/services/agent_executor.py) - 新增 NativeLLMAgent 原生 LLM 适配器 (backend/app/adapters/native_llm_agent.py) - 新增进程管理器 (backend/app/services/process_manager.py) - 新增 Agent 控制 API (backend/app/routers/agents_control.py) - 新增 WebSocket 实时通信 (backend/app/routers/websocket.py) - 更新前端 AgentsPage,支持启动/停止 Agent - 测试通过:Agent 启动、批量操作、栅栏同步 Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
213 lines
6.0 KiB
Python
213 lines
6.0 KiB
Python
"""
|
||
Heartbeat service - manages agent heartbeats and timeout detection.

Used to monitor whether agents are alive and to detect agents that have gone offline.
|
||
"""
|
||
|
||
import asyncio
|
||
from datetime import datetime, timedelta
|
||
from typing import Dict, List, Optional
|
||
from dataclasses import dataclass, asdict
|
||
|
||
from .storage import get_storage
|
||
|
||
|
||
@dataclass
class HeartbeatInfo:
    """Heartbeat record of a single agent.

    Holds the last reported state of the agent together with helpers that
    measure how long ago that report was made.
    """

    agent_id: str
    last_heartbeat: str  # time of the last heartbeat (ISO format string)
    status: str  # agent status: working, waiting, idle, error
    current_task: str = ""  # description of the current task
    progress: int = 0  # task progress, 0-100

    @property
    def elapsed_seconds(self) -> int:
        """Return the whole number of seconds since the last heartbeat.

        Raises:
            ValueError: if ``last_heartbeat`` is not a valid ISO format string.
        """
        last_time = datetime.fromisoformat(self.last_heartbeat)
        # Build "now" with the same timezone-awareness as the stored
        # timestamp: subtracting an aware datetime from a naive one raises
        # TypeError. With tzinfo None this is a plain naive datetime.now().
        now = datetime.now(tz=last_time.tzinfo)
        return int((now - last_time).total_seconds())

    def is_timeout(self, timeout_seconds: int = 60) -> bool:
        """Return True when more than ``timeout_seconds`` have elapsed.

        Args:
            timeout_seconds: allowed silence in seconds before the agent
                counts as timed out.
        """
        return self.elapsed_seconds > timeout_seconds

    @property
    def elapsed_display(self) -> str:
        """Return the elapsed time formatted for display.

        Under 10 seconds: ``"<n>s ago"``; under one minute: ``"<n>s"``;
        otherwise ``"<m>m <ss>s"`` with zero-padded seconds.
        """
        seconds = self.elapsed_seconds
        if seconds < 10:
            return f"{seconds}s ago"
        if seconds < 60:
            return f"{seconds}s"
        minutes, secs = divmod(seconds, 60)
        return f"{minutes}m {secs:02d}s"
|
||
|
||
|
||
class HeartbeatService:
    """Heartbeat service.

    Manages the heartbeat records of all agents and detects agents whose
    heartbeat has timed out. Records are kept as one JSON document, keyed by
    agent ID, read and written through the storage object.
    """

    HEARTBEATS_FILE = "cache/heartbeats.json"
    DEFAULT_TIMEOUT = 60  # default timeout, in seconds

    def __init__(self):
        self._storage = get_storage()
        # Serializes the read-modify-write cycles of update/remove.
        self._lock = asyncio.Lock()

    async def _load_heartbeats(self) -> Dict[str, Dict]:
        """Load all heartbeat records as a dict keyed by agent ID."""
        heartbeats = await self._storage.read_json(self.HEARTBEATS_FILE)
        # NOTE(review): every caller treats the result as a dict. Fall back to
        # an empty dict in case the storage yields None/empty when nothing has
        # been saved yet - confirm against the storage implementation.
        return heartbeats or {}

    async def _save_heartbeats(self, heartbeats: Dict[str, Dict]) -> None:
        """Persist the complete set of heartbeat records."""
        await self._storage.write_json(self.HEARTBEATS_FILE, heartbeats)

    async def update_heartbeat(
        self,
        agent_id: str,
        status: str,
        current_task: str = "",
        progress: int = 0
    ) -> None:
        """
        Record a heartbeat for an agent, stamped with the current time.

        Args:
            agent_id: Agent ID
            status: status (working, waiting, idle, error)
            current_task: current task
            progress: progress 0-100
        """
        async with self._lock:
            heartbeats = await self._load_heartbeats()
            heartbeat_info = HeartbeatInfo(
                agent_id=agent_id,
                last_heartbeat=datetime.now().isoformat(),
                status=status,
                current_task=current_task,
                progress=progress,
            )
            # Replaces any previous record stored for this agent.
            heartbeats[agent_id] = asdict(heartbeat_info)
            await self._save_heartbeats(heartbeats)

    async def get_heartbeat(self, agent_id: str) -> Optional[HeartbeatInfo]:
        """
        Get the heartbeat information of one agent.

        Args:
            agent_id: Agent ID

        Returns:
            The heartbeat information, or None if no record exists.
        """
        heartbeats = await self._load_heartbeats()
        data = heartbeats.get(agent_id)
        if not data:
            return None
        return HeartbeatInfo(**data)

    async def get_all_heartbeats(self) -> Dict[str, HeartbeatInfo]:
        """
        Get the heartbeat information of all agents.

        Returns:
            Dict mapping agent ID -> heartbeat information.
        """
        heartbeats = await self._load_heartbeats()
        return {
            agent_id: HeartbeatInfo(**data)
            for agent_id, data in heartbeats.items()
        }

    async def check_timeout(self, timeout_seconds: Optional[int] = None) -> List[str]:
        """
        Find the agents whose heartbeat has timed out.

        Args:
            timeout_seconds: timeout in seconds; DEFAULT_TIMEOUT when None

        Returns:
            List of timed-out agent IDs.
        """
        if timeout_seconds is None:
            timeout_seconds = self.DEFAULT_TIMEOUT

        all_heartbeats = await self.get_all_heartbeats()
        return [
            agent_id
            for agent_id, heartbeat in all_heartbeats.items()
            if heartbeat.is_timeout(timeout_seconds)
        ]

    async def remove_heartbeat(self, agent_id: str) -> bool:
        """
        Remove the heartbeat record of an agent.

        Args:
            agent_id: Agent ID

        Returns:
            True if a record was removed, False if none existed.
        """
        async with self._lock:
            heartbeats = await self._load_heartbeats()
            if agent_id not in heartbeats:
                return False
            del heartbeats[agent_id]
            await self._save_heartbeats(heartbeats)
            return True

    async def get_active_agents(self, within_seconds: int = 120) -> List[str]:
        """
        Get the list of active agents.

        Args:
            within_seconds: time window (seconds) within which the last
                heartbeat must fall for the agent to count as active

        Returns:
            List of active agent IDs.
        """
        all_heartbeats = await self.get_all_heartbeats()
        return [
            agent_id
            for agent_id, heartbeat in all_heartbeats.items()
            if heartbeat.elapsed_seconds <= within_seconds
        ]

    async def get_agents_by_status(self, status: str) -> List[HeartbeatInfo]:
        """
        Get all agents that are in the given status.

        Args:
            status: status (working, waiting, idle, error)

        Returns:
            List of heartbeat information for the matching agents.
        """
        all_heartbeats = await self.get_all_heartbeats()
        return [
            hb for hb in all_heartbeats.values()
            if hb.status == status
        ]
|
||
|
||
|
||
# Module-wide singleton, created on first request.
_heartbeat_service_instance: Optional[HeartbeatService] = None


def get_heartbeat_service() -> HeartbeatService:
    """Return the shared HeartbeatService, creating it on the first call."""
    global _heartbeat_service_instance
    if _heartbeat_service_instance is not None:
        return _heartbeat_service_instance
    _heartbeat_service_instance = HeartbeatService()
    return _heartbeat_service_instance
|