feat: migrate Ntfy message retrieval from polling to SSE streaming, replacing poll_interval with reconnect_delay for continuous updates.

This commit is contained in:
2026-03-15 13:44:26 -07:00
parent e2e5b42212
commit dbb9743640
3 changed files with 35 additions and 18 deletions

View File

@@ -296,7 +296,7 @@ def main():
ntfy = NtfyPoller(
config.NTFY_TOPIC,
poll_interval=config.NTFY_POLL_INTERVAL,
reconnect_delay=config.NTFY_RECONNECT_DELAY,
display_secs=config.MESSAGE_DISPLAY_SECS,
)
ntfy_ok = ntfy.start()

View File

@@ -59,8 +59,8 @@ MODE = 'poetry' if '--poetry' in sys.argv or '-p' in sys.argv else 'news'
FIREHOSE = '--firehose' in sys.argv
# ─── NTFY MESSAGE QUEUE ──────────────────────────────────
NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json?since=20s&poll=1"
NTFY_POLL_INTERVAL = 15 # seconds between polls
NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json"
NTFY_RECONNECT_DELAY = 5 # seconds before reconnecting after a dropped stream
MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen
# ─── FONT RENDERING ──────────────────────────────────────

View File

@@ -1,9 +1,9 @@
"""
ntfy.sh message poller — standalone, zero internal dependencies.
ntfy.sh SSE stream listener — standalone, zero internal dependencies.
Reusable by any visualizer:
from engine.ntfy import NtfyPoller
poller = NtfyPoller("https://ntfy.sh/my_topic/json?since=20s&poll=1")
poller = NtfyPoller("https://ntfy.sh/my_topic/json")
poller.start()
# in render loop:
msg = poller.get_active_message()
@@ -16,21 +16,22 @@ import json
import time
import threading
import urllib.request
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
class NtfyPoller:
"""Background poller for ntfy.sh topics."""
"""SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT)."""
def __init__(self, topic_url, poll_interval=15, display_secs=30):
def __init__(self, topic_url, reconnect_delay=5, display_secs=30):
self.topic_url = topic_url
self.poll_interval = poll_interval
self.reconnect_delay = reconnect_delay
self.display_secs = display_secs
self._message = None # (title, body, monotonic_timestamp) or None
self._lock = threading.Lock()
def start(self):
"""Start background polling thread. Returns True."""
t = threading.Thread(target=self._poll_loop, daemon=True)
"""Start background stream thread. Returns True."""
t = threading.Thread(target=self._stream_loop, daemon=True)
t.start()
return True
@@ -50,19 +51,35 @@ class NtfyPoller:
with self._lock:
self._message = None
def _poll_loop(self):
def _build_url(self, last_id=None):
"""Build the stream URL, substituting since= to avoid message replays on reconnect."""
parsed = urlparse(self.topic_url)
params = parse_qs(parsed.query, keep_blank_values=True)
params['since'] = [last_id if last_id else '20s']
new_query = urlencode({k: v[0] for k, v in params.items()})
return urlunparse(parsed._replace(query=new_query))
def _stream_loop(self):
last_id = None
while True:
try:
url = self._build_url(last_id)
req = urllib.request.Request(
self.topic_url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=10)
for line in resp.read().decode('utf-8', errors='replace').strip().split('\n'):
if not line.strip():
continue
url, headers={"User-Agent": "mainline/0.1"})
# timeout=90 keeps the socket alive through ntfy.sh keepalive heartbeats
resp = urllib.request.urlopen(req, timeout=90)
while True:
line = resp.readline()
if not line:
break # server closed connection — reconnect
try:
data = json.loads(line)
data = json.loads(line.decode('utf-8', errors='replace'))
except json.JSONDecodeError:
continue
# Advance cursor on every event (message + keepalive) to
# avoid replaying already-seen events after a reconnect.
if "id" in data:
last_id = data["id"]
if data.get("event") == "message":
with self._lock:
self._message = (
@@ -72,4 +89,4 @@ class NtfyPoller:
)
except Exception:
pass
time.sleep(self.poll_interval)
time.sleep(self.reconnect_delay)