Files
multiAgentTry/backend/app/services/heartbeat.py
Claude Code dc398d7c7b 完整实现 Swarm 多智能体协作系统
- 新增 CLIPluginAdapter 统一接口 (backend/app/core/agent_adapter.py)
- 新增 LLM 服务层,支持 Anthropic/OpenAI/DeepSeek/Ollama (backend/app/services/llm_service.py)
- 新增 Agent 执行引擎,支持文件锁自动管理 (backend/app/services/agent_executor.py)
- 新增 NativeLLMAgent 原生 LLM 适配器 (backend/app/adapters/native_llm_agent.py)
- 新增进程管理器 (backend/app/services/process_manager.py)
- 新增 Agent 控制 API (backend/app/routers/agents_control.py)
- 新增 WebSocket 实时通信 (backend/app/routers/websocket.py)
- 更新前端 AgentsPage,支持启动/停止 Agent
- 测试通过:Agent 启动、批量操作、栅栏同步

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 17:32:11 +08:00

213 lines
6.0 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
心跳服务 - 管理 Agent 心跳和超时检测
用于监控 Agent 活跃状态和检测掉线 Agent
"""
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from .storage import get_storage
@dataclass
class HeartbeatInfo:
    """Snapshot of one agent's most recent heartbeat.

    The five annotated attributes are the dataclass fields; the
    ``elapsed_*`` properties are computed on every access from
    ``last_heartbeat`` and the current clock and are not fields.
    """

    agent_id: str
    last_heartbeat: str  # time of the last heartbeat (ISO 8601 string)
    status: str  # agent state: working, waiting, idle, error
    current_task: str = ""  # description of the current task
    progress: int = 0  # task progress, 0-100

    def _last_heartbeat_time(self) -> datetime:
        """Parse ``last_heartbeat`` into a ``datetime``.

        Raises:
            ValueError: if the string is not a valid ISO 8601 timestamp.
        """
        raw = self.last_heartbeat
        # datetime.fromisoformat only accepts the "Z" UTC designator from
        # Python 3.11 on; rewrite it to an explicit offset so older
        # interpreters parse it as well.
        if raw.endswith("Z"):
            raw = raw[:-1] + "+00:00"
        return datetime.fromisoformat(raw)

    @property
    def elapsed_seconds(self) -> int:
        """Whole seconds elapsed since the last heartbeat."""
        last_time = self._last_heartbeat_time()
        # Take "now" in the timestamp's own timezone. For a naive timestamp
        # tzinfo is None and this is the naive local time, as before; for an
        # aware timestamp it avoids the TypeError raised when subtracting
        # an aware datetime from a naive one.
        now = datetime.now(last_time.tzinfo)
        return int((now - last_time).total_seconds())

    def is_timeout(self, timeout_seconds: int = 60) -> bool:
        """Return True when more than ``timeout_seconds`` have elapsed.

        Args:
            timeout_seconds: allowed silence in seconds (default 60).
        """
        return self.elapsed_seconds > timeout_seconds

    @property
    def elapsed_display(self) -> str:
        """Human-readable form of the elapsed time.

        Under 10 seconds: ``"<n>s ago"``; under a minute: ``"<n>s"``;
        otherwise ``"<m>m <ss>s"`` with zero-padded seconds.
        """
        seconds = self.elapsed_seconds
        if seconds < 10:
            return f"{seconds}s ago"
        if seconds < 60:
            return f"{seconds}s"
        minutes, secs = divmod(seconds, 60)
        return f"{minutes}m {secs:02d}s"
class HeartbeatService:
    """Heartbeat service.

    Keeps the latest heartbeat record of every agent in one JSON document
    (``HEARTBEATS_FILE``), read and written through the object returned by
    ``get_storage()``, and answers queries about timed-out, active, or
    status-matching agents.
    """

    HEARTBEATS_FILE = "cache/heartbeats.json"
    DEFAULT_TIMEOUT = 60  # default timeout, in seconds

    def __init__(self):
        self._storage = get_storage()
        # Serializes the load-modify-save cycles in update_heartbeat and
        # remove_heartbeat; the read-only queries do not take it.
        self._lock = asyncio.Lock()

    async def _load_heartbeats(self) -> Dict[str, Dict]:
        """Load the stored heartbeat records.

        Returns:
            Mapping of agent ID to its stored record. An empty dict is
            returned when the storage yields ``None`` or an empty value, so
            callers can always index and iterate the result.
        """
        # NOTE(review): what read_json returns for a missing file is not
        # visible from this module; falsy results are normalized to {}.
        data = await self._storage.read_json(self.HEARTBEATS_FILE)
        if not data:
            return {}
        return data

    async def _save_heartbeats(self, heartbeats: Dict[str, Dict]) -> None:
        """Write the full set of heartbeat records back to storage."""
        await self._storage.write_json(self.HEARTBEATS_FILE, heartbeats)

    async def update_heartbeat(
        self,
        agent_id: str,
        status: str,
        current_task: str = "",
        progress: int = 0
    ) -> None:
        """Record a heartbeat for an agent, stamped with the current time.

        Any existing record for ``agent_id`` is replaced.

        Args:
            agent_id: Agent ID.
            status: Agent state (working, waiting, idle, error).
            current_task: Description of the current task.
            progress: Task progress, 0-100.
        """
        async with self._lock:
            heartbeats = await self._load_heartbeats()
            record = HeartbeatInfo(
                agent_id=agent_id,
                last_heartbeat=datetime.now().isoformat(),
                status=status,
                current_task=current_task,
                progress=progress,
            )
            heartbeats[agent_id] = asdict(record)
            await self._save_heartbeats(heartbeats)

    async def get_heartbeat(self, agent_id: str) -> Optional[HeartbeatInfo]:
        """Return the heartbeat record of one agent.

        Args:
            agent_id: Agent ID.

        Returns:
            The heartbeat info, or None when no record exists.
        """
        heartbeats = await self._load_heartbeats()
        data = heartbeats.get(agent_id)
        return HeartbeatInfo(**data) if data else None

    async def get_all_heartbeats(self) -> Dict[str, HeartbeatInfo]:
        """Return the heartbeat records of all agents.

        Returns:
            Mapping of agent ID to heartbeat info.
        """
        heartbeats = await self._load_heartbeats()
        return {
            agent_id: HeartbeatInfo(**data)
            for agent_id, data in heartbeats.items()
        }

    async def check_timeout(
        self, timeout_seconds: Optional[int] = None
    ) -> List[str]:
        """Find agents whose last heartbeat is older than the timeout.

        Args:
            timeout_seconds: Timeout in seconds; ``DEFAULT_TIMEOUT`` is
                used when None.

        Returns:
            IDs of the timed-out agents.
        """
        if timeout_seconds is None:
            timeout_seconds = self.DEFAULT_TIMEOUT
        all_heartbeats = await self.get_all_heartbeats()
        return [
            agent_id
            for agent_id, heartbeat in all_heartbeats.items()
            if heartbeat.is_timeout(timeout_seconds)
        ]

    async def remove_heartbeat(self, agent_id: str) -> bool:
        """Delete the heartbeat record of an agent.

        Args:
            agent_id: Agent ID.

        Returns:
            True when a record existed and was removed, False otherwise.
        """
        async with self._lock:
            heartbeats = await self._load_heartbeats()
            if agent_id not in heartbeats:
                return False
            del heartbeats[agent_id]
            await self._save_heartbeats(heartbeats)
            return True

    async def get_active_agents(self, within_seconds: int = 120) -> List[str]:
        """List agents that sent a heartbeat recently.

        Args:
            within_seconds: Size of the activity window in seconds; an
                agent counts as active when its last heartbeat is at most
                this old.

        Returns:
            IDs of the active agents.
        """
        all_heartbeats = await self.get_all_heartbeats()
        return [
            agent_id
            for agent_id, heartbeat in all_heartbeats.items()
            if heartbeat.elapsed_seconds <= within_seconds
        ]

    async def get_agents_by_status(self, status: str) -> List[HeartbeatInfo]:
        """Return the heartbeat info of every agent in a given state.

        Args:
            status: Agent state (working, waiting, idle, error).

        Returns:
            Heartbeat infos whose ``status`` equals ``status``.
        """
        all_heartbeats = await self.get_all_heartbeats()
        return [
            hb for hb in all_heartbeats.values()
            if hb.status == status
        ]
# Module-level singleton, created lazily on first request.
_heartbeat_service_instance: Optional[HeartbeatService] = None


def get_heartbeat_service() -> HeartbeatService:
    """Return the shared HeartbeatService, constructing it on first use."""
    global _heartbeat_service_instance
    service = _heartbeat_service_instance
    if service is not None:
        return service
    # First call: build the instance and remember it for later callers.
    service = HeartbeatService()
    _heartbeat_service_instance = service
    return service