#!/usr/bin/env python3 """ 后端服务完整测试脚本 测试所有 10 个步骤的服务是否正常工作 """ import asyncio import sys import os sys.path.insert(0, os.path.dirname(os.path.abspath(__file__))) from app.services.storage import get_storage from app.services.file_lock import get_file_lock_service from app.services.heartbeat import get_heartbeat_service from app.services.agent_registry import get_agent_registry from app.services.meeting_scheduler import get_meeting_scheduler from app.services.meeting_recorder import get_meeting_recorder from app.services.resource_manager import get_resource_manager from app.services.workflow_engine import get_workflow_engine from app.services.role_allocator import get_role_allocator async def test_storage_service(): """测试存储服务""" print("\n=== 测试 StorageService ===") storage = get_storage() # 测试写入 await storage.write_json("cache/test_storage.json", {"test": "data", "number": 42}) print("[PASS] 写入 JSON 文件") # 测试读取 data = await storage.read_json("cache/test_storage.json") assert data["test"] == "data", "读取数据不匹配" print("[PASS] 读取 JSON 文件") # 测试存在检查 exists = await storage.exists("cache/test_storage.json") assert exists, "文件应该存在" print("[PASS] 文件存在检查") # 测试删除 await storage.delete("cache/test_storage.json") exists = await storage.exists("cache/test_storage.json") assert not exists, "文件应该已被删除" print("[PASS] 删除文件") print("StorageService 测试通过") return True async def test_file_lock_service(): """测试文件锁服务""" print("\n=== 测试 FileLockService ===") service = get_file_lock_service() # 测试获取锁 success = await service.acquire_lock("src/test/file.py", "agent-001", "TestAgent") assert success, "应该成功获取锁" print("[PASS] 获取文件锁") # 测试检查锁定 locked_by = await service.check_locked("src/test/file.py") assert locked_by == "agent-001", "锁持有者应该匹配" print("[PASS] 检查文件锁定状态") # 测试其他 Agent 无法获取 success = await service.acquire_lock("src/test/file.py", "agent-002", "OtherAgent") assert not success, "其他 Agent 不应该能获取已被锁定的文件" print("[PASS] 冲突检测正常工作") # 测试获取所有锁 locks = await service.get_locks() assert len(locks) >= 1, "应该有至少一个锁" print("[PASS] 获取所有锁列表") # 测试释放锁 success = await service.release_lock("src/test/file.py", "agent-001") assert success, "应该成功释放锁" print("[PASS] 释放文件锁") print("FileLockService 测试通过") return True async def test_heartbeat_service(): """测试心跳服务""" print("\n=== 测试 HeartbeatService ===") service = get_heartbeat_service() # 测试更新心跳 await service.update_heartbeat("agent-001", "working", "测试任务", 50) print("[PASS] 更新心跳") # 测试获取心跳 hb = await service.get_heartbeat("agent-001") assert hb is not None, "心跳信息应该存在" assert hb.status == "working", "状态应该匹配" assert hb.progress == 50, "进度应该匹配" print("[PASS] 获取心跳信息") # 测试获取所有心跳 all_hbs = await service.get_all_heartbeats() assert "agent-001" in all_hbs, "应该在所有心跳列表中" print("[PASS] 获取所有心跳") # 测试活跃 Agent active = await service.get_active_agents(within_seconds=10) assert "agent-001" in active, "应该是活跃 Agent" print("[PASS] 获取活跃 Agent") print("HeartbeatService 测试通过") return True async def test_agent_registry(): """测试 Agent 注册服务""" print("\n=== 测试 AgentRegistry ===") registry = get_agent_registry() # 测试注册 Agent agent = await registry.register_agent( "test-agent-001", "Test Agent", "developer", "claude-opus-4.6", "测试用的 Agent" ) assert agent.agent_id == "test-agent-001", "ID 应该匹配" print("[PASS] 注册 Agent") # 测试获取 Agent fetched = await registry.get_agent("test-agent-001") assert fetched is not None, "应该能获取到 Agent" assert fetched.name == "Test Agent", "名称应该匹配" print("[PASS] 获取 Agent 信息") # 测试更新状态 await registry.update_state("test-agent-001", "修复 bug", 75) print("[PASS] 更新 Agent 状态") # 测试获取状态 state = await registry.get_state("test-agent-001") assert state is not None, "状态应该存在" assert state.progress == 75, "进度应该匹配" print("[PASS] 获取 Agent 状态") # 测试列出所有 Agent agents = await registry.list_agents() assert len(agents) >= 1, "应该至少有一个 Agent" print("[PASS] 列出所有 Agent") print("AgentRegistry 测试通过") return True async def test_meeting_scheduler(): """测试会议调度器""" print("\n=== 测试 MeetingScheduler ===") scheduler = get_meeting_scheduler() # 测试创建会议 queue = await scheduler.create_meeting( "test-meeting-001", "测试会议", ["agent-001", "agent-002"], min_required=2 ) assert queue.meeting_id == "test-meeting-001", "ID 应该匹配" print("[PASS] 创建会议") # 测试获取队列 fetched = await scheduler.get_queue("test-meeting-001") assert fetched is not None, "队列应该存在" print("[PASS] 获取会议队列") # 测试等待会议(模拟两个 Agent 到达) result1 = await scheduler.wait_for_meeting("agent-001", "test-meeting-001", timeout=1) print(f" Agent-1 到达: {result1}") result2 = await scheduler.wait_for_meeting("agent-002", "test-meeting-001", timeout=1) print(f" Agent-2 到达: {result2}") # 最后一个到达者应该触发会议开始 assert result2 == "started", "最后一个到达者应该触发会议开始" print("[PASS] 栅栏同步工作正常") # 测试结束会议 success = await scheduler.end_meeting("test-meeting-001") assert success, "应该成功结束会议" print("[PASS] 结束会议") print("MeetingScheduler 测试通过") return True async def test_meeting_recorder(): """测试会议记录服务""" print("\n=== 测试 MeetingRecorder ===") recorder = get_meeting_recorder() # 测试创建会议记录 meeting = await recorder.create_meeting( "test-record-001", "测试记录会议", ["agent-001", "agent-002"], ["步骤1", "步骤2", "步骤3"] ) assert meeting.meeting_id == "test-record-001", "ID 应该匹配" assert len(meeting.steps) == 3, "应该有 3 个步骤" print("[PASS] 创建会议记录") # 测试添加讨论 await recorder.add_discussion("test-record-001", "agent-001", "Agent1", "这是第一条讨论") await recorder.add_discussion("test-record-001", "agent-002", "Agent2", "这是第二条讨论") print("[PASS] 添加讨论记录") # 测试更新进度 await recorder.update_progress("test-record-001", "步骤1") print("[PASS] 更新会议进度") # 测试获取会议 fetched = await recorder.get_meeting("test-record-001") assert fetched is not None, "会议应该存在" assert len(fetched.discussions) == 2, "应该有 2 条讨论" print("[PASS] 获取会议详情") # 测试结束会议 success = await recorder.end_meeting("test-record-001", "达成共识:继续开发") assert success, "应该成功结束会议" print("[PASS] 结束会议并保存共识") print("MeetingRecorder 测试通过") return True async def test_resource_manager(): """测试资源管理器""" print("\n=== 测试 ResourceManager ===") manager = get_resource_manager() # 测试解析任务文件 files = await manager.parse_task_files("修复 src/auth/login.py 和 src/utils/helper.js 中的 bug") assert "src/auth/login.py" in files or "src/utils/helper.js" in files, "应该能解析出文件路径" print(f"[PASS] 解析任务文件: {files}") # 测试获取 Agent 状态(需要先有注册的 Agent) try: status = await manager.get_agent_status("test-agent-001") print(f"[PASS] 获取 Agent 状态: {status['agent_id']}") except Exception as e: print(f" [WARN] 获取状态警告: {e}") print("ResourceManager 测试通过") return True async def test_workflow_engine(): """测试工作流引擎""" print("\n=== 测试 WorkflowEngine ===") engine = get_workflow_engine() # 确保测试工作流文件存在 workflow_content = """ workflow_id: "test-workflow" name: "测试工作流" description: "用于测试的工作流" meetings: - meeting_id: "step1" title: "第一步" attendees: ["agent-001"] depends_on: [] - meeting_id: "step2" title: "第二步" attendees: ["agent-001", "agent-002"] depends_on: ["step1"] """ import aiofiles from pathlib import Path workflow_path = Path(engine._storage.base_path) / "workflow" / "test.yaml" async with aiofiles.open(workflow_path, mode="w", encoding="utf-8") as f: await f.write(workflow_content) # 测试加载工作流 workflow = await engine.load_workflow("test.yaml") assert workflow.workflow_id == "test-workflow", "ID 应该匹配" assert len(workflow.meetings) == 2, "应该有 2 个会议" print("[PASS] 加载工作流") # 测试获取下一个会议 next_meeting = await engine.get_next_meeting("test-workflow") assert next_meeting is not None, "应该有下一个会议" assert next_meeting.meeting_id == "step1", "第一个会议应该是 step1" print("[PASS] 获取下一个会议") # 测试完成会议 success = await engine.complete_meeting("test-workflow", "step1") assert success, "应该成功标记会议完成" print("[PASS] 标记会议完成") # 测试获取工作流状态 status = await engine.get_workflow_status("test-workflow") assert status is not None, "状态应该存在" assert status["progress"] == "1/2", "进度应该是 1/2" print("[PASS] 获取工作流状态") print("WorkflowEngine 测试通过") return True async def test_role_allocator(): """测试角色分配器""" print("\n=== 测试 RoleAllocator ===") allocator = get_role_allocator() # 测试获取主要角色 primary = allocator.get_primary_role("实现登录功能并编写测试用例") assert primary in ["pm", "developer", "qa", "architect", "reviewer"], "应该是有效角色" print(f"[PASS] 获取主要角色: {primary}") # 测试角色分配 allocation = await allocator.allocate_roles( "设计数据库架构并实现 API", ["claude-001", "kimi-002", "opencode-003"] ) assert len(allocation) == 3, "应该为 3 个 Agent 分配角色" print(f"[PASS] 角色分配: {allocation}") # 测试解释分配 explanation = allocator.explain_allocation("修复 bug", allocation) assert "主要角色" in explanation, "解释应该包含主要角色" print("[PASS] 解释角色分配") print("RoleAllocator 测试通过") return True async def run_all_tests(): """运行所有测试""" print("=" * 60) print("Swarm Command Center - 后端服务完整测试") print("=" * 60) tests = [ ("StorageService", test_storage_service), ("FileLockService", test_file_lock_service), ("HeartbeatService", test_heartbeat_service), ("AgentRegistry", test_agent_registry), ("MeetingScheduler", test_meeting_scheduler), ("MeetingRecorder", test_meeting_recorder), ("ResourceManager", test_resource_manager), ("WorkflowEngine", test_workflow_engine), ("RoleAllocator", test_role_allocator), ] results = [] for name, test_func in tests: try: success = await test_func() results.append((name, success, None)) except Exception as e: print(f"[FAIL] {name} 测试失败: {e}") import traceback traceback.print_exc() results.append((name, False, str(e))) # 打印总结 print("\n" + "=" * 60) print("测试结果总结") print("=" * 60) passed = sum(1 for _, success, _ in results if success) total = len(results) for name, success, error in results: status = "[PASS]" if success else f"[FAIL: {error}]" print(f"{name:20s} {status}") print("=" * 60) print(f"总计: {passed}/{total} 通过") print("=" * 60) return passed == total if __name__ == "__main__": success = asyncio.run(run_all_tests()) sys.exit(0 if success else 1)