233 lines
6.9 KiB
Python
233 lines
6.9 KiB
Python
|
|
"""
|
|||
|
|
资源管理器 - 整合文件锁和心跳服务
|
|||
|
|
提供声明式的任务执行接口,自动管理资源获取和释放
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import asyncio
|
|||
|
|
import re
|
|||
|
|
from typing import List, Dict, Optional
|
|||
|
|
from dataclasses import dataclass
|
|||
|
|
|
|||
|
|
from .storage import get_storage
|
|||
|
|
from .file_lock import get_file_lock_service
|
|||
|
|
from .heartbeat import get_heartbeat_service
|
|||
|
|
from .agent_registry import get_agent_registry
|
|||
|
|
|
|||
|
|
|
|||
|
|
@dataclass
|
|||
|
|
class TaskResult:
|
|||
|
|
"""任务执行结果"""
|
|||
|
|
success: bool
|
|||
|
|
message: str
|
|||
|
|
files_locked: List[str] = None
|
|||
|
|
duration_seconds: float = 0.0
|
|||
|
|
|
|||
|
|
def __post_init__(self):
|
|||
|
|
if self.files_locked is None:
|
|||
|
|
self.files_locked = []
|
|||
|
|
|
|||
|
|
|
|||
|
|
class ResourceManager:
|
|||
|
|
"""
|
|||
|
|
资源管理器
|
|||
|
|
|
|||
|
|
整合文件锁和心跳服务,提供声明式的任务执行接口:
|
|||
|
|
- 自动解析任务中的文件路径
|
|||
|
|
- 自动获取文件锁
|
|||
|
|
- 自动更新心跳
|
|||
|
|
- 任务完成后自动释放资源
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
def __init__(self):
|
|||
|
|
self._lock_service = get_file_lock_service()
|
|||
|
|
self._heartbeat_service = get_heartbeat_service()
|
|||
|
|
self._agent_registry = get_agent_registry()
|
|||
|
|
|
|||
|
|
# 文件路径正则模式
|
|||
|
|
FILE_PATTERNS = [
|
|||
|
|
r'[\w/]+\.(py|js|ts|tsx|jsx|java|go|rs|c|cpp|h|hpp|cs|swift|kt|rb|php|sh|bash|zsh|yaml|yml|json|xml|html|css|scss|md|txt|sql)',
|
|||
|
|
r'[\w/]+/(?:src|lib|app|components|services|utils|tests|test|spec|config|assets|static|views|controllers|models|routes)/[\w./]+',
|
|||
|
|
]
|
|||
|
|
|
|||
|
|
def _extract_files_from_task(self, task_description: str) -> List[str]:
|
|||
|
|
"""
|
|||
|
|
从任务描述中提取文件路径
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
task_description: 任务描述
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
提取的文件路径列表
|
|||
|
|
"""
|
|||
|
|
files = []
|
|||
|
|
for pattern in self.FILE_PATTERNS:
|
|||
|
|
matches = re.findall(pattern, task_description)
|
|||
|
|
files.extend(matches)
|
|||
|
|
|
|||
|
|
# 去重并过滤
|
|||
|
|
seen = set()
|
|||
|
|
result = []
|
|||
|
|
for f in files:
|
|||
|
|
# 标准化路径
|
|||
|
|
normalized = f.strip().replace('\\', '/')
|
|||
|
|
if normalized and normalized not in seen and len(normalized) > 3:
|
|||
|
|
seen.add(normalized)
|
|||
|
|
result.append(normalized)
|
|||
|
|
|
|||
|
|
return result
|
|||
|
|
|
|||
|
|
async def execute_task(
|
|||
|
|
self,
|
|||
|
|
agent_id: str,
|
|||
|
|
task_description: str,
|
|||
|
|
timeout: int = 300
|
|||
|
|
) -> TaskResult:
|
|||
|
|
"""
|
|||
|
|
执行任务(声明式接口)
|
|||
|
|
|
|||
|
|
内部流程:
|
|||
|
|
1. 解析任务需要的文件
|
|||
|
|
2. 获取所有文件锁
|
|||
|
|
3. 更新心跳状态
|
|||
|
|
4. 执行任务(这里是模拟)
|
|||
|
|
5. finally: 释放所有锁
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
agent_id: Agent ID
|
|||
|
|
task_description: 任务描述
|
|||
|
|
timeout: 超时时间(秒)
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
任务执行结果
|
|||
|
|
"""
|
|||
|
|
import time
|
|||
|
|
start_time = time.time()
|
|||
|
|
|
|||
|
|
# 1. 解析文件
|
|||
|
|
files = self._extract_files_from_task(task_description)
|
|||
|
|
|
|||
|
|
# 2. 获取文件锁
|
|||
|
|
acquired_files = []
|
|||
|
|
for file_path in files:
|
|||
|
|
success = await self._lock_service.acquire_lock(
|
|||
|
|
file_path, agent_id, agent_id[:3].upper()
|
|||
|
|
)
|
|||
|
|
if success:
|
|||
|
|
acquired_files.append(file_path)
|
|||
|
|
|
|||
|
|
try:
|
|||
|
|
# 3. 更新心跳
|
|||
|
|
await self._heartbeat_service.update_heartbeat(
|
|||
|
|
agent_id,
|
|||
|
|
status="working",
|
|||
|
|
current_task=task_description,
|
|||
|
|
progress=0
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
# 4. 执行任务(这里只是模拟,实际需要调用 Agent)
|
|||
|
|
# 实际实现中,这里会通过 CLIPluginAdapter 调用 Agent
|
|||
|
|
await asyncio.sleep(0.1) # 模拟执行
|
|||
|
|
|
|||
|
|
duration = time.time() - start_time
|
|||
|
|
|
|||
|
|
return TaskResult(
|
|||
|
|
success=True,
|
|||
|
|
message=f"Task executed: {task_description}",
|
|||
|
|
files_locked=acquired_files,
|
|||
|
|
duration_seconds=duration
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
finally:
|
|||
|
|
# 5. 释放所有锁
|
|||
|
|
for file_path in acquired_files:
|
|||
|
|
await self._lock_service.release_lock(file_path, agent_id)
|
|||
|
|
|
|||
|
|
# 更新心跳为 idle
|
|||
|
|
await self._heartbeat_service.update_heartbeat(
|
|||
|
|
agent_id,
|
|||
|
|
status="idle",
|
|||
|
|
current_task="",
|
|||
|
|
progress=100
|
|||
|
|
)
|
|||
|
|
|
|||
|
|
async def parse_task_files(self, task_description: str) -> List[str]:
|
|||
|
|
"""
|
|||
|
|
解析任务中的文件路径
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
task_description: 任务描述
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
文件路径列表
|
|||
|
|
"""
|
|||
|
|
return self._extract_files_from_task(task_description)
|
|||
|
|
|
|||
|
|
async def get_agent_status(self, agent_id: str) -> Dict:
|
|||
|
|
"""
|
|||
|
|
获取 Agent 状态(整合锁和心跳信息)
|
|||
|
|
|
|||
|
|
Args:
|
|||
|
|
agent_id: Agent ID
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
Agent 状态信息
|
|||
|
|
"""
|
|||
|
|
# 获取心跳信息
|
|||
|
|
heartbeat = await self._heartbeat_service.get_heartbeat(agent_id)
|
|||
|
|
# 获取持有的锁
|
|||
|
|
locks = await self._lock_service.get_agent_locks(agent_id)
|
|||
|
|
# 获取注册信息
|
|||
|
|
agent_info = await self._agent_registry.get_agent(agent_id)
|
|||
|
|
# 获取运行时状态
|
|||
|
|
agent_state = await self._agent_registry.get_state(agent_id)
|
|||
|
|
|
|||
|
|
return {
|
|||
|
|
"agent_id": agent_id,
|
|||
|
|
"info": {
|
|||
|
|
"name": agent_info.name if agent_info else "",
|
|||
|
|
"role": agent_info.role if agent_info else "",
|
|||
|
|
"model": agent_info.model if agent_info else "",
|
|||
|
|
},
|
|||
|
|
"heartbeat": {
|
|||
|
|
"status": heartbeat.status if heartbeat else "unknown",
|
|||
|
|
"current_task": heartbeat.current_task if heartbeat else "",
|
|||
|
|
"progress": heartbeat.progress if heartbeat else 0,
|
|||
|
|
"elapsed": heartbeat.elapsed_display if heartbeat else "",
|
|||
|
|
},
|
|||
|
|
"locks": [
|
|||
|
|
{"file": lock.file_path, "elapsed": lock.elapsed_display}
|
|||
|
|
for lock in locks
|
|||
|
|
],
|
|||
|
|
"state": {
|
|||
|
|
"task": agent_state.current_task if agent_state else "",
|
|||
|
|
"progress": agent_state.progress if agent_state else 0,
|
|||
|
|
"working_files": agent_state.working_files if agent_state else [],
|
|||
|
|
}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
async def get_all_status(self) -> List[Dict]:
|
|||
|
|
"""
|
|||
|
|
获取所有 Agent 的状态
|
|||
|
|
|
|||
|
|
Returns:
|
|||
|
|
所有 Agent 状态列表
|
|||
|
|
"""
|
|||
|
|
agents = await self._agent_registry.list_agents()
|
|||
|
|
statuses = []
|
|||
|
|
for agent in agents:
|
|||
|
|
status = await self.get_agent_status(agent.agent_id)
|
|||
|
|
statuses.append(status)
|
|||
|
|
return statuses
|
|||
|
|
|
|||
|
|
|
|||
|
|
# 全局单例
|
|||
|
|
_manager_instance: Optional[ResourceManager] = None
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_resource_manager() -> ResourceManager:
|
|||
|
|
"""获取资源管理器单例"""
|
|||
|
|
global _manager_instance
|
|||
|
|
if _manager_instance is None:
|
|||
|
|
_manager_instance = ResourceManager()
|
|||
|
|
return _manager_instance
|