Files
multiAgentTry/backend/cli.py

1116 lines
42 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Swarm Command Center - CLI 入口
提供命令行接口用于测试和管理后端服务
"""
import asyncio
import json
import typer
from rich.console import Console
from rich.panel import Panel
from rich.syntax import Syntax
from dataclasses import asdict
import sys
import os
# 添加父目录到 Python 路径,以便导入 app 模块
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from app.services.storage import get_storage
from app.services.file_lock import get_file_lock_service
from app.services.heartbeat import get_heartbeat_service
from app.services.agent_registry import get_agent_registry
from app.services.meeting_scheduler import get_meeting_scheduler
from app.services.meeting_recorder import get_meeting_recorder
from app.services.resource_manager import get_resource_manager
from app.services.workflow_engine import get_workflow_engine
from app.services.role_allocator import get_role_allocator
from app.services.human_input import get_human_input_service
# Create the root CLI application and the shared Rich console used for all output.
app = typer.Typer(name="swarm", help="Swarm Command Center - 多智能体协作系统 CLI")
console = Console()
# Create one sub-command group per service and mount each on the root app.
# The `name=` passed to add_typer is the word typed on the command line.
storage_app = typer.Typer(help="存储服务操作")
app.add_typer(storage_app, name="storage")
lock_app = typer.Typer(help="文件锁操作")
app.add_typer(lock_app, name="lock")
heartbeat_app = typer.Typer(help="心跳服务操作")
app.add_typer(heartbeat_app, name="heartbeat")
agent_app = typer.Typer(help="Agent 注册管理")
app.add_typer(agent_app, name="agent")
meeting_app = typer.Typer(help="会议调度管理")
app.add_typer(meeting_app, name="meeting")
workflow_app = typer.Typer(help="工作流管理")
app.add_typer(workflow_app, name="workflow")
role_app = typer.Typer(help="角色分配管理")
app.add_typer(role_app, name="role")
human_app = typer.Typer(help="人类输入管理")
app.add_typer(human_app, name="human")
# Single commands (such as `execute`) are registered directly on `app`, not as groups.
# `hello`: print a greeting panel; a quick way to confirm the CLI is wired up.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@app.command()
def hello(name: str = typer.Argument("Swarm", help="问候的名字")):
    """简单的问候命令 - 用于验证 CLI 是否正常工作"""
    greeting = f"[bold cyan]Hello {name}![/bold cyan]"
    banner = Panel(
        greeting,
        title="[bold green]系统提示[/bold green]",
        border_style="cyan",
    )
    console.print(banner)
# `version`: print the product name, version string and tagline in a panel.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@app.command()
def version():
    """显示版本信息"""
    body_lines = [
        "[bold cyan]Swarm Command Center[/bold cyan]",
        "[dim]Version: 0.1.0-alpha[/dim]",
        "[dim]多智能体协作系统协调层[/dim]",
    ]
    banner = Panel(
        "\n".join(body_lines),
        title="[bold green]版本信息[/bold green]",
        border_style="cyan",
    )
    console.print(banner)
# ============ Storage 子命令 ============
# `storage write`: parse CONTENT as JSON and hand it to the storage service at PATH.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@storage_app.command("write")
def storage_write(
    path: str = typer.Argument(..., help="文件路径,如 .doc/test.json"),
    content: str = typer.Argument(..., help="JSON 内容,如 '{\"foo\":\"bar\"}'")
):
    """写入 JSON 文件"""

    async def _run():
        backend = get_storage()
        try:
            payload = json.loads(content)
            await backend.write_json(path, payload)
            done = Panel(f"[green]✓[/green] 已写入: {path}", border_style="green")
            console.print(done)
        except json.JSONDecodeError as exc:
            # Invalid JSON is reported to the user instead of raising.
            failed = Panel(f"[red]✗[/red] JSON 格式错误: {exc}", border_style="red")
            console.print(failed)

    asyncio.run(_run())
# `storage read`: load JSON from PATH via the storage service and pretty-print it.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@storage_app.command("read")
def storage_read(
    path: str = typer.Argument(..., help="文件路径,如 .doc/test.json")
):
    """读取 JSON 文件"""

    async def _run():
        backend = get_storage()
        loaded = await backend.read_json(path)
        if not loaded:
            # Any falsy result is shown as "empty or missing".
            console.print(Panel("[yellow]文件为空或不存在[/yellow]", border_style="yellow"))
            return
        rendered = json.dumps(loaded, ensure_ascii=False, indent=2)
        highlighted = Syntax(rendered, "json", theme="monokai")
        console.print(Panel(highlighted, title=f"[cyan]{path}[/cyan]", border_style="cyan"))

    asyncio.run(_run())
# `storage delete`: ask the storage service to delete PATH and report the outcome.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@storage_app.command("delete")
def storage_delete(
    path: str = typer.Argument(..., help="文件路径")
):
    """删除文件"""

    async def _run():
        backend = get_storage()
        removed = await backend.delete(path)
        if not removed:
            console.print(Panel("[yellow]文件不存在[/yellow]", border_style="yellow"))
            return
        console.print(Panel(f"[green]✓[/green] 已删除: {path}", border_style="green"))

    asyncio.run(_run())
# ============ Lock 子命令 ============
# `lock acquire`: try to take the lock on FILE_PATH for AGENT_ID.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@lock_app.command("acquire")
def lock_acquire(
    file_path: str = typer.Argument(..., help="文件路径,如 src/auth/login.py"),
    agent_id: str = typer.Argument(..., help="Agent ID如 claude-001")
):
    """获取文件锁"""

    async def _run():
        lock_service = get_file_lock_service()
        # Third argument is the upper-cased first three characters of the agent ID.
        short_tag = agent_id[:3].upper()
        granted = await lock_service.acquire_lock(file_path, agent_id, short_tag)
        if granted:
            outcome = Panel(f"[green]OK[/green] Lock acquired: {file_path} -> {agent_id}", border_style="green")
        else:
            outcome = Panel(f"[red]FAIL[/red] Lock denied: {file_path} is locked", border_style="red")
        console.print(outcome)

    asyncio.run(_run())
# `lock release`: release the lock on FILE_PATH held by AGENT_ID.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@lock_app.command("release")
def lock_release(
    file_path: str = typer.Argument(..., help="文件路径"),
    agent_id: str = typer.Argument(..., help="Agent ID")
):
    """释放文件锁"""

    async def _run():
        lock_service = get_file_lock_service()
        released = await lock_service.release_lock(file_path, agent_id)
        if not released:
            console.print(Panel(f"[yellow]FAIL[/yellow] Lock not found or not owned by {agent_id}", border_style="yellow"))
            return
        console.print(Panel(f"[green]OK[/green] Lock released: {file_path}", border_style="green"))

    asyncio.run(_run())
# `lock status`: list every lock the file-lock service currently reports.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@lock_app.command("status")
def lock_status():
    """显示所有文件锁状态"""

    async def _run():
        lock_service = get_file_lock_service()
        active = await lock_service.get_locks()
        if not active:
            console.print(Panel("[dim]No active locks[/dim]", border_style="dim"))
            return
        console.print(Panel("[cyan]Active Locks:[/cyan]", border_style="cyan"))
        for entry in active:
            console.print(f" {entry.file_path} -> [yellow]{entry.agent_id}[/yellow] ({entry.elapsed_display})")

    asyncio.run(_run())
# `lock check`: show which agent (if any) holds the lock on FILE_PATH.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@lock_app.command("check")
def lock_check(
    file_path: str = typer.Argument(..., help="文件路径")
):
    """检查文件是否被锁定"""

    async def _run():
        lock_service = get_file_lock_service()
        holder = await lock_service.check_locked(file_path)
        if not holder:
            console.print(Panel(f"[green]Free[/green] {file_path}", border_style="green"))
            return
        console.print(Panel(f"[yellow]Locked by[/yellow] {holder}", border_style="yellow"))

    asyncio.run(_run())
# ============ Heartbeat 子命令 ============
# `heartbeat ping`: push one heartbeat (status, task, progress) for AGENT_ID.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@heartbeat_app.command("ping")
def heartbeat_ping(
    agent_id: str = typer.Argument(..., help="Agent ID"),
    status: str = typer.Option("working", help="状态: working, waiting, idle, error"),
    task: str = typer.Option("", help="当前任务描述"),
    progress: int = typer.Option(0, help="进度 0-100")
):
    """发送心跳"""

    async def _run():
        hb_service = get_heartbeat_service()
        await hb_service.update_heartbeat(agent_id, status, task, progress)
        confirmation = Panel(f"[green]OK[/green] Heartbeat updated: {agent_id}", border_style="green")
        console.print(confirmation)

    asyncio.run(_run())
# `heartbeat list`: print every recorded heartbeat, plus the task line when one is set.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@heartbeat_app.command("list")
def heartbeat_list():
    """列出所有 Agent 心跳"""

    async def _run():
        hb_service = get_heartbeat_service()
        records = await hb_service.get_all_heartbeats()
        if not records:
            console.print(Panel("[dim]No heartbeats recorded[/dim]", border_style="dim"))
            return
        console.print(Panel("[cyan]Active Agents:[/cyan]", border_style="cyan"))
        for agent_id, beat in records.items():
            console.print(f" {agent_id}: [yellow]{beat.status}[/yellow] ({beat.elapsed_display})")
            if not beat.current_task:
                continue
            console.print(f" Task: {beat.current_task} [{beat.progress}%]")

    asyncio.run(_run())
# `heartbeat check-timeout`: list agents the heartbeat service reports as timed out.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@heartbeat_app.command("check-timeout")
def heartbeat_check_timeout(
    timeout: int = typer.Argument(30, help="超时秒数")
):
    """检查超时的 Agent"""

    async def _run():
        hb_service = get_heartbeat_service()
        stale = await hb_service.check_timeout(timeout)
        if not stale:
            console.print(Panel(f"[green]OK[/green] No timed out agents (within {timeout}s)", border_style="green"))
            return
        joined = ', '.join(stale)
        console.print(Panel(f"[red]Timed out agents:[/red] {joined}", border_style="red"))

    asyncio.run(_run())
# ============ Agent 子命令 ============
# `agent register`: register an agent and echo the name/role/model the registry returns.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@agent_app.command("register")
def agent_register(
    agent_id: str = typer.Argument(..., help="Agent ID如 claude-001"),
    name: str = typer.Option(..., help="显示名称,如 'Claude Code'"),
    role: str = typer.Option(..., help="角色architect, pm, developer, qa, reviewer"),
    model: str = typer.Option(..., help="模型,如 claude-opus-4.6")
):
    """注册新 Agent"""

    async def _run():
        registry = get_agent_registry()
        created = await registry.register_agent(agent_id, name, role, model)
        summary = "\n".join([
            f"[green]OK[/green] Agent registered: {agent_id}",
            f" Name: {created.name}",
            f" Role: {created.role}",
            f" Model: {created.model}",
        ])
        console.print(Panel(summary, border_style="green"))

    asyncio.run(_run())
# `agent list`: print one line per registered agent (id | name | role | model).
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@agent_app.command("list")
def agent_list():
    """列出所有 Agent"""

    async def _run():
        registry = get_agent_registry()
        registered = await registry.list_agents()
        if not registered:
            console.print(Panel("[dim]No agents registered[/dim]", border_style="dim"))
            return
        console.print(Panel("[cyan]Registered Agents:[/cyan]", border_style="cyan"))
        for entry in registered:
            console.print(f" [yellow]{entry.agent_id}[/yellow] | {entry.name} | {entry.role} | {entry.model}")

    asyncio.run(_run())
# `agent info`: show an agent's registry record together with its current state.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@agent_app.command("info")
def agent_info(
    agent_id: str = typer.Argument(..., help="Agent ID")
):
    """获取 Agent 详情"""

    async def _run():
        registry = get_agent_registry()
        record = await registry.get_agent(agent_id)
        if not record:
            console.print(Panel(f"[red]Agent not found:[/red] {agent_id}", border_style="red"))
            return
        # State is only looked up once the agent is known to exist.
        state = await registry.get_state(agent_id)
        details = [
            "[cyan]Agent Info:[/cyan]",
            f" ID: {record.agent_id}",
            f" Name: {record.name}",
            f" Role: {record.role}",
            f" Model: {record.model}",
            f" Status: {record.status}",
            f" Created: {record.created_at}",
            "\n[cyan]Current State:[/cyan]",
            # Fall back to placeholders when no state has been recorded.
            f" Task: {state.current_task if state else 'N/A'}",
            f" Progress: {state.progress if state else 0}%",
        ]
        console.print(Panel("\n".join(details), border_style="cyan"))

    asyncio.run(_run())
# Nested sub-command group for agent state, mounted under the `agent` group as `state`.
state_app = typer.Typer(help="Agent 状态操作")
agent_app.add_typer(state_app, name="state")
# `agent state set`: store the current task and progress for AGENT_ID in the registry.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@state_app.command("set")
def agent_state_set(
    agent_id: str = typer.Argument(..., help="Agent ID"),
    task: str = typer.Option("", help="当前任务"),
    progress: int = typer.Option(0, help="进度 0-100")
):
    """设置 Agent 状态"""

    async def _run():
        registry = get_agent_registry()
        await registry.update_state(agent_id, task, progress)
        confirmation = Panel(f"[green]OK[/green] State updated: {agent_id}", border_style="green")
        console.print(confirmation)

    asyncio.run(_run())
# `agent state get`: dump the stored state of AGENT_ID as highlighted JSON.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@state_app.command("get")
def agent_state_get(
    agent_id: str = typer.Argument(..., help="Agent ID")
):
    """获取 Agent 状态"""

    async def _run():
        registry = get_agent_registry()
        current = await registry.get_state(agent_id)
        if not current:
            console.print(Panel("[yellow]No state found[/yellow]", border_style="yellow"))
            return
        # asdict() is applied to the state object before JSON serialisation.
        rendered = json.dumps(asdict(current), ensure_ascii=False, indent=2)
        highlighted = Syntax(rendered, "json", theme="monokai")
        console.print(Panel(highlighted, title=f"[cyan]{agent_id} State[/cyan]", border_style="cyan"))

    asyncio.run(_run())
# ============ Meeting 子命令 ============
# `meeting create`: create a meeting queue for the given comma-separated attendees.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@meeting_app.command("create")
def meeting_create(
    meeting_id: str = typer.Argument(..., help="会议 ID"),
    title: str = typer.Option(..., help="会议标题"),
    attendees: str = typer.Option(..., help="参会者列表,逗号分隔,如 claude-001,kimi-002")
):
    """创建会议"""
    # Drop blank entries so "a,,b" or a trailing comma cannot register an
    # attendee whose ID is the empty string.
    attendee_list = [
        name for name in (part.strip() for part in attendees.split(",")) if name
    ]
    if not attendee_list:
        console.print(Panel("[red]Error:[/red] No attendees given", border_style="red"))
        raise typer.Exit(code=1)

    async def _create():
        scheduler = get_meeting_scheduler()
        queue = await scheduler.create_meeting(meeting_id, title, attendee_list)
        console.print(Panel(
            f"[green]OK[/green] Meeting created: {meeting_id}\n"
            f" Title: {queue.title}\n"
            f" Expected: {', '.join(queue.expected_attendees)}\n"
            f" Min required: {queue.min_required}",
            border_style="green"
        ))

    asyncio.run(_create())
# `meeting wait`: await the scheduler's wait_for_meeting() and report its result string.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@meeting_app.command("wait")
def meeting_wait(
    meeting_id: str = typer.Argument(..., help="会议 ID"),
    agent_id: str = typer.Option(..., help="Agent ID")
):
    """等待会议开始(阻塞)"""

    async def _run():
        scheduler = get_meeting_scheduler()
        console.print(f"[dim]Waiting for meeting: {meeting_id}...[/dim]")
        outcome = await scheduler.wait_for_meeting(agent_id, meeting_id)
        # Anything other than "started"/"timeout" is reported as a generic error.
        if outcome == "started":
            text, colour = f"[green]Meeting started![/green] {meeting_id}", "green"
        elif outcome == "timeout":
            text, colour = "[red]Timeout waiting for meeting[/red]", "red"
        else:
            text, colour = "[yellow]Error waiting for meeting[/yellow]", "yellow"
        console.print(Panel(text, border_style=colour))

    asyncio.run(_run())
# `meeting queue`: show status, progress and arrived/missing attendees for a meeting.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@meeting_app.command("queue")
def meeting_queue(
    meeting_id: str = typer.Argument(..., help="会议 ID")
):
    """查看会议队列状态"""

    async def _run():
        scheduler = get_meeting_scheduler()
        entry = await scheduler.get_queue(meeting_id)
        if not entry:
            console.print(Panel(f"[yellow]Meeting not found:[/yellow] {meeting_id}", border_style="yellow"))
            return
        absent = entry.missing_attendees
        report = [
            f"[cyan]Meeting Queue:[/cyan] {meeting_id}",
            f" Title: {entry.title}",
            f" Status: [yellow]{entry.status}[/yellow]",
            f" Progress: {entry.progress}",
            # Empty lists are rendered as "(none)".
            f" Arrived: {', '.join(entry.arrived_attendees) or '(none)'}",
            f" Missing: {', '.join(absent) or '(none)'}",
            f" Expected: {', '.join(entry.expected_attendees)}",
        ]
        console.print(Panel("\n".join(report), border_style="cyan"))

    asyncio.run(_run())
# `meeting end`: ask the scheduler to end MEETING_ID and report the outcome.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@meeting_app.command("end")
def meeting_end(
    meeting_id: str = typer.Argument(..., help="会议 ID")
):
    """结束会议"""

    async def _run():
        scheduler = get_meeting_scheduler()
        ended = await scheduler.end_meeting(meeting_id)
        if ended:
            outcome = Panel(f"[green]OK[/green] Meeting ended: {meeting_id}", border_style="green")
        else:
            outcome = Panel(f"[yellow]Meeting not found:[/yellow] {meeting_id}", border_style="yellow")
        console.print(outcome)

    asyncio.run(_run())
# ============ Meeting 记录子命令 ============
# `meeting record-create`: create a meeting record with attendees and optional steps.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@meeting_app.command("record-create")
def meeting_record_create(
    meeting_id: str = typer.Argument(..., help="会议 ID"),
    title: str = typer.Option(..., help="会议标题"),
    attendees: str = typer.Option(..., help="参会者列表,逗号分隔"),
    steps: str = typer.Option("", help="会议步骤,逗号分隔")
):
    """创建会议记录"""
    # Drop blank entries so stray commas cannot create empty attendee IDs or
    # empty step names.
    attendee_list = [
        name for name in (part.strip() for part in attendees.split(",")) if name
    ]
    if not attendee_list:
        console.print(Panel("[red]Error:[/red] No attendees given", border_style="red"))
        raise typer.Exit(code=1)
    # As before, "no steps" is passed to the recorder as None; a steps value
    # made only of commas/whitespace now counts as "no steps" too.
    step_list = [
        step for step in (part.strip() for part in steps.split(",")) if step
    ] or None

    async def _create():
        recorder = get_meeting_recorder()
        meeting = await recorder.create_meeting(meeting_id, title, attendee_list, step_list)
        console.print(Panel(
            f"[green]OK[/green] Meeting record created: {meeting_id}\n"
            f" Title: {meeting.title}\n"
            f" Date: {meeting.date}\n"
            f" Attendees: {', '.join(meeting.attendees)}\n"
            f" Steps: {len(meeting.steps)}",
            border_style="green"
        ))

    asyncio.run(_create())
# `meeting discuss`: append one discussion entry by AGENT to the meeting record.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@meeting_app.command("discuss")
def meeting_discuss(
    meeting_id: str = typer.Argument(..., help="会议 ID"),
    agent: str = typer.Option(..., help="Agent ID"),
    content: str = typer.Option(..., help="讨论内容"),
    step: str = typer.Option("", help="当前步骤")
):
    """添加讨论记录"""

    async def _run():
        recorder = get_meeting_recorder()
        # Third argument is the upper-cased first three characters of the agent ID.
        short_tag = agent[:3].upper()
        await recorder.add_discussion(meeting_id, agent, short_tag, content, step)
        console.print(Panel("[green]OK[/green] Discussion added", border_style="green"))

    asyncio.run(_run())
# `meeting progress`: tell the recorder which STEP the meeting is now at.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@meeting_app.command("progress")
def meeting_progress(
    meeting_id: str = typer.Argument(..., help="会议 ID"),
    step: str = typer.Argument(..., help="步骤名称")
):
    """更新会议进度"""

    async def _run():
        recorder = get_meeting_recorder()
        await recorder.update_progress(meeting_id, step)
        confirmation = Panel(f"[green]OK[/green] Progress updated: {step}", border_style="green")
        console.print(confirmation)

    asyncio.run(_run())
# `meeting show`: print the stored record of a meeting, optionally for a given date.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@meeting_app.command("show")
def meeting_show(
    meeting_id: str = typer.Argument(..., help="会议 ID"),
    date: str = typer.Option("", help="日期 YYYY-MM-DD默认今天")
):
    """显示会议详情"""

    async def _run():
        recorder = get_meeting_recorder()
        # An empty --date is passed to the recorder as None.
        record = await recorder.get_meeting(meeting_id, date or None)
        if not record:
            console.print(Panel("[yellow]Meeting not found[/yellow]", border_style="yellow"))
            return
        report = [
            f"[cyan]Meeting:[/cyan] {record.title}",
            f" ID: {record.meeting_id}",
            f" Date: {record.date}",
            f" Status: [yellow]{record.status}[/yellow]",
            f" Progress: {record.progress_summary}",
            f" Attendees: {', '.join(record.attendees)}",
            f" Discussions: {len(record.discussions)}",
        ]
        console.print(Panel("\n".join(report), border_style="cyan"))

    asyncio.run(_run())
# `meeting list`: list meeting records for a date (the recorder gets None when omitted).
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@meeting_app.command("list")
def meeting_list(
    date: str = typer.Option("", help="日期 YYYY-MM-DD默认今天")
):
    """列出指定日期的会议"""

    async def _run():
        recorder = get_meeting_recorder()
        found = await recorder.list_meetings(date or None)
        if not found:
            console.print(Panel("[dim]No meetings found[/dim]", border_style="dim"))
            return
        heading = f"[cyan]Meetings for {date or 'today'}:[/cyan]"
        console.print(Panel(heading, border_style="cyan"))
        for record in found:
            console.print(f" [yellow]{record.meeting_id}[/yellow] | {record.title} | {record.status}")

    asyncio.run(_run())
# `meeting finish`: end the meeting record, passing along the final consensus text.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@meeting_app.command("finish")
def meeting_finish(
    meeting_id: str = typer.Argument(..., help="会议 ID"),
    consensus: str = typer.Option("", help="最终共识")
):
    """完成会议并保存共识"""

    async def _run():
        recorder = get_meeting_recorder()
        finished = await recorder.end_meeting(meeting_id, consensus)
        if not finished:
            console.print(Panel("[yellow]Meeting not found[/yellow]", border_style="yellow"))
            return
        console.print(Panel(f"[green]OK[/green] Meeting completed: {meeting_id}", border_style="green"))

    asyncio.run(_run())
# ============ Execute 命令 ============
# `execute`: run TASK for AGENT_ID through the resource manager and print the result.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@app.command("execute")
def execute_command(
    agent_id: str = typer.Argument(..., help="Agent ID"),
    task: str = typer.Argument(..., help="任务描述")
):
    """执行任务(自动管理文件锁和心跳)"""
    # Local import: only this command needs it, and `rich` is already a dependency.
    from rich.markup import escape

    async def _execute():
        manager = get_resource_manager()
        console.print(Panel("[dim]Executing task...[/dim]", border_style="dim"))
        result = await manager.execute_task(agent_id, task)
        if result.success:
            console.print(Panel(
                f"[green]OK[/green] Task completed\n"
                f" Message: {result.message}\n"
                f" Files locked: {len(result.files_locked)}\n"
                f" Duration: {result.duration_seconds:.2f}s",
                border_style="green"
            ))
            if result.files_locked:
                console.print(f" Files: {', '.join(result.files_locked)}")
            return
        # Previously the failure branch discarded the result's message, so the
        # user never saw why the task failed. Show it when one is present;
        # escape it so brackets in the message are not parsed as Rich markup.
        failure_text = "[red]Task failed[/red]"
        reason = getattr(result, "message", "")
        if reason:
            failure_text += f"\n Message: {escape(str(reason))}"
        console.print(Panel(failure_text, border_style="red"))

    asyncio.run(_execute())
# `status`: print one line per agent (name, heartbeat status, task) plus held locks.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@app.command("status")
def status_command():
    """显示所有 Agent 状态"""

    async def _run():
        manager = get_resource_manager()
        snapshot = await manager.get_all_status()
        if not snapshot:
            console.print(Panel("[dim]No agents found[/dim]", border_style="dim"))
            return
        console.print(Panel("[cyan]Agent Status:[/cyan]", border_style="cyan"))
        for entry in snapshot:
            info = entry["info"]
            beat = entry["heartbeat"]
            columns = (
                f" [yellow]{entry['agent_id']}[/yellow] | "
                f"{info.get('name', '')} | "
                f"{beat.get('status', 'unknown')} | "
                f"{beat.get('current_task', 'N/A')}"
            )
            console.print(columns)
            if entry["locks"]:
                locked_files = ', '.join(held['file'] for held in entry['locks'])
                console.print(f" Locks: {locked_files}")

    asyncio.run(_run())
# ============ Workflow 子命令 ============
# `workflow load`: load a workflow definition from PATH and print a short summary.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@workflow_app.command("load")
def workflow_load(
    path: str = typer.Argument(..., help="YAML 文件路径,如 example.yaml")
):
    """加载工作流"""
    # Local import: `rich` is already a dependency of this file.
    from rich.markup import escape

    async def _load():
        engine = get_workflow_engine()
        try:
            workflow = await engine.load_workflow(path)
            console.print(Panel(
                f"[green]OK[/green] Workflow loaded: {workflow.workflow_id}\n"
                f" Name: {workflow.name}\n"
                f" Description: {workflow.description}\n"
                f" Meetings: {len(workflow.meetings)}",
                border_style="green"
            ))
        except Exception as e:
            # Top-level CLI boundary: report any failure instead of a traceback.
            # The message is escaped because it can contain arbitrary text (file
            # paths, file snippets); unescaped "[...]" would be parsed as Rich
            # markup and either vanish or raise MarkupError.
            console.print(Panel(f"[red]Error:[/red] {escape(str(e))}", border_style="red"))

    asyncio.run(_load())
# `workflow next`: show the next meeting the engine returns for WORKFLOW_ID.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@workflow_app.command("next")
def workflow_next(
    workflow_id: str = typer.Argument(..., help="工作流 ID")
):
    """获取下一个会议"""

    async def _run():
        engine = get_workflow_engine()
        upcoming = await engine.get_next_meeting(workflow_id)
        if not upcoming:
            console.print(Panel("[dim]No pending meetings (workflow completed)[/dim]", border_style="dim"))
            return
        summary = "\n".join([
            f"[cyan]Next meeting:[/cyan] {upcoming.meeting_id}",
            f" Title: {upcoming.title}",
            f" Attendees: {', '.join(upcoming.attendees)}",
        ])
        console.print(Panel(summary, border_style="cyan"))

    asyncio.run(_run())
# `workflow status`: print the workflow summary and a completion mark per meeting.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@workflow_app.command("status")
def workflow_status(
    workflow_id: str = typer.Argument(..., help="工作流 ID")
):
    """获取工作流状态"""

    async def _run():
        engine = get_workflow_engine()
        info = await engine.get_workflow_status(workflow_id)
        if not info:
            console.print(Panel("[yellow]Workflow not found[/yellow]", border_style="yellow"))
            return
        header = "\n".join([
            f"[cyan]Workflow:[/cyan] {info['name']}",
            f" ID: {info['workflow_id']}",
            f" Status: [yellow]{info['status']}[/yellow]",
            f" Progress: {info['progress']}",
            f" Meetings: {len(info['meetings'])}",
        ])
        console.print(Panel(header, border_style="cyan"))
        for item in info['meetings']:
            if item['completed']:
                mark = "[green]✓[/green]"
            else:
                mark = "[dim]○[/dim]"
            console.print(f" {mark} {item['meeting_id']}: {item['title']}")

    asyncio.run(_run())
# `workflow complete`: mark MEETING_ID as completed inside WORKFLOW_ID.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@workflow_app.command("complete")
def workflow_complete(
    workflow_id: str = typer.Argument(..., help="工作流 ID"),
    meeting_id: str = typer.Argument(..., help="会议 ID")
):
    """标记会议完成"""

    async def _run():
        engine = get_workflow_engine()
        marked = await engine.complete_meeting(workflow_id, meeting_id)
        if marked:
            outcome = Panel(f"[green]OK[/green] Meeting completed: {meeting_id}", border_style="green")
        else:
            outcome = Panel("[yellow]Meeting or workflow not found[/yellow]", border_style="yellow")
        console.print(outcome)

    asyncio.run(_run())
# `workflow show`: load a workflow from PATH and print it with all of its meetings.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@workflow_app.command("show")
def workflow_show(
    path: str = typer.Argument(..., help="YAML 文件路径")
):
    """显示工作流详情(直接从文件加载)"""
    # Local import: `rich` is already a dependency of this file.
    from rich.markup import escape

    async def _show():
        engine = get_workflow_engine()
        try:
            workflow = await engine.load_workflow(path)
            console.print(Panel(
                f"[cyan]Workflow:[/cyan] {workflow.name}\n"
                f" ID: {workflow.workflow_id}\n"
                f" Description: {workflow.description}\n"
                f" Status: {workflow.status}\n"
                f" Progress: {workflow.progress}\n"
                f" Meetings: {len(workflow.meetings)}",
                border_style="cyan"
            ))
            for m in workflow.meetings:
                icon = "[green]✓[/green]" if m.completed else "[dim]○[/dim]"
                console.print(f" {icon} {m.meeting_id}: {m.title}")
                console.print(f" Attendees: {', '.join(m.attendees)}")
        except Exception as e:
            # Top-level CLI boundary: report any failure instead of a traceback.
            # Escape the message so "[...]" inside it (paths, file snippets) is
            # printed literally rather than parsed as Rich markup.
            console.print(Panel(f"[red]Error:[/red] {escape(str(e))}", border_style="red"))

    asyncio.run(_show())
# `workflow list-files`: find the nearest `.doc/workflow` directory at or above the
# current working directory and list the *.yaml / *.yml files in it.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@workflow_app.command("list-files")
def workflow_list_files():
    """列出可用的 YAML 工作流文件"""
    from pathlib import Path

    # Walk from the current directory up to the filesystem root. is_dir() (rather
    # than exists()) skips a stray regular file named `workflow` and keeps looking.
    start = Path.cwd()
    workflow_dir = None
    for parent in (start, *start.parents):
        candidate = parent / ".doc" / "workflow"
        if candidate.is_dir():
            workflow_dir = candidate
            break

    if workflow_dir is None:
        console.print(Panel("[yellow]Workflow directory not found[/yellow]", border_style="yellow"))
        return

    # Sorted so the listing does not depend on filesystem enumeration order.
    yaml_files = sorted([*workflow_dir.glob("*.yaml"), *workflow_dir.glob("*.yml")])
    if not yaml_files:
        console.print(Panel("[dim]No workflow files found[/dim]", border_style="dim"))
        return

    console.print(Panel("[cyan]Available workflow files:[/cyan]", border_style="cyan"))
    for f in yaml_files:
        console.print(f" {f.name}")
# `workflow start`: load a workflow from WORKFLOW_PATH and print its first node.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@workflow_app.command("start")
def workflow_start(
    workflow_path: str = typer.Argument(..., help="工作流文件路径,如 default-dev-flow.yaml")
):
    """启动工作流(加载并准备运行)"""
    # Local import: `rich` is already a dependency of this file.
    from rich.markup import escape

    async def _start():
        engine = get_workflow_engine()
        try:
            workflow = await engine.load_workflow(workflow_path)
            lines = [
                f"[green]OK[/green] Workflow started: {workflow.workflow_id}",
                f" Name: {workflow.name}",
                f" Description: {workflow.description}",
                f" Total nodes: {len(workflow.meetings)}",
            ]
            if workflow.meetings:
                first = workflow.meetings[0]
                lines.append(f" First node: [yellow]{first.meeting_id}[/yellow]")
                lines.append(f" Type: {first.node_type}")
            else:
                # A workflow without nodes used to fail on meetings[0] and print
                # the cryptic "list index out of range"; say what is wrong instead.
                lines.append("[yellow]Warning: workflow has no nodes[/yellow]")
            console.print(Panel("\n".join(lines), border_style="green"))
        except Exception as e:
            # Top-level CLI boundary: report any failure instead of a traceback.
            # Escape the message so "[...]" inside it is printed literally
            # rather than parsed as Rich markup.
            console.print(Panel(f"[red]Error:[/red] {escape(str(e))}", border_style="red"))

    asyncio.run(_start())
# `workflow current`: show the workflow's current node and, if set, that node's details.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@workflow_app.command("current")
def workflow_current(
    workflow_id: str = typer.Argument(..., help="工作流 ID")
):
    """获取工作流当前节点"""

    async def _run():
        engine = get_workflow_engine()
        info = await engine.get_workflow_detail(workflow_id)
        if not info:
            console.print(Panel("[yellow]Workflow not found[/yellow]", border_style="yellow"))
            return
        active_id = info.get("current_node")
        header = "\n".join([
            f"[cyan]Workflow:[/cyan] {info['name']}",
            f" Status: [yellow]{info['status']}[/yellow]",
            f" Progress: {info['progress']}",
            f" Current Node: [yellow]{active_id or 'None'}[/yellow]",
        ])
        console.print(Panel(header, border_style="cyan"))
        if not active_id:
            return
        # Only the first node whose ID matches the current node is printed.
        node = next(
            (item for item in info["meetings"] if item["meeting_id"] == active_id),
            None,
        )
        if node is None:
            return
        console.print("\n[cyan]Current Node Details:[/cyan]")
        console.print(f" ID: {node['meeting_id']}")
        console.print(f" Title: {node['title']}")
        console.print(f" Type: {node['node_type']}")
        console.print(f" Attendees: {', '.join(node['attendees'])}")
        if node.get("progress"):
            console.print(f" Progress: {node['progress']}")

    asyncio.run(_run())
# `workflow join`: register AGENT_ID on an execution node and report ready/waiting/error.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@workflow_app.command("join")
def workflow_join(
    workflow_id: str = typer.Argument(..., help="工作流 ID"),
    meeting_id: str = typer.Argument(..., help="节点 ID"),
    agent_id: str = typer.Option(..., help="Agent ID")
):
    """Agent 加入执行节点(标记完成)"""

    async def _run():
        engine = get_workflow_engine()
        reply = await engine.join_execution_node(workflow_id, meeting_id, agent_id)
        state = reply.get("status")
        if state == "ready":
            text = "\n".join([
                "[green]✓[/green] Execution node ready!",
                f" Progress: {reply['progress']}",
                f" {reply['message']}",
            ])
            console.print(Panel(text, border_style="green"))
            return
        if state == "waiting":
            text = "\n".join([
                "[yellow]Waiting for other agents...[/yellow]",
                f" Progress: {reply['progress']}",
                f" Missing: {', '.join(reply.get('missing', []))}",
            ])
            console.print(Panel(text, border_style="yellow"))
            return
        # Any other status is treated as an error.
        console.print(Panel(f"[red]Error:[/red] {reply.get('message', 'Unknown error')}", border_style="red"))

    asyncio.run(_run())
# `workflow execution-status`: show readiness and attendee completion for one node.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@workflow_app.command("execution-status")
def workflow_execution_status(
    workflow_id: str = typer.Argument(..., help="工作流 ID"),
    meeting_id: str = typer.Argument(..., help="执行节点 ID")
):
    """获取执行节点状态"""

    async def _run():
        engine = get_workflow_engine()
        node = await engine.get_execution_status(workflow_id, meeting_id)
        if not node:
            console.print(Panel("[yellow]Node not found[/yellow]", border_style="yellow"))
            return
        report = [
            f"[cyan]Execution Node:[/cyan] {node['meeting_id']}",
            f" Title: {node['title']}",
            f" Type: {node['node_type']}",
            f" Progress: {node['progress']}",
            f" Ready: [yellow]{'Yes' if node['is_ready'] else 'No'}[/yellow]",
            # Empty lists are rendered as "(none)".
            f" Completed: {', '.join(node['completed_attendees']) or '(none)'}",
            f" Missing: {', '.join(node['missing']) or '(none)'}",
        ]
        console.print(Panel("\n".join(report), border_style="cyan"))

    asyncio.run(_run())
# `workflow jump`: ask the engine to jump WORKFLOW_ID to TARGET_MEETING_ID.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@workflow_app.command("jump")
def workflow_jump(
    workflow_id: str = typer.Argument(..., help="工作流 ID"),
    target_meeting_id: str = typer.Argument(..., help="目标节点 ID")
):
    """强制跳转到指定节点(重置后续所有节点)"""

    async def _run():
        engine = get_workflow_engine()
        jumped = await engine.jump_to_node(workflow_id, target_meeting_id)
        if not jumped:
            console.print(Panel("[red]Target node not found[/red]", border_style="red"))
            return
        notice = "\n".join([
            f"[green]OK[/green] Jumped to node: {target_meeting_id}",
            "[yellow]Warning: All subsequent nodes have been reset.[/yellow]",
        ])
        console.print(Panel(notice, border_style="green"))

    asyncio.run(_run())
# `workflow detail`: print the workflow header followed by every node with its
# completion mark, type, attendees, failure target and progress (when present).
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@workflow_app.command("detail")
def workflow_detail(
    workflow_id: str = typer.Argument(..., help="工作流 ID")
):
    """获取工作流详细信息"""

    async def _run():
        engine = get_workflow_engine()
        info = await engine.get_workflow_detail(workflow_id)
        if not info:
            console.print(Panel("[yellow]Workflow not found[/yellow]", border_style="yellow"))
            return
        header = "\n".join([
            f"[cyan]Workflow:[/cyan] {info['name']}",
            f" ID: {info['workflow_id']}",
            f" Status: [yellow]{info['status']}[/yellow]",
            f" Progress: {info['progress']}",
        ])
        console.print(Panel(header, border_style="cyan"))
        console.print("\n[cyan]Nodes:[/cyan]")
        for node in info["meetings"]:
            done_mark = "[green]✓[/green]" if node['completed'] else "[dim]○[/dim]"
            # Execution nodes get a lightning marker; other node types get none.
            kind_mark = ""
            if node.get("node_type") == "execution":
                kind_mark = "[yellow]⚡[/yellow]"
            failure_target = ""
            if node.get("on_failure"):
                failure_target = f" → [red]{node['on_failure']}[/red]"
            console.print(f" {done_mark} {kind_mark}[cyan]{node['meeting_id']}[/cyan]: {node['title']}{failure_target}")
            console.print(f" Type: {node.get('node_type', 'meeting')} | Attendees: {', '.join(node['attendees'])}")
            if node.get("progress"):
                console.print(f" Progress: {node['progress']}")

    asyncio.run(_run())
# ============ Role 子命令 ============
# `role allocate`: allocate roles for TASK across the comma-separated agent IDs.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@role_app.command("allocate")
def role_allocate(
    task: str = typer.Argument(..., help="任务描述"),
    agents: str = typer.Argument(..., help="Agent ID 列表,逗号分隔")
):
    """为任务分配角色"""
    # Drop blank entries so "a,,b" or a trailing comma cannot allocate a role
    # to an agent whose ID is the empty string.
    agent_list = [
        agent for agent in (part.strip() for part in agents.split(",")) if agent
    ]
    if not agent_list:
        console.print(Panel("[red]Error:[/red] No agent IDs given", border_style="red"))
        raise typer.Exit(code=1)

    async def _allocate():
        allocator = get_role_allocator()
        allocation = await allocator.allocate_roles(task, agent_list)
        console.print(Panel("[cyan]Role Allocation:[/cyan]", border_style="cyan"))
        console.print(f"Task: {task}")
        console.print(f"Primary role: [yellow]{allocator.get_primary_role(task)}[/yellow]")
        console.print("")
        for agent_id, role in allocation.items():
            console.print(f" {agent_id} -> [yellow]{role}[/yellow]")

    asyncio.run(_allocate())
# `role explain`: allocate roles for TASK and print the allocator's explanation.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@role_app.command("explain")
def role_explain(
    task: str = typer.Argument(..., help="任务描述"),
    agents: str = typer.Argument(..., help="Agent ID 列表,逗号分隔")
):
    """解释角色分配原因"""
    # Drop blank entries so stray commas cannot produce an empty agent ID.
    agent_list = [
        agent for agent in (part.strip() for part in agents.split(",")) if agent
    ]
    if not agent_list:
        console.print(Panel("[red]Error:[/red] No agent IDs given", border_style="red"))
        raise typer.Exit(code=1)

    async def _explain():
        allocator = get_role_allocator()
        allocation = await allocator.allocate_roles(task, agent_list)
        explanation = allocator.explain_allocation(task, allocation)
        console.print(Panel(explanation, title="[cyan]Role Allocation Explanation[/cyan]", border_style="cyan"))

    asyncio.run(_explain())
# `role primary`: print the primary role the allocator derives for TASK.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@role_app.command("primary")
def role_primary(
    task: str = typer.Argument(..., help="任务描述")
):
    """获取任务的主要角色"""
    # Synchronous call: no event loop is needed for this command.
    main_role = get_role_allocator().get_primary_role(task)
    message = f"Primary role for '{task}': [yellow]{main_role}[/yellow]"
    console.print(message)
# ============ Human 子命令 ============
# `human register`: register a human participant with the human-input service.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@human_app.command("register")
def human_register(
    user_id: str = typer.Argument(..., help="用户 ID如 user001"),
    name: str = typer.Option(..., help="显示名称"),
    role: str = typer.Option("", help="角色"),
    avatar: str = typer.Option("👤", help="头像")
):
    """注册人类参与者"""

    async def _run():
        inputs = get_human_input_service()
        await inputs.register_participant(user_id, name, role, avatar)
        summary = "\n".join([
            f"[green]OK[/green] User registered: {user_id}",
            f" Name: {name}",
            # An empty role is displayed as N/A.
            f" Role: {role or 'N/A'}",
        ])
        console.print(Panel(summary, border_style="green"))

    asyncio.run(_run())
# `human add-task`: submit a task request and print the ID the service returns.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@human_app.command("add-task")
def human_add_task(
    content: str = typer.Argument(..., help="任务内容"),
    from_user: str = typer.Option("user001", help="提交者 ID"),
    priority: str = typer.Option("medium", help="优先级: high, medium, low"),
    title: str = typer.Option("", help="任务标题"),
    agent: str = typer.Option("", help="建议的 Agent"),
    urgent: bool = typer.Option(False, help="是否紧急")
):
    """提交任务请求"""

    async def _run():
        inputs = get_human_input_service()
        # The --agent option is forwarded under the service's `suggested_agent` keyword.
        request = {
            "from_user": from_user,
            "content": content,
            "priority": priority,
            "title": title,
            "suggested_agent": agent,
            "urgent": urgent,
        }
        new_id = await inputs.add_task_request(**request)
        summary = "\n".join([
            f"[green]OK[/green] Task added: {new_id}",
            f" Content: {content}",
            f" Priority: {priority}",
            f" Urgent: {urgent}",
        ])
        console.print(Panel(summary, border_style="green"))

    asyncio.run(_run())
# `human pending-tasks`: list pending task requests, optionally filtered.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@human_app.command("pending-tasks")
def human_pending_tasks(
    priority: str = typer.Option("", help="过滤优先级"),
    agent: str = typer.Option("", help="过滤 Agent")
):
    """查看待处理任务"""

    async def _run():
        inputs = get_human_input_service()
        # Empty filter options are passed to the service as None.
        pending = await inputs.get_pending_tasks(
            priority_filter=priority or None,
            agent_filter=agent or None,
        )
        if not pending:
            console.print(Panel("[dim]No pending tasks[/dim]", border_style="dim"))
            return
        console.print(Panel(f"[cyan]Pending Tasks ({len(pending)}):[/cyan]", border_style="cyan"))
        for item in pending:
            flag = "[red]⚠️[/red] " if item.is_urgent else ""
            console.print(f" {flag}[yellow]{item.id}[/yellow]")
            console.print(f" From: {item.from_user} | Priority: {item.priority}")
            if item.title:
                console.print(f" Title: {item.title}")
            console.print(f" Content: {item.content}")
            if item.suggested_agent:
                console.print(f" Suggested: {item.suggested_agent}")

    asyncio.run(_run())
# `human urgent-tasks`: list the tasks the human-input service reports as urgent.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@human_app.command("urgent-tasks")
def human_urgent_tasks():
    """查看紧急任务"""

    async def _run():
        inputs = get_human_input_service()
        urgent_items = await inputs.get_urgent_tasks()
        if not urgent_items:
            console.print(Panel("[green]No urgent tasks[/green]", border_style="green"))
            return
        console.print(Panel(f"[red]⚠️ Urgent Tasks ({len(urgent_items)}):[/red]", border_style="red"))
        for item in urgent_items:
            console.print(f" [yellow]{item.id}[/yellow] | {item.from_user}")
            console.print(f" Content: {item.content}")

    asyncio.run(_run())
# `human comment`: attach a human comment to MEETING_ID and print the new comment ID.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@human_app.command("comment")
def human_comment(
    meeting_id: str = typer.Argument(..., help="会议 ID"),
    content: str = typer.Argument(..., help="评论内容"),
    from_user: str = typer.Option("user001", help="提交者 ID"),
    comment_type: str = typer.Option("proposal", help="类型: proposal, question, correction"),
    priority: str = typer.Option("normal", help="优先级")
):
    """提交会议评论"""

    async def _run():
        inputs = get_human_input_service()
        submission = {
            "from_user": from_user,
            "meeting_id": meeting_id,
            "content": content,
            "comment_type": comment_type,
            "priority": priority,
        }
        new_id = await inputs.add_meeting_comment(**submission)
        summary = "\n".join([
            f"[green]OK[/green] Comment added: {new_id}",
            f" Meeting: {meeting_id}",
            f" Content: {content}",
        ])
        console.print(Panel(summary, border_style="green"))

    asyncio.run(_run())
# `human pending-comments`: list pending comments, optionally for a single meeting.
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@human_app.command("pending-comments")
def human_pending_comments(
    meeting_id: str = typer.Option("", help="过滤会议 ID")
):
    """查看待处理评论"""

    async def _run():
        inputs = get_human_input_service()
        # An empty --meeting-id is passed to the service as None.
        waiting = await inputs.get_pending_comments(meeting_id=meeting_id or None)
        if not waiting:
            console.print(Panel("[dim]No pending comments[/dim]", border_style="dim"))
            return
        console.print(Panel(f"[cyan]Pending Comments ({len(waiting)}):[/cyan]", border_style="cyan"))
        for note in waiting:
            console.print(f" [yellow]{note.id}[/yellow] | {note.from_user} -> {note.meeting_id}")
            console.print(f" Type: {note.type} | Priority: {note.priority}")
            console.print(f" Content: {note.content}")

    asyncio.run(_run())
# `human summary`: print the counters returned by the human-input service's get_summary().
# NOTE: the docstring is the --help text Typer displays, so it is kept verbatim.
@human_app.command("summary")
def human_summary():
    """查看人类输入服务摘要"""

    async def _run():
        inputs = get_human_input_service()
        stats = await inputs.get_summary()
        report = [
            "[cyan]Human Input Summary:[/cyan]",
            f" Participants: {stats['participants']}",
            f" Online: {stats['online_users']}",
            f" Pending Tasks: {stats['pending_tasks']}",
            f" Urgent Tasks: {stats['urgent_tasks']}",
            f" Pending Comments: {stats['pending_comments']}",
            f" Last Updated: {stats['last_updated']}",
        ]
        console.print(Panel("\n".join(report), border_style="cyan"))

    asyncio.run(_run())
# 所有步骤完成!
def main():
    """Run the Typer application; this is the CLI entry point."""
    app()


# Only start the CLI when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()