Files
multiAgentTry/backend/app/services/workflow_engine.py
Claude Code dc398d7c7b 完整实现 Swarm 多智能体协作系统
- 新增 CLIPluginAdapter 统一接口 (backend/app/core/agent_adapter.py)
- 新增 LLM 服务层,支持 Anthropic/OpenAI/DeepSeek/Ollama (backend/app/services/llm_service.py)
- 新增 Agent 执行引擎,支持文件锁自动管理 (backend/app/services/agent_executor.py)
- 新增 NativeLLMAgent 原生 LLM 适配器 (backend/app/adapters/native_llm_agent.py)
- 新增进程管理器 (backend/app/services/process_manager.py)
- 新增 Agent 控制 API (backend/app/routers/agents_control.py)
- 新增 WebSocket 实时通信 (backend/app/routers/websocket.py)
- 更新前端 AgentsPage,支持启动/停止 Agent
- 测试通过:Agent 启动、批量操作、栅栏同步

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 17:32:11 +08:00

474 lines
14 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
工作流引擎 - 管理和执行工作流
支持从 YAML 文件加载工作流定义,并跟踪进度
"""
import asyncio
from pathlib import Path
from typing import Dict, List, Optional
from dataclasses import dataclass, field, asdict
from datetime import datetime
import yaml
from .storage import get_storage
from .meeting_recorder import get_meeting_recorder
@dataclass
class WorkflowMeeting:
    """A single node of a workflow.

    A node is either a regular ``"meeting"`` or an ``"execution"`` node. For
    execution nodes, ``completed_attendees`` records who has reported in, and
    the node counts as ready once enough of the *listed* attendees have done
    so.

    Attributes:
        meeting_id: Identifier of the node.
        title: Human-readable title of the node.
        attendees: IDs of the participants expected at this node.
        depends_on: IDs of the nodes this node depends on.
        completed: Whether the node has been marked as completed.
        node_type: Either ``"meeting"`` or ``"execution"``.
        min_required: Minimum number of attendees that must finish
            (execution nodes only). ``None`` or ``0`` means all attendees.
        on_failure: ID of the node to jump to when this node fails.
        completed_attendees: IDs that have reported completion
            (execution nodes only).
    """

    meeting_id: str
    title: str
    attendees: List[str]
    depends_on: List[str] = field(default_factory=list)
    completed: bool = False
    node_type: str = "meeting"  # meeting | execution
    min_required: Optional[int] = None  # minimum finishers (execution nodes)
    on_failure: Optional[str] = None  # node ID to jump to on failure
    # Execution-state tracking (execution nodes only).
    completed_attendees: List[str] = field(default_factory=list)

    def _finished_count(self) -> int:
        """Return how many of the listed attendees have reported completion."""
        # Only listed attendees count, so this always agrees with
        # ``missing_attendees``; IDs that are not on the attendee list cannot
        # release the barrier or inflate the progress numerator.
        return sum(
            1 for attendee in self.attendees
            if attendee in self.completed_attendees
        )

    @property
    def is_ready(self) -> bool:
        """Report whether the node may start.

        NOTE(review): this checks each dependency against ``depends_on``
        itself, so it is always True. A node has no access to the completion
        state of its sibling nodes; a real dependency check needs that state
        supplied by the owner of the node list.
        """
        return all(dep in self.depends_on for dep in self.depends_on)

    @property
    def is_execution_ready(self) -> bool:
        """Report whether enough listed attendees have finished.

        Always False for nodes that are not execution nodes.
        """
        if self.node_type != "execution":
            return False
        # A missing (or zero) ``min_required`` means everyone must finish.
        required = self.min_required or len(self.attendees)
        return self._finished_count() >= required

    @property
    def missing_attendees(self) -> List[str]:
        """Return the attendees that have not finished yet.

        Always empty for nodes that are not execution nodes.
        """
        if self.node_type != "execution":
            return []
        return [a for a in self.attendees if a not in self.completed_attendees]

    @property
    def progress(self) -> str:
        """Return ``"<finished>/<total>"``, or ``"N/A"`` for meeting nodes."""
        if self.node_type != "execution":
            return "N/A"
        return f"{self._finished_count()}/{len(self.attendees)}"
@dataclass
class Workflow:
    """Definition of a workflow: an ordered list of nodes plus metadata.

    Attributes:
        workflow_id: Identifier of the workflow.
        name: Display name.
        description: Free-form description.
        meetings: The nodes of the workflow, in order.
        created_at: ISO-8601 creation timestamp; filled in when left empty.
        status: One of ``pending``, ``in_progress`` or ``completed``.
    """

    workflow_id: str
    name: str
    description: str
    meetings: List["WorkflowMeeting"]
    created_at: str = ""
    status: str = "pending"  # pending, in_progress, completed

    def __post_init__(self):
        # Stamp the workflow with the current local time unless the caller
        # supplied a timestamp.
        self.created_at = self.created_at or datetime.now().isoformat()

    @property
    def progress(self) -> str:
        """Return a ``"<completed>/<total>"`` summary of the nodes."""
        finished = [node for node in self.meetings if node.completed]
        return f"{len(finished)}/{len(self.meetings)}"

    @property
    def current_meeting(self) -> Optional["WorkflowMeeting"]:
        """Return the first node that is not completed, or None if all are."""
        return next((node for node in self.meetings if not node.completed), None)

    @property
    def is_completed(self) -> bool:
        """Report whether every node of the workflow is completed."""
        for node in self.meetings:
            if not node.completed:
                return False
        return True
class WorkflowEngine:
    """Workflow engine.

    Loads workflow definitions from YAML files, keeps the loaded workflows in
    memory and tracks the progress of their nodes.
    """

    WORKFLOWS_DIR = "workflow"

    def __init__(self):
        self._storage = get_storage()
        self._recorder = get_meeting_recorder()
        # Workflows loaded so far, keyed by workflow ID.
        self._loaded_workflows: Dict[str, "Workflow"] = {}
        # Source file path (as passed to ``load_workflow``) per workflow ID.
        self._workflow_files: Dict[str, str] = {}

    def _find_meeting(
        self,
        workflow_id: str,
        meeting_id: str
    ) -> Optional["WorkflowMeeting"]:
        """Return the first node with ``meeting_id`` in a loaded workflow."""
        workflow = self._loaded_workflows.get(workflow_id)
        if workflow is None:
            return None
        return next(
            (m for m in workflow.meetings if m.meeting_id == meeting_id),
            None,
        )

    async def load_workflow(self, workflow_path: str) -> "Workflow":
        """Load a workflow from a YAML file and register it.

        Args:
            workflow_path: YAML file path, relative to the ``workflow``
                directory under the storage base path.

        Returns:
            The loaded workflow.

        Raises:
            ValueError: If the file does not exist or does not contain a
                YAML mapping.
            KeyError: If a required key (``workflow_id``, ``name``, or a
                node's ``meeting_id``/``title``/``attendees``) is missing.
        """
        import aiofiles

        # Build the full path below the storage base directory.
        full_path = f"{self.WORKFLOWS_DIR}/{workflow_path}"
        yaml_path = Path(self._storage.base_path) / full_path
        if not yaml_path.exists():
            raise ValueError(f"Workflow file not found: {workflow_path}")

        async with aiofiles.open(yaml_path, mode="r", encoding="utf-8") as f:
            yaml_content = await f.read()
        content = yaml.safe_load(yaml_content)
        # An empty file parses to None and a bare scalar/list is not a
        # workflow; fail with a clear error instead of an AttributeError.
        if not isinstance(content, dict):
            raise ValueError(f"Invalid workflow definition: {workflow_path}")

        # ``or []`` also covers keys that are present but explicitly null.
        meetings = [
            WorkflowMeeting(
                meeting_id=m["meeting_id"],
                title=m["title"],
                attendees=m["attendees"],
                depends_on=m.get("depends_on") or [],
                node_type=m.get("node_type", "meeting"),
                min_required=m.get("min_required"),
                on_failure=m.get("on_failure")
            )
            for m in content.get("meetings") or []
        ]
        workflow = Workflow(
            workflow_id=content["workflow_id"],
            name=content["name"],
            description=content.get("description", ""),
            meetings=meetings
        )
        self._loaded_workflows[workflow.workflow_id] = workflow
        # Remember the source path so the workflow can be reloaded later.
        self._workflow_files[workflow.workflow_id] = workflow_path
        return workflow

    async def get_next_meeting(
        self,
        workflow_id: str
    ) -> Optional["WorkflowMeeting"]:
        """Return the next node that should run in a workflow.

        Args:
            workflow_id: Workflow ID.

        Returns:
            The first uncompleted node, or None if the workflow is unknown
            or every node is completed.
        """
        workflow = self._loaded_workflows.get(workflow_id)
        if workflow is None:
            return None
        return workflow.current_meeting

    async def complete_meeting(self, workflow_id: str, meeting_id: str) -> bool:
        """Mark a node as completed and refresh the workflow status.

        Args:
            workflow_id: Workflow ID.
            meeting_id: Node ID.

        Returns:
            True if the node was found and marked, False otherwise.
        """
        workflow = self._loaded_workflows.get(workflow_id)
        meeting = self._find_meeting(workflow_id, meeting_id)
        if workflow is None or meeting is None:
            return False
        meeting.completed = True
        workflow.status = "completed" if workflow.is_completed else "in_progress"
        return True

    async def create_workflow_meeting(
        self,
        workflow_id: str,
        meeting_id: str
    ) -> bool:
        """Create the meeting record for a node of a workflow.

        Args:
            workflow_id: Workflow ID.
            meeting_id: Node ID.

        Returns:
            True if the record was created, False if the workflow or node
            is unknown.
        """
        workflow = self._loaded_workflows.get(workflow_id)
        meeting = self._find_meeting(workflow_id, meeting_id)
        if workflow is None or meeting is None:
            return False
        await self._recorder.create_meeting(
            meeting_id=meeting.meeting_id,
            title=f"{workflow.name}: {meeting.title}",
            attendees=meeting.attendees,
            steps=["收集初步想法", "讨论与迭代", "生成共识版本"]
        )
        return True

    async def get_workflow_status(self, workflow_id: str) -> Optional[Dict]:
        """Return a status summary of a workflow.

        Args:
            workflow_id: Workflow ID.

        Returns:
            A dict with the workflow metadata and per-node completion flags,
            or None if the workflow is unknown.
        """
        # If it is not in memory, try to reload it from its recorded file.
        if workflow_id not in self._loaded_workflows and workflow_id in self._workflow_files:
            await self.load_workflow(self._workflow_files[workflow_id])
        workflow = self._loaded_workflows.get(workflow_id)
        if workflow is None:
            return None
        return {
            "workflow_id": workflow.workflow_id,
            "name": workflow.name,
            "description": workflow.description,
            "status": workflow.status,
            "progress": workflow.progress,
            "meetings": [
                {
                    "meeting_id": m.meeting_id,
                    "title": m.title,
                    "completed": m.completed
                }
                for m in workflow.meetings
            ]
        }

    async def list_workflows(self) -> List[Dict]:
        """Return a summary dict for every loaded workflow."""
        return [
            {
                "workflow_id": w.workflow_id,
                "name": w.name,
                "status": w.status,
                "progress": w.progress
            }
            for w in self._loaded_workflows.values()
        ]

    # ========== Execution-node methods ==========

    async def join_execution_node(
        self,
        workflow_id: str,
        meeting_id: str,
        agent_id: str
    ) -> Dict:
        """Record that an agent has finished its part of an execution node.

        Args:
            workflow_id: Workflow ID.
            meeting_id: Execution node ID.
            agent_id: Agent ID.

        Returns:
            A dict whose ``status`` is ``"ready"`` (with ``progress``),
            ``"waiting"`` (with ``progress`` and ``missing``) or ``"error"``;
            every variant carries a ``message``.
        """
        if self._loaded_workflows.get(workflow_id) is None:
            return {"status": "error", "message": "Workflow not found"}
        meeting = self._find_meeting(workflow_id, meeting_id)
        if meeting is None:
            return {"status": "error", "message": "Meeting not found"}
        if meeting.node_type != "execution":
            return {"status": "error", "message": "Not an execution node"}

        # Joining is idempotent: an agent is recorded at most once.
        if agent_id not in meeting.completed_attendees:
            meeting.completed_attendees.append(agent_id)

        if meeting.is_execution_ready:
            return {
                "status": "ready",
                "progress": meeting.progress,
                "message": "所有 Agent 已完成,可以进入下一节点"
            }
        missing = meeting.missing_attendees
        return {
            "status": "waiting",
            "progress": meeting.progress,
            "missing": missing,
            "message": f"等待其他 Agent 完成: {missing}"
        }

    async def get_execution_status(
        self,
        workflow_id: str,
        meeting_id: str
    ) -> Optional[Dict]:
        """Return the execution state of a node.

        Returns:
            A dict describing the node's attendees and progress, or None if
            the workflow or node is unknown.
        """
        meeting = self._find_meeting(workflow_id, meeting_id)
        if meeting is None:
            return None
        return {
            "meeting_id": meeting.meeting_id,
            "title": meeting.title,
            "node_type": meeting.node_type,
            "attendees": meeting.attendees,
            "completed_attendees": meeting.completed_attendees,
            "progress": meeting.progress,
            "is_ready": meeting.is_execution_ready,
            "missing": meeting.missing_attendees
        }

    # ========== Conditional-jump methods ==========

    async def jump_to_node(
        self,
        workflow_id: str,
        target_meeting_id: str
    ) -> bool:
        """Jump back to a node, resetting it and every node listed after it.

        Args:
            workflow_id: Workflow ID.
            target_meeting_id: ID of the node to jump to.

        Returns:
            True if the target node exists and the jump was performed. When
            it does not exist nothing is modified and False is returned.
        """
        workflow = self._loaded_workflows.get(workflow_id)
        if workflow is None:
            return False
        target_found = False
        for meeting in workflow.meetings:
            if meeting.meeting_id == target_meeting_id:
                target_found = True
            if target_found:
                # Reset the target and everything that follows it in the list.
                meeting.completed = False
                meeting.completed_attendees = []
        # Only touch the status when a jump actually happened; a bad target
        # must not flip a completed workflow back to "in_progress".
        if target_found:
            workflow.status = "in_progress"
        return target_found

    async def handle_failure(
        self,
        workflow_id: str,
        meeting_id: str
    ) -> Optional[str]:
        """Handle a failed node by jumping to its ``on_failure`` target.

        Args:
            workflow_id: Workflow ID.
            meeting_id: ID of the failed node.

        Returns:
            The ID of the node jumped to, or None if the workflow or node is
            unknown, no ``on_failure`` target is configured, or the target
            does not exist in the workflow.
        """
        workflow = self._loaded_workflows.get(workflow_id)
        if workflow is None:
            return None
        failed = next(
            (
                m for m in workflow.meetings
                if m.meeting_id == meeting_id and m.on_failure
            ),
            None,
        )
        if failed is None:
            return None
        # Report the target only if the jump really took place.
        if await self.jump_to_node(workflow_id, failed.on_failure):
            return failed.on_failure
        return None

    async def get_workflow_detail(self, workflow_id: str) -> Optional[Dict]:
        """Return detailed workflow information including every node's state.

        Returns:
            A dict with the workflow metadata, the ID of the current node and
            a per-node description, or None if the workflow is unknown.
        """
        workflow = self._loaded_workflows.get(workflow_id)
        if workflow is None:
            return None
        current = workflow.current_meeting
        return {
            "workflow_id": workflow.workflow_id,
            "name": workflow.name,
            "description": workflow.description,
            "status": workflow.status,
            "progress": workflow.progress,
            "current_node": current.meeting_id if current else None,
            "meetings": [
                {
                    "meeting_id": m.meeting_id,
                    "title": m.title,
                    "node_type": m.node_type,
                    "attendees": m.attendees,
                    "depends_on": m.depends_on,
                    "completed": m.completed,
                    "on_failure": m.on_failure,
                    "progress": m.progress if m.node_type == "execution" else None
                }
                for m in workflow.meetings
            ]
        }
# Process-wide singleton; created lazily on first access.
_engine_instance: Optional["WorkflowEngine"] = None


def get_workflow_engine() -> "WorkflowEngine":
    """Return the shared WorkflowEngine, creating it on the first call."""
    global _engine_instance
    engine = _engine_instance
    if engine is None:
        engine = WorkflowEngine()
        _engine_instance = engine
    return engine