"""RSS 抓取核心逻辑"""
import time
import re
import html
from datetime import datetime
from concurrent.futures import ThreadPoolExecutor, as_completed
from urllib.parse import urljoin
import requests
import feedparser
from bs4 import BeautifulSoup
from sqlalchemy.orm import Session
from models import Feed, Article, FetchLog
from database import SessionLocal
import config
# Domain suffixes / keywords of hosts that are reached directly;
# every other host goes through the proxy.
CN_DOMAINS = (
    # .cn suffixes
    ".cn",
    ".com.cn",
    ".org.cn",
    ".net.cn",
    # individual sites
    "36kr.com",
    "zhihu.com",
    "weibo.com",
    "douban.com",
    "bilibili.com",
    "tmtpost.com",
    "ifanr.com",
    "geekpark.net",
    "pingwest.com",
    "juejin.cn",
    "segmentfault.com",
    "cnblogs.com",
    "csdn.net",
    "qq.com",
    "163.com",
    "sohu.com",
    "sina.com.cn",
    "baidu.com",
    "taobao.com",
    "jd.com",
    "aliyun.com",
    "xinhuanet.com",
    "people.com.cn",
    "sciencenet.cn",
    # local addresses
    "localhost",
    "127.0.0.1",
    "192.168.",
)
def _get_proxies(url: str) -> dict:
    """Return the ``proxies`` mapping to pass to ``requests`` for *url*.

    An empty dict is returned when ``config.HTTPS_PROXY`` is not set or when
    the URL's host matches an entry of ``CN_DOMAINS``; any other host gets
    the configured HTTP/HTTPS proxies.

    How a ``CN_DOMAINS`` entry is matched against the host:

    * entry ending with ``"."`` (e.g. ``"192.168."``) -- prefix match, so
      every address starting with it is covered;
    * entry starting with ``"."`` (e.g. ``".cn"``) -- suffix match;
    * any other entry (e.g. ``"qq.com"``) -- the host itself or one of its
      subdomains, so ``"notqq.com"`` is not mistaken for ``"qq.com"``.
    """
    if not config.HTTPS_PROXY:
        return {}

    from urllib.parse import urlparse

    # ``hostname`` is already lower-cased by urlparse; it is None for
    # URLs without a network location.
    host = urlparse(url).hostname or ""

    for entry in CN_DOMAINS:
        if entry.endswith("."):
            is_direct = host.startswith(entry)
        elif entry.startswith("."):
            is_direct = host.endswith(entry)
        else:
            is_direct = host == entry or host.endswith("." + entry)
        if is_direct:
            return {}

    # Everything else goes through the proxy; plain-HTTP traffic falls back
    # to the HTTPS proxy when no dedicated HTTP proxy is configured.
    return {
        "http": config.HTTP_PROXY or config.HTTPS_PROXY,
        "https": config.HTTPS_PROXY,
    }
# Matches the "<status> Client Error" / "<status> Server Error" prefix in a
# lower-cased error message.
_HTTP_ERROR_RE = re.compile(r"\b(\d{3})\s+(?:client|server)\s+error\b")

# HTTP status codes that map to a dedicated category.
_STATUS_CATEGORIES = {
    404: "url_invalid",
    403: "forbidden",
    429: "rate_limited",
}

# (substrings, category) pairs checked in order against the lower-cased
# message; the first pair with a matching substring wins.
_ERROR_KEYWORDS = (
    (("404", "not found"), "url_invalid"),
    (("403", "forbidden"), "forbidden"),
    (("429", "too many request"), "rate_limited"),
    (("timeout", "timed out"), "timeout"),
    (
        ("could not resolve", "name or service not known", "nodename nor servname"),
        "dns_failure",
    ),
    (("connection refused",), "connection_refused"),
    (
        ("connection aborted", "remotedisconnected", "remote end closed"),
        "connection_reset",
    ),
    (("ssl", "certificate", "certifi"), "ssl_error"),
    (("max retries", "newconnectionerror"), "unreachable"),
    (("invalid url", "no host", "missing scheme"), "url_malformed"),
)


