- 支持 MiniMax/OpenAI/Google Gemini/智谱/Kimi 五个平台 - 插件化 Provider 架构,自动发现注册 - 多维度 QuotaRule 额度追踪(固定间隔/自然周期/API同步/手动) - OpenAI + Anthropic 兼容 API 代理,SSE 流式转发 - Model 路由表 + 额度耗尽自动 fallback - 多媒体任务队列(图片/语音/视频) - Vue3 + Tailwind 单文件 Web 仪表盘 - Docker 一键部署 Made-with: Cursor
47 lines
1.4 KiB
Python
47 lines
1.4 KiB
Python
"""额度查询聚合服务"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from app import database as db
|
|
from app.providers import ProviderRegistry
|
|
from app.providers.base import QuotaInfo
|
|
|
|
|
|
async def query_plan_quota(plan_id: str) -> list[dict]:
|
|
"""查询指定 Plan 的所有 QuotaRule 状态,对 api_sync 类型尝试实时查询"""
|
|
plan = await db.get_plan(plan_id)
|
|
if not plan:
|
|
return []
|
|
|
|
rules = await db.list_quota_rules(plan_id)
|
|
result = []
|
|
|
|
for r in rules:
|
|
info = {
|
|
"rule_id": r["id"],
|
|
"rule_name": r["rule_name"],
|
|
"quota_total": r["quota_total"],
|
|
"quota_used": r["quota_used"],
|
|
"quota_remaining": r["quota_total"] - r["quota_used"],
|
|
"quota_unit": r["quota_unit"],
|
|
"refresh_type": r["refresh_type"],
|
|
"next_refresh_at": r.get("next_refresh_at"),
|
|
}
|
|
|
|
# api_sync 类型尝试实时查询
|
|
if r["refresh_type"] == "api_sync":
|
|
provider = ProviderRegistry.get(plan["provider_name"])
|
|
if provider:
|
|
try:
|
|
qi = await provider.query_quota(plan)
|
|
if qi:
|
|
info["quota_remaining"] = qi.quota_remaining
|
|
info["quota_used"] = qi.quota_used
|
|
info["api_raw"] = qi.raw
|
|
except Exception:
|
|
pass
|
|
|
|
result.append(info)
|
|
|
|
return result
|