"""APScheduler scheduled jobs.

Defines the background jobs (taxonomy bootstrap, fetch/summarize,
tag/score/deduplicate, daily brief), a decorator that gives each job a
database session while holding a process-wide task lock, and helpers to
start and stop the shared ``BackgroundScheduler``.
"""

import functools
import logging
import threading
from datetime import datetime, timezone

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy.orm import Session

from config import settings
from database import SessionLocal
from app.taxonomy import ensure_taxonomy, bootstrap_taxonomy
from app.summarizer import fetch_and_summarize
from app.tagger import tag_articles
from app.deduplicator import deduplicate_articles
from app.scorer import score_articles
from app.brief import generate_daily_brief
from app.settings_manager import get_setting_value
from app import task_progress

logger = logging.getLogger(__name__)

# Lazily created singleton; see get_scheduler().
_scheduler: BackgroundScheduler | None = None

# Task mutex: prevents manually triggered tasks and scheduled jobs from
# running concurrently.
_task_lock = threading.Lock()

# Scheduled job function name -> key used when reporting to task_progress.
_JOB_TASK_KEYS = {
    "job_fetch_and_summarize": "summarize",
    "job_tag_score_deduplicate": "tag_score_dedup",
    "job_generate_daily_brief": "generate_daily_brief",
    "job_bootstrap_taxonomy": "bootstrap_taxonomy",
}


def get_scheduler() -> BackgroundScheduler:
    """Return the module-wide scheduler, creating it on first use.

    The scheduler is configured with the Asia/Shanghai timezone and job
    defaults of ``coalesce=True``, ``max_instances=1`` and a 300-second
    ``misfire_grace_time``.
    """
    global _scheduler
    if _scheduler is None:
        _scheduler = BackgroundScheduler(
            job_defaults={
                "coalesce": True,
                "max_instances": 1,
                "misfire_grace_time": 300,
            },
            timezone="Asia/Shanghai",
        )
    return _scheduler


def get_task_lock():
    """Return the global task mutex, for use by the manual-task endpoints."""
    return _task_lock


def _with_db(func):
    """Decorate a job so it runs with a DB session under the task lock.

    The returned zero-argument wrapper:

    * tries to take ``_task_lock`` without blocking and skips the run (with
      a warning) when another task already holds it;
    * opens a ``SessionLocal()`` session and passes it to ``func``;
    * reports ``running`` / ``success`` / ``error`` to ``task_progress``
      when ``func.__name__`` has an entry in ``_JOB_TASK_KEYS``;
    * logs any exception raised while preparing or running the job instead
      of propagating it;
    * always closes the session and releases the lock.
    """

    @functools.wraps(func)
    def wrapper():
        if not _task_lock.acquire(blocking=False):
            logger.warning("定时任务 %s 跳过:已有其他任务正在执行", func.__name__)
            return

        task_key = _JOB_TASK_KEYS.get(func.__name__)
        db = None
        try:
            # Session creation and the first progress report happen inside
            # the try block: if either raised outside it, the lock taken
            # above would never be released and every later job would be
            # skipped.
            db = SessionLocal()
            if task_key:
                task_progress.update_progress(
                    task_key,
                    status="running",
                    trigger="scheduled",
                    stage="初始化",
                    current=0,
                    total=0,
                    message=None,
                )
            func(db)
            if task_key:
                task_progress.update_progress(
                    task_key,
                    status="success",
                    stage="完成",
                    message="定时任务执行成功",
                )
        except Exception as exc:
            logger.exception("定时任务 %s 执行失败: %s", func.__name__, exc)
            if task_key:
                task_progress.update_progress(
                    task_key,
                    status="error",
                    stage="失败",
                    message=str(exc)[:500],
                )
        finally:
            # Nested finally so a failing close() cannot skip the release.
            try:
                if db is not None:
                    db.close()
            finally:
                _task_lock.release()

    return wrapper


@_with_db
def job_bootstrap_taxonomy(db: Session):
    """Run the taxonomy initialisation check via ``ensure_taxonomy``."""
    logger.info("执行 taxonomy 初始化检查")
    ensure_taxonomy(db)


