refactor: phase 4 - event-driven architecture foundation

- Add EventBus class with pub/sub messaging (thread-safe)
- Add emitter Protocol classes (EventEmitter, Startable, Stoppable)
- Add event emission to NtfyPoller (NtfyMessageEvent)
- Add event emission to MicMonitor (MicLevelEvent)
- Update StreamController to publish stream start/end events
- Add comprehensive tests for eventbus and emitters modules
This commit is contained in:
2026-03-15 16:20:52 -07:00
parent 35e5c8d38b
commit 15de46722a
12 changed files with 704 additions and 22 deletions

View File

@@ -3,6 +3,8 @@ Stream controller - manages input sources and orchestrates the render stream.
"""
from engine.config import Config, get_config
from engine.eventbus import EventBus
from engine.events import EventType, StreamEvent
from engine.mic import MicMonitor
from engine.ntfy import NtfyPoller
from engine.scroll import stream
@@ -11,8 +13,9 @@ from engine.scroll import stream
class StreamController:
"""Controls the stream lifecycle - initializes sources and runs the stream."""
def __init__(self, config: Config | None = None):
def __init__(self, config: Config | None = None, event_bus: EventBus | None = None):
self.config = config or get_config()
self.event_bus = event_bus
self.mic: MicMonitor | None = None
self.ntfy: NtfyPoller | None = None
@@ -38,8 +41,27 @@ class StreamController:
"""Run the stream with initialized sources."""
if self.mic is None or self.ntfy is None:
self.initialize_sources()
if self.event_bus:
self.event_bus.publish(
EventType.STREAM_START,
StreamEvent(
event_type=EventType.STREAM_START,
headline_count=len(items),
),
)
stream(items, self.ntfy, self.mic)
if self.event_bus:
self.event_bus.publish(
EventType.STREAM_END,
StreamEvent(
event_type=EventType.STREAM_END,
headline_count=len(items),
),
)
def cleanup(self) -> None:
"""Clean up resources."""
if self.mic:

25
engine/emitters.py Normal file
View File

@@ -0,0 +1,25 @@
"""
Event emitter protocols - abstract interfaces for event-producing components.
"""
from collections.abc import Callable
from typing import Any, Protocol
class EventEmitter(Protocol):
"""Protocol for components that emit events."""
def subscribe(self, callback: Callable[[Any], None]) -> None: ...
def unsubscribe(self, callback: Callable[[Any], None]) -> None: ...
class Startable(Protocol):
"""Protocol for components that can be started."""
def start(self) -> Any: ...
class Stoppable(Protocol):
"""Protocol for components that can be stopped."""
def stop(self) -> None: ...

72
engine/eventbus.py Normal file
View File

@@ -0,0 +1,72 @@
"""
Event bus - pub/sub messaging for decoupled component communication.
"""
import threading
from collections import defaultdict
from collections.abc import Callable
from typing import Any
from engine.events import EventType
class EventBus:
"""Thread-safe event bus for publish-subscribe messaging."""
def __init__(self):
self._subscribers: dict[EventType, list[Callable[[Any], None]]] = defaultdict(
list
)
self._lock = threading.Lock()
def subscribe(self, event_type: EventType, callback: Callable[[Any], None]) -> None:
"""Register a callback for a specific event type."""
with self._lock:
self._subscribers[event_type].append(callback)
def unsubscribe(
self, event_type: EventType, callback: Callable[[Any], None]
) -> None:
"""Remove a callback for a specific event type."""
with self._lock:
if callback in self._subscribers[event_type]:
self._subscribers[event_type].remove(callback)
def publish(self, event_type: EventType, event: Any = None) -> None:
"""Publish an event to all subscribers."""
with self._lock:
callbacks = list(self._subscribers.get(event_type, []))
for callback in callbacks:
try:
callback(event)
except Exception:
pass
def clear(self) -> None:
"""Remove all subscribers."""
with self._lock:
self._subscribers.clear()
def subscriber_count(self, event_type: EventType | None = None) -> int:
"""Get subscriber count for an event type, or total if None."""
with self._lock:
if event_type is None:
return sum(len(cb) for cb in self._subscribers.values())
return len(self._subscribers.get(event_type, []))
_event_bus: EventBus | None = None
def get_event_bus() -> EventBus:
"""Get the global event bus instance."""
global _event_bus
if _event_bus is None:
_event_bus = EventBus()
return _event_bus
def set_event_bus(bus: EventBus) -> None:
"""Set the global event bus instance (for testing)."""
global _event_bus
_event_bus = bus

View File

@@ -4,6 +4,8 @@ Gracefully degrades if sounddevice/numpy are unavailable.
"""
import atexit
from collections.abc import Callable
from datetime import datetime
try:
import numpy as _np
@@ -14,6 +16,9 @@ except Exception:
_HAS_MIC = False
from engine.events import MicLevelEvent
class MicMonitor:
"""Background mic stream that exposes current RMS dB level."""
@@ -21,6 +26,7 @@ class MicMonitor:
self.threshold_db = threshold_db
self._db = -99.0
self._stream = None
self._subscribers: list[Callable[[MicLevelEvent], None]] = []
@property
def available(self):
@@ -37,6 +43,23 @@ class MicMonitor:
"""dB above threshold (clamped to 0)."""
return max(0.0, self._db - self.threshold_db)
def subscribe(self, callback: Callable[[MicLevelEvent], None]) -> None:
"""Register a callback to be called when mic level changes."""
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable[[MicLevelEvent], None]) -> None:
"""Remove a registered callback."""
if callback in self._subscribers:
self._subscribers.remove(callback)
def _emit(self, event: MicLevelEvent) -> None:
"""Emit an event to all subscribers."""
for cb in self._subscribers:
try:
cb(event)
except Exception:
pass
def start(self):
"""Start background mic stream. Returns True on success, False/None otherwise."""
if not _HAS_MIC:
@@ -45,6 +68,13 @@ class MicMonitor:
def _cb(indata, frames, t, status):
rms = float(_np.sqrt(_np.mean(indata**2)))
self._db = 20 * _np.log10(rms) if rms > 0 else -99.0
if self._subscribers:
event = MicLevelEvent(
db_level=self._db,
excess_above_threshold=max(0.0, self._db - self.threshold_db),
timestamp=datetime.now(),
)
self._emit(event)
try:
self._stream = _sd.InputStream(

View File

@@ -16,8 +16,12 @@ import json
import threading
import time
import urllib.request
from collections.abc import Callable
from datetime import datetime
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
from engine.events import NtfyMessageEvent
class NtfyPoller:
"""SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT)."""
@@ -28,6 +32,24 @@ class NtfyPoller:
self.display_secs = display_secs
self._message = None # (title, body, monotonic_timestamp) or None
self._lock = threading.Lock()
self._subscribers: list[Callable[[NtfyMessageEvent], None]] = []
def subscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
"""Register a callback to be called when a message is received."""
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
"""Remove a registered callback."""
if callback in self._subscribers:
self._subscribers.remove(callback)
def _emit(self, event: NtfyMessageEvent) -> None:
"""Emit an event to all subscribers."""
for cb in self._subscribers:
try:
cb(event)
except Exception:
pass
def start(self):
"""Start background stream thread. Returns True."""
@@ -88,6 +110,13 @@ class NtfyPoller:
data.get("message", ""),
time.monotonic(),
)
event = NtfyMessageEvent(
title=data.get("title", ""),
body=data.get("message", ""),
message_id=data.get("id"),
timestamp=datetime.now(),
)
self._emit(event)
except Exception:
pass
time.sleep(self.reconnect_delay)

View File

@@ -43,11 +43,15 @@ def stream(items, ntfy_poller, mic_monitor):
scroll_motion_accum = 0.0
msg_cache = (None, None)
while queued < config.HEADLINE_LIMIT or active:
while True:
if queued >= config.HEADLINE_LIMIT and not active:
break
t0 = time.monotonic()
w, h = tw(), th()
fh = config.FIREHOSE_H if config.FIREHOSE else 0
ticker_view_h = h - fh
scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h)
msg = ntfy_poller.get_active_message()
msg_overlay, msg_cache = render_message_overlay(msg, w, h, msg_cache)