Files
congsh c59dd304f7 fix: 端口更换 & 代码审核修复
端口:
- 服务端口 8000 → 7329
- 前端开发端口 5173 → 7330

安全:
- CORS 收紧为白名单,关闭 credentials
- SPA 路由白名单完善
- 前端 XSS 转义

可靠性:
- 时区统一为 datetime.now(timezone.utc)
- 文章入库改为内存去重 + 增量计数
- OPML 导入改为 body 参数接收
- OPML 导出 URL XML 转义
- 首次抓取改为 BackgroundTasks 异步
- articles.py HTTPException 移到顶部 import
- FTS5 异常显式日志
- FTS5 查询加引号包裹防布尔注入
- 中文摘要支持中文标点
- 去掉未使用的 hashlib import

部署:
- Dockerfile 锁 python:3.12.7-slim
- requirements 锁定具体版本
- healthcheck 不用 curl(镜像里没有)
- docker-compose 使用 .env 文件
- 新增 .env 配置文件
2026-06-11 14:31:29 +08:00

164 lines
5.0 KiB
Python

"""对外 API(供 AI/外部系统调用)"""
from typing import Optional
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from sqlalchemy import desc
from database import get_db
from models import Article, Feed
router = APIRouter(prefix="/external", tags=["external"])
@router.get("/recent")
def get_recent_articles(
hours: int = 24,
limit: int = 50,
feed_id: Optional[int] = None,
category: Optional[str] = None,
db: Session = Depends(get_db),
):
"""获取最近 N 小时的文章
这是对外提供给 AI 分析的主要接口
"""
since = datetime.now(timezone.utc) - timedelta(hours=hours)
query = db.query(Article, Feed.title.label("feed_title"), Feed.category.label("category")).join(Feed)
query = query.filter(Article.created_at >= since)
if feed_id:
query = query.filter(Article.feed_id == feed_id)
if category:
query = query.filter(Feed.category == category)
rows = query.order_by(desc(Article.published_at)).limit(limit).all()
return {
"query": {
"hours": hours,
"limit": limit,
"feed_id": feed_id,
"category": category,
},
"count": len(rows),
"articles": [
{
"id": article.id,
"title": article.title or "",
"link": article.link,
"author": article.author or "",
"summary": article.summary or "",
"content": article.content or "" if len(article.content or "") < 10000 else article.summary or "",
"published_at": article.published_at.isoformat() if article.published_at else None,
"created_at": article.created_at.isoformat(),
"feed_title": feed_title or "",
"category": category or "",
}
for article, feed_title, category in rows
],
}
@router.get("/feeds")
def get_active_feeds(db: Session = Depends(get_db)):
"""获取所有活跃的 RSS 源列表"""
feeds = db.query(Feed).filter(Feed.is_active == True).all()
return {
"count": len(feeds),
"feeds": [
{
"id": feed.id,
"title": feed.title or feed.url,
"url": feed.url,
"category": feed.category or "",
"article_count": feed.article_count,
"last_fetch_at": feed.last_fetch_at.isoformat() if feed.last_fetch_at else None,
}
for feed in feeds
],
}
@router.get("/feeds/{feed_id}/articles")
def get_feed_articles(
feed_id: int,
limit: int = 100,
since: Optional[str] = None,
db: Session = Depends(get_db),
):
"""获取指定 RSS 源的文章"""
feed = db.query(Feed).filter(Feed.id == feed_id).first()
if not feed:
return {"error": "Feed not found"}
query = db.query(Article).filter(Article.feed_id == feed_id)
if since:
query = query.filter(Article.published_at >= since)
articles = query.order_by(desc(Article.published_at)).limit(limit).all()
return {
"feed": {
"id": feed.id,
"title": feed.title or feed.url,
"url": feed.url,
},
"count": len(articles),
"articles": [
{
"id": article.id,
"title": article.title or "",
"link": article.link,
"author": article.author or "",
"summary": article.summary or "",
"published_at": article.published_at.isoformat() if article.published_at else None,
}
for article in articles
],
}
@router.get("/summary")
def get_daily_summary(
date: Optional[str] = None,
db: Session = Depends(get_db),
):
"""获取指定日期的文章摘要统计
供 AI 快速了解某天的 RSS 内容概况
"""
if date:
try:
day = datetime.strptime(date, "%Y-%m-%d")
next_day = day + timedelta(days=1)
except ValueError:
return {"error": "Invalid date format, use YYYY-MM-DD"}
else:
day = datetime.now(timezone.utc).replace(hour=0, minute=0, second=0, microsecond=0)
next_day = day + timedelta(days=1)
query = db.query(Article, Feed.title.label("feed_title"), Feed.category.label("category")).join(Feed)
query = query.filter(Article.created_at >= day, Article.created_at < next_day)
rows = query.order_by(desc(Article.published_at)).all()
# 按分类统计
by_category = {}
for article, feed_title, category in rows:
cat = category or "未分类"
if cat not in by_category:
by_category[cat] = []
by_category[cat].append({
"title": article.title or "",
"link": article.link,
"feed": feed_title or "",
"summary": article.summary or "",
})
return {
"date": day.strftime("%Y-%m-%d"),
"total_articles": len(rows),
"by_category": by_category,
}