# Changelog: add fast startup using the first N feeds; add a background
# thread for the full fetch and caching; update docs and skills.
"""
|
|
RSS feed fetching, Project Gutenberg parsing, and headline caching.
|
|
Depends on: config, sources, filter, terminal.
|
|
"""
|
|
|
|
import json
|
|
import pathlib
|
|
import re
|
|
import urllib.request
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
from datetime import datetime
|
|
from typing import Any
|
|
|
|
import feedparser
|
|
|
|
from engine import config
|
|
from engine.filter import skip, strip_tags
|
|
from engine.sources import FEEDS, POETRY_SOURCES
|
|
from engine.terminal import boot_ln
|
|
|
|
# Headline record: (title text, source label, "HH:MM" timestamp or placeholder).
HeadlineTuple = tuple[str, str, str]

# Thread-pool size for the full feed / poetry fetch passes.
DEFAULT_MAX_WORKERS = 10
# Number of feeds fetched during the fast-startup pass.
FAST_START_SOURCES = 5
# Per-request timeout (seconds) applied to fast-start feeds only.
FAST_START_TIMEOUT = 3
|
|
|
|
|
|
def fetch_feed(url: str) -> tuple[str, Any | None]:
    """Fetch and parse a single RSS feed URL.

    Returns ``(url, feed)`` on success and ``(url, None)`` on any failure
    (network error, timeout, bad response).  Never raises: callers treat a
    ``None`` feed as a dead source.

    Note: the annotation previously claimed ``tuple[None, None]`` for the
    failure case, but the code has always returned ``(url, None)``.
    """
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
        # Fast-start sources get a short timeout so startup isn't blocked by
        # one slow feed; everything else uses the configured timeout.
        timeout = FAST_START_TIMEOUT if url in _fast_start_urls else config.FEED_TIMEOUT
        # `with` closes the HTTP response; the original leaked the socket.
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            return (url, feedparser.parse(resp.read()))
    except Exception:
        # Best-effort: the caller reports a dead feed, nothing is raised.
        return (url, None)
|
|
|
|
|
|
def _parse_feed(feed: Any, src: str) -> list[HeadlineTuple]:
    """Extract (title, source, "HH:MM") tuples from one parsed feed.

    A missing/bozo feed with no entries yields an empty list.  Entries with
    empty or filtered titles are dropped; entries without a usable publish
    time get the "——:——" placeholder.
    """
    if feed is None or (feed.bozo and not feed.entries):
        return []

    results: list[HeadlineTuple] = []
    for entry in feed.entries:
        title = strip_tags(entry.get("title", ""))
        if not title or skip(title):
            continue
        published = entry.get("published_parsed") or entry.get("updated_parsed")
        stamp = "——:——"
        if published:
            try:
                stamp = datetime(*published[:6]).strftime("%H:%M")
            except Exception:
                pass  # malformed time tuple -> keep the placeholder
        results.append((title, src, stamp))
    return results
|
|
|
|
|
|
def fetch_all_fast() -> list[HeadlineTuple]:
    """Fetch only the first N sources for fast startup.

    Marks those URLs as fast-start so fetch_feed applies the short timeout,
    logs a boot line per source, and returns all parsed headline tuples.
    """
    global _fast_start_urls
    fast_sources = list(FEEDS.items())[:FAST_START_SOURCES]
    _fast_start_urls = {url for _, url in fast_sources}

    headlines: list[HeadlineTuple] = []
    with ThreadPoolExecutor(max_workers=FAST_START_SOURCES) as pool:
        pending = {pool.submit(fetch_feed, url): src for src, url in fast_sources}
        for done in as_completed(pending):
            src = pending[done]
            _, feed = done.result()
            if feed is None or (feed.bozo and not feed.entries):
                boot_ln(src, "DARK", False)
                continue
            parsed = _parse_feed(feed, src)
            if not parsed:
                boot_ln(src, "EMPTY", False)
                continue
            headlines.extend(parsed)
            boot_ln(src, f"LINKED [{len(parsed)}]", True)
    return headlines
|
|
|
|
|
|
def fetch_all() -> tuple[list[HeadlineTuple], int, int]:
    """Fetch every RSS feed concurrently.

    Returns (items, linked, failed) where `linked` counts sources that
    produced at least one headline and `failed` counts dead or empty ones.
    Clears the fast-start set so every feed uses the normal timeout.
    """
    global _fast_start_urls
    _fast_start_urls = set()  # full run: no short-timeout sources

    headlines: list[HeadlineTuple] = []
    linked = 0
    failed = 0

    with ThreadPoolExecutor(max_workers=DEFAULT_MAX_WORKERS) as pool:
        pending = {pool.submit(fetch_feed, url): src for src, url in FEEDS.items()}
        for done in as_completed(pending):
            src = pending[done]
            _, feed = done.result()
            if feed is None or (feed.bozo and not feed.entries):
                boot_ln(src, "DARK", False)
                failed += 1
                continue
            parsed = _parse_feed(feed, src)
            if not parsed:
                boot_ln(src, "EMPTY", False)
                failed += 1
                continue
            headlines.extend(parsed)
            boot_ln(src, f"LINKED [{len(parsed)}]", True)
            linked += 1

    return headlines, linked, failed
