planManage/app/providers/google.py
锦麟 王 61ce809634 feat: initial implementation of a unified multi-platform Coding Plan management system
- Supports five platforms: MiniMax / OpenAI / Google Gemini / Zhipu / Kimi
- Pluggable Provider architecture with automatic discovery and registration
- Multi-dimensional QuotaRule quota tracking (fixed interval / calendar period / API sync / manual)
- OpenAI- and Anthropic-compatible API proxy with SSE streaming passthrough
- Model routing table + automatic fallback when a quota is exhausted (see the sketch below)
- Multimedia task queue (image / audio / video)
- Vue3 + Tailwind single-file web dashboard
- One-command Docker deployment

Made-with: Cursor
2026-03-31 15:50:42 +08:00
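
The quota-exhausted fallback mentioned in the routing bullet might look roughly like the sketch below; resolve_plan, the routes table, and the quota_exhausted flag are hypothetical names for illustration, not this repository's actual API:

def resolve_plan(model: str, routes: dict[str, list[dict]]) -> dict:
    # Walk the model's route entries in priority order and return the first
    # plan whose quota is not yet exhausted (field names are hypothetical).
    for plan in routes.get(model, []):
        if not plan.get("quota_exhausted", False):
            return plan
    raise RuntimeError(f"all plans exhausted for model {model!r}")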

107 lines
3.8 KiB
Python

"""Google Gemini 适配器 -- 转换 OpenAI 格式到 Gemini API"""
from __future__ import annotations
import json
from typing import Any, AsyncGenerator
import httpx
from app.providers.base import BaseProvider, Capability
class GoogleProvider(BaseProvider):
name = "google"
display_name = "Google Gemini"
capabilities = [Capability.CHAT, Capability.IMAGE]

    def _gemini_url(self, plan: dict, model: str, method: str = "generateContent") -> str:
        base = self._base_url(plan)
        api_key = plan.get("api_key", "")
        return f"{base}/models/{model}:{method}?key={api_key}"
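
    # Example (the public Gemini REST base is typically
    # "https://generativelanguage.googleapis.com/v1beta"):
    #   _gemini_url(plan, "gemini-pro")
    #   -> ".../v1beta/models/gemini-pro:generateContent?key=<api_key>"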

    def _to_gemini_messages(self, messages: list[dict]) -> list[dict]:
        """Convert OpenAI-format messages to Gemini `contents`."""
        contents = []
        for m in messages:
            # Gemini only has "user" and "model" roles, so system prompts
            # are folded into the "user" role.
            role = "user" if m["role"] in ("user", "system") else "model"
            contents.append({
                "role": role,
                "parts": [{"text": m.get("content", "")}],
            })
        return contents
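
    # Example of the mapping above -- "system" and "user" both map to "user",
    # anything else (e.g. "assistant") maps to "model":
    #   [{"role": "system", "content": "Be brief."},
    #    {"role": "assistant", "content": "OK."}]
    #   -> [{"role": "user", "parts": [{"text": "Be brief."}]},
    #       {"role": "model", "parts": [{"text": "OK."}]}]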

    async def chat(
        self,
        messages: list[dict],
        model: str,
        plan: dict,
        stream: bool = True,
        **kwargs,
    ) -> AsyncGenerator[str, None]:
        if stream:
            # `alt=sse` asks Gemini to stream Server-Sent Events instead of a JSON array.
            url = self._gemini_url(plan, model, "streamGenerateContent") + "&alt=sse"
        else:
            url = self._gemini_url(plan, model)
        body = {"contents": self._to_gemini_messages(messages)}
        headers = {"Content-Type": "application/json"}
        async with httpx.AsyncClient(timeout=120) as client:
            if stream:
                async with client.stream("POST", url, json=body, headers=headers) as resp:
                    resp.raise_for_status()
                    async for line in resp.aiter_lines():
                        if line.startswith("data: "):
                            gemini_data = json.loads(line[6:])
                            oai_chunk = self._gemini_to_openai_chunk(gemini_data, model)
                            # Re-emit each chunk in OpenAI SSE framing.
                            yield f"data: {json.dumps(oai_chunk)}\n\n"
                    # Gemini sends no [DONE] sentinel; append one for OpenAI clients.
                    yield "data: [DONE]\n\n"
            else:
                resp = await client.post(url, json=body, headers=headers)
                resp.raise_for_status()
                gemini_resp = resp.json()
                oai_resp = self._gemini_to_openai_response(gemini_resp, model)
                yield json.dumps(oai_resp)

    def _gemini_to_openai_chunk(self, data: dict, model: str) -> dict:
        """Gemini SSE chunk -> OpenAI SSE chunk format."""
        text = ""
        candidates = data.get("candidates", [])
        if candidates:
            parts = candidates[0].get("content", {}).get("parts", [])
            if parts:
                text = parts[0].get("text", "")
        return {
            "object": "chat.completion.chunk",
            "model": model,
            "choices": [{
                "index": 0,
                "delta": {"content": text},
                "finish_reason": None,
            }],
        }
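
    # Example: a Gemini SSE payload such as
    #   {"candidates": [{"content": {"parts": [{"text": "Hi"}], "role": "model"}}]}
    # becomes the OpenAI-style chunk
    #   {"object": "chat.completion.chunk", "model": "...",
    #    "choices": [{"index": 0, "delta": {"content": "Hi"}, "finish_reason": None}]}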

    def _gemini_to_openai_response(self, data: dict, model: str) -> dict:
        text = ""
        candidates = data.get("candidates", [])
        if candidates:
            parts = candidates[0].get("content", {}).get("parts", [])
            if parts:
                text = parts[0].get("text", "")
        usage = data.get("usageMetadata", {})
        return {
            "object": "chat.completion",
            "model": model,
            "choices": [{
                "index": 0,
                "message": {"role": "assistant", "content": text},
                "finish_reason": "stop",
            }],
            "usage": {
                "prompt_tokens": usage.get("promptTokenCount", 0),
                "completion_tokens": usage.get("candidatesTokenCount", 0),
                "total_tokens": usage.get("totalTokenCount", 0),
            },
        }
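
A minimal usage sketch, assuming a no-argument constructor and a plan dict carrying base_url and api_key fields (the real plan shape comes from BaseProvider._base_url, which is not shown here):

import asyncio

from app.providers.google import GoogleProvider

async def main() -> None:
    provider = GoogleProvider()  # assumes BaseProvider needs no constructor args
    plan = {
        "base_url": "https://generativelanguage.googleapis.com/v1beta",  # assumed field
        "api_key": "YOUR_KEY",
    }
    # chat() yields OpenAI-style SSE lines, ending with "data: [DONE]".
    async for sse_line in provider.chat(
        [{"role": "user", "content": "Hello"}],
        model="gemini-pro",
        plan=plan,
        stream=True,
    ):
        print(sse_line, end="")

asyncio.run(main())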