1. 项目概述边缘新闻聚合的诞生与价值最近在折腾一个挺有意思的小项目我把它叫做“News — At The Edge”。这个名字听起来有点技术范儿但它的核心想法其实很朴素如何让新闻资讯的获取像打开水龙头一样即时、稳定同时又足够轻量、私密我们每天都被海量的信息流淹没但真正有价值、低延迟、不夹带私货的新闻推送却越来越难找。传统的新闻App要么臃肿不堪广告和追踪器比新闻还多要么就是中心化的服务器一旦出问题推送就全断了。这个项目就是尝试用“边缘计算”的思路来重构我们获取新闻的管道。简单来说“News — At The Edge”是一个运行在“边缘”的新闻聚合与分发系统。这里的“边缘”指的是离用户更近的计算节点比如你的家庭服务器、一个轻量的云函数甚至是一个树莓派。它不再依赖某个中心化的新闻门户后台而是由你自己部署的“边缘节点”主动去抓取、过滤、格式化你关心的信源RSS、Atom、甚至是一些设计良好的API然后通过一个极其简洁的界面或推送通道比如Telegram Bot、邮件、甚至是生成一个静态网页呈现给你。它的核心价值在于控制权回归用户。你决定信源、你决定过滤规则、你决定推送频率数据流经的服务器是你可控的整个过程透明、可审计。这个项目特别适合几类朋友一是注重数字隐私厌倦了商业App数据收集的技术爱好者二是需要实时监控特定领域资讯如科技动态、安全漏洞、行业报告的从业者对信息的及时性和准确性有苛刻要求三是单纯喜欢折腾希望拥有一个完全个性化、不被打扰的信息摄入工具的极客。它不是一个开箱即用的商业产品而更像是一套乐高积木提供了核心的引擎和搭建思路你需要根据自己的需求动手拼装出最适合自己的那一款“新闻边缘站”。2. 核心架构设计为何选择边缘计算范式2.1 从中心化到边缘化的范式迁移传统的新闻应用架构是典型的三层中心化模型手机App客户端 - 厂商云服务器中心节点 - 各大新闻机构/爬虫数据源源站。这个模型的弊端非常明显延迟、单点故障、隐私风险与信息茧房。延迟来源于请求需要跨越多个网络层级到达中心服务器一旦中心服务器宕机或网络拥堵所有用户服务中断你的阅读习惯、点击行为被中心服务器全量收集用于构建用户画像和精准广告同时中心化算法决定了你“应该”看到什么容易形成信息茧房。“News — At The Edge”项目彻底摒弃了这个模型采用了边缘计算范式。其核心架构可以概括为“边缘执行器 用户配置 轻量交付”。边缘执行器是你部署的程序它根据你的配置信源列表、抓取周期、过滤规则主动从互联网上的原始信源拉取数据。这个过程是分布式的你的节点只为你或你授权的小群体服务。数据拉取后在边缘节点立即进行清洗、格式化、去重然后通过你设定的轻量级通道如Webhook、消息队列、生成静态文件推送出去。整个数据流不经过任何第三方中转服务器。选择这个架构主要基于以下几点考量抗脆弱性没有中心节点意味着没有单点故障。你的新闻服务只依赖于你的边缘节点和原始信源的可用性稳定性极大提升。低延迟抓取和处理的逻辑就在边缘节点完成推送通道也是直接面向你的终端省去了中心服务器调度和转发的环节理论上可以获得更快的更新速度。隐私与自主所有配置、抓取历史、阅读数据都保存在你控制的节点上没有数据出域的风险。你可以完全自主地定义信息源的权重和过滤逻辑。成本可控利用现有的边缘资源家庭服务器、低配VPS、云函数免费额度即可运行避免了为臃肿的中心化服务付费。2.2 技术栈选型轻量、高效、可组合为了实现上述架构技术栈的选择遵循“轻量、高效、可组合”的原则。项目不绑定任何单一重型框架而是提供一组可以像搭积木一样组合的核心组件。核心抓取与处理引擎Python aiohttp/httpxBeautifulSoup4/feedparser为什么是Python生态丰富在数据处理、网络请求和快速原型开发上优势明显。asyncio异步框架非常适合处理大量并发的网络I/O操作比如同时抓取几十个RSS源。feedparser是处理RSS/Atom订阅源的事实标准库能优雅地处理各种“不标准”的订阅源格式容错性强。BeautifulSoup4或lxml对于非标准订阅源即需要从普通网页中提取结构化信息的“伪RSS”源需要用到HTML解析库来编写特定的抽取规则XPath或CSS Selector。任务调度与执行celeryredis或apscheduler对于需要定时、周期性执行抓取任务的需求引入任务队列是更稳健的做法。Celery作为分布式任务队列配合Redis作为消息代理和结果存储可以将抓取任务异步化、持久化。即使节点重启未完成的任务也不会丢失。如果需求简单仅需定时触发apscheduler这样的轻量级定时任务库也是极佳选择它可以直接内嵌在Python进程中无需额外组件。数据存储与去重SQLite/TinyDB或Redis需要存储抓取到的文章元数据标题、链接、发布时间、摘要等以便进行去重和后续的历史查询。SQLite单文件数据库零配置非常适合边缘场景。可以用它来构建一个简单的文章库通过link或title的哈希值进行唯一性判断。Redis如果已经为了Celery引入了Redis那么直接使用Redis的Set或Sorted Set数据结构来实现基于链接的布隆过滤器模拟去重性能更高。交付通道模块化设计Webhook推送将格式化后的新闻内容通过HTTP POST请求发送到预设的Webhook地址。这可以轻松对接Telegram Bot、Slack、Discord、企业微信机器人等。静态站点生成使用Jinja2模板引擎将抓取到的新闻生成为HTML文件通过nginx/Caddy提供服务或直接部署到GitHub Pages/Vercel。这是完全静态、可被搜索引擎索引、且极度节省资源的方式。邮件推送使用smtplib库通过SMTP协议将新闻摘要发送到指定邮箱。适合不常在线但习惯处理邮件的用户。部署与运行环境Docker 轻量级VPS/云函数Docker将所有依赖打包成容器镜像是实现环境一致性、一键部署的关键。编写一个Dockerfile和docker-compose.yml可以让你在任何支持Docker的环境家庭NAS、云服务器中快速拉起服务。云函数Serverless对于抓取任务这种“瞬时计算长期休眠”的场景云函数是绝配。你可以将抓取逻辑部署为阿里云函数计算、腾讯云SCF或Vercel Serverless Function利用其免费的调用额度实现零成本运行。定时触发通过云厂商的定时触发器实现。注意技术栈的选择高度灵活。如果你对Go或Rust更熟悉完全可以用collyGo或reqwestscraperRust来重写抓取引擎性能会更优。本项目的核心在于架构思想而非具体实现语言。3. 核心模块实现与配置详解3.1 信源配置与管理打造你的信息雷达项目的“大脑”就是信源配置文件。这里不推荐使用复杂的数据库来管理一个结构清晰的YAML或JSON配置文件足矣因为它需要被版本控制如Git管理且易于手动编辑。# sources.yaml sources: - name: “科技爱好者周刊阮一峰” url: “https://www.ruanyifeng.com/blog/atom.xml” type: “rss” # rss, atom, html enabled: true category: “tech” fetch_interval: 3600 # 抓取间隔单位秒 filters: - keyword: “开源” action: “include” # include, exclude, highlight - keyword: “广告” action: “exclude” - name: “Hacker News Top 10” url: “https://news.ycombinator.com/rss” type: “rss” enabled: true category: “tech” fetch_interval: 1800 - name: “某技术博客最新文章” url: “https://example-blog.com” type: “html” enabled: true category: “dev” selectors: list_selector: “article.post” # 文章列表容器 title_selector: “h2 a” link_selector: “h2 ahref” pubdate_selector: “timedatetime” fetch_interval: 7200配置解析与经验type: html这是处理没有提供RSS订阅的网站的关键。你需要编写CSS选择器来告诉程序如何从页面中提取文章列表、标题、链接和日期。这需要一些前端知识但一次配置终身受用。可以使用浏览器的开发者工具检查元素来辅助编写选择器。filters过滤规则是信息提纯的核心。除了简单的关键词包含/排除后期可以扩展为正则表达式匹配甚至接入简单的自然语言处理NLP进行情感分析或主题分类过滤掉你不感兴趣的负面新闻或无关话题。fetch_interval务必为每个源设置合理的抓取间隔这是网络公民的基本素养。过于频繁的抓取如低于几分钟会对目标网站造成压力可能导致你的IP被屏蔽。对于新闻源每小时或每半小时抓取一次通常足够。可以在配置中为不同重要性的源设置不同间隔。3.2 抓取引擎的实现异步、容错与礼貌爬虫抓取引擎是项目的“四肢”。我们必须把它设计得健壮、高效且有礼貌。# crawler.py (核心片段) import aiohttp import asyncio import feedparser from bs4 import BeautifulSoup import logging from urllib.parse import urljoin class AsyncCrawler: def __init__(self, config, db_store): self.config config self.db db_store self.session None self.semaphore asyncio.Semaphore(5) # 控制并发数避免对单一目标源造成压力 async def fetch_feed(self, source): “”“抓取并解析RSS/Atom源”“” async with self.semaphore: try: # 1. 异步获取内容 async with self.session.get(source[‘url’], timeout10, headers{‘User-Agent’: ‘NewsAtEdgeBot/1.0’}) as resp: if resp.status ! 200: logging.warning(f“Failed to fetch {source[‘name’]}: HTTP {resp.status}”) return [] text await resp.text() # 2. 解析Feed feed feedparser.parse(text) if feed.bozo: # feedparser解析出错 logging.warning(f“Feed parse error for {source[‘name’]}: {feed.bozo_exception}”) # 可以尝试降级为HTML解析 return await self.fallback_html_parse(text, source) # 3. 提取条目并格式化 articles [] for entry in feed.entries: article { ‘source’: source[‘name’], ‘title’: entry.get(‘title’, ‘’), ‘link’: entry.get(‘link’, ‘’), ‘published’: self._parse_date(entry.get(‘published’, entry.get(‘updated’))), ‘summary’: entry.get(‘summary’, entry.get(‘description’, ‘’))[:500], # 截断 ‘id’: self._generate_id(entry.get(‘id’, entry.get(‘link’))) } # 4. 应用过滤规则 if self._apply_filters(article, source.get(‘filters’, [])): articles.append(article) return articles except asyncio.TimeoutError: logging.error(f“Timeout fetching {source[‘name’]}”) return [] except Exception as e: logging.error(f“Error fetching {source[‘name’]}: {e}”) return [] async def fetch_html(self, source): “”“抓取并解析HTML页面用于无RSS的网站”“” # 实现逻辑类似使用BeautifulSoup根据配置的selectors进行解析 # ... pass async def run(self, sources_list): “”“主运行循环并发抓取所有启用的源”“” connector aiohttp.TCPConnector(limit20, sslFalse) # 调整连接池限制 async with aiohttp.ClientSession(connectorconnector) as self.session: tasks [] enabled_sources [s for s in sources_list if s.get(‘enabled’, True)] for source in enabled_sources: if source[‘type’] ‘rss’ or source[‘type’] ‘atom’: task self.fetch_feed(source) else: task self.fetch_html(source) tasks.append(task) # 并发执行所有抓取任务 results await asyncio.gather(*tasks, return_exceptionsTrue) all_articles [] for res in results: if isinstance(res, Exception): logging.error(f“Task failed: {res}”) continue all_articles.extend(res) return all_articles关键实现细节与避坑指南并发控制Semaphore这是“礼貌爬虫”的基石。不加限制的并发请求会瞬间冲垮目标服务器。设置一个信号量如并发数5确保同时只有有限数量的请求在发出。超时与重试网络是不稳定的。必须为每个请求设置超时如10秒并实现简单的重试逻辑例如最多重试2次每次间隔递增。aiohttp可以配置超时重试逻辑需要自己用try-except包裹并计数。User-Agent标识在请求头中设置一个清晰的User-Agent如NewsAtEdgeBot/1.0 (https://my-edge-news.com), 这是一个良好的网络行为规范让网站管理员知道你是谁、你想做什么。避免使用浏览器或空UA。错误处理与降级feedparser的bozo标志位非常有用它能捕获Feed格式错误。当RSS解析失败时可以尝试降级到使用预定义的HTML选择器去解析页面作为备选方案。日期解析Feed中的日期格式千奇百怪RFC 2822, RFC 3339, 各种自定义格式。feedparser能处理大部分但自己解析HTML日期时建议使用dateutil库的parser.parse它非常强大且容错。3.3 数据去重与持久化避免信息洪流的重复冲击抓取到的文章必须经过去重才能进入下一步。去重的核心是生成一个稳定的文章唯一标识符。# storage.py import sqlite3 import hashlib from datetime import datetime class ArticleStorage: def __init__(self, db_path‘news.db’): self.conn sqlite3.connect(db_path, check_same_threadFalse) self._init_db() def _init_db(self): cursor self.conn.cursor() cursor.execute(“”“ CREATE TABLE IF NOT EXISTS articles ( id TEXT PRIMARY KEY, source TEXT, title TEXT, link TEXT UNIQUE, published TIMESTAMP, summary TEXT, fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, delivered BOOLEAN DEFAULT 0 ) “”“) cursor.execute(“CREATE INDEX IF NOT EXISTS idx_link ON articles(link)”) cursor.execute(“CREATE INDEX IF NOT EXISTS idx_delivered ON articles(delivered)”) self.conn.commit() def _generate_article_id(self, article): “”“生成文章唯一ID使用链接的MD5哈希因为链接是全局唯一的”“” link article[‘link’].encode(‘utf-8’) return hashlib.md5(link).hexdigest() def save_if_new(self, articles): “”“保存文章返回新文章列表即需要推送的”“” new_articles [] cursor self.conn.cursor() for article in articles: article_id self._generate_article_id(article) # 尝试插入如果违反唯一约束link重复或id重复则忽略 try: cursor.execute(“”“ INSERT INTO articles (id, source, title, link, published, summary) VALUES (?, ?, ?, ?, ?, ?) “”“, ( article_id, article[‘source’], article[‘title’], article[‘link’], article[‘published’], article[‘summary’] )) self.conn.commit() new_articles.append(article) # 插入成功是新文章 except sqlite3.IntegrityError: # 文章已存在检查是否需要更新例如摘要有更新 # 这里简单跳过你也可以选择更新某些字段 pass return new_articles def mark_as_delivered(self, article_ids): “”“将文章标记为已推送”“” # ... 更新 delivered 字段为 1 pass去重策略的深度思考为什么用link的MD5作id链接是文章在网络上的唯一标识最稳定。使用MD5哈希后作为主键比直接使用长字符串的link字段做索引和比较更高效。除了链接去重还需要内容去重吗对于高度同质化的新闻如多家媒体转载同一事件仅靠链接去重不够。可以引入SimHash或MinHash等局部敏感哈希算法计算文章正文的指纹。当两篇文章的指纹相似度超过某个阈值如95%即使链接不同也视为重复。这需要抓取正文内容计算量稍大但能极大提升信息纯度。SQLite的并发写入在高并发抓取场景下SQLite的写操作可能会成为瓶颈因为它默认是文件锁。如果遇到Database is locked错误可以考虑1) 使用WAL模式 (journal_modeWAL)2) 将写操作放入一个队列由单个线程顺序执行3) 对于更高负载换用Redis或PostgreSQL。3.4 交付通道集成让信息精准触达这是项目的“最后一公里”决定了信息如何呈现给你。我们以实现一个Telegram Bot推送为例。# notifier/telegram_notifier.py import aiohttp import logging class TelegramNotifier: def __init__(self, bot_token, chat_id): self.bot_token bot_token self.chat_id chat_id self.api_url f“https://api.telegram.org/bot{bot_token}” async def send_article(self, article): “”“发送单篇文章到Telegram”“” # 格式化消息Telegram支持Markdown/HTML格式 message f“*{article[‘title’]}*\n\n” message f“{article[‘summary’]}\n\n” message f“来源{article[‘source’]}\n” message f“[阅读原文]({article[‘link’]})” payload { ‘chat_id’: self.chat_id, ‘text’: message, ‘parse_mode’: ‘MarkdownV2’, ‘disable_web_page_preview’: False } async with aiohttp.ClientSession() as session: try: async with session.post(f“{self.api_url}/sendMessage”, jsonpayload, timeout10) as resp: if resp.status 200: logging.info(f“Sent to Telegram: {article[‘title’][:50]}...”) return True else: error_text await resp.text() logging.error(f“Telegram API error: {resp.status}, {error_text}”) # 处理特定错误如消息过长可以截断摘要重试 if “message is too long” in error_text: return await self._send_truncated(article, session) return False except Exception as e: logging.error(f“Failed to send to Telegram: {e}”) return False async def send_digest(self, articles, digest_title“今日新闻速递”): “”“发送多篇文章的摘要合并为一条消息或多条”“” if not articles: return # 方案A合并为一条长消息可能超限 # 方案B推荐分批发送每条消息包含2-3篇文章摘要 batch_size 2 for i in range(0, len(articles), batch_size): batch articles[i:ibatch_size] digest_text f“*{digest_title}*\n\n” for idx, article in enumerate(batch): digest_text f“{idx1}. [{article[‘title’]}]({article[‘link’]}) - {article[‘source’]}\n” # 发送digest_text...推送策略优化速率限制无论是Telegram API还是其他Webhook服务都有调用频率限制。在发送大量文章时必须在代码中加入延迟如asyncio.sleep(0.2)避免触发限流。失败重试与队列推送失败是常态网络波动、服务暂时不可用。应该将推送任务放入一个持久化队列如使用Celery并设置重试机制和死信队列确保重要信息不丢失。格式化与可读性不同的推送渠道支持不同的格式Markdown、HTML、纯文本。需要为每个渠道编写对应的消息格式化器确保链接、加粗、换行等效果正确显示。聚合推送与即时推送对于高频更新的源可以考虑“聚合推送”模式即每隔一段时间如1小时将期间所有新文章打包成一份“简报”发送避免消息轰炸。对于突发新闻或高优先级源可以设置为“即时推送”。4. 部署、运维与高级玩法4.1 容器化部署与持续运行为了让服务稳定运行容器化是最佳实践。下面是一个简单的docker-compose.yml示例整合了抓取服务、Redis用于Celery和数据库。# docker-compose.yml version: ‘3.8’ services: redis: image: redis:7-alpine container_name: news-edge-redis restart: unless-stopped volumes: - redis_data:/data command: redis-server --appendonly yes crawler: build: . container_name: news-edge-crawler restart: unless-stopped depends_on: - redis volumes: - ./config:/app/config:ro - ./data:/app/data environment: - REDIS_URLredis://redis:6379/0 - DB_PATH/app/data/news.db # 使用Celery Beat作为定时调度器Celery Worker执行任务 command: sh -c “ celery -A tasks.celery_app beat --loglevelinfo celery -A tasks.celery_app worker --loglevelinfo ” # 可选一个极简的Web界面查看抓取历史和配置 web-ui: image: nginx:alpine container_name: news-edge-ui restart: unless-stopped ports: - “8080:80” volumes: - ./web-ui:/usr/share/nginx/html:ro # 假设静态文件在此目录部署要点配置外部化将sources.yaml等配置文件通过volumes挂载到容器内这样修改配置无需重建镜像。数据持久化将SQLite数据库文件、日志目录等通过volumes挂载到宿主机防止容器重启后数据丢失。健康检查可以为crawler服务添加healthcheck指令监控Celery Worker是否存活。日志管理将容器日志导出到宿主机文件或日志收集系统如Loki/Elasticsearch方便排查问题。4.2 监控、日志与故障排查一个健壮的系统离不开可观测性。日志分级使用Python的logging模块为不同组件设置不同的日志级别INFO, WARNING, ERROR。将抓取成功、失败、去重情况、推送结果都记录下来。关键指标监控抓取成功率成功抓取的源数量 / 总源数量。低于阈值如80%报警。新文章发现率周期内发现的新文章数量。持续为0可能意味着源失效或去重逻辑过严。推送成功率消息成功送达率。系统资源CPU、内存、磁盘使用率特别是SQLite数据库文件大小。简易监控实现可以编写一个简单的健康检查端点或者定期将上述指标输出到日志然后使用PrometheusGrafana进行采集和展示或者更简单地写一个脚本定期检查并发送状态报告到你的Telegram。常见故障排查清单抓取全部失败检查网络容器或服务器是否能访问外网ping 8.8.8.8。检查DNSnslookup检查目标域名解析。检查目标源手动用curl或浏览器访问源地址看是否正常。检查User-Agent和频率是否被目标网站屏蔽尝试调整UA和抓取间隔。数据库锁死或写入慢检查SQLite连接确保没有多处代码同时写数据库。使用连接池或序列化写操作。检查磁盘IO如果数据库在机械硬盘上性能可能很差。考虑移到SSD或使用SQLite的PRAGMA synchronous NORMAL;和PRAGMA journal_mode WAL;进行优化。数据库膨胀定期清理已推送的旧文章如保留30天执行VACUUM;命令回收空间。推送渠道失败Token/Chat ID错误检查配置。API限流查看错误信息加入更长的请求间隔。消息格式错误特别是Markdown中的特殊字符_,*,[,],(等需要转义。使用Telegram的parse_mode: ‘MarkdownV2’时尤其要注意。4.3 扩展与高级功能设想基础功能稳定后你可以考虑以下扩展打造更强大的个人信息中枢智能过滤与推荐集成NLP库如jieba中文分词scikit-learn或transformers库。可以对文章摘要进行关键词提取、主题分类如“科技”、“财经”、“体育”或情感分析实现更精准的过滤。协同过滤如果你为多个用户服务家庭使用可以记录用户的点击/阅读行为实现简单的“看了这个的人也看了”的推荐。多模态内容支持音频/视频摘要对于播客或视频新闻源可以尝试调用语音转文本API如OpenAI Whisper的本地部署生成文字摘要再纳入你的新闻流。图片OCR抓取包含重要信息的图片新闻使用Tesseract等OCR库提取文字。信息聚合与摘要自动摘要利用文本摘要算法如TextRank或大语言模型本地部署的ChatGLM、Qwen等将长文章自动生成简短摘要方便快速浏览。每日/每周简报在固定时间如早上8点将过去24小时或一周内最重要的新闻可根据点击率、来源权重、关键词热度综合排序生成一份精美的HTML或PDF简报通过邮件发送。边缘存储与缓存全文存档不仅存储元数据也将文章正文抓取下来存储到本地数据库或对象存储如MinIO作为个人的“互联网档案馆”即使原链接失效也能查看。CDN加速将生成的静态新闻页面推送到Cloudflare Pages、Vercel或Netlify等全球CDN实现全球快速访问。这个项目的魅力在于它从一个简单的想法开始可以随着你的需求和技术成长不断迭代和扩展最终成为完全属于你个人的、智能的、高效的信息获取与处理基础设施。它不仅仅是一个工具更是一种对待信息和数字生活的态度。
基于边缘计算的新闻聚合系统:从架构设计到工程实践
发布时间:2026/6/1 13:10:49
1. 项目概述边缘新闻聚合的诞生与价值最近在折腾一个挺有意思的小项目我把它叫做“News — At The Edge”。这个名字听起来有点技术范儿但它的核心想法其实很朴素如何让新闻资讯的获取像打开水龙头一样即时、稳定同时又足够轻量、私密我们每天都被海量的信息流淹没但真正有价值、低延迟、不夹带私货的新闻推送却越来越难找。传统的新闻App要么臃肿不堪广告和追踪器比新闻还多要么就是中心化的服务器一旦出问题推送就全断了。这个项目就是尝试用“边缘计算”的思路来重构我们获取新闻的管道。简单来说“News — At The Edge”是一个运行在“边缘”的新闻聚合与分发系统。这里的“边缘”指的是离用户更近的计算节点比如你的家庭服务器、一个轻量的云函数甚至是一个树莓派。它不再依赖某个中心化的新闻门户后台而是由你自己部署的“边缘节点”主动去抓取、过滤、格式化你关心的信源RSS、Atom、甚至是一些设计良好的API然后通过一个极其简洁的界面或推送通道比如Telegram Bot、邮件、甚至是生成一个静态网页呈现给你。它的核心价值在于控制权回归用户。你决定信源、你决定过滤规则、你决定推送频率数据流经的服务器是你可控的整个过程透明、可审计。这个项目特别适合几类朋友一是注重数字隐私厌倦了商业App数据收集的技术爱好者二是需要实时监控特定领域资讯如科技动态、安全漏洞、行业报告的从业者对信息的及时性和准确性有苛刻要求三是单纯喜欢折腾希望拥有一个完全个性化、不被打扰的信息摄入工具的极客。它不是一个开箱即用的商业产品而更像是一套乐高积木提供了核心的引擎和搭建思路你需要根据自己的需求动手拼装出最适合自己的那一款“新闻边缘站”。2. 核心架构设计为何选择边缘计算范式2.1 从中心化到边缘化的范式迁移传统的新闻应用架构是典型的三层中心化模型手机App客户端 - 厂商云服务器中心节点 - 各大新闻机构/爬虫数据源源站。这个模型的弊端非常明显延迟、单点故障、隐私风险与信息茧房。延迟来源于请求需要跨越多个网络层级到达中心服务器一旦中心服务器宕机或网络拥堵所有用户服务中断你的阅读习惯、点击行为被中心服务器全量收集用于构建用户画像和精准广告同时中心化算法决定了你“应该”看到什么容易形成信息茧房。“News — At The Edge”项目彻底摒弃了这个模型采用了边缘计算范式。其核心架构可以概括为“边缘执行器 用户配置 轻量交付”。边缘执行器是你部署的程序它根据你的配置信源列表、抓取周期、过滤规则主动从互联网上的原始信源拉取数据。这个过程是分布式的你的节点只为你或你授权的小群体服务。数据拉取后在边缘节点立即进行清洗、格式化、去重然后通过你设定的轻量级通道如Webhook、消息队列、生成静态文件推送出去。整个数据流不经过任何第三方中转服务器。选择这个架构主要基于以下几点考量抗脆弱性没有中心节点意味着没有单点故障。你的新闻服务只依赖于你的边缘节点和原始信源的可用性稳定性极大提升。低延迟抓取和处理的逻辑就在边缘节点完成推送通道也是直接面向你的终端省去了中心服务器调度和转发的环节理论上可以获得更快的更新速度。隐私与自主所有配置、抓取历史、阅读数据都保存在你控制的节点上没有数据出域的风险。你可以完全自主地定义信息源的权重和过滤逻辑。成本可控利用现有的边缘资源家庭服务器、低配VPS、云函数免费额度即可运行避免了为臃肿的中心化服务付费。2.2 技术栈选型轻量、高效、可组合为了实现上述架构技术栈的选择遵循“轻量、高效、可组合”的原则。项目不绑定任何单一重型框架而是提供一组可以像搭积木一样组合的核心组件。核心抓取与处理引擎Python aiohttp/httpxBeautifulSoup4/feedparser为什么是Python生态丰富在数据处理、网络请求和快速原型开发上优势明显。asyncio异步框架非常适合处理大量并发的网络I/O操作比如同时抓取几十个RSS源。feedparser是处理RSS/Atom订阅源的事实标准库能优雅地处理各种“不标准”的订阅源格式容错性强。BeautifulSoup4或lxml对于非标准订阅源即需要从普通网页中提取结构化信息的“伪RSS”源需要用到HTML解析库来编写特定的抽取规则XPath或CSS Selector。任务调度与执行celeryredis或apscheduler对于需要定时、周期性执行抓取任务的需求引入任务队列是更稳健的做法。Celery作为分布式任务队列配合Redis作为消息代理和结果存储可以将抓取任务异步化、持久化。即使节点重启未完成的任务也不会丢失。如果需求简单仅需定时触发apscheduler这样的轻量级定时任务库也是极佳选择它可以直接内嵌在Python进程中无需额外组件。数据存储与去重SQLite/TinyDB或Redis需要存储抓取到的文章元数据标题、链接、发布时间、摘要等以便进行去重和后续的历史查询。SQLite单文件数据库零配置非常适合边缘场景。可以用它来构建一个简单的文章库通过link或title的哈希值进行唯一性判断。Redis如果已经为了Celery引入了Redis那么直接使用Redis的Set或Sorted Set数据结构来实现基于链接的布隆过滤器模拟去重性能更高。交付通道模块化设计Webhook推送将格式化后的新闻内容通过HTTP POST请求发送到预设的Webhook地址。这可以轻松对接Telegram Bot、Slack、Discord、企业微信机器人等。静态站点生成使用Jinja2模板引擎将抓取到的新闻生成为HTML文件通过nginx/Caddy提供服务或直接部署到GitHub Pages/Vercel。这是完全静态、可被搜索引擎索引、且极度节省资源的方式。邮件推送使用smtplib库通过SMTP协议将新闻摘要发送到指定邮箱。适合不常在线但习惯处理邮件的用户。部署与运行环境Docker 轻量级VPS/云函数Docker将所有依赖打包成容器镜像是实现环境一致性、一键部署的关键。编写一个Dockerfile和docker-compose.yml可以让你在任何支持Docker的环境家庭NAS、云服务器中快速拉起服务。云函数Serverless对于抓取任务这种“瞬时计算长期休眠”的场景云函数是绝配。你可以将抓取逻辑部署为阿里云函数计算、腾讯云SCF或Vercel Serverless Function利用其免费的调用额度实现零成本运行。定时触发通过云厂商的定时触发器实现。注意技术栈的选择高度灵活。如果你对Go或Rust更熟悉完全可以用collyGo或reqwestscraperRust来重写抓取引擎性能会更优。本项目的核心在于架构思想而非具体实现语言。3. 核心模块实现与配置详解3.1 信源配置与管理打造你的信息雷达项目的“大脑”就是信源配置文件。这里不推荐使用复杂的数据库来管理一个结构清晰的YAML或JSON配置文件足矣因为它需要被版本控制如Git管理且易于手动编辑。# sources.yaml sources: - name: “科技爱好者周刊阮一峰” url: “https://www.ruanyifeng.com/blog/atom.xml” type: “rss” # rss, atom, html enabled: true category: “tech” fetch_interval: 3600 # 抓取间隔单位秒 filters: - keyword: “开源” action: “include” # include, exclude, highlight - keyword: “广告” action: “exclude” - name: “Hacker News Top 10” url: “https://news.ycombinator.com/rss” type: “rss” enabled: true category: “tech” fetch_interval: 1800 - name: “某技术博客最新文章” url: “https://example-blog.com” type: “html” enabled: true category: “dev” selectors: list_selector: “article.post” # 文章列表容器 title_selector: “h2 a” link_selector: “h2 ahref” pubdate_selector: “timedatetime” fetch_interval: 7200配置解析与经验type: html这是处理没有提供RSS订阅的网站的关键。你需要编写CSS选择器来告诉程序如何从页面中提取文章列表、标题、链接和日期。这需要一些前端知识但一次配置终身受用。可以使用浏览器的开发者工具检查元素来辅助编写选择器。filters过滤规则是信息提纯的核心。除了简单的关键词包含/排除后期可以扩展为正则表达式匹配甚至接入简单的自然语言处理NLP进行情感分析或主题分类过滤掉你不感兴趣的负面新闻或无关话题。fetch_interval务必为每个源设置合理的抓取间隔这是网络公民的基本素养。过于频繁的抓取如低于几分钟会对目标网站造成压力可能导致你的IP被屏蔽。对于新闻源每小时或每半小时抓取一次通常足够。可以在配置中为不同重要性的源设置不同间隔。3.2 抓取引擎的实现异步、容错与礼貌爬虫抓取引擎是项目的“四肢”。我们必须把它设计得健壮、高效且有礼貌。# crawler.py (核心片段) import aiohttp import asyncio import feedparser from bs4 import BeautifulSoup import logging from urllib.parse import urljoin class AsyncCrawler: def __init__(self, config, db_store): self.config config self.db db_store self.session None self.semaphore asyncio.Semaphore(5) # 控制并发数避免对单一目标源造成压力 async def fetch_feed(self, source): “”“抓取并解析RSS/Atom源”“” async with self.semaphore: try: # 1. 异步获取内容 async with self.session.get(source[‘url’], timeout10, headers{‘User-Agent’: ‘NewsAtEdgeBot/1.0’}) as resp: if resp.status ! 200: logging.warning(f“Failed to fetch {source[‘name’]}: HTTP {resp.status}”) return [] text await resp.text() # 2. 解析Feed feed feedparser.parse(text) if feed.bozo: # feedparser解析出错 logging.warning(f“Feed parse error for {source[‘name’]}: {feed.bozo_exception}”) # 可以尝试降级为HTML解析 return await self.fallback_html_parse(text, source) # 3. 提取条目并格式化 articles [] for entry in feed.entries: article { ‘source’: source[‘name’], ‘title’: entry.get(‘title’, ‘’), ‘link’: entry.get(‘link’, ‘’), ‘published’: self._parse_date(entry.get(‘published’, entry.get(‘updated’))), ‘summary’: entry.get(‘summary’, entry.get(‘description’, ‘’))[:500], # 截断 ‘id’: self._generate_id(entry.get(‘id’, entry.get(‘link’))) } # 4. 应用过滤规则 if self._apply_filters(article, source.get(‘filters’, [])): articles.append(article) return articles except asyncio.TimeoutError: logging.error(f“Timeout fetching {source[‘name’]}”) return [] except Exception as e: logging.error(f“Error fetching {source[‘name’]}: {e}”) return [] async def fetch_html(self, source): “”“抓取并解析HTML页面用于无RSS的网站”“” # 实现逻辑类似使用BeautifulSoup根据配置的selectors进行解析 # ... pass async def run(self, sources_list): “”“主运行循环并发抓取所有启用的源”“” connector aiohttp.TCPConnector(limit20, sslFalse) # 调整连接池限制 async with aiohttp.ClientSession(connectorconnector) as self.session: tasks [] enabled_sources [s for s in sources_list if s.get(‘enabled’, True)] for source in enabled_sources: if source[‘type’] ‘rss’ or source[‘type’] ‘atom’: task self.fetch_feed(source) else: task self.fetch_html(source) tasks.append(task) # 并发执行所有抓取任务 results await asyncio.gather(*tasks, return_exceptionsTrue) all_articles [] for res in results: if isinstance(res, Exception): logging.error(f“Task failed: {res}”) continue all_articles.extend(res) return all_articles关键实现细节与避坑指南并发控制Semaphore这是“礼貌爬虫”的基石。不加限制的并发请求会瞬间冲垮目标服务器。设置一个信号量如并发数5确保同时只有有限数量的请求在发出。超时与重试网络是不稳定的。必须为每个请求设置超时如10秒并实现简单的重试逻辑例如最多重试2次每次间隔递增。aiohttp可以配置超时重试逻辑需要自己用try-except包裹并计数。User-Agent标识在请求头中设置一个清晰的User-Agent如NewsAtEdgeBot/1.0 (https://my-edge-news.com), 这是一个良好的网络行为规范让网站管理员知道你是谁、你想做什么。避免使用浏览器或空UA。错误处理与降级feedparser的bozo标志位非常有用它能捕获Feed格式错误。当RSS解析失败时可以尝试降级到使用预定义的HTML选择器去解析页面作为备选方案。日期解析Feed中的日期格式千奇百怪RFC 2822, RFC 3339, 各种自定义格式。feedparser能处理大部分但自己解析HTML日期时建议使用dateutil库的parser.parse它非常强大且容错。3.3 数据去重与持久化避免信息洪流的重复冲击抓取到的文章必须经过去重才能进入下一步。去重的核心是生成一个稳定的文章唯一标识符。# storage.py import sqlite3 import hashlib from datetime import datetime class ArticleStorage: def __init__(self, db_path‘news.db’): self.conn sqlite3.connect(db_path, check_same_threadFalse) self._init_db() def _init_db(self): cursor self.conn.cursor() cursor.execute(“”“ CREATE TABLE IF NOT EXISTS articles ( id TEXT PRIMARY KEY, source TEXT, title TEXT, link TEXT UNIQUE, published TIMESTAMP, summary TEXT, fetched_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, delivered BOOLEAN DEFAULT 0 ) “”“) cursor.execute(“CREATE INDEX IF NOT EXISTS idx_link ON articles(link)”) cursor.execute(“CREATE INDEX IF NOT EXISTS idx_delivered ON articles(delivered)”) self.conn.commit() def _generate_article_id(self, article): “”“生成文章唯一ID使用链接的MD5哈希因为链接是全局唯一的”“” link article[‘link’].encode(‘utf-8’) return hashlib.md5(link).hexdigest() def save_if_new(self, articles): “”“保存文章返回新文章列表即需要推送的”“” new_articles [] cursor self.conn.cursor() for article in articles: article_id self._generate_article_id(article) # 尝试插入如果违反唯一约束link重复或id重复则忽略 try: cursor.execute(“”“ INSERT INTO articles (id, source, title, link, published, summary) VALUES (?, ?, ?, ?, ?, ?) “”“, ( article_id, article[‘source’], article[‘title’], article[‘link’], article[‘published’], article[‘summary’] )) self.conn.commit() new_articles.append(article) # 插入成功是新文章 except sqlite3.IntegrityError: # 文章已存在检查是否需要更新例如摘要有更新 # 这里简单跳过你也可以选择更新某些字段 pass return new_articles def mark_as_delivered(self, article_ids): “”“将文章标记为已推送”“” # ... 更新 delivered 字段为 1 pass去重策略的深度思考为什么用link的MD5作id链接是文章在网络上的唯一标识最稳定。使用MD5哈希后作为主键比直接使用长字符串的link字段做索引和比较更高效。除了链接去重还需要内容去重吗对于高度同质化的新闻如多家媒体转载同一事件仅靠链接去重不够。可以引入SimHash或MinHash等局部敏感哈希算法计算文章正文的指纹。当两篇文章的指纹相似度超过某个阈值如95%即使链接不同也视为重复。这需要抓取正文内容计算量稍大但能极大提升信息纯度。SQLite的并发写入在高并发抓取场景下SQLite的写操作可能会成为瓶颈因为它默认是文件锁。如果遇到Database is locked错误可以考虑1) 使用WAL模式 (journal_modeWAL)2) 将写操作放入一个队列由单个线程顺序执行3) 对于更高负载换用Redis或PostgreSQL。3.4 交付通道集成让信息精准触达这是项目的“最后一公里”决定了信息如何呈现给你。我们以实现一个Telegram Bot推送为例。# notifier/telegram_notifier.py import aiohttp import logging class TelegramNotifier: def __init__(self, bot_token, chat_id): self.bot_token bot_token self.chat_id chat_id self.api_url f“https://api.telegram.org/bot{bot_token}” async def send_article(self, article): “”“发送单篇文章到Telegram”“” # 格式化消息Telegram支持Markdown/HTML格式 message f“*{article[‘title’]}*\n\n” message f“{article[‘summary’]}\n\n” message f“来源{article[‘source’]}\n” message f“[阅读原文]({article[‘link’]})” payload { ‘chat_id’: self.chat_id, ‘text’: message, ‘parse_mode’: ‘MarkdownV2’, ‘disable_web_page_preview’: False } async with aiohttp.ClientSession() as session: try: async with session.post(f“{self.api_url}/sendMessage”, jsonpayload, timeout10) as resp: if resp.status 200: logging.info(f“Sent to Telegram: {article[‘title’][:50]}...”) return True else: error_text await resp.text() logging.error(f“Telegram API error: {resp.status}, {error_text}”) # 处理特定错误如消息过长可以截断摘要重试 if “message is too long” in error_text: return await self._send_truncated(article, session) return False except Exception as e: logging.error(f“Failed to send to Telegram: {e}”) return False async def send_digest(self, articles, digest_title“今日新闻速递”): “”“发送多篇文章的摘要合并为一条消息或多条”“” if not articles: return # 方案A合并为一条长消息可能超限 # 方案B推荐分批发送每条消息包含2-3篇文章摘要 batch_size 2 for i in range(0, len(articles), batch_size): batch articles[i:ibatch_size] digest_text f“*{digest_title}*\n\n” for idx, article in enumerate(batch): digest_text f“{idx1}. [{article[‘title’]}]({article[‘link’]}) - {article[‘source’]}\n” # 发送digest_text...推送策略优化速率限制无论是Telegram API还是其他Webhook服务都有调用频率限制。在发送大量文章时必须在代码中加入延迟如asyncio.sleep(0.2)避免触发限流。失败重试与队列推送失败是常态网络波动、服务暂时不可用。应该将推送任务放入一个持久化队列如使用Celery并设置重试机制和死信队列确保重要信息不丢失。格式化与可读性不同的推送渠道支持不同的格式Markdown、HTML、纯文本。需要为每个渠道编写对应的消息格式化器确保链接、加粗、换行等效果正确显示。聚合推送与即时推送对于高频更新的源可以考虑“聚合推送”模式即每隔一段时间如1小时将期间所有新文章打包成一份“简报”发送避免消息轰炸。对于突发新闻或高优先级源可以设置为“即时推送”。4. 部署、运维与高级玩法4.1 容器化部署与持续运行为了让服务稳定运行容器化是最佳实践。下面是一个简单的docker-compose.yml示例整合了抓取服务、Redis用于Celery和数据库。# docker-compose.yml version: ‘3.8’ services: redis: image: redis:7-alpine container_name: news-edge-redis restart: unless-stopped volumes: - redis_data:/data command: redis-server --appendonly yes crawler: build: . container_name: news-edge-crawler restart: unless-stopped depends_on: - redis volumes: - ./config:/app/config:ro - ./data:/app/data environment: - REDIS_URLredis://redis:6379/0 - DB_PATH/app/data/news.db # 使用Celery Beat作为定时调度器Celery Worker执行任务 command: sh -c “ celery -A tasks.celery_app beat --loglevelinfo celery -A tasks.celery_app worker --loglevelinfo ” # 可选一个极简的Web界面查看抓取历史和配置 web-ui: image: nginx:alpine container_name: news-edge-ui restart: unless-stopped ports: - “8080:80” volumes: - ./web-ui:/usr/share/nginx/html:ro # 假设静态文件在此目录部署要点配置外部化将sources.yaml等配置文件通过volumes挂载到容器内这样修改配置无需重建镜像。数据持久化将SQLite数据库文件、日志目录等通过volumes挂载到宿主机防止容器重启后数据丢失。健康检查可以为crawler服务添加healthcheck指令监控Celery Worker是否存活。日志管理将容器日志导出到宿主机文件或日志收集系统如Loki/Elasticsearch方便排查问题。4.2 监控、日志与故障排查一个健壮的系统离不开可观测性。日志分级使用Python的logging模块为不同组件设置不同的日志级别INFO, WARNING, ERROR。将抓取成功、失败、去重情况、推送结果都记录下来。关键指标监控抓取成功率成功抓取的源数量 / 总源数量。低于阈值如80%报警。新文章发现率周期内发现的新文章数量。持续为0可能意味着源失效或去重逻辑过严。推送成功率消息成功送达率。系统资源CPU、内存、磁盘使用率特别是SQLite数据库文件大小。简易监控实现可以编写一个简单的健康检查端点或者定期将上述指标输出到日志然后使用PrometheusGrafana进行采集和展示或者更简单地写一个脚本定期检查并发送状态报告到你的Telegram。常见故障排查清单抓取全部失败检查网络容器或服务器是否能访问外网ping 8.8.8.8。检查DNSnslookup检查目标域名解析。检查目标源手动用curl或浏览器访问源地址看是否正常。检查User-Agent和频率是否被目标网站屏蔽尝试调整UA和抓取间隔。数据库锁死或写入慢检查SQLite连接确保没有多处代码同时写数据库。使用连接池或序列化写操作。检查磁盘IO如果数据库在机械硬盘上性能可能很差。考虑移到SSD或使用SQLite的PRAGMA synchronous NORMAL;和PRAGMA journal_mode WAL;进行优化。数据库膨胀定期清理已推送的旧文章如保留30天执行VACUUM;命令回收空间。推送渠道失败Token/Chat ID错误检查配置。API限流查看错误信息加入更长的请求间隔。消息格式错误特别是Markdown中的特殊字符_,*,[,],(等需要转义。使用Telegram的parse_mode: ‘MarkdownV2’时尤其要注意。4.3 扩展与高级功能设想基础功能稳定后你可以考虑以下扩展打造更强大的个人信息中枢智能过滤与推荐集成NLP库如jieba中文分词scikit-learn或transformers库。可以对文章摘要进行关键词提取、主题分类如“科技”、“财经”、“体育”或情感分析实现更精准的过滤。协同过滤如果你为多个用户服务家庭使用可以记录用户的点击/阅读行为实现简单的“看了这个的人也看了”的推荐。多模态内容支持音频/视频摘要对于播客或视频新闻源可以尝试调用语音转文本API如OpenAI Whisper的本地部署生成文字摘要再纳入你的新闻流。图片OCR抓取包含重要信息的图片新闻使用Tesseract等OCR库提取文字。信息聚合与摘要自动摘要利用文本摘要算法如TextRank或大语言模型本地部署的ChatGLM、Qwen等将长文章自动生成简短摘要方便快速浏览。每日/每周简报在固定时间如早上8点将过去24小时或一周内最重要的新闻可根据点击率、来源权重、关键词热度综合排序生成一份精美的HTML或PDF简报通过邮件发送。边缘存储与缓存全文存档不仅存储元数据也将文章正文抓取下来存储到本地数据库或对象存储如MinIO作为个人的“互联网档案馆”即使原链接失效也能查看。CDN加速将生成的静态新闻页面推送到Cloudflare Pages、Vercel或Netlify等全球CDN实现全球快速访问。这个项目的魅力在于它从一个简单的想法开始可以随着你的需求和技术成长不断迭代和扩展最终成为完全属于你个人的、智能的、高效的信息获取与处理基础设施。它不仅仅是一个工具更是一种对待信息和数字生活的态度。