refactor: phase 4 - event-driven architecture foundation
- Add EventBus class with pub/sub messaging (thread-safe) - Add emitter Protocol classes (EventEmitter, Startable, Stoppable) - Add event emission to NtfyPoller (NtfyMessageEvent) - Add event emission to MicMonitor (MicLevelEvent) - Update StreamController to publish stream start/end events - Add comprehensive tests for eventbus and emitters modules
This commit is contained in:
@@ -3,6 +3,8 @@ Stream controller - manages input sources and orchestrates the render stream.
|
||||
"""
|
||||
|
||||
from engine.config import Config, get_config
|
||||
from engine.eventbus import EventBus
|
||||
from engine.events import EventType, StreamEvent
|
||||
from engine.mic import MicMonitor
|
||||
from engine.ntfy import NtfyPoller
|
||||
from engine.scroll import stream
|
||||
@@ -11,8 +13,9 @@ from engine.scroll import stream
|
||||
class StreamController:
|
||||
"""Controls the stream lifecycle - initializes sources and runs the stream."""
|
||||
|
||||
def __init__(self, config: Config | None = None):
|
||||
def __init__(self, config: Config | None = None, event_bus: EventBus | None = None):
|
||||
self.config = config or get_config()
|
||||
self.event_bus = event_bus
|
||||
self.mic: MicMonitor | None = None
|
||||
self.ntfy: NtfyPoller | None = None
|
||||
|
||||
@@ -38,8 +41,27 @@ class StreamController:
|
||||
"""Run the stream with initialized sources."""
|
||||
if self.mic is None or self.ntfy is None:
|
||||
self.initialize_sources()
|
||||
|
||||
if self.event_bus:
|
||||
self.event_bus.publish(
|
||||
EventType.STREAM_START,
|
||||
StreamEvent(
|
||||
event_type=EventType.STREAM_START,
|
||||
headline_count=len(items),
|
||||
),
|
||||
)
|
||||
|
||||
stream(items, self.ntfy, self.mic)
|
||||
|
||||
if self.event_bus:
|
||||
self.event_bus.publish(
|
||||
EventType.STREAM_END,
|
||||
StreamEvent(
|
||||
event_type=EventType.STREAM_END,
|
||||
headline_count=len(items),
|
||||
),
|
||||
)
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Clean up resources."""
|
||||
if self.mic:
|
||||
|
||||
25
engine/emitters.py
Normal file
25
engine/emitters.py
Normal file
@@ -0,0 +1,25 @@
|
||||
"""
|
||||
Event emitter protocols - abstract interfaces for event-producing components.
|
||||
"""
|
||||
|
||||
from collections.abc import Callable
|
||||
from typing import Any, Protocol
|
||||
|
||||
|
||||
class EventEmitter(Protocol):
|
||||
"""Protocol for components that emit events."""
|
||||
|
||||
def subscribe(self, callback: Callable[[Any], None]) -> None: ...
|
||||
def unsubscribe(self, callback: Callable[[Any], None]) -> None: ...
|
||||
|
||||
|
||||
class Startable(Protocol):
|
||||
"""Protocol for components that can be started."""
|
||||
|
||||
def start(self) -> Any: ...
|
||||
|
||||
|
||||
class Stoppable(Protocol):
|
||||
"""Protocol for components that can be stopped."""
|
||||
|
||||
def stop(self) -> None: ...
|
||||
72
engine/eventbus.py
Normal file
72
engine/eventbus.py
Normal file
@@ -0,0 +1,72 @@
|
||||
"""
|
||||
Event bus - pub/sub messaging for decoupled component communication.
|
||||
"""
|
||||
|
||||
import threading
|
||||
from collections import defaultdict
|
||||
from collections.abc import Callable
|
||||
from typing import Any
|
||||
|
||||
from engine.events import EventType
|
||||
|
||||
|
||||
class EventBus:
|
||||
"""Thread-safe event bus for publish-subscribe messaging."""
|
||||
|
||||
def __init__(self):
|
||||
self._subscribers: dict[EventType, list[Callable[[Any], None]]] = defaultdict(
|
||||
list
|
||||
)
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def subscribe(self, event_type: EventType, callback: Callable[[Any], None]) -> None:
|
||||
"""Register a callback for a specific event type."""
|
||||
with self._lock:
|
||||
self._subscribers[event_type].append(callback)
|
||||
|
||||
def unsubscribe(
|
||||
self, event_type: EventType, callback: Callable[[Any], None]
|
||||
) -> None:
|
||||
"""Remove a callback for a specific event type."""
|
||||
with self._lock:
|
||||
if callback in self._subscribers[event_type]:
|
||||
self._subscribers[event_type].remove(callback)
|
||||
|
||||
def publish(self, event_type: EventType, event: Any = None) -> None:
|
||||
"""Publish an event to all subscribers."""
|
||||
with self._lock:
|
||||
callbacks = list(self._subscribers.get(event_type, []))
|
||||
for callback in callbacks:
|
||||
try:
|
||||
callback(event)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Remove all subscribers."""
|
||||
with self._lock:
|
||||
self._subscribers.clear()
|
||||
|
||||
def subscriber_count(self, event_type: EventType | None = None) -> int:
|
||||
"""Get subscriber count for an event type, or total if None."""
|
||||
with self._lock:
|
||||
if event_type is None:
|
||||
return sum(len(cb) for cb in self._subscribers.values())
|
||||
return len(self._subscribers.get(event_type, []))
|
||||
|
||||
|
||||
_event_bus: EventBus | None = None
|
||||
|
||||
|
||||
def get_event_bus() -> EventBus:
|
||||
"""Get the global event bus instance."""
|
||||
global _event_bus
|
||||
if _event_bus is None:
|
||||
_event_bus = EventBus()
|
||||
return _event_bus
|
||||
|
||||
|
||||
def set_event_bus(bus: EventBus) -> None:
|
||||
"""Set the global event bus instance (for testing)."""
|
||||
global _event_bus
|
||||
_event_bus = bus
|
||||
@@ -4,6 +4,8 @@ Gracefully degrades if sounddevice/numpy are unavailable.
|
||||
"""
|
||||
|
||||
import atexit
|
||||
from collections.abc import Callable
|
||||
from datetime import datetime
|
||||
|
||||
try:
|
||||
import numpy as _np
|
||||
@@ -14,6 +16,9 @@ except Exception:
|
||||
_HAS_MIC = False
|
||||
|
||||
|
||||
from engine.events import MicLevelEvent
|
||||
|
||||
|
||||
class MicMonitor:
|
||||
"""Background mic stream that exposes current RMS dB level."""
|
||||
|
||||
@@ -21,6 +26,7 @@ class MicMonitor:
|
||||
self.threshold_db = threshold_db
|
||||
self._db = -99.0
|
||||
self._stream = None
|
||||
self._subscribers: list[Callable[[MicLevelEvent], None]] = []
|
||||
|
||||
@property
|
||||
def available(self):
|
||||
@@ -37,6 +43,23 @@ class MicMonitor:
|
||||
"""dB above threshold (clamped to 0)."""
|
||||
return max(0.0, self._db - self.threshold_db)
|
||||
|
||||
def subscribe(self, callback: Callable[[MicLevelEvent], None]) -> None:
|
||||
"""Register a callback to be called when mic level changes."""
|
||||
self._subscribers.append(callback)
|
||||
|
||||
def unsubscribe(self, callback: Callable[[MicLevelEvent], None]) -> None:
|
||||
"""Remove a registered callback."""
|
||||
if callback in self._subscribers:
|
||||
self._subscribers.remove(callback)
|
||||
|
||||
def _emit(self, event: MicLevelEvent) -> None:
|
||||
"""Emit an event to all subscribers."""
|
||||
for cb in self._subscribers:
|
||||
try:
|
||||
cb(event)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def start(self):
|
||||
"""Start background mic stream. Returns True on success, False/None otherwise."""
|
||||
if not _HAS_MIC:
|
||||
@@ -45,6 +68,13 @@ class MicMonitor:
|
||||
def _cb(indata, frames, t, status):
|
||||
rms = float(_np.sqrt(_np.mean(indata**2)))
|
||||
self._db = 20 * _np.log10(rms) if rms > 0 else -99.0
|
||||
if self._subscribers:
|
||||
event = MicLevelEvent(
|
||||
db_level=self._db,
|
||||
excess_above_threshold=max(0.0, self._db - self.threshold_db),
|
||||
timestamp=datetime.now(),
|
||||
)
|
||||
self._emit(event)
|
||||
|
||||
try:
|
||||
self._stream = _sd.InputStream(
|
||||
|
||||
@@ -16,8 +16,12 @@ import json
|
||||
import threading
|
||||
import time
|
||||
import urllib.request
|
||||
from collections.abc import Callable
|
||||
from datetime import datetime
|
||||
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
|
||||
|
||||
from engine.events import NtfyMessageEvent
|
||||
|
||||
|
||||
class NtfyPoller:
|
||||
"""SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT)."""
|
||||
@@ -28,6 +32,24 @@ class NtfyPoller:
|
||||
self.display_secs = display_secs
|
||||
self._message = None # (title, body, monotonic_timestamp) or None
|
||||
self._lock = threading.Lock()
|
||||
self._subscribers: list[Callable[[NtfyMessageEvent], None]] = []
|
||||
|
||||
def subscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
|
||||
"""Register a callback to be called when a message is received."""
|
||||
self._subscribers.append(callback)
|
||||
|
||||
def unsubscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
|
||||
"""Remove a registered callback."""
|
||||
if callback in self._subscribers:
|
||||
self._subscribers.remove(callback)
|
||||
|
||||
def _emit(self, event: NtfyMessageEvent) -> None:
|
||||
"""Emit an event to all subscribers."""
|
||||
for cb in self._subscribers:
|
||||
try:
|
||||
cb(event)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def start(self):
|
||||
"""Start background stream thread. Returns True."""
|
||||
@@ -88,6 +110,13 @@ class NtfyPoller:
|
||||
data.get("message", ""),
|
||||
time.monotonic(),
|
||||
)
|
||||
event = NtfyMessageEvent(
|
||||
title=data.get("title", ""),
|
||||
body=data.get("message", ""),
|
||||
message_id=data.get("id"),
|
||||
timestamp=datetime.now(),
|
||||
)
|
||||
self._emit(event)
|
||||
except Exception:
|
||||
pass
|
||||
time.sleep(self.reconnect_delay)
|
||||
|
||||
@@ -43,11 +43,15 @@ def stream(items, ntfy_poller, mic_monitor):
|
||||
scroll_motion_accum = 0.0
|
||||
msg_cache = (None, None)
|
||||
|
||||
while queued < config.HEADLINE_LIMIT or active:
|
||||
while True:
|
||||
if queued >= config.HEADLINE_LIMIT and not active:
|
||||
break
|
||||
|
||||
t0 = time.monotonic()
|
||||
w, h = tw(), th()
|
||||
fh = config.FIREHOSE_H if config.FIREHOSE else 0
|
||||
ticker_view_h = h - fh
|
||||
scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h)
|
||||
|
||||
msg = ntfy_poller.get_active_message()
|
||||
msg_overlay, msg_cache = render_message_overlay(msg, w, h, msg_cache)
|
||||
|
||||
Reference in New Issue
Block a user