Files
AIChatRoom/backend/services/mcp_service.py
Claude Code edbddf855d feat: AI聊天室多Agent协作讨论平台
- 实现Agent管理,支持AI辅助生成系统提示词
- 支持多个AI提供商(OpenRouter、智谱、MiniMax等)
- 实现聊天室和讨论引擎
- WebSocket实时消息推送
- 前端使用React + Ant Design
- 后端使用FastAPI + MongoDB

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-02-03 19:20:02 +08:00

253 lines
6.9 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
MCP服务
管理MCP工具的集成和调用
"""
import json
import os
from typing import List, Dict, Any, Optional
from pathlib import Path
from loguru import logger
class MCPService:
"""
MCP工具服务
集成MCP服务器提供工具调用能力
"""
# MCP服务器配置目录
MCP_CONFIG_DIR = Path(os.getenv("CURSOR_MCP_DIR", "~/.cursor/mcps")).expanduser()
# 已注册的工具: server_name -> List[tool_info]
_registered_tools: Dict[str, List[Dict[str, Any]]] = {}
# Agent工具映射: agent_id -> List[tool_name]
_agent_tools: Dict[str, List[str]] = {}
@classmethod
async def initialize(cls) -> None:
"""
初始化MCP服务
扫描并注册可用的MCP工具
"""
logger.info("初始化MCP服务...")
if not cls.MCP_CONFIG_DIR.exists():
logger.warning(f"MCP配置目录不存在: {cls.MCP_CONFIG_DIR}")
return
# 扫描MCP服务器目录
for server_dir in cls.MCP_CONFIG_DIR.iterdir():
if server_dir.is_dir():
await cls._scan_server(server_dir)
logger.info(f"MCP服务初始化完成已注册 {len(cls._registered_tools)} 个服务器")
@classmethod
async def _scan_server(cls, server_dir: Path) -> None:
"""
扫描MCP服务器目录
Args:
server_dir: 服务器目录
"""
server_name = server_dir.name
tools_dir = server_dir / "tools"
if not tools_dir.exists():
return
tools = []
for tool_file in tools_dir.glob("*.json"):
try:
with open(tool_file, "r", encoding="utf-8") as f:
tool_info = json.load(f)
tool_info["_file"] = str(tool_file)
tools.append(tool_info)
except Exception as e:
logger.warning(f"加载MCP工具配置失败: {tool_file} - {e}")
if tools:
cls._registered_tools[server_name] = tools
logger.debug(f"注册MCP服务器: {server_name}, 工具数: {len(tools)}")
@classmethod
def list_servers(cls) -> List[str]:
"""
列出所有可用的MCP服务器
Returns:
服务器名称列表
"""
return list(cls._registered_tools.keys())
@classmethod
def list_tools(cls, server: Optional[str] = None) -> List[Dict[str, Any]]:
"""
列出可用的MCP工具
Args:
server: 服务器名称(可选,不指定则返回所有)
Returns:
工具信息列表
"""
if server:
return cls._registered_tools.get(server, [])
# 返回所有工具
all_tools = []
for server_name, tools in cls._registered_tools.items():
for tool in tools:
tool_copy = tool.copy()
tool_copy["server"] = server_name
all_tools.append(tool_copy)
return all_tools
@classmethod
def get_tool(cls, server: str, tool_name: str) -> Optional[Dict[str, Any]]:
"""
获取指定工具的信息
Args:
server: 服务器名称
tool_name: 工具名称
Returns:
工具信息或None
"""
tools = cls._registered_tools.get(server, [])
for tool in tools:
if tool.get("name") == tool_name:
return tool
return None
@classmethod
async def call_tool(
cls,
server: str,
tool_name: str,
arguments: Dict[str, Any]
) -> Dict[str, Any]:
"""
调用MCP工具
Args:
server: 服务器名称
tool_name: 工具名称
arguments: 工具参数
Returns:
调用结果
"""
tool = cls.get_tool(server, tool_name)
if not tool:
return {
"success": False,
"error": f"工具不存在: {server}/{tool_name}"
}
# TODO: 实际的MCP工具调用逻辑
# 这里需要根据MCP协议实现工具调用
# 目前返回模拟结果
logger.info(f"调用MCP工具: {server}/{tool_name}, 参数: {arguments}")
return {
"success": True,
"result": f"MCP工具调用: {tool_name}",
"tool": tool_name,
"server": server,
"arguments": arguments
}
@classmethod
def register_tool_for_agent(
cls,
agent_id: str,
tool_name: str
) -> bool:
"""
为Agent注册可用工具
Args:
agent_id: Agent ID
tool_name: 工具名称(格式: server/tool_name
Returns:
是否注册成功
"""
if agent_id not in cls._agent_tools:
cls._agent_tools[agent_id] = []
if tool_name not in cls._agent_tools[agent_id]:
cls._agent_tools[agent_id].append(tool_name)
return True
return False
@classmethod
def unregister_tool_for_agent(
cls,
agent_id: str,
tool_name: str
) -> bool:
"""
为Agent注销工具
Args:
agent_id: Agent ID
tool_name: 工具名称
Returns:
是否注销成功
"""
if agent_id in cls._agent_tools:
if tool_name in cls._agent_tools[agent_id]:
cls._agent_tools[agent_id].remove(tool_name)
return True
return False
@classmethod
def get_agent_tools(cls, agent_id: str) -> List[str]:
"""
获取Agent可用的工具列表
Args:
agent_id: Agent ID
Returns:
工具名称列表
"""
return cls._agent_tools.get(agent_id, [])
@classmethod
def get_tools_for_prompt(cls, agent_id: str) -> str:
"""
获取用于提示词的工具描述
Args:
agent_id: Agent ID
Returns:
工具描述文本
"""
tool_names = cls.get_agent_tools(agent_id)
if not tool_names:
return ""
descriptions = []
for full_name in tool_names:
parts = full_name.split("/", 1)
if len(parts) == 2:
server, tool_name = parts
tool = cls.get_tool(server, tool_name)
if tool:
desc = tool.get("description", "无描述")
descriptions.append(f"- {tool_name}: {desc}")
if not descriptions:
return ""
return "你可以使用以下工具:\n" + "\n".join(descriptions)