""" ntfy.sh SSE stream listener — standalone, zero internal dependencies. Reusable by any visualizer: from engine.ntfy import NtfyPoller poller = NtfyPoller("https://ntfy.sh/my_topic/json") poller.start() # in render loop: msg = poller.get_active_message() if msg: title, body, ts = msg render_my_message(title, body) """ import json import threading import time import urllib.request from collections.abc import Callable from datetime import datetime from urllib.parse import parse_qs, urlencode, urlparse, urlunparse from engine.events import NtfyMessageEvent class NtfyPoller: """SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT).""" def __init__(self, topic_url, reconnect_delay=5, display_secs=30): self.topic_url = topic_url self.reconnect_delay = reconnect_delay self.display_secs = display_secs self._message = None # (title, body, monotonic_timestamp) or None self._lock = threading.Lock() self._subscribers: list[Callable[[NtfyMessageEvent], None]] = [] def subscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None: """Register a callback to be called when a message is received.""" self._subscribers.append(callback) def unsubscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None: """Remove a registered callback.""" if callback in self._subscribers: self._subscribers.remove(callback) def _emit(self, event: NtfyMessageEvent) -> None: """Emit an event to all subscribers.""" for cb in self._subscribers: try: cb(event) except Exception: pass def start(self): """Start background stream thread. Returns True.""" t = threading.Thread(target=self._stream_loop, daemon=True) t.start() return True def get_active_message(self): """Return (title, body, timestamp) if a message is active and not expired, else None.""" with self._lock: if self._message is None: return None title, body, ts = self._message if time.monotonic() - ts < self.display_secs: return self._message self._message = None return None def dismiss(self): """Manually dismiss the current message.""" with self._lock: self._message = None def _build_url(self, last_id=None): """Build the stream URL, substituting since= to avoid message replays on reconnect.""" parsed = urlparse(self.topic_url) params = parse_qs(parsed.query, keep_blank_values=True) params["since"] = [last_id if last_id else "20s"] new_query = urlencode({k: v[0] for k, v in params.items()}) return urlunparse(parsed._replace(query=new_query)) def _stream_loop(self): last_id = None while True: try: url = self._build_url(last_id) req = urllib.request.Request( url, headers={"User-Agent": "mainline/0.1"} ) # timeout=90 keeps the socket alive through ntfy.sh keepalive heartbeats resp = urllib.request.urlopen(req, timeout=90) while True: line = resp.readline() if not line: break # server closed connection — reconnect try: data = json.loads(line.decode("utf-8", errors="replace")) except json.JSONDecodeError: continue # Advance cursor on every event (message + keepalive) to # avoid replaying already-seen events after a reconnect. if "id" in data: last_id = data["id"] if data.get("event") == "message": with self._lock: self._message = ( data.get("title", ""), data.get("message", ""), time.monotonic(), ) event = NtfyMessageEvent( title=data.get("title", ""), body=data.get("message", ""), message_id=data.get("id"), timestamp=datetime.now(), ) self._emit(event) except Exception: pass time.sleep(self.reconnect_delay)