""" CLI 调用器 通过子进程调用真实的 AI CLI 工具(Claude Code / Kimi CLI / OpenCode), 将 prompt 发送给 CLI 并捕获输出。 支持的 CLI: - claude: Claude Code CLI(使用 -p 参数发送单轮 prompt) - kimi: Kimi CLI(使用 -p 参数发送单轮 prompt) - opencode: OpenCode CLI """ import asyncio import logging import os import re import time import shutil from typing import Optional, Tuple from dataclasses import dataclass logger = logging.getLogger(__name__) # CLI 命令映射:model 前缀 → (二进制名, 构造参数的函数) CLI_REGISTRY = { "claude": "claude", "kimi": "kimi", "opencode": "opencode", } @dataclass class CLIResult: """CLI 调用结果""" content: str cli_name: str exit_code: int latency: float success: bool error: str = "" def detect_available_clis() -> dict: """检测系统中可用的 CLI 工具""" available = {} for name, binary in CLI_REGISTRY.items(): path = shutil.which(binary) if path: available[name] = path return available def resolve_cli(model: str) -> Optional[str]: """ 根据 agent 的 model 字段判断应使用哪个 CLI 规则: - 以 "claude" 开头 → claude CLI - 以 "kimi" 开头 → kimi CLI - 以 "opencode" 开头 → opencode CLI - 完全匹配 CLI 名 → 直接使用 """ model_lower = model.lower().strip() for prefix in CLI_REGISTRY: if model_lower.startswith(prefix): return prefix if model_lower in CLI_REGISTRY: return model_lower return None async def invoke_cli( cli_name: str, prompt: str, timeout: int = 120, max_tokens: int = 1024, system_prompt: str = "", ) -> CLIResult: """ 调用指定的 CLI 工具并返回结果 参数: cli_name: CLI 名称(claude / kimi / opencode) prompt: 要发送的 prompt timeout: 超时秒数 max_tokens: 最大 token 数 """ binary = CLI_REGISTRY.get(cli_name) if not binary: return CLIResult( content="", cli_name=cli_name, exit_code=-1, latency=0, success=False, error=f"未知 CLI: {cli_name}" ) # 必须获取完整路径,否则 subprocess 在不同环境下可能找不到 full_path = shutil.which(binary) if not full_path: return CLIResult( content="", cli_name=cli_name, exit_code=-1, latency=0, success=False, error=f"CLI 未安装: {binary}" ) cmd = _build_command(cli_name, prompt, max_tokens, full_path, system_prompt) logger.info(f"调用 CLI [{cli_name}]: {full_path} (prompt 长度={len(prompt)})") # Windows 下需要设置 PYTHONIOENCODING 解决 GBK 编码问题 env = dict(os.environ) env["PYTHONIOENCODING"] = "utf-8" start = time.time() try: proc = await asyncio.create_subprocess_exec( *cmd, stdin=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, env=env, ) try: # 立即关闭 stdin,防止 CLI 阻塞等待输入 stdout, stderr = await asyncio.wait_for( proc.communicate(input=b""), timeout=timeout ) except asyncio.TimeoutError: proc.kill() await proc.communicate() return CLIResult( content="", cli_name=cli_name, exit_code=-1, latency=time.time() - start, success=False, error=f"CLI 超时 ({timeout}s)" ) latency = time.time() - start stdout_text = stdout.decode("utf-8", errors="replace").strip() stderr_text = stderr.decode("utf-8", errors="replace").strip() # 过滤掉 OpenCode 的 INFO 日志行和 kimi 的框线 stdout_text = _clean_output(cli_name, stdout_text) if proc.returncode == 0 and stdout_text: logger.info(f"CLI [{cli_name}] 完成: {latency:.1f}s, {len(stdout_text)} chars") return CLIResult( content=stdout_text, cli_name=cli_name, exit_code=0, latency=round(latency, 2), success=True, ) else: error_msg = stderr_text or f"退出码 {proc.returncode}" logger.warning(f"CLI [{cli_name}] 失败: {error_msg}") return CLIResult( content=stdout_text or "", cli_name=cli_name, exit_code=proc.returncode or -1, latency=round(latency, 2), success=False, error=error_msg, ) except FileNotFoundError: return CLIResult( content="", cli_name=cli_name, exit_code=-1, latency=0, success=False, error=f"找不到命令: {binary}" ) except Exception as e: return CLIResult( content="", cli_name=cli_name, exit_code=-1, latency=time.time() - start, success=False, error=str(e) ) def _build_command( cli_name: str, prompt: str, max_tokens: int, full_path: str, system_prompt: str = "" ) -> list: """ 为不同 CLI 构造命令行参数 使用完整二进制路径确保跨环境兼容 """ default_sys = ( "这是一个角色扮演讨论场景,不是编程任务。" "请直接用中文回答,不要使用任何工具、不要读取文件、不要执行代码。" "直接给出你作为角色的观点和建议,2-3句话即可。" ) sys_prompt = system_prompt or default_sys if cli_name == "claude": return [ full_path, "-p", prompt, "--output-format", "text", "--system-prompt", sys_prompt, ] elif cli_name == "kimi": return [ full_path, "-p", f"{sys_prompt}\n\n{prompt}", ] elif cli_name == "opencode": return [ full_path, "run", f"{sys_prompt}\n\n{prompt}", "--model", "opencode/minimax-m2.5-free", ] else: return [full_path, "-p", prompt] def _clean_output(cli_name: str, text: str) -> str: """清理 CLI 输出中的框线、日志、prompt 回显等噪音""" if cli_name == "kimi": return _clean_kimi_output(text) lines = text.splitlines() cleaned = [] for line in lines: if line.strip().startswith("INFO "): continue cleaned.append(line) result = "\n".join(cleaned).strip() return result if result else text.strip() def _clean_kimi_output(text: str) -> str: """ Kimi CLI 输出格式: ┌─────────────────────┐ │ (prompt 回显) │ └─────────────────────┘ • 思考过程... • 实际回复内容 需要:1) 移除框线和框内的 prompt 回显 2) 只保留最后一个 bullet 作为实际回复 """ lines = text.splitlines() # 找到框线结束位置(最后一个 └ 或 ╰ 行) box_end = -1 for i, line in enumerate(lines): stripped = line.strip() if stripped and stripped[0] in "└╰" and all( c in "└┘─╰╯ " for c in stripped ): box_end = i # 跳过框线区域 content_lines = lines[box_end + 1:] if box_end >= 0 else lines # Kimi 用 • 输出思考过程和最终回复,最后一个 • 块通常是实际回复 bullets = [] current_bullet = [] for line in content_lines: stripped = line.strip() if not stripped: if current_bullet: current_bullet.append(line) continue if stripped.startswith("• ") or stripped.startswith("? "): if current_bullet: bullets.append("\n".join(current_bullet)) current_bullet = [stripped.lstrip("•? ").strip()] elif current_bullet: current_bullet.append(stripped) if current_bullet: bullets.append("\n".join(current_bullet)) if not bullets: return text.strip() # 最后一个 bullet 是实际回复 return bullets[-1].strip()