@_with_db
def job_fetch_and_summarize(db: Session):
    """Fetch articles and generate summaries (``hours=24``, ``limit=200``)."""
    logger.info("执行摘要生成任务")
    fetch_and_summarize(db, hours=24, limit=200)


@_with_db
def job_tag_score_deduplicate(db: Session):
    """Tag, deduplicate and score articles, in that order."""
    logger.info("执行分类/打分/去重任务")
    # NOTE(review): "today" is the UTC calendar date, while the scheduler
    # runs in Asia/Shanghai - confirm this matches what
    # deduplicate_articles expects for date_str.
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    # 1. Tag articles (article selection is decided inside tag_articles).
    tag_articles(db)

    # 2. Deduplicate the articles of the current date.
    deduplicate_articles(db, date_str=today)

    # 3. Recompute scores, including the duplication score.
    score_articles(db, update_duplication=True)


@_with_db
def job_generate_daily_brief(db: Session):
    """Generate the daily brief for the current date, forcing regeneration."""
    logger.info("执行每日简报生成任务")
    # NOTE(review): UTC calendar date, see job_tag_score_deduplicate.
    today = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    generate_daily_brief(db, date_str=today, force=True)


def init_scheduler():
    """Register all scheduled jobs and start the scheduler.

    Intervals and the daily-brief time are read through
    ``get_setting_value`` with the values from ``settings`` as fallbacks,
    and converted with ``int`` (a non-numeric value raises ``ValueError``).
    """
    scheduler = get_scheduler()

    # Read the schedule configuration (stored setting, else default).
    summarize_interval = int(
        get_setting_value(
            "SUMMARIZE_INTERVAL_MINUTES", settings.SUMMARIZE_INTERVAL_MINUTES
        )
    )
    tag_score_interval = int(
        get_setting_value(
            "TAG_SCORE_INTERVAL_MINUTES", settings.TAG_SCORE_INTERVAL_MINUTES
        )
    )
    brief_hour = int(get_setting_value("DAILY_BRIEF_HOUR", settings.DAILY_BRIEF_HOUR))
    brief_minute = int(
        get_setting_value("DAILY_BRIEF_MINUTE", settings.DAILY_BRIEF_MINUTE)
    )

    # Taxonomy initialisation: run once right after the service starts.
    scheduler.add_job(
        job_bootstrap_taxonomy,
        trigger=DateTrigger(run_date=datetime.now()),
        id="bootstrap_taxonomy",
        replace_existing=True,
        max_instances=1,
    )

    # Summary job.
    scheduler.add_job(
        job_fetch_and_summarize,
        trigger=IntervalTrigger(minutes=summarize_interval),
        id="fetch_and_summarize",
        replace_existing=True,
    )

    # Tag / score / deduplicate job.
    scheduler.add_job(
        job_tag_score_deduplicate,
        trigger=IntervalTrigger(minutes=tag_score_interval),
        id="tag_score_deduplicate",
        replace_existing=True,
    )

    # Daily brief. A pre-built trigger instance does not inherit the
    # scheduler's timezone, so it is passed explicitly; otherwise the
    # hour/minute would be interpreted in the host's local timezone.
    scheduler.add_job(
        job_generate_daily_brief,
        trigger=CronTrigger(
            hour=brief_hour,
            minute=brief_minute,
            timezone=scheduler.timezone,
        ),
        id="generate_daily_brief",
        replace_existing=True,
    )

    scheduler.start()
    logger.info(
        "调度器已启动: summarize=%d分钟, tag_score=%d分钟, brief=%02d:%02d",
        summarize_interval,
        tag_score_interval,
        brief_hour,
        brief_minute,
    )


def stop_scheduler():
    """Stop the scheduler (without waiting for running jobs) and drop it.

    Does nothing when no scheduler instance exists. The instance is
    discarded so that the next ``get_scheduler()`` call builds a fresh one.
    """
    global _scheduler
    if _scheduler is None:
        return
    # shutdown() raises SchedulerNotRunningError on a scheduler that was
    # created but never started, so only call it when it is running.
    if _scheduler.running:
        _scheduler.shutdown(wait=False)
    _scheduler = None
    logger.info("调度器已停止")