Files
multiAgentTry/backend/app/services/resource_manager.py

233 lines
6.9 KiB
Python
Raw Permalink Normal View History

"""
资源管理器 - 整合文件锁和心跳服务
提供声明式的任务执行接口自动管理资源获取和释放
"""
import asyncio
import re
from typing import List, Dict, Optional
from dataclasses import dataclass
from .storage import get_storage
from .file_lock import get_file_lock_service
from .heartbeat import get_heartbeat_service
from .agent_registry import get_agent_registry
@dataclass
class TaskResult:
"""任务执行结果"""
success: bool
message: str
files_locked: List[str] = None
duration_seconds: float = 0.0
def __post_init__(self):
if self.files_locked is None:
self.files_locked = []
class ResourceManager:
"""
资源管理器
整合文件锁和心跳服务提供声明式的任务执行接口
- 自动解析任务中的文件路径
- 自动获取文件锁
- 自动更新心跳
- 任务完成后自动释放资源
"""
def __init__(self):
self._lock_service = get_file_lock_service()
self._heartbeat_service = get_heartbeat_service()
self._agent_registry = get_agent_registry()
# 文件路径正则模式
FILE_PATTERNS = [
r'[\w/]+\.(py|js|ts|tsx|jsx|java|go|rs|c|cpp|h|hpp|cs|swift|kt|rb|php|sh|bash|zsh|yaml|yml|json|xml|html|css|scss|md|txt|sql)',
r'[\w/]+/(?:src|lib|app|components|services|utils|tests|test|spec|config|assets|static|views|controllers|models|routes)/[\w./]+',
]
def _extract_files_from_task(self, task_description: str) -> List[str]:
"""
从任务描述中提取文件路径
Args:
task_description: 任务描述
Returns:
提取的文件路径列表
"""
files = []
for pattern in self.FILE_PATTERNS:
matches = re.findall(pattern, task_description)
files.extend(matches)
# 去重并过滤
seen = set()
result = []
for f in files:
# 标准化路径
normalized = f.strip().replace('\\', '/')
if normalized and normalized not in seen and len(normalized) > 3:
seen.add(normalized)
result.append(normalized)
return result
async def execute_task(
self,
agent_id: str,
task_description: str,
timeout: int = 300
) -> TaskResult:
"""
执行任务声明式接口
内部流程
1. 解析任务需要的文件
2. 获取所有文件锁
3. 更新心跳状态
4. 执行任务这里是模拟
5. finally: 释放所有锁
Args:
agent_id: Agent ID
task_description: 任务描述
timeout: 超时时间
Returns:
任务执行结果
"""
import time
start_time = time.time()
# 1. 解析文件
files = self._extract_files_from_task(task_description)
# 2. 获取文件锁
acquired_files = []
for file_path in files:
success = await self._lock_service.acquire_lock(
file_path, agent_id, agent_id[:3].upper()
)
if success:
acquired_files.append(file_path)
try:
# 3. 更新心跳
await self._heartbeat_service.update_heartbeat(
agent_id,
status="working",
current_task=task_description,
progress=0
)
# 4. 执行任务(这里只是模拟,实际需要调用 Agent
# 实际实现中,这里会通过 CLIPluginAdapter 调用 Agent
await asyncio.sleep(0.1) # 模拟执行
duration = time.time() - start_time
return TaskResult(
success=True,
message=f"Task executed: {task_description}",
files_locked=acquired_files,
duration_seconds=duration
)
finally:
# 5. 释放所有锁
for file_path in acquired_files:
await self._lock_service.release_lock(file_path, agent_id)
# 更新心跳为 idle
await self._heartbeat_service.update_heartbeat(
agent_id,
status="idle",
current_task="",
progress=100
)
async def parse_task_files(self, task_description: str) -> List[str]:
"""
解析任务中的文件路径
Args:
task_description: 任务描述
Returns:
文件路径列表
"""
return self._extract_files_from_task(task_description)
async def get_agent_status(self, agent_id: str) -> Dict:
"""
获取 Agent 状态整合锁和心跳信息
Args:
agent_id: Agent ID
Returns:
Agent 状态信息
"""
# 获取心跳信息
heartbeat = await self._heartbeat_service.get_heartbeat(agent_id)
# 获取持有的锁
locks = await self._lock_service.get_agent_locks(agent_id)
# 获取注册信息
agent_info = await self._agent_registry.get_agent(agent_id)
# 获取运行时状态
agent_state = await self._agent_registry.get_state(agent_id)
return {
"agent_id": agent_id,
"info": {
"name": agent_info.name if agent_info else "",
"role": agent_info.role if agent_info else "",
"model": agent_info.model if agent_info else "",
},
"heartbeat": {
"status": heartbeat.status if heartbeat else "unknown",
"current_task": heartbeat.current_task if heartbeat else "",
"progress": heartbeat.progress if heartbeat else 0,
"elapsed": heartbeat.elapsed_display if heartbeat else "",
},
"locks": [
{"file": lock.file_path, "elapsed": lock.elapsed_display}
for lock in locks
],
"state": {
"task": agent_state.current_task if agent_state else "",
"progress": agent_state.progress if agent_state else 0,
"working_files": agent_state.working_files if agent_state else [],
}
}
async def get_all_status(self) -> List[Dict]:
"""
获取所有 Agent 的状态
Returns:
所有 Agent 状态列表
"""
agents = await self._agent_registry.list_agents()
statuses = []
for agent in agents:
status = await self.get_agent_status(agent.agent_id)
statuses.append(status)
return statuses
# 全局单例
_manager_instance: Optional[ResourceManager] = None
def get_resource_manager() -> ResourceManager:
"""获取资源管理器单例"""
global _manager_instance
if _manager_instance is None:
_manager_instance = ResourceManager()
return _manager_instance