""" ntfy.sh message poller — standalone, zero internal dependencies. Reusable by any visualizer: from engine.ntfy import NtfyPoller poller = NtfyPoller("https://ntfy.sh/my_topic/json?since=20s&poll=1") poller.start() # in render loop: msg = poller.get_active_message() if msg: title, body, ts = msg render_my_message(title, body) """ import json import time import threading import urllib.request class NtfyPoller: """Background poller for ntfy.sh topics.""" def __init__(self, topic_url, poll_interval=15, display_secs=30): self.topic_url = topic_url self.poll_interval = poll_interval self.display_secs = display_secs self._message = None # (title, body, monotonic_timestamp) or None self._lock = threading.Lock() def start(self): """Start background polling thread. Returns True.""" t = threading.Thread(target=self._poll_loop, daemon=True) t.start() return True def get_active_message(self): """Return (title, body, timestamp) if a message is active and not expired, else None.""" with self._lock: if self._message is None: return None title, body, ts = self._message if time.monotonic() - ts < self.display_secs: return self._message self._message = None return None def dismiss(self): """Manually dismiss the current message.""" with self._lock: self._message = None def _poll_loop(self): while True: try: req = urllib.request.Request( self.topic_url, headers={"User-Agent": "mainline/0.1"}) resp = urllib.request.urlopen(req, timeout=10) for line in resp.read().decode('utf-8', errors='replace').strip().split('\n'): if not line.strip(): continue try: data = json.loads(line) except json.JSONDecodeError: continue if data.get("event") == "message": with self._lock: self._message = ( data.get("title", ""), data.get("message", ""), time.monotonic(), ) except Exception: pass time.sleep(self.poll_interval)