""" 工作流编排器 串联 WorkflowEngine + MeetingRecorder + LLM Service, 自动驱动整个工作流:加载 → 逐节点执行 → 记录讨论 → 达成共识。 支持两种模式: - 有 LLM API Key:调用真实模型生成讨论内容 - 无 API Key:使用内置模拟,仍然走完整流程 """ import asyncio import logging import time import uuid from typing import Dict, List, Optional, Any from dataclasses import dataclass, field, asdict from enum import Enum from .workflow_engine import get_workflow_engine, WorkflowMeeting from .meeting_recorder import get_meeting_recorder from .agent_registry import get_agent_registry, AgentInfo from .heartbeat import get_heartbeat_service from .llm_service import get_llm_service, LLMMessage, LLMResponse, LLMConfig from .cli_invoker import resolve_cli, invoke_cli, detect_available_clis logger = logging.getLogger(__name__) class OrchestratorStatus(str, Enum): IDLE = "idle" RUNNING = "running" PAUSED = "paused" COMPLETED = "completed" FAILED = "failed" @dataclass class AgentTurn: """一次 Agent 发言记录""" agent_id: str agent_name: str role: str model: str content: str timestamp: float latency: float = 0.0 tokens_used: int = 0 is_mock: bool = False @dataclass class MeetingResult: """一次会议的完整结果""" meeting_id: str title: str node_type: str turns: List[AgentTurn] = field(default_factory=list) consensus: str = "" started_at: float = 0 finished_at: float = 0 status: str = "pending" @dataclass class OrchestrationRun: """一次编排运行的完整状态""" run_id: str workflow_id: str workflow_name: str status: OrchestratorStatus = OrchestratorStatus.IDLE current_node: str = "" meeting_results: List[MeetingResult] = field(default_factory=list) started_at: float = 0 finished_at: float = 0 error: str = "" def to_dict(self) -> Dict: return { "run_id": self.run_id, "workflow_id": self.workflow_id, "workflow_name": self.workflow_name, "status": self.status.value, "current_node": self.current_node, "meeting_results": [ { "meeting_id": mr.meeting_id, "title": mr.title, "node_type": mr.node_type, "status": mr.status, "turns": [asdict(t) for t in mr.turns], "consensus": mr.consensus, "started_at": mr.started_at, "finished_at": mr.finished_at, } for mr in self.meeting_results ], "started_at": self.started_at, "finished_at": self.finished_at, "error": self.error, "elapsed_seconds": round( (self.finished_at or time.time()) - self.started_at, 1 ) if self.started_at else 0, } # 每个角色在不同会议主题下的 system prompt DINNER_ROLE_PROMPTS = { "chef": ( "你是团队的美食达人/大厨。你对各种菜系了如指掌," "关注口味、食材新鲜度和烹饪方式。请用简短生动的语言(2-3句话)表达你的观点。" ), "health": ( "你是团队的健康顾问/营养师。你关注饮食均衡、热量控制和食品安全。" "请用简短专业的语言(2-3句话)表达你的观点。" ), "budget": ( "你是团队的预算管理者/财务。你关注性价比、人均消费和优惠活动。" "请用简短务实的语言(2-3句话)表达你的观点。" ), "pm": ( "你是产品经理,负责综合各方意见做最终决策。" "请用简短有决断力的语言(2-3句话)表达你的观点。" ), "architect": ( "你是系统架构师,逻辑严谨,擅长分析和对比。" "请用简短条理清晰的语言(2-3句话)表达你的观点。" ), "developer": ( "你是开发工程师,务实高效,喜欢简单直接的方案。" "请用简短直接的语言(2-3句话)表达你的观点。" ), "qa": ( "你是质量保证工程师,注重细节和风险控制。" "请用简短谨慎的语言(2-3句话)表达你的观点。" ), "reviewer": ( "你是代码审查专家,善于发现问题和提出改进。" "请用简短有见地的语言(2-3句话)表达你的观点。" ), } def _get_role_prompt(role: str) -> str: """根据角色获取 system prompt,未匹配则使用通用提示""" return DINNER_ROLE_PROMPTS.get( role, f"你是团队中的{role}角色。请用简短的语言(2-3句话)表达你的观点。" ) class WorkflowOrchestrator: """ 工作流编排器 自动驱动工作流中的每个节点: - meeting 节点:逐个让 Agent 调用 LLM 发言,最后生成共识 - execution 节点:模拟执行并标记完成 """ def __init__(self): self._runs: Dict[str, OrchestrationRun] = {} self._running_task: Optional[asyncio.Task] = None def get_run(self, run_id: str) -> Optional[OrchestrationRun]: return self._runs.get(run_id) def list_runs(self) -> List[Dict]: return [r.to_dict() for r in self._runs.values()] async def start_workflow( self, workflow_path: str, agent_overrides: Dict[str, str] = None, ) -> OrchestrationRun: """ 启动一个工作流的自动编排 参数: workflow_path: YAML 文件名(如 dinner-decision.yaml) agent_overrides: 可选的 agent_id → model 覆盖映射 返回: OrchestrationRun 对象(后台异步执行) """ engine = get_workflow_engine() workflow = await engine.load_workflow(workflow_path) run = OrchestrationRun( run_id=f"run-{uuid.uuid4().hex[:8]}", workflow_id=workflow.workflow_id, workflow_name=workflow.name, status=OrchestratorStatus.RUNNING, started_at=time.time(), ) self._runs[run.run_id] = run # 在后台启动编排 self._running_task = asyncio.create_task( self._run_workflow(run, workflow_path, agent_overrides or {}) ) return run async def _run_workflow( self, run: OrchestrationRun, workflow_path: str, agent_overrides: Dict[str, str], ): """后台执行完整工作流""" engine = get_workflow_engine() try: while True: next_node = await engine.get_next_meeting(run.workflow_id) if next_node is None: break run.current_node = next_node.meeting_id logger.info(f"[{run.run_id}] 开始节点: {next_node.title} ({next_node.node_type})") if next_node.node_type == "meeting": result = await self._run_meeting_node( run, next_node, agent_overrides ) else: result = await self._run_execution_node( run, next_node, agent_overrides ) run.meeting_results.append(result) # 标记节点完成 await engine.complete_meeting(run.workflow_id, next_node.meeting_id) logger.info(f"[{run.run_id}] 节点完成: {next_node.title}") run.status = OrchestratorStatus.COMPLETED run.finished_at = time.time() run.current_node = "" logger.info(f"[{run.run_id}] 工作流完成,耗时 {run.finished_at - run.started_at:.1f}s") except Exception as e: run.status = OrchestratorStatus.FAILED run.error = str(e) run.finished_at = time.time() logger.error(f"[{run.run_id}] 工作流失败: {e}", exc_info=True) async def _run_meeting_node( self, run: OrchestrationRun, node: WorkflowMeeting, agent_overrides: Dict[str, str], ) -> MeetingResult: """执行一个会议节点:各 Agent 依次发言 → 生成共识""" registry = get_agent_registry() recorder = get_meeting_recorder() heartbeat = get_heartbeat_service() result = MeetingResult( meeting_id=node.meeting_id, title=node.title, node_type="meeting", started_at=time.time(), status="in_progress", ) # 创建会议记录 steps = ["提议", "讨论", "共识"] await recorder.create_meeting( meeting_id=node.meeting_id, title=node.title, attendees=node.attendees, steps=steps, ) await recorder.update_progress(node.meeting_id, "提议") # 收集之前节点的讨论上下文 previous_context = self._build_previous_context(run) # 逐个 Agent 发言 for agent_id in node.attendees: agent = await registry.get_agent(agent_id) if not agent: logger.warning(f"Agent {agent_id} 未注册,跳过") continue # 更新心跳 await heartbeat.update_heartbeat( agent_id, "working", f"参与会议: {node.title}", 50 ) # 用 LLM 生成发言 model = agent_overrides.get(agent_id, agent.model) turn = await self._generate_agent_turn( agent, model, node.title, previous_context, result.turns ) result.turns.append(turn) # 写入会议记录 await recorder.add_discussion( meeting_id=node.meeting_id, agent_id=agent.agent_id, agent_name=agent.name, content=turn.content, step="讨论", ) # 恢复心跳 await heartbeat.update_heartbeat(agent_id, "idle", "", 100) # 生成共识 await recorder.update_progress(node.meeting_id, "共识") consensus = await self._generate_consensus(node, result.turns) result.consensus = consensus # 完成会议 await recorder.end_meeting(node.meeting_id, consensus=consensus) result.status = "completed" result.finished_at = time.time() return result async def _run_execution_node( self, run: OrchestrationRun, node: WorkflowMeeting, agent_overrides: Dict[str, str], ) -> MeetingResult: """执行一个 execution 节点:模拟任务执行""" registry = get_agent_registry() heartbeat = get_heartbeat_service() engine = get_workflow_engine() result = MeetingResult( meeting_id=node.meeting_id, title=node.title, node_type="execution", started_at=time.time(), status="in_progress", ) # 获取上一个会议的共识作为执行指令 last_consensus = "" for mr in reversed(run.meeting_results): if mr.consensus: last_consensus = mr.consensus break for agent_id in node.attendees: agent = await registry.get_agent(agent_id) if not agent: continue await heartbeat.update_heartbeat( agent_id, "working", f"执行: {node.title}", 30 ) model = agent_overrides.get(agent_id, agent.model) turn = await self._generate_execution_turn( agent, model, node.title, last_consensus ) result.turns.append(turn) # 向工作流引擎签到 await engine.join_execution_node( run.workflow_id, node.meeting_id, agent_id ) await heartbeat.update_heartbeat(agent_id, "idle", "", 100) result.status = "completed" result.finished_at = time.time() return result async def _generate_agent_turn( self, agent: AgentInfo, model: str, meeting_title: str, previous_context: str, existing_turns: List[AgentTurn], ) -> AgentTurn: """ 调用 LLM 为一个 Agent 生成会议发言 若 LLM 不可用则使用内置 mock """ role_prompt = _get_role_prompt(agent.role) # 构建其他 Agent 已发言的内容 others_said = "" for t in existing_turns: content = t.content[:150] if len(t.content) > 150 else t.content others_said += f" {t.agent_name}: {content}\n" # 直接把所有信息揉进一段连贯的指令中 prompt = ( f"场景:团队正在讨论「{meeting_title}」。\n" f"你的角色:{agent.name}。{role_prompt}\n" ) if previous_context: prompt += f"\n上一轮讨论的结论:{previous_context}\n" if others_said: prompt += f"\n已有发言:\n{others_said}\n" prompt += ( f"\n请以{agent.name}的身份,直接给出2-3句具体建议。" f"不要自我介绍,不要提问,不要使用工具,直接说你的观点。" ) messages = [ LLMMessage(role="user", content=prompt), ] start = time.time() content, tokens, is_mock = await self._call_llm(model, messages) latency = time.time() - start return AgentTurn( agent_id=agent.agent_id, agent_name=agent.name, role=agent.role, model=model, content=content, timestamp=time.time(), latency=round(latency, 2), tokens_used=tokens, is_mock=is_mock, ) async def _generate_execution_turn( self, agent: AgentInfo, model: str, task_title: str, consensus: str, ) -> AgentTurn: """为执行节点生成 Agent 的执行结果""" prompt = ( f"你是{agent.name}。团队讨论后做出了以下决定:\n{consensus}\n\n" f"现在需要你执行「{task_title}」这个任务。" f"请用2-3句话直接汇报你的执行结果和下一步安排。" ) messages = [ LLMMessage(role="user", content=prompt), ] start = time.time() content, tokens, is_mock = await self._call_llm(model, messages) latency = time.time() - start return AgentTurn( agent_id=agent.agent_id, agent_name=agent.name, role=agent.role, model=model, content=content, timestamp=time.time(), latency=round(latency, 2), tokens_used=tokens, is_mock=is_mock, ) async def _generate_consensus( self, node: WorkflowMeeting, turns: List[AgentTurn], ) -> str: """基于所有发言生成会议共识,使用 kimi CLI 效果最佳""" discussion_summary = "" for t in turns: # 截取每人发言前 200 字,避免 prompt 过长 content = t.content[:200] if len(t.content) > 200 else t.content discussion_summary += f" {t.agent_name}: {content}\n" prompt = ( f"请总结以下关于「{node.title}」的讨论,用3-5句话给出共识。\n\n" f"讨论记录:\n{discussion_summary}\n" f"要求:直接输出总结,包含最终决定和行动方案。不要提问。" ) messages = [ LLMMessage(role="user", content=prompt), ] # 优先用 kimi CLI 做总结(它最擅长按指令行事) content, _, _ = await self._call_llm("kimi", messages) return content async def _call_llm( self, model: str, messages: List[LLMMessage] ) -> tuple: """ 调用 AI 生成内容,优先级:CLI → LLM API → 报错 返回: (content, tokens_used, is_mock) """ # 分离 system prompt 和 user prompt system_prompt = "" user_prompt = "" for m in messages: if m.role == "system": system_prompt += m.content + "\n" else: user_prompt += m.content + "\n" user_prompt = user_prompt.strip() # 1. 优先尝试 CLI cli_name = resolve_cli(model) if model else None if cli_name: logger.info(f"使用 CLI [{cli_name}] (model={model})") result = await invoke_cli( cli_name, user_prompt, timeout=120, system_prompt=system_prompt.strip(), ) if result.success: return result.content, 0, False else: logger.warning(f"CLI [{cli_name}] 失败: {result.error},尝试其他方式") # 2. 如果 model 未指定 CLI 或 CLI 失败,尝试任意可用 CLI available = detect_available_clis() if available: fallback_cli = list(available.keys())[0] logger.info(f"使用 fallback CLI [{fallback_cli}]") result = await invoke_cli( fallback_cli, user_prompt, timeout=120, system_prompt=system_prompt.strip(), ) if result.success: return result.content, 0, False else: logger.warning(f"Fallback CLI [{fallback_cli}] 也失败: {result.error}") # 3. 尝试 LLM API try: llm = get_llm_service() providers = llm.get_available_providers() real_providers = [p for p in providers if p != "ollama"] if real_providers: response = await llm.route_task( task=messages[-1].content, messages=messages, preferred_model=model if model else None, ) return response.content, response.tokens_used, False except Exception as e: logger.warning(f"LLM API 也不可用: {e}") raise RuntimeError( f"无可用的 AI 提供商。CLI={list(available.keys()) if available else '无'}," f"LLM API Key 未配置。请安装 CLI 工具或配置 API Key。" ) def _build_previous_context(self, run: OrchestrationRun) -> str: """从已完成的会议中构建上下文""" parts = [] for mr in run.meeting_results: if mr.consensus: parts.append(f"[{mr.title}] 共识: {mr.consensus}") return "\n".join(parts) # 单例 _orchestrator: Optional[WorkflowOrchestrator] = None def get_workflow_orchestrator() -> WorkflowOrchestrator: """获取编排器单例""" global _orchestrator if _orchestrator is None: _orchestrator = WorkflowOrchestrator() return _orchestrator