Files

75 lines
2.0 KiB
Python
Raw Permalink Normal View History

"""APScheduler 定时任务管理"""
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from rss_fetcher import fetch_and_store_feed
import config
_scheduler = None
def get_scheduler():
"""获取或创建调度器实例"""
global _scheduler
if _scheduler is None:
_scheduler = BackgroundScheduler()
return _scheduler
def add_feed_job(feed_id: int, interval_minutes: int):
"""为指定 RSS 源添加定时抓取任务"""
scheduler = get_scheduler()
job_id = f"fetch_feed_{feed_id}"
# 确保间隔不低于最小值
interval = max(interval_minutes, config.MIN_FETCH_INTERVAL)
# 如果任务已存在则更新
existing = scheduler.get_job(job_id)
if existing:
existing.reschedule(trigger=IntervalTrigger(minutes=interval))
return
scheduler.add_job(
fetch_and_store_feed,
trigger=IntervalTrigger(minutes=interval),
id=job_id,
args=[feed_id],
replace_existing=True,
misfire_grace_time=300, # 5分钟容错
coalesce=True, # 合并错过的任务
)
def remove_feed_job(feed_id: int):
"""移除指定 RSS 源的定时任务"""
scheduler = get_scheduler()
job_id = f"fetch_feed_{feed_id}"
try:
scheduler.remove_job(job_id)
except Exception:
pass
def start_scheduler():
"""启动调度器"""
scheduler = get_scheduler()
if not scheduler.running:
scheduler.start()
def stop_scheduler():
"""停止调度器"""
global _scheduler
if _scheduler and _scheduler.running:
_scheduler.shutdown(wait=False)
_scheduler = None
def init_feed_jobs(db):
"""从数据库加载所有活跃 RSS 源并注册定时任务"""
from models import Feed
feeds = db.query(Feed).filter(Feed.is_active == True).all()
for feed in feeds:
add_feed_job(feed.id, feed.fetch_interval_minutes or config.DEFAULT_FETCH_INTERVAL)
start_scheduler()