|
|
|
|
|
|
def _fetch_gutenberg(url: str, label: str) -> list[HeadlineTuple]:
    """Download and parse stanzas/passages from a Project Gutenberg text.

    Returns (passage, label, "") tuples, or [] on any failure — poetry
    sources are strictly best-effort and must never raise.
    """
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
        # `with` closes the HTTP response; the original leaked the socket.
        with urllib.request.urlopen(req, timeout=15) as resp:
            raw = resp.read()
        # Normalise all line endings to "\n" before parsing.
        text = (
            raw.decode("utf-8", errors="replace")
            .replace("\r\n", "\n")
            .replace("\r", "\n")
        )
        return _extract_passages(text, label)
    except Exception:
        return []


def _extract_passages(text: str, label: str) -> list[HeadlineTuple]:
    """Split a Gutenberg text body into displayable passages (pure, no I/O)."""
    # Trim the Gutenberg boilerplate header/footer when the markers exist.
    m = re.search(r"\*\*\*\s*START OF[^\n]*\n", text)
    if m:
        text = text[m.end() :]
    m = re.search(r"\*\*\*\s*END OF", text)
    if m:
        text = text[: m.start()]

    items: list[HeadlineTuple] = []
    # Blank-line-separated blocks are treated as stanzas/paragraphs.
    for blk in re.split(r"\n{2,}", text.strip()):
        blk = " ".join(blk.split())  # collapse internal whitespace
        if len(blk) < 20 or len(blk) > 280:
            continue  # too short to be a passage / too long to display
        if blk.isupper():
            continue  # all-caps headings
        if re.match(r"^[IVXLCDM]+\.?\s*$", blk):
            continue  # bare roman-numeral section numbers
        items.append((blk, label, ""))
    return items
|
|
|
|
|
|
def fetch_poetry() -> tuple[list[HeadlineTuple], int, int]:
    """Fetch all poetry/literature sources concurrently.

    Returns (items, linked, failed); a source counts as linked when it
    yields at least one stanza, otherwise it is logged DARK and counted
    as failed.
    """
    passages: list[HeadlineTuple] = []
    linked = 0
    failed = 0

    with ThreadPoolExecutor(max_workers=DEFAULT_MAX_WORKERS) as pool:
        pending = {
            pool.submit(_fetch_gutenberg, url, label): label
            for label, url in POETRY_SOURCES.items()
        }
        for done in as_completed(pending):
            label = pending[done]
            stanzas = done.result()
            if not stanzas:
                boot_ln(label, "DARK", False)
                failed += 1
                continue
            boot_ln(label, f"LOADED [{len(stanzas)}]", True)
            passages.extend(stanzas)
            linked += 1

    return passages, linked, failed
|
|
|
|
|
|
_cache_dir = pathlib.Path(__file__).resolve().parent / "fixtures"
|
|
|
|
|
|
def _cache_path():
|
|
return _cache_dir / "headlines.json"
|
|
|
|
|
|
def load_cache():
    """Load cached headline tuples from disk.

    Returns a list of tuples, or None when the cache file is missing,
    unreadable, corrupt, or empty.
    """
    path = _cache_path()
    if not path.exists():
        return None
    try:
        # Read as explicit UTF-8 so the cache is portable across locales;
        # any decode/parse failure simply invalidates the cache.
        data = json.loads(path.read_text(encoding="utf-8"))
        items = [tuple(entry) for entry in data["items"]]
        return items or None
    except Exception:
        return None
|
|
|
|
|
|
def save_cache(items):
    """Persist fetched items to disk for fast subsequent runs.

    Best-effort: any I/O failure is silently ignored — a missing cache only
    costs a slower next startup.
    """
    try:
        # Write as explicit UTF-8 so the file is portable across locales.
        _cache_path().write_text(json.dumps({"items": items}), encoding="utf-8")
    except Exception:
        pass
|
|
|
|
|
|
# URLs currently designated as fast-start sources; fetch_feed applies the
# short FAST_START_TIMEOUT to these.  Reassigned by fetch_all_fast()/fetch_all().
_fast_start_urls: set[str] = set()
|