Files

213 lines
6.0 KiB
Python
Raw Permalink Normal View History

"""
心跳服务 - 管理 Agent 心跳和超时检测
用于监控 Agent 活跃状态和检测掉线 Agent
"""
import asyncio
from datetime import datetime, timedelta
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from .storage import get_storage
@dataclass
class HeartbeatInfo:
    """The most recent heartbeat reported by one agent."""

    agent_id: str
    last_heartbeat: str  # time of the last heartbeat (ISO 8601 string)
    status: str  # agent status: working, waiting, idle, error
    current_task: str = ""  # description of the current task
    progress: int = 0  # task progress, 0-100

    @property
    def elapsed_seconds(self) -> int:
        """Whole seconds elapsed since ``last_heartbeat``; never negative.

        A naive timestamp is compared with the naive local ``datetime.now()``.
        A timestamp that carries a UTC offset is compared with the current
        time in that same offset. Mixing the two styles therefore no longer
        raises ``TypeError``. A timestamp slightly in the future (clock skew
        between writers) is reported as 0 instead of a negative number.
        """
        last_time = datetime.fromisoformat(self.last_heartbeat)
        # datetime.now(None) is naive local time, matching naive timestamps;
        # otherwise "now" is taken in the timestamp's own offset (aware - aware).
        now = datetime.now(last_time.tzinfo)
        return max(0, int((now - last_time).total_seconds()))

    def is_timeout(self, timeout_seconds: int = 60) -> bool:
        """Return True if more than ``timeout_seconds`` passed since the last heartbeat."""
        return self.elapsed_seconds > timeout_seconds

    @property
    def elapsed_display(self) -> str:
        """Human-readable time since the last heartbeat.

        Under 10 s: ``"<n>s ago"``; under 60 s: ``"<n>s"``; otherwise
        ``"<m>m <ss>s"`` with the seconds zero-padded to two digits.
        """
        seconds = self.elapsed_seconds
        if seconds < 10:
            # NOTE(review): only this branch appends "ago"; the other formats
            # do not. Output left unchanged here — confirm whether intended.
            return f"{seconds}s ago"
        if seconds < 60:
            return f"{seconds}s"
        minutes, secs = divmod(seconds, 60)
        return f"{minutes}m {secs:02d}s"
