From 6e39a2dad20cf3bd79b5c18e2ee7cb8619c3e779 Mon Sep 17 00:00:00 2001 From: Gene Johnson Date: Sun, 15 Mar 2026 13:44:26 -0700 Subject: [PATCH] feat: migrate Ntfy message retrieval from polling to SSE streaming, replacing `poll_interval` with `reconnect_delay` for continuous updates. --- engine/app.py | 2 +- engine/config.py | 4 ++-- engine/ntfy.py | 47 ++++++++++++++++++++++++++++++++--------------- 3 files changed, 35 insertions(+), 18 deletions(-) diff --git a/engine/app.py b/engine/app.py index 98cddd1..375b7b5 100644 --- a/engine/app.py +++ b/engine/app.py @@ -296,7 +296,7 @@ def main(): ntfy = NtfyPoller( config.NTFY_TOPIC, - poll_interval=config.NTFY_POLL_INTERVAL, + reconnect_delay=config.NTFY_RECONNECT_DELAY, display_secs=config.MESSAGE_DISPLAY_SECS, ) ntfy_ok = ntfy.start() diff --git a/engine/config.py b/engine/config.py index f509b23..10e5b03 100644 --- a/engine/config.py +++ b/engine/config.py @@ -59,8 +59,8 @@ MODE = 'poetry' if '--poetry' in sys.argv or '-p' in sys.argv else 'news' FIREHOSE = '--firehose' in sys.argv # ─── NTFY MESSAGE QUEUE ────────────────────────────────── -NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json?since=20s&poll=1" -NTFY_POLL_INTERVAL = 15 # seconds between polls +NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json" +NTFY_RECONNECT_DELAY = 5 # seconds before reconnecting after a dropped stream MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen # ─── FONT RENDERING ────────────────────────────────────── diff --git a/engine/ntfy.py b/engine/ntfy.py index 25dd6a5..25920ed 100644 --- a/engine/ntfy.py +++ b/engine/ntfy.py @@ -1,9 +1,9 @@ """ -ntfy.sh message poller — standalone, zero internal dependencies. +ntfy.sh SSE stream listener — standalone, zero internal dependencies. Reusable by any visualizer: from engine.ntfy import NtfyPoller - poller = NtfyPoller("https://ntfy.sh/my_topic/json?since=20s&poll=1") + poller = NtfyPoller("https://ntfy.sh/my_topic/json") poller.start() # in render loop: msg = poller.get_active_message() @@ -16,21 +16,22 @@ import json import time import threading import urllib.request +from urllib.parse import urlparse, parse_qs, urlencode, urlunparse class NtfyPoller: - """Background poller for ntfy.sh topics.""" + """SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT).""" - def __init__(self, topic_url, poll_interval=15, display_secs=30): + def __init__(self, topic_url, reconnect_delay=5, display_secs=30): self.topic_url = topic_url - self.poll_interval = poll_interval + self.reconnect_delay = reconnect_delay self.display_secs = display_secs self._message = None # (title, body, monotonic_timestamp) or None self._lock = threading.Lock() def start(self): - """Start background polling thread. Returns True.""" - t = threading.Thread(target=self._poll_loop, daemon=True) + """Start background stream thread. Returns True.""" + t = threading.Thread(target=self._stream_loop, daemon=True) t.start() return True @@ -50,19 +51,35 @@ class NtfyPoller: with self._lock: self._message = None - def _poll_loop(self): + def _build_url(self, last_id=None): + """Build the stream URL, substituting since= to avoid message replays on reconnect.""" + parsed = urlparse(self.topic_url) + params = parse_qs(parsed.query, keep_blank_values=True) + params['since'] = [last_id if last_id else '20s'] + new_query = urlencode({k: v[0] for k, v in params.items()}) + return urlunparse(parsed._replace(query=new_query)) + + def _stream_loop(self): + last_id = None while True: try: + url = self._build_url(last_id) req = urllib.request.Request( - self.topic_url, headers={"User-Agent": "mainline/0.1"}) - resp = urllib.request.urlopen(req, timeout=10) - for line in resp.read().decode('utf-8', errors='replace').strip().split('\n'): - if not line.strip(): - continue + url, headers={"User-Agent": "mainline/0.1"}) + # timeout=90 keeps the socket alive through ntfy.sh keepalive heartbeats + resp = urllib.request.urlopen(req, timeout=90) + while True: + line = resp.readline() + if not line: + break # server closed connection — reconnect try: - data = json.loads(line) + data = json.loads(line.decode('utf-8', errors='replace')) except json.JSONDecodeError: continue + # Advance cursor on every event (message + keepalive) to + # avoid replaying already-seen events after a reconnect. + if "id" in data: + last_id = data["id"] if data.get("event") == "message": with self._lock: self._message = ( @@ -72,4 +89,4 @@ class NtfyPoller: ) except Exception: pass - time.sleep(self.poll_interval) + time.sleep(self.reconnect_delay)