forked from genewildish/Mainline
- Add EventBus class with pub/sub messaging (thread-safe) - Add emitter Protocol classes (EventEmitter, Startable, Stoppable) - Add event emission to NtfyPoller (NtfyMessageEvent) - Add event emission to MicMonitor (MicLevelEvent) - Update StreamController to publish stream start/end events - Add comprehensive tests for eventbus and emitters modules
123 lines
4.6 KiB
Python
123 lines
4.6 KiB
Python
"""
|
|
ntfy.sh SSE stream listener — standalone, zero internal dependencies.
|
|
Reusable by any visualizer:
|
|
|
|
from engine.ntfy import NtfyPoller
|
|
poller = NtfyPoller("https://ntfy.sh/my_topic/json")
|
|
poller.start()
|
|
# in render loop:
|
|
msg = poller.get_active_message()
|
|
if msg:
|
|
title, body, ts = msg
|
|
render_my_message(title, body)
|
|
"""
|
|
|
|
import json
|
|
import threading
|
|
import time
|
|
import urllib.request
|
|
from collections.abc import Callable
|
|
from datetime import datetime
|
|
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
|
|
|
|
from engine.events import NtfyMessageEvent
|
|
|
|
|
|
class NtfyPoller:
    """SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT).

    A daemon thread (see ``start``) reads newline-delimited JSON events from
    ``topic_url``. The most recent ``message`` event is stored for
    ``display_secs`` seconds (see ``get_active_message``) and is also passed
    to every subscribed callback as a ``NtfyMessageEvent``.
    """

    def __init__(self, topic_url, reconnect_delay=5, display_secs=30):
        """Store configuration; no network activity happens until ``start()``.

        Args:
            topic_url: URL of the topic stream, e.g.
                ``https://ntfy.sh/my_topic/json``.
            reconnect_delay: Seconds to sleep before reconnecting after the
                stream ends or an error occurs.
            display_secs: Seconds a received message is reported as active.
        """
        self.topic_url = topic_url
        self.reconnect_delay = reconnect_delay
        self.display_secs = display_secs
        self._message = None  # (title, body, monotonic_timestamp) or None
        self._lock = threading.Lock()  # guards self._message
        self._subscribers: list[Callable[[NtfyMessageEvent], None]] = []

    def subscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
        """Register a callback to be called when a message is received."""
        self._subscribers.append(callback)

    def unsubscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
        """Remove a registered callback; unknown callbacks are ignored."""
        # EAFP: a separate "in" check followed by remove() could still raise
        # if another thread removed the callback between the two steps.
        try:
            self._subscribers.remove(callback)
        except ValueError:
            pass

    def _emit(self, event: NtfyMessageEvent) -> None:
        """Emit an event to all subscribers.

        Exceptions raised by a callback are swallowed so one faulty
        subscriber cannot prevent delivery to the others.
        """
        # Iterate over a snapshot: a callback may subscribe/unsubscribe while
        # we dispatch, and mutating the live list mid-iteration would make
        # the loop skip subscribers.
        for cb in list(self._subscribers):
            try:
                cb(event)
            except Exception:
                # Deliberate best-effort delivery; see docstring.
                pass

    def start(self):
        """Start background stream thread. Returns True."""
        # daemon=True so the listener never keeps the interpreter alive.
        t = threading.Thread(target=self._stream_loop, daemon=True)
        t.start()
        return True

    def get_active_message(self):
        """Return (title, body, timestamp) if a message is active and not expired, else None.

        An expired message is cleared as a side effect.
        """
        with self._lock:
            if self._message is None:
                return None
            ts = self._message[2]
            if time.monotonic() - ts < self.display_secs:
                return self._message
            self._message = None
            return None

    def dismiss(self):
        """Manually dismiss the current message."""
        with self._lock:
            self._message = None

    def _build_url(self, last_id=None):
        """Build the stream URL, substituting since= to avoid message replays on reconnect.

        Args:
            last_id: ID of the last event seen; when falsy, ``since=20s`` is
                used instead.

        Returns:
            ``topic_url`` with its ``since`` query parameter set or replaced;
            all other query parameters are preserved.
        """
        parsed = urlparse(self.topic_url)
        params = parse_qs(parsed.query, keep_blank_values=True)
        params["since"] = [last_id if last_id else "20s"]
        # doseq=True keeps every value of a repeated parameter rather than
        # silently dropping all but the first.
        new_query = urlencode(params, doseq=True)
        return urlunparse(parsed._replace(query=new_query))

    def _stream_loop(self):
        """Read the topic stream forever, reconnecting after any failure.

        Runs on the background thread created by ``start()``. Each line is
        parsed as JSON; ``message`` events update the stored message and are
        emitted to subscribers.
        """
        last_id = None
        while True:
            try:
                req = urllib.request.Request(
                    self._build_url(last_id),
                    headers={"User-Agent": "mainline/0.1"},
                )
                # timeout=90 keeps the socket alive through ntfy.sh keepalive heartbeats
                # The with-block closes the response (and its socket) before
                # every reconnect, including after an exception.
                with urllib.request.urlopen(req, timeout=90) as resp:
                    # An empty read means the server closed the connection.
                    while line := resp.readline():
                        try:
                            data = json.loads(line.decode("utf-8", errors="replace"))
                        except json.JSONDecodeError:
                            continue
                        if not isinstance(data, dict):
                            # Valid JSON but not an event object: skip it
                            # instead of tearing down the connection.
                            continue
                        # Advance cursor on every event (message + keepalive) to
                        # avoid replaying already-seen events after a reconnect.
                        if "id" in data:
                            last_id = data["id"]
                        if data.get("event") != "message":
                            continue
                        title = data.get("title", "")
                        body = data.get("message", "")
                        with self._lock:
                            self._message = (title, body, time.monotonic())
                        self._emit(
                            NtfyMessageEvent(
                                title=title,
                                body=body,
                                message_id=data.get("id"),
                                timestamp=datetime.now(),
                            )
                        )
            except Exception:
                # Best-effort listener: any network or protocol error just
                # falls through to the reconnect delay below.
                pass
            time.sleep(self.reconnect_delay)
|