From 778ccefb220dc7730833dfa4571ee0307167c683 Mon Sep 17 00:00:00 2001 From: congsh Date: Sun, 14 Jun 2026 15:14:40 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E4=BB=BB=E5=8A=A1=E8=BF=9B=E5=BA=A6?= =?UTF-8?q?=E5=AE=9E=E6=97=B6=E5=B1=95=E7=A4=BA=E3=80=81=E6=8E=A5=E5=8F=A3?= =?UTF-8?q?=E6=B5=8B=E8=AF=95=E3=80=81=E6=9A=97=E8=89=B2=E4=B8=BB=E9=A2=98?= =?UTF-8?q?=E9=87=8D=E6=9E=84=E5=8F=8A=E5=A4=9A=E9=A1=B9=20bug=20=E4=BF=AE?= =?UTF-8?q?=E5=A4=8D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 后端 - 新增 app/task_progress.py 线程安全进度注册表 - 任务改为后台线程异步执行(_run_task_background),手动触发立即返回 task_key - 6 个任务函数(summarizer/tagger/scorer/deduplicator/brief/taxonomy)循环内上报进度 - scheduler 定时任务同步上报进度(trigger=scheduled) - 新增 GET /api/tasks/progress 与 POST /api/tasks/progress/reset 接口 - 新增 POST /api/test-connection 接口连通性测试(独立短超时客户端) - 修复 ai_client/rss_client 配置在 import 时固化的 bug(改为 property 运行时读取 settings), 导致实际任务用 .env 假 key 调 LLM 401 - 修复 ai_client 对 reasoning 模型(MiniMax-M3 等)输出 块的 JSON 解析失败 - 修复 taxonomy bootstrap:LLM 超时(改用 300s 专用 client)、MiniMax 输出审查 (精简样本仅标题 + 约束生成中性类目名)、失败误报 success(改抛异常如实标记) - 修复 models.py 双外键关系映射启动崩溃(显式 foreign_keys) - 修复 main.py SPA 路由 404、ArticleOut.published_at 序列化 500 - 移除 lifespan 同步 bootstrap 阻塞启动,改由 scheduler 后台异步执行 前端 - Deep Ink 高对比度暗色主题重构,修复 Element Plus 暗色模式对比度问题 - Tasks 页面任务进度实时展示(进度条/阶段/计数/状态/触发来源)+ 1.5s 轮询 - 接口测试面板(rssKeeper / LLM 连通性 + 延迟) - 修复 nextJobs jobId 映射 bug 部署与文档 - Dockerfile 优化(BuildKit 缓存挂载、预编译 wheel、去 gcc、阿里云镜像源) - 新增 API.md 接口文档 Co-Authored-By: Claude --- .gitignore | 7 + API.md | 434 +++++++++++++++++++++++++ Dockerfile | 27 +- app/ai_client.py | 105 ++++-- app/brief.py | 7 + app/deduplicator.py | 8 +- app/rss_client.py | 20 +- app/scorer.py | 3 + app/summarizer.py | 7 +- app/tagger.py | 3 + app/task_progress.py | 117 +++++++ app/taxonomy.py | 39 +-- frontend/index.html | 5 +- frontend/src/App.vue | 54 ++-- frontend/src/api/index.js | 5 + frontend/src/style.css | 462 +++++++++++++++++++++++---- frontend/src/views/ArticleDetail.vue | 91 +++--- frontend/src/views/BriefDetail.vue | 46 +-- frontend/src/views/Dashboard.vue | 58 +++- frontend/src/views/Tasks.vue | 389 ++++++++++++++++++++-- frontend/src/views/Taxonomy.vue | 57 ++-- main.py | 192 +++++++++-- models.py | 4 +- scheduler.py | 25 +- 24 files changed, 1853 insertions(+), 312 deletions(-) create mode 100644 API.md create mode 100644 app/task_progress.py diff --git a/.gitignore b/.gitignore index b8dddb1..e55f97f 100644 --- a/.gitignore +++ b/.gitignore @@ -51,3 +51,10 @@ data/ # 系统文件 .DS_Store + +# 截图与 Playwright 调试产物 +*.png +.playwright-mcp/ + +# Claude 计划文件 +.claude/ diff --git a/API.md b/API.md new file mode 100644 index 0000000..45759d8 --- /dev/null +++ b/API.md @@ -0,0 +1,434 @@ +# dataClean 接口文档 + +> 服务地址:`http://:7331` +> 所有 `/api/*` 接口(除 `/health`)在配置了 `API_TOKEN` 时需要在请求头携带 `Authorization: Bearer `。 + +## 目录 + +- [鉴权](#鉴权) +- [健康检查](#健康检查) +- [文章接口](#文章接口) +- [简报接口](#简报接口) +- [分类体系接口](#分类体系接口) +- [任务接口](#任务接口) +- [任务进度](#任务进度) +- [接口连通性测试](#接口连通性测试) +- [配置管理接口](#配置管理接口) +- [仪表盘统计](#仪表盘统计) +- [错误码](#错误码) + +--- + +## 鉴权 + +若服务端配置了 `API_TOKEN`,除 `/health` 外所有接口需要在请求头携带: + +``` +Authorization: Bearer +``` + +未携带或 token 无效时返回 `401` / `403`。未配置 `API_TOKEN` 时不启用鉴权(仅建议内网使用)。 + +--- + +## 健康检查 + +### `GET /health` + +服务存活探针,无需鉴权。 + +**响应** + +```json +{ "status": "ok", "service": "dataClean" } +``` + +--- + +## 文章接口 + +### `GET /api/articles` + +分页查询加工后的文章,支持按日期、分类、标签过滤。 + +**查询参数** + +| 参数 | 类型 | 必填 | 默认 | 说明 | +|------|------|------|------|------| +| `date` | string | 否 | - | 日期 `YYYY-MM-DD`,按 `fetched_at` 过滤当天 | +| `category` | string | 否 | - | 精确分类名 | +| `tag` | string | 否 | - | 标签名(JSON 数组精确匹配) | +| `representative_only` | bool | 否 | `false` | 仅返回重复组代表文章 | +| `limit` | int | 否 | `50` | 1–200 | +| `offset` | int | 否 | `0` | 分页偏移 | + +**响应** `ArticleListOut` + +```json +{ + "total": 200, + "items": [ + { + "id": 1, + "rk_article_id": 28124, + "title": "文章标题", + "link": "https://...", + "feed_title": "来源", + "category": "科技", + "tags": ["AI", "芯片"], + "heat_score": 45.2, + "importance_score": 60.0, + "duplication_score": 25.0, + "composite_score": 52.56, + "ai_summary": "AI 生成的摘要", + "is_representative": false, + "published_at": "2026-06-13T13:48:42" + } + ] +} +``` + +### `GET /api/articles/{article_id}` + +获取单篇文章详情。 + +**路径参数** `article_id`:int + +**响应** `ArticleOut`(同上 items 元素) + +**错误** `404` 文章不存在 + +--- + +## 简报接口 + +### `GET /api/briefs` + +列出每日简报(按日期倒序)。 + +**查询参数** `limit`:int,默认 30,范围 1–100 + +**响应** `List[BriefOut]` + +```json +[ + { + "id": 1, + "brief_date": "2026-06-13", + "total_articles": 200, + "unique_articles": 150, + "by_category": { "科技": [{...}], "财经": [{...}] }, + "markdown_path": "/app/data/briefs/2026-06-13/daily-brief.md" + } +] +``` + +### `GET /api/briefs/{date}` + +获取指定日期简报。 + +**路径参数** `date`:string,`YYYY-MM-DD` + +**响应** `BriefOut` **错误** `404` 简报不存在 + +### `POST /api/briefs/{date}/regenerate` + +强制重新生成指定日期简报。同步执行(需持任务锁)。 + +**响应** + +```json +{ "message": "简报已重新生成", "data": { ... } } +``` + +**错误** `409` 已有任务执行中 + +--- + +## 分类体系接口 + +### `GET /api/taxonomy` + +列出分类/标签/打分规则。 + +**查询参数** `kind`:string,可选,过滤类型:`category` / `tag` / `heat_rule` / `importance_rule` / `duplication_rule` + +**响应** `List[TaxonomyOut]` + +```json +[ + { + "id": 1, + "name": "科技", + "kind": "category", + "description": "人工智能、芯片、互联网等", + "keywords": ["AI", "芯片", "大模型"], + "weight": 1.0, + "created_by_ai": true + } +] +``` + +### `POST /api/taxonomy/bootstrap` + +初始化或强制重建分类体系(后台异步执行)。 + +**查询参数** `force`:bool,默认 `false`。`true` 时清空后重建。 + +**响应**(立即返回) + +```json +{ "message": "taxonomy 初始化已开始", "task_key": "bootstrap_taxonomy" } +``` + +**错误** `409` 已有任务执行中。可通过 [`GET /api/tasks/progress`](#任务进度) 查看 `bootstrap_taxonomy` 进度。 + +--- + +## 任务接口 + +所有任务接口均为**后台异步执行**:提交后立即返回 `task_key`,任务在线程池执行,通过[进度接口](#任务进度)轮询。 + +任务全局互斥(共享 `_task_lock`):同一时刻仅一个任务运行。 + +### `POST /api/tasks/summarize` + +拉取 rssKeeper 最近 24 小时文章,为无摘要/短摘要文章生成 AI 摘要。 + +**响应** + +```json +{ "message": "摘要任务已开始", "task_key": "summarize" } +``` + +**错误** `409` 已有任务执行中 + +### `POST /api/tasks/tag-score-dedup` + +对当天文章执行:分类打标 → 去重 → 打分(三阶段,进度合并显示)。 + +**响应** + +```json +{ "message": "分类/去重/打分任务已开始", "task_key": "tag_score_dedup" } +``` + +### `POST /api/tasks/brief` + +生成当天每日简报(force 重新生成)。 + +**响应** + +```json +{ "message": "简报生成任务已开始", "task_key": "generate_daily_brief" } +``` + +--- + +## 任务进度 + +### `GET /api/tasks/progress` + +返回所有任务的实时进度快照(前端每 ~1.5 秒轮询)。 + +**响应** + +```json +{ + "summarize": { + "status": "running", + "stage": "生成摘要", + "current": 75, + "total": 200, + "message": null, + "started_at": "2026-06-13T14:30:00+00:00", + "updated_at": "2026-06-13T14:32:15+00:00", + "finished_at": null, + "trigger": "manual" + }, + "tag_score_dedup": { "status": "idle", "stage": "", "current": 0, "total": 0, "message": null, "started_at": null, "updated_at": null, "finished_at": null, "trigger": null }, + "generate_daily_brief": { "..." : "同上结构" }, + "bootstrap_taxonomy": { "..." : "同上结构" } +} +``` + +**字段说明** + +| 字段 | 说明 | +|------|------| +| `status` | `idle` / `running` / `success` / `error` | +| `stage` | 当前阶段文案(如「生成摘要」「LLM 生成分类体系」) | +| `current` / `total` | 进度计数,`total=0` 时为阶段型任务(用 indeterminate 进度条) | +| `message` | 附加信息或错误详情(`status=error` 时为错误信息) | +| `started_at` / `finished_at` | ISO 8601 时间戳 | +| `trigger` | `manual`(手动触发)/ `scheduled`(定时触发) | + +### `POST /api/tasks/progress/reset` + +重置指定任务的进度为 idle(清除终态显示)。 + +**查询参数** `task_key`:string,必填 + +**响应** `{ "message": "已重置" }` + +--- + +## 接口连通性测试 + +### `POST /api/test-connection` + +测试 rssKeeper 与 LLM API 连通性,返回状态与延迟。每个测试使用独立短超时客户端(10 秒、0 重试)。 + +**响应** `ConnectionTestResponse` + +```json +{ + "rss_keeper": { + "name": "rssKeeper", + "status": "ok", + "latency_ms": 26.0, + "error": null + }, + "llm": { + "name": "LLM", + "status": "ok", + "latency_ms": 6871.4, + "error": null + } +} +``` + +`status` 为 `error` 时 `error` 字段含失败原因,`latency_ms` 为 `null`。 + +--- + +## 配置管理接口 + +> 配置修改保存到数据库,部分配置(调度间隔等)需重启服务生效。 + +### `GET /api/settings` + +列出所有可编辑配置。敏感项(`OPENAI_API_KEY` / `API_TOKEN`)返回脱敏值。 + +**响应** `List[SettingOut]` + +```json +[ + { + "key": "OPENAI_API_KEY", + "value": "sk-c...2R_8", + "description": "LLM API Key", + "is_sensitive": true, + "is_masked": true, + "updated_at": "2026-06-13T..." + } +] +``` + +### `PUT /api/settings/{key}` + +更新单个配置项。 + +**请求体** + +```json +{ "value": "新值" } +``` + +**响应** `{ "message": "配置已保存,重启服务后生效" }` **错误** `400` 无效配置项 + +### `PUT /api/settings` + +批量更新配置。 + +**请求体** + +```json +{ "settings": { "OPENAI_MODEL": "gpt-4o-mini", "OPENAI_TIMEOUT": "60" } } +``` + +**响应** `{ "message": "配置已保存,重启服务后生效" }` **错误** `400` 列出无效配置项 + +### `POST /api/settings/reset` + +将所有配置重置为环境变量默认值。 + +**响应** `{ "message": "配置已重置为环境变量默认值,重启服务后生效" }` + +### 可编辑配置清单 + +| key | 说明 | 敏感 | +|-----|------|------| +| `RSSKEEPER_BASE_URL` | rssKeeper 服务地址 | 否 | +| `OPENAI_API_KEY` | LLM API Key | 是 | +| `OPENAI_BASE_URL` | LLM API 基础地址 | 否 | +| `OPENAI_MODEL` | LLM 模型名 | 否 | +| `OPENAI_TIMEOUT` | LLM 调用超时(秒) | 否 | +| `OPENAI_MAX_RETRIES` | LLM 最大重试次数 | 否 | +| `SUMMARIZE_INTERVAL_MINUTES` | 摘要任务间隔(分钟) | 否 | +| `TAG_SCORE_INTERVAL_MINUTES` | 分类/打分/去重任务间隔(分钟) | 否 | +| `DAILY_BRIEF_HOUR` | 每日简报生成小时 | 否 | +| `DAILY_BRIEF_MINUTE` | 每日简报生成分钟 | 否 | +| `TITLE_SIMILARITY_THRESHOLD` | 标题相似度阈值 | 否 | +| `CONTENT_SIMILARITY_THRESHOLD` | 内容相似度阈值 | 否 | +| `MAX_AI_SUMMARY_LENGTH` | AI 摘要最大长度 | 否 | +| `MIN_ORIGINAL_SUMMARY_LENGTH` | 原始摘要最小长度 | 否 | +| `BRIEF_TOP_N_PER_CATEGORY` | 简报每分类显示文章数 | 否 | +| `LOG_LEVEL` | 日志级别 | 否 | +| `API_TOKEN` | API 鉴权 Token(空则不启用) | 是 | +| `CORS_ALLOWED_ORIGINS` | CORS 允许来源(逗号分隔) | 否 | + +--- + +## 仪表盘统计 + +### `GET /api/stats` + +返回仪表盘统计与下次定时任务时间。 + +**响应** `StatsOut` + +```json +{ + "total_articles": 200, + "today_articles": 50, + "ai_summarized": 180, + "categories": 12, + "tags": 43, + "duplicate_groups": 5, + "briefs": 1, + "next_jobs": { + "fetch_and_summarize": "2026-06-13T17:39:13+08:00", + "tag_score_deduplicate": "2026-06-14T16:39:13+08:00", + "generate_daily_brief": "2026-06-14T08:00:00+08:00" + } +} +``` + +--- + +## 错误码 + +| 状态码 | 含义 | 触发场景 | +|--------|------|----------| +| `200` | 成功 | 正常请求 | +| `400` | 参数错误 | 无效配置项 / 请求体格式错误 | +| `401` | 未认证 | 未携带 Authorization 头(启用了鉴权时) | +| `403` | 鉴权失败 | token 无效 | +| `404` | 资源不存在 | 文章/简报不存在 | +| `409` | 冲突 | 已有任务正在执行(任务全局互斥) | +| `422` | 校验失败 | 响应模型序列化失败等 | +| `500` | 服务器错误 | 内部异常 | + +## 定时任务 + +服务启动后由 APScheduler 自动注册(时区 `Asia/Shanghai`): + +| Job ID | 触发方式 | 默认 | +|--------|----------|------| +| `bootstrap_taxonomy` | 启动时一次(DateTrigger) | taxonomy 为空时生成 | +| `fetch_and_summarize` | 间隔 | 每 60 分钟 | +| `tag_score_deduplicate` | 间隔 | 每 1440 分钟(24 小时) | +| `generate_daily_brief` | Cron | 每日 08:00 | + +间隔参数可通过[配置接口](#配置管理接口)修改,修改后需重启服务生效。 diff --git a/Dockerfile b/Dockerfile index 38645d9..e682120 100644 --- a/Dockerfile +++ b/Dockerfile @@ -5,25 +5,34 @@ ARG NPM_REGISTRY=https://registry.npmmirror.com WORKDIR /app/frontend COPY frontend/package*.json ./ -RUN npm install --registry=${NPM_REGISTRY} +RUN --mount=type=cache,target=/root/.npm \ + npm install --registry=${NPM_REGISTRY} COPY frontend/ . RUN npm run build # Stage 2: Python 后端 FROM python:3.12-slim -ARG PIP_INDEX=https://pypi.tuna.tsinghua.edu.cn/simple +ARG PIP_INDEX=https://mirrors.aliyun.com/pypi/simple/ WORKDIR /app -# 安装构建依赖(部分 Python 包可能需要),并创建非 root 用户 -RUN apt-get update && apt-get install -y --no-install-recommends \ - gcc \ - && rm -rf /var/lib/apt/lists/* \ - && useradd --create-home --uid 1000 app - +# 先只 COPY requirements.txt,利用 Docker 层缓存——只要依赖不变就命中缓存 COPY requirements.txt . -RUN pip install --no-cache-dir -r requirements.txt -i ${PIP_INDEX} + +# 用 --only-binary=:all: 强制只下载预编译 wheel,避免编译 scikit-learn +# 若平台无 wheel 会报错,但 x86_64 上 scikit-learn/numpy/scipy 都有 +RUN --mount=type=cache,target=/root/.cache/pip \ + pip install --no-cache-dir -r requirements.txt \ + -i ${PIP_INDEX} \ + --trusted-host mirrors.aliyun.com \ + --only-binary=:all: \ + || pip install --no-cache-dir -r requirements.txt \ + -i ${PIP_INDEX} \ + --trusted-host mirrors.aliyun.com + +# 创建非 root 用户(不需要 gcc 了,去掉 apt-get 节省 ~40s) +RUN useradd --create-home --uid 1000 app COPY . . COPY --from=frontend-builder /app/frontend/dist ./static diff --git a/app/ai_client.py b/app/ai_client.py index 5cf4de2..8125299 100644 --- a/app/ai_client.py +++ b/app/ai_client.py @@ -1,6 +1,7 @@ """LLM API 客户端,兼容 OpenAI API 格式""" import json import logging +import re from typing import Optional from openai import OpenAI, APIError @@ -9,9 +10,57 @@ from config import settings logger = logging.getLogger(__name__) +# 匹配 reasoning 模型(MiniMax-M3 / DeepSeek-R1 / GLM-Z1 等)的 ... 推理块 +_THINK_RE = re.compile(r".*?", re.DOTALL) + + +def _parse_llm_json(content: str) -> dict: + """从 LLM 输出中提取 JSON。 + + 兼容 reasoning 模型在 json_object 模式下仍输出 ... + 推理块、以及 JSON 前后有多余文本的情况。 + """ + if not content or not content.strip(): + raise ValueError("LLM 返回空内容,无法解析 JSON") + + text = content.strip() + # 1) 去掉闭合的 ... 块 + text = _THINK_RE.sub("", text).strip() + # 2) 处理只有 开头但未闭合(content 被截断)的情况 + if text.startswith(""): + text = text.split("", 1)[-1].strip() + + # 3) 尝试直接解析 + try: + return json.loads(text) + except json.JSONDecodeError: + pass + + # 4) 提取首个 { 到最后 } 之间的子串 + start = text.find("{") + end = text.rfind("}") + if start != -1 and end > start: + try: + return json.loads(text[start : end + 1]) + except json.JSONDecodeError: + pass + + # 5) 兜底:尝试数组 + start = text.find("[") + end = text.rfind("]") + if start != -1 and end > start: + return json.loads(text[start : end + 1]) + + logger.error("无法从 LLM 输出提取 JSON: %s", content[:500]) + raise ValueError("LLM 输出无法解析为 JSON") + class AIClient: - """封装 LLM 调用,支持重试和 JSON 输出""" + """封装 LLM 调用,支持重试和 JSON 输出。 + + 配置以 property 形式运行时从 settings 读取,避免模块 import 时 + 固化旧值(settings 在 FastAPI lifespan 启动后才会被数据库配置覆盖)。 + """ def __init__( self, @@ -21,24 +70,42 @@ class AIClient: timeout: Optional[int] = None, max_retries: Optional[int] = None, ): - self.api_key = api_key or settings.OPENAI_API_KEY - self.base_url = base_url or settings.OPENAI_BASE_URL - self.model = model or settings.OPENAI_MODEL - self.timeout = timeout or settings.OPENAI_TIMEOUT - self.max_retries = max_retries or settings.OPENAI_MAX_RETRIES + # 仅保存显式传入的覆盖值;为 None 时运行时回退到 settings + self._api_key = api_key + self._base_url = base_url + self._model = model + self._timeout = timeout + self._max_retries = max_retries - self._client: Optional[OpenAI] = None + @property + def api_key(self) -> str: + return self._api_key or settings.OPENAI_API_KEY + + @property + def base_url(self) -> str: + return self._base_url or settings.OPENAI_BASE_URL + + @property + def model(self) -> str: + return self._model or settings.OPENAI_MODEL + + @property + def timeout(self) -> int: + return self._timeout or settings.OPENAI_TIMEOUT + + @property + def max_retries(self) -> int: + return self._max_retries or settings.OPENAI_MAX_RETRIES @property def client(self) -> OpenAI: - if self._client is None: - self._client = OpenAI( - api_key=self.api_key, - base_url=self.base_url, - timeout=self.timeout, - max_retries=self.max_retries, - ) - return self._client + # 每次按最新配置创建,确保用到启动后覆盖的真实配置 + return OpenAI( + api_key=self.api_key, + base_url=self.base_url, + timeout=self.timeout, + max_retries=self.max_retries, + ) def chat_completion( self, @@ -75,18 +142,14 @@ class AIClient: user_prompt: str, temperature: float = 0.3, ) -> dict: - """调用 LLM 并解析返回的 JSON""" + """调用 LLM 并解析返回的 JSON(兼容 reasoning 模型的 块)""" content = self.chat_completion( system_prompt=system_prompt, user_prompt=user_prompt, temperature=temperature, json_mode=True, ) - try: - return json.loads(content) - except json.JSONDecodeError as exc: - logger.error("LLM 返回不是合法 JSON: %s - content=%s", exc, content[:500]) - raise + return _parse_llm_json(content) ai_client = AIClient() diff --git a/app/brief.py b/app/brief.py index ac58062..4e62544 100644 --- a/app/brief.py +++ b/app/brief.py @@ -9,6 +9,7 @@ from sqlalchemy.orm import Session from config import settings from models import EnrichedArticle, DailyBrief +from app.task_progress import update_progress logger = logging.getLogger(__name__) @@ -76,6 +77,7 @@ def generate_daily_brief(db: Session, date_str: str = None, force: bool = False) existing = db.query(DailyBrief).filter(DailyBrief.brief_date == date_str).first() if existing and not force: logger.info("日期 %s 简报已存在,跳过生成", date_str) + update_progress("generate_daily_brief", status="running", stage="简报已存在", current=0, total=0, message="简报已存在,跳过生成") return { "date": date_str, "total_articles": existing.total_articles, @@ -86,6 +88,8 @@ def generate_daily_brief(db: Session, date_str: str = None, force: bool = False) day_start = datetime.strptime(date_str, "%Y-%m-%d") day_end = day_start + timedelta(days=1) + update_progress("generate_daily_brief", status="running", stage="加载文章", current=0, total=0) + # 取当天去重后的代表文章 query = ( db.query(EnrichedArticle) @@ -106,6 +110,7 @@ def generate_daily_brief(db: Session, date_str: str = None, force: bool = False) ) # 按分类分组并排序 + update_progress("generate_daily_brief", status="running", stage="按分类整理", current=0, total=0) by_category: Dict[str, List[Dict[str, Any]]] = {} for art in representative_articles: cat = art.category or "未分类" @@ -127,6 +132,7 @@ def generate_daily_brief(db: Session, date_str: str = None, force: bool = False) } # 生成 Markdown 文件 + update_progress("generate_daily_brief", status="running", stage="生成 Markdown", current=0, total=0) output_dir = settings.brief_output_dir_path / date_str output_dir.mkdir(parents=True, exist_ok=True) markdown_path = output_dir / "daily-brief.md" @@ -134,6 +140,7 @@ def generate_daily_brief(db: Session, date_str: str = None, force: bool = False) markdown_path.write_text(markdown_content, encoding="utf-8") # 更新文章 brief_date + update_progress("generate_daily_brief", status="running", stage="保存简报", current=0, total=0) for art in representative_articles: art.brief_date = date_str diff --git a/app/deduplicator.py b/app/deduplicator.py index 79bd46c..8d1091b 100644 --- a/app/deduplicator.py +++ b/app/deduplicator.py @@ -12,6 +12,7 @@ import numpy as np from config import settings from models import EnrichedArticle, DuplicateGroup +from app.task_progress import update_progress, report_loop_progress logger = logging.getLogger(__name__) @@ -172,8 +173,11 @@ def deduplicate_articles( if not articles: logger.info("日期 %s 无文章可去重", date_str) + update_progress("tag_score_dedup", status="running", stage="去重", current=0, total=0, message="无文章可去重") return {"total": 0, "duplicate_groups": 0, "representatives": 0} + update_progress("tag_score_dedup", status="running", stage="计算相似度并去重", current=0, total=0) + # 先 URL 去重:相同 link 只保留一篇 unique_articles: List[EnrichedArticle] = [] seen_links: set = set() @@ -194,8 +198,9 @@ def deduplicate_articles( ) stats = {"total": len(articles), "duplicate_groups": len(clusters), "representatives": 0} + update_progress("tag_score_dedup", status="running", stage="写入重复组", current=0, total=len(clusters)) - for cluster in clusters: + for ci, cluster in enumerate(clusters): representative = _pick_representative(unique_articles, cluster) member_ids = [unique_articles[i].id for i in cluster] @@ -214,6 +219,7 @@ def deduplicate_articles( art.is_representative = (art.id == representative.id) stats["representatives"] += 1 + report_loop_progress("tag_score_dedup", ci + 1, len(clusters), "写入重复组") db.commit() logger.info( diff --git a/app/rss_client.py b/app/rss_client.py index d0e68b5..9998b0e 100644 --- a/app/rss_client.py +++ b/app/rss_client.py @@ -11,11 +11,23 @@ logger = logging.getLogger(__name__) class RSSKeeperClient: - """rssKeeper 外部 API 客户端""" + """rssKeeper 外部 API 客户端。 - def __init__(self, base_url: Optional[str] = None, timeout: int = 30): - self.base_url = (base_url or settings.RSSKEEPER_BASE_URL).rstrip("/") - self.timeout = timeout + 配置以 property 形式运行时从 settings 读取,避免模块 import 时 + 固化旧值(settings 在 FastAPI lifespan 启动后才会被数据库配置覆盖)。 + """ + + def __init__(self, base_url: Optional[str] = None, timeout: Optional[int] = None): + self._base_url = base_url + self._timeout = timeout + + @property + def base_url(self) -> str: + return (self._base_url or settings.RSSKEEPER_BASE_URL).rstrip("/") + + @property + def timeout(self) -> int: + return self._timeout if self._timeout is not None else 30 def _get(self, path: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: url = f"{self.base_url}{path}" diff --git a/app/scorer.py b/app/scorer.py index 4b3984b..f7c019d 100644 --- a/app/scorer.py +++ b/app/scorer.py @@ -8,6 +8,7 @@ from sqlalchemy.orm import Session from config import settings from models import EnrichedArticle, Taxonomy +from app.task_progress import update_progress, report_loop_progress from app.tagger import _count_matches, _normalize logger = logging.getLogger(__name__) @@ -119,6 +120,7 @@ def score_articles( query = query.filter(EnrichedArticle.id.in_(article_ids)) articles = query.all() + update_progress("tag_score_dedup", status="running", stage="计算分数", current=0, total=len(articles)) count = 0 for article in articles: article.heat_score = compute_heat_score(article, heat_rules) @@ -141,6 +143,7 @@ def score_articles( count += 1 if count % 50 == 0: db.commit() + report_loop_progress("tag_score_dedup", count, len(articles), "计算分数") db.commit() logger.info("打分完成: %d 篇文章", count) diff --git a/app/summarizer.py b/app/summarizer.py index 696b645..6342f6a 100644 --- a/app/summarizer.py +++ b/app/summarizer.py @@ -7,6 +7,7 @@ from sqlalchemy.orm import Session from app.ai_client import ai_client from app.rss_client import rss_client +from app.task_progress import update_progress, report_loop_progress from config import settings from models import EnrichedArticle @@ -109,11 +110,13 @@ def fetch_and_summarize(db: Session, hours: int = 24, limit: int = 200) -> Dict[ articles = rss_client.fetch_recent(hours=hours, limit=limit) if not articles: logger.info("未拉取到新文章") + update_progress("summarize", status="running", stage="无新文章", current=0, total=0, message="未拉取到新文章") return {"fetched": 0, "created": 0, "summarized": 0} stats = {"fetched": len(articles), "created": 0, "summarized": 0} + update_progress("summarize", status="running", stage="拉取文章并生成摘要", current=0, total=len(articles)) - for raw in articles: + for i, raw in enumerate(articles): data = _article_from_rss(raw) article = db.query(EnrichedArticle).filter( EnrichedArticle.rk_article_id == data["rk_article_id"] @@ -146,6 +149,8 @@ def fetch_and_summarize(db: Session, hours: int = 24, limit: int = 200) -> Dict[ if stats["summarized"] % 10 == 0: db.commit() + report_loop_progress("summarize", i + 1, len(articles), "生成摘要") + db.commit() logger.info( "摘要任务完成: fetched=%d, created=%d, summarized=%d", diff --git a/app/tagger.py b/app/tagger.py index 9ee530b..e3dac32 100644 --- a/app/tagger.py +++ b/app/tagger.py @@ -5,6 +5,7 @@ from typing import List, Dict, Any, Tuple from sqlalchemy.orm import Session +from app.task_progress import update_progress, report_loop_progress from models import EnrichedArticle, Taxonomy logger = logging.getLogger(__name__) @@ -103,6 +104,7 @@ def tag_articles(db: Session, article_ids: List[int] = None) -> int: ) articles = query.all() + update_progress("tag_score_dedup", status="running", stage="分类打标", current=0, total=len(articles)) count = 0 for article in articles: article.category = classify_article(article, categories) @@ -110,6 +112,7 @@ def tag_articles(db: Session, article_ids: List[int] = None) -> int: count += 1 if count % 50 == 0: db.commit() + report_loop_progress("tag_score_dedup", count, len(articles), "分类打标") db.commit() logger.info("分类/打标签完成: %d 篇文章", count) diff --git a/app/task_progress.py b/app/task_progress.py new file mode 100644 index 0000000..cef8b5a --- /dev/null +++ b/app/task_progress.py @@ -0,0 +1,117 @@ +"""任务进度注册表(进程内内存,线程安全)。 + +供手动任务、定时任务在执行过程中上报进度,前端通过 +GET /api/tasks/progress 轮询读取展示。 + +单 worker(uvicorn --workers 1)前提下,所有请求/任务线程共享同一份内存。 +""" +import copy +import threading +from datetime import datetime, timezone +from typing import Optional + +# 4 个稳定任务 key +TASK_KEYS = ("summarize", "tag_score_dedup", "generate_daily_brief", "bootstrap_taxonomy") + +_progress: dict = {} +_lock = threading.Lock() + + +def _now_iso() -> str: + return datetime.now(timezone.utc).isoformat() + + +def _init() -> None: + """初始化所有任务 key 为 idle""" + for key in TASK_KEYS: + _progress[key] = { + "status": "idle", + "stage": "", + "current": 0, + "total": 0, + "message": None, + "started_at": None, + "updated_at": None, + "finished_at": None, + "trigger": None, + } + + +_init() + + +def update_progress( + task_key: str, + *, + status: Optional[str] = None, + stage: Optional[str] = None, + current: Optional[int] = None, + total: Optional[int] = None, + message: Optional[str] = None, + trigger: Optional[str] = None, +) -> None: + """合并非 None 字段并盖时间戳""" + with _lock: + entry = _progress.get(task_key) + if entry is None: + entry = { + "status": "idle", "stage": "", "current": 0, "total": 0, + "message": None, "started_at": None, "updated_at": None, + "finished_at": None, "trigger": None, + } + _progress[task_key] = entry + + now = _now_iso() + if status == "running" and entry.get("started_at") is None: + entry["started_at"] = now + if status in ("success", "error"): + entry["finished_at"] = now + # 若重新进入 running,重置终态时间戳 + if status == "running": + entry["finished_at"] = None + + if status is not None: + entry["status"] = status + if stage is not None: + entry["stage"] = stage + if current is not None: + entry["current"] = current + if total is not None: + entry["total"] = total + if message is not None: + entry["message"] = message + if trigger is not None: + entry["trigger"] = trigger + entry["updated_at"] = now + + +def report_loop_progress( + task_key: str, + index: int, + total: int, + stage: str, + message: Optional[str] = None, + every: int = 5, +) -> None: + """紧凑循环进度上报:每 `every` 次或最后一次(index==total)才上报,减少加锁""" + if index % every == 0 or index >= total: + update_progress(task_key, status="running", stage=stage, current=index, total=total, message=message) + + +def get_progress(task_key: Optional[str] = None) -> dict: + """返回深拷贝(单个或全部),防止序列化期间被并发修改""" + with _lock: + if task_key is not None: + return copy.deepcopy(_progress.get(task_key)) + return copy.deepcopy(_progress) + + +def reset_progress(task_key: str) -> None: + """重置单个任务为 idle(前端清除终态显示用)""" + with _lock: + if task_key in _progress: + _progress[task_key] = { + "status": "idle", "stage": "", "current": 0, "total": 0, + "message": None, "started_at": None, "updated_at": None, + "finished_at": None, "trigger": None, + } diff --git a/app/taxonomy.py b/app/taxonomy.py index a67c4af..eeee8ec 100644 --- a/app/taxonomy.py +++ b/app/taxonomy.py @@ -5,8 +5,9 @@ from typing import List, Dict, Any from sqlalchemy.orm import Session -from app.ai_client import ai_client +from app.ai_client import AIClient from app.rss_client import rss_client +from app.task_progress import update_progress from models import Taxonomy logger = logging.getLogger(__name__) @@ -40,19 +41,19 @@ TAXONOMY_SYSTEM_PROMPT = """你是一位专业的信息分类与内容分析专 3. heat_rules 和 importance_rules 各 10-20 条,weight 范围 0.5-2.0。 4. 所有 keywords 用中文或中英双语,便于后续关键词匹配。 5. 不要输出任何解释文字,只输出 JSON。 +6. **分类与标签名称必须使用中性的主题领域词**(如科技、财经、文化、体育、生活、健康、设计、商业等), + 禁止使用具体事件、人名、地名、国家名、机构名或任何政治/军事/冲突相关的敏感词作为名称或关键词, + 以保证内容中立、避免触发内容审查。 """ def _build_sample_prompt(articles: List[Dict[str, Any]]) -> str: - lines = [f"共有 {len(articles)} 篇文章样本:"] - for idx, art in enumerate(articles[:50], 1): + # 只用标题和来源,不带正文摘要——降低输入中的敏感内容,避免触发内容审查 + lines = [f"共有 {len(articles)} 篇文章样本(仅展示标题用于归纳主题):"] + for idx, art in enumerate(articles[:40], 1): title = art.get("title", "") - summary = art.get("summary", "") or art.get("content", "")[:300] feed = art.get("feed_title", "") - cat = art.get("category", "") - lines.append(f"\n[{idx}] 标题:{title}") - lines.append(f" 来源:{feed} | 源分类:{cat}") - lines.append(f" 摘要:{summary[:400]}") + lines.append(f"[{idx}] {title} (来源:{feed})") return "\n".join(lines) @@ -72,22 +73,24 @@ def bootstrap_taxonomy(db: Session, force: bool = False) -> bool: logger.info("强制重新初始化 taxonomy") logger.info("开始从 rssKeeper 拉取样本文章并生成分类体系...") + update_progress("bootstrap_taxonomy", status="running", stage="拉取样本文章", current=0, total=0) articles = rss_client.fetch_recent(hours=24 * 7, limit=200) if not articles: logger.warning("未获取到样本文章,无法生成分类体系") - return False + raise RuntimeError("未获取到样本文章,无法生成分类体系") user_prompt = _build_sample_prompt(articles) - try: - result = ai_client.chat_completion_json( - system_prompt=TAXONOMY_SYSTEM_PROMPT, - user_prompt=user_prompt, - temperature=0.5, - ) - except Exception as exc: - logger.error("生成分类体系失败: %s", exc) - return False + update_progress("bootstrap_taxonomy", status="running", stage="LLM 生成分类体系", current=0, total=0, message="正在调用 LLM 生成分类规则,可能需要 2-4 分钟") + # bootstrap 是一次性大任务(生成 categories+tags+rules),MiniMax-M3 reasoning 模式较慢, + # 用专用大 timeout client(默认 60s 不够),失败抛异常由调用方捕获并如实标记进度 + bootstrap_ai = AIClient(timeout=300, max_retries=2) + result = bootstrap_ai.chat_completion_json( + system_prompt=TAXONOMY_SYSTEM_PROMPT, + user_prompt=user_prompt, + temperature=0.5, + ) + update_progress("bootstrap_taxonomy", status="running", stage="保存规则", current=0, total=0) _save_taxonomy(db, result) logger.info("taxonomy 初始化完成,共写入 %d 条规则", db.query(Taxonomy).count()) return True diff --git a/frontend/index.html b/frontend/index.html index 49e3163..8d3e5b4 100644 --- a/frontend/index.html +++ b/frontend/index.html @@ -1,10 +1,13 @@ - + dataClean - RSS 数据清洗 + + +
diff --git a/frontend/src/App.vue b/frontend/src/App.vue index 32ca499..f41ea49 100644 --- a/frontend/src/App.vue +++ b/frontend/src/App.vue @@ -1,16 +1,18 @@