def classify_error(error: str) -> str:
    """Map an error message to a coarse error category.

    Args:
        error: Error text, typically ``str(exception)``; may be empty/None.

    Returns:
        ``""`` for an empty message, otherwise one of ``"url_invalid"``,
        ``"forbidden"``, ``"rate_limited"``, ``"timeout"``,
        ``"dns_failure"``, ``"connection_refused"``, ``"connection_reset"``,
        ``"ssl_error"``, ``"unreachable"``, ``"url_malformed"``,
        ``"server_error"`` or ``"unknown"``.
    """
    if not error:
        return ""
    err = error.lower()

    # An explicit HTTP status is the most reliable signal, so it is checked
    # before any substring heuristic: the message usually also contains the
    # URL, whose text ("404", "ssl", ...) must not decide the category.
    status_match = _HTTP_ERROR_RE.search(err)
    if status_match:
        status = int(status_match.group(1))
        if status in _STATUS_CATEGORIES:
            return _STATUS_CATEGORIES[status]
        if status >= 500:
            return "server_error"
        # Other 4xx codes have no category of their own; fall through.

    for needles, category in _ERROR_KEYWORDS:
        if any(needle in err for needle in needles):
            return category

    # Loose fallback for server-error messages in a different format.
    if "5" in err and "server error" in err:
        return "server_error"
    return "unknown"
def fetch_feed(url: str, timeout: int = config.FETCH_TIMEOUT) -> dict:
    """Download and parse a single RSS/Atom feed.

    Args:
        url: Feed URL.
        timeout: Timeout passed to ``requests.get``.

    Returns:
        ``{"success": bool, "feed_data": parsed | None, "error": str | None,
        "response_time_ms": int}``.  The elapsed time is reported for
        failures as well as successes.  This function does not raise; every
        error is reported through the returned dict.
    """
    # A monotonic clock is unaffected by wall-clock adjustments.
    started = time.monotonic()

    def _result(success: bool, feed_data, error) -> dict:
        return {
            "success": success,
            "feed_data": feed_data,
            "error": error,
            "response_time_ms": int((time.monotonic() - started) * 1000),
        }

    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0 Safari/537.36",
        "Accept": "application/rss+xml, application/atom+xml, application/xml, text/xml, */*",
    }
    try:
        response = requests.get(
            url,
            headers=headers,
            timeout=timeout,
            allow_redirects=True,
            proxies=_get_proxies(url),
        )
        response.raise_for_status()
        parsed = feedparser.parse(response.content)
    except Exception as exc:
        # Boundary handler: network, HTTP and parser errors are all turned
        # into a failure result instead of propagating to the caller.
        return _result(False, None, str(exc))

    # A bozo flag alone is only a warning -- the feed may still be usable.
    # When the parser also recognised no feed format and found no entries,
    # the response was not a feed at all (e.g. an HTML page served with
    # status 200), so report a failure rather than an empty success.
    if (
        getattr(parsed, "bozo", False)
        and not getattr(parsed, "version", "")
        and not parsed.entries
    ):
        reason = getattr(parsed, "bozo_exception", None)
        return _result(False, None, f"Feed parse error: {reason}")

    return _result(True, parsed, None)
def _is_feed_mime(mime: str) -> bool:
    """Return True when *mime* looks like an RSS/Atom/XML feed type."""
    mime = (mime or "").lower()
    # "application/xhtml+xml" contains "xml" but is a web page, not a feed.
    if "html" in mime:
        return False
    return any(marker in mime for marker in ("rss", "atom", "xml"))


