Files
multiAgentTry/backend/test_api_integration.py
Claude Code 1719d1f1f9 重构 API 路由并新增工作流编排功能
后端:
- 重构 agents, heartbeats, locks, meetings, resources, roles, workflows 路由
- 新增 orchestrator 和 providers 路由
- 新增 CLI 调用器和流程编排服务
- 添加日志配置和依赖项

前端:
- 更新 AgentsPage、SettingsPage、WorkflowPage 页面
- 扩展 api.ts 新增 API 接口

其他:
- 清理测试 agent 数据文件
- 新增示例工作流和项目审计报告

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-10 16:36:25 +08:00

295 lines
10 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
API 集成测试 - 验证路由层正确接入服务层
通过 HTTP 请求测试所有 API 端点
"""
import asyncio
import httpx
import time
import sys
import os
os.environ.pop("HTTP_PROXY", None)
os.environ.pop("HTTPS_PROXY", None)
os.environ.pop("http_proxy", None)
os.environ.pop("https_proxy", None)
os.environ["NO_PROXY"] = "*"
BASE = "http://127.0.0.1:8000"
passed = 0
failed = 0
errors = []
async def test(name: str, method: str, path: str, json_data=None, expect_status=200, expect_key=None):
"""执行单个 API 测试"""
global passed, failed
try:
async with httpx.AsyncClient(base_url=BASE, timeout=10) as client:
if method == "GET":
r = await client.get(path)
elif method == "POST":
r = await client.post(path, json=json_data)
elif method == "DELETE":
r = await client.delete(path)
elif method == "PUT":
r = await client.put(path, json=json_data)
else:
raise ValueError(f"Unknown method: {method}")
if r.status_code != expect_status:
failed += 1
msg = f"[FAIL] {name}: 期望 {expect_status}, 得到 {r.status_code} - {r.text[:200]}"
errors.append(msg)
print(msg)
return None
data = r.json()
if expect_key and expect_key not in data:
failed += 1
msg = f"[FAIL] {name}: 响应缺少 key '{expect_key}', 有: {list(data.keys())}"
errors.append(msg)
print(msg)
return None
passed += 1
print(f"[PASS] {name}")
return data
except Exception as e:
failed += 1
msg = f"[FAIL] {name}: {e}"
errors.append(msg)
print(msg)
return None
async def main():
global passed, failed
print("=" * 60)
print("Swarm API 集成测试")
print("=" * 60)
# ========== 健康检查 ==========
print("\n=== 健康检查 ===")
await test("GET /health", "GET", "/health", expect_key="status")
await test("GET /api/health", "GET", "/api/health", expect_key="status")
# ========== Agent API ==========
print("\n=== Agent API ===")
data = await test("列出 Agent初始", "GET", "/api/agents/", expect_key="agents")
await test("注册 Agent", "POST", "/api/agents/register", json_data={
"agent_id": "test-api-001",
"name": "Test Agent",
"role": "developer",
"model": "test-model"
}, expect_key="agent_id")
data = await test("列出 Agent注册后", "GET", "/api/agents/", expect_key="agents")
if data:
agent_ids = [a["agent_id"] for a in data["agents"]]
if "test-api-001" in agent_ids:
passed += 1
print("[PASS] 注册的 Agent 出现在列表中")
else:
failed += 1
msg = f"[FAIL] 注册的 Agent 未出现在列表中: {agent_ids}"
errors.append(msg)
print(msg)
await test("获取 Agent 详情", "GET", "/api/agents/test-api-001", expect_key="agent_id")
await test("更新 Agent 状态", "POST", "/api/agents/test-api-001/state", json_data={
"task": "测试任务",
"progress": 50,
"working_files": ["test.py"]
}, expect_key="success")
data = await test("获取 Agent 状态", "GET", "/api/agents/test-api-001/state", expect_key="agent_id")
if data and data.get("current_task") == "测试任务":
passed += 1
print("[PASS] Agent 状态正确持久化")
elif data:
failed += 1
msg = f"[FAIL] Agent 状态不匹配: {data}"
errors.append(msg)
print(msg)
await test("获取不存在的 Agent", "GET", "/api/agents/nonexistent-agent", expect_status=404)
# ========== 文件锁 API ==========
print("\n=== 文件锁 API ===")
await test("列出文件锁(初始)", "GET", "/api/locks/", expect_key="locks")
await test("获取文件锁", "POST", "/api/locks/acquire", json_data={
"file_path": "test/main.py",
"agent_id": "test-api-001",
"agent_name": "Test Agent"
}, expect_key="success")
data = await test("列出文件锁(获取后)", "GET", "/api/locks/", expect_key="locks")
if data and len(data["locks"]) > 0:
found = any(l["file_path"] == "test/main.py" for l in data["locks"])
if found:
passed += 1
print("[PASS] 获取的锁出现在列表中")
else:
failed += 1
msg = "[FAIL] 获取的锁未出现在列表中"
errors.append(msg)
print(msg)
data = await test("检查文件锁", "GET", "/api/locks/check?file_path=test/main.py", expect_key="locked")
if data and data["locked"]:
passed += 1
print("[PASS] 文件锁状态正确")
elif data:
failed += 1
msg = "[FAIL] 文件锁状态错误"
errors.append(msg)
print(msg)
await test("释放文件锁", "POST", "/api/locks/release", json_data={
"file_path": "test/main.py",
"agent_id": "test-api-001"
}, expect_key="success")
data = await test("检查释放后", "GET", "/api/locks/check?file_path=test/main.py", expect_key="locked")
if data and not data["locked"]:
passed += 1
print("[PASS] 锁释放成功")
elif data:
failed += 1
msg = "[FAIL] 锁释放后仍显示锁定"
errors.append(msg)
print(msg)
# ========== 心跳 API ==========
print("\n=== 心跳 API ===")
await test("列出心跳(初始)", "GET", "/api/heartbeats/", expect_key="heartbeats")
await test("更新心跳", "POST", "/api/heartbeats/test-api-001", json_data={
"status": "working",
"current_task": "测试中",
"progress": 30
}, expect_key="success")
data = await test("列出心跳(更新后)", "GET", "/api/heartbeats/", expect_key="heartbeats")
if data and "test-api-001" in data["heartbeats"]:
hb = data["heartbeats"]["test-api-001"]
if hb["status"] == "working":
passed += 1
print("[PASS] 心跳数据正确持久化")
else:
failed += 1
msg = f"[FAIL] 心跳状态不匹配: {hb}"
errors.append(msg)
print(msg)
await test("检查超时", "GET", "/api/heartbeats/timeouts?timeout_seconds=60", expect_key="timeout_agents")
# ========== 会议 API ==========
print("\n=== 会议 API ===")
data = await test("创建会议", "POST", "/api/meetings/", json_data={
"title": "API 测试会议",
"agenda": "测试议程",
"attendees": ["test-api-001"],
"steps": ["讨论", "决策", "总结"]
}, expect_key="meeting_id")
meeting_id = data["meeting_id"] if data else None
if meeting_id:
await test("获取会议详情", "GET", f"/api/meetings/{meeting_id}", expect_key="meeting_id")
await test("获取会议队列", "GET", f"/api/meetings/{meeting_id}/queue", expect_key="meeting_id")
await test("添加讨论", "POST", f"/api/meetings/{meeting_id}/discuss", json_data={
"agent_id": "test-api-001",
"agent_name": "Test Agent",
"content": "这是一条测试讨论",
"step": "讨论"
}, expect_key="success")
await test("更新进度", "POST", f"/api/meetings/{meeting_id}/progress", json_data={
"step": "讨论"
}, expect_key="success")
await test("完成会议", "POST", f"/api/meetings/{meeting_id}/finish", json_data={
"consensus": "测试共识"
}, expect_key="success")
await test("列出今日会议", "GET", "/api/meetings/today", expect_key="meetings")
# ========== 资源 API ==========
print("\n=== 资源 API ===")
await test("解析任务文件", "POST", "/api/parse-task", json_data={
"task": "修改 src/utils/helper.js 修复 bug"
}, expect_key="files")
await test("执行任务", "POST", "/api/execute", json_data={
"agent_id": "test-api-001",
"task": "修改 src/utils/helper.js",
"timeout": 30
}, expect_key="success")
await test("获取所有状态", "GET", "/api/status", expect_key="agents")
# ========== 角色 API ==========
print("\n=== 角色 API ===")
data = await test("获取主要角色", "POST", "/api/roles/primary", json_data={
"task": "设计系统架构方案"
}, expect_key="primary_role")
if data and data["primary_role"] == "architect":
passed += 1
print(f"[PASS] 角色分析正确: architect任务含'设计'+'架构'")
elif data:
failed += 1
msg = f"[FAIL] 角色分析不正确: 期望 architect, 得到 {data['primary_role']}"
errors.append(msg)
print(msg)
await test("分配角色", "POST", "/api/roles/allocate", json_data={
"task": "开发用户登录功能",
"agents": ["agent-1", "agent-2"]
}, expect_key="allocation")
await test("解释角色分配", "POST", "/api/roles/explain", json_data={
"task": "测试 API 接口",
"agents": ["agent-1"]
}, expect_key="explanation")
# ========== 工作流 API ==========
print("\n=== 工作流 API ===")
await test("工作流文件列表", "GET", "/api/workflows/files", expect_key="files")
await test("已加载工作流列表", "GET", "/api/workflows/list", expect_key="workflows")
# ========== 人类输入 API ==========
print("\n=== 人类输入 API ===")
await test("获取摘要", "GET", "/api/humans/summary", expect_key="participants")
# ========== 清理 ==========
print("\n=== 清理 ===")
await test("删除测试 Agent", "DELETE", "/api/agents/test-api-001", expect_key="message")
# ========== 汇总 ==========
print("\n" + "=" * 60)
print(f"测试结果汇总")
print("=" * 60)
print(f"通过: {passed}")
print(f"失败: {failed}")
print(f"总计: {passed + failed}")
print("=" * 60)
if errors:
print("\n失败详情:")
for e in errors:
print(f" {e}")
return failed == 0
if __name__ == "__main__":
success = asyncio.run(main())
sys.exit(0 if success else 1)