Files
dataClean/app/deduplicator.py
congsh 778ccefb22 feat: 任务进度实时展示、接口测试、暗色主题重构及多项 bug 修复
后端
- 新增 app/task_progress.py 线程安全进度注册表
- 任务改为后台线程异步执行(_run_task_background),手动触发立即返回 task_key
- 6 个任务函数(summarizer/tagger/scorer/deduplicator/brief/taxonomy)循环内上报进度
- scheduler 定时任务同步上报进度(trigger=scheduled)
- 新增 GET /api/tasks/progress 与 POST /api/tasks/progress/reset 接口
- 新增 POST /api/test-connection 接口连通性测试(独立短超时客户端)
- 修复 ai_client/rss_client 配置在 import 时固化的 bug:该 bug 导致实际任务使用 .env 中的假 key 调用 LLM 并返回 401;
  现改为通过 property 在运行时读取 settings
- 修复 ai_client 对 reasoning 模型(MiniMax-M3 等)输出 <think> 块的 JSON 解析失败
- 修复 taxonomy bootstrap:LLM 超时(改用 300s 专用 client)、MiniMax 输出审查
  (精简样本仅标题 + 约束生成中性类目名)、失败误报 success(改抛异常如实标记)
- 修复 models.py 双外键关系映射启动崩溃(显式 foreign_keys)
- 修复 main.py SPA 路由 404、ArticleOut.published_at 序列化 500
- 移除 lifespan 同步 bootstrap 阻塞启动,改由 scheduler 后台异步执行

前端
- Deep Ink 高对比度暗色主题重构,修复 Element Plus 暗色模式对比度问题
- Tasks 页面任务进度实时展示(进度条/阶段/计数/状态/触发来源)+ 1.5s 轮询
- 接口测试面板(rssKeeper / LLM 连通性 + 延迟)
- 修复 nextJobs jobId 映射 bug

部署与文档
- Dockerfile 优化(BuildKit 缓存挂载、预编译 wheel、去 gcc、阿里云镜像源)
- 新增 API.md 接口文档

Co-Authored-By: Claude <noreply@anthropic.com>
2026-06-14 15:14:40 +08:00

230 lines
7.4 KiB
Python

"""文章去重:URL 精确去重 + 标题/内容相似度去重"""
import logging
import re
from collections import deque
from datetime import datetime, timedelta, timezone
from difflib import SequenceMatcher
from typing import List, Dict, Tuple, Set

import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from sqlalchemy.orm import Session

from config import settings
from models import EnrichedArticle, DuplicateGroup
from app.task_progress import update_progress, report_loop_progress
# Module-level logger: reports TF-IDF failures and the per-date dedup summary.
logger = logging.getLogger(__name__)
def _normalize_title(title: str) -> str:
"""标题规范化:去除标点和多余空格,小写,保留中英文数字"""
if not title:
return ""
# 保留:单词字符、CJK 统一表意符号(含扩展 A/B/C/D/E)
title = re.sub(
r"[^\w一-鿿㐀-䶿\U00020000-\U0002a6df\U0002a700-\U0002b73f\U0002b740-\U0002b81f]",
" ",
title,
)
title = " ".join(title.split())
return title.lower()
def _title_similarity(a: str, b: str) -> float:
    """Return how similar two titles are, in ``[0.0, 1.0]``.

    Both titles are normalized with ``_normalize_title`` and compared with
    ``difflib.SequenceMatcher(...).ratio()``. If either side normalizes to an
    empty string, the similarity is ``0.0``.
    """
    left, right = _normalize_title(a), _normalize_title(b)
    if left and right:
        return SequenceMatcher(None, left, right).ratio()
    return 0.0
