Merge pull request 'feat: migrate Ntfy message retrieval from polling to SSE streaming, replacing poll_interval with reconnect_delay for continuous updates.' (#20) from feat/ntfy-sse into main

Reviewed-on: genewildish/Mainline#20
This commit is contained in:
2026-03-15 20:50:08 +00:00
3 changed files with 35 additions and 18 deletions

View File

@@ -296,7 +296,7 @@ def main():
ntfy = NtfyPoller( ntfy = NtfyPoller(
config.NTFY_TOPIC, config.NTFY_TOPIC,
poll_interval=config.NTFY_POLL_INTERVAL, reconnect_delay=config.NTFY_RECONNECT_DELAY,
display_secs=config.MESSAGE_DISPLAY_SECS, display_secs=config.MESSAGE_DISPLAY_SECS,
) )
ntfy_ok = ntfy.start() ntfy_ok = ntfy.start()

View File

@@ -59,8 +59,8 @@ MODE = 'poetry' if '--poetry' in sys.argv or '-p' in sys.argv else 'news'
FIREHOSE = '--firehose' in sys.argv FIREHOSE = '--firehose' in sys.argv
# ─── NTFY MESSAGE QUEUE ────────────────────────────────── # ─── NTFY MESSAGE QUEUE ──────────────────────────────────
NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json?since=20s&poll=1" NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json"
NTFY_POLL_INTERVAL = 15 # seconds between polls NTFY_RECONNECT_DELAY = 5 # seconds before reconnecting after a dropped stream
MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen
# ─── FONT RENDERING ────────────────────────────────────── # ─── FONT RENDERING ──────────────────────────────────────

View File

@@ -1,9 +1,9 @@
""" """
ntfy.sh message poller — standalone, zero internal dependencies. ntfy.sh SSE stream listener — standalone, zero internal dependencies.
Reusable by any visualizer: Reusable by any visualizer:
from engine.ntfy import NtfyPoller from engine.ntfy import NtfyPoller
poller = NtfyPoller("https://ntfy.sh/my_topic/json?since=20s&poll=1") poller = NtfyPoller("https://ntfy.sh/my_topic/json")
poller.start() poller.start()
# in render loop: # in render loop:
msg = poller.get_active_message() msg = poller.get_active_message()
@@ -16,21 +16,22 @@ import json
import time import time
import threading import threading
import urllib.request import urllib.request
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse
class NtfyPoller: class NtfyPoller:
"""Background poller for ntfy.sh topics.""" """SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT)."""
def __init__(self, topic_url, poll_interval=15, display_secs=30): def __init__(self, topic_url, reconnect_delay=5, display_secs=30):
self.topic_url = topic_url self.topic_url = topic_url
self.poll_interval = poll_interval self.reconnect_delay = reconnect_delay
self.display_secs = display_secs self.display_secs = display_secs
self._message = None # (title, body, monotonic_timestamp) or None self._message = None # (title, body, monotonic_timestamp) or None
self._lock = threading.Lock() self._lock = threading.Lock()
def start(self): def start(self):
"""Start background polling thread. Returns True.""" """Start background stream thread. Returns True."""
t = threading.Thread(target=self._poll_loop, daemon=True) t = threading.Thread(target=self._stream_loop, daemon=True)
t.start() t.start()
return True return True
@@ -50,19 +51,35 @@ class NtfyPoller:
with self._lock: with self._lock:
self._message = None self._message = None
def _poll_loop(self): def _build_url(self, last_id=None):
"""Build the stream URL, substituting since= to avoid message replays on reconnect."""
parsed = urlparse(self.topic_url)
params = parse_qs(parsed.query, keep_blank_values=True)
params['since'] = [last_id if last_id else '20s']
new_query = urlencode({k: v[0] for k, v in params.items()})
return urlunparse(parsed._replace(query=new_query))
def _stream_loop(self):
last_id = None
while True: while True:
try: try:
url = self._build_url(last_id)
req = urllib.request.Request( req = urllib.request.Request(
self.topic_url, headers={"User-Agent": "mainline/0.1"}) url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=10) # timeout=90 keeps the socket alive through ntfy.sh keepalive heartbeats
for line in resp.read().decode('utf-8', errors='replace').strip().split('\n'): resp = urllib.request.urlopen(req, timeout=90)
if not line.strip(): while True:
continue line = resp.readline()
if not line:
break # server closed connection — reconnect
try: try:
data = json.loads(line) data = json.loads(line.decode('utf-8', errors='replace'))
except json.JSONDecodeError: except json.JSONDecodeError:
continue continue
# Advance cursor on every event (message + keepalive) to
# avoid replaying already-seen events after a reconnect.
if "id" in data:
last_id = data["id"]
if data.get("event") == "message": if data.get("event") == "message":
with self._lock: with self._lock:
self._message = ( self._message = (
@@ -72,4 +89,4 @@ class NtfyPoller:
) )
except Exception: except Exception:
pass pass
time.sleep(self.poll_interval) time.sleep(self.reconnect_delay)