feat: Introduce ntfy.sh message polling, content fetching with caching, and microphone input monitoring.

This commit is contained in:
2026-03-14 23:34:23 -07:00
parent 2e6b2c48bd
commit 2c777729f5
4 changed files with 271 additions and 1 deletions

133
engine/fetch.py Normal file
View File

@@ -0,0 +1,133 @@
"""
RSS feed fetching, Project Gutenberg parsing, and headline caching.
Depends on: config, sources, filter, terminal.
"""
import re
import json
import pathlib
import urllib.request
from datetime import datetime
import feedparser
from engine import config
from engine.sources import FEEDS, POETRY_SOURCES
from engine.filter import strip_tags, skip
from engine.terminal import boot_ln
# ─── SINGLE FEED ──────────────────────────────────────────
def fetch_feed(url):
try:
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=config.FEED_TIMEOUT)
return feedparser.parse(resp.read())
except Exception:
return None
# ─── ALL RSS FEEDS ────────────────────────────────────────
def fetch_all():
items = []
linked = failed = 0
for src, url in FEEDS.items():
feed = fetch_feed(url)
if feed is None or (feed.bozo and not feed.entries):
boot_ln(src, "DARK", False)
failed += 1
continue
n = 0
for e in feed.entries:
t = strip_tags(e.get("title", ""))
if not t or skip(t):
continue
pub = e.get("published_parsed") or e.get("updated_parsed")
try:
ts = datetime(*pub[:6]).strftime("%H:%M") if pub else "——:——"
except Exception:
ts = "——:——"
items.append((t, src, ts))
n += 1
if n:
boot_ln(src, f"LINKED [{n}]", True)
linked += 1
else:
boot_ln(src, "EMPTY", False)
failed += 1
return items, linked, failed
# ─── PROJECT GUTENBERG ────────────────────────────────────
def _fetch_gutenberg(url, label):
"""Download and parse stanzas/passages from a Project Gutenberg text."""
try:
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=15)
text = resp.read().decode('utf-8', errors='replace').replace('\r\n', '\n').replace('\r', '\n')
# Strip PG boilerplate
m = re.search(r'\*\*\*\s*START OF[^\n]*\n', text)
if m:
text = text[m.end():]
m = re.search(r'\*\*\*\s*END OF', text)
if m:
text = text[:m.start()]
# Split on blank lines into stanzas/passages
blocks = re.split(r'\n{2,}', text.strip())
items = []
for blk in blocks:
blk = ' '.join(blk.split()) # flatten to one line
if len(blk) < 20 or len(blk) > 280:
continue
if blk.isupper(): # skip all-caps headers
continue
if re.match(r'^[IVXLCDM]+\.?\s*$', blk): # roman numerals
continue
items.append((blk, label, ''))
return items
except Exception:
return []
def fetch_poetry():
"""Fetch all poetry/literature sources."""
items = []
linked = failed = 0
for label, url in POETRY_SOURCES.items():
stanzas = _fetch_gutenberg(url, label)
if stanzas:
boot_ln(label, f"LOADED [{len(stanzas)}]", True)
items.extend(stanzas)
linked += 1
else:
boot_ln(label, "DARK", False)
failed += 1
return items, linked, failed
# ─── CACHE ────────────────────────────────────────────────
_CACHE_DIR = pathlib.Path(__file__).resolve().parent.parent
def _cache_path():
return _CACHE_DIR / f".mainline_cache_{config.MODE}.json"
def load_cache():
"""Load cached items from disk if available."""
p = _cache_path()
if not p.exists():
return None
try:
data = json.loads(p.read_text())
items = [tuple(i) for i in data["items"]]
return items if items else None
except Exception:
return None
def save_cache(items):
"""Save fetched items to disk for fast subsequent runs."""
try:
_cache_path().write_text(json.dumps({"items": items}))
except Exception:
pass

62
engine/mic.py Normal file
View File

@@ -0,0 +1,62 @@
"""
Microphone input monitor — standalone, no internal dependencies.
Gracefully degrades if sounddevice/numpy are unavailable.
"""
import atexit
try:
import sounddevice as _sd
import numpy as _np
_HAS_MIC = True
except Exception:
_HAS_MIC = False
class MicMonitor:
"""Background mic stream that exposes current RMS dB level."""
def __init__(self, threshold_db=50):
self.threshold_db = threshold_db
self._db = -99.0
self._stream = None
@property
def available(self):
"""True if sounddevice is importable."""
return _HAS_MIC
@property
def db(self):
"""Current RMS dB level."""
return self._db
@property
def excess(self):
"""dB above threshold (clamped to 0)."""
return max(0.0, self._db - self.threshold_db)
def start(self):
"""Start background mic stream. Returns True on success, False/None otherwise."""
if not _HAS_MIC:
return None
def _cb(indata, frames, t, status):
rms = float(_np.sqrt(_np.mean(indata ** 2)))
self._db = 20 * _np.log10(rms) if rms > 0 else -99.0
try:
self._stream = _sd.InputStream(
callback=_cb, channels=1, samplerate=44100, blocksize=2048)
self._stream.start()
atexit.register(self.stop)
return True
except Exception:
return False
def stop(self):
"""Stop the mic stream if running."""
if self._stream:
try:
self._stream.stop()
except Exception:
pass
self._stream = None

75
engine/ntfy.py Normal file
View File

@@ -0,0 +1,75 @@
"""
ntfy.sh message poller — standalone, zero internal dependencies.
Reusable by any visualizer:
from engine.ntfy import NtfyPoller
poller = NtfyPoller("https://ntfy.sh/my_topic/json?since=20s&poll=1")
poller.start()
# in render loop:
msg = poller.get_active_message()
if msg:
title, body, ts = msg
render_my_message(title, body)
"""
import json
import time
import threading
import urllib.request
class NtfyPoller:
"""Background poller for ntfy.sh topics."""
def __init__(self, topic_url, poll_interval=15, display_secs=30):
self.topic_url = topic_url
self.poll_interval = poll_interval
self.display_secs = display_secs
self._message = None # (title, body, monotonic_timestamp) or None
self._lock = threading.Lock()
def start(self):
"""Start background polling thread. Returns True."""
t = threading.Thread(target=self._poll_loop, daemon=True)
t.start()
return True
def get_active_message(self):
"""Return (title, body, timestamp) if a message is active and not expired, else None."""
with self._lock:
if self._message is None:
return None
title, body, ts = self._message
if time.monotonic() - ts < self.display_secs:
return self._message
self._message = None
return None
def dismiss(self):
"""Manually dismiss the current message."""
with self._lock:
self._message = None
def _poll_loop(self):
while True:
try:
req = urllib.request.Request(
self.topic_url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=10)
for line in resp.read().decode('utf-8', errors='replace').strip().split('\n'):
if not line.strip():
continue
try:
data = json.loads(line)
except json.JSONDecodeError:
continue
if data.get("event") == "message":
with self._lock:
self._message = (
data.get("title", ""),
data.get("message", ""),
time.monotonic(),
)
except Exception:
pass
time.sleep(self.poll_interval)

View File

@@ -10,7 +10,7 @@ import random
from PIL import Image, ImageDraw, ImageFont
from engine import config
from engine.terminal import RST, W_COOL
from engine.terminal import RST
from engine.sources import SCRIPT_FONTS, SOURCE_LANGS, NO_UPPER
from engine.translate import detect_location_language, translate_headline