Files

119 lines
3.8 KiB
Python
Raw Permalink Normal View History

"""Task runtime progress tracking service."""
from datetime import datetime, timezone
from typing import Any
from app.core.logging import get_logger
from app.core.redis import get_redis
logger = get_logger(__name__)
TASK_STATUS_IDLE = "idle"
TASK_STATUS_RUNNING = "running"
TASK_STATUS_SUCCESS = "success"
TASK_STATUS_ERROR = "error"
class TaskRuntime:
"""Runtime task progress tracker using Redis."""
def __init__(self):
self._redis = None
async def _get_redis(self):
if self._redis is None:
self._redis = await get_redis()
return self._redis
def _key(self, task_key: str) -> str:
return f"task_progress:{task_key}"
async def update_progress(
self,
task_key: str,
*,
status: str | None = None,
stage: str | None = None,
current: int | None = None,
total: int | None = None,
message: str | None = None,
trigger: str | None = None,
) -> None:
"""Update task progress."""
try:
redis = await self._get_redis()
key = self._key(task_key)
existing = await redis.hgetall(key)
data = dict(existing) if existing else {}
if status:
data["status"] = status
if stage:
data["stage"] = stage
if current is not None:
data["current"] = str(current)
if total is not None:
data["total"] = str(total)
if message is not None:
data["message"] = message
if trigger:
data["trigger"] = trigger
data["updated_at"] = datetime.now(timezone.utc).isoformat()
if status == TASK_STATUS_RUNNING and "started_at" not in data:
data["started_at"] = data["updated_at"]
if status in (TASK_STATUS_SUCCESS, TASK_STATUS_ERROR):
data["finished_at"] = data["updated_at"]
await redis.hset(key, mapping=data)
except Exception as exc:
logger.warning("Failed to update task progress: %s", exc)
async def get_progress(self, task_key: str) -> dict[str, Any]:
"""Get task progress."""
try:
redis = await self._get_redis()
data = await redis.hgetall(self._key(task_key))
if not data:
return self._empty_progress(task_key)
return {
"task_key": task_key,
"status": data.get("status", TASK_STATUS_IDLE),
"stage": data.get("stage", ""),
"current": int(data.get("current", 0)),
"total": int(data.get("total", 0)),
"message": data.get("message"),
"trigger": data.get("trigger"),
"started_at": data.get("started_at"),
"updated_at": data.get("updated_at"),
"finished_at": data.get("finished_at"),
}
except Exception as exc:
logger.warning("Failed to get task progress: %s", exc)
return self._empty_progress(task_key)
async def reset_progress(self, task_key: str) -> None:
"""Reset task progress to idle."""
try:
redis = await self._get_redis()
await redis.delete(self._key(task_key))
except Exception as exc:
logger.warning("Failed to reset task progress: %s", exc)
def _empty_progress(self, task_key: str) -> dict[str, Any]:
return {
"task_key": task_key,
"status": TASK_STATUS_IDLE,
"stage": "",
"current": 0,
"total": 0,
"message": None,
"trigger": None,
"started_at": None,
"updated_at": None,
"finished_at": None,
}
task_runtime = TaskRuntime()