Files
multiAgentTry/docs/backend-steps.md
Claude Code dc398d7c7b 完整实现 Swarm 多智能体协作系统
- 新增 CLIPluginAdapter 统一接口 (backend/app/core/agent_adapter.py)
- 新增 LLM 服务层,支持 Anthropic/OpenAI/DeepSeek/Ollama (backend/app/services/llm_service.py)
- 新增 Agent 执行引擎,支持文件锁自动管理 (backend/app/services/agent_executor.py)
- 新增 NativeLLMAgent 原生 LLM 适配器 (backend/app/adapters/native_llm_agent.py)
- 新增进程管理器 (backend/app/services/process_manager.py)
- 新增 Agent 控制 API (backend/app/routers/agents_control.py)
- 新增 WebSocket 实时通信 (backend/app/routers/websocket.py)
- 更新前端 AgentsPage,支持启动/停止 Agent
- 测试通过:Agent 启动、批量操作、栅栏同步

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 17:32:11 +08:00

471 lines
13 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 后端开发步骤
每一步完成后都有验证方法,前端逐步替换 mock 数据。
---
## 第一步:项目初始化
**操作**
```bash
cd backend
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install fastapi uvicorn pydantic typer aiofiles
mkdir -p .doc/agents .doc/cache .doc/meetings .doc/resources .doc/workflow
```
**创建文件**
- `backend/main.py` - FastAPI 入口
- `backend/cli.py` - CLI 入口
**验证**
```bash
python cli.py hello # 输出: Hello Swarm
python -m uvicorn main:app --reload
curl http://localhost:8000/ # 输出: {"status":"ok"}
```
---
## 第二步:基础存储服务
**操作** - 创建 `backend/services/storage.py`
```python
class StorageService:
async def read_json(self, path: str) -> dict
async def write_json(self, path: str, data: dict)
async def ensure_dir(self, path: str)
```
**验证**
```bash
python cli.py storage write .doc/test.json '{"foo":"bar"}'
python cli.py storage read .doc/test.json
# 输出: {"foo":"bar"}
```
---
## 第三步:文件锁服务
**操作** - 创建 `backend/services/file_lock.py`
```python
class FileLockService:
async def acquire_lock(self, file_path: str, agent_id: str) -> bool
async def release_lock(self, file_path: str, agent_id: str) -> bool
async def get_locks(self) -> List[LockInfo]
async def check_locked(self, file_path: str) -> Optional[str]
```
存储到 `.doc/cache/file_locks.json`
**验证**
```bash
# CLI 测试
python cli.py lock acquire src/auth/login.py claude-001
# 输出: Lock acquired
python cli.py lock status
# 输出: src/auth/login.py -> claude-001 (2s ago)
python cli.py lock release src/auth/login.py claude-001
# 输出: Lock released
```
**API 测试**
```bash
curl http://localhost:8000/api/locks
# 输出: {"locks": []}
curl -X POST http://localhost:8000/api/locks/acquire \
-H "Content-Type: application/json" \
-d '{"file_path":"src/auth/login.py","agent_id":"claude-001"}'
# 输出: {"acquired":true}
```
**前端对接** - 修改 `ResourceMonitorCard.tsx`
```typescript
// 替换 lockedFiles mock
const response = await fetch('http://localhost:8000/api/locks');
const data = await response.json();
setLockedFiles(data.locks);
```
---
## 第四步:心跳服务
**操作** - 创建 `backend/services/heartbeat.py`
```python
class HeartbeatService:
async def update_heartbeat(self, agent_id: str, status: dict) -> None
async def get_heartbeat(self, agent_id: str) -> Optional[dict]
async def get_all_heartbeats(self) -> dict
async def check_timeout(self, timeout_seconds: int) -> List[str]
```
存储到 `.doc/cache/heartbeats.json`
**验证**
```bash
# 模拟 Agent 发送心跳
python cli.py heartbeat ping claude-001 '{"status":"working","task":"fixing bug"}'
# 输出: Heartbeat updated
python cli.py heartbeat list
# 输出: claude-001: working (5s ago)
python cli.py heartbeat check-timeout 30
# 输出: No timed out agents
```
**API 测试**
```bash
curl http://localhost:8000/api/heartbeats
# 输出: {"heartbeats": {"claude-001": {...}}}
```
---
## 第五步Agent 注册服务
**操作** - 创建 `backend/services/agent_registry.py`
```python
class AgentRegistry:
async def register_agent(self, agent_info: AgentInfo) -> None
async def get_agent(self, agent_id: str) -> Optional[AgentInfo]
async def list_agents(self) -> List[AgentInfo]
async def update_state(self, agent_id: str, state: dict) -> None
async def get_state(self, agent_id: str) -> Optional[dict]
```
存储到 `.doc/agents/{agent_id}/` 目录
**验证**
```bash
python cli.py agent register claude-001 \
--name "Claude Code" \
--role "architect" \
--model "claude-opus-4.6"
# 输出: Agent registered
python cli.py agent list
# 输出: claude-001 | Claude Code | architect | claude-opus-4.6
python cli.py agent state claude-001 set '{"task":"fixing bug","progress":50}'
python cli.py agent state claude-001 get
# 输出: {"task":"fixing bug","progress":50}
```
**API 测试**
```bash
curl http://localhost:8000/api/agents
# 输出: {"agents": [...]}
curl http://localhost:8000/api/agents/claude-001/state
# 输出: {"task":"fixing bug","progress":50}
```
**前端对接** - 修改 `AgentStatusCard.tsx`
```typescript
// 替换 agents mock
const response = await fetch('http://localhost:8000/api/agents');
const data = await response.json();
setAgents(data.agents);
```
---
## 第六步:会议调度器(栅栏同步)
**操作** - 创建 `backend/services/meeting_scheduler.py`
```python
class MeetingScheduler:
async def wait_for_meeting(self, agent_id: str, meeting_id: str) -> str
async def get_queue(self, meeting_id: str) -> List[str]
async def start_meeting(self, meeting_id: str) -> None
async def end_meeting(self, meeting_id: str) -> None
```
存储到 `.doc/cache/meeting_queue.json`
**验证**
```bash
# 终端1 - Agent 1 等待
python cli.py meeting wait design-review claude-001
# 阻塞等待...
# 终端2 - Agent 2 等待
python cli.py meeting wait design-review kimi-002
# 触发会议,输出: Meeting started!
python cli.py meeting queue design-review
# 输出: [claude-001, kimi-002]
```
**API 测试**
```bash
curl -X POST http://localhost:8000/api/meetings/design-review/wait \
-H "Content-Type: application/json" \
-d '{"agent_id":"claude-001"}'
# 输出: {"status":"waiting"}
curl http://localhost:8000/api/meetings/design-review/queue
# 输出: {"waiting_agents":["claude-001"]}
```
---
## 第七步:会议记录服务
**操作** - 创建 `backend/services/meeting_recorder.py`
```python
class MeetingRecorder:
async def create_meeting(self, meeting_info: MeetingInfo) -> str
async def add_discussion(self, meeting_id: str, agent_id: str, content: str) -> None
async def update_progress(self, meeting_id: str, step: str) -> None
async def get_meeting(self, meeting_id: str) -> MeetingInfo
async def list_meetings(self, date: str) -> List[MeetingInfo]
```
存储到 `.doc/meetings/{YYYY-MM-DD}/{meeting_id}.md`
**验证**
```bash
python cli.py meeting create design-review \
--title "认证方案设计评审" \
--attendees claude-001,kimi-002,opencode-003
# 输出: Meeting created: design_review_20260304_141000
python cli.py meeting discuss design_review_20260304_141000 \
--agent claude-001 \
--content "建议使用 JWT"
# 输出: Discussion added
python cli.py meeting progress design_review_20260304_141000 "收集初步想法"
# 输出: Progress updated
```
**API 测试**
```bash
curl http://localhost:8000/api/meetings/2026-03-04
# 输出: {"meetings": [...]}
curl http://localhost:8000/api/meetings/design_review_20260304_141000
# 输出: {...}
```
**前端对接** - 修改 `MeetingProgressCard.tsx``RecentMeetingsCard.tsx`
```typescript
// 替换 steps mock
const response = await fetch('http://localhost:8000/api/meetings/active');
const data = await response.json);
setSteps(data.meeting.steps);
```
---
## 第八步:资源管理器(整合锁 + 心跳)
**操作** - 创建 `backend/services/resource_manager.py`
```python
class ResourceManager:
async def execute_task(self, agent_id: str, task_description: str) -> str
# 内部逻辑:
# 1. 解析任务需要的文件
# 2. 获取所有文件锁
# 3. 执行任务
# 4. finally: 释放所有锁
```
**验证**
```bash
python cli.py execute claude-001 "修改 src/auth/login.py 修复登录 bug"
# 输出: Task executing...
# 输出: Locks acquired: [src/auth/login.py, src/utils/crypto.py]
# 输出: Task completed
# 输出: Locks released
```
---
## 第九步:工作流引擎
**操作** - 创建 `backend/services/workflow_engine.py`
```python
class WorkflowEngine:
async def load_workflow(self, workflow_path: str) -> Workflow
async def get_next_meeting(self, workflow_id: str) -> Optional[MeetingInfo]
async def complete_meeting(self, workflow_id: str, meeting_id: str) -> None
```
**验证**
```bash
python cli.py workflow load .doc/workflow/project_workflow.yaml
# 输出: Workflow loaded: my_project
python cli.py workflow next my_project
# 输出: Next meeting: design_review (attendees: claude-001,kimi-002)
```
---
## 第十步角色分配器AI 路由)
**操作** - 创建 `backend/services/role_allocator.py`
```python
class RoleAllocator:
async def allocate_roles(self, task: str, agents: List[str]) -> Dict[str, str]
# 内部调用 LLM 分析任务,返回角色分配
```
**验证**
```bash
python cli.py role allocate "重构认证模块" claude-001,kimi-002,opencode-003
# 输出: claude-001 -> architect
# 输出: kimi-002 -> pm
# 输出: opencode-003 -> developer
```
---
## API 接口汇总
| 路径 | 方法 | 说明 |
|------|------|------|
| `/api/agents` | GET | 获取所有 Agent |
| `/api/agents/{id}/state` | GET/POST | Agent 状态 |
| `/api/locks` | GET | 获取所有锁 |
| `/api/locks/acquire` | POST | 获取锁 |
| `/api/locks/release` | POST | 释放锁 |
| `/api/heartbeats` | GET | 获取心跳列表 |
| `/api/heartbeats/{id}` | POST | 更新心跳 |
| `/api/meetings/{date}` | GET | 获取会议列表 |
| `/api/meetings/{id}` | GET | 获取会议详情 |
| `/api/meetings/{id}/wait` | POST | 加入会议等待 |
| `/api/workflows/{id}/next` | GET | 获取下一步 |
---
## 前端 Mock 替换顺序
| 步骤 | 组件 | API 端点 |
|------|------|----------|
| 3 | ResourceMonitorCard | `/api/locks` |
| 5 | AgentStatusCard | `/api/agents` |
| 7 | MeetingProgressCard | `/api/meetings/{id}` |
| 7 | RecentMeetingsCard | `/api/meetings/{date}` |
| - | WorkflowCard | `/api/workflows/{id}/next` |
---
## 参考开源项目
### 多智能体协作框架
| 项目 | 链接 | 参考点 |
|------|------|--------|
| **AutoGen** | [github.com/microsoft/autogen](https://github.com/microsoft/autogen) | 对话驱动协作、异步消息传递、Agent 交接机制 |
| **CrewAI** | [github.com/CrewAIInc/crewAI](https://github.com/CrewAIInc/crewAI) | 角色扮演式 Agent、任务编排、Crew 结构 |
| **AgentScope** | [阿里通义实验室](https://github.com/modelscope/agentscope) | Agent 编排、安全沙箱、跨框架互操作 |
| **MetaGPT** | [github.com/FoundationAgents/MetaGPT](https://github.com/FoundationAgents/MetaGPT) | AI 软件公司、多角色协作、文档生成 |
### FastAPI 项目结构
| 项目 | 链接 | 参考点 |
|------|------|--------|
| **FastAPI Project Structure** | [github.com/brianobot/fastapi_project_structure](https://github.com/brianobot/fastapi_project_structure) | 分层架构、services/routers/schemas 分离 |
| **FastAPI Best Architecture** | [github.com](搜索) | 伪三层架构、插件系统 |
| **FastAPI Tips** | [github.com](搜索) | WebSocket 实时通信、性能优化 |
```
推荐目录结构:
backend/
├── app/
│ ├── main.py # FastAPI 入口
│ ├── settings.py # 配置管理
│ ├── dependencies.py # 依赖注入
│ ├── middlewares.py # 中间件
│ ├── api_router.py # 路由汇总
│ ├── routers/ # API 路由
│ │ ├── agents.py
│ │ ├── locks.py
│ │ └── meetings.py
│ ├── services/ # 业务逻辑
│ │ ├── storage.py
│ │ ├── file_lock.py
│ │ ├── heartbeat.py
│ │ ├── agent_registry.py
│ │ └── meeting_scheduler.py
│ ├── schemas/ # Pydantic 模型
│ │ ├── agent.py
│ │ ├── lock.py
│ │ └── meeting.py
│ └── models/ # 数据模型(如用数据库)
├── cli.py # CLI 入口 (Typer)
└── requirements.txt
```
### 文件锁实现
| 库 | 链接 | 特点 |
|-----|------|------|
| **filelock** | [github.com/tox-dev/filelock](https://github.com/tox-dev/filelock) | 跨平台、上下文管理器、超时机制 |
| **fasteners** | [github.com/harlowja/fasteners](https://github.com/harlowja/fasteners) | 进程间读写锁 |
**推荐使用 filelock**
```python
from filelock import FileLock, Timeout
lock = FileLock("/path/to/file.lock", timeout=10)
with lock:
# 临界区代码
pass
```
### WebSocket 实时更新
FastAPI 原生支持 WebSocket参考官方文档[fastapi.tiangolo.com/zh/websockets](https://fastapi.tiangolo.com/zh/advanced/websockets/)
```python
from fastapi import WebSocket
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
async for data in websocket.iter_text():
await websocket.send_text(f"Received: {data}")
```
### 技术栈推荐
```txt
# requirements.txt
fastapi>=0.110.0
uvicorn[standard]>=0.27.0
pydantic>=2.0.0
typer>=0.9.0
aiofiles>=23.0.0
filelock>=3.13.0
pyyaml>=6.0
anthropic>=0.18.0 # Claude API
openai>=1.0.0 # OpenAI API
```
### 设计文档参考
| 文档 | 链接 |
|------|------|
| AutoGen 编程模型 | [github.com/microsoft/autogen/docs/design](https://github.com/microsoft/autogen/tree/main/docs/design) |
| Agent Worker Protocol | [github.com/microsoft/autogen/docs/design](https://github.com/microsoft/autogen/blob/main/docs/design/03%20-%20Agent%20Worker%20Protocol.md) |