- Fix import sorting (isort) across all engine modules - Fix SIM105 try-except-pass patterns (contextlib.suppress) - Fix nested with statements in tests - Fix unused loop variables Run 'uv run pytest' to verify tests still pass.
76 lines
2.5 KiB
Python
76 lines
2.5 KiB
Python
"""
|
|
ntfy.sh message poller — standalone, zero internal dependencies.
|
|
Reusable by any visualizer:
|
|
|
|
from engine.ntfy import NtfyPoller
|
|
poller = NtfyPoller("https://ntfy.sh/my_topic/json?since=20s&poll=1")
|
|
poller.start()
|
|
# in render loop:
|
|
msg = poller.get_active_message()
|
|
if msg:
|
|
title, body, ts = msg
|
|
render_my_message(title, body)
|
|
"""
|
|
|
|
import json
|
|
import threading
|
|
import time
|
|
import urllib.request
|
|
|
|
|
|
class NtfyPoller:
|
|
"""Background poller for ntfy.sh topics."""
|
|
|
|
def __init__(self, topic_url, poll_interval=15, display_secs=30):
|
|
self.topic_url = topic_url
|
|
self.poll_interval = poll_interval
|
|
self.display_secs = display_secs
|
|
self._message = None # (title, body, monotonic_timestamp) or None
|
|
self._lock = threading.Lock()
|
|
|
|
def start(self):
|
|
"""Start background polling thread. Returns True."""
|
|
t = threading.Thread(target=self._poll_loop, daemon=True)
|
|
t.start()
|
|
return True
|
|
|
|
def get_active_message(self):
|
|
"""Return (title, body, timestamp) if a message is active and not expired, else None."""
|
|
with self._lock:
|
|
if self._message is None:
|
|
return None
|
|
title, body, ts = self._message
|
|
if time.monotonic() - ts < self.display_secs:
|
|
return self._message
|
|
self._message = None
|
|
return None
|
|
|
|
def dismiss(self):
|
|
"""Manually dismiss the current message."""
|
|
with self._lock:
|
|
self._message = None
|
|
|
|
def _poll_loop(self):
|
|
while True:
|
|
try:
|
|
req = urllib.request.Request(
|
|
self.topic_url, headers={"User-Agent": "mainline/0.1"})
|
|
resp = urllib.request.urlopen(req, timeout=10)
|
|
for line in resp.read().decode('utf-8', errors='replace').strip().split('\n'):
|
|
if not line.strip():
|
|
continue
|
|
try:
|
|
data = json.loads(line)
|
|
except json.JSONDecodeError:
|
|
continue
|
|
if data.get("event") == "message":
|
|
with self._lock:
|
|
self._message = (
|
|
data.get("title", ""),
|
|
data.get("message", ""),
|
|
time.monotonic(),
|
|
)
|
|
except Exception:
|
|
pass
|
|
time.sleep(self.poll_interval)
|