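# Backend Development Steps
Each step ends with a way to verify it, and the frontend replaces its mock data step by step.

---
## Step 1: Project Initialization
**Actions**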
```bash
cd backend
python -m venv venv
source venv/bin/activate # Windows: venv\Scripts\activate
pip install fastapi uvicorn pydantic typer aiofiles
mkdir -p .doc/agents .doc/cache .doc/meetings .doc/resources .doc/workflow
```
**Files to create**
- `backend/main.py` - FastAPI entry point
- `backend/cli.py` - CLI entry point

**Verification**
```bash
python cli.py hello # Output: Hello Swarm
python -m uvicorn main:app --reload # keeps running; use a second terminal for curl
curl http://localhost:8000/ # Output: {"status":"ok"}
```
---
## Step 2: Basic Storage Service
**Actions** - Create `backend/services/storage.py`
```python
class StorageService:
    async def read_json(self, path: str) -> dict: ...
    async def write_json(self, path: str, data: dict) -> None: ...
    async def ensure_dir(self, path: str) -> None: ...
```
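One possible way to fill in the `StorageService` interface above is sketched below. It is a sketch under assumptions, not the project's implementation: it uses `asyncio.to_thread` (Python 3.9+) with the standard library instead of `aiofiles`, treats a missing file as an empty dict, and writes through a temporary file so that a crash cannot leave a half-written JSON file behind.

```python
import asyncio
import json
import os
import tempfile
from pathlib import Path


class StorageService:
    """JSON storage sketch; blocking file I/O runs in worker threads."""

    async def ensure_dir(self, path: str) -> None:
        # Create the directory and any missing parents.
        await asyncio.to_thread(os.makedirs, path, exist_ok=True)

    async def read_json(self, path: str) -> dict:
        def _read() -> dict:
            file = Path(path)
            if not file.exists():
                return {}  # assumption: a missing file reads as an empty dict
            return json.loads(file.read_text(encoding="utf-8"))

        return await asyncio.to_thread(_read)

    async def write_json(self, path: str, data: dict) -> None:
        def _write() -> None:
            target = Path(path)
            target.parent.mkdir(parents=True, exist_ok=True)
            # Write to a temporary file in the same directory, then rename it
            # over the target so readers never see a half-written file.
            fd, tmp_name = tempfile.mkstemp(dir=str(target.parent), suffix=".tmp")
            with os.fdopen(fd, "w", encoding="utf-8") as handle:
                json.dump(data, handle, ensure_ascii=False)
            os.replace(tmp_name, target)

        await asyncio.to_thread(_write)
```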
**Verification**
```bash
python cli.py storage write .doc/test.json '{"foo":"bar"}'
python cli.py storage read .doc/test.json
# Output: {"foo":"bar"}
```
---
## Step 3: File Lock Service
**Actions** - Create `backend/services/file_lock.py`
```python
from __future__ import annotations
from typing import List, Optional

class FileLockService:
    async def acquire_lock(self, file_path: str, agent_id: str) -> bool: ...
    async def release_lock(self, file_path: str, agent_id: str) -> bool: ...
    async def get_locks(self) -> List[LockInfo]: ...
    async def check_locked(self, file_path: str) -> Optional[str]: ...
```
Stored in `.doc/cache/file_locks.json`
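
The lock interface above could be sketched roughly as follows. Several details are assumptions made for illustration: the `LockInfo` fields, the `store_path` constructor parameter, and the rule that an agent re-acquiring its own lock succeeds. The `asyncio.Lock` only serializes callers inside one process; coordinating several processes would need an OS-level lock in addition. File I/O is done synchronously to keep the sketch short.

```python
import asyncio
import json
import time
from dataclasses import dataclass
from pathlib import Path
from typing import List, Optional


@dataclass
class LockInfo:  # hypothetical shape; the source does not define it
    file_path: str
    agent_id: str
    acquired_at: float  # Unix timestamp


class FileLockService:
    def __init__(self, store_path: str = ".doc/cache/file_locks.json") -> None:
        self._store = Path(store_path)
        self._mutex = asyncio.Lock()  # guards this process only

    def _load(self) -> dict:
        if not self._store.exists():
            return {}
        return json.loads(self._store.read_text(encoding="utf-8"))

    def _save(self, locks: dict) -> None:
        self._store.parent.mkdir(parents=True, exist_ok=True)
        self._store.write_text(json.dumps(locks), encoding="utf-8")

    async def acquire_lock(self, file_path: str, agent_id: str) -> bool:
        async with self._mutex:
            locks = self._load()
            holder = locks.get(file_path)
            if holder is not None and holder["agent_id"] != agent_id:
                return False  # held by another agent
            locks[file_path] = {"agent_id": agent_id, "acquired_at": time.time()}
            self._save(locks)
            return True

    async def release_lock(self, file_path: str, agent_id: str) -> bool:
        async with self._mutex:
            locks = self._load()
            holder = locks.get(file_path)
            if holder is None or holder["agent_id"] != agent_id:
                return False  # only the holder may release
            del locks[file_path]
            self._save(locks)
            return True

    async def get_locks(self) -> List[LockInfo]:
        async with self._mutex:
            return [
                LockInfo(path, entry["agent_id"], entry["acquired_at"])
                for path, entry in self._load().items()
            ]

    async def check_locked(self, file_path: str) -> Optional[str]:
        async with self._mutex:
            holder = self._load().get(file_path)
            return holder["agent_id"] if holder else None
```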
**Verification**
```bash
# CLI test
python cli.py lock acquire src/auth/login.py claude-001
# Output: Lock acquired
python cli.py lock status
# Output: src/auth/login.py -> claude-001 (2s ago)
python cli.py lock release src/auth/login.py claude-001
# Output: Lock released
```
**API test**
```bash
curl http://localhost:8000/api/locks
# Output: {"locks": []}
curl -X POST http://localhost:8000/api/locks/acquire \
  -H "Content-Type: application/json" \
  -d '{"file_path":"src/auth/login.py","agent_id":"claude-001"}'
# Output: {"acquired":true}
```
**Frontend integration** - Update `ResourceMonitorCard.tsx`
```typescript
// Replace the lockedFiles mock
const response = await fetch('http://localhost:8000/api/locks');
const data = await response.json();
setLockedFiles(data.locks);
```
---
## Step 4: Heartbeat Service
**Actions** - Create `backend/services/heartbeat.py`
```python
from typing import List, Optional

class HeartbeatService:
    async def update_heartbeat(self, agent_id: str, status: dict) -> None: ...
    async def get_heartbeat(self, agent_id: str) -> Optional[dict]: ...
    async def get_all_heartbeats(self) -> dict: ...
    async def check_timeout(self, timeout_seconds: int) -> List[str]: ...
```
Stored in `.doc/cache/heartbeats.json`
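
A minimal in-memory sketch of the heartbeat interface above, mainly to show how `check_timeout` can be computed. Persistence to `heartbeats.json` is left out, and both the `clock` parameter and the `last_seen` key are assumptions introduced here so that the timeout logic can be tested without sleeping.

```python
import time
from typing import Callable, Dict, List, Optional


class HeartbeatService:
    def __init__(self, clock: Callable[[], float] = time.time) -> None:
        self._clock = clock  # injectable so tests can control time
        self._beats: Dict[str, dict] = {}

    async def update_heartbeat(self, agent_id: str, status: dict) -> None:
        # Store the reported status together with the arrival time.
        self._beats[agent_id] = {**status, "last_seen": self._clock()}

    async def get_heartbeat(self, agent_id: str) -> Optional[dict]:
        return self._beats.get(agent_id)

    async def get_all_heartbeats(self) -> dict:
        return dict(self._beats)

    async def check_timeout(self, timeout_seconds: int) -> List[str]:
        # An agent has timed out when its last heartbeat is older than the limit.
        now = self._clock()
        return [
            agent_id
            for agent_id, beat in self._beats.items()
            if now - beat["last_seen"] > timeout_seconds
        ]
```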
**Verification**
```bash
# Simulate an agent sending a heartbeat
python cli.py heartbeat ping claude-001 '{"status":"working","task":"fixing bug"}'
# Output: Heartbeat updated
python cli.py heartbeat list
# Output: claude-001: working (5s ago)
python cli.py heartbeat check-timeout 30
# Output: No timed out agents
```
**API test**
```bash
curl http://localhost:8000/api/heartbeats
# Output: {"heartbeats": {"claude-001": {...}}}
```
---
## Step 5: Agent Registry Service
**Actions** - Create `backend/services/agent_registry.py`
```python
from __future__ import annotations
from typing import List, Optional

class AgentRegistry:
    async def register_agent(self, agent_info: AgentInfo) -> None: ...
    async def get_agent(self, agent_id: str) -> Optional[AgentInfo]: ...
    async def list_agents(self) -> List[AgentInfo]: ...
    async def update_state(self, agent_id: str, state: dict) -> None: ...
    async def get_state(self, agent_id: str) -> Optional[dict]: ...
```
Stored in the `.doc/agents/{agent_id}/` directory
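
The registry interface above might look roughly like this when backed by one directory per agent. The `AgentInfo` fields mirror the CLI flags used in the verification commands, but the dataclass itself, the `base_dir` parameter, and the file names `info.json` and `state.json` are assumptions for illustration. File I/O is synchronous to keep the sketch short.

```python
import json
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import List, Optional


@dataclass
class AgentInfo:  # hypothetical shape based on the CLI flags
    agent_id: str
    name: str
    role: str
    model: str


class AgentRegistry:
    def __init__(self, base_dir: str = ".doc/agents") -> None:
        self._base = Path(base_dir)

    def _file(self, agent_id: str, name: str) -> Path:
        return self._base / agent_id / name

    @staticmethod
    def _write(path: Path, data: dict) -> None:
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(data), encoding="utf-8")

    @staticmethod
    def _read(path: Path) -> Optional[dict]:
        if not path.exists():
            return None
        return json.loads(path.read_text(encoding="utf-8"))

    async def register_agent(self, agent_info: AgentInfo) -> None:
        self._write(self._file(agent_info.agent_id, "info.json"), asdict(agent_info))

    async def get_agent(self, agent_id: str) -> Optional[AgentInfo]:
        data = self._read(self._file(agent_id, "info.json"))
        return AgentInfo(**data) if data else None

    async def list_agents(self) -> List[AgentInfo]:
        if not self._base.exists():
            return []
        agents = []
        for entry in sorted(self._base.iterdir()):
            info = await self.get_agent(entry.name)
            if info is not None:  # skip directories without info.json
                agents.append(info)
        return agents

    async def update_state(self, agent_id: str, state: dict) -> None:
        self._write(self._file(agent_id, "state.json"), state)

    async def get_state(self, agent_id: str) -> Optional[dict]:
        return self._read(self._file(agent_id, "state.json"))
```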
**Verification**
```bash
python cli.py agent register claude-001 \
  --name "Claude Code" \
  --role "architect" \
  --model "claude-opus-4.6"
# Output: Agent registered
python cli.py agent list
# Output: claude-001 | Claude Code | architect | claude-opus-4.6
python cli.py agent state claude-001 set '{"task":"fixing bug","progress":50}'
python cli.py agent state claude-001 get
# Output: {"task":"fixing bug","progress":50}
```
**API test**
```bash
curl http://localhost:8000/api/agents
# Output: {"agents": [...]}
curl http://localhost:8000/api/agents/claude-001/state
# Output: {"task":"fixing bug","progress":50}
```
**Frontend integration** - Update `AgentStatusCard.tsx`
```typescript
// Replace the agents mock
const response = await fetch('http://localhost:8000/api/agents');
const data = await response.json();
setAgents(data.agents);
```
---
## Step 6: Meeting Scheduler (Barrier Synchronization)
**Actions** - Create `backend/services/meeting_scheduler.py`
```python
from typing import List

class MeetingScheduler:
    async def wait_for_meeting(self, agent_id: str, meeting_id: str) -> str: ...
    async def get_queue(self, meeting_id: str) -> List[str]: ...
    async def start_meeting(self, meeting_id: str) -> None: ...
    async def end_meeting(self, meeting_id: str) -> None: ...
```
Stored in `.doc/cache/meeting_queue.json`
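
The barrier behaviour of the scheduler above can be illustrated with an in-process sketch built on `asyncio.Event`: every caller of `wait_for_meeting` blocks until the last required attendee arrives. The `expected` mapping of meeting ID to required attendees and the `"started"` return value are assumptions; the source does not say how the scheduler learns who must attend. This version works inside one process only, whereas the CLI flow with two terminals would have to coordinate through the shared `meeting_queue.json` file.

```python
import asyncio
from typing import Dict, List, Set


class MeetingScheduler:
    def __init__(self, expected: Dict[str, Set[str]]) -> None:
        self._expected = expected  # hypothetical: meeting_id -> required attendees
        self._queues: Dict[str, List[str]] = {}
        self._started: Dict[str, asyncio.Event] = {}

    def _event(self, meeting_id: str) -> asyncio.Event:
        if meeting_id not in self._started:
            self._started[meeting_id] = asyncio.Event()
        return self._started[meeting_id]

    async def wait_for_meeting(self, agent_id: str, meeting_id: str) -> str:
        queue = self._queues.setdefault(meeting_id, [])
        if agent_id not in queue:
            queue.append(agent_id)
        required = self._expected.get(meeting_id, set())
        if required and required <= set(queue):
            # The last arrival opens the barrier for everyone.
            await self.start_meeting(meeting_id)
        await self._event(meeting_id).wait()
        return "started"

    async def get_queue(self, meeting_id: str) -> List[str]:
        return list(self._queues.get(meeting_id, []))

    async def start_meeting(self, meeting_id: str) -> None:
        self._event(meeting_id).set()

    async def end_meeting(self, meeting_id: str) -> None:
        # Forget the queue and the event so the ID can be reused.
        self._queues.pop(meeting_id, None)
        self._started.pop(meeting_id, None)
```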
**Verification**
```bash
# Terminal 1 - Agent 1 waits
python cli.py meeting wait design-review claude-001
# Blocks, waiting...
# Terminal 2 - Agent 2 waits
python cli.py meeting wait design-review kimi-002
# Triggers the meeting. Output: Meeting started!
python cli.py meeting queue design-review
# Output: [claude-001, kimi-002]
```
**API test**
```bash
curl -X POST http://localhost:8000/api/meetings/design-review/wait \
  -H "Content-Type: application/json" \
  -d '{"agent_id":"claude-001"}'
# Output: {"status":"waiting"}
curl http://localhost:8000/api/meetings/design-review/queue
# Output: {"waiting_agents":["claude-001"]}
```
---
## Step 7: Meeting Recorder Service
**Actions** - Create `backend/services/meeting_recorder.py`
```python
from __future__ import annotations
from typing import List

class MeetingRecorder:
    async def create_meeting(self, meeting_info: MeetingInfo) -> str: ...
    async def add_discussion(self, meeting_id: str, agent_id: str, content: str) -> None: ...
    async def update_progress(self, meeting_id: str, step: str) -> None: ...
    async def get_meeting(self, meeting_id: str) -> MeetingInfo: ...
    async def list_meetings(self, date: str) -> List[MeetingInfo]: ...
```
Stored in `.doc/meetings/{YYYY-MM-DD}/{meeting_id}.md`
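
A partial sketch of the recorder above, showing how the meeting ID and the dated Markdown path could fit together. The ID format follows the example `design_review_20260304_141000` from the verification commands; the `MeetingInfo` fields, the Markdown layout, the optional `now` parameter, and the helper `list_meeting_ids` (which returns IDs rather than full `MeetingInfo` objects) are assumptions. `get_meeting` is omitted because it depends on how the Markdown is parsed back.

```python
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import List, Optional


@dataclass
class MeetingInfo:  # hypothetical shape
    name: str
    title: str
    attendees: List[str] = field(default_factory=list)


class MeetingRecorder:
    def __init__(self, base_dir: str = ".doc/meetings") -> None:
        self._base = Path(base_dir)

    def _path(self, meeting_id: str) -> Path:
        # The ID ends with YYYYMMDD_HHMMSS, so the date folder is derived from it.
        day = meeting_id.split("_")[-2]
        folder = f"{day[:4]}-{day[4:6]}-{day[6:]}"
        return self._base / folder / f"{meeting_id}.md"

    async def create_meeting(
        self, meeting_info: MeetingInfo, now: Optional[datetime] = None
    ) -> str:
        now = now or datetime.now()
        slug = meeting_info.name.replace("-", "_")
        meeting_id = f"{slug}_{now:%Y%m%d_%H%M%S}"
        path = self._path(meeting_id)
        path.parent.mkdir(parents=True, exist_ok=True)
        header = (
            f"# {meeting_info.title}\n\n"
            f"Attendees: {', '.join(meeting_info.attendees)}\n\n"
            "## Discussion\n"
        )
        path.write_text(header, encoding="utf-8")
        return meeting_id

    def _append(self, meeting_id: str, line: str) -> None:
        with self._path(meeting_id).open("a", encoding="utf-8") as handle:
            handle.write(line + "\n")

    async def add_discussion(self, meeting_id: str, agent_id: str, content: str) -> None:
        self._append(meeting_id, f"- **{agent_id}**: {content}")

    async def update_progress(self, meeting_id: str, step: str) -> None:
        self._append(meeting_id, f"- _Progress_: {step}")

    async def list_meeting_ids(self, date: str) -> List[str]:
        folder = self._base / date
        if not folder.exists():
            return []
        return sorted(p.stem for p in folder.glob("*.md"))
```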
**Verification**
```bash
python cli.py meeting create design-review \
  --title "Authentication Design Review" \
  --attendees claude-001,kimi-002,opencode-003
# Output: Meeting created: design_review_20260304_141000
python cli.py meeting discuss design_review_20260304_141000 \
  --agent claude-001 \
  --content "Suggest using JWT"
# Output: Discussion added
python cli.py meeting progress design_review_20260304_141000 "Collect initial ideas"
# Output: Progress updated
```
**API test**
```bash
curl http://localhost:8000/api/meetings/2026-03-04
# Output: {"meetings": [...]}
curl http://localhost:8000/api/meetings/design_review_20260304_141000
# Output: {...}
```
**Frontend integration** - Update `MeetingProgressCard.tsx` and `RecentMeetingsCard.tsx`
```typescript
// Replace the steps mock
const response = await fetch('http://localhost:8000/api/meetings/active');
const data = await response.json();
setSteps(data.meeting.steps);
```
---
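## Step 8: Resource Manager (Integrating Locks + Heartbeats)
**Actions** - Create `backend/services/resource_manager.py`
```python
class ResourceManager:
    async def execute_task(self, agent_id: str, task_description: str) -> str:
        # Internal logic:
        # 1. Parse which files the task needs
        # 2. Acquire locks on all of them
        # 3. Execute the task
        # 4. finally: release all locks
        ...
```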
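The four steps listed in the comments above can be sketched as follows. This covers only the locking part, not the heartbeat integration. `InMemoryLocks` is a hypothetical stand-in for `FileLockService`, `run_task` is a hypothetical callable that performs the actual work, and `extract_files` assumes that file paths appear literally in the task text, which is a simplification. Files are locked in sorted order so that two agents needing the same files cannot deadlock on each other, and the `finally` block releases whatever was acquired even when the task fails.

```python
import re
from typing import Awaitable, Callable, Dict, List


class InMemoryLocks:
    """Hypothetical stand-in for FileLockService."""

    def __init__(self) -> None:
        self.owners: Dict[str, str] = {}

    async def acquire_lock(self, file_path: str, agent_id: str) -> bool:
        if self.owners.get(file_path, agent_id) != agent_id:
            return False
        self.owners[file_path] = agent_id
        return True

    async def release_lock(self, file_path: str, agent_id: str) -> bool:
        if self.owners.get(file_path) != agent_id:
            return False
        del self.owners[file_path]
        return True


def extract_files(task_description: str) -> List[str]:
    # Assumption: paths such as src/auth/login.py appear literally in the text.
    return sorted(set(re.findall(r"[\w./-]+\.\w+", task_description, flags=re.ASCII)))


class ResourceManager:
    def __init__(
        self, locks: InMemoryLocks, run_task: Callable[[str, str], Awaitable[str]]
    ) -> None:
        self._locks = locks
        self._run_task = run_task

    async def execute_task(self, agent_id: str, task_description: str) -> str:
        held: List[str] = []
        try:
            # Steps 1 and 2: find the files, then lock them in a fixed order.
            for path in extract_files(task_description):
                if not await self._locks.acquire_lock(path, agent_id):
                    raise RuntimeError(f"{path} is locked by another agent")
                held.append(path)
            # Step 3: run the task while all locks are held.
            return await self._run_task(agent_id, task_description)
        finally:
            # Step 4: release everything acquired so far, even on failure.
            for path in reversed(held):
                await self._locks.release_lock(path, agent_id)
```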
**Verification**
```bash
python cli.py execute claude-001 "Modify src/auth/login.py to fix the login bug"
# Output: Task executing...
# Output: Locks acquired: [src/auth/login.py, src/utils/crypto.py]
# Output: Task completed
# Output: Locks released
```
---
## Step 9: Workflow Engine
**Actions** - Create `backend/services/workflow_engine.py`
```python
from __future__ import annotations
from typing import Optional

class WorkflowEngine:
    async def load_workflow(self, workflow_path: str) -> Workflow: ...
    async def get_next_meeting(self, workflow_id: str) -> Optional[MeetingInfo]: ...
    async def complete_meeting(self, workflow_id: str, meeting_id: str) -> None: ...
```
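The source does not show the workflow file format, so the sketch below works on an already parsed dict (for example the result of `yaml.safe_load`) instead of implementing `load_workflow` itself. The keys `workflow_id`, `meetings`, and `meeting_id`, the `Workflow` dataclass, and the method `load_from_dict` are all assumptions. The idea shown is simply that the next meeting is the first one in file order that has not been completed yet.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Set


@dataclass
class Workflow:  # hypothetical shape
    workflow_id: str
    meetings: List[dict]
    completed: Set[str] = field(default_factory=set)


class WorkflowEngine:
    def __init__(self) -> None:
        self._workflows: Dict[str, Workflow] = {}

    def load_from_dict(self, data: dict) -> Workflow:
        # Hypothetical helper: register a workflow that was parsed elsewhere.
        workflow = Workflow(data["workflow_id"], list(data["meetings"]))
        self._workflows[workflow.workflow_id] = workflow
        return workflow

    async def get_next_meeting(self, workflow_id: str) -> Optional[dict]:
        workflow = self._workflows[workflow_id]
        for meeting in workflow.meetings:
            if meeting["meeting_id"] not in workflow.completed:
                return meeting  # first meeting that is still open
        return None  # every meeting is done

    async def complete_meeting(self, workflow_id: str, meeting_id: str) -> None:
        self._workflows[workflow_id].completed.add(meeting_id)
```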
**Verification**
```bash
python cli.py workflow load .doc/workflow/project_workflow.yaml
# Output: Workflow loaded: my_project
python cli.py workflow next my_project
# Output: Next meeting: design_review (attendees: claude-001,kimi-002)
```
---
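## Step 10: Role Allocator (AI Routing)
**Actions** - Create `backend/services/role_allocator.py`
```python
from typing import Dict, List

class RoleAllocator:
    async def allocate_roles(self, task: str, agents: List[str]) -> Dict[str, str]:
        # Calls an LLM internally to analyze the task and returns the role assignment
        ...
```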
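Since the source does not say which LLM client or prompt is used, the sketch below keeps the model call behind a hypothetical `ask_llm` callable and focuses on the part that can be shown safely: building a prompt, parsing a JSON reply, and falling back to a default role when the reply is missing or invalid. The role names come from the example output in the verification step; the prompt wording and the `developer` fallback are assumptions.

```python
import json
from typing import Awaitable, Callable, Dict, List

ROLES = ("architect", "pm", "developer")  # roles seen in the example output
DEFAULT_ROLE = "developer"  # assumption: fallback for unusable replies


class RoleAllocator:
    def __init__(self, ask_llm: Callable[[str], Awaitable[str]]) -> None:
        self._ask_llm = ask_llm  # hypothetical: sends a prompt, returns the reply text

    async def allocate_roles(self, task: str, agents: List[str]) -> Dict[str, str]:
        prompt = (
            f"Task: {task}\n"
            f"Agents: {', '.join(agents)}\n"
            f"Assign each agent one role from {list(ROLES)}. "
            "Reply with a JSON object mapping agent ID to role."
        )
        raw = await self._ask_llm(prompt)
        try:
            parsed = json.loads(raw)
        except json.JSONDecodeError:
            parsed = {}
        if not isinstance(parsed, dict):
            parsed = {}
        # Never trust the model blindly: keep only known agents and known roles.
        result: Dict[str, str] = {}
        for agent in agents:
            role = parsed.get(agent)
            result[agent] = role if role in ROLES else DEFAULT_ROLE
        return result
```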
**Verification**
```bash
python cli.py role allocate "Refactor the authentication module" claude-001,kimi-002,opencode-003
# Output: claude-001 -> architect
# Output: kimi-002 -> pm
# Output: opencode-003 -> developer
```
---
## API Summary
| Path | Method | Description |
|------|------|------|
| `/api/agents` | GET | List all agents |
| `/api/agents/{id}/state` | GET/POST | Agent state |
| `/api/locks` | GET | List all locks |
| `/api/locks/acquire` | POST | Acquire a lock |
| `/api/locks/release` | POST | Release a lock |
| `/api/heartbeats` | GET | List heartbeats |
| `/api/heartbeats/{id}` | POST | Update a heartbeat |
| `/api/meetings/{date}` | GET | List meetings for a date |
| `/api/meetings/{id}` | GET | Get meeting details |
| `/api/meetings/{id}/wait` | POST | Join the meeting wait queue |
| `/api/workflows/{id}/next` | GET | Get the next step |
---
## Frontend Mock Replacement Order
| Step | Component | API endpoint |
|------|------|----------|
| 3 | ResourceMonitorCard | `/api/locks` |
| 5 | AgentStatusCard | `/api/agents` |
| 7 | MeetingProgressCard | `/api/meetings/{id}` |
| 7 | RecentMeetingsCard | `/api/meetings/{date}` |
| - | WorkflowCard | `/api/workflows/{id}/next` |
---
## Open-Source Projects for Reference
### Multi-Agent Collaboration Frameworks
| Project | Link | What to learn |
|------|------|--------|
| **AutoGen** | [github.com/microsoft/autogen](https://github.com/microsoft/autogen) | Conversation-driven collaboration, asynchronous message passing, agent handoff |
| **CrewAI** | [github.com/CrewAIInc/crewAI](https://github.com/CrewAIInc/crewAI) | Role-playing agents, task orchestration, Crew structure |
| **AgentScope** | [Alibaba Tongyi Lab](https://github.com/modelscope/agentscope) | Agent orchestration, secure sandbox, cross-framework interoperability |
| **MetaGPT** | [github.com/FoundationAgents/MetaGPT](https://github.com/FoundationAgents/MetaGPT) | AI software company, multi-role collaboration, document generation |
### FastAPI Project Structure
| Project | Link | What to learn |
|------|------|--------|
| **FastAPI Project Structure** | [github.com/brianobot/fastapi_project_structure](https://github.com/brianobot/fastapi_project_structure) | Layered architecture, separation of services/routers/schemas |
| **FastAPI Best Architecture** | github.com (search) | Pseudo three-tier architecture, plugin system |
| **FastAPI Tips** | github.com (search) | WebSocket real-time communication, performance tuning |
```
Recommended directory structure:
backend/
├── app/
│   ├── main.py              # FastAPI entry point
│   ├── settings.py          # Configuration
│   ├── dependencies.py      # Dependency injection
│   ├── middlewares.py       # Middleware
│   ├── api_router.py        # Router aggregation
│   ├── routers/             # API routes
│   │   ├── agents.py
│   │   ├── locks.py
│   │   └── meetings.py
│   ├── services/            # Business logic
│   │   ├── storage.py
│   │   ├── file_lock.py
│   │   ├── heartbeat.py
│   │   ├── agent_registry.py
│   │   └── meeting_scheduler.py
│   ├── schemas/             # Pydantic models
│   │   ├── agent.py
│   │   ├── lock.py
│   │   └── meeting.py
│   └── models/              # Data models (if a database is used)
├── cli.py                   # CLI entry point (Typer)
└── requirements.txt
```
### File Lock Implementations
| Library | Link | Features |
|-----|------|------|
| **filelock** | [github.com/tox-dev/filelock](https://github.com/tox-dev/filelock) | Cross-platform, context manager, timeouts |
| **fasteners** | [github.com/harlowja/fasteners](https://github.com/harlowja/fasteners) | Inter-process reader-writer locks |

**Recommended: filelock**
```python
from filelock import FileLock, Timeout

lock = FileLock("/path/to/file.lock", timeout=10)
try:
    with lock:
        # Critical section
        pass
except Timeout:
    print("Could not acquire the lock within 10 seconds")
```
### Real-Time Updates with WebSocket
FastAPI supports WebSocket natively; see the official documentation: [fastapi.tiangolo.com/zh/websockets](https://fastapi.tiangolo.com/zh/advanced/websockets/)
```python
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    # Echo every text message back until the client disconnects
    async for data in websocket.iter_text():
        await websocket.send_text(f"Received: {data}")
```
### Recommended Tech Stack
```txt
# requirements.txt
fastapi>=0.110.0
uvicorn[standard]>=0.27.0
pydantic>=2.0.0
typer>=0.9.0
aiofiles>=23.0.0
filelock>=3.13.0
pyyaml>=6.0
anthropic>=0.18.0 # Claude API
openai>=1.0.0 # OpenAI API
```
### Design Document References
| Document | Link |
|------|------|
| AutoGen programming model | [github.com/microsoft/autogen/docs/design](https://github.com/microsoft/autogen/tree/main/docs/design) |
| Agent Worker Protocol | [github.com/microsoft/autogen/docs/design](https://github.com/microsoft/autogen/blob/main/docs/design/03%20-%20Agent%20Worker%20Protocol.md) |