Files
rssKeeper/backend/routers/articles.py
T
congsh c59dd304f7 fix: 端口更换 & 代码审核修复
端口:
- 服务端口 8000 → 7329
- 前端开发端口 5173 → 7330

安全:
- CORS 收紧为白名单,关闭 credentials
- SPA 路由白名单完善
- 前端 XSS 转义

可靠性:
- 时区统一为 datetime.now(timezone.utc)
- 文章入库改为内存去重 + 增量计数
- OPML 导入改为 body 参数接收
- OPML 导出 URL XML 转义
- 首次抓取改为 BackgroundTasks 异步
- articles.py HTTPException 移到顶部 import
- FTS5 异常显式日志
- FTS5 查询加引号包裹防布尔注入
- 中文摘要支持中文标点
- 去掉未使用的 hashlib import

部署:
- Dockerfile 锁 python:3.12.7-slim
- requirements 锁定具体版本
- healthcheck 不用 curl(镜像里没有)
- docker-compose 使用 .env 文件
- 新增 .env 配置文件
2026-06-11 14:31:29 +08:00

133 lines
4.0 KiB
Python

"""文章管理 API"""
from datetime import datetime
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel
from sqlalchemy import desc
from sqlalchemy.orm import Session

from database import get_db
from fulltext_search import search_articles
from models import Article, Feed
# Router collecting every endpoint defined in this module; all of its paths
# are mounted under the "/articles" prefix and tagged "articles".
router = APIRouter(prefix="/articles", tags=["articles"])
class ArticleOut(BaseModel):
    """Serialized shape of one article joined with its feed's metadata.

    NOTE(review): within the visible part of this module no route passes this
    model as ``response_model``; the handlers build plain dicts using the same
    keys instead.
    """

    # Columns taken from the article itself.
    id: int
    feed_id: int
    title: str
    link: str
    author: str
    published_at: Optional[str]  # string timestamp, or None when absent
    summary: str
    is_read: bool
    created_at: str

    # Values taken from the feed the article belongs to.
    feed_title: str
    category: str

    class Config:
        # Lets pydantic populate the model from attribute-bearing objects
        # (e.g. ORM rows) rather than only from dicts.
        from_attributes = True
def _parse_iso_datetime(value: str, field: str) -> datetime:
    """Convert an ISO 8601 query-string value into a ``datetime``.

    Args:
        value: Raw text received for the query parameter.
        field: Name of the query parameter, used in the error message.

    Returns:
        The parsed ``datetime``. A UTC offset present in *value* is kept on
        the result; no timezone normalisation is performed here.

    Raises:
        HTTPException: 400 when *value* is not a valid ISO 8601 date/datetime.
    """
    try:
        return datetime.fromisoformat(value.strip())
    except ValueError as exc:
        raise HTTPException(
            status_code=400,
            detail=f"参数 {field} 的时间格式无效,应为 ISO 8601",
        ) from exc


