Files
dataClean/scheduler.py
T

167 lines
4.9 KiB
Python
Raw Normal View History

2026-06-12 16:04:03 +08:00
"""APScheduler 定时任务"""
import functools
import logging
import threading
from datetime import datetime, timezone
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.date import DateTrigger
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger
from sqlalchemy.orm import Session
from config import settings
from database import SessionLocal
from app.taxonomy import ensure_taxonomy, bootstrap_taxonomy
from app.summarizer import fetch_and_summarize
from app.tagger import tag_articles
from app.deduplicator import deduplicate_articles
from app.scorer import score_articles
from app.brief import generate_daily_brief
from app.settings_manager import get_setting_value
# Module-level logger shared by all jobs in this file.
logger = logging.getLogger(__name__)
# Lazily created scheduler singleton: built by get_scheduler(), reset to None by
# stop_scheduler().
_scheduler: BackgroundScheduler | None = None
# Task mutex: prevents manual tasks and scheduled tasks from running concurrently.
# Scheduled jobs take it non-blocking inside _with_db; manual-task endpoints get
# it through get_task_lock().
_task_lock = threading.Lock()
def get_scheduler() -> BackgroundScheduler:
    """Return the module's BackgroundScheduler, creating it on first use.

    The instance is cached in the module global ``_scheduler``; later calls
    return the same object until stop_scheduler() clears it. It is configured
    with the "Asia/Shanghai" time zone and these job defaults:
    coalesce missed runs, at most one running instance per job, and a
    300-second misfire grace time.
    """
    global _scheduler
    if _scheduler is not None:
        return _scheduler

    defaults = {
        "coalesce": True,
        "max_instances": 1,
        "misfire_grace_time": 300,
    }
    _scheduler = BackgroundScheduler(job_defaults=defaults, timezone="Asia/Shanghai")
    return _scheduler
def get_task_lock() -> threading.Lock:
    """Expose the module-wide task mutex.

    Manual-task API handlers use this lock so they never run at the same time
    as a scheduled job (scheduled jobs take the same lock in _with_db).
    """
    lock = _task_lock
    return lock
def _with_db(func):
    """Decorator that turns ``func(db)`` into a zero-argument scheduler job.

    The returned wrapper:

    * tries to take the module-wide ``_task_lock`` without blocking; if it is
      already held (another job, or a manual task using get_task_lock()), it
      logs a warning and returns without calling ``func``;
    * opens a session with ``SessionLocal()``, passes it to ``func``, and
      always closes it afterwards;
    * logs (with traceback) and swallows any ``Exception`` raised by ``func``;
    * always releases the lock once it was acquired, even when
      ``SessionLocal()`` or ``db.close()`` raises (those exceptions still
      propagate to the caller).

    Args:
        func: Callable taking a single database session argument.

    Returns:
        A no-argument wrapper with ``func``'s metadata (``functools.wraps``).
    """

    @functools.wraps(func)
    def wrapper():
        if not _task_lock.acquire(blocking=False):
            logger.warning("定时任务 %s 跳过:已有其他任务正在执行", func.__name__)
            return
        # Everything after a successful acquire lives inside this try: if
        # SessionLocal() or db.close() raised outside of it, the lock would stay
        # held forever and every later job / manual task would be skipped.
        try:
            db = SessionLocal()
            try:
                func(db)
            except Exception as exc:
                # Deliberate best-effort: log the failure, keep the scheduler alive.
                logger.exception("定时任务 %s 执行失败: %s", func.__name__, exc)
            finally:
                db.close()
        finally:
            _task_lock.release()

    return wrapper
@_with_db
def job_bootstrap_taxonomy(db: Session) -> None:
    """Run the taxonomy initialisation check via ``ensure_taxonomy``.

    Per the original note, the initialisation is meant to take effect only when
    the taxonomy table is empty; that check lives inside ensure_taxonomy.
    """
    logger.info("执行 taxonomy 初始化检查")
    ensure_taxonomy(db)
@_with_db
def job_fetch_and_summarize(db: Session) -> None:
    """Fetch recent articles and generate their summaries.

    Calls ``fetch_and_summarize`` with a 24-hour look-back window and at most
    200 articles per run.
    """
    logger.info("执行摘要生成任务")
    window_hours = 24
    max_articles = 200
    fetch_and_summarize(db, hours=window_hours, limit=max_articles)
@_with_db
def job_tag_score_deduplicate(db: Session) -> None:
    """Tag, deduplicate and then re-score the day's articles.

    Steps, in order: ``tag_articles`` (tagging scope is decided inside that
    helper), ``deduplicate_articles`` for today's date, and ``score_articles``
    with ``update_duplication=True`` so scores reflect the dedup pass.

    NOTE(review): "today" is the current UTC date (YYYY-MM-DD), while the
    scheduler itself is configured for Asia/Shanghai; before 08:00 Shanghai time
    this is the previous calendar day there. Confirm this matches how article
    dates are stored.
    """
    logger.info("执行分类/打分/去重任务")
    date_str = datetime.now(timezone.utc).date().isoformat()

    # Tag articles that do not have a category yet.
    tag_articles(db)
    # Deduplicate the articles for the UTC date computed above.
    deduplicate_articles(db, date_str=date_str)
    # Recompute scores, including the duplication component.
    score_articles(db, update_duplication=True)
