Files
planManage/app/services/quota_service.py
锦麟 王 61ce809634 feat: 多平台 Coding Plan 统一管理系统初始实现
- 支持 MiniMax/OpenAI/Google Gemini/智谱/Kimi 五个平台
- 插件化 Provider 架构,自动发现注册
- 多维度 QuotaRule 额度追踪(固定间隔/自然周期/API同步/手动)
- OpenAI + Anthropic 兼容 API 代理,SSE 流式转发
- Model 路由表 + 额度耗尽自动 fallback
- 多媒体任务队列(图片/语音/视频)
- Vue3 + Tailwind 单文件 Web 仪表盘
- Docker 一键部署

Made-with: Cursor
2026-03-31 15:50:42 +08:00

47 lines
1.4 KiB
Python

"""额度查询聚合服务"""
from __future__ import annotations
from app import database as db
from app.providers import ProviderRegistry
from app.providers.base import QuotaInfo
async def query_plan_quota(plan_id: str) -> list[dict]:
"""查询指定 Plan 的所有 QuotaRule 状态,对 api_sync 类型尝试实时查询"""
plan = await db.get_plan(plan_id)
if not plan:
return []
rules = await db.list_quota_rules(plan_id)
result = []
for r in rules:
info = {
"rule_id": r["id"],
"rule_name": r["rule_name"],
"quota_total": r["quota_total"],
"quota_used": r["quota_used"],
"quota_remaining": r["quota_total"] - r["quota_used"],
"quota_unit": r["quota_unit"],
"refresh_type": r["refresh_type"],
"next_refresh_at": r.get("next_refresh_at"),
}
# api_sync 类型尝试实时查询
if r["refresh_type"] == "api_sync":
provider = ProviderRegistry.get(plan["provider_name"])
if provider:
try:
qi = await provider.query_quota(plan)
if qi:
info["quota_remaining"] = qi.quota_remaining
info["quota_used"] = qi.quota_used
info["api_raw"] = qi.raw
except Exception:
pass
result.append(info)
return result