def discover_feed_url(url: str, timeout: int = 15) -> list:
    """Discover RSS/Atom feed URLs advertised by an arbitrary web page.

    Two sources are combined, in this order:

    1. ``<link rel="alternate">`` tags whose ``type`` is a feed MIME type;
    2. well-known feed paths on the same site, kept only when a HEAD request
       succeeds and answers with a feed-like ``Content-Type``.

    Args:
        url: Page URL to inspect.
        timeout: Timeout for the initial page request.

    Returns:
        De-duplicated list of absolute feed URLs in discovery order; an
        empty list when the page cannot be fetched or parsed.
    """
    headers = {
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/125.0 Safari/537.36",
    }
    try:
        response = requests.get(
            url,
            headers=headers,
            timeout=timeout,
            allow_redirects=True,
            proxies=_get_proxies(url),
        )
        response.raise_for_status()
        soup = BeautifulSoup(response.content, "html.parser")

        feed_urls = []

        # Feeds declared via <link rel="alternate" type="..."> tags.
        for link in soup.find_all("link", rel="alternate"):
            href = link.get("href", "")
            if href and _is_feed_mime(link.get("type", "")):
                feed_urls.append(urljoin(response.url, href))

        # Probe commonly used feed locations as well.
        common_patterns = [
            "/rss", "/feed", "/feeds", "/atom.xml", "/rss.xml",
            "/index.xml", "/feed.xml", "/?feed=rss2",
        ]
        for pattern in common_patterns:
            candidate = urljoin(response.url, pattern)
            if candidate in feed_urls:
                continue
            try:
                # Use the same proxy routing as the page request.
                probe = requests.head(
                    candidate,
                    headers=headers,
                    timeout=5,
                    allow_redirects=True,
                    proxies=_get_proxies(candidate),
                )
            except (requests.exceptions.RequestException, ValueError):
                # Best-effort probing: an unreachable candidate is skipped.
                continue
            # Error responses can carry an XML body, so require a
            # successful status in addition to a feed-like content type.
            if probe.ok and _is_feed_mime(probe.headers.get("Content-Type", "")):
                feed_urls.append(candidate)

        # Drop duplicates while keeping discovery order.
        return list(dict.fromkeys(feed_urls))
    except Exception:
        # Discovery is best-effort; any failure means "nothing found".
        return []
def parse_article(entry, feed_id: int) -> dict:
    """Convert a feedparser entry into a dict of article fields.

    Args:
        entry: A feedparser entry (supports ``.get`` and attribute access).
        feed_id: Id of the feed the entry belongs to.

    Returns:
        Dict with keys ``feed_id``, ``title``, ``link``, ``author``,
        ``published_at``, ``content`` and ``summary``.  String fields are
        truncated (title 1024, link 2048, author 256, content
        ``config.MAX_ARTICLE_CONTENT_LENGTH``, summary
        ``config.MAX_SUMMARY_LENGTH``).
    """
    # ``or ""`` guards against keys that are present but None, which would
    # otherwise break the slicing below.
    title = entry.get("title") or ""
    link = entry.get("link") or ""
    author = entry.get("author") or ""

    # Publication time: prefer ``published_parsed``, fall back to
    # ``updated_parsed``.  The result is a naive datetime (no tzinfo).
    # NOTE(review): feedparser documents *_parsed values as UTC, so this is
    # presumably naive UTC -- confirm against the feedparser docs.
    published_at = None
    for field in ("published_parsed", "updated_parsed"):
        time_tuple = getattr(entry, field, None)
        if not time_tuple:
            continue
        try:
            published_at = datetime(*time_tuple[:6])
        except (ValueError, TypeError):
            continue
        break

    # Body text: prefer the full ``content`` element, fall back to
    # ``summary`` when content is missing or empty.
    content = ""
    content_parts = getattr(entry, "content", None)
    if content_parts:
        content = getattr(content_parts[0], "value", "") or ""
    if not content:
        content = getattr(entry, "summary", "") or ""

    # Strip markup, then derive the short summary from the cleaned text.
    content = clean_html(content)
    summary = generate_summary(content)

    return {
        "feed_id": feed_id,
        "title": title[:1024],
        "link": link[:2048],
        "author": author[:256],
        "published_at": published_at,
        "content": content[:config.MAX_ARTICLE_CONTENT_LENGTH],
        "summary": summary[:config.MAX_SUMMARY_LENGTH],
    }
