Files

241 lines
6.9 KiB
Python
Raw Permalink Normal View History

"""
工作流管理 API 路由
"""
from fastapi import APIRouter, HTTPException, UploadFile, File
from pydantic import BaseModel
from typing import List, Optional, Dict, Any
from pathlib import Path
from app.services.workflow_engine import get_workflow_engine
router = APIRouter()
# ========== 请求/响应模型 ==========
class MeetingNode(BaseModel):
"""工作流节点"""
meeting_id: str
title: str
node_type: str = "meeting"
attendees: List[str]
depends_on: List[str] = []
completed: bool = False
on_failure: Optional[str] = None
progress: Optional[str] = None
class WorkflowDetail(BaseModel):
"""工作流详情"""
workflow_id: str
name: str
description: str
status: str
progress: str
current_node: Optional[str] = None
meetings: List[MeetingNode]
class WorkflowSummary(BaseModel):
"""工作流摘要"""
workflow_id: str
name: str
status: str
progress: str
class JoinExecutionRequest(BaseModel):
"""加入执行节点请求"""
agent_id: str
class JumpRequest(BaseModel):
"""跳转请求"""
target_meeting_id: str
# ========== API 端点 ==========
@router.get("/files")
async def list_workflow_files():
"""获取工作流文件列表"""
engine = get_workflow_engine()
workflow_dir = Path(engine._storage.base_path) / engine.WORKFLOWS_DIR
if not workflow_dir.exists():
return {"files": []}
yaml_files = list(workflow_dir.glob("*.yaml")) + list(workflow_dir.glob("*.yml"))
files = []
for f in yaml_files:
stat = f.stat()
files.append({
"name": f.name,
"path": f"workflow/{f.name}",
"size": stat.st_size,
"modified": stat.st_mtime
})
return {"files": files}
@router.post("/upload")
async def upload_workflow(file: UploadFile = File(...)):
"""上传工作流 YAML 文件"""
if not file.filename or not file.filename.endswith(('.yaml', '.yml')):
raise HTTPException(status_code=400, detail="仅支持 .yaml 或 .yml 文件")
engine = get_workflow_engine()
workflow_dir = Path(engine._storage.base_path) / engine.WORKFLOWS_DIR
workflow_dir.mkdir(parents=True, exist_ok=True)
dest = workflow_dir / file.filename
content = await file.read()
dest.write_bytes(content)
return {
"success": True,
"message": f"已上传 {file.filename}",
"path": f"workflow/{file.filename}",
"size": len(content)
}
@router.get("/list")
async def list_workflows():
"""获取已加载的工作流列表"""
engine = get_workflow_engine()
workflows = await engine.list_workflows()
return {"workflows": workflows}
@router.post("/start/{workflow_path:path}")
async def start_workflow(workflow_path: str):
"""
启动工作流
加载 YAML 工作流文件并准备执行
"""
engine = get_workflow_engine()
try:
workflow = await engine.load_workflow(workflow_path)
detail = await engine.get_workflow_detail(workflow.workflow_id)
return detail
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@router.get("/{workflow_id}")
async def get_workflow(workflow_id: str):
"""获取工作流详情"""
engine = get_workflow_engine()
detail = await engine.get_workflow_detail(workflow_id)
if not detail:
raise HTTPException(status_code=404, detail="Workflow not found")
return detail
@router.get("/{workflow_id}/status")
async def get_workflow_status(workflow_id: str):
"""获取工作流状态"""
engine = get_workflow_engine()
status = await engine.get_workflow_status(workflow_id)
if not status:
raise HTTPException(status_code=404, detail="Workflow not found")
return status
@router.get("/{workflow_id}/next")
async def get_next_node(workflow_id: str):
"""获取下一个待执行节点"""
engine = get_workflow_engine()
meeting = await engine.get_next_meeting(workflow_id)
if not meeting:
return {"meeting": None, "message": "Workflow completed"}
return {
"meeting": {
"meeting_id": meeting.meeting_id,
"title": meeting.title,
"node_type": meeting.node_type,
"attendees": meeting.attendees,
"depends_on": meeting.depends_on
}
}
@router.post("/{workflow_id}/complete/{meeting_id}")
async def complete_node(workflow_id: str, meeting_id: str):
"""标记节点完成"""
engine = get_workflow_engine()
success = await engine.complete_meeting(workflow_id, meeting_id)
if not success:
raise HTTPException(status_code=404, detail="Workflow or meeting not found")
return {"success": True, "message": "Node completed"}
@router.post("/{workflow_id}/join/{meeting_id}")
async def join_execution_node(workflow_id: str, meeting_id: str, request: JoinExecutionRequest):
"""
Agent 加入执行节点
标记 Agent 已完成执行当所有 Agent 都完成时返回 ready
"""
engine = get_workflow_engine()
result = await engine.join_execution_node(workflow_id, meeting_id, request.agent_id)
if result.get("status") == "error":
raise HTTPException(status_code=400, detail=result.get("message"))
return result
@router.get("/{workflow_id}/execution/{meeting_id}")
async def get_execution_node_status(workflow_id: str, meeting_id: str):
"""获取执行节点状态"""
engine = get_workflow_engine()
status = await engine.get_execution_status(workflow_id, meeting_id)
if not status:
raise HTTPException(status_code=404, detail="Execution node not found")
return status
@router.post("/{workflow_id}/jump")
async def jump_to_node(workflow_id: str, request: JumpRequest):
"""
强制跳转到指定节点
重置目标节点及所有后续节点的完成状态
"""
engine = get_workflow_engine()
success = await engine.jump_to_node(workflow_id, request.target_meeting_id)
if not success:
raise HTTPException(status_code=404, detail="Target node not found")
return {
"success": True,
"message": f"Jumped to {request.target_meeting_id}",
"detail": await engine.get_workflow_detail(workflow_id)
}
@router.post("/{workflow_id}/fail/{meeting_id}")
async def handle_node_failure(workflow_id: str, meeting_id: str):
"""
处理节点失败
根据 on_failure 配置跳转到指定节点
"""
engine = get_workflow_engine()
target = await engine.handle_failure(workflow_id, meeting_id)
if target:
return {
"success": True,
"message": f"Jumped to {target} due to failure",
"target": target,
"detail": await engine.get_workflow_detail(workflow_id)
}
return {
"success": True,
"message": "No failure handler configured"
}