Files

262 lines
7.1 KiB
Python
Raw Permalink Normal View History

"""
Agent 注册服务 - 管理 Agent 的注册信息和状态
每个 Agent 有独立的目录存储其配置和状态
"""
import asyncio
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass, asdict
from enum import Enum
from .storage import get_storage
class AgentRole(str, Enum):
    """Enumeration of the roles an agent can be registered with.

    Members are also ``str`` instances, so they compare equal to their plain
    string values (for example ``AgentRole.QA == "qa"``).
    """

    # Software roles.
    ARCHITECT = "architect"
    PRODUCT_MANAGER = "pm"
    DEVELOPER = "developer"
    QA = "qa"
    REVIEWER = "reviewer"
    # Role name for a human participant.
    HUMAN = "human"
@dataclass
class AgentInfo:
    """Basic registration record for a single agent.

    ``created_at`` is filled with the current local time (ISO 8601 string)
    when it is left empty at construction.
    """

    # Unique identifier, e.g. "claude-001".
    agent_id: str
    # Display name, e.g. "Claude Code".
    name: str
    # Role string: architect, pm, developer, qa, reviewer, human.
    role: str
    # Model name, e.g. "claude-opus-4.6", "gpt-4o", "human".
    model: str
    # Free-form description.
    description: str = ""
    # Registration timestamp; auto-filled when empty.
    created_at: str = ""
    # Current status label.
    status: str = "idle"

    def __post_init__(self):
        # Keep a caller-supplied timestamp; otherwise stamp the creation time.
        self.created_at = self.created_at or datetime.now().isoformat()
@dataclass
class AgentState:
    """Runtime state of a single agent.

    ``working_files`` defaults to ``None`` and is replaced by a fresh empty
    list in ``__post_init__``, so instances never share one list object.
    ``last_update`` is filled with the current local time (ISO 8601 string)
    when it is left empty at construction.
    """

    agent_id: str
    # Description of the task currently being worked on.
    current_task: str = ""
    # Progress value; the registry documents it as 0-100 but nothing here
    # enforces that range.
    progress: int = 0
    # Files currently being worked on. ``None`` is accepted (the annotation
    # now says so) and normalized to ``[]`` below.
    working_files: Optional[List[str]] = None
    # Timestamp of the last update; auto-filled when empty.
    last_update: str = ""

    def __post_init__(self):
        if self.working_files is None:
            self.working_files = []
        if not self.last_update:
            self.last_update = datetime.now().isoformat()
