"""
Agent registry service - manages the registration info and state of agents.

Each agent has its own directory that stores its configuration and state.
"""

import asyncio
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict, field
from enum import Enum

from .storage import get_storage


class AgentRole(str, Enum):
    """Enumeration of the known agent roles."""

    ARCHITECT = "architect"
    PRODUCT_MANAGER = "pm"
    DEVELOPER = "developer"
    QA = "qa"
    REVIEWER = "reviewer"
    HUMAN = "human"


@dataclass
class AgentInfo:
    """Basic (static) information about a registered agent."""

    agent_id: str  # unique identifier, e.g. claude-001
    name: str  # display name, e.g. Claude Code
    role: str  # role: architect, pm, developer, qa, reviewer, human
    model: str  # model: claude-opus-4.6, gpt-4o, human, ...
    description: str = ""  # free-form description
    created_at: str = ""  # registration time (ISO-8601 string)
    status: str = "idle"  # current status

    def __post_init__(self):
        # Stamp the registration time only when the caller did not supply
        # one, so records re-loaded from storage keep their original value.
        if not self.created_at:
            self.created_at = datetime.now().isoformat()


@dataclass
class AgentState:
    """Runtime state of an agent."""

    agent_id: str
    current_task: str = ""
    progress: int = 0
    # default_factory gives every instance its own list.
    working_files: List[str] = field(default_factory=list)
    last_update: str = ""

    def __post_init__(self):
        # An explicit None (e.g. a stored JSON null) is still accepted and
        # normalized to an empty list, as before.
        if self.working_files is None:
            self.working_files = []
        if not self.last_update:
            self.last_update = datetime.now().isoformat()


class AgentRegistry:
    """
    Agent registry service.

    Manages the registration info and runtime state of all agents. Data is
    persisted through the storage object returned by ``get_storage()``;
    each agent owns ``agents/<agent_id>/info.json`` and
    ``agents/<agent_id>/state.json``.
    """

    def __init__(self):
        self._storage = get_storage()
        # Serializes the read-modify-write sequences in the mutating methods.
        self._lock = asyncio.Lock()

    def _get_agent_dir(self, agent_id: str) -> str:
        """Return the storage-relative directory path of an agent."""
        return f"agents/{agent_id}"

    def _get_agent_info_file(self, agent_id: str) -> str:
        """Return the storage-relative path of an agent's info file."""
        return f"{self._get_agent_dir(agent_id)}/info.json"

    def _get_agent_state_file(self, agent_id: str) -> str:
        """Return the storage-relative path of an agent's state file."""
        return f"{self._get_agent_dir(agent_id)}/state.json"

    async def register_agent(
        self,
        agent_id: str,
        name: str,
        role: str,
        model: str,
        description: str = ""
    ) -> AgentInfo:
        """
        Register a new agent.

        Writes a fresh info file and a fresh default state file. No check
        for an existing registration is made, so registering an existing
        ``agent_id`` again overwrites both files.

        Args:
            agent_id: Agent ID
            name: Display name
            role: Role
            model: Model
            description: Description

        Returns:
            The registered agent info
        """
        async with self._lock:
            agent_info = AgentInfo(
                agent_id=agent_id,
                name=name,
                role=role,
                model=model,
                description=description
            )

            # Make sure the agent directory exists.
            await self._storage.ensure_dir(self._get_agent_dir(agent_id))

            # Persist the agent info.
            await self._storage.write_json(
                self._get_agent_info_file(agent_id),
                asdict(agent_info)
            )

            # Initialize the runtime state with defaults.
            await self._storage.write_json(
                self._get_agent_state_file(agent_id),
                asdict(AgentState(agent_id=agent_id))
            )

            return agent_info

    async def get_agent(self, agent_id: str) -> Optional[AgentInfo]:
        """
        Get an agent's info.

        Args:
            agent_id: Agent ID

        Returns:
            The agent info, or None when storage yields no data for it
        """
        data = await self._storage.read_json(self._get_agent_info_file(agent_id))
        if data:
            return AgentInfo(**data)
        return None

    async def list_agents(self) -> List[AgentInfo]:
        """
        List all registered agents.

        Returns:
            Agent infos, ordered by agent directory path
        """
        # NOTE(review): assumes storage.base_path is a local filesystem
        # path - confirm against the storage implementation.
        agents_dir = Path(self._storage.base_path) / "agents"
        if not agents_dir.exists():
            return []

        agents = []
        # Sorted so the listing order is deterministic rather than
        # whatever order the filesystem happens to yield.
        for agent_dir in sorted(agents_dir.iterdir()):
            if not agent_dir.is_dir():
                continue
            if not (agent_dir / "info.json").exists():
                continue
            data = await self._storage.read_json(
                self._get_agent_info_file(agent_dir.name)
            )
            if data:
                agents.append(AgentInfo(**data))

        return agents

    async def update_state(
        self,
        agent_id: str,
        task: str = "",
        progress: int = 0,
        working_files: Optional[List[str]] = None
    ) -> None:
        """
        Update an agent's runtime state.

        Values that are not supplied are carried over from the stored
        state. ``task`` and ``progress`` count as "not supplied" when they
        are falsy (``""`` / ``0``), because those are their defaults.
        ``working_files`` counts as "not supplied" only when it is None, so
        an explicit empty list clears the stored list.

        Args:
            agent_id: Agent ID
            task: Current task
            progress: Progress, 0-100
            working_files: Files currently being worked on
        """
        async with self._lock:
            state_file = self._get_agent_state_file(agent_id)

            # Read the existing state. The other readers in this class treat
            # the read_json result as possibly falsy, so fall back to an
            # empty dict instead of calling .get() on None.
            existing = await self._storage.read_json(state_file) or {}

            if working_files is None:
                # "or []" also covers a stored JSON null.
                working_files = existing.get("working_files") or []

            state = AgentState(
                agent_id=agent_id,
                current_task=task or existing.get("current_task", ""),
                progress=progress or existing.get("progress", 0),
                working_files=list(working_files),
                last_update=datetime.now().isoformat()
            )

            await self._storage.write_json(state_file, asdict(state))

    async def get_state(self, agent_id: str) -> Optional[AgentState]:
        """
        Get an agent's runtime state.

        Args:
            agent_id: Agent ID

        Returns:
            The agent state, or None when storage yields no data for it
        """
        data = await self._storage.read_json(self._get_agent_state_file(agent_id))
        if data:
            return AgentState(**data)
        return None

    async def unregister_agent(self, agent_id: str) -> bool:
        """
        Unregister an agent.

        Args:
            agent_id: Agent ID

        Returns:
            True when the agent was found and its files were deleted,
            False when no such agent is registered
        """
        async with self._lock:
            agent_info = await self.get_agent(agent_id)
            if not agent_info:
                return False

            # A full implementation may need to delete the agent directory
            # recursively; this simplified version removes only info.json
            # and state.json and leaves the directory itself in place.
            await self._storage.delete(self._get_agent_info_file(agent_id))
            await self._storage.delete(self._get_agent_state_file(agent_id))

            return True

    async def get_agents_by_role(self, role: str) -> List[AgentInfo]:
        """
        Get all agents that have the given role.

        Args:
            role: Role

        Returns:
            The matching agents
        """
        all_agents = await self.list_agents()
        return [agent for agent in all_agents if agent.role == role]


# Global singleton
_registry_instance: Optional[AgentRegistry] = None


def get_agent_registry() -> AgentRegistry:
    """Return the AgentRegistry singleton, creating it on first use."""
    global _registry_instance
    if _registry_instance is None:
        _registry_instance = AgentRegistry()
    return _registry_instance