Files
multiAgentTry/backend/app/core/agent_adapter.py
Claude Code dc398d7c7b 完整实现 Swarm 多智能体协作系统
- 新增 CLIPluginAdapter 统一接口 (backend/app/core/agent_adapter.py)
- 新增 LLM 服务层,支持 Anthropic/OpenAI/DeepSeek/Ollama (backend/app/services/llm_service.py)
- 新增 Agent 执行引擎,支持文件锁自动管理 (backend/app/services/agent_executor.py)
- 新增 NativeLLMAgent 原生 LLM 适配器 (backend/app/adapters/native_llm_agent.py)
- 新增进程管理器 (backend/app/services/process_manager.py)
- 新增 Agent 控制 API (backend/app/routers/agents_control.py)
- 新增 WebSocket 实时通信 (backend/app/routers/websocket.py)
- 更新前端 AgentsPage,支持启动/停止 Agent
- 测试通过:Agent 启动、批量操作、栅栏同步

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 17:32:11 +08:00

225 lines
5.6 KiB
Python

"""
Swarm Agent 适配器核心接口
定义所有 Agent 适配器必须实现的统一接口,确保不同类型的 Agent
(原生 LLM Agent、外部 CLI 工具包装 Agent 等)能够无缝协作。
"""
from abc import ABC, abstractmethod
from typing import Dict, Optional, Any
from dataclasses import dataclass, field
from datetime import datetime
import asyncio
@dataclass
class Task:
"""Agent 任务描述"""
description: str
task_id: str
context: Dict[str, Any] = field(default_factory=dict)
priority: str = "medium" # high, medium, low
deadline: Optional[datetime] = None
metadata: Dict[str, Any] = field(default_factory=dict)
def to_dict(self) -> Dict:
return {
"task_id": self.task_id,
"description": self.description,
"context": self.context,
"priority": self.priority,
"deadline": self.deadline.isoformat() if self.deadline else None,
"metadata": self.metadata
}
@dataclass
class Result:
"""Agent 任务执行结果"""
success: bool
output: str
error: Optional[str] = None
metadata: Dict[str, Any] = field(default_factory=dict)
execution_time: float = 0.0
tokens_used: int = 0
def to_dict(self) -> Dict:
return {
"success": self.success,
"output": self.output,
"error": self.error,
"metadata": self.metadata,
"execution_time": self.execution_time,
"tokens_used": self.tokens_used
}
@dataclass
class AgentCapabilities:
"""Agent 能力声明"""
can_execute_code: bool = False
can_read_files: bool = False
can_write_files: bool = False
can_analyze_code: bool = False
can_generate_tests: bool = False
can_review_code: bool = False
supported_languages: list = field(default_factory=list)
max_context_length: int = 200000
class CLIPluginAdapter(ABC):
"""
CLI 工具适配器统一接口
所有 Agent 适配器必须实现此接口,确保:
1. 统一的任务执行方式
2. 一致的会议参与机制
3. 标准化的状态管理
4. 可靠的心跳报告
"""
@property
@abstractmethod
def id(self) -> str:
"""Agent 唯一标识符"""
pass
@property
@abstractmethod
def name(self) -> str:
"""Agent 显示名称"""
pass
@property
@abstractmethod
def version(self) -> str:
"""适配器版本号"""
pass
@property
def capabilities(self) -> AgentCapabilities:
"""Agent 能力声明"""
return AgentCapabilities()
# ========== 核心能力 ==========
@abstractmethod
async def execute(self, task: Task) -> Result:
"""
执行任务
这是 Agent 的核心方法,负责:
1. 解析任务描述
2. 调用适当的模型/API
3. 处理执行结果
4. 返回标准化结果
"""
pass
@abstractmethod
async def join_meeting(self, meeting_id: str, timeout: int = 300) -> str:
"""
加入会议等待队列(栅栏同步)
当 Agent 需要参与会议时调用此方法:
1. 向协调服务报告"我准备好了"
2. 等待其他参与者到达
3. 当所有人都到达时,会议自动开始
返回值:
- "started": 会议已开始
- "timeout": 等待超时
- "cancelled": 会议被取消
"""
pass
@abstractmethod
async def write_state(self, state: Dict) -> None:
"""
写入自己的状态文件
状态文件存储在 .doc/agents/{agent_id}/state.json
包含:当前任务、进度、临时数据等
"""
pass
@abstractmethod
async def read_others(self, agent_id: str) -> Dict:
"""
读取其他 Agent 的状态
用于 Agent 之间互相了解对方的工作状态
"""
pass
@abstractmethod
async def update_heartbeat(self, status: str, task: str = "", progress: int = 0) -> None:
"""
更新心跳
参数:
- status: working, waiting, idle, error
- task: 当前任务描述
- progress: 进度百分比 0-100
"""
pass
# ========== 生命周期钩子 ==========
async def initialize(self) -> None:
"""Agent 初始化时调用"""
pass
async def shutdown(self) -> None:
"""Agent 关闭前调用"""
pass
async def health_check(self) -> bool:
"""健康检查"""
return True
# ========== 会议相关 ==========
async def propose(self, meeting_id: str, content: str, step: str = "") -> None:
"""在会议中提出提案"""
pass
async def discuss(self, meeting_id: str, content: str, step: str = "") -> None:
"""在会议中参与讨论"""
pass
async def vote(self, meeting_id: str, proposal_id: str, agree: bool) -> None:
"""对提案进行投票"""
pass
# ========== 工具方法 ==========
def _generate_task_id(self) -> str:
"""生成唯一任务 ID"""
import uuid
return f"task_{uuid.uuid4().hex[:12]}"
async def _delay(self, seconds: float):
"""异步延迟"""
await asyncio.sleep(seconds)
class AdapterError(Exception):
"""适配器错误基类"""
pass
class AdapterConnectionError(AdapterError):
"""连接错误"""
pass
class AdapterExecutionError(AdapterError):
"""执行错误"""
pass
class AdapterTimeoutError(AdapterError):
"""超时错误"""
pass