class AgentRegistry:
    """Agent registration service.

    Stores each agent's registration info and runtime state as JSON documents
    under ``agents/<agent_id>/`` (``info.json`` and ``state.json``) through the
    storage object returned by ``get_storage()``.

    The write paths (register, update state, unregister) run under a single
    ``asyncio.Lock``; the read paths do not take the lock.
    """

    def __init__(self):
        self._storage = get_storage()
        self._lock = asyncio.Lock()

    def _get_agent_dir(self, agent_id: str) -> str:
        """Return the storage-relative directory of an agent."""
        # NOTE(review): agent_id is interpolated verbatim; if IDs can come
        # from untrusted input, validate them before they reach this point.
        return f"agents/{agent_id}"

    def _get_agent_info_file(self, agent_id: str) -> str:
        """Return the storage-relative path of an agent's info file."""
        return f"{self._get_agent_dir(agent_id)}/info.json"

    def _get_agent_state_file(self, agent_id: str) -> str:
        """Return the storage-relative path of an agent's state file."""
        return f"{self._get_agent_dir(agent_id)}/state.json"

    async def register_agent(
        self,
        agent_id: str,
        name: str,
        role: str,
        model: str,
        description: str = ""
    ) -> AgentInfo:
        """Register a new agent.

        No existence check is made: ``info.json`` and a freshly initialized
        ``state.json`` are written unconditionally for ``agent_id``.

        Args:
            agent_id: Agent ID.
            name: Display name.
            role: Role.
            model: Model.
            description: Description.

        Returns:
            The registered agent info.
        """
        async with self._lock:
            agent_info = AgentInfo(
                agent_id=agent_id,
                name=name,
                role=role,
                model=model,
                description=description,
            )

            # Make sure the agent directory exists before writing into it.
            await self._storage.ensure_dir(self._get_agent_dir(agent_id))

            # Persist the registration record.
            await self._storage.write_json(
                self._get_agent_info_file(agent_id),
                asdict(agent_info),
            )

            # Initialize the runtime state with defaults.
            await self._storage.write_json(
                self._get_agent_state_file(agent_id),
                asdict(AgentState(agent_id=agent_id)),
            )

            return agent_info

    async def get_agent(self, agent_id: str) -> Optional[AgentInfo]:
        """Fetch an agent's registration info.

        Args:
            agent_id: Agent ID.

        Returns:
            The agent info, or ``None`` when storage returns nothing for it.
        """
        data = await self._storage.read_json(self._get_agent_info_file(agent_id))
        return AgentInfo(**data) if data else None

    async def list_agents(self) -> List[AgentInfo]:
        """List every registered agent.

        Scans the ``agents`` directory under the storage base path and loads
        each sub-directory that contains an ``info.json``.

        Returns:
            List of agent infos (empty when the ``agents`` directory is absent).
        """
        agents_dir = Path(self._storage.base_path) / "agents"
        if not agents_dir.exists():
            return []

        agents = []
        for agent_dir in agents_dir.iterdir():
            # Skip stray files and directories without a registration record.
            if not agent_dir.is_dir() or not (agent_dir / "info.json").exists():
                continue
            data = await self._storage.read_json(
                self._get_agent_info_file(agent_dir.name)
            )
            if data:
                agents.append(AgentInfo(**data))
        return agents

    async def update_state(
        self,
        agent_id: str,
        task: str = "",
        progress: int = 0,
        working_files: Optional[List[str]] = None
    ) -> None:
        """Update an agent's runtime state.

        Falsy arguments (``""``, ``0``, ``None``, ``[]``) mean "keep the stored
        value", so this method cannot reset a field to its empty value.

        Args:
            agent_id: Agent ID.
            task: Current task.
            progress: Progress, 0-100.
            working_files: Files currently being worked on.
        """
        async with self._lock:
            state_file = self._get_agent_state_file(agent_id)

            # get_state treats the read result as possibly falsy (no stored
            # state), so fall back to an empty mapping here instead of
            # failing on ``.get`` of a non-dict.
            existing = await self._storage.read_json(state_file) or {}

            state = AgentState(
                agent_id=agent_id,
                current_task=task or existing.get("current_task", ""),
                progress=progress or existing.get("progress", 0),
                working_files=working_files or existing.get("working_files", []),
                last_update=datetime.now().isoformat(),
            )

            await self._storage.write_json(state_file, asdict(state))

    async def get_state(self, agent_id: str) -> Optional[AgentState]:
        """Fetch an agent's runtime state.

        Args:
            agent_id: Agent ID.

        Returns:
            The agent state, or ``None`` when storage returns nothing for it.
        """
        data = await self._storage.read_json(self._get_agent_state_file(agent_id))
        return AgentState(**data) if data else None

    async def unregister_agent(self, agent_id: str) -> bool:
        """Unregister an agent.

        Args:
            agent_id: Agent ID.

        Returns:
            ``True`` when the agent existed and its files were deleted,
            ``False`` when no registration was found.
        """
        async with self._lock:
            # get_agent does not take the lock, so calling it here is safe.
            if not await self.get_agent(agent_id):
                return False

            # Simplified cleanup: only info.json and state.json are removed.
            # The agent directory itself (and anything else in it) is left in
            # place; a full implementation may need a recursive delete.
            await self._storage.delete(self._get_agent_info_file(agent_id))
            await self._storage.delete(self._get_agent_state_file(agent_id))

            return True

    async def get_agents_by_role(self, role: str) -> List[AgentInfo]:
        """Return all registered agents whose role equals ``role``.

        Args:
            role: Role to filter by.

        Returns:
            List of matching agents.
        """
        all_agents = await self.list_agents()
        return [agent for agent in all_agents if agent.role == role]
# Module-level singleton, created lazily on first access.
_registry_instance: Optional[AgentRegistry] = None


def get_agent_registry() -> AgentRegistry:
    """Return the shared AgentRegistry, creating it on the first call."""
    global _registry_instance
    if _registry_instance is not None:
        return _registry_instance
    _registry_instance = AgentRegistry()
    return _registry_instance