Files
Mainline/engine/ntfy.py
David Gwilliam 15de46722a refactor: phase 4 - event-driven architecture foundation
- Add EventBus class with pub/sub messaging (thread-safe)
- Add emitter Protocol classes (EventEmitter, Startable, Stoppable)
- Add event emission to NtfyPoller (NtfyMessageEvent)
- Add event emission to MicMonitor (MicLevelEvent)
- Update StreamController to publish stream start/end events
- Add comprehensive tests for eventbus and emitters modules
2026-03-15 19:15:08 -07:00

123 lines
4.6 KiB
Python

"""
ntfy.sh SSE stream listener — standalone, zero internal dependencies.
Reusable by any visualizer:
from engine.ntfy import NtfyPoller
poller = NtfyPoller("https://ntfy.sh/my_topic/json")
poller.start()
# in render loop:
msg = poller.get_active_message()
if msg:
title, body, ts = msg
render_my_message(title, body)
"""
import json
import threading
import time
import urllib.request
from collections.abc import Callable
from datetime import datetime
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
from engine.events import NtfyMessageEvent
class NtfyPoller:
"""SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT)."""
def __init__(self, topic_url, reconnect_delay=5, display_secs=30):
self.topic_url = topic_url
self.reconnect_delay = reconnect_delay
self.display_secs = display_secs
self._message = None # (title, body, monotonic_timestamp) or None
self._lock = threading.Lock()
self._subscribers: list[Callable[[NtfyMessageEvent], None]] = []
def subscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
"""Register a callback to be called when a message is received."""
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
"""Remove a registered callback."""
if callback in self._subscribers:
self._subscribers.remove(callback)
def _emit(self, event: NtfyMessageEvent) -> None:
"""Emit an event to all subscribers."""
for cb in self._subscribers:
try:
cb(event)
except Exception:
pass
def start(self):
"""Start background stream thread. Returns True."""
t = threading.Thread(target=self._stream_loop, daemon=True)
t.start()
return True
def get_active_message(self):
"""Return (title, body, timestamp) if a message is active and not expired, else None."""
with self._lock:
if self._message is None:
return None
title, body, ts = self._message
if time.monotonic() - ts < self.display_secs:
return self._message
self._message = None
return None
def dismiss(self):
"""Manually dismiss the current message."""
with self._lock:
self._message = None
def _build_url(self, last_id=None):
"""Build the stream URL, substituting since= to avoid message replays on reconnect."""
parsed = urlparse(self.topic_url)
params = parse_qs(parsed.query, keep_blank_values=True)
params["since"] = [last_id if last_id else "20s"]
new_query = urlencode({k: v[0] for k, v in params.items()})
return urlunparse(parsed._replace(query=new_query))
def _stream_loop(self):
last_id = None
while True:
try:
url = self._build_url(last_id)
req = urllib.request.Request(
url, headers={"User-Agent": "mainline/0.1"}
)
# timeout=90 keeps the socket alive through ntfy.sh keepalive heartbeats
resp = urllib.request.urlopen(req, timeout=90)
while True:
line = resp.readline()
if not line:
break # server closed connection — reconnect
try:
data = json.loads(line.decode("utf-8", errors="replace"))
except json.JSONDecodeError:
continue
# Advance cursor on every event (message + keepalive) to
# avoid replaying already-seen events after a reconnect.
if "id" in data:
last_id = data["id"]
if data.get("event") == "message":
with self._lock:
self._message = (
data.get("title", ""),
data.get("message", ""),
time.monotonic(),
)
event = NtfyMessageEvent(
title=data.get("title", ""),
body=data.get("message", ""),
message_id=data.get("id"),
timestamp=datetime.now(),
)
self._emit(event)
except Exception:
pass
time.sleep(self.reconnect_delay)