118 lines
3.5 KiB
Python
118 lines
3.5 KiB
Python
|
|
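"""Task progress registry (in-process memory, thread-safe).

Manual and scheduled tasks report their progress here while they run; the
frontend polls GET /api/tasks/progress to read and display it.

This relies on a single worker (uvicorn --workers 1): all request and task
threads then share this one in-memory copy. With several worker processes,
each would hold its own registry and polls could see inconsistent progress.
"""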
|
|||
|
|
import copy
|
|||
|
|
import threading
|
|||
|
|
from datetime import datetime, timezone
|
|||
|
|
from typing import Optional
|
|||
|
|
|
|||
|
|
# The four stable task keys; each gets an idle entry when the module loads.
TASK_KEYS = (
    "summarize",
    "tag_score_dedup",
    "generate_daily_brief",
    "bootstrap_taxonomy",
)

# In-memory registry: task_key -> progress entry dict.
_progress: dict = {}
# Serializes access to _progress from request threads and task threads.
_lock = threading.Lock()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _now_iso() -> str:
    """Return the current time as a timezone-aware UTC ISO-8601 string."""
    utc_now = datetime.now(tz=timezone.utc)
    return utc_now.isoformat()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def _init() -> None:
    """Seed the registry with an idle entry for every key in ``TASK_KEYS``.

    Each key receives its own freshly built dict; any existing entry for one
    of those keys is overwritten. Keys not in ``TASK_KEYS`` are left alone.
    """
    _progress.update(
        {
            task_key: {
                "status": "idle",
                "stage": "",
                "current": 0,
                "total": 0,
                "message": None,
                "started_at": None,
                "updated_at": None,
                "finished_at": None,
                "trigger": None,
            }
            for task_key in TASK_KEYS
        }
    )


# Populate the registry at import time so every known task reads as idle
# before anything has reported progress.
_init()
|
|||
|
|
|
|||
|
|
|
|||
|
|
def update_progress(
    task_key: str,
    *,
    status: Optional[str] = None,
    stage: Optional[str] = None,
    current: Optional[int] = None,
    total: Optional[int] = None,
    message: Optional[str] = None,
    trigger: Optional[str] = None,
) -> None:
    """Merge the given non-None fields into a task's entry and timestamp it.

    Any keyword left as None keeps the entry's existing value. An unknown
    ``task_key`` gets a fresh idle entry created on the fly.

    Timestamp rules (UTC ISO-8601 strings from ``_now_iso``):

    * ``status="running"`` while the entry is not already running starts a
      new run, so ``started_at`` is set to now. Further "running" updates in
      the same run keep the original ``started_at``. Every "running" update
      clears ``finished_at``.
    * ``status`` of ``"success"`` or ``"error"`` sets ``finished_at`` to now.
    * ``updated_at`` is refreshed on every call.

    Args:
        task_key: Registry key, typically one of ``TASK_KEYS``.
        status: New status; this module uses "idle", "running", "success"
            and "error".
        stage: Stage label for the current step.
        current: Progress counter.
        total: Total that ``current`` counts towards.
        message: Optional message stored alongside the status.
        trigger: Trigger label supplied by the caller (presumably manual vs.
            scheduled -- confirm against callers).
    """
    with _lock:
        entry = _progress.get(task_key)
        if entry is None:
            entry = {
                "status": "idle",
                "stage": "",
                "current": 0,
                "total": 0,
                "message": None,
                "started_at": None,
                "updated_at": None,
                "finished_at": None,
                "trigger": None,
            }
            _progress[task_key] = entry

        now = _now_iso()
        # Timestamp bookkeeping must read the PREVIOUS status, so it runs
        # before the new fields are merged in below.
        if status == "running":
            # Entering "running" from idle/success/error is a new run. Checking
            # only started_at would keep a finished run's start time forever.
            if entry.get("status") != "running" or entry.get("started_at") is None:
                entry["started_at"] = now
            # Re-entering running clears the terminal timestamp of a prior run.
            entry["finished_at"] = None
        elif status in ("success", "error"):
            entry["finished_at"] = now

        for field, value in (
            ("status", status),
            ("stage", stage),
            ("current", current),
            ("total", total),
            ("message", message),
            ("trigger", trigger),
        ):
            if value is not None:
                entry[field] = value

        entry["updated_at"] = now
|
|||
|
|
|
|||
|
|
|
|||
|
|
def report_loop_progress(
    task_key: str,
    index: int,
    total: int,
    stage: str,
    message: Optional[str] = None,
    every: int = 5,
) -> None:
    """Throttled progress reporting for tight loops.

    Forwards to ``update_progress`` (marking the task "running") only when
    ``index`` is a multiple of ``every`` or has reached ``total``. Hot loops
    therefore do not take the registry lock on every iteration, but the final
    iteration is always reported.

    Args:
        task_key: Task whose entry is updated.
        index: Current loop position; stored as ``current``.
        total: Loop length; stored as ``total``.
        stage: Stage label stored as ``stage``.
        message: Optional message; None leaves the stored message unchanged.
        every: Reporting interval. Values <= 0 are treated as 1 (report on
            every call) instead of raising ZeroDivisionError.
    """
    step = every if every > 0 else 1
    if index % step == 0 or index >= total:
        update_progress(
            task_key,
            status="running",
            stage=stage,
            current=index,
            total=total,
            message=message,
        )
|
|||
|
|
|
|||
|
|
|
|||
|
|
def get_progress(task_key: Optional[str] = None) -> Optional[dict]:
    """Return a deep-copied snapshot of the progress registry.

    Args:
        task_key: When given, return only that task's entry. When None,
            return the whole registry keyed by task key.

    Returns:
        A deep copy taken while holding ``_lock``, so callers can serialize
        it without racing concurrent in-place updates. Returns None when
        ``task_key`` is given but has no entry (hence ``Optional[dict]``).
    """
    with _lock:
        if task_key is None:
            return copy.deepcopy(_progress)
        return copy.deepcopy(_progress.get(task_key))
|
|||
|
|
|
|||
|
|
|
|||
|
|
def reset_progress(task_key: str) -> None:
    """Return one existing task entry to the idle state.

    Intended for the frontend to clear a task's terminal-state display.
    Keys that are not already in the registry are ignored (nothing is
    created). The entry is replaced by a brand-new dict under ``_lock``.
    """
    with _lock:
        if task_key not in _progress:
            return
        _progress[task_key] = dict(
            status="idle",
            stage="",
            current=0,
            total=0,
            message=None,
            started_at=None,
            updated_at=None,
            finished_at=None,
            trigger=None,
        )
|