forked from genewildish/Mainline
- Add EventBus class with pub/sub messaging (thread-safe) - Add emitter Protocol classes (EventEmitter, Startable, Stoppable) - Add event emission to NtfyPoller (NtfyMessageEvent) - Add event emission to MicMonitor (MicLevelEvent) - Update StreamController to publish stream start/end events - Add comprehensive tests for eventbus and emitters modules
73 lines
2.2 KiB
Python
73 lines
2.2 KiB
Python
"""
|
|
Event bus - pub/sub messaging for decoupled component communication.
|
|
"""
|
|
|
|
import threading
|
|
from collections import defaultdict
|
|
from collections.abc import Callable
|
|
from typing import Any
|
|
|
|
from engine.events import EventType
|
|
|
|
|
|
class EventBus:
|
|
"""Thread-safe event bus for publish-subscribe messaging."""
|
|
|
|
def __init__(self):
|
|
self._subscribers: dict[EventType, list[Callable[[Any], None]]] = defaultdict(
|
|
list
|
|
)
|
|
self._lock = threading.Lock()
|
|
|
|
def subscribe(self, event_type: EventType, callback: Callable[[Any], None]) -> None:
|
|
"""Register a callback for a specific event type."""
|
|
with self._lock:
|
|
self._subscribers[event_type].append(callback)
|
|
|
|
def unsubscribe(
|
|
self, event_type: EventType, callback: Callable[[Any], None]
|
|
) -> None:
|
|
"""Remove a callback for a specific event type."""
|
|
with self._lock:
|
|
if callback in self._subscribers[event_type]:
|
|
self._subscribers[event_type].remove(callback)
|
|
|
|
def publish(self, event_type: EventType, event: Any = None) -> None:
|
|
"""Publish an event to all subscribers."""
|
|
with self._lock:
|
|
callbacks = list(self._subscribers.get(event_type, []))
|
|
for callback in callbacks:
|
|
try:
|
|
callback(event)
|
|
except Exception:
|
|
pass
|
|
|
|
def clear(self) -> None:
|
|
"""Remove all subscribers."""
|
|
with self._lock:
|
|
self._subscribers.clear()
|
|
|
|
def subscriber_count(self, event_type: EventType | None = None) -> int:
|
|
"""Get subscriber count for an event type, or total if None."""
|
|
with self._lock:
|
|
if event_type is None:
|
|
return sum(len(cb) for cb in self._subscribers.values())
|
|
return len(self._subscribers.get(event_type, []))
|
|
|
|
|
|
_event_bus: EventBus | None = None
|
|
|
|
|
|
def get_event_bus() -> EventBus:
|
|
"""Get the global event bus instance."""
|
|
global _event_bus
|
|
if _event_bus is None:
|
|
_event_bus = EventBus()
|
|
return _event_bus
|
|
|
|
|
|
def set_event_bus(bus: EventBus) -> None:
|
|
"""Set the global event bus instance (for testing)."""
|
|
global _event_bus
|
|
_event_bus = bus
|