Files
multiAgentTry/backend/app/services/meeting_scheduler.py
Claude Code dc398d7c7b 完整实现 Swarm 多智能体协作系统
- 新增 CLIPluginAdapter 统一接口 (backend/app/core/agent_adapter.py)
- 新增 LLM 服务层,支持 Anthropic/OpenAI/DeepSeek/Ollama (backend/app/services/llm_service.py)
- 新增 Agent 执行引擎,支持文件锁自动管理 (backend/app/services/agent_executor.py)
- 新增 NativeLLMAgent 原生 LLM 适配器 (backend/app/adapters/native_llm_agent.py)
- 新增进程管理器 (backend/app/services/process_manager.py)
- 新增 Agent 控制 API (backend/app/routers/agents_control.py)
- 新增 WebSocket 实时通信 (backend/app/routers/websocket.py)
- 更新前端 AgentsPage,支持启动/停止 Agent
- 测试通过:Agent 启动、批量操作、栅栏同步

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-09 17:32:11 +08:00

173 lines
5.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
Meeting scheduler - implements barrier synchronization.
"""
import asyncio
from datetime import datetime
from typing import Dict, List, Optional, Set
from dataclasses import dataclass, asdict
from .storage import get_storage
@dataclass
class MeetingQueue:
    """Waiting queue for a single meeting.

    Tracks which attendees are expected, which have already arrived, and
    exposes derived readiness information as read-only properties.
    """

    meeting_id: str
    title: str
    expected_attendees: List[str]  # everyone who must show up
    arrived_attendees: List[str]  # those who have shown up so far
    status: str = "waiting"
    created_at: str = ""  # ISO timestamp; filled in automatically when empty
    started_at: str = ""
    min_required: int = 2  # lower bound on the number of distinct arrivals

    def __post_init__(self):
        """Stamp ``created_at`` with the current local time when not supplied."""
        self.created_at = self.created_at or datetime.now().isoformat()

    @property
    def is_ready(self) -> bool:
        """True when every expected attendee arrived and the minimum is met."""
        present = set(self.arrived_attendees)
        if len(present) < self.min_required:
            return False
        return set(self.expected_attendees) <= present

    @property
    def missing_attendees(self) -> List[str]:
        """Expected attendees that have not arrived yet (unordered)."""
        wanted = set(self.expected_attendees)
        present = set(self.arrived_attendees)
        return list(wanted - present)

    @property
    def progress(self) -> str:
        """Arrival progress rendered as ``"<arrived>/<expected>"``."""
        arrived_count = len(self.arrived_attendees)
        expected_count = len(self.expected_attendees)
        return f"{arrived_count}/{expected_count}"
class MeetingScheduler:
    """Meeting scheduler - barrier-synchronization implementation.

    Queue state is persisted as JSON under ``QUEUES_FILE`` through the
    storage object returned by ``get_storage()``. Agents blocked in
    ``wait_for_meeting`` are woken through in-memory ``asyncio.Event``
    objects kept in ``self._events``, keyed by meeting id.

    ``self._lock`` is a plain ``asyncio.Lock`` and is NOT reentrant: a
    method that already holds it must never call another method that
    acquires it. Helpers with the ``_locked`` suffix expect the caller to
    hold the lock.
    """

    QUEUES_FILE = "cache/meeting_queue.json"

    def __init__(self):
        self._storage = get_storage()
        self._lock = asyncio.Lock()
        self._events: Dict[str, asyncio.Event] = {}

    async def _load_queues(self) -> Dict[str, Dict]:
        """Read the persisted mapping of meeting id -> queue dict."""
        data = await self._storage.read_json(self.QUEUES_FILE)
        # NOTE(review): fall back to an empty mapping if the storage layer
        # hands back a falsy value (e.g. for a missing file) - confirm
        # read_json's contract.
        return data or {}

    async def _save_queues(self, queues: Dict[str, Dict]) -> None:
        """Persist the full mapping of meeting id -> queue dict."""
        await self._storage.write_json(self.QUEUES_FILE, queues)

    async def _create_meeting_locked(
        self,
        queues: Dict[str, Dict],
        meeting_id: str,
        title: str,
        expected_attendees: List[str],
        min_required: Optional[int] = None,
    ) -> MeetingQueue:
        """Insert a new queue into ``queues`` and persist it.

        The caller must already hold ``self._lock``; this helper never
        acquires it, which is what makes it safe to use from other locked
        sections.
        """
        queue = MeetingQueue(
            meeting_id=meeting_id,
            title=title,
            expected_attendees=expected_attendees,
            arrived_attendees=[],
            # None (or 0) means "everyone expected must arrive".
            min_required=min_required or len(expected_attendees),
        )
        queues[meeting_id] = asdict(queue)
        await self._save_queues(queues)
        return queue

    async def create_meeting(
        self,
        meeting_id: str,
        title: str,
        expected_attendees: List[str],
        min_required: Optional[int] = None
    ) -> MeetingQueue:
        """Create (or overwrite) the waiting queue for ``meeting_id``.

        Args:
            meeting_id: Key under which the queue is stored.
            title: Human-readable meeting title.
            expected_attendees: Agent ids that must arrive.
            min_required: Minimum number of arrivals; defaults to
                ``len(expected_attendees)`` when omitted (or 0).

        Returns:
            The newly created ``MeetingQueue``.
        """
        async with self._lock:
            queues = await self._load_queues()
            return await self._create_meeting_locked(
                queues,
                meeting_id=meeting_id,
                title=title,
                expected_attendees=expected_attendees,
                min_required=min_required,
            )

    async def get_queue(self, meeting_id: str) -> Optional[MeetingQueue]:
        """Return the queue for ``meeting_id``, or ``None`` if unknown."""
        queues = await self._load_queues()
        return MeetingQueue(**queues[meeting_id]) if meeting_id in queues else None

    async def wait_for_meeting(
        self,
        agent_id: str,
        meeting_id: str,
        timeout: int = 300
    ) -> str:
        """Register ``agent_id`` as arrived and block until the meeting starts.

        If the meeting does not exist yet, a single-attendee meeting is
        created for the caller and ``"started"`` is returned immediately.

        Args:
            agent_id: The arriving agent.
            meeting_id: The meeting to join.
            timeout: Maximum number of seconds to wait for the others.

        Returns:
            ``"started"`` once the barrier is released, or ``"timeout"``
            if it was not released within ``timeout`` seconds.
        """
        event: Optional[asyncio.Event] = None
        async with self._lock:
            queues = await self._load_queues()
            if meeting_id not in queues:
                # Use the lock-free helper: calling create_meeting() here
                # would try to re-acquire the non-reentrant lock we are
                # holding and deadlock.
                await self._create_meeting_locked(
                    queues,
                    meeting_id=meeting_id,
                    title=f"Meeting: {meeting_id}",
                    expected_attendees=[agent_id],
                    min_required=1,
                )
                return "started"

            queue_data = queues[meeting_id]
            arrived = queue_data.setdefault("arrived_attendees", [])
            if agent_id not in arrived:
                arrived.append(agent_id)
                arrived.sort()
                await self._save_queues(queues)

            is_ready = MeetingQueue(**queue_data).is_ready
            if not is_ready:
                # Register the wait event while still holding the lock so
                # it exists before any later arrival can start the meeting.
                event = self._events.setdefault(meeting_id, asyncio.Event())

        if is_ready:
            # _start_meeting takes the lock itself, so it must run only
            # after the lock above has been released.
            await self._start_meeting(meeting_id)
            return "started"

        # Wait for the meeting to start.
        try:
            await asyncio.wait_for(event.wait(), timeout=timeout)
            return "started"
        except asyncio.TimeoutError:
            return "timeout"

    async def _start_meeting(self, meeting_id: str) -> None:
        """Mark the meeting as ready and wake every waiting agent.

        Acquires ``self._lock``; callers must not hold it.
        """
        async with self._lock:
            queues = await self._load_queues()
            if meeting_id in queues:
                queues[meeting_id]["status"] = "ready"
                queues[meeting_id]["started_at"] = datetime.now().isoformat()
                await self._save_queues(queues)

        # Wake up all waiters.
        event = self._events.get(meeting_id)
        if event:
            event.set()

    async def end_meeting(self, meeting_id: str) -> bool:
        """Mark the meeting as ended and drop its wait event.

        Returns:
            ``True`` if the meeting existed, ``False`` otherwise.
        """
        async with self._lock:
            queues = await self._load_queues()
            if meeting_id not in queues:
                return False
            queues[meeting_id]["status"] = "ended"
            await self._save_queues(queues)
            self._events.pop(meeting_id, None)
            return True

    async def get_all_queues(self) -> List[MeetingQueue]:
        """Return every persisted queue as a ``MeetingQueue``."""
        queues = await self._load_queues()
        return [MeetingQueue(**data) for data in queues.values()]

    async def add_attendee(self, meeting_id: str, agent_id: str) -> bool:
        """Add ``agent_id`` to the expected attendees of ``meeting_id``.

        Returns:
            ``True`` if the agent was added; ``False`` if the meeting does
            not exist or the agent was already expected.
        """
        async with self._lock:
            queues = await self._load_queues()
            if meeting_id not in queues:
                return False
            expected = queues[meeting_id]["expected_attendees"]
            if agent_id in expected:
                return False
            expected.append(agent_id)
            await self._save_queues(queues)
            return True
# Simplified singleton: a single module-level instance, created lazily.
_scheduler_instance: Optional[MeetingScheduler] = None


def get_meeting_scheduler() -> MeetingScheduler:
    """Return the shared ``MeetingScheduler``, building it on first use."""
    global _scheduler_instance
    if _scheduler_instance is not None:
        return _scheduler_instance
    _scheduler_instance = MeetingScheduler()
    return _scheduler_instance