""" RSS feed fetching, Project Gutenberg parsing, and headline caching. Depends on: config, sources, filter, terminal. """ import json import pathlib import re import urllib.request from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from typing import Any import feedparser from engine import config from engine.filter import skip, strip_tags from engine.sources import FEEDS, POETRY_SOURCES from engine.terminal import boot_ln HeadlineTuple = tuple[str, str, str] DEFAULT_MAX_WORKERS = 10 FAST_START_SOURCES = 5 FAST_START_TIMEOUT = 3 def fetch_feed(url: str) -> tuple[str, Any] | tuple[None, None]: """Fetch and parse a single RSS feed URL. Returns (url, feed) tuple.""" try: req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"}) timeout = FAST_START_TIMEOUT if url in _fast_start_urls else config.FEED_TIMEOUT resp = urllib.request.urlopen(req, timeout=timeout) return (url, feedparser.parse(resp.read())) except Exception: return (url, None) def _parse_feed(feed: Any, src: str) -> list[HeadlineTuple]: """Parse a feed and return list of headline tuples.""" items = [] if feed is None or (feed.bozo and not feed.entries): return items for e in feed.entries: t = strip_tags(e.get("title", "")) if not t or skip(t): continue pub = e.get("published_parsed") or e.get("updated_parsed") try: ts = datetime(*pub[:6]).strftime("%H:%M") if pub else "——:——" except Exception: ts = "——:——" items.append((t, src, ts)) return items def fetch_all_fast() -> list[HeadlineTuple]: """Fetch only the first N sources for fast startup.""" global _fast_start_urls _fast_start_urls = set(list(FEEDS.values())[:FAST_START_SOURCES]) items: list[HeadlineTuple] = [] with ThreadPoolExecutor(max_workers=FAST_START_SOURCES) as executor: futures = { executor.submit(fetch_feed, url): src for src, url in list(FEEDS.items())[:FAST_START_SOURCES] } for future in as_completed(futures): src = futures[future] url, feed = future.result() if feed is None or (feed.bozo and not feed.entries): boot_ln(src, "DARK", False) continue parsed = _parse_feed(feed, src) if parsed: items.extend(parsed) boot_ln(src, f"LINKED [{len(parsed)}]", True) else: boot_ln(src, "EMPTY", False) return items def fetch_all() -> tuple[list[HeadlineTuple], int, int]: """Fetch all RSS feeds concurrently and return items, linked count, failed count.""" global _fast_start_urls _fast_start_urls = set() items: list[HeadlineTuple] = [] linked = failed = 0 with ThreadPoolExecutor(max_workers=DEFAULT_MAX_WORKERS) as executor: futures = {executor.submit(fetch_feed, url): src for src, url in FEEDS.items()} for future in as_completed(futures): src = futures[future] url, feed = future.result() if feed is None or (feed.bozo and not feed.entries): boot_ln(src, "DARK", False) failed += 1 continue parsed = _parse_feed(feed, src) if parsed: items.extend(parsed) boot_ln(src, f"LINKED [{len(parsed)}]", True) linked += 1 else: boot_ln(src, "EMPTY", False) failed += 1 return items, linked, failed def _fetch_gutenberg(url: str, label: str) -> list[HeadlineTuple]: """Download and parse stanzas/passages from a Project Gutenberg text.""" try: req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"}) resp = urllib.request.urlopen(req, timeout=15) text = ( resp.read() .decode("utf-8", errors="replace") .replace("\r\n", "\n") .replace("\r", "\n") ) m = re.search(r"\*\*\*\s*START OF[^\n]*\n", text) if m: text = text[m.end() :] m = re.search(r"\*\*\*\s*END OF", text) if m: text = text[: m.start()] blocks = re.split(r"\n{2,}", text.strip()) items = [] for blk in blocks: blk = " ".join(blk.split()) if len(blk) < 20 or len(blk) > 280: continue if blk.isupper(): continue if re.match(r"^[IVXLCDM]+\.?\s*$", blk): continue items.append((blk, label, "")) return items except Exception: return [] def fetch_poetry() -> tuple[list[HeadlineTuple], int, int]: """Fetch all poetry/literature sources concurrently.""" items = [] linked = failed = 0 with ThreadPoolExecutor(max_workers=DEFAULT_MAX_WORKERS) as executor: futures = { executor.submit(_fetch_gutenberg, url, label): label for label, url in POETRY_SOURCES.items() } for future in as_completed(futures): label = futures[future] stanzas = future.result() if stanzas: boot_ln(label, f"LOADED [{len(stanzas)}]", True) items.extend(stanzas) linked += 1 else: boot_ln(label, "DARK", False) failed += 1 return items, linked, failed _cache_dir = pathlib.Path(__file__).resolve().parent / "fixtures" def _cache_path(): return _cache_dir / "headlines.json" def load_cache(): """Load cached items from disk if available.""" p = _cache_path() if not p.exists(): return None try: data = json.loads(p.read_text()) items = [tuple(i) for i in data["items"]] return items if items else None except Exception: return None def save_cache(items): """Save fetched items to disk for fast subsequent runs.""" try: _cache_path().write_text(json.dumps({"items": items})) except Exception: pass _fast_start_urls: set = set()