Files
rssKeeper/backend/rss_fetcher.py
T
congsh 68bba3d9e0 feat: 深色主题UI、错误分类、批量抓取、健康度筛选
- 修复 datetime 时区不一致导致所有API 500错误的问题
- Feeds/Dashboard 页面改为深色表格主题,高对比度文字
- 添加错误类型自动分类(URL失效/被拒绝/超时/DNS失败/SSL错误等12种)
- 新增"下次抓取时间"列,从APScheduler获取
- 新增健康度筛选下拉,修复分页后过滤失效的bug
- "全部抓取"改为同步并发执行,基于当前筛选条件获取所有匹配源
- 新增数据库自动迁移机制,处理增量列变更

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-06-11 17:44:54 +08:00

357 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""RSS 抓取核心逻辑"""
import time
import re
import html
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin
import requests
import feedparser
from bs4 import BeautifulSoup
from sqlalchemy.orm import Session
from models import Feed, Article, FetchLog
from database import SessionLocal
import config
def classify_error(error: str) -> str:
    """Map a raw fetch error message to a coarse error category.

    Args:
        error: Error text, typically ``str(exception)``. May be empty or None.

    Returns:
        ``""`` when there is no error, otherwise one of: ``url_invalid``,
        ``forbidden``, ``rate_limited``, ``timeout``, ``dns_failure``,
        ``connection_refused``, ``connection_reset``, ``ssl_error``,
        ``unreachable``, ``url_malformed``, ``server_error`` or ``unknown``.
    """
    if not error:
        return ""
    err = error.lower()

    # Messages shaped like "503 Server Error: ... for url: <url>" carry the
    # authoritative status up front. Reading it first keeps text inside the
    # trailing URL (e.g. ".../404" or a host containing "ssl") from deciding
    # the category.
    leading = re.match(r"\s*(\d{3}) (?:client|server) error\b", err)
    if leading:
        status = int(leading.group(1))
        if status == 404:
            return "url_invalid"
        if status == 403:
            return "forbidden"
        if status == 429:
            return "rate_limited"
        if status >= 500:
            return "server_error"
        # Other 4xx codes fall through to the keyword heuristics below.

    def has_status(code: str) -> bool:
        # Only accept the code as a standalone number: digits embedded in a
        # longer number or a hex object address (0x7f8b4c403a90) do not count.
        pattern = rf"(?<![0-9a-f]){code}(?![0-9a-f])"
        return re.search(pattern, err) is not None

    if has_status("404") or "not found" in err:
        return "url_invalid"
    if has_status("403") or "forbidden" in err:
        return "forbidden"
    if has_status("429") or "too many request" in err:
        return "rate_limited"
    # "timeout" also covers ConnectTimeout / ConnectionTimeout.
    if "timeout" in err or "timed out" in err:
        return "timeout"
    if (
        "could not resolve" in err
        or "name or service not known" in err
        or "nodename nor servname" in err
    ):
        return "dns_failure"
    if "connection refused" in err:
        return "connection_refused"
    if (
        "connection aborted" in err
        or "remotedisconnected" in err
        or "remote end closed" in err
    ):
        return "connection_reset"
    # "certifi" also matches "certificate".
    if "ssl" in err or "certifi" in err:
        return "ssl_error"
    if "max retries" in err or "newconnectionerror" in err:
        return "unreachable"
    if "invalid url" in err or "no host" in err or "missing scheme" in err:
        return "url_malformed"
    if "server error" in err:
        return "server_error"
    return "unknown"
def fetch_feed(url: str, timeout: int = config.FETCH_TIMEOUT) -> dict:
    """Download and parse a single RSS/Atom feed.

    Args:
        url: Feed URL to request.
        timeout: Timeout passed to ``requests.get``.

    Returns:
        A dict with the keys ``success`` (bool), ``feed_data`` (the
        feedparser result, or None on failure), ``error`` (message string, or
        None on success) and ``response_time_ms`` (elapsed milliseconds
        covering download and parsing, or None on failure). This function
        does not raise; every exception is reported through ``error``.
    """
    # A monotonic clock cannot be skewed by wall-clock adjustments.
    started = time.monotonic()
    headers = {
        "User-Agent": "rssKeeper/1.0 (+https://github.com/rssKeeper)",
        "Accept": "application/rss+xml, application/atom+xml, application/xml, text/xml, */*",
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout, allow_redirects=True)
        response.raise_for_status()
        # Parse warnings (parsed.bozo) are tolerated on purpose: a feed that
        # is not perfectly well-formed may still yield usable entries.
        parsed = feedparser.parse(response.content)
    except Exception as exc:
        # Network, HTTP-status and parser failures are all reported the same way.
        return {
            "success": False,
            "feed_data": None,
            "error": str(exc),
            "response_time_ms": None,
        }

    elapsed_ms = int((time.monotonic() - started) * 1000)
    return {
        "success": True,
        "feed_data": parsed,
        "error": None,
        "response_time_ms": elapsed_ms,
    }
def _is_feed_mime(mime_type) -> bool:
    """Return True when a MIME/Content-Type string looks like an RSS/Atom feed."""
    mime = (mime_type or "").lower()
    # application/xhtml+xml contains "xml" but describes a web page, not a feed.
    if "xhtml" in mime:
        return False
    return any(token in mime for token in ("rss", "atom", "xml"))


def discover_feed_url(url: str, timeout: int = 15) -> list:
    """Discover RSS/Atom feed URLs advertised by, or conventionally located on, a web page.

    Args:
        url: Address of the page to inspect.
        timeout: Timeout for the initial page request.

    Returns:
        Absolute feed URLs in discovery order without duplicates. An empty
        list is returned when the page cannot be fetched or parsed.
    """
    try:
        headers = {
            "User-Agent": "rssKeeper/1.0 (+https://github.com/rssKeeper)",
        }
        response = requests.get(url, headers=headers, timeout=timeout, allow_redirects=True)
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")
        feed_urls = []

        # Feeds declared through <link rel="alternate" type="..."> tags.
        for link in soup.find_all("link", rel="alternate"):
            href = link.get("href", "")
            if href and _is_feed_mime(link.get("type", "")):
                feed_urls.append(urljoin(response.url, href))

        # Probe well-known feed locations relative to the final (redirected) URL.
        common_patterns = [
            "/rss", "/feed", "/feeds", "/atom.xml", "/rss.xml",
            "/index.xml", "/feed.xml", "/?feed=rss2",
        ]
        for pattern in common_patterns:
            candidate = urljoin(response.url, pattern)
            if candidate in feed_urls:
                continue
            try:
                resp = requests.head(candidate, headers=headers, timeout=5, allow_redirects=True)
            except Exception:
                # Best-effort probe: an unreachable candidate is simply skipped.
                continue
            # Error responses can carry an XML body (and Content-Type), so the
            # status must be checked before trusting the Content-Type header.
            if resp.status_code < 400 and _is_feed_mime(resp.headers.get("Content-Type", "")):
                feed_urls.append(candidate)

        # Remove duplicates while keeping discovery order.
        return list(dict.fromkeys(feed_urls))
    except Exception:
        return []
def parse_article(entry, feed_id: int) -> dict:
    """Convert a feedparser entry into a dict of Article column values.

    Args:
        entry: A feedparser entry (supports ``.get`` and attribute access).
        feed_id: ID of the feed the entry belongs to.

    Returns:
        Dict with the keys ``feed_id``, ``title``, ``link``, ``author``,
        ``published_at``, ``content`` and ``summary``. Text fields are
        truncated to their maximum lengths; ``published_at`` is a naive
        ``datetime`` or None when no usable timestamp is present.
    """
    # `or ""` also covers keys that are present but hold None, which would
    # otherwise fail when sliced below.
    title = entry.get("title") or ""
    link = entry.get("link") or ""
    author = entry.get("author") or ""

    # Publication time: prefer the published timestamp, fall back to updated.
    # The result is a naive datetime built from the first six struct fields
    # (feedparser documents its *_parsed values as UTC).
    published_at = None
    for field in ("published_parsed", "updated_parsed"):
        parsed_time = getattr(entry, field, None)
        if not parsed_time:
            continue
        try:
            published_at = datetime(*parsed_time[:6])
            break
        except (ValueError, TypeError):
            continue

    # Body: prefer the full content, fall back to the summary when the entry
    # has no content or its content value is empty.
    raw_content = ""
    if getattr(entry, "content", None):
        raw_content = entry.content[0].value or ""
    if not raw_content:
        raw_content = getattr(entry, "summary", "") or ""

    content = clean_html(raw_content)
    summary = generate_summary(content)

    return {
        "feed_id": feed_id,
        "title": title[:1024],
        "link": link[:2048],
        "author": author[:256],
        "published_at": published_at,
        "content": content[:config.MAX_ARTICLE_CONTENT_LENGTH],
        "summary": summary[:config.MAX_SUMMARY_LENGTH],
    }
def clean_html(html_text: str) -> str:
    """Reduce an HTML fragment to plain text.

    HTML entities are decoded first, then script/style/iframe/object/embed
    elements are dropped together with their contents, and the remaining text
    is returned with runs of blank lines collapsed to a single blank line.
    Empty or None input yields an empty string.
    """
    if not html_text:
        return ""

    # Decode entities before parsing so that escaped markup is parsed as markup.
    document = BeautifulSoup(html.unescape(html_text), "html.parser")

    # Drop elements whose contents must never reach the output.
    for node in document.find_all(["script", "style", "iframe", "object", "embed"]):
        node.decompose()

    plain = document.get_text(separator="\n")
    # Collapse any run of blank (or whitespace-only) lines into one blank line.
    return re.sub(r"\n\s*\n+", "\n\n", plain).strip()
def generate_summary(content: str, max_length: int = 300) -> str:
    """Build a short plain-text summary from article content.

    Whitespace is normalised to single spaces. Text that fits within
    ``max_length`` is returned unchanged. Longer text is cut at the last
    sentence terminator found in the second half of the first ``max_length``
    characters; if there is none, the text is cut hard and ``"..."`` appended.

    Args:
        content: Plain-text article body. May be empty or None.
        max_length: Maximum number of characters taken from the content.

    Returns:
        The summary string (empty for empty input).
    """
    if not content:
        return ""

    text = re.sub(r"\s+", " ", content).strip()
    if len(text) <= max_length:
        return text

    truncated = text[:max_length]

    # CJK terminators are written as escapes so they survive tooling that
    # strips or hides "ambiguous" Unicode characters. An empty search string
    # must never appear here: rfind("") returns len(truncated), which would
    # defeat the boundary search entirely.
    sentence_ends = (
        ". ", "! ", "? ",
        "\u3002",  # ideographic full stop
        "\uff01",  # fullwidth exclamation mark
        "\uff1f",  # fullwidth question mark
        "\uff1b",  # fullwidth semicolon
    )
    last_boundary = max(truncated.rfind(mark) for mark in sentence_ends)

    # Only cut at a boundary that keeps more than half of the allowed length.
    if last_boundary > max_length * 0.5:
        return truncated[:last_boundary + 1]
    return truncated + "..."
def fetch_and_store_feed(feed_id: int) -> dict:
    """Fetch one feed, store its articles and record the outcome.

    New articles (identified by link) are inserted, articles whose link is
    already stored are refreshed, the feed's status fields and counters are
    updated, and a FetchLog row is written for both success and failure.

    Args:
        feed_id: Primary key of the Feed to fetch.

    Returns:
        On success ``{"success": True, "articles_count": <new articles>,
        "feed_title": <title>}``; otherwise ``{"success": False,
        "error": <message>, "articles_count": 0}``. Exceptions are not
        propagated: the transaction is rolled back and the error is returned.
    """
    db = SessionLocal()
    try:
        feed = db.query(Feed).filter(Feed.id == feed_id).first()
        if not feed:
            return {"success": False, "error": "Feed not found", "articles_count": 0}

        result = fetch_feed(feed.url)
        if not result["success"]:
            # Record the failure on the feed and in the fetch log.
            feed.last_fetch_at = datetime.utcnow()
            feed.last_fetch_status = "fail"
            feed.last_error = result["error"]
            feed.error_type = classify_error(result["error"])
            # `or 0` guards against a NULL counter column.
            feed.fail_count = (feed.fail_count or 0) + 1
            db.add(FetchLog(
                feed_id=feed_id,
                status="fail",
                error_message=result["error"],
                response_time_ms=result.get("response_time_ms"),
            ))
            db.commit()
            return {"success": False, "error": result["error"], "articles_count": 0}

        parsed = result["feed_data"]

        # Refresh feed metadata from the parsed document.
        if hasattr(parsed.feed, "title"):
            feed.title = parsed.feed.title[:512]
        if hasattr(parsed.feed, "description"):
            feed.description = parsed.feed.description[:1000]

        # De-duplicate entries by link in memory; the first occurrence wins and
        # entries without a link are skipped. Insertion order is preserved.
        incoming = {}
        for entry in parsed.entries:
            article_data = parse_article(entry, feed_id)
            link = article_data.get("link", "")
            if link and link not in incoming:
                incoming[link] = article_data

        # Load every already-stored article for these links in one query
        # instead of issuing one lookup per article.
        existing_by_link = {}
        if incoming:
            stored = db.query(Article).filter(Article.link.in_(list(incoming))).all()
            for row in stored:
                existing_by_link.setdefault(row.link, row)

        new_count = 0
        for link, article_data in incoming.items():
            existing = existing_by_link.get(link)
            if existing is None:
                db.add(Article(**article_data))
                new_count += 1
                continue
            # Refresh a stored article, keeping old values where the new one is empty.
            existing.title = article_data["title"] or existing.title
            existing.content = article_data["content"] or existing.content
            existing.summary = article_data["summary"] or existing.summary
            existing.author = article_data["author"] or existing.author
            if article_data["published_at"]:
                existing.published_at = article_data["published_at"]

        # Update feed statistics.
        feed.last_fetch_at = datetime.utcnow()
        feed.last_fetch_status = "success"
        feed.last_error = ""
        feed.error_type = ""
        feed.success_count = (feed.success_count or 0) + 1
        feed.article_count = (feed.article_count or 0) + new_count
        db.add(FetchLog(
            feed_id=feed_id,
            status="success",
            articles_fetched=new_count,
            response_time_ms=result.get("response_time_ms"),
        ))
        db.commit()
        return {
            "success": True,
            "articles_count": new_count,
            "feed_title": feed.title,
        }
    except Exception as e:
        db.rollback()
        return {"success": False, "error": str(e), "articles_count": 0}
    finally:
        db.close()
def fetch_all_feeds(feed_ids: list = None) -> list:
    """Fetch several feeds concurrently and collect one result per feed.

    Only active feeds are considered. When ``feed_ids`` is non-empty the
    selection is restricted to those IDs; ``None`` and an empty list both
    select every active feed.

    Returns:
        A list of dicts, each being the result of ``fetch_and_store_feed``
        with an added ``feed_id`` key, in completion order.
    """
    session = SessionLocal()
    try:
        active = session.query(Feed).filter(Feed.is_active == True)  # noqa: E712
        if feed_ids:
            active = active.filter(Feed.id.in_(feed_ids))
        target_ids = [feed.id for feed in active.all()]
    finally:
        session.close()

    outcomes = []
    with ThreadPoolExecutor(max_workers=config.FETCH_CONCURRENCY) as pool:
        # Map each future back to the feed it is fetching.
        pending = {
            pool.submit(fetch_and_store_feed, target_id): target_id
            for target_id in target_ids
        }
        for finished in as_completed(pending):
            target_id = pending[finished]
            try:
                outcomes.append({"feed_id": target_id, **finished.result()})
            except Exception as exc:
                outcomes.append({
                    "feed_id": target_id,
                    "success": False,
                    "error": str(exc),
                    "articles_count": 0,
                })
    return outcomes