@router.get("")
def list_articles(
    skip: int = 0,
    limit: int = 50,
    feed_id: Optional[int] = None,
    category: Optional[str] = None,
    search: Optional[str] = None,
    since: Optional[str] = None,
    until: Optional[str] = None,
    is_read: Optional[bool] = None,
    db: Session = Depends(get_db),
):
    """Return one page of articles, optionally narrowed by several filters.

    Args:
        skip: Number of leading rows to skip (pagination offset).
        limit: Maximum number of rows to return.
        feed_id: Keep only articles belonging to this feed.
        category: Keep only articles whose feed has this category.
        search: Keyword for full-text search. When it is non-blank the request
            is delegated entirely to ``search_articles`` and none of the other
            filters are applied.
        since: Inclusive lower bound on ``published_at`` (ISO 8601 text).
        until: Inclusive upper bound on ``published_at`` (ISO 8601 text).
        is_read: Keep only read (``True``) or unread (``False``) articles.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        ``{"total": <number of matching rows>, "items": [<article dict>, ...]}``.

    Raises:
        HTTPException: 400 when ``since`` or ``until`` is not valid ISO 8601.
    """
    # A non-blank keyword hands the whole request over to full-text search.
    keyword = search.strip() if search else ""
    if keyword:
        results, total = search_articles(keyword, limit=limit, offset=skip)
        return {"total": total, "items": results}

    query = db.query(
        Article,
        Feed.title.label("feed_title"),
        Feed.category.label("category"),
    ).join(Feed)

    # ``is not None`` so that an explicit feed_id of 0 is still honoured
    # instead of silently disabling the filter.
    if feed_id is not None:
        query = query.filter(Article.feed_id == feed_id)
    if category:
        query = query.filter(Feed.category == category)
    if is_read is not None:
        query = query.filter(Article.is_read == is_read)

    # The column holds datetimes, so the bounds must be datetimes as well:
    # binding the raw query-string text is rejected by SQLAlchemy's SQLite
    # DateTime type and would surface as a 500.
    if since:
        query = query.filter(
            Article.published_at >= _parse_iso_datetime(since, "since")
        )
    if until:
        query = query.filter(
            Article.published_at <= _parse_iso_datetime(until, "until")
        )

    total = query.count()

    # Article.id breaks ties between equal/NULL publish times so consecutive
    # pages never repeat or drop rows.
    rows = (
        query.order_by(desc(Article.published_at), desc(Article.id))
        .offset(skip)
        .limit(limit)
        .all()
    )

    items = [
        {
            "id": article.id,
            "feed_id": article.feed_id,
            "title": article.title or "",
            "link": article.link,
            "author": article.author or "",
            "published_at": (
                article.published_at.isoformat() if article.published_at else None
            ),
            "summary": article.summary or "",
            "is_read": article.is_read,
            "created_at": article.created_at.isoformat(),
            "feed_title": feed_title or "",
            "category": feed_category or "",
        }
        for article, feed_title, feed_category in rows
    ]
    return {"total": total, "items": items}
@router.get("/{article_id}")
def get_article(article_id: int, db: Session = Depends(get_db)):
    """Return the full record of one article, including its content.

    Args:
        article_id: Primary key of the article to fetch.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict with the article's fields plus the title and category of its
        feed (both empty strings when the feed row cannot be found).

    Raises:
        HTTPException: 404 when no article has the given id.
    """
    record = db.query(Article).filter(Article.id == article_id).first()
    if not record:
        raise HTTPException(status_code=404, detail="文章不存在")

    # The feed is looked up separately; a missing feed is tolerated.
    owner = db.query(Feed).filter(Feed.id == record.feed_id).first()
    if owner:
        feed_title = owner.title
        feed_category = owner.category
    else:
        feed_title = ""
        feed_category = ""

    published = None
    if record.published_at:
        published = record.published_at.isoformat()

    return dict(
        id=record.id,
        feed_id=record.feed_id,
        title=record.title or "",
        link=record.link,
        author=record.author or "",
        published_at=published,
        content=record.content or "",
        summary=record.summary or "",
        is_read=record.is_read,
        created_at=record.created_at.isoformat(),
        feed_title=feed_title,
        category=feed_category,
    )
@router.put("/{article_id}/read")
def mark_read(article_id: int, db: Session = Depends(get_db)):
    """Flag one article as read and persist the change.

    Args:
        article_id: Primary key of the article to update.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        A dict holding a confirmation message.

    Raises:
        HTTPException: 404 when no article has the given id.
    """
    target = db.query(Article).filter(Article.id == article_id).first()
    if target:
        target.is_read = True
        db.commit()
        return {"message": "已标记为已读"}
    raise HTTPException(status_code=404, detail="文章不存在")
@router.get("/search/fulltext")
def fulltext_search(
    q: str,
    skip: int = 0,
    limit: int = 50,
):
    """Run a full-text search over articles.

    Args:
        q: Search keyword. Surrounding whitespace is ignored.
        skip: Number of leading hits to skip (pagination offset).
        limit: Maximum number of hits to return.

    Returns:
        ``{"total": <number of hits>, "items": [<hit>, ...]}``. A blank query
        yields an empty result without touching the search backend.
    """
    # Normalise the keyword the same way the list endpoint does, so blank or
    # padded input never reaches the full-text backend.
    keyword = q.strip()
    if not keyword:
        return {"total": 0, "items": []}

    results, total = search_articles(keyword, limit=limit, offset=skip)
    return {"total": total, "items": results}