Files

1121 lines
38 KiB
Markdown
Raw Permalink Normal View History

# RSS 信息处理平台:完整设计文档
> 版本:v1.0
> 日期:2026-06-15
> 基于仓库:dataClean (778ccfb) + rssKeeper (4286731)
---
## 一、项目背景与目标
### 1.1 背景
现有两个项目:
- **rssKeeper**RSS 抓取与原始文章管理服务,负责 RSS 源管理、定时抓取、文章存储、全文检索(FTS5)、健康度监控。
- **dataClean**rssKeeper 的下游清洗服务,负责 AI 摘要、分类/标签/打分、URL+内容去重、每日简报生成。
两者已通过 `/api/v1/external/*` 接口松耦合,dataClean 只读调用 rssKeeper。当前部署为单容器 + SQLite,适合小数据量和个人使用。
### 1.2 新平台目标
构建一个 **模块化、工业化、AI 驱动** 的 RSS 信息处理平台,统一承接 rssKeeper + dataClean 的能力,并扩展:
- 聊天式 AI 知识产出(带引用链接)
- Skill 化产出编排(日报、聊天、自定义任务)
- AI 自优化 Prompt / 去重算法
- 多供应商/多模型 AI 配置
- 团队级简单鉴权与锁机制
- 面向 30万~100万 文章量的可扩展架构
### 1.3 设计原则
| 原则 | 说明 |
|------|------|
| **模块化** | 抓取、清洗、AI 处理、检索、聊天、自优化、Skills 均为独立模块,接口契约清晰 |
| **可插拔** | 去重算法以外挂文件形式存在,Skills 可导入/覆盖/恢复默认 |
| **AI 原生** | 分类、Tag、摘要、打分、日报、聊天、自优化均由 AI 驱动,规则仅作兜底 |
| **配置即代码** | Prompt、AI 配置、Skills、去重算法均版本化管理,变更留痕 |
| **工业化** | Docker 部署、日志/监控、任务锁、失败重试、数据分区预留 |
| **渐进扩展** | 初期单库单服务,预留分库分表、独立检索、向量库扩展路径 |
---
## 二、现状分析
### 2.1 rssKeeper 现状
**优势:**
- FastAPI + SQLAlchemy + APScheduler 技术栈成熟
- RSS 源管理完整:增删改查、OPML 导入导出、自动发现
- 文章抓取并发(ThreadPoolExecutor
- 健康度监控与抓取日志
- SQLite FTS5 全文搜索
- Docker 部署就绪
**问题:**
- 无鉴权/限流,CORS 在部分版本宽松
- 时区处理不一致(`datetime.utcnow()` 混用)
- 添加源时同步抓取会阻塞 HTTP
- 导入 OPML 接口 body/query 不一致(已知 bug
- 无测试、无 CI
- SQLite 单库,百万级文章需迁移
### 2.2 dataClean 现状
**优势:**
- 模块划分清晰:`summarizer``tagger``scorer``deduplicator``brief``taxonomy`
- AI 调用封装 `AIClient` 支持运行时配置覆盖
- 配置双层管理:环境变量 + 数据库
- 任务进度可视化
- 去重算法已考虑 URL + 标题 + 内容相似度
- Docker 部署就绪
**问题:**
- 去重算法 O(n²) + SequenceMatcher,数据量大时性能差
- 去重历史数据有破坏风险(近期版本已修复为按日期清空)
- 分类/标签基于规则,未充分发挥 AI
- 仅支持单一 LLM 配置
- 无 Skill 概念,日报格式固定
- 无自优化机制
- 无聊天/产出能力
- 无引用链接的 AI 产出
### 2.3 可复用资产
| 资产 | 来源 | 复用方式 |
|------|------|----------|
| RSS 抓取逻辑 | rssKeeper `rss_fetcher.py` | 迁移至 `feeds/fetcher.py`,解耦数据库操作 |
| Feed 模型与健康度 | rssKeeper `models.py` | 复用并扩展字段 |
| FTS5 搜索 | rssKeeper `fulltext_search.py` | 作为检索 V1,后续抽象接口 |
| 外部 API 设计 | rssKeeper `external_api.py` | 演变为平台内部检索/文章接口 |
| AI 客户端 | dataClean `app/ai_client.py` | 扩展为多 Provider 路由 |
| 去重算法 | dataClean `app/deduplicator.py` | 改造为插件接口,保留默认实现 |
| 任务进度 | dataClean `app/task_progress.py` | 复用并扩展为任务运行时 |
| 配置管理 | dataClean `app/settings_manager.py` | 复用并扩展为配置中心 |
| 简报生成 | dataClean `app/brief.py` | 改造为 Skill 驱动 |
| Taxonomy | dataClean `app/taxonomy.py` | 演变为 Skill + AI 分类体系 |
---
## 三、总体架构
### 3.1 服务边界
采用 **单体模块化架构(Modular Monolith**,而非微服务。原因:
- 当前团队规模小,单体降低运维复杂度
- 模块间接口清晰,未来可拆分为独立服务
- 350 源 / 日增 1000+ 的规模单体完全可承受
```
┌─────────────────────────────────────────────────────────────────────┐
│ 前端应用层 (Web UI) │
│ RSS 源管理 │ 文章库 │ 检索 │ 日报任务 │ AI 聊天 │ Skills 管理 │ 配置 │
└────────────────────────────────┬────────────────────────────────────┘
┌────────────────────────────────▼────────────────────────────────────┐
│ API Gateway / FastAPI │
│ JWT 鉴权 │ 限流 │ 路由 │ 锁服务 │ Skills 注册中心 │
└────────────────────────────────┬────────────────────────────────────┘
┌────────────────────────────────▼────────────────────────────────────┐
│ 核心业务模块 │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────┐ ┌─────────┐ │
│ │ 抓取调度模块 │ │ 清洗流水线 │ │ AI 处理中心 │ │ 检索服务 │ │
│ │ Feeds │ │ Pipeline │ │ Processor │ │ Search │ │
│ └──────────────┘ │ - 去重插件 │ │ - 分类 │ └─────────┘ │
│ │ - 数据校验 │ │ - Tag │ │
│ └──────┬───────┘ │ - 摘要 │ │
│ │ │ - 打分 │ │
│ ┌──────▼──────────┼──┴───────────┤ │
│ │ 任务调度器 │ │ │
│ │ Celery + Redis │ │ │
│ └─────────────────┘ │ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ 自优化调度器 │ │
│ │ 每日 02:00Prompt/Skills 自优化 + 去重算法自优化 + 留痕 │ │
│ └──────────────────────────────────────────────────────────────┘ │
│ ┌──────────────────────────────────────────────────────────────┐ │
│ │ 聊天与产出引擎 │ │
│ │ 对话上下文 + Skills 调用 + 检索工具 + 引用链接渲染 │ │
│ └──────────────────────────────────────────────────────────────┘ │
└────────────────────────────────┬────────────────────────────────────┘
┌────────────────────────────────▼────────────────────────────────────┐
│ 数据与存储层 │
│ PostgreSQL 16 + pgvector │ Redis │ 对象存储 (MinIO) │ 日志/监控 │
└─────────────────────────────────────────────────────────────────────┘
```
### 3.2 技术栈
| 层级 | 选型 | 理由 |
|------|------|------|
| 后端框架 | Python 3.12 + FastAPI | AI 生态最全,异步性能好,Pydantic 适合做配置/schema |
| 任务队列 | Celery + Redis | 成熟稳定,支持定时任务、任务锁、进度追踪 |
| 数据库 | PostgreSQL 16 + pgvector | 关系数据 + 向量检索一体,支持分区表 |
| 全文检索 | PostgreSQL tsvector (初期) → OpenSearch (扩展) | 初期降低复杂度,抽象检索接口便于切换 |
| 对象存储 | MinIO / 云 OSS | 存原始 HTML、图片、导出产物 |
| 前端 | Vue 3 + Element Plus + TypeScript | 团队后台管理 + 聊天界面 |
| AI 调用 | LiteLLM + 自封装 Provider 路由 | 统一接口支持多供应商、多模型、Fallback |
| 缓存/锁 | Redis | 分布式锁、任务状态、会话缓存 |
| 监控 | Prometheus + Grafana + Sentry | 工业化可观测性 |
| 部署 | Docker + Docker Compose | 用户指定 Docker 化 |
---
## 四、模块详细设计
### 4.1 抓取调度模块(Feeds
**职责:** 管理 RSS 源、定时抓取、解析、原始文章入库。
**设计要点:**
- 每个 Feed 独立配置:URL、抓取频率、优先级、解析规则、代理策略、启用状态
- 调度器从 APScheduler 迁移到 Celery Beat,支持任务持久化和水平扩展
- 抓取 Worker 与主服务分离,未来可独立扩容
- 原始文章内容存入对象存储,元数据入 `raw_articles`
- 失败重试 + 死信队列 + 健康度统计
**核心模型:**
```python
class Feed:
id: int
url: str
title: str
description: str
category: str # 源预设分类
is_active: bool
fetch_interval_minutes: int
priority: int # 抓取优先级
parser_config: dict # 自定义解析规则
proxy_policy: str # auto / direct / proxy
last_fetch_at: datetime
last_fetch_status: str
last_error: str
error_type: str
success_count: int
fail_count: int
article_count: int
health_status: str
```
**抓取流程:**
```
Celery Beat 触发 → 按优先级/间隔筛选 Feed → 分发 fetch_feed_task
→ 下载 RSS → feedparser 解析 → clean_html → 生成 summary
→ 按 URL 去重 → 批量写入 raw_articles → 更新 Feed 统计
```
### 4.2 清洗流水线(Pipeline
**职责:** 对原始文章进行标准化、去重、校验,产出清洗后的文章。
**流水线阶段:**
```
raw_articles
→ normalize(URL 规范化、编码统一、时间标准化)
→ extract(正文提取,若 RSS 内容不全则 fetch 原文页)
→ dedup_exactURL/标题精确去重)
→ dedup_similar(调用外挂算法,相似度去重)
→ enrich(补充语言、字数、内容 hash 等元数据)
→ cleaned_articles
```
**去重插件接口(核心):**
```python
# plugins/deduplication/current.py
from dataclasses import dataclass
from typing import List, Dict, Set
@dataclass
class DedupInput:
article_id: str
title: str
link: str
content: str
content_length: int
published_at: str
feed_id: str
@dataclass
class DuplicateGroup:
representative_id: str # 保留的主文章 ID
member_ids: List[str] # 重复文章 IDs
reason: str # 去重原因:url_exact / title_exact / content_similar
similarity_scores: Dict[str, float]
class DeduplicationPlugin:
name: str = "default_tfidf"
version: str = "1.0.0"
def find_duplicates(self, articles: List[DedupInput]) -> List[DuplicateGroup]:
"""
输入:待去重文章列表
输出:重复组列表,每组指定 representative 和 members
规则:
1. URL 完全相同 → 直接归为一组
2. 标题完全相同 → 归为一组
3. 内容相似度 >= threshold → 归为一组,保留 content_length 最大的
"""
...
```
**引用关系:**
- 主文章保留完整内容,生成 `cleaned_article` 记录
- 重复文章不丢弃,而是在 `article_references` 表中记录:
- `source_article_id`:主文章
- `referenced_article_id`:重复文章
- `reference_type``duplicate_url` / `duplicate_title` / `duplicate_content`
- `link`:重复文章原始链接
- `similarity`:相似度分数
- 主文章的 `reference_links` JSON 字段聚合所有引用链接,便于前端/AI 使用
### 4.3 AI 处理中心(AI Processor
**职责:** 管理所有 AI 任务,支持多 Provider/多模型/多配置。
**AI 任务类型:**
| 任务 | 输入 | 输出 | 默认模型 |
|------|------|------|----------|
| `summarize` | 标题+正文 | AI 摘要 | 轻量模型 |
| `classify` | 标题+摘要+正文 | category + 置信度 | 推理模型 |
| `tag` | 标题+摘要+正文 | tags[] | 轻量模型 |
| `score` | 标题+摘要+正文+元数据 | heat/importance/composite | 推理模型 |
| `daily_brief` | 当日高分文章 + Skill | Markdown/JSON 日报 | 强模型 |
| `chat` | 对话上下文 + 检索结果 + Skill | 带引用链接的回答 | 强模型 |
| `optimize` | 当前 Prompt/算法 + 历史样例 | 优化建议/新 Prompt | 强模型 |
**AI 配置模型:**
```python
class AIProviderConfig:
id: str
name: str
provider: str # openai / anthropic / gemini / local
base_url: str
api_key: str # 加密存储
default_model: str
timeout: int
max_retries: int
rate_limit_rpm: int
is_active: bool
class AITaskConfig:
id: str
task_type: str # summarize / classify / tag / score / daily_brief / chat / optimize
name: str
provider_config_id: str
model: str
skill_id: str # 绑定的 Skill
temperature: float
max_tokens: int
top_p: float
system_prompt_override: str | None
enabled: bool
```
**多供应商路由:**
- 使用 LiteLLM 统一封装,或自实现 Provider 适配器
- 每个任务独立配置,支持复制配置(Clone)
- 支持 Fallback:主模型失败时自动切换到备用模型
### 4.4 Skills 系统
**定义:** Skill = 指导 AI 产出的结构化配置,包括:
- 系统提示词(system prompt
- 输出模板(output template/schema
- 可用工具集(tools
- 输入参数 schema
- 版本与默认值标记
**Skill 类型:**
| 类型 | 用途 | 示例 |
|------|------|------|
| `output` | 产出型 Skill,直接生成内容 | 日报、行业观察、摘要 |
| `tool` | 工具型 Skill,AI 可调用的能力 | 搜索文章、获取文章详情、调用外部 API |
| `agent` | 复合型 Skill,组合多个工具 | 先搜索再总结再生成报告 |
**Skill 模型:**
```python
class Skill:
id: str
name: str
slug: str
description: str
type: str # output / tool / agent
version: int
is_default: bool # 是否系统默认,不可删除
system_prompt: str
output_schema: dict # JSON Schema
tools: List[str] # 可调用的 tool_id 列表
input_schema: dict # 输入参数 JSON Schema
example_inputs: List[dict]
created_by: str
created_at: datetime
updated_at: datetime
```
**Skill 管理:**
- 内置默认 Skills(不可删除,可恢复默认)
- 用户可修改、新增、导入、导出
- 导入时支持覆盖(overwrite)或新建版本
- 修改后自动生效(无重启)
**默认内置 Skills**
| Skill | 用途 |
|-------|------|
| `daily-brief-default` | 默认日报生成 |
| `daily-tech-watch` | 科技观察日报 |
| `chat-researcher` | 研究型聊天助手 |
| `chat-summarizer` | 摘要型聊天助手 |
| `article-classifier` | 文章分类 |
| `article-tagger` | 文章打标签 |
| `article-scorer` | 文章打分 |
| `article-summarizer` | 文章摘要 |
| `search-articles` | 检索文章工具 |
| `fetch-external-api` | 调用外部 API 工具 |
### 4.5 日报任务系统
**核心概念:** 日报 = 任务 + Skill + 数据筛选 + 定时/手动触发
```python
class OutputTask:
id: str
name: str
task_type: str # daily_brief / chat / custom
skill_id: str # 调用哪个 Skill
schedule: str # cron 表达式,为空则仅手动
filter_config: dict # 数据筛选条件
output_config: dict # 输出方式:web / email / file / webhook
is_active: bool
last_run_at: datetime
last_output_id: str
```
**数据筛选配置示例:**
```json
{
"time_range": "last_24h",
"categories": ["科技", "AI"],
"min_composite_score": 70,
"tags": ["大模型"],
"exclude_duplicates": true,
"top_n": 50
}
```
**新增日报流程:**
1. 用户在 UI 创建 OutputTask
2. 选择 Skill(可复用已有或新建)
3. 配置筛选条件
4. 配置定时/手动触发
5. 系统按 cron 自动执行,或用户手动触发
### 4.6 聊天与产出引擎
**聊天模型:**
```python
class ChatSession:
id: str
user_id: str
title: str
skill_id: str # 当前会话使用的默认 Skill
context_messages: List[dict]
created_at: datetime
updated_at: datetime
class ChatMessage:
id: str
session_id: str
role: str # user / assistant / tool
content: str
tool_calls: List[dict] # AI 调用的工具
tool_results: List[dict] # 工具返回结果
references: List[dict] # 引用文章链接
created_at: datetime
```
**聊天流程:**
```
用户输入
→ 意图识别(由 Skill 系统提示词决定)
→ 如需检索:调用 search-articles tool
→ 如需外部数据:调用 fetch-external-api tool
→ 组装上下文(含检索结果/工具返回)
→ 调用 LLM 生成回答
→ 解析引用标记,映射为文章链接
→ 返回带引用链接的 Markdown
```
**引用链接规范:**
- AI 输出中使用 `[^1^]`, `[^2^]` 等标记
- 后端解析标记,从 `references` 中生成真实链接
- 前端渲染为可点击脚注/卡片
示例输出:
```markdown
今日 AI 领域最重要的消息是 OpenAI 发布了新模型 [^1^]
同时 Google 也公布了相关进展 [^2^]。
[^1^]: [OpenAI 官方公告](https://openai.com/...)
[^2^]: [Google Blog](https://blog.google/...)
```
### 4.7 检索服务
**检索维度:**
| 类型 | 实现 | 说明 |
|------|------|------|
| 全文检索 | PostgreSQL tsvector / OpenSearch | 标题、正文、摘要 |
| 语义检索 | pgvector / 独立向量库 | 基于 Embedding 的相似文章 |
| 元数据过滤 | SQL | 分类、Tag、分数、时间、Feed、来源 |
| 混合检索 | 全文 + 语义 + 元数据 | 未来扩展 |
**检索接口抽象:**
```python
class SearchEngine(ABC):
@abstractmethod
def search(self, query: SearchQuery) -> SearchResult:
...
class PostgresSearchEngine(SearchEngine): ...
class OpenSearchEngine(SearchEngine): ...
```
**SearchQuery 模型:**
```python
class SearchQuery:
q: str | None
semantic_q: str | None
category: str | None
tags: List[str]
feed_id: str | None
min_score: float | None
since: datetime
until: datetime
sort_by: str = "published_at_desc"
limit: int = 50
offset: int = 0
```
---
## 五、数据模型设计
### 5.1 核心实体关系
```
users
└── chat_sessions
└── chat_messages
feeds
└── raw_articles
└── cleaned_articles
├── article_references (duplicate links)
├── article_embeddings
└── article_versions
skills
└── skill_versions
ai_task_configs
└── ai_provider_configs
output_tasks
└── outputs
optimization_logs
```
### 5.2 表结构
#### users(用户)
```sql
CREATE TABLE users (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
username VARCHAR(64) UNIQUE NOT NULL,
password_hash VARCHAR(255) NOT NULL,
role VARCHAR(32) DEFAULT 'member', -- admin / member
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
```
#### feedsRSS 源)
```sql
CREATE TABLE feeds (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
url VARCHAR(2048) UNIQUE NOT NULL,
title VARCHAR(512),
description TEXT,
category VARCHAR(128),
is_active BOOLEAN DEFAULT TRUE,
fetch_interval_minutes INT DEFAULT 60,
priority INT DEFAULT 5,
parser_config JSONB DEFAULT '{}',
proxy_policy VARCHAR(32) DEFAULT 'auto',
last_fetch_at TIMESTAMPTZ,
last_fetch_status VARCHAR(32),
last_error TEXT,
error_type VARCHAR(32),
success_count INT DEFAULT 0,
fail_count INT DEFAULT 0,
article_count INT DEFAULT 0,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
```
#### raw_articles(原始文章)
```sql
CREATE TABLE raw_articles (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
feed_id UUID NOT NULL REFERENCES feeds(id) ON DELETE CASCADE,
external_id VARCHAR(255), -- 源系统 ID(如 rssKeeper article id
title VARCHAR(1024),
link VARCHAR(2048) NOT NULL,
author VARCHAR(256),
published_at TIMESTAMPTZ,
fetched_at TIMESTAMPTZ DEFAULT NOW(),
content TEXT,
summary TEXT,
raw_html TEXT, -- 原始 HTML,存对象存储或分表
content_hash VARCHAR(64),
language VARCHAR(16),
status VARCHAR(32) DEFAULT 'pending', -- pending / processed / failed
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
) PARTITION BY RANGE (fetched_at);
```
#### cleaned_articles(清洗后文章)
```sql
CREATE TABLE cleaned_articles (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
raw_article_id UUID REFERENCES raw_articles(id),
feed_id UUID NOT NULL REFERENCES feeds(id),
title VARCHAR(1024),
link VARCHAR(2048) NOT NULL,
author VARCHAR(256),
feed_title VARCHAR(512),
feed_category VARCHAR(128),
published_at TIMESTAMPTZ,
fetched_at TIMESTAMPTZ,
content TEXT,
content_length INT DEFAULT 0,
original_summary TEXT,
ai_summary TEXT,
category VARCHAR(128),
tags JSONB DEFAULT '[]',
heat_score FLOAT DEFAULT 0,
importance_score FLOAT DEFAULT 0,
duplication_score FLOAT DEFAULT 0,
composite_score FLOAT DEFAULT 0,
duplicate_group_id UUID,
is_representative BOOLEAN DEFAULT TRUE,
reference_links JSONB DEFAULT '[]', -- 重复文章引用链接
processing_status VARCHAR(32) DEFAULT 'pending',
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
) PARTITION BY RANGE (fetched_at);
```
#### article_references(文章引用关系)
```sql
CREATE TABLE article_references (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
source_article_id UUID NOT NULL REFERENCES cleaned_articles(id),
referenced_article_id UUID REFERENCES cleaned_articles(id),
reference_type VARCHAR(64), -- duplicate_url / duplicate_title / duplicate_content
reference_link VARCHAR(2048),
reference_title VARCHAR(1024),
similarity FLOAT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
```
#### duplicate_groups(重复组)
```sql
CREATE TABLE duplicate_groups (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
representative_article_id UUID REFERENCES cleaned_articles(id),
member_article_ids JSONB DEFAULT '[]',
similarity_matrix JSONB DEFAULT '{}',
brief_date DATE,
created_at TIMESTAMPTZ DEFAULT NOW()
);
```
#### skills(技能库)
```sql
CREATE TABLE skills (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(128) NOT NULL,
slug VARCHAR(128) UNIQUE NOT NULL,
description TEXT,
type VARCHAR(32) NOT NULL, -- output / tool / agent
version INT DEFAULT 1,
is_default BOOLEAN DEFAULT FALSE,
system_prompt TEXT NOT NULL,
output_schema JSONB,
tools JSONB DEFAULT '[]',
input_schema JSONB,
example_inputs JSONB DEFAULT '[]',
created_by VARCHAR(64),
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
```
#### ai_provider_configsAI 供应商配置)
```sql
CREATE TABLE ai_provider_configs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(128) NOT NULL,
provider VARCHAR(64) NOT NULL, -- openai / anthropic / gemini / local
base_url VARCHAR(512),
api_key_encrypted TEXT,
default_model VARCHAR(128),
timeout INT DEFAULT 60,
max_retries INT DEFAULT 3,
rate_limit_rpm INT DEFAULT 60,
is_active BOOLEAN DEFAULT TRUE,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
```
#### ai_task_configsAI 任务配置)
```sql
CREATE TABLE ai_task_configs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
task_type VARCHAR(64) NOT NULL, -- summarize / classify / tag / score / daily_brief / chat / optimize
name VARCHAR(128) NOT NULL,
provider_config_id UUID REFERENCES ai_provider_configs(id),
model VARCHAR(128) NOT NULL,
skill_id UUID REFERENCES skills(id),
temperature FLOAT DEFAULT 0.3,
max_tokens INT,
top_p FLOAT DEFAULT 1.0,
system_prompt_override TEXT,
fallback_config_id UUID REFERENCES ai_task_configs(id),
enabled BOOLEAN DEFAULT TRUE,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
```
#### output_tasks(产出任务)
```sql
CREATE TABLE output_tasks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
name VARCHAR(128) NOT NULL,
task_type VARCHAR(64) DEFAULT 'daily_brief',
skill_id UUID NOT NULL REFERENCES skills(id),
schedule VARCHAR(128), -- cron 表达式
filter_config JSONB DEFAULT '{}',
output_config JSONB DEFAULT '{}',
is_active BOOLEAN DEFAULT TRUE,
last_run_at TIMESTAMPTZ,
last_output_id UUID,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
```
#### outputs(产出记录)
```sql
CREATE TABLE outputs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
output_task_id UUID REFERENCES output_tasks(id),
content TEXT,
content_html TEXT,
references JSONB DEFAULT '[]',
metadata JSONB DEFAULT '{}',
created_at TIMESTAMPTZ DEFAULT NOW()
);
```
#### chat_sessions(聊天会话)
```sql
CREATE TABLE chat_sessions (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
user_id UUID REFERENCES users(id),
title VARCHAR(256),
skill_id UUID REFERENCES skills(id),
context_window INT DEFAULT 10,
created_at TIMESTAMPTZ DEFAULT NOW(),
updated_at TIMESTAMPTZ DEFAULT NOW()
);
```
#### chat_messages(聊天消息)
```sql
CREATE TABLE chat_messages (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
session_id UUID NOT NULL REFERENCES chat_sessions(id) ON DELETE CASCADE,
role VARCHAR(32) NOT NULL, -- user / assistant / tool
content TEXT,
tool_calls JSONB DEFAULT '[]',
tool_results JSONB DEFAULT '[]',
references JSONB DEFAULT '[]',
token_usage JSONB,
created_at TIMESTAMPTZ DEFAULT NOW()
);
```
#### optimization_logs(自优化日志)
```sql
CREATE TABLE optimization_logs (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
optimization_type VARCHAR(64), -- prompt / skill / dedup_algorithm
target_id UUID,
target_name VARCHAR(128),
previous_version INT,
new_version INT,
previous_content TEXT,
new_content TEXT,
evaluation_reason TEXT,
is_applied BOOLEAN DEFAULT FALSE,
applied_at TIMESTAMPTZ,
rollback_to_version INT,
created_at TIMESTAMPTZ DEFAULT NOW()
);
```
#### locks(分布式锁记录)
```sql
CREATE TABLE locks (
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
lock_name VARCHAR(128) UNIQUE NOT NULL,
owner_id UUID,
acquired_at TIMESTAMPTZ DEFAULT NOW(),
expires_at TIMESTAMPTZ,
created_at TIMESTAMPTZ DEFAULT NOW()
);
```
### 5.3 分区策略
**按时间分区表:**
- `raw_articles``fetched_at` 月分区
- `cleaned_articles``fetched_at` 月分区
- `chat_messages``created_at` 月分区
**索引规划:**
```sql
-- 文章核心查询索引
CREATE INDEX idx_cleaned_articles_fetched_at ON cleaned_articles(fetched_at DESC);
CREATE INDEX idx_cleaned_articles_category_score ON cleaned_articles(category, composite_score DESC);
CREATE INDEX idx_cleaned_articles_published_at ON cleaned_articles(published_at DESC);
CREATE INDEX idx_cleaned_articles_link ON cleaned_articles(link);
-- GIN 索引用于 JSONB 标签
CREATE INDEX idx_cleaned_articles_tags ON cleaned_articles USING GIN (tags);
CREATE INDEX idx_cleaned_articles_reference_links ON cleaned_articles USING GIN (reference_links);
-- 全文搜索索引(可切 OpenSearch
CREATE INDEX idx_cleaned_articles_fts ON cleaned_articles
USING GIN (to_tsvector('chinese', COALESCE(title,'') || ' ' || COALESCE(ai_summary,'') || ' ' || COALESCE(content,'')));
```
---
## 六、自优化系统设计
### 6.1 优化范围
每日凌晨 2:00 自动执行:
1. **Prompt/Skill 自优化**:摘要、分类、Tag、打分、日报、聊天 Skills
2. **去重算法自优化**:阈值、特征、算法策略
### 6.2 自优化标准
**AI 自己评审优化点**,无需人工标注。评审维度:
| 优化对象 | 评审维度 |
|----------|----------|
| 摘要 Skill | 信息覆盖率、简洁性、是否保留关键数字/实体 |
| 分类 Skill | 分类边界清晰度、与样本的一致性 |
| Tag Skill | Tag 相关性、避免过度标签、Tag 粒度一致性 |
| 打分 Skill | 评分标准是否稳定、高分文章是否确实重要 |
| 日报 Skill | 结构清晰度、信息密度、引用完整性 |
| 去重算法 | 召回率、精确率、运行效率 |
### 6.3 自优化流程
```
1. 收集近 N 天数据
- 最近执行的输入/输出样例
- 优化历史(避免重复尝试)
- 当前 Prompt/Skill/算法
2. 优化器模型生成候选
- 对每个 Skill:分析弱点 → 生成候选 Prompt
- 对去重算法:分析误判/漏判 → 生成候选算法
3. 优化器模型自评
- 候选 vs 当前版本在历史样例上的模拟表现
- 输出评审理由和评分
4. 自动发布
- Skill:保存为新版本,自动生效
- 去重算法:写入新版本文件,自动热加载
5. 留痕
- 写入 optimization_logs
- 保留旧版本以便回滚
```
### 6.4 去重算法版本管理
**目录结构:**
```
platform/
├── plugins/
│ └── deduplication/
│ ├── current.py # 当前生效
│ ├── current.py.bak.1 # 上一个版本
│ ├── current.py.bak.2
│ └── versions/
│ ├── dedup_v1_20260615_020000.py
│ ├── dedup_v2_20260616_020000.py
│ └── dedup_v3_20260617_020000.py
│ └── metadata.json # 版本历史、回滚点
```
**热加载机制:**
- 使用 `watchdog` 监听 `current.py` 变更
- 变更时尝试 `importlib.reload` 或动态加载
- 加载失败时自动回滚到 `current.py.bak.1`
- 通过 API `/api/admin/deduplication/rollback?version=X` 可手动回滚
**去重算法自优化输入:**
```python
@dataclass
class DedupOptimizationInput:
current_algorithm_code: str
current_metadata: dict
sample_duplicate_groups: List[DuplicateGroup]
sample_false_positives: List[Tuple[str, str]] # 误判样例
sample_false_negatives: List[Tuple[str, str]] # 漏判样例
performance_metrics: dict # 运行时间、内存
```
---
## 七、鉴权与锁机制
### 7.1 鉴权
- **JWT Token**:用户登录后发放 access_token
- **API Key**:供外部系统/脚本调用,与用户绑定
- **简单 RBAC**
- `admin`:管理用户、配置、Skills、任务
- `member`:使用聊天、查看文章、触发个人任务
- **接口保护**:写入/管理类接口需要 `admin`;读取类接口需要登录
### 7.2 锁机制
| 锁类型 | 实现 | 用途 |
|--------|------|------|
| **任务级锁** | Redis 分布式锁 | 防止同一任务(如去重、日报生成)并发执行 |
| **文章级锁** | Redis 分布式锁 + DB locks 表 | 防止多人同时编辑同一篇文章的元数据 |
| **调度锁** | Celery `max_instances=1` + Redis | 防止定时任务与手动任务冲突 |
**锁工具接口:**
```python
class LockService:
async def acquire(self, lock_name: str, ttl: int, owner_id: str) -> bool
async def release(self, lock_name: str, owner_id: str) -> bool
async def extend(self, lock_name: str, ttl: int, owner_id: str) -> bool
```
---
## 八、Docker 部署架构
### 8.1 服务组成
```yaml
services:
web: # FastAPI 主服务
worker-default: # Celery 默认队列 Worker
worker-ai: # Celery AI 任务队列 Worker
worker-fetch: # Celery RSS 抓取队列 Worker
beat: # Celery Beat 定时调度
redis: # 缓存 + 锁 + 消息队列
postgres: # 主数据库 + pgvector
minio: # 对象存储(可选)
```
### 8.2 目录挂载
```
/data/
├── postgres/ # PostgreSQL 数据
├── redis/ # Redis 持久化
├── minio/ # 对象存储
├── logs/ # 应用日志
├── plugins/ # 插件目录(去重算法等)
└── backups/ # 自动备份
```
### 8.3 环境变量
```env
# 数据库
DATABASE_URL=postgresql+asyncpg://rss:rss@postgres:5432/rss_platform
# Redis
REDIS_URL=redis://redis:6379/0
# AI
AI_OPTIMIZER_PROVIDER_ID=...
AI_CHAT_PROVIDER_ID=...
# 安全
SECRET_KEY=...
ACCESS_TOKEN_EXPIRE_MINUTES=480
# 存储
STORAGE_TYPE=minio
MINIO_ENDPOINT=minio:9000
MINIO_BUCKET=rss-platform
# 调度
SELF_OPTIMIZE_CRON=0 2 * * *
```
---
## 九、扩展性规划
### 9.1 数据量增长路径
| 阶段 | 文章量 | 数据库 | 检索 | 去重 |
|------|--------|--------|------|------|
| 阶段一 | < 10万 | PostgreSQL 单库 + 分区表 | PG tsvector | 内存+插件 |
| 阶段二 | 10万~50万 | PostgreSQL 分区表 | OpenSearch | 分桶+Embedding |
| 阶段三 | 50万~100万 | 读写分离 / 按时间分库 | OpenSearch + 向量库 | LSH/MinHash |
| 阶段四 | > 100万 | 分库分表 + 冷热分离 | 专用检索集群 | 近似最近邻 |
### 9.2 服务拆分预留
未来可按模块拆分为独立服务:
- `feed-service`RSS 抓取
- `ai-service`AI 处理
- `search-service`:检索
- `chat-service`:聊天与产出
- `optimization-service`:自优化
---
## 十、关键接口概览
### 10.1 文章与检索
| 方法 | 路径 | 说明 |
|------|------|------|
| GET | `/api/articles` | 文章列表(分页、过滤) |
| GET | `/api/articles/{id}` | 文章详情 |
| POST | `/api/articles/{id}/lock` | 获取文章编辑锁 |
| DELETE | `/api/articles/{id}/lock` | 释放文章编辑锁 |
| GET | `/api/search` | 混合检索 |
| POST | `/api/search/semantic` | 语义检索 |
### 10.2 Skills 与任务
| 方法 | 路径 | 说明 |
|------|------|------|
| GET | `/api/skills` | 列出 Skills |
| GET | `/api/skills/{id}` | Skill 详情 |
| POST | `/api/skills` | 创建 Skill |
| PUT | `/api/skills/{id}` | 更新 Skill |
| POST | `/api/skills/{id}/restore-default` | 恢复默认 |
| POST | `/api/skills/import` | 导入 Skill |
| GET | `/api/output-tasks` | 产出任务列表 |
| POST | `/api/output-tasks` | 创建产出任务 |
| POST | `/api/output-tasks/{id}/run` | 手动运行 |
### 10.3 AI 配置
| 方法 | 路径 | 说明 |
|------|------|------|
| GET | `/api/ai/providers` | AI 供应商配置 |
| POST | `/api/ai/providers` | 新增供应商 |
| POST | `/api/ai/providers/{id}/clone` | 复制配置 |
| GET | `/api/ai/task-configs` | AI 任务配置 |
| POST | `/api/ai/task-configs` | 新增任务配置 |
| POST | `/api/ai/task-configs/{id}/clone` | 复制任务配置 |
### 10.4 聊天
| 方法 | 路径 | 说明 |
|------|------|------|
| GET | `/api/chat/sessions` | 会话列表 |
| POST | `/api/chat/sessions` | 创建会话 |
| POST | `/api/chat/sessions/{id}/messages` | 发送消息 |
| GET | `/api/chat/sessions/{id}/messages` | 获取历史 |
### 10.5 自优化与插件
| 方法 | 路径 | 说明 |
|------|------|------|
| GET | `/api/admin/optimization-logs` | 自优化日志 |
| POST | `/api/admin/optimization/run` | 手动触发自优化 |
| GET | `/api/admin/deduplication/versions` | 去重算法版本 |
| POST | `/api/admin/deduplication/rollback` | 回滚算法 |
| POST | `/api/admin/deduplication/reload` | 热加载当前算法 |
---
## 十一、风险与应对
| 风险 | 影响 | 应对 |
|------|------|------|
| AI 自优化产生劣质 Prompt | 中 | 优化器自评 + 版本化 + 自动回滚 |
| 去重算法热加载失败 | 高 | 失败自动回滚上一个 `.bak` |
| 数据量激增导致查询慢 | 中 | 分区表 + 异步迁移路径 |
| AI 调用成本高 | 中 | 多供应商路由 + Fallback + 缓存 |
| 并发任务冲突 | 高 | Redis 分布式锁 + 任务级互斥 |
| 团队成员误操作 | 中 | 简单 RBAC + 文章级锁 |
| SQLite 迁移到 PostgreSQL | 中 | 一次性迁移脚本 + 数据校验 |
---
## 十二、与现有仓库的关系
新平台不是完全重写,而是:
1. **继承**:复用 rssKeeper 的抓取逻辑和 Feed 管理,dataClean 的 AI 调用和任务进度机制
2. **升级**:数据库从 SQLite 升级到 PostgreSQL,引入分区表和向量扩展
3. **重构**:将分类/标签/打分从规则驱动改为 AI + Skill 驱动
4. **新增**:聊天产出、Skill 系统、自优化、多 AI 配置、团队鉴权
5. **合并**:最终用一个统一平台替代两个分离项目
---
## 十三、文档清单
- [x] 本设计文档
- [ ] 开发步骤文档(见 `rss-platform-dev-plan.md`
- [ ] API 接口详细文档
- [ ] Skill 开发指南
- [ ] 自优化机制说明
- [ ] Docker 部署手册
- [ ] 数据迁移手册