"""文章管理 API""" from typing import Optional from fastapi import APIRouter, Depends, HTTPException from pydantic import BaseModel from sqlalchemy.orm import Session from sqlalchemy import desc from database import get_db from models import Article, Feed from fulltext_search import search_articles router = APIRouter(prefix="/articles", tags=["articles"]) class ArticleOut(BaseModel): id: int feed_id: int title: str link: str author: str published_at: Optional[str] summary: str is_read: bool created_at: str feed_title: str category: str class Config: from_attributes = True @router.get("") def list_articles( skip: int = 0, limit: int = 50, feed_id: Optional[int] = None, category: Optional[str] = None, search: Optional[str] = None, since: Optional[str] = None, until: Optional[str] = None, is_read: Optional[bool] = None, db: Session = Depends(get_db), ): """获取文章列表,支持多种筛选条件""" # 如果有搜索关键词,使用 FTS5 全文搜索 if search and search.strip(): results, total = search_articles(search.strip(), limit=limit, offset=skip) return {"total": total, "items": results} query = db.query(Article, Feed.title.label("feed_title"), Feed.category.label("category")).join(Feed) if feed_id: query = query.filter(Article.feed_id == feed_id) if category: query = query.filter(Feed.category == category) if is_read is not None: query = query.filter(Article.is_read == is_read) if since: query = query.filter(Article.published_at >= since) if until: query = query.filter(Article.published_at <= until) total = query.count() rows = query.order_by(desc(Article.published_at)).offset(skip).limit(limit).all() items = [] for article, feed_title, category in rows: items.append({ "id": article.id, "feed_id": article.feed_id, "title": article.title or "", "link": article.link, "author": article.author or "", "published_at": article.published_at.isoformat() if article.published_at else None, "summary": article.summary or "", "is_read": article.is_read, "created_at": article.created_at.isoformat(), "feed_title": feed_title or "", "category": category or "", }) return {"total": total, "items": items} @router.get("/{article_id}") def get_article(article_id: int, db: Session = Depends(get_db)): """获取文章详情""" article = db.query(Article).filter(Article.id == article_id).first() if not article: raise HTTPException(status_code=404, detail="文章不存在") feed = db.query(Feed).filter(Feed.id == article.feed_id).first() return { "id": article.id, "feed_id": article.feed_id, "title": article.title or "", "link": article.link, "author": article.author or "", "published_at": article.published_at.isoformat() if article.published_at else None, "content": article.content or "", "summary": article.summary or "", "is_read": article.is_read, "created_at": article.created_at.isoformat(), "feed_title": feed.title if feed else "", "category": feed.category if feed else "", } @router.put("/{article_id}/read") def mark_read(article_id: int, db: Session = Depends(get_db)): """标记文章为已读""" article = db.query(Article).filter(Article.id == article_id).first() if not article: raise HTTPException(status_code=404, detail="文章不存在") article.is_read = True db.commit() return {"message": "已标记为已读"} @router.get("/search/fulltext") def fulltext_search( q: str, skip: int = 0, limit: int = 50, ): """全文搜索文章""" results, total = search_articles(q, limit=limit, offset=skip) return {"total": total, "items": results}