- 支持 MiniMax/OpenAI/Google Gemini/智谱/Kimi 五个平台 - 插件化 Provider 架构,自动发现注册 - 多维度 QuotaRule 额度追踪(固定间隔/自然周期/API同步/手动) - OpenAI + Anthropic 兼容 API 代理,SSE 流式转发 - Model 路由表 + 额度耗尽自动 fallback - 多媒体任务队列(图片/语音/视频) - Vue3 + Tailwind 单文件 Web 仪表盘 - Docker 一键部署 Made-with: Cursor
107 lines
3.8 KiB
Python
107 lines
3.8 KiB
Python
"""Google Gemini 适配器 -- 转换 OpenAI 格式到 Gemini API"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import json
|
|
from typing import Any, AsyncGenerator
|
|
|
|
import httpx
|
|
|
|
from app.providers.base import BaseProvider, Capability
|
|
|
|
|
|
class GoogleProvider(BaseProvider):
    """Google Gemini provider that speaks the OpenAI chat format to its callers.

    Incoming OpenAI-style ``messages`` are converted to Gemini ``contents``,
    and Gemini replies are converted back into OpenAI ``chat.completion`` /
    ``chat.completion.chunk`` dictionaries.
    """

    name = "google"
    display_name = "Google Gemini"
    capabilities = [Capability.CHAT, Capability.IMAGE]

    # Gemini ``finishReason`` values that have a distinct OpenAI equivalent.
    # Any value not listed here (including a missing one) is reported as "stop".
    _FINISH_REASON_MAP = {
        "MAX_TOKENS": "length",
        "SAFETY": "content_filter",
        "RECITATION": "content_filter",
        "BLOCKLIST": "content_filter",
        "PROHIBITED_CONTENT": "content_filter",
        "SPII": "content_filter",
    }

    def _gemini_url(self, plan: dict, model: str, method: str = "generateContent") -> str:
        """Build the endpoint URL ``{base}/models/{model}:{method}?key={api_key}``.

        ``base`` comes from ``self._base_url(plan)``, which is not defined in
        this class (presumably inherited from ``BaseProvider`` -- confirm).
        The API key is read from ``plan["api_key"]`` and defaults to an empty
        string when the key is absent.
        """
        base = self._base_url(plan)
        api_key = plan.get("api_key", "")
        # NOTE(review): the key is embedded in the URL, so anything that logs
        # or echoes the request URL also exposes the key.
        return f"{base}/models/{model}:{method}?key={api_key}"

    @staticmethod
    def _content_to_parts(content: Any) -> list[dict]:
        """Convert one OpenAI message ``content`` value into Gemini text parts.

        Handles the shapes an OpenAI-style message may carry:

        * ``None`` (or missing)  -> a single empty text part
        * ``str``                -> a single text part
        * ``list``               -> one text part per string item or per dict
          item holding a string ``"text"``; other items (e.g. image parts) are
          skipped because only text is forwarded by this adapter
        * anything else          -> a single text part with ``str(content)``
        """
        if content is None:
            return [{"text": ""}]
        if isinstance(content, str):
            return [{"text": content}]
        if isinstance(content, list):
            parts = []
            for item in content:
                if isinstance(item, str):
                    parts.append({"text": item})
                elif isinstance(item, dict) and isinstance(item.get("text"), str):
                    parts.append({"text": item["text"]})
            # Always emit at least one part so the message keeps its turn.
            return parts or [{"text": ""}]
        return [{"text": str(content)}]

    @staticmethod
    def _candidate_text(candidate: dict) -> str:
        """Return the concatenated text of every part in a Gemini candidate.

        Tolerates a missing or ``null`` ``content`` / ``parts`` entry and
        parts without a ``text`` field; those contribute nothing.
        """
        parts = (candidate.get("content") or {}).get("parts") or []
        return "".join(
            part.get("text") or "" for part in parts if isinstance(part, dict)
        )

    def _to_gemini_messages(self, messages: list[dict]) -> list[dict]:
        """Convert OpenAI-format ``messages`` into Gemini ``contents``.

        ``user`` and ``system`` messages are sent with the Gemini role
        ``"user"``; every other role is sent as ``"model"``.

        Raises:
            KeyError: if a message has no ``"role"`` key.
        """
        contents = []
        for message in messages:
            role = "user" if message["role"] in ("user", "system") else "model"
            contents.append({
                "role": role,
                "parts": self._content_to_parts(message.get("content")),
            })
        return contents

    async def chat(
        self,
        messages: list[dict],
        model: str,
        plan: dict,
        stream: bool = True,
        **kwargs,
    ) -> AsyncGenerator[str, None]:
        """Send a chat request to Gemini and yield OpenAI-formatted output.

        Args:
            messages: OpenAI-format chat messages.
            model: Model name, used in the URL and echoed in the output.
            plan: Plan dictionary; supplies the base URL and ``api_key``.
            stream: When true, call ``streamGenerateContent`` with ``alt=sse``
                and yield one ``data: {json}\\n\\n`` string per received SSE
                data line, followed by ``data: [DONE]\\n\\n``. When false, call
                ``generateContent`` and yield a single JSON string.
            **kwargs: Accepted for interface compatibility; not used here.

        Raises:
            httpx.HTTPStatusError: propagated from ``raise_for_status()`` when
                the upstream response has an error status.
        """
        if stream:
            url = self._gemini_url(plan, model, "streamGenerateContent") + "&alt=sse"
        else:
            url = self._gemini_url(plan, model)

        body = {"contents": self._to_gemini_messages(messages)}
        headers = {"Content-Type": "application/json"}

        async with httpx.AsyncClient(timeout=120) as client:
            if not stream:
                resp = await client.post(url, json=body, headers=headers)
                resp.raise_for_status()
                oai_resp = self._gemini_to_openai_response(resp.json(), model)
                yield json.dumps(oai_resp)
                return

            async with client.stream("POST", url, json=body, headers=headers) as resp:
                resp.raise_for_status()
                async for line in resp.aiter_lines():
                    if not line.startswith("data: "):
                        continue
                    payload = line[6:].strip()
                    if not payload:
                        # An empty data line carries no JSON to convert.
                        continue
                    oai_chunk = self._gemini_to_openai_chunk(json.loads(payload), model)
                    yield f"data: {json.dumps(oai_chunk)}\n\n"
                # OpenAI-style end-of-stream sentinel.
                yield "data: [DONE]\n\n"

    def _gemini_to_openai_chunk(self, data: dict, model: str) -> dict:
        """Convert one Gemini SSE payload into an OpenAI streaming chunk.

        The text of all parts of the first candidate is concatenated into
        ``delta.content``; it is an empty string when there is no candidate.
        ``finish_reason`` is always ``None`` in the returned chunk.
        """
        candidates = data.get("candidates") or []
        text = self._candidate_text(candidates[0]) if candidates else ""
        return {
            "object": "chat.completion.chunk",
            "model": model,
            "choices": [{
                "index": 0,
                "delta": {"content": text},
                "finish_reason": None,
            }],
        }

    def _gemini_to_openai_response(self, data: dict, model: str) -> dict:
        """Convert a complete Gemini response into an OpenAI chat completion.

        The text of all parts of the first candidate becomes the assistant
        message. The candidate's ``finishReason`` is translated through
        ``_FINISH_REASON_MAP`` (defaulting to ``"stop"``), and token counts
        are taken from ``usageMetadata`` with ``0`` for any missing field.
        """
        text = ""
        finish_reason = "stop"
        candidates = data.get("candidates") or []
        if candidates:
            first = candidates[0]
            text = self._candidate_text(first)
            finish_reason = self._FINISH_REASON_MAP.get(first.get("finishReason"), "stop")

        usage = data.get("usageMetadata") or {}
        return {
            "object": "chat.completion",
            "model": model,
            "choices": [{
                "index": 0,
                "message": {"role": "assistant", "content": text},
                "finish_reason": finish_reason,
            }],
            "usage": {
                "prompt_tokens": usage.get("promptTokenCount", 0),
                "completion_tokens": usage.get("candidatesTokenCount", 0),
                "total_tokens": usage.get("totalTokenCount", 0),
            },
        }
|