重构 API 路由并新增工作流编排功能

后端:
- 重构 agents, heartbeats, locks, meetings, resources, roles, workflows 路由
- 新增 orchestrator 和 providers 路由
- 新增 CLI 调用器和流程编排服务
- 添加日志配置和依赖项

前端:
- 更新 AgentsPage、SettingsPage、WorkflowPage 页面
- 扩展 api.ts 新增 API 接口

其他:
- 清理测试 agent 数据文件
- 新增示例工作流和项目审计报告

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Claude Code
2026-03-10 16:36:25 +08:00
parent 7a5a58b4e5
commit 1719d1f1f9
54 changed files with 3175 additions and 612 deletions

View File

@@ -3,12 +3,16 @@ Swarm Command Center - FastAPI 主入口
多智能体协作系统的协调层后端服务
"""
import logging
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
import uvicorn
logging.basicConfig(level=logging.INFO, format="%(levelname)s:%(name)s: %(message)s")
from app.routers import agents, locks, meetings, heartbeats, workflows, resources, roles, humans
from app.routers import agents_control, websocket
from app.routers import agents_control, websocket, orchestrator, providers
# 创建 FastAPI 应用实例
app = FastAPI(
@@ -67,6 +71,8 @@ app.include_router(resources.router, prefix="/api", tags=["resources"])
app.include_router(roles.router, prefix="/api/roles", tags=["roles"])
app.include_router(humans.router, prefix="/api/humans", tags=["humans"])
app.include_router(websocket.router, tags=["websocket"])
app.include_router(orchestrator.router, prefix="/api", tags=["orchestrator"])
app.include_router(providers.router, prefix="/api", tags=["providers"])
def main():

View File

@@ -1,26 +1,16 @@
"""
Agent 管理 API 路由
接入 AgentRegistry 服务,提供 Agent 注册、查询、状态管理
"""
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List, Optional
import time
from typing import Optional
from dataclasses import asdict
from ..services.agent_registry import get_agent_registry
router = APIRouter()
# 内存存储,实际应用应该使用持久化存储
agents_db = {}
class Agent(BaseModel):
agent_id: str
name: str
role: str
model: str
description: Optional[str] = None
status: str = "idle"
created_at: float = 0
class AgentCreate(BaseModel):
agent_id: str
@@ -30,138 +20,80 @@ class AgentCreate(BaseModel):
description: Optional[str] = None
# Agent状态存储
agent_states_db = {}
class AgentStateUpdate(BaseModel):
task: Optional[str] = ""
progress: Optional[int] = 0
working_files: Optional[list] = None
status: Optional[str] = "idle"
@router.get("")
@router.get("/")
async def list_agents():
"""获取所有 Agent 列表"""
# 合并数据库和默认agent
default_agents = [
{
"agent_id": "claude-001",
"name": "Claude Code",
"role": "developer",
"model": "claude-opus-4.6",
"status": "working",
"description": "主开发 Agent",
"created_at": time.time() - 86400
},
{
"agent_id": "kimi-001",
"name": "Kimi CLI",
"role": "architect",
"model": "kimi-k2",
"status": "idle",
"description": "架构设计 Agent",
"created_at": time.time() - 72000
},
{
"agent_id": "opencode-001",
"name": "OpenCode",
"role": "reviewer",
"model": "opencode-v1",
"status": "idle",
"description": "代码审查 Agent",
"created_at": time.time() - 36000
}
]
# 使用数据库中的agent覆盖默认的
agents_map = {a["agent_id"]: a for a in default_agents}
agents_map.update(agents_db)
return {"agents": list(agents_map.values())}
registry = get_agent_registry()
agents = await registry.list_agents()
return {
"agents": [asdict(agent) for agent in agents]
}
@router.post("/register")
async def register_agent(agent: AgentCreate):
"""注册新 Agent"""
agent_data = {
"agent_id": agent.agent_id,
"name": agent.name,
"role": agent.role,
"model": agent.model,
"description": agent.description or "",
"status": "idle",
"created_at": time.time()
}
agents_db[agent.agent_id] = agent_data
return agent_data
registry = get_agent_registry()
agent_info = await registry.register_agent(
agent_id=agent.agent_id,
name=agent.name,
role=agent.role,
model=agent.model,
description=agent.description or ""
)
return asdict(agent_info)
@router.get("/{agent_id}")
async def get_agent(agent_id: str):
"""获取指定 Agent 信息"""
if agent_id in agents_db:
return agents_db[agent_id]
raise HTTPException(status_code=404, detail="Agent not found")
registry = get_agent_registry()
agent_info = await registry.get_agent(agent_id)
if not agent_info:
raise HTTPException(status_code=404, detail="Agent not found")
return asdict(agent_info)
@router.delete("/{agent_id}")
async def delete_agent(agent_id: str):
"""删除 Agent"""
if agent_id in agents_db:
del agents_db[agent_id]
return {"message": "Agent deleted"}
raise HTTPException(status_code=404, detail="Agent not found")
registry = get_agent_registry()
success = await registry.unregister_agent(agent_id)
if not success:
raise HTTPException(status_code=404, detail="Agent not found")
return {"message": "Agent deleted"}
@router.get("/{agent_id}/state")
async def get_agent_state(agent_id: str):
"""获取 Agent 状态"""
# 如果存在真实状态,返回真实状态
if agent_id in agent_states_db:
return agent_states_db[agent_id]
# 默认mock状态
default_states = {
"claude-001": {
"agent_id": agent_id,
"task": "修复用户登录bug",
"progress": 65,
"working_files": ["src/auth/login.py", "src/auth/jwt.py"],
"status": "working",
"last_update": time.time() - 120
},
"kimi-001": {
"agent_id": agent_id,
"task": "等待会议开始",
"progress": 0,
"working_files": [],
"status": "waiting",
"last_update": time.time() - 300
},
"opencode-001": {
"agent_id": agent_id,
"task": "代码审查",
"progress": 30,
"working_files": ["src/components/Button.tsx"],
"status": "working",
"last_update": time.time() - 60
}
}
return default_states.get(agent_id, {
"agent_id": agent_id,
"task": "空闲",
"progress": 0,
"working_files": [],
"status": "idle",
"last_update": time.time()
})
registry = get_agent_registry()
state = await registry.get_state(agent_id)
if not state:
raise HTTPException(status_code=404, detail="Agent state not found")
return asdict(state)
@router.post("/{agent_id}/state")
async def update_agent_state(agent_id: str, data: dict):
async def update_agent_state(agent_id: str, data: AgentStateUpdate):
"""更新 Agent 状态"""
agent_states_db[agent_id] = {
"agent_id": agent_id,
"task": data.get("task", ""),
"progress": data.get("progress", 0),
"working_files": data.get("working_files", []),
"status": data.get("status", "idle"),
"last_update": time.time()
}
registry = get_agent_registry()
agent_info = await registry.get_agent(agent_id)
if not agent_info:
raise HTTPException(status_code=404, detail="Agent not found")
await registry.update_state(
agent_id=agent_id,
task=data.task or "",
progress=data.progress or 0,
working_files=data.working_files
)
return {"success": True}

View File

@@ -1,48 +1,64 @@
"""
心跳管理 API 路由
接入 HeartbeatService 服务,监控 Agent 活跃状态
"""
from fastapi import APIRouter
from pydantic import BaseModel
from typing import Dict
import time
from typing import Optional
from ..services.heartbeat import get_heartbeat_service
router = APIRouter()
heartbeats_db = {}
class Heartbeat(BaseModel):
agent_id: str
timestamp: float
is_timeout: bool = False
class HeartbeatUpdate(BaseModel):
status: str = "idle"
current_task: Optional[str] = ""
progress: Optional[int] = 0
@router.get("")
@router.get("/")
async def list_heartbeats():
"""获取所有 Agent 心跳"""
return {
"heartbeats": {
"claude-001": {
"agent_id": "claude-001",
"timestamp": time.time() - 30,
"is_timeout": False
},
"kimi-001": {
"agent_id": "kimi-001",
"timestamp": time.time() - 60,
"is_timeout": False
}
service = get_heartbeat_service()
all_hb = await service.get_all_heartbeats()
heartbeats = {}
for agent_id, hb in all_hb.items():
heartbeats[agent_id] = {
"agent_id": hb.agent_id,
"last_heartbeat": hb.last_heartbeat,
"status": hb.status,
"current_task": hb.current_task,
"progress": hb.progress,
"elapsed_display": hb.elapsed_display,
"is_timeout": hb.is_timeout()
}
}
return {"heartbeats": heartbeats}
@router.post("/{agent_id}")
async def update_heartbeat(agent_id: str):
async def update_heartbeat(agent_id: str, data: HeartbeatUpdate = None):
"""更新 Agent 心跳"""
heartbeats_db[agent_id] = {
"agent_id": agent_id,
"timestamp": time.time(),
"is_timeout": False
}
service = get_heartbeat_service()
if data is None:
data = HeartbeatUpdate()
await service.update_heartbeat(
agent_id=agent_id,
status=data.status,
current_task=data.current_task or "",
progress=data.progress or 0
)
return {"success": True}
@router.get("/timeouts")
async def check_timeouts(timeout_seconds: int = 60):
"""检查超时的 Agent"""
service = get_heartbeat_service()
timeout_agents = await service.check_timeout(timeout_seconds)
return {
"timeout_seconds": timeout_seconds,
"timeout_agents": timeout_agents,
"count": len(timeout_agents)
}

View File

@@ -1,89 +1,82 @@
"""
文件锁 API 路由
接入 FileLockService 服务,管理文件的排他锁
"""
from fastapi import APIRouter
from pydantic import BaseModel
from typing import List, Optional
import time
from typing import Optional
from dataclasses import asdict
from ..services.file_lock import get_file_lock_service
router = APIRouter()
locks_db = [
{
"file_path": "src/main.py",
"agent_id": "claude-001",
"agent_name": "Claude Code",
"locked_at": time.time() - 3600
},
{
"file_path": "src/utils.py",
"agent_id": "kimi-001",
"agent_name": "Kimi CLI",
"locked_at": time.time() - 1800
}
]
class FileLock(BaseModel):
class LockAcquireRequest(BaseModel):
file_path: str
agent_id: str
agent_name: str = ""
locked_at: float
agent_name: Optional[str] = ""
def format_elapsed(locked_at: float) -> str:
"""格式化已锁定时间"""
elapsed = time.time() - locked_at
if elapsed < 60:
return f"{int(elapsed)}"
elif elapsed < 3600:
return f"{int(elapsed / 60)}分钟"
else:
return f"{elapsed / 3600:.1f}小时"
class LockReleaseRequest(BaseModel):
file_path: str
agent_id: str
@router.get("")
@router.get("/")
async def list_locks():
"""获取所有文件锁列表"""
locks_with_display = []
for lock in locks_db:
lock_copy = lock.copy()
lock_copy["elapsed_display"] = format_elapsed(lock["locked_at"])
locks_with_display.append(lock_copy)
return {"locks": locks_with_display}
service = get_file_lock_service()
locks = await service.get_locks()
return {
"locks": [
{
"file_path": lock.file_path,
"agent_id": lock.agent_id,
"agent_name": lock.agent_name,
"acquired_at": lock.acquired_at,
"elapsed_display": lock.elapsed_display
}
for lock in locks
]
}
@router.post("/acquire")
async def acquire_lock(lock: FileLock):
async def acquire_lock(request: LockAcquireRequest):
"""获取文件锁"""
# 检查是否已被锁定
for existing in locks_db:
if existing["file_path"] == lock.file_path:
return {"success": False, "message": "File already locked"}
locks_db.append({
"file_path": lock.file_path,
"agent_id": lock.agent_id,
"agent_name": lock.agent_name or lock.agent_id,
"locked_at": time.time()
})
return {"success": True, "message": "Lock acquired"}
service = get_file_lock_service()
success = await service.acquire_lock(
file_path=request.file_path,
agent_id=request.agent_id,
agent_name=request.agent_name or ""
)
if success:
return {"success": True, "message": "Lock acquired"}
return {"success": False, "message": "File already locked by another agent"}
@router.post("/release")
async def release_lock(data: dict):
async def release_lock(request: LockReleaseRequest):
"""释放文件锁"""
file_path = data.get("file_path", "")
agent_id = data.get("agent_id", "")
global locks_db
locks_db = [l for l in locks_db if not (l["file_path"] == file_path and l["agent_id"] == agent_id)]
return {"success": True, "message": "Lock released"}
service = get_file_lock_service()
success = await service.release_lock(
file_path=request.file_path,
agent_id=request.agent_id
)
if success:
return {"success": True, "message": "Lock released"}
return {"success": False, "message": "Lock not found or not owned by this agent"}
@router.get("/check")
async def check_lock(file_path: str):
"""检查文件锁定状态"""
for lock in locks_db:
if lock["file_path"] == file_path:
return {"file_path": file_path, "locked": True, "locked_by": lock["agent_id"]}
return {"file_path": file_path, "locked": False}
service = get_file_lock_service()
locked_by = await service.check_locked(file_path)
return {
"file_path": file_path,
"locked": locked_by is not None,
"locked_by": locked_by
}

View File

@@ -1,195 +1,260 @@
"""
会议管理 API 路由
接入 MeetingScheduler栅栏同步+ MeetingRecorder会议记录
"""
from fastapi import APIRouter
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import List, Optional
from dataclasses import asdict
from datetime import datetime
import time
from ..services.meeting_scheduler import get_meeting_scheduler
from ..services.meeting_recorder import get_meeting_recorder
router = APIRouter()
meetings_db = []
class Meeting(BaseModel):
meeting_id: str
title: str
status: str
attendees: List[str]
agenda: str
progress_summary: str
created_at: float
class MeetingCreate(BaseModel):
title: str
agenda: str
meeting_type: str = "design_review"
agenda: Optional[str] = ""
meeting_type: Optional[str] = "design_review"
attendees: List[str] = []
steps: Optional[List[str]] = None
class MeetingWaitRequest(BaseModel):
agent_id: str
timeout: Optional[int] = 300
class DiscussionRequest(BaseModel):
agent_id: str
agent_name: Optional[str] = ""
content: str
step: Optional[str] = ""
class ProgressRequest(BaseModel):
step: str
class FinishRequest(BaseModel):
consensus: Optional[str] = ""
def _meeting_to_dict(meeting) -> dict:
"""将 MeetingInfo 转为前端友好的 dict"""
return {
"meeting_id": meeting.meeting_id,
"title": meeting.title,
"date": meeting.date,
"status": meeting.status,
"attendees": meeting.attendees,
"steps": [
{"step_id": s.step_id, "label": s.label, "status": s.status}
for s in meeting.steps
],
"discussions": [
{
"agent_id": d.agent_id,
"agent_name": d.agent_name,
"content": d.content,
"timestamp": d.timestamp,
"step": d.step
}
for d in meeting.discussions
],
"progress_summary": meeting.progress_summary,
"consensus": meeting.consensus,
"created_at": meeting.created_at,
"ended_at": meeting.ended_at
}
@router.get("")
@router.get("/")
async def list_meetings():
"""获取所有会议列表"""
return {
"meetings": [
{
"meeting_id": "meeting-001",
"title": "架构设计评审",
"status": "in_progress",
"attendees": ["claude-001", "kimi-001"],
"agenda": "讨论系统架构设计",
"progress_summary": "50%",
"created_at": time.time() - 7200
},
{
"meeting_id": "meeting-002",
"title": "代码审查会议",
"status": "completed",
"attendees": ["claude-001"],
"agenda": "审查前端组件代码",
"progress_summary": "100%",
"created_at": time.time() - 86400
}
]
}
async def list_meetings(date: Optional[str] = None):
"""获取会议列表(默认今天)"""
recorder = get_meeting_recorder()
meetings = await recorder.list_meetings(date)
return {"meetings": [_meeting_to_dict(m) for m in meetings]}
@router.get("/today")
async def list_today_meetings():
"""获取今日会议"""
today = datetime.now().strftime("%Y-%m-%d")
return {
"meetings": [
{
"meeting_id": "meeting-001",
"title": "架构设计评审",
"date": today,
"status": "in_progress",
"attendees": ["claude-001", "kimi-001"],
"steps": [
{"step_id": "step-1", "label": "收集想法", "status": "completed"},
{"step_id": "step-2", "label": "讨论迭代", "status": "active"},
{"step_id": "step-3", "label": "生成共识", "status": "pending"}
],
"discussions": [
{
"agent_id": "claude-001",
"agent_name": "Claude Code",
"content": "建议采用微服务架构",
"timestamp": datetime.now().isoformat(),
"step": "讨论迭代"
}
],
"progress_summary": "50%",
"consensus": ""
},
{
"meeting_id": "meeting-002",
"title": "代码审查会议",
"date": today,
"status": "completed",
"attendees": ["claude-001"],
"steps": [
{"step_id": "step-1", "label": "代码检查", "status": "completed"},
{"step_id": "step-2", "label": "问题讨论", "status": "completed"}
],
"discussions": [],
"progress_summary": "100%",
"consensus": "代码质量良好,可以合并"
}
]
}
recorder = get_meeting_recorder()
meetings = await recorder.list_meetings()
return {"meetings": [_meeting_to_dict(m) for m in meetings]}
@router.post("/")
async def create_meeting(meeting: MeetingCreate):
"""创建新会议"""
meeting_id = f"meeting-{int(time.time())}"
meeting_data = {
"meeting_id": meeting_id,
"title": meeting.title,
"status": "waiting",
"attendees": meeting.attendees,
"agenda": meeting.agenda,
"progress_summary": "0%",
"created_at": time.time()
}
meetings_db.append(meeting_data)
return meeting_data
"""创建新会议(同时创建调度记录和会议记录)"""
recorder = get_meeting_recorder()
scheduler = get_meeting_scheduler()
meeting_id = f"meeting-{int(datetime.now().timestamp())}"
@router.get("/{meeting_id}")
async def get_meeting(meeting_id: str):
"""获取会议详情"""
for meeting in meetings_db:
if meeting["meeting_id"] == meeting_id:
return meeting
# 返回模拟数据
return {
"meeting_id": meeting_id,
"title": "测试会议",
"status": "in_progress",
"attendees": ["claude-001"],
"agenda": "测试议程",
"progress_summary": "50%",
"created_at": time.time()
}
# 在调度器中创建(用于栅栏同步)
await scheduler.create_meeting(
meeting_id=meeting_id,
title=meeting.title,
expected_attendees=meeting.attendees
)
# 在记录器中创建(用于记录内容)
meeting_info = await recorder.create_meeting(
meeting_id=meeting_id,
title=meeting.title,
attendees=meeting.attendees,
steps=meeting.steps
)
return _meeting_to_dict(meeting_info)
@router.post("/create")
async def create_meeting_api(meeting: MeetingCreate):
"""创建会议 API前端使用的端点"""
async def create_meeting_alt(meeting: MeetingCreate):
"""创建会议 API前端使用的端点,与 POST / 相同"""
return await create_meeting(meeting)
@router.get("/{meeting_id}")
async def get_meeting(meeting_id: str, date: Optional[str] = None):
"""获取会议详情"""
recorder = get_meeting_recorder()
meeting_info = await recorder.get_meeting(meeting_id, date)
if not meeting_info:
raise HTTPException(status_code=404, detail="Meeting not found")
return _meeting_to_dict(meeting_info)
@router.get("/{meeting_id}/queue")
async def get_meeting_queue(meeting_id: str):
"""获取会议等待队列"""
scheduler = get_meeting_scheduler()
queue = await scheduler.get_queue(meeting_id)
if not queue:
raise HTTPException(status_code=404, detail="Meeting queue not found")
return {
"meeting_id": queue.meeting_id,
"title": queue.title,
"status": queue.status,
"expected_attendees": queue.expected_attendees,
"arrived_attendees": queue.arrived_attendees,
"missing_attendees": queue.missing_attendees,
"progress": queue.progress,
"is_ready": queue.is_ready
}
@router.post("/{meeting_id}/wait")
async def wait_for_meeting(meeting_id: str, request: MeetingWaitRequest):
"""栅栏同步等待(阻塞直到所有参会者到齐或超时)"""
scheduler = get_meeting_scheduler()
status = await scheduler.wait_for_meeting(
agent_id=request.agent_id,
meeting_id=meeting_id,
timeout=request.timeout or 300
)
return {"meeting_id": meeting_id, "status": status}
@router.post("/{meeting_id}/end")
async def end_meeting(meeting_id: str):
"""结束会议(调度层)"""
scheduler = get_meeting_scheduler()
success = await scheduler.end_meeting(meeting_id)
if not success:
raise HTTPException(status_code=404, detail="Meeting not found")
return {"success": True, "meeting_id": meeting_id}
@router.post("/{meeting_id}/join")
async def join_meeting(meeting_id: str, data: dict):
"""Agent 加入会议"""
agent_id = data.get("agent_id", "")
scheduler = get_meeting_scheduler()
await scheduler.add_attendee(meeting_id, agent_id)
return {"success": True, "meeting_id": meeting_id, "agent_id": agent_id}
@router.post("/{meeting_id}/discuss")
async def add_discussion(meeting_id: str, data: dict):
async def add_discussion(meeting_id: str, data: DiscussionRequest):
"""添加讨论内容"""
recorder = get_meeting_recorder()
await recorder.add_discussion(
meeting_id=meeting_id,
agent_id=data.agent_id,
agent_name=data.agent_name or data.agent_id,
content=data.content,
step=data.step or ""
)
return {"success": True, "meeting_id": meeting_id}
@router.post("/{meeting_id}/finish")
async def finish_meeting(meeting_id: str, data: dict):
"""完成会议"""
async def finish_meeting(meeting_id: str, data: FinishRequest):
"""完成会议(记录层 - 保存共识并标记完成)"""
recorder = get_meeting_recorder()
success = await recorder.end_meeting(
meeting_id=meeting_id,
consensus=data.consensus or ""
)
if not success:
raise HTTPException(status_code=404, detail="Meeting not found")
# 同时结束调度
scheduler = get_meeting_scheduler()
await scheduler.end_meeting(meeting_id)
return {"success": True, "meeting_id": meeting_id}
@router.post("/{meeting_id}/progress")
async def update_progress(meeting_id: str, data: dict):
"""更新进度"""
async def update_progress(meeting_id: str, data: ProgressRequest):
"""更新会议进度"""
recorder = get_meeting_recorder()
await recorder.update_progress(
meeting_id=meeting_id,
step_label=data.step
)
return {"success": True, "meeting_id": meeting_id}
@router.post("/record/create")
async def create_meeting_record(data: dict):
"""创建会议记录(前端使用的端点)"""
meeting_id = f"meeting-{int(time.time())}"
meeting_data = {
"meeting_id": meeting_id,
"title": data.get("title", "未命名会议"),
"agenda": data.get("agenda", ""),
"attendees": data.get("attendees", []),
"status": "waiting",
"progress_summary": "0%",
"steps": data.get("steps", []),
"discussions": [],
"created_at": time.time()
}
meetings_db.append(meeting_data)
return meeting_data
recorder = get_meeting_recorder()
meeting_id = data.get("meeting_id", f"meeting-{int(datetime.now().timestamp())}")
meeting_info = await recorder.create_meeting(
meeting_id=meeting_id,
title=data.get("title", "未命名会议"),
attendees=data.get("attendees", []),
steps=data.get("steps", [])
)
# 同时在调度器中注册
scheduler = get_meeting_scheduler()
await scheduler.create_meeting(
meeting_id=meeting_id,
title=data.get("title", "未命名会议"),
expected_attendees=data.get("attendees", [])
)
return _meeting_to_dict(meeting_info)
@router.post("/record/{meeting_id}/discussion")
async def add_meeting_discussion(meeting_id: str, data: dict):
"""添加会议讨论(前端使用的端点)"""
recorder = get_meeting_recorder()
await recorder.add_discussion(
meeting_id=meeting_id,
agent_id=data.get("agent_id", ""),
agent_name=data.get("agent_name", data.get("agent_id", "")),
content=data.get("content", ""),
step=data.get("step", "")
)
return {"success": True, "meeting_id": meeting_id, "discussion": data}

View File

@@ -0,0 +1,57 @@
"""
工作流编排器 API
提供启动自动工作流、查看运行状态的端点
"""
from fastapi import APIRouter, HTTPException
from pydantic import BaseModel
from typing import Dict, Optional
from ..services.workflow_orchestrator import get_workflow_orchestrator
router = APIRouter(prefix="/orchestrator", tags=["orchestrator"])
class StartWorkflowRequest(BaseModel):
"""启动工作流请求"""
workflow_path: str # YAML 文件名,如 dinner-decision.yaml
agent_overrides: Optional[Dict[str, str]] = None # agent_id → model 覆盖
@router.post("/start")
async def start_workflow(request: StartWorkflowRequest):
"""启动一个工作流的自动编排(后台异步执行)"""
orchestrator = get_workflow_orchestrator()
try:
run = await orchestrator.start_workflow(
workflow_path=request.workflow_path,
agent_overrides=request.agent_overrides,
)
return {
"success": True,
"message": f"工作流已启动: {run.workflow_name}",
"run_id": run.run_id,
"workflow_id": run.workflow_id,
}
except FileNotFoundError:
raise HTTPException(status_code=404, detail=f"工作流文件不存在: {request.workflow_path}")
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/runs")
async def list_runs():
"""列出所有编排运行"""
orchestrator = get_workflow_orchestrator()
return {"runs": orchestrator.list_runs()}
@router.get("/runs/{run_id}")
async def get_run(run_id: str):
"""获取指定运行的详细状态"""
orchestrator = get_workflow_orchestrator()
run = orchestrator.get_run(run_id)
if not run:
raise HTTPException(status_code=404, detail=f"运行不存在: {run_id}")
return run.to_dict()

View File

@@ -0,0 +1,129 @@
"""
Provider / CLI 检测与配置 API
提供系统可用的 AI CLI 工具检测和 LLM Provider 状态查询
"""
import os
from fastapi import APIRouter
from ..services.cli_invoker import detect_available_clis, CLI_REGISTRY
router = APIRouter(prefix="/providers", tags=["providers"])
# 支持的 CLI 工具元信息
CLI_META = {
"claude": {
"display_name": "Claude Code",
"description": "Anthropic Claude CLI",
"models": ["claude", "claude-sonnet", "claude-opus"],
},
"kimi": {
"display_name": "Kimi CLI",
"description": "Moonshot Kimi CLI",
"models": ["kimi", "kimi-k2", "moonshot"],
},
"opencode": {
"display_name": "OpenCode",
"description": "OpenCode CLI (支持多种模型)",
"models": ["opencode"],
},
}
# 支持的 LLM API Provider
API_PROVIDERS = {
"anthropic": {
"display_name": "Anthropic",
"env_key": "ANTHROPIC_API_KEY",
"models": ["claude-opus-4.6", "claude-sonnet-4.6", "claude-haiku-4.6"],
},
"openai": {
"display_name": "OpenAI",
"env_key": "OPENAI_API_KEY",
"models": ["gpt-4o", "gpt-4-turbo", "gpt-3.5-turbo"],
},
"deepseek": {
"display_name": "DeepSeek",
"env_key": "DEEPSEEK_API_KEY",
"models": ["deepseek-chat", "deepseek-coder"],
},
"google": {
"display_name": "Google Gemini",
"env_key": "GOOGLE_API_KEY",
"models": ["gemini-2.5-pro", "gemini-2.5-flash"],
},
}
@router.get("/")
async def list_providers():
"""
列出所有可用的 AI ProviderCLI + API
前端用于填充 Agent 注册的模型下拉框和 Settings 的 Provider 配置区
"""
available_clis = detect_available_clis()
cli_list = []
for name, meta in CLI_META.items():
installed = name in available_clis
cli_list.append({
"id": name,
"type": "cli",
"display_name": meta["display_name"],
"description": meta["description"],
"installed": installed,
"path": available_clis.get(name, ""),
"models": meta["models"],
})
api_list = []
for name, meta in API_PROVIDERS.items():
has_key = bool(os.environ.get(meta["env_key"]))
api_list.append({
"id": name,
"type": "api",
"display_name": meta["display_name"],
"env_key": meta["env_key"],
"configured": has_key,
"models": meta["models"],
})
return {
"cli": cli_list,
"api": api_list,
}
@router.get("/models")
async def list_available_models():
"""
列出当前可用的所有模型(已安装 CLI + 已配置 API Key 的模型)
前端 Agent 注册弹窗的模型下拉框直接使用此接口
"""
available_clis = detect_available_clis()
models = []
for name in available_clis:
meta = CLI_META.get(name, {})
display = meta.get("display_name", name)
for model in meta.get("models", [name]):
models.append({
"value": model,
"label": f"{model} ({display})",
"provider": name,
"type": "cli",
})
for name, meta in API_PROVIDERS.items():
if os.environ.get(meta["env_key"]):
for model in meta["models"]:
models.append({
"value": model,
"label": f"{model} ({meta['display_name']} API)",
"provider": name,
"type": "api",
})
return {"models": models}

View File

@@ -1,10 +1,12 @@
"""
资源管理 API 路由
接入 ResourceManager 服务,提供声明式任务执行
"""
from fastapi import APIRouter
from pydantic import BaseModel
from typing import List, Optional
import time
from typing import Optional
from ..services.resource_manager import get_resource_manager
router = APIRouter()
@@ -21,55 +23,35 @@ class TaskParseRequest(BaseModel):
@router.post("/execute")
async def execute_task(request: TaskRequest):
"""执行任务"""
"""执行任务(自动管理文件锁和心跳)"""
manager = get_resource_manager()
result = await manager.execute_task(
agent_id=request.agent_id,
task_description=request.task,
timeout=request.timeout or 300
)
return {
"success": True,
"message": f"任务 '{request.task}' 已执行",
"files_locked": ["src/main.py"],
"duration_seconds": 5.5
"success": result.success,
"message": result.message,
"files_locked": result.files_locked,
"duration_seconds": round(result.duration_seconds, 2)
}
@router.get("/status")
async def get_all_status():
"""获取所有 Agent 状态"""
from ..services.agent_registry import get_agent_registry
from ..services.heartbeat import get_heartbeat_service
registry = get_agent_registry()
heartbeat_service = get_heartbeat_service()
# 获取所有已注册的 Agent
all_agents = await registry.list_agents()
agent_map = {a.agent_id: a for a in all_agents}
# 获取所有心跳
heartbeats_data = await heartbeat_service.get_all_heartbeats()
result = []
for agent_id, agent in agent_map.items():
heartbeat = heartbeats_data.get(agent_id)
result.append({
"agent_id": agent_id,
"info": {
"name": agent.name,
"role": agent.role,
"model": agent.model
},
"heartbeat": {
"status": heartbeat.status if heartbeat else "offline",
"current_task": heartbeat.current_task if heartbeat else "",
"progress": heartbeat.progress if heartbeat else 0
}
})
return {"agents": result}
"""获取所有 Agent 状态(整合注册、心跳、锁信息)"""
manager = get_resource_manager()
statuses = await manager.get_all_status()
return {"agents": statuses}
@router.post("/parse-task")
async def parse_task(request: TaskParseRequest):
"""解析任务文件"""
"""解析任务中涉及的文件路径"""
manager = get_resource_manager()
files = await manager.parse_task_files(request.task)
return {
"task": request.task,
"files": ["src/main.py", "src/utils.py"]
"files": files
}

View File

@@ -1,9 +1,12 @@
"""
角色分配 API 路由
接入 RoleAllocator 服务,基于任务分析分配角色
"""
from fastapi import APIRouter
from pydantic import BaseModel
from typing import List, Dict
from typing import List
from ..services.role_allocator import get_role_allocator
router = APIRouter()
@@ -20,29 +23,28 @@ class RoleAllocateRequest(BaseModel):
@router.post("/primary")
async def get_primary_role(request: RoleRequest):
"""获取任务主要角色"""
allocator = get_role_allocator()
primary = allocator.get_primary_role(request.task)
role_scores = allocator._analyze_task_roles(request.task)
return {
"task": request.task,
"primary_role": "developer",
"role_scores": {
"developer": 0.8,
"architect": 0.6,
"qa": 0.4,
"pm": 0.2
}
"primary_role": primary,
"role_scores": {k: round(v, 2) for k, v in role_scores.items()}
}
@router.post("/allocate")
async def allocate_roles(request: RoleAllocateRequest):
"""分配角色"""
allocation = {}
for i, agent in enumerate(request.agents):
roles = ["developer", "architect", "qa"]
allocation[agent] = roles[i % len(roles)]
allocator = get_role_allocator()
allocation = await allocator.allocate_roles(
task=request.task,
available_agents=request.agents
)
primary = allocator.get_primary_role(request.task)
return {
"task": request.task,
"primary_role": "developer",
"primary_role": primary,
"allocation": allocation
}
@@ -50,6 +52,10 @@ async def allocate_roles(request: RoleAllocateRequest):
@router.post("/explain")
async def explain_roles(request: RoleAllocateRequest):
"""解释角色分配"""
return {
"explanation": f"基于任务 '{request.task}' 的分析,推荐了最适合的角色分配方案。"
}
allocator = get_role_allocator()
allocation = await allocator.allocate_roles(
task=request.task,
available_agents=request.agents
)
explanation = allocator.explain_allocation(request.task, allocation)
return {"explanation": explanation}

View File

@@ -1,7 +1,7 @@
"""
工作流管理 API 路由
"""
from fastapi import APIRouter, HTTPException
from fastapi import APIRouter, HTTPException, UploadFile, File
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
from pathlib import Path
@@ -80,6 +80,28 @@ async def list_workflow_files():
return {"files": files}
@router.post("/upload")
async def upload_workflow(file: UploadFile = File(...)):
"""上传工作流 YAML 文件"""
if not file.filename or not file.filename.endswith(('.yaml', '.yml')):
raise HTTPException(status_code=400, detail="仅支持 .yaml 或 .yml 文件")
engine = get_workflow_engine()
workflow_dir = Path(engine._storage.base_path) / engine.WORKFLOWS_DIR
workflow_dir.mkdir(parents=True, exist_ok=True)
dest = workflow_dir / file.filename
content = await file.read()
dest.write_bytes(content)
return {
"success": True,
"message": f"已上传 {file.filename}",
"path": f"workflow/{file.filename}",
"size": len(content)
}
@router.get("/list")
async def list_workflows():
"""获取已加载的工作流列表"""

View File

@@ -0,0 +1,279 @@
"""
CLI 调用器
通过子进程调用真实的 AI CLI 工具Claude Code / Kimi CLI / OpenCode
将 prompt 发送给 CLI 并捕获输出。
支持的 CLI
- claude: Claude Code CLI使用 -p 参数发送单轮 prompt
- kimi: Kimi CLI使用 -p 参数发送单轮 prompt
- opencode: OpenCode CLI
"""
import asyncio
import logging
import os
import re
import time
import shutil
from typing import Optional, Tuple
from dataclasses import dataclass
logger = logging.getLogger(__name__)
# CLI 命令映射model 前缀 → (二进制名, 构造参数的函数)
CLI_REGISTRY = {
"claude": "claude",
"kimi": "kimi",
"opencode": "opencode",
}
@dataclass
class CLIResult:
"""CLI 调用结果"""
content: str
cli_name: str
exit_code: int
latency: float
success: bool
error: str = ""
def detect_available_clis() -> dict:
"""检测系统中可用的 CLI 工具"""
available = {}
for name, binary in CLI_REGISTRY.items():
path = shutil.which(binary)
if path:
available[name] = path
return available
def resolve_cli(model: str) -> Optional[str]:
"""
根据 agent 的 model 字段判断应使用哪个 CLI
规则:
- 以 "claude" 开头 → claude CLI
- 以 "kimi" 开头 → kimi CLI
- 以 "opencode" 开头 → opencode CLI
- 完全匹配 CLI 名 → 直接使用
"""
model_lower = model.lower().strip()
for prefix in CLI_REGISTRY:
if model_lower.startswith(prefix):
return prefix
if model_lower in CLI_REGISTRY:
return model_lower
return None
async def invoke_cli(
cli_name: str,
prompt: str,
timeout: int = 120,
max_tokens: int = 1024,
system_prompt: str = "",
) -> CLIResult:
"""
调用指定的 CLI 工具并返回结果
参数:
cli_name: CLI 名称claude / kimi / opencode
prompt: 要发送的 prompt
timeout: 超时秒数
max_tokens: 最大 token 数
"""
binary = CLI_REGISTRY.get(cli_name)
if not binary:
return CLIResult(
content="", cli_name=cli_name, exit_code=-1,
latency=0, success=False, error=f"未知 CLI: {cli_name}"
)
# 必须获取完整路径,否则 subprocess 在不同环境下可能找不到
full_path = shutil.which(binary)
if not full_path:
return CLIResult(
content="", cli_name=cli_name, exit_code=-1,
latency=0, success=False, error=f"CLI 未安装: {binary}"
)
cmd = _build_command(cli_name, prompt, max_tokens, full_path, system_prompt)
logger.info(f"调用 CLI [{cli_name}]: {full_path} (prompt 长度={len(prompt)})")
# Windows 下需要设置 PYTHONIOENCODING 解决 GBK 编码问题
env = dict(os.environ)
env["PYTHONIOENCODING"] = "utf-8"
start = time.time()
try:
proc = await asyncio.create_subprocess_exec(
*cmd,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=env,
)
try:
# 立即关闭 stdin防止 CLI 阻塞等待输入
stdout, stderr = await asyncio.wait_for(
proc.communicate(input=b""), timeout=timeout
)
except asyncio.TimeoutError:
proc.kill()
await proc.communicate()
return CLIResult(
content="", cli_name=cli_name, exit_code=-1,
latency=time.time() - start, success=False,
error=f"CLI 超时 ({timeout}s)"
)
latency = time.time() - start
stdout_text = stdout.decode("utf-8", errors="replace").strip()
stderr_text = stderr.decode("utf-8", errors="replace").strip()
# 过滤掉 OpenCode 的 INFO 日志行和 kimi 的框线
stdout_text = _clean_output(cli_name, stdout_text)
if proc.returncode == 0 and stdout_text:
logger.info(f"CLI [{cli_name}] 完成: {latency:.1f}s, {len(stdout_text)} chars")
return CLIResult(
content=stdout_text,
cli_name=cli_name,
exit_code=0,
latency=round(latency, 2),
success=True,
)
else:
error_msg = stderr_text or f"退出码 {proc.returncode}"
logger.warning(f"CLI [{cli_name}] 失败: {error_msg}")
return CLIResult(
content=stdout_text or "",
cli_name=cli_name,
exit_code=proc.returncode or -1,
latency=round(latency, 2),
success=False,
error=error_msg,
)
except FileNotFoundError:
return CLIResult(
content="", cli_name=cli_name, exit_code=-1,
latency=0, success=False, error=f"找不到命令: {binary}"
)
except Exception as e:
return CLIResult(
content="", cli_name=cli_name, exit_code=-1,
latency=time.time() - start, success=False, error=str(e)
)
def _build_command(
cli_name: str, prompt: str, max_tokens: int, full_path: str, system_prompt: str = ""
) -> list:
"""
为不同 CLI 构造命令行参数
使用完整二进制路径确保跨环境兼容
"""
default_sys = (
"这是一个角色扮演讨论场景,不是编程任务。"
"请直接用中文回答,不要使用任何工具、不要读取文件、不要执行代码。"
"直接给出你作为角色的观点和建议2-3句话即可。"
)
sys_prompt = system_prompt or default_sys
if cli_name == "claude":
return [
full_path,
"-p", prompt,
"--output-format", "text",
"--system-prompt", sys_prompt,
]
elif cli_name == "kimi":
return [
full_path,
"-p", f"{sys_prompt}\n\n{prompt}",
]
elif cli_name == "opencode":
return [
full_path,
"run", f"{sys_prompt}\n\n{prompt}",
"--model", "opencode/minimax-m2.5-free",
]
else:
return [full_path, "-p", prompt]
def _clean_output(cli_name: str, text: str) -> str:
"""清理 CLI 输出中的框线、日志、prompt 回显等噪音"""
if cli_name == "kimi":
return _clean_kimi_output(text)
lines = text.splitlines()
cleaned = []
for line in lines:
if line.strip().startswith("INFO "):
continue
cleaned.append(line)
result = "\n".join(cleaned).strip()
return result if result else text.strip()
def _clean_kimi_output(text: str) -> str:
"""
Kimi CLI 输出格式:
┌─────────────────────┐
│ (prompt 回显) │
└─────────────────────┘
• 思考过程...
• 实际回复内容
需要1) 移除框线和框内的 prompt 回显
2) 只保留最后一个 bullet 作为实际回复
"""
lines = text.splitlines()
# 找到框线结束位置(最后一个 └ 或 ╰ 行)
box_end = -1
for i, line in enumerate(lines):
stripped = line.strip()
if stripped and stripped[0] in "└╰" and all(
c in "└┘─╰╯ " for c in stripped
):
box_end = i
# 跳过框线区域
content_lines = lines[box_end + 1:] if box_end >= 0 else lines
# Kimi 用 • 输出思考过程和最终回复,最后一个 • 块通常是实际回复
bullets = []
current_bullet = []
for line in content_lines:
stripped = line.strip()
if not stripped:
if current_bullet:
current_bullet.append(line)
continue
if stripped.startswith("") or stripped.startswith("? "):
if current_bullet:
bullets.append("\n".join(current_bullet))
current_bullet = [stripped.lstrip("•? ").strip()]
elif current_bullet:
current_bullet.append(stripped)
if current_bullet:
bullets.append("\n".join(current_bullet))
if not bullets:
return text.strip()
# 最后一个 bullet 是实际回复
return bullets[-1].strip()

View File

@@ -0,0 +1,572 @@
"""
工作流编排器
串联 WorkflowEngine + MeetingRecorder + LLM Service
自动驱动整个工作流:加载 → 逐节点执行 → 记录讨论 → 达成共识。
支持两种模式:
- 有 LLM API Key调用真实模型生成讨论内容
- 无 API Key使用内置模拟仍然走完整流程
"""
import asyncio
import logging
import time
import uuid
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field, asdict
from enum import Enum
from .workflow_engine import get_workflow_engine, WorkflowMeeting
from .meeting_recorder import get_meeting_recorder
from .agent_registry import get_agent_registry, AgentInfo
from .heartbeat import get_heartbeat_service
from .llm_service import get_llm_service, LLMMessage, LLMResponse, LLMConfig
from .cli_invoker import resolve_cli, invoke_cli, detect_available_clis
logger = logging.getLogger(__name__)
class OrchestratorStatus(str, Enum):
IDLE = "idle"
RUNNING = "running"
PAUSED = "paused"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class AgentTurn:
"""一次 Agent 发言记录"""
agent_id: str
agent_name: str
role: str
model: str
content: str
timestamp: float
latency: float = 0.0
tokens_used: int = 0
is_mock: bool = False
@dataclass
class MeetingResult:
"""一次会议的完整结果"""
meeting_id: str
title: str
node_type: str
turns: List[AgentTurn] = field(default_factory=list)
consensus: str = ""
started_at: float = 0
finished_at: float = 0
status: str = "pending"
@dataclass
class OrchestrationRun:
"""一次编排运行的完整状态"""
run_id: str
workflow_id: str
workflow_name: str
status: OrchestratorStatus = OrchestratorStatus.IDLE
current_node: str = ""
meeting_results: List[MeetingResult] = field(default_factory=list)
started_at: float = 0
finished_at: float = 0
error: str = ""
def to_dict(self) -> Dict:
return {
"run_id": self.run_id,
"workflow_id": self.workflow_id,
"workflow_name": self.workflow_name,
"status": self.status.value,
"current_node": self.current_node,
"meeting_results": [
{
"meeting_id": mr.meeting_id,
"title": mr.title,
"node_type": mr.node_type,
"status": mr.status,
"turns": [asdict(t) for t in mr.turns],
"consensus": mr.consensus,
"started_at": mr.started_at,
"finished_at": mr.finished_at,
}
for mr in self.meeting_results
],
"started_at": self.started_at,
"finished_at": self.finished_at,
"error": self.error,
"elapsed_seconds": round(
(self.finished_at or time.time()) - self.started_at, 1
) if self.started_at else 0,
}
# 每个角色在不同会议主题下的 system prompt
DINNER_ROLE_PROMPTS = {
"chef": (
"你是团队的美食达人/大厨。你对各种菜系了如指掌,"
"关注口味、食材新鲜度和烹饪方式。请用简短生动的语言2-3句话表达你的观点。"
),
"health": (
"你是团队的健康顾问/营养师。你关注饮食均衡、热量控制和食品安全。"
"请用简短专业的语言2-3句话表达你的观点。"
),
"budget": (
"你是团队的预算管理者/财务。你关注性价比、人均消费和优惠活动。"
"请用简短务实的语言2-3句话表达你的观点。"
),
"pm": (
"你是产品经理,负责综合各方意见做最终决策。"
"请用简短有决断力的语言2-3句话表达你的观点。"
),
"architect": (
"你是系统架构师,逻辑严谨,擅长分析和对比。"
"请用简短条理清晰的语言2-3句话表达你的观点。"
),
"developer": (
"你是开发工程师,务实高效,喜欢简单直接的方案。"
"请用简短直接的语言2-3句话表达你的观点。"
),
"qa": (
"你是质量保证工程师,注重细节和风险控制。"
"请用简短谨慎的语言2-3句话表达你的观点。"
),
"reviewer": (
"你是代码审查专家,善于发现问题和提出改进。"
"请用简短有见地的语言2-3句话表达你的观点。"
),
}
def _get_role_prompt(role: str) -> str:
"""根据角色获取 system prompt未匹配则使用通用提示"""
return DINNER_ROLE_PROMPTS.get(
role,
f"你是团队中的{role}角色。请用简短的语言2-3句话表达你的观点。"
)
class WorkflowOrchestrator:
"""
工作流编排器
自动驱动工作流中的每个节点:
- meeting 节点:逐个让 Agent 调用 LLM 发言,最后生成共识
- execution 节点:模拟执行并标记完成
"""
def __init__(self):
self._runs: Dict[str, OrchestrationRun] = {}
self._running_task: Optional[asyncio.Task] = None
def get_run(self, run_id: str) -> Optional[OrchestrationRun]:
return self._runs.get(run_id)
def list_runs(self) -> List[Dict]:
return [r.to_dict() for r in self._runs.values()]
async def start_workflow(
self,
workflow_path: str,
agent_overrides: Dict[str, str] = None,
) -> OrchestrationRun:
"""
启动一个工作流的自动编排
参数:
workflow_path: YAML 文件名(如 dinner-decision.yaml
agent_overrides: 可选的 agent_id → model 覆盖映射
返回:
OrchestrationRun 对象(后台异步执行)
"""
engine = get_workflow_engine()
workflow = await engine.load_workflow(workflow_path)
run = OrchestrationRun(
run_id=f"run-{uuid.uuid4().hex[:8]}",
workflow_id=workflow.workflow_id,
workflow_name=workflow.name,
status=OrchestratorStatus.RUNNING,
started_at=time.time(),
)
self._runs[run.run_id] = run
# 在后台启动编排
self._running_task = asyncio.create_task(
self._run_workflow(run, workflow_path, agent_overrides or {})
)
return run
async def _run_workflow(
self,
run: OrchestrationRun,
workflow_path: str,
agent_overrides: Dict[str, str],
):
"""后台执行完整工作流"""
engine = get_workflow_engine()
try:
while True:
next_node = await engine.get_next_meeting(run.workflow_id)
if next_node is None:
break
run.current_node = next_node.meeting_id
logger.info(f"[{run.run_id}] 开始节点: {next_node.title} ({next_node.node_type})")
if next_node.node_type == "meeting":
result = await self._run_meeting_node(
run, next_node, agent_overrides
)
else:
result = await self._run_execution_node(
run, next_node, agent_overrides
)
run.meeting_results.append(result)
# 标记节点完成
await engine.complete_meeting(run.workflow_id, next_node.meeting_id)
logger.info(f"[{run.run_id}] 节点完成: {next_node.title}")
run.status = OrchestratorStatus.COMPLETED
run.finished_at = time.time()
run.current_node = ""
logger.info(f"[{run.run_id}] 工作流完成,耗时 {run.finished_at - run.started_at:.1f}s")
except Exception as e:
run.status = OrchestratorStatus.FAILED
run.error = str(e)
run.finished_at = time.time()
logger.error(f"[{run.run_id}] 工作流失败: {e}", exc_info=True)
async def _run_meeting_node(
self,
run: OrchestrationRun,
node: WorkflowMeeting,
agent_overrides: Dict[str, str],
) -> MeetingResult:
"""执行一个会议节点:各 Agent 依次发言 → 生成共识"""
registry = get_agent_registry()
recorder = get_meeting_recorder()
heartbeat = get_heartbeat_service()
result = MeetingResult(
meeting_id=node.meeting_id,
title=node.title,
node_type="meeting",
started_at=time.time(),
status="in_progress",
)
# 创建会议记录
steps = ["提议", "讨论", "共识"]
await recorder.create_meeting(
meeting_id=node.meeting_id,
title=node.title,
attendees=node.attendees,
steps=steps,
)
await recorder.update_progress(node.meeting_id, "提议")
# 收集之前节点的讨论上下文
previous_context = self._build_previous_context(run)
# 逐个 Agent 发言
for agent_id in node.attendees:
agent = await registry.get_agent(agent_id)
if not agent:
logger.warning(f"Agent {agent_id} 未注册,跳过")
continue
# 更新心跳
await heartbeat.update_heartbeat(
agent_id, "working", f"参与会议: {node.title}", 50
)
# 用 LLM 生成发言
model = agent_overrides.get(agent_id, agent.model)
turn = await self._generate_agent_turn(
agent, model, node.title, previous_context, result.turns
)
result.turns.append(turn)
# 写入会议记录
await recorder.add_discussion(
meeting_id=node.meeting_id,
agent_id=agent.agent_id,
agent_name=agent.name,
content=turn.content,
step="讨论",
)
# 恢复心跳
await heartbeat.update_heartbeat(agent_id, "idle", "", 100)
# 生成共识
await recorder.update_progress(node.meeting_id, "共识")
consensus = await self._generate_consensus(node, result.turns)
result.consensus = consensus
# 完成会议
await recorder.end_meeting(node.meeting_id, consensus=consensus)
result.status = "completed"
result.finished_at = time.time()
return result
async def _run_execution_node(
self,
run: OrchestrationRun,
node: WorkflowMeeting,
agent_overrides: Dict[str, str],
) -> MeetingResult:
"""执行一个 execution 节点:模拟任务执行"""
registry = get_agent_registry()
heartbeat = get_heartbeat_service()
engine = get_workflow_engine()
result = MeetingResult(
meeting_id=node.meeting_id,
title=node.title,
node_type="execution",
started_at=time.time(),
status="in_progress",
)
# 获取上一个会议的共识作为执行指令
last_consensus = ""
for mr in reversed(run.meeting_results):
if mr.consensus:
last_consensus = mr.consensus
break
for agent_id in node.attendees:
agent = await registry.get_agent(agent_id)
if not agent:
continue
await heartbeat.update_heartbeat(
agent_id, "working", f"执行: {node.title}", 30
)
model = agent_overrides.get(agent_id, agent.model)
turn = await self._generate_execution_turn(
agent, model, node.title, last_consensus
)
result.turns.append(turn)
# 向工作流引擎签到
await engine.join_execution_node(
run.workflow_id, node.meeting_id, agent_id
)
await heartbeat.update_heartbeat(agent_id, "idle", "", 100)
result.status = "completed"
result.finished_at = time.time()
return result
async def _generate_agent_turn(
self,
agent: AgentInfo,
model: str,
meeting_title: str,
previous_context: str,
existing_turns: List[AgentTurn],
) -> AgentTurn:
"""
调用 LLM 为一个 Agent 生成会议发言
若 LLM 不可用则使用内置 mock
"""
role_prompt = _get_role_prompt(agent.role)
# 构建其他 Agent 已发言的内容
others_said = ""
for t in existing_turns:
content = t.content[:150] if len(t.content) > 150 else t.content
others_said += f" {t.agent_name}: {content}\n"
# 直接把所有信息揉进一段连贯的指令中
prompt = (
f"场景:团队正在讨论「{meeting_title}」。\n"
f"你的角色:{agent.name}{role_prompt}\n"
)
if previous_context:
prompt += f"\n上一轮讨论的结论:{previous_context}\n"
if others_said:
prompt += f"\n已有发言:\n{others_said}\n"
prompt += (
f"\n请以{agent.name}的身份直接给出2-3句具体建议。"
f"不要自我介绍,不要提问,不要使用工具,直接说你的观点。"
)
messages = [
LLMMessage(role="user", content=prompt),
]
start = time.time()
content, tokens, is_mock = await self._call_llm(model, messages)
latency = time.time() - start
return AgentTurn(
agent_id=agent.agent_id,
agent_name=agent.name,
role=agent.role,
model=model,
content=content,
timestamp=time.time(),
latency=round(latency, 2),
tokens_used=tokens,
is_mock=is_mock,
)
async def _generate_execution_turn(
self,
agent: AgentInfo,
model: str,
task_title: str,
consensus: str,
) -> AgentTurn:
"""为执行节点生成 Agent 的执行结果"""
prompt = (
f"你是{agent.name}。团队讨论后做出了以下决定:\n{consensus}\n\n"
f"现在需要你执行「{task_title}」这个任务。"
f"请用2-3句话直接汇报你的执行结果和下一步安排。"
)
messages = [
LLMMessage(role="user", content=prompt),
]
start = time.time()
content, tokens, is_mock = await self._call_llm(model, messages)
latency = time.time() - start
return AgentTurn(
agent_id=agent.agent_id,
agent_name=agent.name,
role=agent.role,
model=model,
content=content,
timestamp=time.time(),
latency=round(latency, 2),
tokens_used=tokens,
is_mock=is_mock,
)
async def _generate_consensus(
self,
node: WorkflowMeeting,
turns: List[AgentTurn],
) -> str:
"""基于所有发言生成会议共识,使用 kimi CLI 效果最佳"""
discussion_summary = ""
for t in turns:
# 截取每人发言前 200 字,避免 prompt 过长
content = t.content[:200] if len(t.content) > 200 else t.content
discussion_summary += f" {t.agent_name}: {content}\n"
prompt = (
f"请总结以下关于「{node.title}」的讨论用3-5句话给出共识。\n\n"
f"讨论记录:\n{discussion_summary}\n"
f"要求:直接输出总结,包含最终决定和行动方案。不要提问。"
)
messages = [
LLMMessage(role="user", content=prompt),
]
# 优先用 kimi CLI 做总结(它最擅长按指令行事)
content, _, _ = await self._call_llm("kimi", messages)
return content
async def _call_llm(
self, model: str, messages: List[LLMMessage]
) -> tuple:
"""
调用 AI 生成内容优先级CLI → LLM API → 报错
返回: (content, tokens_used, is_mock)
"""
# 分离 system prompt 和 user prompt
system_prompt = ""
user_prompt = ""
for m in messages:
if m.role == "system":
system_prompt += m.content + "\n"
else:
user_prompt += m.content + "\n"
user_prompt = user_prompt.strip()
# 1. 优先尝试 CLI
cli_name = resolve_cli(model) if model else None
if cli_name:
logger.info(f"使用 CLI [{cli_name}] (model={model})")
result = await invoke_cli(
cli_name, user_prompt, timeout=120,
system_prompt=system_prompt.strip(),
)
if result.success:
return result.content, 0, False
else:
logger.warning(f"CLI [{cli_name}] 失败: {result.error},尝试其他方式")
# 2. 如果 model 未指定 CLI 或 CLI 失败,尝试任意可用 CLI
available = detect_available_clis()
if available:
fallback_cli = list(available.keys())[0]
logger.info(f"使用 fallback CLI [{fallback_cli}]")
result = await invoke_cli(
fallback_cli, user_prompt, timeout=120,
system_prompt=system_prompt.strip(),
)
if result.success:
return result.content, 0, False
else:
logger.warning(f"Fallback CLI [{fallback_cli}] 也失败: {result.error}")
# 3. 尝试 LLM API
try:
llm = get_llm_service()
providers = llm.get_available_providers()
real_providers = [p for p in providers if p != "ollama"]
if real_providers:
response = await llm.route_task(
task=messages[-1].content,
messages=messages,
preferred_model=model if model else None,
)
return response.content, response.tokens_used, False
except Exception as e:
logger.warning(f"LLM API 也不可用: {e}")
raise RuntimeError(
f"无可用的 AI 提供商。CLI={list(available.keys()) if available else ''}"
f"LLM API Key 未配置。请安装 CLI 工具或配置 API Key。"
)
def _build_previous_context(self, run: OrchestrationRun) -> str:
"""从已完成的会议中构建上下文"""
parts = []
for mr in run.meeting_results:
if mr.consensus:
parts.append(f"[{mr.title}] 共识: {mr.consensus}")
return "\n".join(parts)
# 单例
_orchestrator: Optional[WorkflowOrchestrator] = None
def get_workflow_orchestrator() -> WorkflowOrchestrator:
"""获取编排器单例"""
global _orchestrator
if _orchestrator is None:
_orchestrator = WorkflowOrchestrator()
return _orchestrator

View File

@@ -19,6 +19,9 @@ pyyaml>=6.0
# HTTP 客户端(调用 LLM API
httpx>=0.25.0
# 异步 HTTP 客户端Ollama 等 LLM 调用)
aiohttp>=3.9.0
# Anthropic APIClaude
anthropic>=0.18.0

View File

@@ -0,0 +1,294 @@
"""
API 集成测试 - 验证路由层正确接入服务层
通过 HTTP 请求测试所有 API 端点
"""
import asyncio
import httpx
import time
import sys
import os
os.environ.pop("HTTP_PROXY", None)
os.environ.pop("HTTPS_PROXY", None)
os.environ.pop("http_proxy", None)
os.environ.pop("https_proxy", None)
os.environ["NO_PROXY"] = "*"
BASE = "http://127.0.0.1:8000"
passed = 0
failed = 0
errors = []
async def test(name: str, method: str, path: str, json_data=None, expect_status=200, expect_key=None):
"""执行单个 API 测试"""
global passed, failed
try:
async with httpx.AsyncClient(base_url=BASE, timeout=10) as client:
if method == "GET":
r = await client.get(path)
elif method == "POST":
r = await client.post(path, json=json_data)
elif method == "DELETE":
r = await client.delete(path)
elif method == "PUT":
r = await client.put(path, json=json_data)
else:
raise ValueError(f"Unknown method: {method}")
if r.status_code != expect_status:
failed += 1
msg = f"[FAIL] {name}: 期望 {expect_status}, 得到 {r.status_code} - {r.text[:200]}"
errors.append(msg)
print(msg)
return None
data = r.json()
if expect_key and expect_key not in data:
failed += 1
msg = f"[FAIL] {name}: 响应缺少 key '{expect_key}', 有: {list(data.keys())}"
errors.append(msg)
print(msg)
return None
passed += 1
print(f"[PASS] {name}")
return data
except Exception as e:
failed += 1
msg = f"[FAIL] {name}: {e}"
errors.append(msg)
print(msg)
return None
async def main():
global passed, failed
print("=" * 60)
print("Swarm API 集成测试")
print("=" * 60)
# ========== 健康检查 ==========
print("\n=== 健康检查 ===")
await test("GET /health", "GET", "/health", expect_key="status")
await test("GET /api/health", "GET", "/api/health", expect_key="status")
# ========== Agent API ==========
print("\n=== Agent API ===")
data = await test("列出 Agent初始", "GET", "/api/agents/", expect_key="agents")
await test("注册 Agent", "POST", "/api/agents/register", json_data={
"agent_id": "test-api-001",
"name": "Test Agent",
"role": "developer",
"model": "test-model"
}, expect_key="agent_id")
data = await test("列出 Agent注册后", "GET", "/api/agents/", expect_key="agents")
if data:
agent_ids = [a["agent_id"] for a in data["agents"]]
if "test-api-001" in agent_ids:
passed += 1
print("[PASS] 注册的 Agent 出现在列表中")
else:
failed += 1
msg = f"[FAIL] 注册的 Agent 未出现在列表中: {agent_ids}"
errors.append(msg)
print(msg)
await test("获取 Agent 详情", "GET", "/api/agents/test-api-001", expect_key="agent_id")
await test("更新 Agent 状态", "POST", "/api/agents/test-api-001/state", json_data={
"task": "测试任务",
"progress": 50,
"working_files": ["test.py"]
}, expect_key="success")
data = await test("获取 Agent 状态", "GET", "/api/agents/test-api-001/state", expect_key="agent_id")
if data and data.get("current_task") == "测试任务":
passed += 1
print("[PASS] Agent 状态正确持久化")
elif data:
failed += 1
msg = f"[FAIL] Agent 状态不匹配: {data}"
errors.append(msg)
print(msg)
await test("获取不存在的 Agent", "GET", "/api/agents/nonexistent-agent", expect_status=404)
# ========== 文件锁 API ==========
print("\n=== 文件锁 API ===")
await test("列出文件锁(初始)", "GET", "/api/locks/", expect_key="locks")
await test("获取文件锁", "POST", "/api/locks/acquire", json_data={
"file_path": "test/main.py",
"agent_id": "test-api-001",
"agent_name": "Test Agent"
}, expect_key="success")
data = await test("列出文件锁(获取后)", "GET", "/api/locks/", expect_key="locks")
if data and len(data["locks"]) > 0:
found = any(l["file_path"] == "test/main.py" for l in data["locks"])
if found:
passed += 1
print("[PASS] 获取的锁出现在列表中")
else:
failed += 1
msg = "[FAIL] 获取的锁未出现在列表中"
errors.append(msg)
print(msg)
data = await test("检查文件锁", "GET", "/api/locks/check?file_path=test/main.py", expect_key="locked")
if data and data["locked"]:
passed += 1
print("[PASS] 文件锁状态正确")
elif data:
failed += 1
msg = "[FAIL] 文件锁状态错误"
errors.append(msg)
print(msg)
await test("释放文件锁", "POST", "/api/locks/release", json_data={
"file_path": "test/main.py",
"agent_id": "test-api-001"
}, expect_key="success")
data = await test("检查释放后", "GET", "/api/locks/check?file_path=test/main.py", expect_key="locked")
if data and not data["locked"]:
passed += 1
print("[PASS] 锁释放成功")
elif data:
failed += 1
msg = "[FAIL] 锁释放后仍显示锁定"
errors.append(msg)
print(msg)
# ========== 心跳 API ==========
print("\n=== 心跳 API ===")
await test("列出心跳(初始)", "GET", "/api/heartbeats/", expect_key="heartbeats")
await test("更新心跳", "POST", "/api/heartbeats/test-api-001", json_data={
"status": "working",
"current_task": "测试中",
"progress": 30
}, expect_key="success")
data = await test("列出心跳(更新后)", "GET", "/api/heartbeats/", expect_key="heartbeats")
if data and "test-api-001" in data["heartbeats"]:
hb = data["heartbeats"]["test-api-001"]
if hb["status"] == "working":
passed += 1
print("[PASS] 心跳数据正确持久化")
else:
failed += 1
msg = f"[FAIL] 心跳状态不匹配: {hb}"
errors.append(msg)
print(msg)
await test("检查超时", "GET", "/api/heartbeats/timeouts?timeout_seconds=60", expect_key="timeout_agents")
# ========== 会议 API ==========
print("\n=== 会议 API ===")
data = await test("创建会议", "POST", "/api/meetings/", json_data={
"title": "API 测试会议",
"agenda": "测试议程",
"attendees": ["test-api-001"],
"steps": ["讨论", "决策", "总结"]
}, expect_key="meeting_id")
meeting_id = data["meeting_id"] if data else None
if meeting_id:
await test("获取会议详情", "GET", f"/api/meetings/{meeting_id}", expect_key="meeting_id")
await test("获取会议队列", "GET", f"/api/meetings/{meeting_id}/queue", expect_key="meeting_id")
await test("添加讨论", "POST", f"/api/meetings/{meeting_id}/discuss", json_data={
"agent_id": "test-api-001",
"agent_name": "Test Agent",
"content": "这是一条测试讨论",
"step": "讨论"
}, expect_key="success")
await test("更新进度", "POST", f"/api/meetings/{meeting_id}/progress", json_data={
"step": "讨论"
}, expect_key="success")
await test("完成会议", "POST", f"/api/meetings/{meeting_id}/finish", json_data={
"consensus": "测试共识"
}, expect_key="success")
await test("列出今日会议", "GET", "/api/meetings/today", expect_key="meetings")
# ========== 资源 API ==========
print("\n=== 资源 API ===")
await test("解析任务文件", "POST", "/api/parse-task", json_data={
"task": "修改 src/utils/helper.js 修复 bug"
}, expect_key="files")
await test("执行任务", "POST", "/api/execute", json_data={
"agent_id": "test-api-001",
"task": "修改 src/utils/helper.js",
"timeout": 30
}, expect_key="success")
await test("获取所有状态", "GET", "/api/status", expect_key="agents")
# ========== 角色 API ==========
print("\n=== 角色 API ===")
data = await test("获取主要角色", "POST", "/api/roles/primary", json_data={
"task": "设计系统架构方案"
}, expect_key="primary_role")
if data and data["primary_role"] == "architect":
passed += 1
print(f"[PASS] 角色分析正确: architect任务含'设计'+'架构'")
elif data:
failed += 1
msg = f"[FAIL] 角色分析不正确: 期望 architect, 得到 {data['primary_role']}"
errors.append(msg)
print(msg)
await test("分配角色", "POST", "/api/roles/allocate", json_data={
"task": "开发用户登录功能",
"agents": ["agent-1", "agent-2"]
}, expect_key="allocation")
await test("解释角色分配", "POST", "/api/roles/explain", json_data={
"task": "测试 API 接口",
"agents": ["agent-1"]
}, expect_key="explanation")
# ========== 工作流 API ==========
print("\n=== 工作流 API ===")
await test("工作流文件列表", "GET", "/api/workflows/files", expect_key="files")
await test("已加载工作流列表", "GET", "/api/workflows/list", expect_key="workflows")
# ========== 人类输入 API ==========
print("\n=== 人类输入 API ===")
await test("获取摘要", "GET", "/api/humans/summary", expect_key="participants")
# ========== 清理 ==========
print("\n=== 清理 ===")
await test("删除测试 Agent", "DELETE", "/api/agents/test-api-001", expect_key="message")
# ========== 汇总 ==========
print("\n" + "=" * 60)
print(f"测试结果汇总")
print("=" * 60)
print(f"通过: {passed}")
print(f"失败: {failed}")
print(f"总计: {passed + failed}")
print("=" * 60)
if errors:
print("\n失败详情:")
for e in errors:
print(f" {e}")
return failed == 0
if __name__ == "__main__":
success = asyncio.run(main())
sys.exit(0 if success else 1)