""" 工作流管理 API 路由 """ from fastapi import APIRouter, HTTPException, UploadFile, File from pydantic import BaseModel from typing import List, Optional, Dict, Any from pathlib import Path from app.services.workflow_engine import get_workflow_engine router = APIRouter() # ========== 请求/响应模型 ========== class MeetingNode(BaseModel): """工作流节点""" meeting_id: str title: str node_type: str = "meeting" attendees: List[str] depends_on: List[str] = [] completed: bool = False on_failure: Optional[str] = None progress: Optional[str] = None class WorkflowDetail(BaseModel): """工作流详情""" workflow_id: str name: str description: str status: str progress: str current_node: Optional[str] = None meetings: List[MeetingNode] class WorkflowSummary(BaseModel): """工作流摘要""" workflow_id: str name: str status: str progress: str class JoinExecutionRequest(BaseModel): """加入执行节点请求""" agent_id: str class JumpRequest(BaseModel): """跳转请求""" target_meeting_id: str # ========== API 端点 ========== @router.get("/files") async def list_workflow_files(): """获取工作流文件列表""" engine = get_workflow_engine() workflow_dir = Path(engine._storage.base_path) / engine.WORKFLOWS_DIR if not workflow_dir.exists(): return {"files": []} yaml_files = list(workflow_dir.glob("*.yaml")) + list(workflow_dir.glob("*.yml")) files = [] for f in yaml_files: stat = f.stat() files.append({ "name": f.name, "path": f"workflow/{f.name}", "size": stat.st_size, "modified": stat.st_mtime }) return {"files": files} @router.post("/upload") async def upload_workflow(file: UploadFile = File(...)): """上传工作流 YAML 文件""" if not file.filename or not file.filename.endswith(('.yaml', '.yml')): raise HTTPException(status_code=400, detail="仅支持 .yaml 或 .yml 文件") engine = get_workflow_engine() workflow_dir = Path(engine._storage.base_path) / engine.WORKFLOWS_DIR workflow_dir.mkdir(parents=True, exist_ok=True) dest = workflow_dir / file.filename content = await file.read() dest.write_bytes(content) return { "success": True, "message": f"已上传 {file.filename}", "path": f"workflow/{file.filename}", "size": len(content) } @router.get("/list") async def list_workflows(): """获取已加载的工作流列表""" engine = get_workflow_engine() workflows = await engine.list_workflows() return {"workflows": workflows} @router.post("/start/{workflow_path:path}") async def start_workflow(workflow_path: str): """ 启动工作流 加载 YAML 工作流文件并准备执行 """ engine = get_workflow_engine() try: workflow = await engine.load_workflow(workflow_path) detail = await engine.get_workflow_detail(workflow.workflow_id) return detail except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) @router.get("/{workflow_id}") async def get_workflow(workflow_id: str): """获取工作流详情""" engine = get_workflow_engine() detail = await engine.get_workflow_detail(workflow_id) if not detail: raise HTTPException(status_code=404, detail="Workflow not found") return detail @router.get("/{workflow_id}/status") async def get_workflow_status(workflow_id: str): """获取工作流状态""" engine = get_workflow_engine() status = await engine.get_workflow_status(workflow_id) if not status: raise HTTPException(status_code=404, detail="Workflow not found") return status @router.get("/{workflow_id}/next") async def get_next_node(workflow_id: str): """获取下一个待执行节点""" engine = get_workflow_engine() meeting = await engine.get_next_meeting(workflow_id) if not meeting: return {"meeting": None, "message": "Workflow completed"} return { "meeting": { "meeting_id": meeting.meeting_id, "title": meeting.title, "node_type": meeting.node_type, "attendees": meeting.attendees, "depends_on": meeting.depends_on } } @router.post("/{workflow_id}/complete/{meeting_id}") async def complete_node(workflow_id: str, meeting_id: str): """标记节点完成""" engine = get_workflow_engine() success = await engine.complete_meeting(workflow_id, meeting_id) if not success: raise HTTPException(status_code=404, detail="Workflow or meeting not found") return {"success": True, "message": "Node completed"} @router.post("/{workflow_id}/join/{meeting_id}") async def join_execution_node(workflow_id: str, meeting_id: str, request: JoinExecutionRequest): """ Agent 加入执行节点 标记 Agent 已完成执行,当所有 Agent 都完成时返回 ready """ engine = get_workflow_engine() result = await engine.join_execution_node(workflow_id, meeting_id, request.agent_id) if result.get("status") == "error": raise HTTPException(status_code=400, detail=result.get("message")) return result @router.get("/{workflow_id}/execution/{meeting_id}") async def get_execution_node_status(workflow_id: str, meeting_id: str): """获取执行节点状态""" engine = get_workflow_engine() status = await engine.get_execution_status(workflow_id, meeting_id) if not status: raise HTTPException(status_code=404, detail="Execution node not found") return status @router.post("/{workflow_id}/jump") async def jump_to_node(workflow_id: str, request: JumpRequest): """ 强制跳转到指定节点 重置目标节点及所有后续节点的完成状态 """ engine = get_workflow_engine() success = await engine.jump_to_node(workflow_id, request.target_meeting_id) if not success: raise HTTPException(status_code=404, detail="Target node not found") return { "success": True, "message": f"Jumped to {request.target_meeting_id}", "detail": await engine.get_workflow_detail(workflow_id) } @router.post("/{workflow_id}/fail/{meeting_id}") async def handle_node_failure(workflow_id: str, meeting_id: str): """ 处理节点失败 根据 on_failure 配置跳转到指定节点 """ engine = get_workflow_engine() target = await engine.handle_failure(workflow_id, meeting_id) if target: return { "success": True, "message": f"Jumped to {target} due to failure", "target": target, "detail": await engine.get_workflow_detail(workflow_id) } return { "success": True, "message": "No failure handler configured" }