""" 会议调度器 - 实现栅栏同步(Barrier Synchronization) """ import asyncio from datetime import datetime from typing import Dict, List, Optional, Set from dataclasses import dataclass, asdict from .storage import get_storage @dataclass class MeetingQueue: """会议等待队列""" meeting_id: str title: str expected_attendees: List[str] arrived_attendees: List[str] status: str = "waiting" created_at: str = "" started_at: str = "" min_required: int = 2 def __post_init__(self): if not self.created_at: self.created_at = datetime.now().isoformat() @property def is_ready(self) -> bool: expected = set(self.expected_attendees) arrived = set(self.arrived_attendees) return expected.issubset(arrived) and len(arrived) >= self.min_required @property def missing_attendees(self) -> List[str]: return list(set(self.expected_attendees) - set(self.arrived_attendees)) @property def progress(self) -> str: return f"{len(self.arrived_attendees)}/{len(self.expected_attendees)}" class MeetingScheduler: """会议调度器 - 栅栏同步实现""" QUEUES_FILE = "cache/meeting_queue.json" def __init__(self): self._storage = get_storage() self._lock = asyncio.Lock() self._events: Dict[str, asyncio.Event] = {} async def _load_queues(self) -> Dict[str, Dict]: return await self._storage.read_json(self.QUEUES_FILE) async def _save_queues(self, queues: Dict[str, Dict]) -> None: await self._storage.write_json(self.QUEUES_FILE, queues) async def create_meeting( self, meeting_id: str, title: str, expected_attendees: List[str], min_required: int = None ) -> MeetingQueue: async with self._lock: queue = MeetingQueue( meeting_id=meeting_id, title=title, expected_attendees=expected_attendees, arrived_attendees=[], min_required=min_required or len(expected_attendees) ) queues = await self._load_queues() queues[meeting_id] = asdict(queue) await self._save_queues(queues) return queue async def get_queue(self, meeting_id: str) -> Optional[MeetingQueue]: queues = await self._load_queues() return MeetingQueue(**queues[meeting_id]) if meeting_id in queues 
else None async def wait_for_meeting( self, agent_id: str, meeting_id: str, timeout: int = 300 ) -> str: async with self._lock: queues = await self._load_queues() if meeting_id not in queues: await self.create_meeting( meeting_id=meeting_id, title=f"Meeting: {meeting_id}", expected_attendees=[agent_id], min_required=1 ) return "started" queue_data = queues[meeting_id] if agent_id not in queue_data.get("arrived_attendees", []): queue_data["arrived_attendees"].append(agent_id) queue_data["arrived_attendees"].sort() await self._save_queues(queues) queue = MeetingQueue(**queue_data) is_ready = queue.is_ready if is_ready: await self._start_meeting(meeting_id) return "started" # 等待会议开始 event = self._events.setdefault(meeting_id, asyncio.Event()) try: await asyncio.wait_for(event.wait(), timeout=timeout) return "started" except asyncio.TimeoutError: return "timeout" async def _start_meeting(self, meeting_id: str) -> None: async with self._lock: queues = await self._load_queues() if meeting_id in queues: queues[meeting_id]["status"] = "ready" queues[meeting_id]["started_at"] = datetime.now().isoformat() await self._save_queues(queues) # 唤醒所有等待者 event = self._events.get(meeting_id) if event: event.set() async def end_meeting(self, meeting_id: str) -> bool: async with self._lock: queues = await self._load_queues() if meeting_id not in queues: return False queues[meeting_id]["status"] = "ended" await self._save_queues(queues) self._events.pop(meeting_id, None) return True async def get_all_queues(self) -> List[MeetingQueue]: queues = await self._load_queues() return [MeetingQueue(**data) for data in queues.values()] async def add_attendee(self, meeting_id: str, agent_id: str) -> bool: async with self._lock: queues = await self._load_queues() if meeting_id not in queues: return False if agent_id not in queues[meeting_id]["expected_attendees"]: queues[meeting_id]["expected_attendees"].append(agent_id) await self._save_queues(queues) return True return False # 简化单例实现 
_scheduler_instance: Optional[MeetingScheduler] = None


def get_meeting_scheduler() -> MeetingScheduler:
    """Return the module-wide scheduler, creating it on the first call."""
    global _scheduler_instance
    scheduler = _scheduler_instance
    if scheduler is None:
        # First use: build the instance and cache it for later callers.
        scheduler = MeetingScheduler()
        _scheduler_instance = scheduler
    return scheduler