Files
multiAgentTry/backend/app/services/resource_manager.py
Claude Code dc398d7c7b 完整实现 Swarm 多智能体协作系统
- 新增 CLIPluginAdapter 统一接口 (backend/app/core/agent_adapter.py)
- 新增 LLM 服务层,支持 Anthropic/OpenAI/DeepSeek/Ollama (backend/app/services/llm_service.py)
- 新增 Agent 执行引擎,支持文件锁自动管理 (backend/app/services/agent_executor.py)
- 新增 NativeLLMAgent 原生 LLM 适配器 (backend/app/adapters/native_llm_agent.py)
- 新增进程管理器 (backend/app/services/process_manager.py)
- 新增 Agent 控制 API (backend/app/routers/agents_control.py)
- 新增 WebSocket 实时通信 (backend/app/routers/websocket.py)
- 更新前端 AgentsPage,支持启动/停止 Agent
- 测试通过:Agent 启动、批量操作、栅栏同步

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 17:32:11 +08:00

233 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
资源管理器 - 整合文件锁和心跳服务
提供声明式的任务执行接口,自动管理资源获取和释放
"""
import asyncio
import re
from typing import List, Dict, Optional
from dataclasses import dataclass
from .storage import get_storage
from .file_lock import get_file_lock_service
from .heartbeat import get_heartbeat_service
from .agent_registry import get_agent_registry
@dataclass
class TaskResult:
"""任务执行结果"""
success: bool
message: str
files_locked: List[str] = None
duration_seconds: float = 0.0
def __post_init__(self):
if self.files_locked is None:
self.files_locked = []
class ResourceManager:
"""
资源管理器
整合文件锁和心跳服务,提供声明式的任务执行接口:
- 自动解析任务中的文件路径
- 自动获取文件锁
- 自动更新心跳
- 任务完成后自动释放资源
"""
def __init__(self):
self._lock_service = get_file_lock_service()
self._heartbeat_service = get_heartbeat_service()
self._agent_registry = get_agent_registry()
# 文件路径正则模式
FILE_PATTERNS = [
r'[\w/]+\.(py|js|ts|tsx|jsx|java|go|rs|c|cpp|h|hpp|cs|swift|kt|rb|php|sh|bash|zsh|yaml|yml|json|xml|html|css|scss|md|txt|sql)',
r'[\w/]+/(?:src|lib|app|components|services|utils|tests|test|spec|config|assets|static|views|controllers|models|routes)/[\w./]+',
]
def _extract_files_from_task(self, task_description: str) -> List[str]:
"""
从任务描述中提取文件路径
Args:
task_description: 任务描述
Returns:
提取的文件路径列表
"""
files = []
for pattern in self.FILE_PATTERNS:
matches = re.findall(pattern, task_description)
files.extend(matches)
# 去重并过滤
seen = set()
result = []
for f in files:
# 标准化路径
normalized = f.strip().replace('\\', '/')
if normalized and normalized not in seen and len(normalized) > 3:
seen.add(normalized)
result.append(normalized)
return result
async def execute_task(
self,
agent_id: str,
task_description: str,
timeout: int = 300
) -> TaskResult:
"""
执行任务(声明式接口)
内部流程:
1. 解析任务需要的文件
2. 获取所有文件锁
3. 更新心跳状态
4. 执行任务(这里是模拟)
5. finally: 释放所有锁
Args:
agent_id: Agent ID
task_description: 任务描述
timeout: 超时时间(秒)
Returns:
任务执行结果
"""
import time
start_time = time.time()
# 1. 解析文件
files = self._extract_files_from_task(task_description)
# 2. 获取文件锁
acquired_files = []
for file_path in files:
success = await self._lock_service.acquire_lock(
file_path, agent_id, agent_id[:3].upper()
)
if success:
acquired_files.append(file_path)
try:
# 3. 更新心跳
await self._heartbeat_service.update_heartbeat(
agent_id,
status="working",
current_task=task_description,
progress=0
)
# 4. 执行任务(这里只是模拟,实际需要调用 Agent
# 实际实现中,这里会通过 CLIPluginAdapter 调用 Agent
await asyncio.sleep(0.1) # 模拟执行
duration = time.time() - start_time
return TaskResult(
success=True,
message=f"Task executed: {task_description}",
files_locked=acquired_files,
duration_seconds=duration
)
finally:
# 5. 释放所有锁
for file_path in acquired_files:
await self._lock_service.release_lock(file_path, agent_id)
# 更新心跳为 idle
await self._heartbeat_service.update_heartbeat(
agent_id,
status="idle",
current_task="",
progress=100
)
async def parse_task_files(self, task_description: str) -> List[str]:
"""
解析任务中的文件路径
Args:
task_description: 任务描述
Returns:
文件路径列表
"""
return self._extract_files_from_task(task_description)
async def get_agent_status(self, agent_id: str) -> Dict:
"""
获取 Agent 状态(整合锁和心跳信息)
Args:
agent_id: Agent ID
Returns:
Agent 状态信息
"""
# 获取心跳信息
heartbeat = await self._heartbeat_service.get_heartbeat(agent_id)
# 获取持有的锁
locks = await self._lock_service.get_agent_locks(agent_id)
# 获取注册信息
agent_info = await self._agent_registry.get_agent(agent_id)
# 获取运行时状态
agent_state = await self._agent_registry.get_state(agent_id)
return {
"agent_id": agent_id,
"info": {
"name": agent_info.name if agent_info else "",
"role": agent_info.role if agent_info else "",
"model": agent_info.model if agent_info else "",
},
"heartbeat": {
"status": heartbeat.status if heartbeat else "unknown",
"current_task": heartbeat.current_task if heartbeat else "",
"progress": heartbeat.progress if heartbeat else 0,
"elapsed": heartbeat.elapsed_display if heartbeat else "",
},
"locks": [
{"file": lock.file_path, "elapsed": lock.elapsed_display}
for lock in locks
],
"state": {
"task": agent_state.current_task if agent_state else "",
"progress": agent_state.progress if agent_state else 0,
"working_files": agent_state.working_files if agent_state else [],
}
}
async def get_all_status(self) -> List[Dict]:
"""
获取所有 Agent 的状态
Returns:
所有 Agent 状态列表
"""
agents = await self._agent_registry.list_agents()
statuses = []
for agent in agents:
status = await self.get_agent_status(agent.agent_id)
statuses.append(status)
return statuses
# 全局单例
_manager_instance: Optional[ResourceManager] = None
def get_resource_manager() -> ResourceManager:
"""获取资源管理器单例"""
global _manager_instance
if _manager_instance is None:
_manager_instance = ResourceManager()
return _manager_instance