def _content_similarity_matrix(contents: List[str]) -> np.ndarray:
"""使用 TF-IDF + 余弦相似度计算内容相似度矩阵"""
if len(contents) < 2:
return np.zeros((len(contents), len(contents)))
# 过滤空内容
valid_contents = [c or "" for c in contents]
try:
vectorizer = TfidfVectorizer(
max_features=5000,
stop_words="english",
ngram_range=(1, 2),
min_df=1,
)
tfidf = vectorizer.fit_transform(valid_contents)
return cosine_similarity(tfidf)
except Exception as exc:
logger.warning("TF-IDF 相似度计算失败: %s", exc)
return np.zeros((len(contents), len(contents)))
def _find_duplicate_clusters(
articles: List[EnrichedArticle],
title_threshold: float = None,
content_threshold: float = None,
) -> List[Set[int]]:
"""
基于标题相似度和内容相似度找出重复簇。
返回索引簇列表,每个簇是一组 articles 的索引集合。
"""
title_threshold = title_threshold or settings.TITLE_SIMILARITY_THRESHOLD
content_threshold = content_threshold or settings.CONTENT_SIMILARITY_THRESHOLD
n = len(articles)
if n < 2:
return []
contents = []
for art in articles:
text = " ".join([
art.title or "",
art.ai_summary or art.original_summary or "",
art.content or "",
])
contents.append(text[:2000]) # 限制长度加速计算
content_sim = _content_similarity_matrix(contents)
visited = [False] * n
clusters: List[Set[int]] = []
for i in range(n):
if visited[i]:
continue
cluster = {i}
queue = [i]
visited[i] = True
while queue:
cur = queue.pop(0)
for j in range(n):
if visited[j] or cur == j:
continue
title_sim = _title_similarity(articles[cur].title or "", articles[j].title or "")
c_sim = content_sim[cur][j] if cur < n and j < n else 0.0
# 标题高度相似 或 内容高度相似均视为重复
if title_sim >= title_threshold or c_sim >= content_threshold:
cluster.add(j)
queue.append(j)
visited[j] = True
if len(cluster) > 1:
clusters.append(cluster)
return clusters
def _pick_representative(articles: List[EnrichedArticle], indices: Set[int]) -> EnrichedArticle:
"""从重复组中选择代表文章:优先选有 AI 摘要、来源 Feed 分类明确、发布时间最早的"""
candidates = [articles[i] for i in indices]
# 排序:有 AI 摘要优先,然后有 Feed 分类,然后发布时间早
candidates.sort(
key=lambda a: (
bool(a.ai_summary),
bool(a.feed_category),
a.published_at or datetime.min,
),
reverse=True,
)
return candidates[0]
def _clear_groups_for_date(db: Session, date_str: str) -> None:
    """Delete the duplicate groups of ``date_str`` and unlink their member articles."""
    old_groups = db.query(DuplicateGroup).filter(DuplicateGroup.brief_date == date_str).all()
    for og in old_groups:
        for art in og.articles:
            art.duplicate_group_id = None
            art.is_representative = False
        db.delete(og)
    db.commit()


def _split_url_duplicates(
    articles: List[EnrichedArticle],
) -> Tuple[List[EnrichedArticle], Dict[int, List[EnrichedArticle]]]:
    """Keep the first article per stripped link and collect its later repeats.

    Returns ``(unique_articles, url_dups)``. ``url_dups`` maps an index in
    ``unique_articles`` to the articles that repeated that article's link.
    Articles with an empty link are never treated as URL duplicates.
    """
    unique_articles: List[EnrichedArticle] = []
    url_dups: Dict[int, List[EnrichedArticle]] = {}
    first_index_by_link: Dict[str, int] = {}
    for art in articles:
        link = (art.link or "").strip()
        if link:
            kept_idx = first_index_by_link.get(link)
            if kept_idx is not None:
                url_dups.setdefault(kept_idx, []).append(art)
                continue
            first_index_by_link[link] = len(unique_articles)
        unique_articles.append(art)
    return unique_articles, url_dups