def clean_html(html_text: str) -> str:
    """Reduce an HTML fragment to plain text.

    HTML entities are decoded first, then ``script``, ``style``, ``iframe``,
    ``object`` and ``embed`` elements are removed together with their
    content.  The remaining text is joined with newlines, runs of blank
    lines are collapsed and the result is stripped.  Empty input yields
    ``""``.
    """
    if not html_text:
        return ""

    # Decode entities before parsing so escaped markup is parsed as markup.
    document = BeautifulSoup(html.unescape(html_text), "html.parser")

    # Drop elements whose content must not end up in the text.
    for node in document.find_all(["script", "style", "iframe", "object", "embed"]):
        node.decompose()

    plain = document.get_text(separator="\n")

    # Collapse consecutive blank lines into a single empty line.
    return re.sub(r"\n\s*\n+", "\n\n", plain).strip()
def generate_summary(content: str, max_length: int = 300) -> str:
    """Build a short summary from plain-text *content*.

    Whitespace runs are collapsed to single spaces.  Text that fits within
    *max_length* is returned unchanged.  Longer text is cut at the last
    sentence boundary inside the first *max_length* characters, provided
    that boundary lies in the second half of that window; otherwise the
    window is returned with ``"..."`` appended.

    Args:
        content: Plain text to summarise; may be empty/None.
        max_length: Maximum number of characters taken from the text.

    Returns:
        The summary string (``""`` for empty input).
    """
    if not content:
        return ""

    text = re.sub(r"\s+", " ", content).strip()
    if len(text) <= max_length:
        return text

    truncated = text[:max_length]

    # Sentence-ending marks, Chinese and English.  The full-width marks are
    # written as escapes so they survive text normalisation of this file.
    # Every mark starts with its punctuation character, so ``cut + 1``
    # below keeps the punctuation and drops what follows.
    boundary_marks = (
        "\u3002",  # ideographic full stop
        "\uff01",  # full-width exclamation mark
        "\uff1f",  # full-width question mark
        "\uff1b",  # full-width semicolon
        ". ",
        "!",
        "?",
        ";",
    )
    cut = max(truncated.rfind(mark) for mark in boundary_marks)

    # Only cut at a boundary that keeps more than half of the window.
    if cut > max_length * 0.5:
        return truncated[:cut + 1]
    return truncated + "..."
