- Add EventBus class with pub/sub messaging (thread-safe)
- Add emitter Protocol classes (EventEmitter, Startable, Stoppable)
- Add event emission to NtfyPoller (NtfyMessageEvent)
- Add event emission to MicMonitor (MicLevelEvent)
- Update StreamController to publish stream start/end events
- Add comprehensive tests for eventbus and emitters modules
70 lines
2.1 KiB
Python
70 lines
2.1 KiB
Python
"""
Tests for engine.emitters module.
"""
|
|
|
|
from engine.emitters import EventEmitter, Startable, Stoppable
|
|
|
|
|
|
class TestEventEmitterProtocol:
    """Checks made directly against the EventEmitter protocol object."""

    def test_protocol_exists(self):
        """The EventEmitter name is bound to something other than None."""
        protocol = EventEmitter
        assert protocol is not None

    def test_protocol_has_subscribe_method(self):
        """The EventEmitter protocol exposes a ``subscribe`` attribute."""
        attribute = "subscribe"
        assert hasattr(EventEmitter, attribute)

    def test_protocol_has_unsubscribe_method(self):
        """The EventEmitter protocol exposes an ``unsubscribe`` attribute."""
        attribute = "unsubscribe"
        assert hasattr(EventEmitter, attribute)
|
|
|
|
|
|
class TestStartableProtocol:
    """Checks made directly against the Startable protocol object."""

    def test_protocol_exists(self):
        """The Startable name is bound to something other than None."""
        protocol = Startable
        assert protocol is not None

    def test_protocol_has_start_method(self):
        """The Startable protocol exposes a ``start`` attribute."""
        attribute = "start"
        assert hasattr(Startable, attribute)
|
|
|
|
|
|
class TestStoppableProtocol:
    """Checks made directly against the Stoppable protocol object."""

    def test_protocol_exists(self):
        """The Stoppable name is bound to something other than None."""
        protocol = Stoppable
        assert protocol is not None

    def test_protocol_has_stop_method(self):
        """The Stoppable protocol exposes a ``stop`` attribute."""
        attribute = "stop"
        assert hasattr(Stoppable, attribute)
|
|
|
|
|
|
class TestProtocolCompliance:
    """Tests that existing classes comply with protocols.

    Compliance is checked by duck typing: each instance must expose the
    protocol's method names as callable attributes. Signatures and runtime
    behaviour of those methods are not exercised here.
    """

    def test_ntfy_poller_complies_with_protocol(self):
        """NtfyPoller implements EventEmitter protocol."""
        # Kept function-local as in the original; presumably this defers
        # loading the module until the test runs -- TODO confirm.
        from engine.ntfy import NtfyPoller

        poller = NtfyPoller("http://example.com/topic")
        for method_name in ("subscribe", "unsubscribe"):
            # getattr(..., None) folds "is present" and "is callable" into one
            # assertion: a missing attribute yields None, which is not callable.
            assert callable(getattr(poller, method_name, None)), (
                f"NtfyPoller.{method_name} must be a callable attribute"
            )

    def test_mic_monitor_complies_with_protocol(self):
        """MicMonitor implements EventEmitter, Startable and Stoppable protocols."""
        # Kept function-local as in the original; presumably this defers
        # loading the module until the test runs -- TODO confirm.
        from engine.mic import MicMonitor

        monitor = MicMonitor()
        # Same callable check as the NtfyPoller test, so a non-callable
        # attribute (e.g. a stray data field named "start") fails the test
        # instead of slipping past a bare hasattr().
        for method_name in ("subscribe", "unsubscribe", "start", "stop"):
            assert callable(getattr(monitor, method_name, None)), (
                f"MicMonitor.{method_name} must be a callable attribute"
            )
|