def deduplicate_articles(
    db: Session,
    date_str: str = None,
    title_threshold: float = None,
    content_threshold: float = None,
) -> Dict[str, int]:
    """Deduplicate the articles fetched on one day and persist duplicate groups.

    Steps:
      1. Delete existing groups whose ``brief_date`` is ``date_str`` (other
         dates are untouched) and unlink their articles.
      2. Load the articles whose ``fetched_at`` is in ``[date, date + 1 day)``.
      3. Exact-URL dedup: later articles repeating an earlier (stripped)
         ``link`` are set aside and joined to the first occurrence's group.
      4. Cluster the remaining articles by title/content similarity.
      5. Write one ``DuplicateGroup`` per cluster, plus one per article whose
         only duplicates are URL repeats. Exactly one member of each group is
         marked ``is_representative``.

    Args:
        db: SQLAlchemy session; this function commits on it.
        date_str: Day to process as ``YYYY-MM-DD``; ``None`` means today (UTC).
        title_threshold: Forwarded to the clustering; ``None`` uses settings.
        content_threshold: Forwarded to the clustering; ``None`` uses settings.

    Returns:
        ``{"total": articles loaded, "duplicate_groups": groups written,
        "representatives": representatives marked (one per group)}``.

    Raises:
        ValueError: if ``date_str`` is not in ``YYYY-MM-DD`` format.
    """
    if date_str is None:
        date_str = datetime.now(timezone.utc).strftime("%Y-%m-%d")
    # NOTE(review): these bounds are naive datetimes; this assumes fetched_at
    # is stored as naive UTC. Confirm against the model.
    day_start = datetime.strptime(date_str, "%Y-%m-%d")
    day_end = day_start + timedelta(days=1)

    # Only this date's groups are rebuilt, so history for other dates survives.
    _clear_groups_for_date(db, date_str)

    articles = (
        db.query(EnrichedArticle)
        .filter(
            EnrichedArticle.fetched_at >= day_start,
            EnrichedArticle.fetched_at < day_end,
        )
        .order_by(EnrichedArticle.published_at)
        .all()
    )
    if not articles:
        logger.info("日期 %s 无文章可去重", date_str)
        update_progress("tag_score_dedup", status="running", stage="去重", current=0, total=0, message="无文章可去重")
        return {"total": 0, "duplicate_groups": 0, "representatives": 0}

    update_progress("tag_score_dedup", status="running", stage="计算相似度并去重", current=0, total=0)

    unique_articles, url_dups = _split_url_duplicates(articles)
    url_dup_count = sum(len(dups) for dups in url_dups.values())

    clusters = _find_duplicate_clusters(
        unique_articles,
        title_threshold=title_threshold,
        content_threshold=content_threshold,
    )
    # A kept article whose only duplicates are URL repeats still needs a group.
    # Otherwise those repeats stay unmarked and look unique downstream.
    clustered = set().union(*clusters)
    groups = [sorted(cluster) for cluster in clusters]
    groups.extend([idx] for idx in sorted(url_dups) if idx not in clustered)

    stats = {"total": len(articles), "duplicate_groups": len(groups), "representatives": 0}
    update_progress("tag_score_dedup", status="running", stage="写入重复组", current=0, total=len(groups))
    for gi, indices in enumerate(groups):
        members = [unique_articles[i] for i in indices]
        for i in indices:
            members.extend(url_dups.get(i, []))
        representative = _pick_representative(members, set(range(len(members))))
        group = DuplicateGroup(
            representative_article_id=representative.id,
            member_article_ids=[art.id for art in members],
            similarity_matrix={},  # may be filled in later
            brief_date=date_str,
        )
        db.add(group)
        # Flush so group.id is populated before members reference it.
        db.flush()
        for art in members:
            art.duplicate_group_id = group.id
            art.is_representative = art.id == representative.id
        # Exactly one representative per written group.
        stats["representatives"] += 1
        report_loop_progress("tag_score_dedup", gi + 1, len(groups), "写入重复组")
    db.commit()

    logger.info(
        "去重完成: 日期=%s, 总文章=%d, 重复组=%d, URL 重复=%d",
        date_str, stats["total"], stats["duplicate_groups"], url_dup_count,
    )
    return stats