def fetch_and_store_feed(feed_id: int) -> dict:
    """Fetch one feed and persist its articles.

    The feed row is loaded, the feed is downloaded via ``fetch_feed`` and
    its entries are converted with ``parse_article``.  Entries without a
    link, or repeating a link already seen in this fetch, are skipped.
    Links not yet stored are inserted as new ``Article`` rows; already
    stored links have their fields refreshed.  A ``FetchLog`` row and the
    feed's status fields are written for both failed and successful fetches.

    Args:
        feed_id: Primary key of the ``Feed`` to fetch.

    Returns:
        On success ``{"success": True, "articles_count": <new articles>,
        "feed_title": <title>}``; otherwise ``{"success": False,
        "error": <message>, "articles_count": 0}``.  Unexpected exceptions
        are rolled back and reported the same way rather than raised.
    """
    db = SessionLocal()
    try:
        feed = db.query(Feed).filter(Feed.id == feed_id).first()
        if not feed:
            return {"success": False, "error": "Feed not found", "articles_count": 0}

        result = fetch_feed(feed.url)
        if not result["success"]:
            # Record the failure on the feed and in the fetch log.
            feed.last_fetch_at = datetime.utcnow()
            feed.last_fetch_status = "fail"
            feed.last_error = result["error"]
            feed.error_type = classify_error(result["error"])
            feed.fail_count += 1
            db.add(
                FetchLog(
                    feed_id=feed_id,
                    status="fail",
                    error_message=result["error"],
                    response_time_ms=result.get("response_time_ms"),
                )
            )
            db.commit()
            return {"success": False, "error": result["error"], "articles_count": 0}

        parsed = result["feed_data"]

        # Refresh feed metadata from the parsed document.
        if hasattr(parsed.feed, "title"):
            feed.title = parsed.feed.title[:512]
        if hasattr(parsed.feed, "description"):
            feed.description = parsed.feed.description[:1000]

        # Collect entries, de-duplicated by link within this fetch (the
        # first occurrence wins; entries without a link are dropped).
        candidates = {}
        for entry in parsed.entries:
            article_data = parse_article(entry, feed_id)
            link = article_data.get("link", "")
            if link and link not in candidates:
                candidates[link] = article_data

        # Always defined, so a feed without usable entries still reaches
        # the statistics update below.
        new_count = 0

        if candidates:
            # One query loads every already-stored article for these links,
            # instead of an existence query plus one query per article.
            existing_by_link = {}
            stored_articles = (
                db.query(Article)
                .filter(Article.link.in_(list(candidates)))
                .all()
            )
            for stored in stored_articles:
                # Keep the first row if several share a link.
                existing_by_link.setdefault(stored.link, stored)

            for link, article_data in candidates.items():
                existing = existing_by_link.get(link)
                if existing is None:
                    db.add(Article(**article_data))
                    new_count += 1
                    continue
                # Refresh a stored article; empty new values never
                # overwrite stored ones.
                existing.title = article_data["title"] or existing.title
                existing.content = article_data["content"] or existing.content
                existing.summary = article_data["summary"] or existing.summary
                existing.author = article_data["author"] or existing.author
                if article_data["published_at"]:
                    existing.published_at = article_data["published_at"]

        # Update feed statistics and write the success log entry.
        feed.last_fetch_at = datetime.utcnow()
        feed.last_fetch_status = "success"
        feed.last_error = ""
        feed.error_type = ""
        feed.success_count += 1
        feed.article_count += new_count
        db.add(
            FetchLog(
                feed_id=feed_id,
                status="success",
                articles_fetched=new_count,
                response_time_ms=result.get("response_time_ms"),
            )
        )
        db.commit()
        return {
            "success": True,
            "articles_count": new_count,
            "feed_title": feed.title,
        }
    except Exception as e:
        # Discard any partial changes and report the error to the caller.
        db.rollback()
        return {"success": False, "error": str(e), "articles_count": 0}
    finally:
        db.close()
def fetch_all_feeds(feed_ids: list = None) -> list:
    """Fetch several feeds concurrently.

    Active feeds are selected, optionally restricted to *feed_ids* (an
    empty or missing list means every active feed).  Each feed is handled
    by ``fetch_and_store_feed`` on a thread pool with
    ``config.FETCH_CONCURRENCY`` workers.

    Returns:
        One dict per feed, in completion order: the worker's result with
        ``"feed_id"`` added, or a failure dict when the worker raised.
    """
    session = SessionLocal()
    try:
        active = session.query(Feed).filter(Feed.is_active == True)  # noqa: E712
        if feed_ids:
            active = active.filter(Feed.id.in_(feed_ids))
        # Read the ids while the session is still open.
        target_ids = [feed.id for feed in active.all()]
    finally:
        session.close()

    outcomes = []
    with ThreadPoolExecutor(max_workers=config.FETCH_CONCURRENCY) as pool:
        pending = {
            pool.submit(fetch_and_store_feed, target_id): target_id
            for target_id in target_ids
        }
        for finished in as_completed(pending):
            target_id = pending[finished]
            try:
                outcome = {"feed_id": target_id, **finished.result()}
            except Exception as exc:
                outcome = {
                    "feed_id": target_id,
                    "success": False,
                    "error": str(exc),
                    "articles_count": 0,
                }
            outcomes.append(outcome)
    return outcomes