""" 会议记录服务 - 记录会议内容、讨论和共识 将会议记录保存为 Markdown 文件,按日期组织 """ import asyncio from datetime import datetime from pathlib import Path from typing import Dict, List, Optional from dataclasses import dataclass, field, asdict from .storage import get_storage @dataclass class DiscussionEntry: """单条讨论记录""" agent_id: str agent_name: str content: str timestamp: str = "" step: str = "" def __post_init__(self): if not self.timestamp: self.timestamp = datetime.now().isoformat() @dataclass class ProgressStep: """会议进度步骤""" step_id: str label: str status: str = "pending" # pending, active, completed completed_at: str = "" def mark_active(self): self.status = "active" def mark_completed(self): self.status = "completed" self.completed_at = datetime.now().isoformat() @dataclass class MeetingInfo: """会议信息""" meeting_id: str title: str date: str # YYYY-MM-DD attendees: List[str] = field(default_factory=list) steps: List[ProgressStep] = field(default_factory=list) discussions: List[DiscussionEntry] = field(default_factory=list) status: str = "in_progress" # in_progress, completed created_at: str = "" ended_at: str = "" consensus: str = "" # 最终共识 def __post_init__(self): if not self.created_at: self.created_at = datetime.now().isoformat() if not self.date: self.date = datetime.now().strftime("%Y-%m-%d") @property def current_step(self) -> Optional[ProgressStep]: """获取当前活跃的步骤""" for step in self.steps: if step.status == "active": return step return None @property def completed_steps(self) -> List[ProgressStep]: """获取已完成的步骤""" return [s for s in self.steps if s.status == "completed"] @property def progress_summary(self) -> str: """进度摘要""" total = len(self.steps) if total == 0: return "0/0" completed = len(self.completed_steps) active = 1 if self.current_step else 0 return f"{completed}/{total}" + (" (active)" if active else "") class MeetingRecorder: """ 会议记录服务 记录会议的讨论内容、进度和共识,保存为 Markdown 文件 """ def __init__(self): self._storage = get_storage() self._lock = asyncio.Lock() def _parse_meeting_data(self, data: dict) -> MeetingInfo: """将字典数据转换为 MeetingInfo 对象""" # 转换 steps steps = [ ProgressStep(**s) if isinstance(s, dict) else s for s in data.get("steps", []) ] # 转换 discussions discussions = [ DiscussionEntry(**d) if isinstance(d, dict) else d for d in data.get("discussions", []) ] # 创建 MeetingInfo data["steps"] = steps data["discussions"] = discussions return MeetingInfo(**data) def _get_meeting_dir(self, date: str) -> str: """获取会议目录路径""" return f"meetings/{date}" def _get_meeting_file(self, meeting_id: str, date: str) -> str: """获取会议文件路径""" return f"{self._get_meeting_dir(date)}/{meeting_id}.md" async def create_meeting( self, meeting_id: str, title: str, attendees: List[str], steps: List[str] = None ) -> MeetingInfo: """ 创建新会议记录 Args: meeting_id: 会议 ID title: 会议标题 attendees: 参会者列表 steps: 会议步骤列表 Returns: 创建的会议信息 """ async with self._lock: date = datetime.now().strftime("%Y-%m-%d") # 创建进度步骤 progress_steps = [] if steps: for i, step_label in enumerate(steps): progress_steps.append(ProgressStep( step_id=f"step_{i+1}", label=step_label, status="pending" )) meeting = MeetingInfo( meeting_id=meeting_id, title=title, date=date, attendees=attendees, steps=progress_steps ) # 保存为 Markdown await self._save_meeting_markdown(meeting) return meeting async def _save_meeting_markdown(self, meeting: MeetingInfo) -> None: """将会议保存为 Markdown 文件""" lines = [ f"# {meeting.title}", "", f"**会议 ID**: {meeting.meeting_id}", f"**日期**: {meeting.date}", f"**状态**: {meeting.status}", f"**参会者**: {', '.join(meeting.attendees)}", "", "## 会议进度", "", ] # 进度步骤 for step in meeting.steps: status_icon = { "pending": "○", "active": "◐", "completed": "●" }.get(step.status, "○") time_str = f" ({step.completed_at})" if step.completed_at else "" lines.append(f"- {status_icon} **{step.label}**{time_str}") lines.append("") lines.append("## 讨论记录") lines.append("") # 讨论内容 for discussion in meeting.discussions: lines.append(f"### {discussion.agent_name} - {discussion.timestamp[:19]}") if discussion.step: lines.append(f"*步骤: {discussion.step}*") lines.append("") lines.append(discussion.content) lines.append("") # 共识 if meeting.consensus: lines.append("## 共识") lines.append("") lines.append(meeting.consensus) lines.append("") # 元数据 lines.append("---") lines.append("") lines.append(f"**创建时间**: {meeting.created_at}") if meeting.ended_at: lines.append(f"**结束时间**: {meeting.ended_at}") content = "\n".join(lines) file_path = self._get_meeting_file(meeting.meeting_id, meeting.date) # 使用存储服务保存 await self._storage.ensure_dir(self._get_meeting_dir(meeting.date)) await self._storage.write_json(file_path.replace(".md", ".json"), asdict(meeting)) # 同时保存 Markdown import aiofiles full_path = Path(self._storage.base_path) / file_path async with aiofiles.open(full_path, mode="w", encoding="utf-8") as f: await f.write(content) async def add_discussion( self, meeting_id: str, agent_id: str, agent_name: str, content: str, step: str = "" ) -> None: """ 添加讨论记录 Args: meeting_id: 会议 ID agent_id: Agent ID agent_name: Agent 名称 content: 讨论内容 step: 当前步骤 """ async with self._lock: # 加载会议信息 date = datetime.now().strftime("%Y-%m-%d") file_path = self._get_meeting_file(meeting_id, date) json_path = file_path.replace(".md", ".json") data = await self._storage.read_json(json_path) if not data: return # 会议不存在 meeting = self._parse_meeting_data(data) meeting.discussions.append(DiscussionEntry( agent_id=agent_id, agent_name=agent_name, content=content, step=step )) # 保存 await self._save_meeting_markdown(meeting) async def update_progress( self, meeting_id: str, step_label: str ) -> None: """ 更新会议进度 Args: meeting_id: 会议 ID step_label: 步骤名称 """ async with self._lock: date = datetime.now().strftime("%Y-%m-%d") json_path = self._get_meeting_file(meeting_id, date).replace(".md", ".json") data = await self._storage.read_json(json_path) if not data: return meeting = self._parse_meeting_data(data) # 查找并更新步骤 step_found = False for step in meeting.steps: if step.label == step_label: # 将之前的活跃步骤标记为完成 if meeting.current_step: meeting.current_step.mark_completed() step.mark_active() step_found = True break if not step_found and meeting.steps: # 如果找不到,将第一个 pending 步骤设为活跃 for step in meeting.steps: if step.status == "pending": if meeting.current_step: meeting.current_step.mark_completed() step.mark_active() break await self._save_meeting_markdown(meeting) async def get_meeting(self, meeting_id: str, date: str = None) -> Optional[MeetingInfo]: """ 获取会议信息 Args: meeting_id: 会议 ID date: 日期,默认为今天 Returns: 会议信息 """ if date is None: date = datetime.now().strftime("%Y-%m-%d") json_path = self._get_meeting_file(meeting_id, date).replace(".md", ".json") data = await self._storage.read_json(json_path) if data: return self._parse_meeting_data(data) return None async def list_meetings(self, date: str = None) -> List[MeetingInfo]: """ 列出指定日期的会议 Args: date: 日期,默认为今天 Returns: 会议列表 """ if date is None: date = datetime.now().strftime("%Y-%m-%d") meetings_dir = Path(self._storage.base_path) / self._get_meeting_dir(date) if not meetings_dir.exists(): return [] meetings = [] for json_file in meetings_dir.glob("*.json"): data = await self._storage.read_json(f"meetings/{date}/{json_file.name}") if data: meetings.append(self._parse_meeting_data(data)) return sorted(meetings, key=lambda m: m.created_at) async def end_meeting(self, meeting_id: str, consensus: str = "") -> bool: """ 结束会议 Args: meeting_id: 会议 ID consensus: 最终共识 Returns: 是否成功 """ async with self._lock: date = datetime.now().strftime("%Y-%m-%d") json_path = self._get_meeting_file(meeting_id, date).replace(".md", ".json") data = await self._storage.read_json(json_path) if not data: return False meeting = self._parse_meeting_data(data) meeting.status = "completed" meeting.ended_at = datetime.now().isoformat() if consensus: meeting.consensus = consensus # 完成当前步骤 if meeting.current_step: meeting.current_step.mark_completed() await self._save_meeting_markdown(meeting) return True # 全局单例 _recorder_instance: Optional[MeetingRecorder] = None def get_meeting_recorder() -> MeetingRecorder: """获取会议记录服务单例""" global _recorder_instance if _recorder_instance is None: _recorder_instance = MeetingRecorder() return _recorder_instance