"""Daily brief generation."""
import json
import logging
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Dict, Any, List, Optional

from sqlalchemy.orm import Session

from config import settings
from models import EnrichedArticle, DailyBrief
from app.task_progress import update_progress

logger = logging.getLogger(__name__)


def _format_article(article: EnrichedArticle) -> Dict[str, Any]:
    """Convert an article row into the plain dict stored in a brief.

    Missing text fields become empty strings and missing tags become an
    empty list. Score fields are passed through unchanged, so they are
    whatever the article holds (possibly ``None``).
    """
    return {
        "id": article.id,
        "rk_article_id": article.rk_article_id,
        "title": article.title or "",
        "link": article.link or "",
        "author": article.author or "",
        "feed_title": article.feed_title or "",
        # Prefer the AI summary; fall back to the feed's original summary.
        "summary": article.ai_summary or article.original_summary or "",
        "tags": article.tags or [],
        "heat_score": article.heat_score,
        "importance_score": article.importance_score,
        "duplication_score": article.duplication_score,
        "composite_score": article.composite_score,
        "published_at": article.published_at.isoformat() if article.published_at else None,
    }


def _format_score(value: Any) -> str:
    """Render a score with one decimal place, or ``N/A`` when it is missing."""
    # Applying ":.1f" to None raises TypeError, which would abort the whole
    # brief because of a single unscored article.
    return "N/A" if value is None else f"{value:.1f}"


def _build_markdown(date_str: str, by_category: Dict[str, List[Dict[str, Any]]], stats: Dict[str, int]) -> str:
    """Render the brief as a Markdown document.

    Args:
        date_str: Date shown in the document title.
        by_category: Mapping of category name to article dicts as produced
            by ``_format_article``.
        stats: Must contain ``total_articles`` and ``unique_articles``.

    Returns:
        The Markdown text, with categories emitted in sorted name order and
        articles in the order they appear in each category list.
    """
    lines = [
        f"# RSS 每日简报 ({date_str})",
        "",
        f"- 去重前文章数: {stats['total_articles']}",
        f"- 去重后文章数: {stats['unique_articles']}",
        f"- 生成分类数: {len(by_category)}",
        "",
        "---",
        "",
    ]

    for category, items in sorted(by_category.items(), key=lambda x: x[0]):
        lines.append(f"## {category}")
        lines.append("")
        for item in items:
            tags = " ".join(f"`{t}`" for t in item["tags"]) if item["tags"] else ""
            lines.append(f"### {item['title']}")
            lines.append(f"- 来源: {item['feed_title']} | 作者: {item.get('author') or '未知'}")
            lines.append(f"- 标签: {tags}")
            lines.append(
                f"- 热度: {_format_score(item['heat_score'])}"
                f" | 重要性: {_format_score(item['importance_score'])}"
                f" | 重复度: {_format_score(item['duplication_score'])}"
                f" | 综合: {_format_score(item['composite_score'])}"
            )
            if item["summary"]:
                lines.append(f"- 摘要: {item['summary']}")
            if item["link"]:
                lines.append(f"- [阅读原文]({item['link']})")
            lines.append("")

    return "\n".join(lines)


