"""任务进度注册表(进程内内存,线程安全)。 供手动任务、定时任务在执行过程中上报进度,前端通过 GET /api/tasks/progress 轮询读取展示。 单 worker(uvicorn --workers 1)前提下,所有请求/任务线程共享同一份内存。 """ import copy import threading from datetime import datetime, timezone from typing import Optional # 4 个稳定任务 key TASK_KEYS = ("summarize", "tag_score_dedup", "generate_daily_brief", "bootstrap_taxonomy") _progress: dict = {} _lock = threading.Lock() def _now_iso() -> str: return datetime.now(timezone.utc).isoformat() def _init() -> None: """初始化所有任务 key 为 idle""" for key in TASK_KEYS: _progress[key] = { "status": "idle", "stage": "", "current": 0, "total": 0, "message": None, "started_at": None, "updated_at": None, "finished_at": None, "trigger": None, } _init() def update_progress( task_key: str, *, status: Optional[str] = None, stage: Optional[str] = None, current: Optional[int] = None, total: Optional[int] = None, message: Optional[str] = None, trigger: Optional[str] = None, ) -> None: """合并非 None 字段并盖时间戳""" with _lock: entry = _progress.get(task_key) if entry is None: entry = { "status": "idle", "stage": "", "current": 0, "total": 0, "message": None, "started_at": None, "updated_at": None, "finished_at": None, "trigger": None, } _progress[task_key] = entry now = _now_iso() if status == "running" and entry.get("started_at") is None: entry["started_at"] = now if status in ("success", "error"): entry["finished_at"] = now # 若重新进入 running,重置终态时间戳 if status == "running": entry["finished_at"] = None if status is not None: entry["status"] = status if stage is not None: entry["stage"] = stage if current is not None: entry["current"] = current if total is not None: entry["total"] = total if message is not None: entry["message"] = message if trigger is not None: entry["trigger"] = trigger entry["updated_at"] = now def report_loop_progress( task_key: str, index: int, total: int, stage: str, message: Optional[str] = None, every: int = 5, ) -> None: """紧凑循环进度上报:每 `every` 次或最后一次(index==total)才上报,减少加锁""" if index % every == 0 or index >= total: update_progress(task_key, status="running", stage=stage, current=index, total=total, message=message) def get_progress(task_key: Optional[str] = None) -> dict: """返回深拷贝(单个或全部),防止序列化期间被并发修改""" with _lock: if task_key is not None: return copy.deepcopy(_progress.get(task_key)) return copy.deepcopy(_progress) def reset_progress(task_key: str) -> None: """重置单个任务为 idle(前端清除终态显示用)""" with _lock: if task_key in _progress: _progress[task_key] = { "status": "idle", "stage": "", "current": 0, "total": 0, "message": None, "started_at": None, "updated_at": None, "finished_at": None, "trigger": None, }