forked from genewildish/Mainline
- Add EventBus class with pub/sub messaging (thread-safe)
- Add emitter Protocol classes (EventEmitter, Startable, Stoppable)
- Add event emission to NtfyPoller (NtfyMessageEvent)
- Add event emission to MicMonitor (MicLevelEvent)
- Update StreamController to publish stream start/end events
- Add comprehensive tests for eventbus and emitters modules
123 lines
3.6 KiB
Python
123 lines
3.6 KiB
Python
"""
|
|
Tests for engine.ntfy module.
|
|
"""
|
|
|
|
import time
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
from engine.events import NtfyMessageEvent
|
|
from engine.ntfy import NtfyPoller
|
|
|
|
|
|
class TestNtfyPollerInit:
    """Constructor checks for NtfyPoller."""

    def test_init_sets_defaults(self):
        """A poller built from just a URL keeps it and uses 5 s / 30 s timings."""
        default_poller = NtfyPoller("http://example.com/topic")

        assert default_poller.topic_url == "http://example.com/topic"
        assert default_poller.reconnect_delay == 5
        assert default_poller.display_secs == 30

    def test_init_custom_values(self):
        """Keyword overrides for both timings are stored on the instance."""
        overrides = {"reconnect_delay": 10, "display_secs": 60}

        tuned_poller = NtfyPoller("http://example.com/topic", **overrides)

        assert tuned_poller.reconnect_delay == 10
        assert tuned_poller.display_secs == 60
|
|
|
|
|
|
class TestNtfyPollerStart:
    """Checks for NtfyPoller.start()."""

    @patch("engine.ntfy.threading.Thread")
    def test_start_creates_daemon_thread(self, mock_thread):
        """start() returns True, constructs one daemon Thread and starts it."""
        # Thread is patched inside engine.ntfy so no real thread is spawned;
        # the constructor hands back this stand-in, which start() should run.
        fake_thread = MagicMock()
        mock_thread.return_value = fake_thread

        poller = NtfyPoller("http://example.com/topic")
        started = poller.start()

        assert started is True
        mock_thread.assert_called_once()
        # call_args is an (args, kwargs) pair; only the keywords matter here.
        thread_kwargs = mock_thread.call_args[1]
        assert thread_kwargs.get("daemon") is True
        fake_thread.start.assert_called_once()
|
|
|
|
|
|
class TestNtfyPollerGetActiveMessage:
    """Checks for NtfyPoller.get_active_message()."""

    def test_returns_none_when_no_message(self):
        """A freshly constructed poller reports no active message."""
        fresh_poller = NtfyPoller("http://example.com/topic")

        assert fresh_poller.get_active_message() is None
|
|
|
|
|
|
class TestNtfyPollerDismiss:
    """Checks for NtfyPoller.dismiss()."""

    def test_dismiss_clears_message(self):
        """After dismiss(), a previously stored message is gone."""
        poller = NtfyPoller("http://example.com/topic")

        # The poller's _lock attribute is replaced by a mock for the duration
        # of the block, while a message is planted and then dismissed.
        with patch.object(poller, "_lock"):
            planted = ("Title", "Body", time.monotonic())
            poller._message = planted
            poller.dismiss()

        assert poller._message is None
|
|
|
|
|
|
class TestNtfyPollerEventEmission:
    """Tests for NtfyPoller event emission (subscribe / unsubscribe / _emit)."""

    def test_subscribe_adds_callback(self):
        """subscribe() registers the callback in the poller's subscriber collection."""
        poller = NtfyPoller("http://example.com/topic")

        def callback(e):
            return None

        poller.subscribe(callback)

        assert callback in poller._subscribers

    def test_unsubscribe_removes_callback(self):
        """unsubscribe() removes a callback that was previously subscribed."""
        poller = NtfyPoller("http://example.com/topic")

        def callback(e):
            return None

        poller.subscribe(callback)
        # Precondition: without this, the final assertion would also pass if
        # subscribe() had never registered the callback in the first place.
        assert callback in poller._subscribers

        poller.unsubscribe(callback)

        assert callback not in poller._subscribers

    def test_emit_calls_subscribers(self):
        """_emit() delivers the event object to a subscribed callback exactly once."""
        poller = NtfyPoller("http://example.com/topic")
        received = []

        def callback(event):
            received.append(event)

        poller.subscribe(callback)
        event = NtfyMessageEvent(title="Test", body="Body")
        poller._emit(event)

        assert len(received) == 1
        assert received[0].title == "Test"

    def test_emit_handles_subscriber_exception(self):
        """_emit() does not propagate an exception raised by a subscriber."""
        poller = NtfyPoller("http://example.com/topic")

        def bad_callback(event):
            raise RuntimeError("test")

        poller.subscribe(bad_callback)
        event = NtfyMessageEvent(title="Test", body="Body")
        # No explicit assertion: the test passes only if this call returns
        # normally instead of letting the RuntimeError escape.
        poller._emit(event)
|