"""对外 API(供 AI/外部系统调用)""" from typing import Optional from datetime import datetime, timedelta from fastapi import APIRouter, Depends from sqlalchemy.orm import Session from sqlalchemy import desc from database import get_db from models import Article, Feed router = APIRouter(prefix="/external", tags=["external"]) @router.get("/recent") def get_recent_articles( hours: int = 24, limit: int = 50, feed_id: Optional[int] = None, category: Optional[str] = None, db: Session = Depends(get_db), ): """获取最近 N 小时的文章 这是对外提供给 AI 分析的主要接口 """ since = datetime.utcnow() - timedelta(hours=hours) query = db.query(Article, Feed.title.label("feed_title"), Feed.category.label("category")).join(Feed) query = query.filter(Article.created_at >= since) if feed_id: query = query.filter(Article.feed_id == feed_id) if category: query = query.filter(Feed.category == category) rows = query.order_by(desc(Article.published_at)).limit(limit).all() return { "query": { "hours": hours, "limit": limit, "feed_id": feed_id, "category": category, }, "count": len(rows), "articles": [ { "id": article.id, "title": article.title or "", "link": article.link, "author": article.author or "", "summary": article.summary or "", "content": article.content or "" if len(article.content or "") < 10000 else article.summary or "", "published_at": article.published_at.isoformat() if article.published_at else None, "created_at": article.created_at.isoformat(), "feed_title": feed_title or "", "category": category or "", } for article, feed_title, category in rows ], } @router.get("/feeds") def get_active_feeds(db: Session = Depends(get_db)): """获取所有活跃的 RSS 源列表""" feeds = db.query(Feed).filter(Feed.is_active == True).all() return { "count": len(feeds), "feeds": [ { "id": feed.id, "title": feed.title or feed.url, "url": feed.url, "category": feed.category or "", "article_count": feed.article_count, "last_fetch_at": feed.last_fetch_at.isoformat() if feed.last_fetch_at else None, } for feed in feeds ], } @router.get("/feeds/{feed_id}/articles") def get_feed_articles( feed_id: int, limit: int = 100, since: Optional[str] = None, db: Session = Depends(get_db), ): """获取指定 RSS 源的文章""" feed = db.query(Feed).filter(Feed.id == feed_id).first() if not feed: return {"error": "Feed not found"} query = db.query(Article).filter(Article.feed_id == feed_id) if since: query = query.filter(Article.published_at >= since) articles = query.order_by(desc(Article.published_at)).limit(limit).all() return { "feed": { "id": feed.id, "title": feed.title or feed.url, "url": feed.url, }, "count": len(articles), "articles": [ { "id": article.id, "title": article.title or "", "link": article.link, "author": article.author or "", "summary": article.summary or "", "published_at": article.published_at.isoformat() if article.published_at else None, } for article in articles ], } @router.get("/summary") def get_daily_summary( date: Optional[str] = None, db: Session = Depends(get_db), ): """获取指定日期的文章摘要统计 供 AI 快速了解某天的 RSS 内容概况 """ if date: try: day = datetime.strptime(date, "%Y-%m-%d") next_day = day + timedelta(days=1) except ValueError: return {"error": "Invalid date format, use YYYY-MM-DD"} else: day = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0) next_day = day + timedelta(days=1) query = db.query(Article, Feed.title.label("feed_title"), Feed.category.label("category")).join(Feed) query = query.filter(Article.created_at >= day, Article.created_at < next_day) rows = query.order_by(desc(Article.published_at)).all() # 按分类统计 by_category = {} for article, feed_title, category in rows: cat = category or "未分类" if cat not in by_category: by_category[cat] = [] by_category[cat].append({ "title": article.title or "", "link": article.link, "feed": feed_title or "", "summary": article.summary or "", }) return { "date": day.strftime("%Y-%m-%d"), "total_articles": len(rows), "by_category": by_category, }