Files
multiAgentTry/backend/app/services/role_allocator.py

200 lines
5.7 KiB
Python
Raw Normal View History

"""
角色分配器 - AI 驱动的角色分配
分析任务描述自动为 Agent 分配最适合的角色
"""
import asyncio
from typing import Dict, List, Optional
from dataclasses import dataclass
from enum import Enum
from .agent_registry import AgentRegistry, AgentInfo
class AgentRole(str, Enum):
"""Agent 角色枚举"""
ARCHITECT = "architect"
PRODUCT_MANAGER = "pm"
DEVELOPER = "developer"
QA = "qa"
REVIEWER = "reviewer"
@dataclass
class RoleWeight:
"""角色权重配置"""
role: str
weight: float
keywords: List[str]
def matches(self, text: str) -> int:
"""计算匹配分数"""
score = 0
text_lower = text.lower()
for keyword in self.keywords:
if keyword.lower() in text_lower:
score += 1
return score
class RoleAllocator:
"""
角色分配器
分析任务描述 Agent 分配最适合的角色
"""
# 角色权重配置(来自 design-spec.md
ROLE_WEIGHTS = {
"pm": RoleWeight("pm", 1.5, ["需求", "产品", "规划", "用户", "功能", "priority", "requirement", "product"]),
"architect": RoleWeight("architect", 1.5, ["架构", "设计", "方案", "技术", "系统", "design", "architecture"]),
"developer": RoleWeight("developer", 1.0, ["开发", "实现", "编码", "代码", "function", "implement", "code"]),
"reviewer": RoleWeight("reviewer", 1.3, ["审查", "review", "检查", "验证", "校对", "check"]),
"qa": RoleWeight("qa", 1.2, ["测试", "test", "质量", "bug", "验证", "quality"]),
}
def __init__(self):
pass
def _analyze_task_roles(self, task: str) -> Dict[str, float]:
"""
分析任务需要的角色及其权重
Args:
task: 任务描述
Returns:
角色权重字典
"""
scores = {}
for role_name, role_weight in self.ROLE_WEIGHTS.items():
match_score = role_weight.matches(task)
if match_score > 0:
scores[role_name] = match_score * role_weight.weight
else:
# 即使没有匹配关键词,也给予基础权重
scores[role_name] = 0.1 * role_weight.weight
return scores
async def allocate_roles(
self,
task: str,
available_agents: List[str]
) -> Dict[str, str]:
"""
为任务分配角色
Args:
task: 任务描述
available_agents: 可用的 Agent ID 列表
Returns:
Agent ID -> 角色映射
"""
# 获取所有 Agent 信息
# 注意:在实际实现中,这会从 AgentRegistry 获取
# 这里简化处理,假设已有 Agent 信息
# 分析任务需要的角色
role_scores = self._analyze_task_roles(task)
# 按分数排序角色
sorted_roles = sorted(role_scores.items(), key=lambda x: -x[1])
# 简单分配:将可用 Agent 按顺序分配给角色
allocation = {}
for i, agent_id in enumerate(available_agents):
if i < len(sorted_roles):
allocation[agent_id] = sorted_roles[i][0]
else:
allocation[agent_id] = "developer" # 默认角色
return allocation
def get_primary_role(self, task: str) -> str:
"""
获取任务的主要角色
Args:
task: 任务描述
Returns:
主要角色名称
"""
role_scores = self._analyze_task_roles(task)
if not role_scores:
return "developer"
return max(role_scores.items(), key=lambda x: x[1])[0]
async def suggest_agents_for_task(
self,
task: str,
all_agents: List[AgentInfo],
count: int = 3
) -> List[AgentInfo]:
"""
为任务推荐合适的 Agent
Args:
task: 任务描述
all_agents: 所有可用 Agent 列表
count: 推荐数量
Returns:
推荐的 Agent 列表
"""
primary_role = self.get_primary_role(task)
# 按角色匹配度排序
scored_agents = []
for agent in all_agents:
if agent.role == primary_role:
scored_agents.append((agent, 10)) # 完全匹配高分
elif agent.role in ["architect", "developer", "reviewer"]:
scored_agents.append((agent, 5)) # 相关角色中分
else:
scored_agents.append((agent, 1)) # 其他角色低分
# 按分数排序
scored_agents.sort(key=lambda x: -x[1])
return [agent for agent, _ in scored_agents[:count]]
def explain_allocation(self, task: str, allocation: Dict[str, str]) -> str:
"""
解释角色分配的原因
Args:
task: 任务描述
allocation: 分配结果
Returns:
解释文本
"""
role_scores = self._analyze_task_roles(task)
primary = self.get_primary_role(task)
lines = [f"任务分析: {task}", f"主要角色: {primary}"]
lines.append("角色权重:")
for role, score in sorted(role_scores.items(), key=lambda x: -x[1]):
lines.append(f" - {role}: {score:.2f}")
lines.append("分配结果:")
for agent_id, role in allocation.items():
lines.append(f" - {agent_id}: {role}")
return "\n".join(lines)
# 全局单例
_allocator_instance: Optional[RoleAllocator] = None
def get_role_allocator() -> RoleAllocator:
"""获取角色分配器单例"""
global _allocator_instance
if _allocator_instance is None:
_allocator_instance = RoleAllocator()
return _allocator_instance