""" 人类输入服务 - 管理人类参与者的任务请求和会议评论 """ import asyncio import uuid from datetime import datetime from typing import Dict, List, Optional from dataclasses import dataclass, field, asdict from .storage import get_storage @dataclass class TaskRequest: """人类任务请求""" id: str from_user: str # 提交者 ID timestamp: str # 提交时间 priority: str # high | medium | low type: str # 任务类型 title: str = "" # 任务标题 content: str = "" # 任务内容 target_files: List[str] = field(default_factory=list) suggested_agent: str = "" # 建议的 Agent urgent: bool = False status: str = "pending" # pending | processing | completed | rejected @property def is_urgent(self) -> bool: """是否为紧急任务(高优先级 + urgent 标记)""" return self.priority == "high" and self.urgent @dataclass class MeetingComment: """会议评论""" id: str from_user: str # 提交者 ID meeting_id: str timestamp: str type: str # proposal | question | correction priority: str = "normal" content: str = "" status: str = "pending" # pending | addressed | ignored @dataclass class HumanParticipant: """人类参与者信息""" id: str name: str role: str = "" # tech_lead | product_owner | developer status: str = "offline" # online | offline | busy avatar: str = "👤" class HumanInputService: """ 人类输入服务 管理 humans.json 文件,处理人类任务请求和会议评论 """ HUMANS_FILE = "humans.json" # 优先级权重 PRIORITY_WEIGHT = { "high": 3, "medium": 2, "low": 1, "normal": 0 } def __init__(self): self._storage = get_storage() self._lock = asyncio.Lock() async def _load_humans(self) -> Dict: """加载 humans.json""" return await self._storage.read_json(self.HUMANS_FILE) async def _save_humans(self, data: Dict) -> None: """保存 humans.json""" await self._storage.write_json(self.HUMANS_FILE, data) async def _ensure_structure(self) -> None: """确保 humans.json 结构完整""" async with self._lock: data = await self._load_humans() if not data: data = { "version": "1.0", "last_updated": datetime.now().isoformat(), "participants": {}, "task_requests": [], "meeting_comments": [], "human_states": {} } await self._save_humans(data) async def register_participant( self, user_id: str, name: str, role: str = "", avatar: str = "👤" ) -> None: """注册人类参与者""" await self._ensure_structure() async with self._lock: data = await self._load_humans() data["participants"][user_id] = asdict(HumanParticipant( id=user_id, name=name, role=role, avatar=avatar )) data["last_updated"] = datetime.now().isoformat() await self._save_humans(data) async def add_task_request( self, from_user: str, content: str, priority: str = "medium", task_type: str = "user_task", title: str = "", target_files: List[str] = None, suggested_agent: str = "", urgent: bool = False ) -> str: """ 添加任务请求 Returns: 任务 ID """ await self._ensure_structure() async with self._lock: data = await self._load_humans() task_id = f"req_{uuid.uuid4().hex[:8]}" task = TaskRequest( id=task_id, from_user=from_user, timestamp=datetime.now().isoformat(), priority=priority, type=task_type, title=title, content=content, target_files=target_files or [], suggested_agent=suggested_agent, urgent=urgent ) # 保存时转换为 JSON 兼容格式(from_user -> from) task_dict = asdict(task) task_dict["from"] = task_dict.pop("from_user") data["task_requests"].append(task_dict) data["last_updated"] = datetime.now().isoformat() await self._save_humans(data) return task_id async def get_pending_tasks( self, priority_filter: str = None, agent_filter: str = None ) -> List[TaskRequest]: """ 获取待处理的任务请求 Args: priority_filter: 过滤优先级 agent_filter: 过滤建议的 Agent Returns: 按优先级排序的任务列表 """ await self._ensure_structure() data = await self._load_humans() tasks = [] for t in data.get("task_requests", []): if t["status"] != "pending": continue if priority_filter and t["priority"] != priority_filter: continue if agent_filter and t.get("suggested_agent") != agent_filter: continue # 转换 JSON 格式(from -> from_user) t_dict = t.copy() t_dict["from_user"] = t_dict.pop("from", "") tasks.append(TaskRequest(**t_dict)) # 按优先级排序 tasks.sort( key=lambda t: ( -self.PRIORITY_WEIGHT.get(t.priority, 0), -t.urgent, t.timestamp ) ) return tasks async def get_urgent_tasks(self) -> List[TaskRequest]: """获取紧急任务(高优先级 + urgent)""" return [t for t in await self.get_pending_tasks() if t.is_urgent] async def mark_task_processing(self, task_id: str) -> bool: """标记任务为处理中""" async with self._lock: data = await self._load_humans() for task in data.get("task_requests", []): if task["id"] == task_id: task["status"] = "processing" data["last_updated"] = datetime.now().isoformat() await self._save_humans(data) return True return False async def mark_task_completed(self, task_id: str) -> bool: """标记任务为已完成""" async with self._lock: data = await self._load_humans() for task in data.get("task_requests", []): if task["id"] == task_id: task["status"] = "completed" data["last_updated"] = datetime.now().isoformat() await self._save_humans(data) return True return False async def add_meeting_comment( self, from_user: str, meeting_id: str, content: str, comment_type: str = "proposal", priority: str = "normal" ) -> str: """ 添加会议评论 Returns: 评论 ID """ await self._ensure_structure() async with self._lock: data = await self._load_humans() comment_id = f"comment_{uuid.uuid4().hex[:8]}" comment = MeetingComment( id=comment_id, from_user=from_user, meeting_id=meeting_id, timestamp=datetime.now().isoformat(), type=comment_type, priority=priority, content=content ) # 保存时转换为 JSON 兼容格式(from_user -> from) comment_dict = asdict(comment) comment_dict["from"] = comment_dict.pop("from_user") data["meeting_comments"].append(comment_dict) data["last_updated"] = datetime.now().isoformat() await self._save_humans(data) return comment_id async def get_pending_comments(self, meeting_id: str = None) -> List[MeetingComment]: """ 获取待处理的会议评论 Args: meeting_id: 过滤指定会议的评论 Returns: 评论列表 """ await self._ensure_structure() data = await self._load_humans() comments = [] for c in data.get("meeting_comments", []): if c["status"] != "pending": continue if meeting_id and c["meeting_id"] != meeting_id: continue # 转换 JSON 格式(from -> from_user) c_dict = c.copy() c_dict["from_user"] = c_dict.pop("from", "") comments.append(MeetingComment(**c_dict)) return comments async def mark_comment_addressed(self, comment_id: str) -> bool: """标记评论为已处理""" async with self._lock: data = await self._load_humans() for comment in data["meeting_comments"]: if comment["id"] == comment_id: comment["status"] = "addressed" data["last_updated"] = datetime.now().isoformat() await self._save_humans(data) return True return False async def get_participants(self) -> List[HumanParticipant]: """获取所有人类参与者""" await self._ensure_structure() data = await self._load_humans() participants = [] for p in data.get("participants", {}).values(): participants.append(HumanParticipant(**p)) return participants async def update_user_status( self, user_id: str, status: str, current_focus: str = "" ) -> bool: """更新用户状态""" await self._ensure_structure() async with self._lock: data = await self._load_humans() if user_id not in data.get("participants", {}): return False data["participants"][user_id]["status"] = status if "human_states" not in data: data["human_states"] = {} data["human_states"][user_id] = { "status": status, "current_focus": current_focus, "last_update": datetime.now().isoformat() } data["last_updated"] = datetime.now().isoformat() await self._save_humans(data) return True async def get_summary(self) -> Dict: """获取人类输入服务的摘要信息""" await self._ensure_structure() data = await self._load_humans() pending_tasks = await self.get_pending_tasks() urgent_tasks = await self.get_urgent_tasks() pending_comments = await self.get_pending_comments() participants = await self.get_participants() return { "participants": len(participants), "online_users": len([p for p in participants if p.status == "online"]), "pending_tasks": len(pending_tasks), "urgent_tasks": len(urgent_tasks), "pending_comments": len(pending_comments), "last_updated": data.get("last_updated", "") } # 全局单例 _human_input_service: Optional[HumanInputService] = None def get_human_input_service() -> HumanInputService: """获取人类输入服务单例""" global _human_input_service if _human_input_service is None: _human_input_service = HumanInputService() return _human_input_service