""" 共识管理器 判断讨论是否达成共识 """ import json from typing import Dict, Any, Optional from loguru import logger from models.agent import Agent from models.chatroom import ChatRoom from services.ai_provider_service import AIProviderService class ConsensusManager: """ 共识管理器 使用主持人Agent判断讨论共识 """ # 共识判断提示词模板 CONSENSUS_PROMPT = """你是讨论的主持人,负责判断讨论是否达成共识。 讨论目标:{objective} 对话历史: {history} 请仔细分析对话内容,判断: 1. 参与者是否对核心问题达成一致意见? 2. 是否还有重要分歧未解决? 3. 讨论结果是否足够明确和可执行? 请以JSON格式回复(不要包含任何其他文字): {{ "consensus_reached": true或false, "confidence": 0到1之间的数字, "summary": "讨论结果摘要,简洁概括达成的共识或当前状态", "action_items": ["具体的行动项列表"], "unresolved_issues": ["未解决的问题列表"], "key_decisions": ["关键决策列表"] }} 注意: - consensus_reached为true表示核心问题已有明确结论 - confidence表示你对共识判断的信心程度 - 如果讨论仍有争议或不够深入,应该返回false - action_items应该是具体可执行的任务 - 请确保返回有效的JSON格式""" @classmethod async def check_consensus( cls, moderator: Agent, context: "DiscussionContext", chatroom: ChatRoom ) -> Dict[str, Any]: """ 检查是否达成共识 Args: moderator: 主持人Agent context: 讨论上下文 chatroom: 聊天室 Returns: 共识判断结果 """ from services.discussion_engine import DiscussionContext # 构建历史记录 history_text = "" for msg in context.messages: if msg.agent_id: history_text += f"[{msg.agent_id}]: {msg.content}\n\n" else: history_text += f"[系统]: {msg.content}\n\n" if not history_text: return { "consensus_reached": False, "confidence": 0, "summary": "讨论尚未开始", "action_items": [], "unresolved_issues": [], "key_decisions": [] } # 构建提示词 prompt = cls.CONSENSUS_PROMPT.format( objective=context.objective, history=history_text ) try: # 调用主持人Agent的AI接口 response = await AIProviderService.chat( provider_id=moderator.provider_id, messages=[{"role": "user", "content": prompt}], temperature=0.3, # 使用较低温度以获得更一致的结果 max_tokens=1000 ) if not response.success: logger.error(f"共识判断失败: {response.error}") return cls._default_result("AI接口调用失败") # 解析JSON响应 content = response.content.strip() # 尝试提取JSON部分 try: # 尝试直接解析 result = json.loads(content) except json.JSONDecodeError: # 尝试提取JSON块 import re json_match = re.search(r'\{[\s\S]*\}', content) if json_match: try: result = json.loads(json_match.group()) except json.JSONDecodeError: logger.warning(f"无法解析共识判断结果: {content}") return cls._default_result("无法解析AI响应") else: return cls._default_result("AI响应格式错误") # 验证和规范化结果 return cls._normalize_result(result) except Exception as e: logger.error(f"共识判断异常: {e}") return cls._default_result(str(e)) @classmethod async def generate_summary( cls, moderator: Agent, context: "DiscussionContext" ) -> str: """ 生成讨论摘要 Args: moderator: 主持人Agent context: 讨论上下文 Returns: 讨论摘要 """ from services.discussion_engine import DiscussionContext # 构建历史记录 history_text = "" for msg in context.messages: if msg.agent_id: history_text += f"[{msg.agent_id}]: {msg.content}\n\n" prompt = f"""请为以下讨论生成一份简洁的摘要。 讨论目标:{context.objective} 对话记录: {history_text} 请提供: 1. 讨论的主要观点和结论 2. 参与者的立场和建议 3. 最终的决策或共识(如果有) 摘要应该简洁明了,控制在300字以内。""" try: response = await AIProviderService.chat( provider_id=moderator.provider_id, messages=[{"role": "user", "content": prompt}], temperature=0.5, max_tokens=500 ) if response.success: return response.content.strip() else: return "无法生成摘要" except Exception as e: logger.error(f"生成摘要异常: {e}") return "生成摘要时发生错误" @classmethod def _default_result(cls, error: str = "") -> Dict[str, Any]: """ 返回默认结果 Args: error: 错误信息 Returns: 默认共识结果 """ return { "consensus_reached": False, "confidence": 0, "summary": error if error else "共识判断失败", "action_items": [], "unresolved_issues": [], "key_decisions": [] } @classmethod def _normalize_result(cls, result: Dict[str, Any]) -> Dict[str, Any]: """ 规范化共识结果 Args: result: 原始结果 Returns: 规范化的结果 """ return { "consensus_reached": bool(result.get("consensus_reached", False)), "confidence": max(0, min(1, float(result.get("confidence", 0)))), "summary": str(result.get("summary", "")), "action_items": list(result.get("action_items", [])), "unresolved_issues": list(result.get("unresolved_issues", [])), "key_decisions": list(result.get("key_decisions", [])) }