""" API 集成测试 - 验证路由层正确接入服务层 通过 HTTP 请求测试所有 API 端点 """ import asyncio import httpx import time import sys import os os.environ.pop("HTTP_PROXY", None) os.environ.pop("HTTPS_PROXY", None) os.environ.pop("http_proxy", None) os.environ.pop("https_proxy", None) os.environ["NO_PROXY"] = "*" BASE = "http://127.0.0.1:8000" passed = 0 failed = 0 errors = [] async def test(name: str, method: str, path: str, json_data=None, expect_status=200, expect_key=None): """执行单个 API 测试""" global passed, failed try: async with httpx.AsyncClient(base_url=BASE, timeout=10) as client: if method == "GET": r = await client.get(path) elif method == "POST": r = await client.post(path, json=json_data) elif method == "DELETE": r = await client.delete(path) elif method == "PUT": r = await client.put(path, json=json_data) else: raise ValueError(f"Unknown method: {method}") if r.status_code != expect_status: failed += 1 msg = f"[FAIL] {name}: 期望 {expect_status}, 得到 {r.status_code} - {r.text[:200]}" errors.append(msg) print(msg) return None data = r.json() if expect_key and expect_key not in data: failed += 1 msg = f"[FAIL] {name}: 响应缺少 key '{expect_key}', 有: {list(data.keys())}" errors.append(msg) print(msg) return None passed += 1 print(f"[PASS] {name}") return data except Exception as e: failed += 1 msg = f"[FAIL] {name}: {e}" errors.append(msg) print(msg) return None async def main(): global passed, failed print("=" * 60) print("Swarm API 集成测试") print("=" * 60) # ========== 健康检查 ========== print("\n=== 健康检查 ===") await test("GET /health", "GET", "/health", expect_key="status") await test("GET /api/health", "GET", "/api/health", expect_key="status") # ========== Agent API ========== print("\n=== Agent API ===") data = await test("列出 Agent(初始)", "GET", "/api/agents/", expect_key="agents") await test("注册 Agent", "POST", "/api/agents/register", json_data={ "agent_id": "test-api-001", "name": "Test Agent", "role": "developer", "model": "test-model" }, expect_key="agent_id") data = await test("列出 Agent(注册后)", "GET", "/api/agents/", expect_key="agents") if data: agent_ids = [a["agent_id"] for a in data["agents"]] if "test-api-001" in agent_ids: passed += 1 print("[PASS] 注册的 Agent 出现在列表中") else: failed += 1 msg = f"[FAIL] 注册的 Agent 未出现在列表中: {agent_ids}" errors.append(msg) print(msg) await test("获取 Agent 详情", "GET", "/api/agents/test-api-001", expect_key="agent_id") await test("更新 Agent 状态", "POST", "/api/agents/test-api-001/state", json_data={ "task": "测试任务", "progress": 50, "working_files": ["test.py"] }, expect_key="success") data = await test("获取 Agent 状态", "GET", "/api/agents/test-api-001/state", expect_key="agent_id") if data and data.get("current_task") == "测试任务": passed += 1 print("[PASS] Agent 状态正确持久化") elif data: failed += 1 msg = f"[FAIL] Agent 状态不匹配: {data}" errors.append(msg) print(msg) await test("获取不存在的 Agent", "GET", "/api/agents/nonexistent-agent", expect_status=404) # ========== 文件锁 API ========== print("\n=== 文件锁 API ===") await test("列出文件锁(初始)", "GET", "/api/locks/", expect_key="locks") await test("获取文件锁", "POST", "/api/locks/acquire", json_data={ "file_path": "test/main.py", "agent_id": "test-api-001", "agent_name": "Test Agent" }, expect_key="success") data = await test("列出文件锁(获取后)", "GET", "/api/locks/", expect_key="locks") if data and len(data["locks"]) > 0: found = any(l["file_path"] == "test/main.py" for l in data["locks"]) if found: passed += 1 print("[PASS] 获取的锁出现在列表中") else: failed += 1 msg = "[FAIL] 获取的锁未出现在列表中" errors.append(msg) print(msg) data = await test("检查文件锁", "GET", "/api/locks/check?file_path=test/main.py", expect_key="locked") if data and data["locked"]: passed += 1 print("[PASS] 文件锁状态正确") elif data: failed += 1 msg = "[FAIL] 文件锁状态错误" errors.append(msg) print(msg) await test("释放文件锁", "POST", "/api/locks/release", json_data={ "file_path": "test/main.py", "agent_id": "test-api-001" }, expect_key="success") data = await test("检查释放后", "GET", "/api/locks/check?file_path=test/main.py", expect_key="locked") if data and not data["locked"]: passed += 1 print("[PASS] 锁释放成功") elif data: failed += 1 msg = "[FAIL] 锁释放后仍显示锁定" errors.append(msg) print(msg) # ========== 心跳 API ========== print("\n=== 心跳 API ===") await test("列出心跳(初始)", "GET", "/api/heartbeats/", expect_key="heartbeats") await test("更新心跳", "POST", "/api/heartbeats/test-api-001", json_data={ "status": "working", "current_task": "测试中", "progress": 30 }, expect_key="success") data = await test("列出心跳(更新后)", "GET", "/api/heartbeats/", expect_key="heartbeats") if data and "test-api-001" in data["heartbeats"]: hb = data["heartbeats"]["test-api-001"] if hb["status"] == "working": passed += 1 print("[PASS] 心跳数据正确持久化") else: failed += 1 msg = f"[FAIL] 心跳状态不匹配: {hb}" errors.append(msg) print(msg) await test("检查超时", "GET", "/api/heartbeats/timeouts?timeout_seconds=60", expect_key="timeout_agents") # ========== 会议 API ========== print("\n=== 会议 API ===") data = await test("创建会议", "POST", "/api/meetings/", json_data={ "title": "API 测试会议", "agenda": "测试议程", "attendees": ["test-api-001"], "steps": ["讨论", "决策", "总结"] }, expect_key="meeting_id") meeting_id = data["meeting_id"] if data else None if meeting_id: await test("获取会议详情", "GET", f"/api/meetings/{meeting_id}", expect_key="meeting_id") await test("获取会议队列", "GET", f"/api/meetings/{meeting_id}/queue", expect_key="meeting_id") await test("添加讨论", "POST", f"/api/meetings/{meeting_id}/discuss", json_data={ "agent_id": "test-api-001", "agent_name": "Test Agent", "content": "这是一条测试讨论", "step": "讨论" }, expect_key="success") await test("更新进度", "POST", f"/api/meetings/{meeting_id}/progress", json_data={ "step": "讨论" }, expect_key="success") await test("完成会议", "POST", f"/api/meetings/{meeting_id}/finish", json_data={ "consensus": "测试共识" }, expect_key="success") await test("列出今日会议", "GET", "/api/meetings/today", expect_key="meetings") # ========== 资源 API ========== print("\n=== 资源 API ===") await test("解析任务文件", "POST", "/api/parse-task", json_data={ "task": "修改 src/utils/helper.js 修复 bug" }, expect_key="files") await test("执行任务", "POST", "/api/execute", json_data={ "agent_id": "test-api-001", "task": "修改 src/utils/helper.js", "timeout": 30 }, expect_key="success") await test("获取所有状态", "GET", "/api/status", expect_key="agents") # ========== 角色 API ========== print("\n=== 角色 API ===") data = await test("获取主要角色", "POST", "/api/roles/primary", json_data={ "task": "设计系统架构方案" }, expect_key="primary_role") if data and data["primary_role"] == "architect": passed += 1 print(f"[PASS] 角色分析正确: architect(任务含'设计'+'架构')") elif data: failed += 1 msg = f"[FAIL] 角色分析不正确: 期望 architect, 得到 {data['primary_role']}" errors.append(msg) print(msg) await test("分配角色", "POST", "/api/roles/allocate", json_data={ "task": "开发用户登录功能", "agents": ["agent-1", "agent-2"] }, expect_key="allocation") await test("解释角色分配", "POST", "/api/roles/explain", json_data={ "task": "测试 API 接口", "agents": ["agent-1"] }, expect_key="explanation") # ========== 工作流 API ========== print("\n=== 工作流 API ===") await test("工作流文件列表", "GET", "/api/workflows/files", expect_key="files") await test("已加载工作流列表", "GET", "/api/workflows/list", expect_key="workflows") # ========== 人类输入 API ========== print("\n=== 人类输入 API ===") await test("获取摘要", "GET", "/api/humans/summary", expect_key="participants") # ========== 清理 ========== print("\n=== 清理 ===") await test("删除测试 Agent", "DELETE", "/api/agents/test-api-001", expect_key="message") # ========== 汇总 ========== print("\n" + "=" * 60) print(f"测试结果汇总") print("=" * 60) print(f"通过: {passed}") print(f"失败: {failed}") print(f"总计: {passed + failed}") print("=" * 60) if errors: print("\n失败详情:") for e in errors: print(f" {e}") return failed == 0 if __name__ == "__main__": success = asyncio.run(main()) sys.exit(0 if success else 1)