# RSS 信息处理平台:完整设计文档 > 版本:v1.0 > 日期:2026-06-15 > 基于仓库:dataClean (778ccfb) + rssKeeper (4286731) --- ## 一、项目背景与目标 ### 1.1 背景 现有两个项目: - **rssKeeper**:RSS 抓取与原始文章管理服务,负责 RSS 源管理、定时抓取、文章存储、全文检索(FTS5)、健康度监控。 - **dataClean**:rssKeeper 的下游清洗服务,负责 AI 摘要、分类/标签/打分、URL+内容去重、每日简报生成。 两者已通过 `/api/v1/external/*` 接口松耦合,dataClean 只读调用 rssKeeper。当前部署为单容器 + SQLite,适合小数据量和个人使用。 ### 1.2 新平台目标 构建一个 **模块化、工业化、AI 驱动** 的 RSS 信息处理平台,统一承接 rssKeeper + dataClean 的能力,并扩展: - 聊天式 AI 知识产出(带引用链接) - Skill 化产出编排(日报、聊天、自定义任务) - AI 自优化 Prompt / 去重算法 - 多供应商/多模型 AI 配置 - 团队级简单鉴权与锁机制 - 面向 30万~100万 文章量的可扩展架构 ### 1.3 设计原则 | 原则 | 说明 | |------|------| | **模块化** | 抓取、清洗、AI 处理、检索、聊天、自优化、Skills 均为独立模块,接口契约清晰 | | **可插拔** | 去重算法以外挂文件形式存在,Skills 可导入/覆盖/恢复默认 | | **AI 原生** | 分类、Tag、摘要、打分、日报、聊天、自优化均由 AI 驱动,规则仅作兜底 | | **配置即代码** | Prompt、AI 配置、Skills、去重算法均版本化管理,变更留痕 | | **工业化** | Docker 部署、日志/监控、任务锁、失败重试、数据分区预留 | | **渐进扩展** | 初期单库单服务,预留分库分表、独立检索、向量库扩展路径 | --- ## 二、现状分析 ### 2.1 rssKeeper 现状 **优势:** - FastAPI + SQLAlchemy + APScheduler 技术栈成熟 - RSS 源管理完整:增删改查、OPML 导入导出、自动发现 - 文章抓取并发(ThreadPoolExecutor) - 健康度监控与抓取日志 - SQLite FTS5 全文搜索 - Docker 部署就绪 **问题:** - 无鉴权/限流,CORS 在部分版本宽松 - 时区处理不一致(`datetime.utcnow()` 混用) - 添加源时同步抓取会阻塞 HTTP - 导入 OPML 接口 body/query 不一致(已知 bug) - 无测试、无 CI - SQLite 单库,百万级文章需迁移 ### 2.2 dataClean 现状 **优势:** - 模块划分清晰:`summarizer`、`tagger`、`scorer`、`deduplicator`、`brief`、`taxonomy` - AI 调用封装 `AIClient` 支持运行时配置覆盖 - 配置双层管理:环境变量 + 数据库 - 任务进度可视化 - 去重算法已考虑 URL + 标题 + 内容相似度 - Docker 部署就绪 **问题:** - 去重算法 O(n²) + SequenceMatcher,数据量大时性能差 - 去重历史数据有破坏风险(近期版本已修复为按日期清空) - 分类/标签基于规则,未充分发挥 AI - 仅支持单一 LLM 配置 - 无 Skill 概念,日报格式固定 - 无自优化机制 - 无聊天/产出能力 - 无引用链接的 AI 产出 ### 2.3 可复用资产 | 资产 | 来源 | 复用方式 | |------|------|----------| | RSS 抓取逻辑 | rssKeeper `rss_fetcher.py` | 迁移至 `feeds/fetcher.py`,解耦数据库操作 | | Feed 模型与健康度 | rssKeeper `models.py` | 复用并扩展字段 | | FTS5 搜索 | rssKeeper `fulltext_search.py` | 作为检索 V1,后续抽象接口 | | 外部 API 设计 | rssKeeper `external_api.py` | 演变为平台内部检索/文章接口 | | AI 客户端 | dataClean `app/ai_client.py` | 扩展为多 Provider 路由 | | 去重算法 | dataClean `app/deduplicator.py` | 改造为插件接口,保留默认实现 | | 任务进度 | dataClean `app/task_progress.py` | 复用并扩展为任务运行时 | | 配置管理 | dataClean `app/settings_manager.py` | 复用并扩展为配置中心 | | 简报生成 | dataClean `app/brief.py` | 改造为 Skill 驱动 | | Taxonomy | dataClean `app/taxonomy.py` | 演变为 Skill + AI 分类体系 | --- ## 三、总体架构 ### 3.1 服务边界 采用 **单体模块化架构(Modular Monolith)**,而非微服务。原因: - 当前团队规模小,单体降低运维复杂度 - 模块间接口清晰,未来可拆分为独立服务 - 350 源 / 日增 1000+ 的规模单体完全可承受 ``` ┌─────────────────────────────────────────────────────────────────────┐ │ 前端应用层 (Web UI) │ │ RSS 源管理 │ 文章库 │ 检索 │ 日报任务 │ AI 聊天 │ Skills 管理 │ 配置 │ └────────────────────────────────┬────────────────────────────────────┘ │ ┌────────────────────────────────▼────────────────────────────────────┐ │ API Gateway / FastAPI │ │ JWT 鉴权 │ 限流 │ 路由 │ 锁服务 │ Skills 注册中心 │ └────────────────────────────────┬────────────────────────────────────┘ │ ┌────────────────────────────────▼────────────────────────────────────┐ │ 核心业务模块 │ │ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌─────────┐ │ │ │ 抓取调度模块 │ │ 清洗流水线 │ │ AI 处理中心 │ │ 检索服务 │ │ │ │ Feeds │ │ Pipeline │ │ Processor │ │ Search │ │ │ └──────────────┘ │ - 去重插件 │ │ - 分类 │ └─────────┘ │ │ │ - 数据校验 │ │ - Tag │ │ │ └──────┬───────┘ │ - 摘要 │ │ │ │ │ - 打分 │ │ │ ┌──────▼──────────┼──┴───────────┤ │ │ │ 任务调度器 │ │ │ │ │ Celery + Redis │ │ │ │ └─────────────────┘ │ │ │ ┌──────────────────────────────────────────────────────────────┐ │ │ │ 自优化调度器 │ │ │ │ 每日 02:00:Prompt/Skills 自优化 + 去重算法自优化 + 留痕 │ │ │ └──────────────────────────────────────────────────────────────┘ │ │ ┌──────────────────────────────────────────────────────────────┐ │ │ │ 聊天与产出引擎 │ │ │ │ 对话上下文 + Skills 调用 + 检索工具 + 引用链接渲染 │ │ │ └──────────────────────────────────────────────────────────────┘ │ └────────────────────────────────┬────────────────────────────────────┘ │ ┌────────────────────────────────▼────────────────────────────────────┐ │ 数据与存储层 │ │ PostgreSQL 16 + pgvector │ Redis │ 对象存储 (MinIO) │ 日志/监控 │ └─────────────────────────────────────────────────────────────────────┘ ``` ### 3.2 技术栈 | 层级 | 选型 | 理由 | |------|------|------| | 后端框架 | Python 3.12 + FastAPI | AI 生态最全,异步性能好,Pydantic 适合做配置/schema | | 任务队列 | Celery + Redis | 成熟稳定,支持定时任务、任务锁、进度追踪 | | 数据库 | PostgreSQL 16 + pgvector | 关系数据 + 向量检索一体,支持分区表 | | 全文检索 | PostgreSQL tsvector (初期) → OpenSearch (扩展) | 初期降低复杂度,抽象检索接口便于切换 | | 对象存储 | MinIO / 云 OSS | 存原始 HTML、图片、导出产物 | | 前端 | Vue 3 + Element Plus + TypeScript | 团队后台管理 + 聊天界面 | | AI 调用 | LiteLLM + 自封装 Provider 路由 | 统一接口支持多供应商、多模型、Fallback | | 缓存/锁 | Redis | 分布式锁、任务状态、会话缓存 | | 监控 | Prometheus + Grafana + Sentry | 工业化可观测性 | | 部署 | Docker + Docker Compose | 用户指定 Docker 化 | --- ## 四、模块详细设计 ### 4.1 抓取调度模块(Feeds) **职责:** 管理 RSS 源、定时抓取、解析、原始文章入库。 **设计要点:** - 每个 Feed 独立配置:URL、抓取频率、优先级、解析规则、代理策略、启用状态 - 调度器从 APScheduler 迁移到 Celery Beat,支持任务持久化和水平扩展 - 抓取 Worker 与主服务分离,未来可独立扩容 - 原始文章内容存入对象存储,元数据入 `raw_articles` 表 - 失败重试 + 死信队列 + 健康度统计 **核心模型:** ```python class Feed: id: int url: str title: str description: str category: str # 源预设分类 is_active: bool fetch_interval_minutes: int priority: int # 抓取优先级 parser_config: dict # 自定义解析规则 proxy_policy: str # auto / direct / proxy last_fetch_at: datetime last_fetch_status: str last_error: str error_type: str success_count: int fail_count: int article_count: int health_status: str ``` **抓取流程:** ``` Celery Beat 触发 → 按优先级/间隔筛选 Feed → 分发 fetch_feed_task → 下载 RSS → feedparser 解析 → clean_html → 生成 summary → 按 URL 去重 → 批量写入 raw_articles → 更新 Feed 统计 ``` ### 4.2 清洗流水线(Pipeline) **职责:** 对原始文章进行标准化、去重、校验,产出清洗后的文章。 **流水线阶段:** ``` raw_articles → normalize(URL 规范化、编码统一、时间标准化) → extract(正文提取,若 RSS 内容不全则 fetch 原文页) → dedup_exact(URL/标题精确去重) → dedup_similar(调用外挂算法,相似度去重) → enrich(补充语言、字数、内容 hash 等元数据) → cleaned_articles ``` **去重插件接口(核心):** ```python # plugins/deduplication/current.py from dataclasses import dataclass from typing import List, Dict, Set @dataclass class DedupInput: article_id: str title: str link: str content: str content_length: int published_at: str feed_id: str @dataclass class DuplicateGroup: representative_id: str # 保留的主文章 ID member_ids: List[str] # 重复文章 IDs reason: str # 去重原因:url_exact / title_exact / content_similar similarity_scores: Dict[str, float] class DeduplicationPlugin: name: str = "default_tfidf" version: str = "1.0.0" def find_duplicates(self, articles: List[DedupInput]) -> List[DuplicateGroup]: """ 输入:待去重文章列表 输出:重复组列表,每组指定 representative 和 members 规则: 1. URL 完全相同 → 直接归为一组 2. 标题完全相同 → 归为一组 3. 内容相似度 >= threshold → 归为一组,保留 content_length 最大的 """ ... ``` **引用关系:** - 主文章保留完整内容,生成 `cleaned_article` 记录 - 重复文章不丢弃,而是在 `article_references` 表中记录: - `source_article_id`:主文章 - `referenced_article_id`:重复文章 - `reference_type`:`duplicate_url` / `duplicate_title` / `duplicate_content` - `link`:重复文章原始链接 - `similarity`:相似度分数 - 主文章的 `reference_links` JSON 字段聚合所有引用链接,便于前端/AI 使用 ### 4.3 AI 处理中心(AI Processor) **职责:** 管理所有 AI 任务,支持多 Provider/多模型/多配置。 **AI 任务类型:** | 任务 | 输入 | 输出 | 默认模型 | |------|------|------|----------| | `summarize` | 标题+正文 | AI 摘要 | 轻量模型 | | `classify` | 标题+摘要+正文 | category + 置信度 | 推理模型 | | `tag` | 标题+摘要+正文 | tags[] | 轻量模型 | | `score` | 标题+摘要+正文+元数据 | heat/importance/composite | 推理模型 | | `daily_brief` | 当日高分文章 + Skill | Markdown/JSON 日报 | 强模型 | | `chat` | 对话上下文 + 检索结果 + Skill | 带引用链接的回答 | 强模型 | | `optimize` | 当前 Prompt/算法 + 历史样例 | 优化建议/新 Prompt | 强模型 | **AI 配置模型:** ```python class AIProviderConfig: id: str name: str provider: str # openai / anthropic / gemini / local base_url: str api_key: str # 加密存储 default_model: str timeout: int max_retries: int rate_limit_rpm: int is_active: bool class AITaskConfig: id: str task_type: str # summarize / classify / tag / score / daily_brief / chat / optimize name: str provider_config_id: str model: str skill_id: str # 绑定的 Skill temperature: float max_tokens: int top_p: float system_prompt_override: str | None enabled: bool ``` **多供应商路由:** - 使用 LiteLLM 统一封装,或自实现 Provider 适配器 - 每个任务独立配置,支持复制配置(Clone) - 支持 Fallback:主模型失败时自动切换到备用模型 ### 4.4 Skills 系统 **定义:** Skill = 指导 AI 产出的结构化配置,包括: - 系统提示词(system prompt) - 输出模板(output template/schema) - 可用工具集(tools) - 输入参数 schema - 版本与默认值标记 **Skill 类型:** | 类型 | 用途 | 示例 | |------|------|------| | `output` | 产出型 Skill,直接生成内容 | 日报、行业观察、摘要 | | `tool` | 工具型 Skill,AI 可调用的能力 | 搜索文章、获取文章详情、调用外部 API | | `agent` | 复合型 Skill,组合多个工具 | 先搜索再总结再生成报告 | **Skill 模型:** ```python class Skill: id: str name: str slug: str description: str type: str # output / tool / agent version: int is_default: bool # 是否系统默认,不可删除 system_prompt: str output_schema: dict # JSON Schema tools: List[str] # 可调用的 tool_id 列表 input_schema: dict # 输入参数 JSON Schema example_inputs: List[dict] created_by: str created_at: datetime updated_at: datetime ``` **Skill 管理:** - 内置默认 Skills(不可删除,可恢复默认) - 用户可修改、新增、导入、导出 - 导入时支持覆盖(overwrite)或新建版本 - 修改后自动生效(无重启) **默认内置 Skills:** | Skill | 用途 | |-------|------| | `daily-brief-default` | 默认日报生成 | | `daily-tech-watch` | 科技观察日报 | | `chat-researcher` | 研究型聊天助手 | | `chat-summarizer` | 摘要型聊天助手 | | `article-classifier` | 文章分类 | | `article-tagger` | 文章打标签 | | `article-scorer` | 文章打分 | | `article-summarizer` | 文章摘要 | | `search-articles` | 检索文章工具 | | `fetch-external-api` | 调用外部 API 工具 | ### 4.5 日报任务系统 **核心概念:** 日报 = 任务 + Skill + 数据筛选 + 定时/手动触发 ```python class OutputTask: id: str name: str task_type: str # daily_brief / chat / custom skill_id: str # 调用哪个 Skill schedule: str # cron 表达式,为空则仅手动 filter_config: dict # 数据筛选条件 output_config: dict # 输出方式:web / email / file / webhook is_active: bool last_run_at: datetime last_output_id: str ``` **数据筛选配置示例:** ```json { "time_range": "last_24h", "categories": ["科技", "AI"], "min_composite_score": 70, "tags": ["大模型"], "exclude_duplicates": true, "top_n": 50 } ``` **新增日报流程:** 1. 用户在 UI 创建 OutputTask 2. 选择 Skill(可复用已有或新建) 3. 配置筛选条件 4. 配置定时/手动触发 5. 系统按 cron 自动执行,或用户手动触发 ### 4.6 聊天与产出引擎 **聊天模型:** ```python class ChatSession: id: str user_id: str title: str skill_id: str # 当前会话使用的默认 Skill context_messages: List[dict] created_at: datetime updated_at: datetime class ChatMessage: id: str session_id: str role: str # user / assistant / tool content: str tool_calls: List[dict] # AI 调用的工具 tool_results: List[dict] # 工具返回结果 references: List[dict] # 引用文章链接 created_at: datetime ``` **聊天流程:** ``` 用户输入 → 意图识别(由 Skill 系统提示词决定) → 如需检索:调用 search-articles tool → 如需外部数据:调用 fetch-external-api tool → 组装上下文(含检索结果/工具返回) → 调用 LLM 生成回答 → 解析引用标记,映射为文章链接 → 返回带引用链接的 Markdown ``` **引用链接规范:** - AI 输出中使用 `[^1^]`, `[^2^]` 等标记 - 后端解析标记,从 `references` 中生成真实链接 - 前端渲染为可点击脚注/卡片 示例输出: ```markdown 今日 AI 领域最重要的消息是 OpenAI 发布了新模型 [^1^], 同时 Google 也公布了相关进展 [^2^]。 [^1^]: [OpenAI 官方公告](https://openai.com/...) [^2^]: [Google Blog](https://blog.google/...) ``` ### 4.7 检索服务 **检索维度:** | 类型 | 实现 | 说明 | |------|------|------| | 全文检索 | PostgreSQL tsvector / OpenSearch | 标题、正文、摘要 | | 语义检索 | pgvector / 独立向量库 | 基于 Embedding 的相似文章 | | 元数据过滤 | SQL | 分类、Tag、分数、时间、Feed、来源 | | 混合检索 | 全文 + 语义 + 元数据 | 未来扩展 | **检索接口抽象:** ```python class SearchEngine(ABC): @abstractmethod def search(self, query: SearchQuery) -> SearchResult: ... class PostgresSearchEngine(SearchEngine): ... class OpenSearchEngine(SearchEngine): ... ``` **SearchQuery 模型:** ```python class SearchQuery: q: str | None semantic_q: str | None category: str | None tags: List[str] feed_id: str | None min_score: float | None since: datetime until: datetime sort_by: str = "published_at_desc" limit: int = 50 offset: int = 0 ``` --- ## 五、数据模型设计 ### 5.1 核心实体关系 ``` users └── chat_sessions └── chat_messages feeds └── raw_articles └── cleaned_articles ├── article_references (duplicate links) ├── article_embeddings └── article_versions skills └── skill_versions ai_task_configs └── ai_provider_configs output_tasks └── outputs optimization_logs ``` ### 5.2 表结构 #### users(用户) ```sql CREATE TABLE users ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), username VARCHAR(64) UNIQUE NOT NULL, password_hash VARCHAR(255) NOT NULL, role VARCHAR(32) DEFAULT 'member', -- admin / member is_active BOOLEAN DEFAULT TRUE, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ); ``` #### feeds(RSS 源) ```sql CREATE TABLE feeds ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), url VARCHAR(2048) UNIQUE NOT NULL, title VARCHAR(512), description TEXT, category VARCHAR(128), is_active BOOLEAN DEFAULT TRUE, fetch_interval_minutes INT DEFAULT 60, priority INT DEFAULT 5, parser_config JSONB DEFAULT '{}', proxy_policy VARCHAR(32) DEFAULT 'auto', last_fetch_at TIMESTAMPTZ, last_fetch_status VARCHAR(32), last_error TEXT, error_type VARCHAR(32), success_count INT DEFAULT 0, fail_count INT DEFAULT 0, article_count INT DEFAULT 0, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ); ``` #### raw_articles(原始文章) ```sql CREATE TABLE raw_articles ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), feed_id UUID NOT NULL REFERENCES feeds(id) ON DELETE CASCADE, external_id VARCHAR(255), -- 源系统 ID(如 rssKeeper article id) title VARCHAR(1024), link VARCHAR(2048) NOT NULL, author VARCHAR(256), published_at TIMESTAMPTZ, fetched_at TIMESTAMPTZ DEFAULT NOW(), content TEXT, summary TEXT, raw_html TEXT, -- 原始 HTML,存对象存储或分表 content_hash VARCHAR(64), language VARCHAR(16), status VARCHAR(32) DEFAULT 'pending', -- pending / processed / failed created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ) PARTITION BY RANGE (fetched_at); ``` #### cleaned_articles(清洗后文章) ```sql CREATE TABLE cleaned_articles ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), raw_article_id UUID REFERENCES raw_articles(id), feed_id UUID NOT NULL REFERENCES feeds(id), title VARCHAR(1024), link VARCHAR(2048) NOT NULL, author VARCHAR(256), feed_title VARCHAR(512), feed_category VARCHAR(128), published_at TIMESTAMPTZ, fetched_at TIMESTAMPTZ, content TEXT, content_length INT DEFAULT 0, original_summary TEXT, ai_summary TEXT, category VARCHAR(128), tags JSONB DEFAULT '[]', heat_score FLOAT DEFAULT 0, importance_score FLOAT DEFAULT 0, duplication_score FLOAT DEFAULT 0, composite_score FLOAT DEFAULT 0, duplicate_group_id UUID, is_representative BOOLEAN DEFAULT TRUE, reference_links JSONB DEFAULT '[]', -- 重复文章引用链接 processing_status VARCHAR(32) DEFAULT 'pending', created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ) PARTITION BY RANGE (fetched_at); ``` #### article_references(文章引用关系) ```sql CREATE TABLE article_references ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), source_article_id UUID NOT NULL REFERENCES cleaned_articles(id), referenced_article_id UUID REFERENCES cleaned_articles(id), reference_type VARCHAR(64), -- duplicate_url / duplicate_title / duplicate_content reference_link VARCHAR(2048), reference_title VARCHAR(1024), similarity FLOAT, created_at TIMESTAMPTZ DEFAULT NOW() ); ``` #### duplicate_groups(重复组) ```sql CREATE TABLE duplicate_groups ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), representative_article_id UUID REFERENCES cleaned_articles(id), member_article_ids JSONB DEFAULT '[]', similarity_matrix JSONB DEFAULT '{}', brief_date DATE, created_at TIMESTAMPTZ DEFAULT NOW() ); ``` #### skills(技能库) ```sql CREATE TABLE skills ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), name VARCHAR(128) NOT NULL, slug VARCHAR(128) UNIQUE NOT NULL, description TEXT, type VARCHAR(32) NOT NULL, -- output / tool / agent version INT DEFAULT 1, is_default BOOLEAN DEFAULT FALSE, system_prompt TEXT NOT NULL, output_schema JSONB, tools JSONB DEFAULT '[]', input_schema JSONB, example_inputs JSONB DEFAULT '[]', created_by VARCHAR(64), created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ); ``` #### ai_provider_configs(AI 供应商配置) ```sql CREATE TABLE ai_provider_configs ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), name VARCHAR(128) NOT NULL, provider VARCHAR(64) NOT NULL, -- openai / anthropic / gemini / local base_url VARCHAR(512), api_key_encrypted TEXT, default_model VARCHAR(128), timeout INT DEFAULT 60, max_retries INT DEFAULT 3, rate_limit_rpm INT DEFAULT 60, is_active BOOLEAN DEFAULT TRUE, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ); ``` #### ai_task_configs(AI 任务配置) ```sql CREATE TABLE ai_task_configs ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), task_type VARCHAR(64) NOT NULL, -- summarize / classify / tag / score / daily_brief / chat / optimize name VARCHAR(128) NOT NULL, provider_config_id UUID REFERENCES ai_provider_configs(id), model VARCHAR(128) NOT NULL, skill_id UUID REFERENCES skills(id), temperature FLOAT DEFAULT 0.3, max_tokens INT, top_p FLOAT DEFAULT 1.0, system_prompt_override TEXT, fallback_config_id UUID REFERENCES ai_task_configs(id), enabled BOOLEAN DEFAULT TRUE, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ); ``` #### output_tasks(产出任务) ```sql CREATE TABLE output_tasks ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), name VARCHAR(128) NOT NULL, task_type VARCHAR(64) DEFAULT 'daily_brief', skill_id UUID NOT NULL REFERENCES skills(id), schedule VARCHAR(128), -- cron 表达式 filter_config JSONB DEFAULT '{}', output_config JSONB DEFAULT '{}', is_active BOOLEAN DEFAULT TRUE, last_run_at TIMESTAMPTZ, last_output_id UUID, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ); ``` #### outputs(产出记录) ```sql CREATE TABLE outputs ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), output_task_id UUID REFERENCES output_tasks(id), content TEXT, content_html TEXT, references JSONB DEFAULT '[]', metadata JSONB DEFAULT '{}', created_at TIMESTAMPTZ DEFAULT NOW() ); ``` #### chat_sessions(聊天会话) ```sql CREATE TABLE chat_sessions ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), user_id UUID REFERENCES users(id), title VARCHAR(256), skill_id UUID REFERENCES skills(id), context_window INT DEFAULT 10, created_at TIMESTAMPTZ DEFAULT NOW(), updated_at TIMESTAMPTZ DEFAULT NOW() ); ``` #### chat_messages(聊天消息) ```sql CREATE TABLE chat_messages ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), session_id UUID NOT NULL REFERENCES chat_sessions(id) ON DELETE CASCADE, role VARCHAR(32) NOT NULL, -- user / assistant / tool content TEXT, tool_calls JSONB DEFAULT '[]', tool_results JSONB DEFAULT '[]', references JSONB DEFAULT '[]', token_usage JSONB, created_at TIMESTAMPTZ DEFAULT NOW() ); ``` #### optimization_logs(自优化日志) ```sql CREATE TABLE optimization_logs ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), optimization_type VARCHAR(64), -- prompt / skill / dedup_algorithm target_id UUID, target_name VARCHAR(128), previous_version INT, new_version INT, previous_content TEXT, new_content TEXT, evaluation_reason TEXT, is_applied BOOLEAN DEFAULT FALSE, applied_at TIMESTAMPTZ, rollback_to_version INT, created_at TIMESTAMPTZ DEFAULT NOW() ); ``` #### locks(分布式锁记录) ```sql CREATE TABLE locks ( id UUID PRIMARY KEY DEFAULT gen_random_uuid(), lock_name VARCHAR(128) UNIQUE NOT NULL, owner_id UUID, acquired_at TIMESTAMPTZ DEFAULT NOW(), expires_at TIMESTAMPTZ, created_at TIMESTAMPTZ DEFAULT NOW() ); ``` ### 5.3 分区策略 **按时间分区表:** - `raw_articles` 按 `fetched_at` 月分区 - `cleaned_articles` 按 `fetched_at` 月分区 - `chat_messages` 按 `created_at` 月分区 **索引规划:** ```sql -- 文章核心查询索引 CREATE INDEX idx_cleaned_articles_fetched_at ON cleaned_articles(fetched_at DESC); CREATE INDEX idx_cleaned_articles_category_score ON cleaned_articles(category, composite_score DESC); CREATE INDEX idx_cleaned_articles_published_at ON cleaned_articles(published_at DESC); CREATE INDEX idx_cleaned_articles_link ON cleaned_articles(link); -- GIN 索引用于 JSONB 标签 CREATE INDEX idx_cleaned_articles_tags ON cleaned_articles USING GIN (tags); CREATE INDEX idx_cleaned_articles_reference_links ON cleaned_articles USING GIN (reference_links); -- 全文搜索索引(可切 OpenSearch) CREATE INDEX idx_cleaned_articles_fts ON cleaned_articles USING GIN (to_tsvector('chinese', COALESCE(title,'') || ' ' || COALESCE(ai_summary,'') || ' ' || COALESCE(content,''))); ``` --- ## 六、自优化系统设计 ### 6.1 优化范围 每日凌晨 2:00 自动执行: 1. **Prompt/Skill 自优化**:摘要、分类、Tag、打分、日报、聊天 Skills 2. **去重算法自优化**:阈值、特征、算法策略 ### 6.2 自优化标准 由 **AI 自己评审优化点**,无需人工标注。评审维度: | 优化对象 | 评审维度 | |----------|----------| | 摘要 Skill | 信息覆盖率、简洁性、是否保留关键数字/实体 | | 分类 Skill | 分类边界清晰度、与样本的一致性 | | Tag Skill | Tag 相关性、避免过度标签、Tag 粒度一致性 | | 打分 Skill | 评分标准是否稳定、高分文章是否确实重要 | | 日报 Skill | 结构清晰度、信息密度、引用完整性 | | 去重算法 | 召回率、精确率、运行效率 | ### 6.3 自优化流程 ``` 1. 收集近 N 天数据 - 最近执行的输入/输出样例 - 优化历史(避免重复尝试) - 当前 Prompt/Skill/算法 2. 优化器模型生成候选 - 对每个 Skill:分析弱点 → 生成候选 Prompt - 对去重算法:分析误判/漏判 → 生成候选算法 3. 优化器模型自评 - 候选 vs 当前版本在历史样例上的模拟表现 - 输出评审理由和评分 4. 自动发布 - Skill:保存为新版本,自动生效 - 去重算法:写入新版本文件,自动热加载 5. 留痕 - 写入 optimization_logs - 保留旧版本以便回滚 ``` ### 6.4 去重算法版本管理 **目录结构:** ``` platform/ ├── plugins/ │ └── deduplication/ │ ├── current.py # 当前生效 │ ├── current.py.bak.1 # 上一个版本 │ ├── current.py.bak.2 │ └── versions/ │ ├── dedup_v1_20260615_020000.py │ ├── dedup_v2_20260616_020000.py │ └── dedup_v3_20260617_020000.py │ └── metadata.json # 版本历史、回滚点 ``` **热加载机制:** - 使用 `watchdog` 监听 `current.py` 变更 - 变更时尝试 `importlib.reload` 或动态加载 - 加载失败时自动回滚到 `current.py.bak.1` - 通过 API `/api/admin/deduplication/rollback?version=X` 可手动回滚 **去重算法自优化输入:** ```python @dataclass class DedupOptimizationInput: current_algorithm_code: str current_metadata: dict sample_duplicate_groups: List[DuplicateGroup] sample_false_positives: List[Tuple[str, str]] # 误判样例 sample_false_negatives: List[Tuple[str, str]] # 漏判样例 performance_metrics: dict # 运行时间、内存 ``` --- ## 七、鉴权与锁机制 ### 7.1 鉴权 - **JWT Token**:用户登录后发放 access_token - **API Key**:供外部系统/脚本调用,与用户绑定 - **简单 RBAC**: - `admin`:管理用户、配置、Skills、任务 - `member`:使用聊天、查看文章、触发个人任务 - **接口保护**:写入/管理类接口需要 `admin`;读取类接口需要登录 ### 7.2 锁机制 | 锁类型 | 实现 | 用途 | |--------|------|------| | **任务级锁** | Redis 分布式锁 | 防止同一任务(如去重、日报生成)并发执行 | | **文章级锁** | Redis 分布式锁 + DB locks 表 | 防止多人同时编辑同一篇文章的元数据 | | **调度锁** | Celery `max_instances=1` + Redis | 防止定时任务与手动任务冲突 | **锁工具接口:** ```python class LockService: async def acquire(self, lock_name: str, ttl: int, owner_id: str) -> bool async def release(self, lock_name: str, owner_id: str) -> bool async def extend(self, lock_name: str, ttl: int, owner_id: str) -> bool ``` --- ## 八、Docker 部署架构 ### 8.1 服务组成 ```yaml services: web: # FastAPI 主服务 worker-default: # Celery 默认队列 Worker worker-ai: # Celery AI 任务队列 Worker worker-fetch: # Celery RSS 抓取队列 Worker beat: # Celery Beat 定时调度 redis: # 缓存 + 锁 + 消息队列 postgres: # 主数据库 + pgvector minio: # 对象存储(可选) ``` ### 8.2 目录挂载 ``` /data/ ├── postgres/ # PostgreSQL 数据 ├── redis/ # Redis 持久化 ├── minio/ # 对象存储 ├── logs/ # 应用日志 ├── plugins/ # 插件目录(去重算法等) └── backups/ # 自动备份 ``` ### 8.3 环境变量 ```env # 数据库 DATABASE_URL=postgresql+asyncpg://rss:rss@postgres:5432/rss_platform # Redis REDIS_URL=redis://redis:6379/0 # AI AI_OPTIMIZER_PROVIDER_ID=... AI_CHAT_PROVIDER_ID=... # 安全 SECRET_KEY=... ACCESS_TOKEN_EXPIRE_MINUTES=480 # 存储 STORAGE_TYPE=minio MINIO_ENDPOINT=minio:9000 MINIO_BUCKET=rss-platform # 调度 SELF_OPTIMIZE_CRON=0 2 * * * ``` --- ## 九、扩展性规划 ### 9.1 数据量增长路径 | 阶段 | 文章量 | 数据库 | 检索 | 去重 | |------|--------|--------|------|------| | 阶段一 | < 10万 | PostgreSQL 单库 + 分区表 | PG tsvector | 内存+插件 | | 阶段二 | 10万~50万 | PostgreSQL 分区表 | OpenSearch | 分桶+Embedding | | 阶段三 | 50万~100万 | 读写分离 / 按时间分库 | OpenSearch + 向量库 | LSH/MinHash | | 阶段四 | > 100万 | 分库分表 + 冷热分离 | 专用检索集群 | 近似最近邻 | ### 9.2 服务拆分预留 未来可按模块拆分为独立服务: - `feed-service`:RSS 抓取 - `ai-service`:AI 处理 - `search-service`:检索 - `chat-service`:聊天与产出 - `optimization-service`:自优化 --- ## 十、关键接口概览 ### 10.1 文章与检索 | 方法 | 路径 | 说明 | |------|------|------| | GET | `/api/articles` | 文章列表(分页、过滤) | | GET | `/api/articles/{id}` | 文章详情 | | POST | `/api/articles/{id}/lock` | 获取文章编辑锁 | | DELETE | `/api/articles/{id}/lock` | 释放文章编辑锁 | | GET | `/api/search` | 混合检索 | | POST | `/api/search/semantic` | 语义检索 | ### 10.2 Skills 与任务 | 方法 | 路径 | 说明 | |------|------|------| | GET | `/api/skills` | 列出 Skills | | GET | `/api/skills/{id}` | Skill 详情 | | POST | `/api/skills` | 创建 Skill | | PUT | `/api/skills/{id}` | 更新 Skill | | POST | `/api/skills/{id}/restore-default` | 恢复默认 | | POST | `/api/skills/import` | 导入 Skill | | GET | `/api/output-tasks` | 产出任务列表 | | POST | `/api/output-tasks` | 创建产出任务 | | POST | `/api/output-tasks/{id}/run` | 手动运行 | ### 10.3 AI 配置 | 方法 | 路径 | 说明 | |------|------|------| | GET | `/api/ai/providers` | AI 供应商配置 | | POST | `/api/ai/providers` | 新增供应商 | | POST | `/api/ai/providers/{id}/clone` | 复制配置 | | GET | `/api/ai/task-configs` | AI 任务配置 | | POST | `/api/ai/task-configs` | 新增任务配置 | | POST | `/api/ai/task-configs/{id}/clone` | 复制任务配置 | ### 10.4 聊天 | 方法 | 路径 | 说明 | |------|------|------| | GET | `/api/chat/sessions` | 会话列表 | | POST | `/api/chat/sessions` | 创建会话 | | POST | `/api/chat/sessions/{id}/messages` | 发送消息 | | GET | `/api/chat/sessions/{id}/messages` | 获取历史 | ### 10.5 自优化与插件 | 方法 | 路径 | 说明 | |------|------|------| | GET | `/api/admin/optimization-logs` | 自优化日志 | | POST | `/api/admin/optimization/run` | 手动触发自优化 | | GET | `/api/admin/deduplication/versions` | 去重算法版本 | | POST | `/api/admin/deduplication/rollback` | 回滚算法 | | POST | `/api/admin/deduplication/reload` | 热加载当前算法 | --- ## 十一、风险与应对 | 风险 | 影响 | 应对 | |------|------|------| | AI 自优化产生劣质 Prompt | 中 | 优化器自评 + 版本化 + 自动回滚 | | 去重算法热加载失败 | 高 | 失败自动回滚上一个 `.bak` | | 数据量激增导致查询慢 | 中 | 分区表 + 异步迁移路径 | | AI 调用成本高 | 中 | 多供应商路由 + Fallback + 缓存 | | 并发任务冲突 | 高 | Redis 分布式锁 + 任务级互斥 | | 团队成员误操作 | 中 | 简单 RBAC + 文章级锁 | | SQLite 迁移到 PostgreSQL | 中 | 一次性迁移脚本 + 数据校验 | --- ## 十二、与现有仓库的关系 新平台不是完全重写,而是: 1. **继承**:复用 rssKeeper 的抓取逻辑和 Feed 管理,dataClean 的 AI 调用和任务进度机制 2. **升级**:数据库从 SQLite 升级到 PostgreSQL,引入分区表和向量扩展 3. **重构**:将分类/标签/打分从规则驱动改为 AI + Skill 驱动 4. **新增**:聊天产出、Skill 系统、自优化、多 AI 配置、团队鉴权 5. **合并**:最终用一个统一平台替代两个分离项目 --- ## 十三、文档清单 - [x] 本设计文档 - [ ] 开发步骤文档(见 `rss-platform-dev-plan.md`) - [ ] API 接口详细文档 - [ ] Skill 开发指南 - [ ] 自优化机制说明 - [ ] Docker 部署手册 - [ ] 数据迁移手册