""" MCP服务 管理MCP工具的集成和调用 """ import json import os from typing import List, Dict, Any, Optional from pathlib import Path from loguru import logger class MCPService: """ MCP工具服务 集成MCP服务器,提供工具调用能力 """ # MCP服务器配置目录 MCP_CONFIG_DIR = Path(os.getenv("CURSOR_MCP_DIR", "~/.cursor/mcps")).expanduser() # 已注册的工具: server_name -> List[tool_info] _registered_tools: Dict[str, List[Dict[str, Any]]] = {} # Agent工具映射: agent_id -> List[tool_name] _agent_tools: Dict[str, List[str]] = {} @classmethod async def initialize(cls) -> None: """ 初始化MCP服务 扫描并注册可用的MCP工具 """ logger.info("初始化MCP服务...") if not cls.MCP_CONFIG_DIR.exists(): logger.warning(f"MCP配置目录不存在: {cls.MCP_CONFIG_DIR}") return # 扫描MCP服务器目录 for server_dir in cls.MCP_CONFIG_DIR.iterdir(): if server_dir.is_dir(): await cls._scan_server(server_dir) logger.info(f"MCP服务初始化完成,已注册 {len(cls._registered_tools)} 个服务器") @classmethod async def _scan_server(cls, server_dir: Path) -> None: """ 扫描MCP服务器目录 Args: server_dir: 服务器目录 """ server_name = server_dir.name tools_dir = server_dir / "tools" if not tools_dir.exists(): return tools = [] for tool_file in tools_dir.glob("*.json"): try: with open(tool_file, "r", encoding="utf-8") as f: tool_info = json.load(f) tool_info["_file"] = str(tool_file) tools.append(tool_info) except Exception as e: logger.warning(f"加载MCP工具配置失败: {tool_file} - {e}") if tools: cls._registered_tools[server_name] = tools logger.debug(f"注册MCP服务器: {server_name}, 工具数: {len(tools)}") @classmethod def list_servers(cls) -> List[str]: """ 列出所有可用的MCP服务器 Returns: 服务器名称列表 """ return list(cls._registered_tools.keys()) @classmethod def list_tools(cls, server: Optional[str] = None) -> List[Dict[str, Any]]: """ 列出可用的MCP工具 Args: server: 服务器名称(可选,不指定则返回所有) Returns: 工具信息列表 """ if server: return cls._registered_tools.get(server, []) # 返回所有工具 all_tools = [] for server_name, tools in cls._registered_tools.items(): for tool in tools: tool_copy = tool.copy() tool_copy["server"] = server_name all_tools.append(tool_copy) return all_tools @classmethod def get_tool(cls, server: str, tool_name: str) -> Optional[Dict[str, Any]]: """ 获取指定工具的信息 Args: server: 服务器名称 tool_name: 工具名称 Returns: 工具信息或None """ tools = cls._registered_tools.get(server, []) for tool in tools: if tool.get("name") == tool_name: return tool return None @classmethod async def call_tool( cls, server: str, tool_name: str, arguments: Dict[str, Any] ) -> Dict[str, Any]: """ 调用MCP工具 Args: server: 服务器名称 tool_name: 工具名称 arguments: 工具参数 Returns: 调用结果 """ tool = cls.get_tool(server, tool_name) if not tool: return { "success": False, "error": f"工具不存在: {server}/{tool_name}" } # TODO: 实际的MCP工具调用逻辑 # 这里需要根据MCP协议实现工具调用 # 目前返回模拟结果 logger.info(f"调用MCP工具: {server}/{tool_name}, 参数: {arguments}") return { "success": True, "result": f"MCP工具调用: {tool_name}", "tool": tool_name, "server": server, "arguments": arguments } @classmethod def register_tool_for_agent( cls, agent_id: str, tool_name: str ) -> bool: """ 为Agent注册可用工具 Args: agent_id: Agent ID tool_name: 工具名称(格式: server/tool_name) Returns: 是否注册成功 """ if agent_id not in cls._agent_tools: cls._agent_tools[agent_id] = [] if tool_name not in cls._agent_tools[agent_id]: cls._agent_tools[agent_id].append(tool_name) return True return False @classmethod def unregister_tool_for_agent( cls, agent_id: str, tool_name: str ) -> bool: """ 为Agent注销工具 Args: agent_id: Agent ID tool_name: 工具名称 Returns: 是否注销成功 """ if agent_id in cls._agent_tools: if tool_name in cls._agent_tools[agent_id]: cls._agent_tools[agent_id].remove(tool_name) return True return False @classmethod def get_agent_tools(cls, agent_id: str) -> List[str]: """ 获取Agent可用的工具列表 Args: agent_id: Agent ID Returns: 工具名称列表 """ return cls._agent_tools.get(agent_id, []) @classmethod def get_tools_for_prompt(cls, agent_id: str) -> str: """ 获取用于提示词的工具描述 Args: agent_id: Agent ID Returns: 工具描述文本 """ tool_names = cls.get_agent_tools(agent_id) if not tool_names: return "" descriptions = [] for full_name in tool_names: parts = full_name.split("/", 1) if len(parts) == 2: server, tool_name = parts tool = cls.get_tool(server, tool_name) if tool: desc = tool.get("description", "无描述") descriptions.append(f"- {tool_name}: {desc}") if not descriptions: return "" return "你可以使用以下工具:\n" + "\n".join(descriptions)