Files
Mainline/tests/test_eventbus.py
David Gwilliam 86e2c02f19 refactor: phase 4 - event-driven architecture foundation
- Add EventBus class with pub/sub messaging (thread-safe)
- Add emitter Protocol classes (EventEmitter, Startable, Stoppable)
- Add event emission to NtfyPoller (NtfyMessageEvent)
- Add event emission to MicMonitor (MicLevelEvent)
- Update StreamController to publish stream start/end events
- Add comprehensive tests for eventbus and emitters modules
2026-03-18 23:24:14 -07:00

203 lines
5.8 KiB
Python

"""
Tests for engine.eventbus module.
"""
from engine.eventbus import EventBus, get_event_bus, set_event_bus
from engine.events import EventType, NtfyMessageEvent
class TestEventBusInit:
"""Tests for EventBus initialization."""
def test_init_creates_empty_subscribers(self):
"""EventBus starts with no subscribers."""
bus = EventBus()
assert bus.subscriber_count() == 0
class TestEventBusSubscribe:
"""Tests for EventBus.subscribe method."""
def test_subscribe_adds_callback(self):
"""subscribe() adds a callback for an event type."""
bus = EventBus()
def callback(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, callback)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1
def test_subscribe_multiple_callbacks_same_event(self):
"""Multiple callbacks can be subscribed to the same event type."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.NTFY_MESSAGE, cb2)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 2
def test_subscribe_different_event_types(self):
"""Callbacks can be subscribed to different event types."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.MIC_LEVEL, cb2)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1
assert bus.subscriber_count(EventType.MIC_LEVEL) == 1
class TestEventBusUnsubscribe:
"""Tests for EventBus.unsubscribe method."""
def test_unsubscribe_removes_callback(self):
"""unsubscribe() removes a callback."""
bus = EventBus()
def callback(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, callback)
bus.unsubscribe(EventType.NTFY_MESSAGE, callback)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 0
def test_unsubscribe_nonexistent_callback_no_error(self):
"""unsubscribe() handles non-existent callback gracefully."""
bus = EventBus()
def callback(e):
return None
bus.unsubscribe(EventType.NTFY_MESSAGE, callback)
class TestEventBusPublish:
"""Tests for EventBus.publish method."""
def test_publish_calls_subscriber(self):
"""publish() calls the subscriber callback."""
bus = EventBus()
received = []
def callback(event):
received.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, callback)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(received) == 1
assert received[0].title == "Test"
def test_publish_multiple_subscribers(self):
"""publish() calls all subscribers for an event type."""
bus = EventBus()
received1 = []
received2 = []
def callback1(event):
received1.append(event)
def callback2(event):
received2.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, callback1)
bus.subscribe(EventType.NTFY_MESSAGE, callback2)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(received1) == 1
assert len(received2) == 1
def test_publish_different_event_types(self):
"""publish() only calls subscribers for the specific event type."""
bus = EventBus()
ntfy_received = []
mic_received = []
def ntfy_callback(event):
ntfy_received.append(event)
def mic_callback(event):
mic_received.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, ntfy_callback)
bus.subscribe(EventType.MIC_LEVEL, mic_callback)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(ntfy_received) == 1
assert len(mic_received) == 0
class TestEventBusClear:
"""Tests for EventBus.clear method."""
def test_clear_removes_all_subscribers(self):
"""clear() removes all subscribers."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.MIC_LEVEL, cb2)
bus.clear()
assert bus.subscriber_count() == 0
class TestEventBusThreadSafety:
"""Tests for EventBus thread safety."""
def test_concurrent_subscribe_unsubscribe(self):
"""subscribe and unsubscribe can be called concurrently."""
import threading
bus = EventBus()
callbacks = [lambda e: None for _ in range(10)]
def subscribe():
for cb in callbacks:
bus.subscribe(EventType.NTFY_MESSAGE, cb)
def unsubscribe():
for cb in callbacks:
bus.unsubscribe(EventType.NTFY_MESSAGE, cb)
t1 = threading.Thread(target=subscribe)
t2 = threading.Thread(target=unsubscribe)
t1.start()
t2.start()
t1.join()
t2.join()
class TestGlobalEventBus:
"""Tests for global event bus functions."""
def test_get_event_bus_returns_singleton(self):
"""get_event_bus() returns the same instance."""
bus1 = get_event_bus()
bus2 = get_event_bus()
assert bus1 is bus2
def test_set_event_bus_replaces_singleton(self):
"""set_event_bus() replaces the global event bus."""
new_bus = EventBus()
set_event_bus(new_bus)
try:
assert get_event_bus() is new_bus
finally:
set_event_bus(None)