@_with_db
def job_generate_daily_brief(db: Session) -> None:
    """Generate the daily brief for the current UTC date.

    ``force=True`` is passed to ``generate_daily_brief`` (presumably to rebuild
    a brief that already exists — confirm in app.brief).

    NOTE(review): the date is taken in UTC while the cron trigger for this job
    is meant to fire in Asia/Shanghai time; a brief scheduled before 08:00
    Shanghai time is generated for the previous day's UTC date. Confirm intent.
    """
    logger.info("执行每日简报生成任务")
    now_utc = datetime.now(timezone.utc)
    brief_date = now_utc.strftime("%Y-%m-%d")
    generate_daily_brief(db, date_str=brief_date, force=True)
def init_scheduler():
    """Register all scheduled jobs and start the scheduler.

    Schedule values are read with ``get_setting_value`` (the original notes say
    from the database / environment), passing the matching ``settings``
    attribute as the second argument (presumably the fallback), then coerced
    with ``int()``:

    * ``SUMMARIZE_INTERVAL_MINUTES``: interval of job_fetch_and_summarize
    * ``TAG_SCORE_INTERVAL_MINUTES``: interval of job_tag_score_deduplicate
    * ``DAILY_BRIEF_HOUR`` / ``DAILY_BRIEF_MINUTE``: daily wall-clock time of
      job_generate_daily_brief

    job_bootstrap_taxonomy is scheduled to run once, immediately. All jobs use
    ``replace_existing=True``, so re-registering by id overwrites the old job.

    Every trigger is built with the scheduler's own time zone. APScheduler 3
    only applies the scheduler time zone to triggers it creates from an alias
    (``"cron"``, ``"interval"``, ...); a trigger *instance* constructed without
    ``timezone=`` uses the host's local zone, so the daily brief would fire at
    brief_hour:brief_minute host time (often UTC in containers) instead of in
    the zone configured in get_scheduler().

    Raises:
        ValueError / TypeError: a setting value cannot be converted by ``int()``.
        Starting a scheduler that is already running raises APScheduler's
        SchedulerAlreadyRunningError (call stop_scheduler() first).
    """
    scheduler = get_scheduler()
    # Trigger instances don't inherit the scheduler's zone; pass it explicitly.
    tz = scheduler.timezone

    # Read the schedule configuration from the DB / environment.
    summarize_interval = int(
        get_setting_value("SUMMARIZE_INTERVAL_MINUTES", settings.SUMMARIZE_INTERVAL_MINUTES)
    )
    tag_score_interval = int(
        get_setting_value("TAG_SCORE_INTERVAL_MINUTES", settings.TAG_SCORE_INTERVAL_MINUTES)
    )
    brief_hour = int(get_setting_value("DAILY_BRIEF_HOUR", settings.DAILY_BRIEF_HOUR))
    brief_minute = int(get_setting_value("DAILY_BRIEF_MINUTE", settings.DAILY_BRIEF_MINUTE))

    # Taxonomy bootstrap: run once right after startup. An aware "now" avoids
    # any ambiguity about which zone a naive timestamp belongs to.
    scheduler.add_job(
        job_bootstrap_taxonomy,
        trigger=DateTrigger(run_date=datetime.now(tz), timezone=tz),
        id="bootstrap_taxonomy",
        replace_existing=True,
        max_instances=1,
    )
    # Article fetching + summarisation.
    scheduler.add_job(
        job_fetch_and_summarize,
        trigger=IntervalTrigger(minutes=summarize_interval, timezone=tz),
        id="fetch_and_summarize",
        replace_existing=True,
    )
    # Tagging / scoring / deduplication.
    scheduler.add_job(
        job_tag_score_deduplicate,
        trigger=IntervalTrigger(minutes=tag_score_interval, timezone=tz),
        id="tag_score_deduplicate",
        replace_existing=True,
    )
    # Daily brief at brief_hour:brief_minute in the scheduler's time zone.
    scheduler.add_job(
        job_generate_daily_brief,
        trigger=CronTrigger(hour=brief_hour, minute=brief_minute, timezone=tz),
        id="generate_daily_brief",
        replace_existing=True,
    )

    scheduler.start()
    logger.info(
        "调度器已启动: summarize=%d分钟, tag_score=%d分钟, brief=%02d:%02d",
        summarize_interval,
        tag_score_interval,
        brief_hour,
        brief_minute,
    )
def stop_scheduler():
    """Shut down the cached scheduler, if any, and forget it.

    Safe to call when no scheduler was ever created, or when one was created by
    get_scheduler() but never started (e.g. init_scheduler() failed on a bad
    setting before ``start()``). APScheduler's ``shutdown()`` raises
    SchedulerNotRunningError on a scheduler that is not running, so it is only
    invoked when ``running`` is true. Running jobs are not waited for
    (``wait=False``). The module cache is cleared first, so the next
    get_scheduler() call always builds a fresh instance.
    """
    global _scheduler
    scheduler, _scheduler = _scheduler, None
    if scheduler is None:
        return
    if scheduler.running:
        scheduler.shutdown(wait=False)
    logger.info("调度器已停止")