def generate_daily_brief(db: Session, date_str: Optional[str] = None, force: bool = False) -> Dict[str, Any]:
    """Generate the daily brief for one date.

    Selects the articles fetched on that date that are either marked as
    representative or not in any duplicate group, groups them by category,
    keeps the top ``settings.BRIEF_TOP_N_PER_CATEGORY`` per category, writes
    ``daily-brief.md`` under ``settings.brief_output_dir_path / date_str``,
    stamps ``brief_date`` on the selected articles, inserts or updates the
    ``DailyBrief`` row, and commits the session.

    Args:
        db: SQLAlchemy session used for all reads and writes.
        date_str: Date in ``YYYY-MM-DD`` form. Defaults to today's UTC date.
        force: When true, regenerate even if a brief already exists for the
            date; otherwise the stored brief is returned untouched.

    Returns:
        A dict with ``date``, ``total_articles``, ``unique_articles``,
        ``by_category`` and ``markdown_path``.

    Raises:
        ValueError: If ``date_str`` does not match ``%Y-%m-%d``.
    """
    if date_str is None:
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")

    # Skip the work when a brief already exists, unless regeneration is forced.
    existing = db.query(DailyBrief).filter(DailyBrief.brief_date == date_str).first()
    if existing and not force:
        logger.info("日期 %s 简报已存在,跳过生成", date_str)
        update_progress("generate_daily_brief", status="running", stage="简报已存在", current=0, total=0, message="简报已存在,跳过生成")
        return {
            "date": date_str,
            "total_articles": existing.total_articles,
            "unique_articles": existing.unique_articles,
            # Included so both return paths expose the same keys.
            "by_category": existing.by_category or {},
            "markdown_path": existing.markdown_path,
        }

    # NOTE(review): day_start is a naive datetime while the default date above
    # is derived from UTC; this presumably expects fetched_at to be stored as
    # naive UTC — confirm against the model.
    day_start = datetime.strptime(date_str, "%Y-%m-%d")
    day_end = day_start + timedelta(days=1)

    update_progress("generate_daily_brief", status="running", stage="加载文章", current=0, total=0)

    # Every article fetched within the day (before de-duplication).
    query = (
        db.query(EnrichedArticle)
        .filter(
            EnrichedArticle.fetched_at >= day_start,
            EnrichedArticle.fetched_at < day_end,
        )
    )

    # Keep only group representatives and articles that belong to no duplicate
    # group. `== True` / `== None` are intentional: they build SQL expressions.
    representative_articles = (
        query.filter(
            (EnrichedArticle.is_representative == True)
            | (EnrichedArticle.duplicate_group_id == None)
        )
        .order_by(EnrichedArticle.composite_score.desc())
        .all()
    )

    # Group by category; the query ordering carries over, so each category
    # list is already sorted by composite score, highest first.
    update_progress("generate_daily_brief", status="running", stage="按分类整理", current=0, total=0)
    by_category: Dict[str, List[Dict[str, Any]]] = {}
    for art in representative_articles:
        by_category.setdefault(art.category or "未分类", []).append(_format_article(art))

    # Keep only the top N entries per category.
    top_n = settings.BRIEF_TOP_N_PER_CATEGORY
    for cat in by_category:
        by_category[cat] = by_category[cat][:top_n]

    total_before_dedup = query.count()
    # Counted after the top-N cut, so this is the number of articles that
    # actually appear in the brief.
    unique_count = sum(len(items) for items in by_category.values())

    stats = {
        "total_articles": total_before_dedup,
        "unique_articles": unique_count,
    }

    # Write the Markdown file.
    update_progress("generate_daily_brief", status="running", stage="生成 Markdown", current=0, total=0)
    output_dir = settings.brief_output_dir_path / date_str
    output_dir.mkdir(parents=True, exist_ok=True)
    markdown_path = output_dir / "daily-brief.md"
    markdown_content = _build_markdown(date_str, by_category, stats)
    markdown_path.write_text(markdown_content, encoding="utf-8")

    # Stamp the brief date on every selected article (including those beyond
    # the per-category top N).
    update_progress("generate_daily_brief", status="running", stage="保存简报", current=0, total=0)
    for art in representative_articles:
        art.brief_date = date_str

    # Persist the brief: update the existing row in place or insert a new one.
    brief_data = {
        "date": date_str,
        "total_articles": stats["total_articles"],
        "unique_articles": stats["unique_articles"],
        "by_category": by_category,
        "markdown_path": str(markdown_path),
    }

    if existing:
        existing.total_articles = stats["total_articles"]
        existing.unique_articles = stats["unique_articles"]
        existing.by_category = by_category
        existing.markdown_path = str(markdown_path)
        existing.updated_at = datetime.now(timezone.utc)
    else:
        db.add(
            DailyBrief(
                brief_date=date_str,
                total_articles=stats["total_articles"],
                unique_articles=stats["unique_articles"],
                by_category=by_category,
                markdown_path=str(markdown_path),
            )
        )
    db.commit()

    logger.info("简报生成完成: 日期=%s, 去重前=%d, 去重后=%d", date_str, stats["total_articles"], stats["unique_articles"])
    return brief_data