Files
multiAgentTry/backend/app/services/workflow_orchestrator.py

573 lines
19 KiB
Python
Raw Permalink Normal View History

"""
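Workflow orchestrator.

Ties together WorkflowEngine + MeetingRecorder + LLM Service and drives a
whole workflow automatically: load the workflow -> run it node by node ->
record the discussion -> reach a consensus.

Two modes are described:
- With an LLM API key: call a real model to generate the discussion content.
- Without an API key: use the built-in mock and still run the full flow.

NOTE(review): the `_call_llm` method in this file raises RuntimeError when no
CLI or LLM provider is usable instead of falling back to a mock; confirm
whether the mock mode still exists.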
"""
import asyncio
import logging
import time
import uuid
from typing import Dict, List, Optional, Any
from dataclasses import dataclass, field, asdict
from enum import Enum
from .workflow_engine import get_workflow_engine, WorkflowMeeting
from .meeting_recorder import get_meeting_recorder
from .agent_registry import get_agent_registry, AgentInfo
from .heartbeat import get_heartbeat_service
from .llm_service import get_llm_service, LLMMessage, LLMResponse, LLMConfig
from .cli_invoker import resolve_cli, invoke_cli, detect_available_clis
logger = logging.getLogger(__name__)
class OrchestratorStatus(str, Enum):
    """Lifecycle states of an orchestration run.

    Members are also ``str`` instances, so each one compares equal to its
    raw string value (e.g. ``OrchestratorStatus.RUNNING == "running"``).
    """

    # Default state of a freshly constructed run.
    IDLE = "idle"
    # The workflow is being driven in the background.
    RUNNING = "running"
    # Declared for completeness; not assigned by the code visible in this file.
    PAUSED = "paused"
    # Every node was processed without an error.
    COMPLETED = "completed"
    # The workflow stopped because of an error.
    FAILED = "failed"
@dataclass
class AgentTurn:
    """Record of a single utterance made by one agent."""

    # Identity of the speaker.
    agent_id: str
    agent_name: str
    role: str
    # Model name the turn was generated with.
    model: str
    # Generated text of the utterance.
    content: str
    # Wall-clock time (``time.time()``) at which the turn was recorded.
    timestamp: float
    # Seconds spent waiting for the generation call.
    latency: float = 0.0
    # Token count reported by the backend (0 when not reported).
    tokens_used: int = 0
    # True when the content came from a mock instead of a real backend.
    is_mock: bool = False
@dataclass
class MeetingResult:
    """Complete outcome of one workflow node (meeting or execution)."""

    # Identifier and title copied from the workflow node.
    meeting_id: str
    title: str
    # Kind of node this result belongs to ("meeting" or "execution" in this file).
    node_type: str
    # Utterances collected while the node was processed, in speaking order.
    turns: List[AgentTurn] = field(default_factory=list)
    # Consensus text; stays empty when none was generated.
    consensus: str = ""
    # ``time.time()`` stamps; 0 means "not set yet".
    started_at: float = 0
    finished_at: float = 0
    # Progress marker: "pending" -> "in_progress" -> "completed".
    status: str = "pending"
@dataclass
class OrchestrationRun:
    """Complete state of one orchestration run."""

    run_id: str
    workflow_id: str
    workflow_name: str
    status: OrchestratorStatus = OrchestratorStatus.IDLE
    # meeting_id of the node currently being processed ("" when none).
    current_node: str = ""
    # Results of the nodes processed so far, in execution order.
    meeting_results: List[MeetingResult] = field(default_factory=list)
    # ``time.time()`` stamps; 0 means "not set yet".
    started_at: float = 0
    finished_at: float = 0
    # Error message of a failed run ("" otherwise).
    error: str = ""

    def to_dict(self) -> Dict:
        """Return a plain-dict snapshot of the run.

        ``elapsed_seconds`` is measured up to ``finished_at`` when it is set
        and up to the current time otherwise; it is 0 while ``started_at``
        is unset.
        """
        meetings = []
        for meeting in self.meeting_results:
            meetings.append({
                "meeting_id": meeting.meeting_id,
                "title": meeting.title,
                "node_type": meeting.node_type,
                "status": meeting.status,
                "turns": [asdict(turn) for turn in meeting.turns],
                "consensus": meeting.consensus,
                "started_at": meeting.started_at,
                "finished_at": meeting.finished_at,
            })

        if self.started_at:
            end_time = self.finished_at or time.time()
            elapsed = round(end_time - self.started_at, 1)
        else:
            elapsed = 0

        return {
            "run_id": self.run_id,
            "workflow_id": self.workflow_id,
            "workflow_name": self.workflow_name,
            "status": self.status.value,
            "current_node": self.current_node,
            "meeting_results": meetings,
            "started_at": self.started_at,
            "finished_at": self.finished_at,
            "error": self.error,
            "elapsed_seconds": elapsed,
        }
# System prompt for each known agent role, keyed by role name.
# Every prompt describes the persona and asks for a 2-3 sentence opinion.
DINNER_ROLE_PROMPTS: Dict[str, str] = {
    # Food expert / chef.
    "chef": (
        "你是团队的美食达人/大厨。你对各种菜系了如指掌,"
        "关注口味、食材新鲜度和烹饪方式。请用简短生动的语言2-3句话表达你的观点。"
    ),
    # Health advisor / nutritionist.
    "health": (
        "你是团队的健康顾问/营养师。你关注饮食均衡、热量控制和食品安全。"
        "请用简短专业的语言2-3句话表达你的观点。"
    ),
    # Budget owner / finance.
    "budget": (
        "你是团队的预算管理者/财务。你关注性价比、人均消费和优惠活动。"
        "请用简短务实的语言2-3句话表达你的观点。"
    ),
    # Product manager, makes the final call.
    "pm": (
        "你是产品经理,负责综合各方意见做最终决策。"
        "请用简短有决断力的语言2-3句话表达你的观点。"
    ),
    # System architect.
    "architect": (
        "你是系统架构师,逻辑严谨,擅长分析和对比。"
        "请用简短条理清晰的语言2-3句话表达你的观点。"
    ),
    # Developer.
    "developer": (
        "你是开发工程师,务实高效,喜欢简单直接的方案。"
        "请用简短直接的语言2-3句话表达你的观点。"
    ),
    # Quality-assurance engineer.
    "qa": (
        "你是质量保证工程师,注重细节和风险控制。"
        "请用简短谨慎的语言2-3句话表达你的观点。"
    ),
    # Code reviewer.
    "reviewer": (
        "你是代码审查专家,善于发现问题和提出改进。"
        "请用简短有见地的语言2-3句话表达你的观点。"
    ),
}
def _get_role_prompt(role: str) -> str:
    """Return the system prompt for ``role``.

    Roles without an entry in ``DINNER_ROLE_PROMPTS`` get a generic prompt
    that embeds the role name.
    """
    try:
        return DINNER_ROLE_PROMPTS[role]
    except KeyError:
        return f"你是团队中的{role}角色。请用简短的语言2-3句话表达你的观点。"
class WorkflowOrchestrator:
    """
    Workflow orchestrator.

    Drives every node of a workflow automatically:
    - meeting nodes: each attendee agent speaks in turn through the AI
      backend, then a consensus is generated;
    - execution nodes: each attendee agent reports an execution result and
      is checked in with the workflow engine.
    """

    def __init__(self):
        # run_id -> run state; entries are never removed by this class.
        self._runs: Dict[str, OrchestrationRun] = {}
        # Most recently started background task.
        self._running_task: Optional[asyncio.Task] = None
        # Strong references to every in-flight background task. asyncio only
        # keeps weak references to tasks, so a task that is referenced
        # nowhere else may be garbage-collected before it finishes.
        self._background_tasks: set = set()

    def get_run(self, run_id: str) -> Optional[OrchestrationRun]:
        """Return the run registered under ``run_id``, or ``None`` if unknown."""
        return self._runs.get(run_id)

    def list_runs(self) -> List[Dict]:
        """Return the ``to_dict()`` snapshot of every known run."""
        return [r.to_dict() for r in self._runs.values()]

    async def start_workflow(
        self,
        workflow_path: str,
        agent_overrides: Optional[Dict[str, str]] = None,
    ) -> OrchestrationRun:
        """
        Start the automatic orchestration of a workflow.

        Args:
            workflow_path: YAML file name, e.g. ``dinner-decision.yaml``.
            agent_overrides: Optional mapping of agent_id -> model name that
                overrides the model registered for that agent.

        Returns:
            The ``OrchestrationRun`` object. The workflow itself keeps
            running in a background task after this method returns.
        """
        engine = get_workflow_engine()
        workflow = await engine.load_workflow(workflow_path)
        run = OrchestrationRun(
            run_id=f"run-{uuid.uuid4().hex[:8]}",
            workflow_id=workflow.workflow_id,
            workflow_name=workflow.name,
            status=OrchestratorStatus.RUNNING,
            started_at=time.time(),
        )
        self._runs[run.run_id] = run

        # Launch the orchestration in the background.
        task = asyncio.create_task(
            self._run_workflow(run, workflow_path, agent_overrides or {})
        )
        self._running_task = task
        # Keep the task alive until it is done, even if another workflow is
        # started meanwhile and replaces ``_running_task``.
        self._background_tasks.add(task)
        task.add_done_callback(self._background_tasks.discard)
        return run

    async def _run_workflow(
        self,
        run: OrchestrationRun,
        workflow_path: str,
        agent_overrides: Dict[str, str],
    ):
        """Run the whole workflow in the background.

        Repeatedly asks the engine for the next node, processes it, stores
        its result on ``run`` and marks it complete, until the engine has no
        further node. The outcome is recorded on ``run`` (status, error,
        finished_at). ``workflow_path`` is accepted but not used here.

        Raises:
            asyncio.CancelledError: re-raised after the run has been marked
                as failed, so that cancellation still propagates.
        """
        engine = get_workflow_engine()
        try:
            while True:
                next_node = await engine.get_next_meeting(run.workflow_id)
                if next_node is None:
                    break
                run.current_node = next_node.meeting_id
                logger.info(f"[{run.run_id}] 开始节点: {next_node.title} ({next_node.node_type})")
                if next_node.node_type == "meeting":
                    result = await self._run_meeting_node(
                        run, next_node, agent_overrides
                    )
                else:
                    result = await self._run_execution_node(
                        run, next_node, agent_overrides
                    )
                run.meeting_results.append(result)
                # Mark the node as completed in the engine.
                await engine.complete_meeting(run.workflow_id, next_node.meeting_id)
                logger.info(f"[{run.run_id}] 节点完成: {next_node.title}")
            run.status = OrchestratorStatus.COMPLETED
            run.finished_at = time.time()
            run.current_node = ""
            logger.info(f"[{run.run_id}] 工作流完成,耗时 {run.finished_at - run.started_at:.1f}s")
        except asyncio.CancelledError:
            # CancelledError is not an ``Exception`` subclass on Python 3.8+,
            # so without this branch a cancelled run would stay RUNNING forever.
            run.status = OrchestratorStatus.FAILED
            run.error = "工作流被取消"
            run.finished_at = time.time()
            logger.warning(f"[{run.run_id}] 工作流被取消")
            raise
        except Exception as e:
            # Top-level boundary of the background task: record and log.
            run.status = OrchestratorStatus.FAILED
            run.error = str(e)
            run.finished_at = time.time()
            logger.error(f"[{run.run_id}] 工作流失败: {e}", exc_info=True)

    async def _run_meeting_node(
        self,
        run: OrchestrationRun,
        node: WorkflowMeeting,
        agent_overrides: Dict[str, str],
    ) -> MeetingResult:
        """Run one meeting node: every attendee speaks in turn, then a
        consensus is generated.

        Attendees that are not registered are skipped with a warning.
        Returns the ``MeetingResult`` with all turns and the consensus.
        """
        registry = get_agent_registry()
        recorder = get_meeting_recorder()
        heartbeat = get_heartbeat_service()
        result = MeetingResult(
            meeting_id=node.meeting_id,
            title=node.title,
            node_type="meeting",
            started_at=time.time(),
            status="in_progress",
        )

        # Create the meeting record.
        steps = ["提议", "讨论", "共识"]
        await recorder.create_meeting(
            meeting_id=node.meeting_id,
            title=node.title,
            attendees=node.attendees,
            steps=steps,
        )
        await recorder.update_progress(node.meeting_id, "提议")

        # Consensus of the nodes that already finished, used as context.
        previous_context = self._build_previous_context(run)

        # Let the agents speak one after another.
        for agent_id in node.attendees:
            agent = await registry.get_agent(agent_id)
            if not agent:
                logger.warning(f"Agent {agent_id} 未注册,跳过")
                continue

            # Mark the agent as busy.
            await heartbeat.update_heartbeat(
                agent_id, "working", f"参与会议: {node.title}", 50
            )
            try:
                # Generate the utterance; later speakers see earlier turns.
                model = agent_overrides.get(agent_id, agent.model)
                turn = await self._generate_agent_turn(
                    agent, model, node.title, previous_context, result.turns
                )
                result.turns.append(turn)
                # Persist the utterance in the meeting record.
                await recorder.add_discussion(
                    meeting_id=node.meeting_id,
                    agent_id=agent.agent_id,
                    agent_name=agent.name,
                    content=turn.content,
                    step="讨论",
                )
            finally:
                # Always restore the heartbeat, so a failed generation does
                # not leave the agent reported as "working".
                await heartbeat.update_heartbeat(agent_id, "idle", "", 100)

        # Generate the consensus.
        await recorder.update_progress(node.meeting_id, "共识")
        consensus = await self._generate_consensus(node, result.turns)
        result.consensus = consensus

        # Close the meeting.
        await recorder.end_meeting(node.meeting_id, consensus=consensus)
        result.status = "completed"
        result.finished_at = time.time()
        return result

    async def _run_execution_node(
        self,
        run: OrchestrationRun,
        node: WorkflowMeeting,
        agent_overrides: Dict[str, str],
    ) -> MeetingResult:
        """Run one execution node: every attendee reports an execution result.

        The most recent non-empty consensus found on ``run`` is handed to
        the agents as the decision to execute. Unregistered attendees are
        skipped silently. Each processed agent is checked in with the
        workflow engine.
        """
        registry = get_agent_registry()
        heartbeat = get_heartbeat_service()
        engine = get_workflow_engine()
        result = MeetingResult(
            meeting_id=node.meeting_id,
            title=node.title,
            node_type="execution",
            started_at=time.time(),
            status="in_progress",
        )

        # Latest non-empty consensus serves as the execution instruction.
        last_consensus = next(
            (mr.consensus for mr in reversed(run.meeting_results) if mr.consensus),
            "",
        )

        for agent_id in node.attendees:
            agent = await registry.get_agent(agent_id)
            if not agent:
                continue
            await heartbeat.update_heartbeat(
                agent_id, "working", f"执行: {node.title}", 30
            )
            try:
                model = agent_overrides.get(agent_id, agent.model)
                turn = await self._generate_execution_turn(
                    agent, model, node.title, last_consensus
                )
                result.turns.append(turn)
                # Check the agent in with the workflow engine.
                await engine.join_execution_node(
                    run.workflow_id, node.meeting_id, agent_id
                )
            finally:
                # Always restore the heartbeat, even when the turn failed.
                await heartbeat.update_heartbeat(agent_id, "idle", "", 100)

        result.status = "completed"
        result.finished_at = time.time()
        return result

    async def _build_turn(
        self,
        agent: AgentInfo,
        model: str,
        prompt: str,
    ) -> AgentTurn:
        """Send ``prompt`` as a single user message and wrap the reply in an
        ``AgentTurn``, measuring the latency of the call."""
        messages = [
            LLMMessage(role="user", content=prompt),
        ]
        start = time.time()
        content, tokens, is_mock = await self._call_llm(model, messages)
        latency = time.time() - start
        return AgentTurn(
            agent_id=agent.agent_id,
            agent_name=agent.name,
            role=agent.role,
            model=model,
            content=content,
            timestamp=time.time(),
            latency=round(latency, 2),
            tokens_used=tokens,
            is_mock=is_mock,
        )

    async def _generate_agent_turn(
        self,
        agent: AgentInfo,
        model: str,
        meeting_title: str,
        previous_context: str,
        existing_turns: List[AgentTurn],
    ) -> AgentTurn:
        """
        Generate one agent's meeting utterance through the AI backend.

        The prompt contains the meeting title, the agent's role prompt, the
        consensus of earlier nodes (if any) and the first 150 characters of
        each utterance already made in this meeting (if any).

        Raises:
            RuntimeError: propagated from ``_call_llm`` when no backend is
                usable.
        """
        role_prompt = _get_role_prompt(agent.role)

        # What the other agents have said so far (truncated per turn).
        others_said = "".join(
            f" {t.agent_name}: {t.content[:150]}\n" for t in existing_turns
        )

        # Fold all the information into one coherent instruction.
        prompt = (
            f"场景:团队正在讨论「{meeting_title}」。\n"
            f"你的角色:{agent.name}{role_prompt}\n"
        )
        if previous_context:
            prompt += f"\n上一轮讨论的结论:{previous_context}\n"
        if others_said:
            prompt += f"\n已有发言:\n{others_said}\n"
        prompt += (
            f"\n请以{agent.name}的身份直接给出2-3句具体建议。"
            f"不要自我介绍,不要提问,不要使用工具,直接说你的观点。"
        )
        return await self._build_turn(agent, model, prompt)

    async def _generate_execution_turn(
        self,
        agent: AgentInfo,
        model: str,
        task_title: str,
        consensus: str,
    ) -> AgentTurn:
        """Generate an agent's execution report for an execution node.

        The prompt gives the agent the team's decision (``consensus``) and
        the task title and asks for a short result report.
        """
        prompt = (
            f"你是{agent.name}。团队讨论后做出了以下决定:\n{consensus}\n\n"
            f"现在需要你执行「{task_title}」这个任务。"
            f"请用2-3句话直接汇报你的执行结果和下一步安排。"
        )
        return await self._build_turn(agent, model, prompt)

    async def _generate_consensus(
        self,
        node: WorkflowMeeting,
        turns: List[AgentTurn],
    ) -> str:
        """Summarise all utterances of a meeting into a consensus text.

        The summary is always requested with the model name ``"kimi"``.
        """
        # Only the first 200 characters of each utterance are included, to
        # keep the prompt short.
        discussion_summary = "".join(
            f" {t.agent_name}: {t.content[:200]}\n" for t in turns
        )
        prompt = (
            f"请总结以下关于「{node.title}」的讨论用3-5句话给出共识。\n\n"
            f"讨论记录:\n{discussion_summary}\n"
            f"要求:直接输出总结,包含最终决定和行动方案。不要提问。"
        )
        messages = [
            LLMMessage(role="user", content=prompt),
        ]
        # Prefer the kimi CLI for summarising (original author's choice).
        content, _, _ = await self._call_llm("kimi", messages)
        return content

    async def _call_llm(
        self, model: str, messages: List[LLMMessage]
    ) -> tuple:
        """
        Generate content with an AI backend.

        Order of attempts: the CLI resolved for ``model`` -> another
        available CLI -> the LLM API -> error.

        Returns:
            ``(content, tokens_used, is_mock)``. CLI results report 0 tokens;
            ``is_mock`` is always ``False`` here.

        Raises:
            RuntimeError: when none of the backends produced a result.
        """
        # Split the messages into a system prompt and a user prompt.
        system_prompt = "\n".join(
            m.content for m in messages if m.role == "system"
        ).strip()
        user_prompt = "\n".join(
            m.content for m in messages if m.role != "system"
        ).strip()

        # 1. Try the CLI that matches the requested model first.
        cli_name = resolve_cli(model) if model else None
        if cli_name:
            logger.info(f"使用 CLI [{cli_name}] (model={model})")
            result = await invoke_cli(
                cli_name, user_prompt, timeout=120,
                system_prompt=system_prompt,
            )
            if result.success:
                return result.content, 0, False
            logger.warning(f"CLI [{cli_name}] 失败: {result.error},尝试其他方式")

        # 2. No CLI for the model, or it failed: try another available CLI.
        #    The CLI that just failed is skipped, since repeating the
        #    identical call would only add another timeout.
        available = detect_available_clis()
        fallback_cli = next(
            (name for name in (available or {}) if name != cli_name), None
        )
        if fallback_cli is not None:
            logger.info(f"使用 fallback CLI [{fallback_cli}]")
            result = await invoke_cli(
                fallback_cli, user_prompt, timeout=120,
                system_prompt=system_prompt,
            )
            if result.success:
                return result.content, 0, False
            logger.warning(f"Fallback CLI [{fallback_cli}] 也失败: {result.error}")

        # 3. Try the LLM API. It is only used when a provider other than
        #    "ollama" is reported as available.
        try:
            llm = get_llm_service()
            providers = llm.get_available_providers()
            real_providers = [p for p in providers if p != "ollama"]
            if real_providers:
                response = await llm.route_task(
                    task=messages[-1].content,
                    messages=messages,
                    preferred_model=model if model else None,
                )
                return response.content, response.tokens_used, False
        except Exception as e:
            # Best effort: any API failure falls through to the final error.
            logger.warning(f"LLM API 也不可用: {e}")

        raise RuntimeError(
            f"无可用的 AI 提供商。CLI={list(available.keys()) if available else ''}"
            f"LLM API Key 未配置。请安装 CLI 工具或配置 API Key。"
        )

    def _build_previous_context(self, run: OrchestrationRun) -> str:
        """Build the context text from the nodes already finished in ``run``.

        Returns one line per result that has a non-empty consensus, joined
        with newlines (an empty string when there is none).
        """
        return "\n".join(
            f"[{mr.title}] 共识: {mr.consensus}"
            for mr in run.meeting_results
            if mr.consensus
        )
# Module-level singleton, created on first use.
_orchestrator: Optional[WorkflowOrchestrator] = None


def get_workflow_orchestrator() -> WorkflowOrchestrator:
    """Return the shared orchestrator, creating it on the first call."""
    global _orchestrator
    if _orchestrator is not None:
        return _orchestrator
    _orchestrator = WorkflowOrchestrator()
    return _orchestrator