class HeartbeatService:
    """
    Heartbeat service.

    Keeps the latest heartbeat of every agent as a single JSON mapping
    (``agent_id -> heartbeat dict``) stored at ``HEARTBEATS_FILE``. It answers
    queries about timed-out, active, or status-filtered agents.

    ``update_heartbeat`` and ``remove_heartbeat`` hold ``self._lock`` while
    they load, modify and save the mapping. Coroutines sharing this instance
    therefore do not overwrite each other's changes. The read-only queries
    load the mapping without taking the lock.
    """

    HEARTBEATS_FILE = "cache/heartbeats.json"
    DEFAULT_TIMEOUT = 60  # default timeout, in seconds

    def __init__(self):
        self._storage = get_storage()
        # Serializes the load -> modify -> save sequences of the mutators.
        self._lock = asyncio.Lock()

    async def _load_heartbeats(self) -> Dict[str, Dict]:
        """Load the raw ``agent_id -> heartbeat dict`` mapping.

        A falsy payload from storage is normalized to ``{}`` so every caller
        can use dict operations on the result. Some storage layers return
        ``None`` for a missing file — TODO confirm what ``read_json`` does.
        """
        data = await self._storage.read_json(self.HEARTBEATS_FILE)
        return data or {}

    async def _save_heartbeats(self, heartbeats: Dict[str, Dict]) -> None:
        """Write the complete ``agent_id -> heartbeat dict`` mapping to storage."""
        await self._storage.write_json(self.HEARTBEATS_FILE, heartbeats)

    async def update_heartbeat(
        self,
        agent_id: str,
        status: str,
        current_task: str = "",
        progress: int = 0
    ) -> None:
        """
        Record a heartbeat for an agent, replacing any previous record.

        The timestamp is the current naive local time in ISO format.

        Args:
            agent_id: Agent ID
            status: status (working, waiting, idle, error)
            current_task: description of the current task
            progress: progress, 0-100
        """
        async with self._lock:
            heartbeats = await self._load_heartbeats()
            heartbeat_info = HeartbeatInfo(
                agent_id=agent_id,
                last_heartbeat=datetime.now().isoformat(),
                status=status,
                current_task=current_task,
                progress=progress,
            )
            heartbeats[agent_id] = asdict(heartbeat_info)
            await self._save_heartbeats(heartbeats)

    async def get_heartbeat(self, agent_id: str) -> Optional[HeartbeatInfo]:
        """
        Get the heartbeat of a single agent.

        Args:
            agent_id: Agent ID

        Returns:
            The heartbeat info, or None if no record exists for the agent.
        """
        data = (await self._load_heartbeats()).get(agent_id)
        return HeartbeatInfo(**data) if data else None

    async def get_all_heartbeats(self) -> Dict[str, HeartbeatInfo]:
        """
        Get the heartbeats of all agents.

        Returns:
            Mapping of agent ID -> heartbeat info.
        """
        heartbeats = await self._load_heartbeats()
        return {
            agent_id: HeartbeatInfo(**data)
            for agent_id, data in heartbeats.items()
        }

    async def check_timeout(self, timeout_seconds: Optional[int] = None) -> List[str]:
        """
        Find agents whose last heartbeat is older than the timeout.

        Args:
            timeout_seconds: timeout in seconds; defaults to DEFAULT_TIMEOUT

        Returns:
            IDs of the timed-out agents.
        """
        if timeout_seconds is None:
            timeout_seconds = self.DEFAULT_TIMEOUT
        all_heartbeats = await self.get_all_heartbeats()
        return [
            agent_id
            for agent_id, heartbeat in all_heartbeats.items()
            if heartbeat.is_timeout(timeout_seconds)
        ]

    async def remove_heartbeat(self, agent_id: str) -> bool:
        """
        Remove an agent's heartbeat record.

        Args:
            agent_id: Agent ID

        Returns:
            True if a record existed and was removed (the mapping is saved).
            False if there was no record (nothing is written).
        """
        async with self._lock:
            heartbeats = await self._load_heartbeats()
            if agent_id not in heartbeats:
                return False
            del heartbeats[agent_id]
            await self._save_heartbeats(heartbeats)
            return True

    async def get_active_agents(self, within_seconds: int = 120) -> List[str]:
        """
        List agents that sent a heartbeat recently.

        Args:
            within_seconds: an agent counts as active if its last heartbeat
                is at most this many seconds old

        Returns:
            IDs of the active agents.
        """
        all_heartbeats = await self.get_all_heartbeats()
        return [
            agent_id
            for agent_id, heartbeat in all_heartbeats.items()
            if heartbeat.elapsed_seconds <= within_seconds
        ]

    async def get_agents_by_status(self, status: str) -> List[HeartbeatInfo]:
        """
        Get all agents currently reporting the given status.

        Args:
            status: status (working, waiting, idle, error)

        Returns:
            Heartbeat info of every matching agent.
        """
        all_heartbeats = await self.get_all_heartbeats()
        return [
            hb for hb in all_heartbeats.values()
            if hb.status == status
        ]
# Module-level singleton, created lazily by get_heartbeat_service().
_heartbeat_service_instance: Optional[HeartbeatService] = None


def get_heartbeat_service() -> HeartbeatService:
    """Return the shared HeartbeatService, constructing it on the first call.

    Every later call returns that same object.
    """
    global _heartbeat_service_instance
    service = _heartbeat_service_instance
    if service is None:
        service = _heartbeat_service_instance = HeartbeatService()
    return service