""" ntfy.sh SSE stream listener — standalone, zero internal dependencies. Reusable by any visualizer: from engine.ntfy import NtfyPoller poller = NtfyPoller("https://ntfy.sh/my_topic/json") poller.start() # in render loop: msg = poller.get_active_message() if msg: title, body, ts = msg render_my_message(title, body) """ import json import threading import time import urllib.request from urllib.parse import urlparse, parse_qs, urlencode, urlunparse class NtfyPoller: """SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT).""" def __init__(self, topic_url, reconnect_delay=5, display_secs=30): self.topic_url = topic_url self.reconnect_delay = reconnect_delay self.display_secs = display_secs self._message = None # (title, body, monotonic_timestamp) or None self._lock = threading.Lock() def start(self): """Start background stream thread. Returns True.""" t = threading.Thread(target=self._stream_loop, daemon=True) t.start() return True def get_active_message(self): """Return (title, body, timestamp) if a message is active and not expired, else None.""" with self._lock: if self._message is None: return None title, body, ts = self._message if time.monotonic() - ts < self.display_secs: return self._message self._message = None return None def dismiss(self): """Manually dismiss the current message.""" with self._lock: self._message = None def _build_url(self, last_id=None): """Build the stream URL, substituting since= to avoid message replays on reconnect.""" parsed = urlparse(self.topic_url) params = parse_qs(parsed.query, keep_blank_values=True) params["since"] = [last_id if last_id else "20s"] new_query = urlencode({k: v[0] for k, v in params.items()}) return urlunparse(parsed._replace(query=new_query)) def _stream_loop(self): last_id = None while True: try: url = self._build_url(last_id) req = urllib.request.Request( url, headers={"User-Agent": "mainline/0.1"} ) # timeout=90 keeps the socket alive through ntfy.sh keepalive heartbeats resp = urllib.request.urlopen(req, timeout=90) while True: line = resp.readline() if not line: break # server closed connection — reconnect try: data = json.loads(line.decode("utf-8", errors="replace")) except json.JSONDecodeError: continue # Advance cursor on every event (message + keepalive) to # avoid replaying already-seen events after a reconnect. if "id" in data: last_id = data["id"] if data.get("event") == "message": with self._lock: self._message = ( data.get("title", ""), data.get("message", ""), time.monotonic(), ) except Exception: pass time.sleep(self.reconnect_delay)