From f1701439391fa4175368b7450b661d39435d07e0 Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sun, 15 Mar 2026 15:44:39 -0700 Subject: [PATCH 1/4] refactor: phase 1 - testability improvements - Add Config dataclass with get_config()/set_config() for injection - Add Config.from_args() for CLI argument parsing (testable) - Add platform font path detection (Darwin/Linux) - Bound translate cache with @lru_cache(maxsize=500) - Add fixtures for external dependencies (network, feeds, config) - Add 15 tests for Config class, from_args, and platform detection This enables testability by: - Allowing config injection instead of global mutable state - Supporting custom argv in from_args() for testing - Providing reusable fixtures for mocking network/filesystem - Preventing unbounded memory growth in translation cache Fixes: _arg_value/_arg_int not accepting custom argv --- engine/config.py | 143 +++++++++++++++++++++- engine/translate.py | 35 +++--- tests/fixtures/__init__.py | 236 +++++++++++++++++++++++++++++++++++++ tests/test_config.py | 139 ++++++++++++++++++++++ 4 files changed, 531 insertions(+), 22 deletions(-) create mode 100644 tests/fixtures/__init__.py diff --git a/engine/config.py b/engine/config.py index 2174568..284877c 100644 --- a/engine/config.py +++ b/engine/config.py @@ -1,25 +1,28 @@ """ Configuration constants, CLI flags, and glyph tables. +Supports both global constants (backward compatible) and injected config for testing. """ import sys +from dataclasses import dataclass, field from pathlib import Path _REPO_ROOT = Path(__file__).resolve().parent.parent _FONT_EXTENSIONS = {".otf", ".ttf", ".ttc"} -def _arg_value(flag): +def _arg_value(flag, argv: list[str] | None = None): """Get value following a CLI flag, if present.""" - if flag not in sys.argv: + argv = argv or sys.argv + if flag not in argv: return None - i = sys.argv.index(flag) - return sys.argv[i + 1] if i + 1 < len(sys.argv) else None + i = argv.index(flag) + return argv[i + 1] if i + 1 < len(argv) else None -def _arg_int(flag, default): +def _arg_int(flag, default, argv: list[str] | None = None): """Get int CLI argument with safe fallback.""" - raw = _arg_value(flag) + raw = _arg_value(flag, argv) if raw is None: return default try: @@ -53,6 +56,134 @@ def list_repo_font_files(): return _list_font_files(FONT_DIR) +def _get_platform_font_paths() -> dict[str, str]: + """Get platform-appropriate font paths for non-Latin scripts.""" + import platform + + system = platform.system() + + if system == "Darwin": + return { + "zh-cn": "/System/Library/Fonts/STHeiti Medium.ttc", + "ja": "/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc", + "ko": "/System/Library/Fonts/AppleSDGothicNeo.ttc", + "ru": "/System/Library/Fonts/Supplemental/Arial.ttf", + "uk": "/System/Library/Fonts/Supplemental/Arial.ttf", + "el": "/System/Library/Fonts/Supplemental/Arial.ttf", + "he": "/System/Library/Fonts/Supplemental/Arial.ttf", + "ar": "/System/Library/Fonts/GeezaPro.ttc", + "fa": "/System/Library/Fonts/GeezaPro.ttc", + "hi": "/System/Library/Fonts/Kohinoor.ttc", + "th": "/System/Library/Fonts/ThonburiUI.ttc", + } + elif system == "Linux": + return { + "zh-cn": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc", + "ja": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc", + "ko": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc", + "ru": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + "uk": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + "el": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + "he": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + "ar": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + "fa": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + "hi": "/usr/share/fonts/truetype/noto/NotoSansDevanagari-Regular.ttf", + "th": "/usr/share/fonts/truetype/noto/NotoSansThai-Regular.ttf", + } + else: + return {} + + +@dataclass(frozen=True) +class Config: + """Immutable configuration container for injected config.""" + + headline_limit: int = 1000 + feed_timeout: int = 10 + mic_threshold_db: int = 50 + mode: str = "news" + firehose: bool = False + + ntfy_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline/json" + ntfy_reconnect_delay: int = 5 + message_display_secs: int = 30 + + font_dir: str = "fonts" + font_path: str = "" + font_index: int = 0 + font_picker: bool = True + font_sz: int = 60 + render_h: int = 8 + + ssaa: int = 4 + + scroll_dur: float = 5.625 + frame_dt: float = 0.05 + firehose_h: int = 12 + grad_speed: float = 0.08 + + glitch_glyphs: str = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋" + kata_glyphs: str = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ" + + script_fonts: dict[str, str] = field(default_factory=_get_platform_font_paths) + + @classmethod + def from_args(cls, argv: list[str] | None = None) -> "Config": + """Create Config from CLI arguments (or custom argv for testing).""" + argv = argv or sys.argv + + font_dir = _resolve_font_path(_arg_value("--font-dir", argv) or "fonts") + font_file_arg = _arg_value("--font-file", argv) + font_files = _list_font_files(font_dir) + font_path = ( + _resolve_font_path(font_file_arg) + if font_file_arg + else (font_files[0] if font_files else "") + ) + + return cls( + headline_limit=1000, + feed_timeout=10, + mic_threshold_db=50, + mode="poetry" if "--poetry" in argv or "-p" in argv else "news", + firehose="--firehose" in argv, + ntfy_topic="https://ntfy.sh/klubhaus_terminal_mainline/json", + ntfy_reconnect_delay=5, + message_display_secs=30, + font_dir=font_dir, + font_path=font_path, + font_index=max(0, _arg_int("--font-index", 0, argv)), + font_picker="--no-font-picker" not in argv, + font_sz=60, + render_h=8, + ssaa=4, + scroll_dur=5.625, + frame_dt=0.05, + firehose_h=12, + grad_speed=0.08, + glitch_glyphs="░▒▓█▌▐╌╍╎╏┃┆┇┊┋", + kata_glyphs="ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ", + script_fonts=_get_platform_font_paths(), + ) + + +_config: Config | None = None + + +def get_config() -> Config: + """Get the global config instance (lazy-loaded).""" + global _config + if _config is None: + _config = Config.from_args() + return _config + + +def set_config(config: Config) -> None: + """Set the global config instance (for testing).""" + global _config + _config = config + + # ─── RUNTIME ────────────────────────────────────────────── HEADLINE_LIMIT = 1000 FEED_TIMEOUT = 10 diff --git a/engine/translate.py b/engine/translate.py index eb1f2ca..04e04dd 100644 --- a/engine/translate.py +++ b/engine/translate.py @@ -7,26 +7,16 @@ import json import re import urllib.parse import urllib.request +from functools import lru_cache from engine.sources import LOCATION_LANGS -_TRANSLATE_CACHE = {} +TRANSLATE_CACHE_SIZE = 500 -def detect_location_language(title): - """Detect if headline mentions a location, return target language.""" - title_lower = title.lower() - for pattern, lang in LOCATION_LANGS.items(): - if re.search(pattern, title_lower): - return lang - return None - - -def translate_headline(title, target_lang): - """Translate headline via Google Translate API (zero dependencies).""" - key = (title, target_lang) - if key in _TRANSLATE_CACHE: - return _TRANSLATE_CACHE[key] +@lru_cache(maxsize=TRANSLATE_CACHE_SIZE) +def _translate_cached(title: str, target_lang: str) -> str: + """Cached translation implementation.""" try: q = urllib.parse.quote(title) url = ( @@ -39,5 +29,18 @@ def translate_headline(title, target_lang): result = "".join(p[0] for p in data[0] if p[0]) or title except Exception: result = title - _TRANSLATE_CACHE[key] = result return result + + +def detect_location_language(title): + """Detect if headline mentions a location, return target language.""" + title_lower = title.lower() + for pattern, lang in LOCATION_LANGS.items(): + if re.search(pattern, title_lower): + return lang + return None + + +def translate_headline(title: str, target_lang: str) -> str: + """Translate headline via Google Translate API (zero dependencies).""" + return _translate_cached(title, target_lang) diff --git a/tests/fixtures/__init__.py b/tests/fixtures/__init__.py new file mode 100644 index 0000000..68cce00 --- /dev/null +++ b/tests/fixtures/__init__.py @@ -0,0 +1,236 @@ +""" +Pytest fixtures for mocking external dependencies (network, filesystem). +""" + +import json +from unittest.mock import MagicMock + +import pytest + + +@pytest.fixture +def mock_feed_response(): + """Mock RSS feed response data.""" + return b""" + + + Test Feed + https://example.com + + Test Headline One + Sat, 15 Mar 2025 12:00:00 GMT + + + Test Headline Two + Sat, 15 Mar 2025 11:00:00 GMT + + + Sports: Team Wins Championship + Sat, 15 Mar 2025 10:00:00 GMT + + +""" + + +@pytest.fixture +def mock_gutenberg_response(): + """Mock Project Gutenberg text response.""" + return """Project Gutenberg's Collection, by Various + +*** START OF SOME TEXT *** +This is a test poem with multiple lines +that should be parsed as stanzas. + +Another stanza here with different content +and more lines to test the parsing logic. + +Yet another stanza for variety +in the test data. + +*** END OF SOME TEXT ***""" + + +@pytest.fixture +def mock_gutenberg_empty(): + """Mock Gutenberg response with no valid stanzas.""" + return """Project Gutenberg's Collection + +*** START OF TEXT *** +THIS IS ALL CAPS AND SHOULD BE SKIPPED + +I. + +*** END OF TEXT ***""" + + +@pytest.fixture +def mock_ntfy_message(): + """Mock ntfy.sh SSE message.""" + return json.dumps( + { + "id": "test123", + "event": "message", + "title": "Test Title", + "message": "Test message body", + "time": 1234567890, + } + ).encode() + + +@pytest.fixture +def mock_ntfy_keepalive(): + """Mock ntfy.sh keepalive message.""" + return b'data: {"event":"keepalive"}\n\n' + + +@pytest.fixture +def mock_google_translate_response(): + """Mock Google Translate API response.""" + return json.dumps( + [ + [["Translated text", "Original text", None, 0.8], None, "en"], + None, + None, + [], + [], + [], + [], + ] + ) + + +@pytest.fixture +def mock_feedparser(): + """Create a mock feedparser.parse function.""" + + def _mock(data): + mock_result = MagicMock() + mock_result.bozo = False + mock_result.entries = [ + { + "title": "Test Headline", + "published_parsed": (2025, 3, 15, 12, 0, 0, 0, 0, 0), + }, + { + "title": "Another Headline", + "updated_parsed": (2025, 3, 15, 11, 0, 0, 0, 0, 0), + }, + ] + return mock_result + + return _mock + + +@pytest.fixture +def mock_urllib_open(mock_feed_response): + """Create a mock urllib.request.urlopen that returns feed data.""" + + def _mock(url): + mock_response = MagicMock() + mock_response.read.return_value = mock_feed_response + return mock_response + + return _mock + + +@pytest.fixture +def sample_items(): + """Sample items as returned by fetch module (title, source, timestamp).""" + return [ + ("Headline One", "Test Source", "12:00"), + ("Headline Two", "Another Source", "11:30"), + ("Headline Three", "Third Source", "10:45"), + ] + + +@pytest.fixture +def sample_config(): + """Sample config for testing.""" + from engine.config import Config + + return Config( + headline_limit=100, + feed_timeout=10, + mic_threshold_db=50, + mode="news", + firehose=False, + ntfy_topic="https://ntfy.sh/test/json", + ntfy_reconnect_delay=5, + message_display_secs=30, + font_dir="fonts", + font_path="", + font_index=0, + font_picker=False, + font_sz=60, + render_h=8, + ssaa=4, + scroll_dur=5.625, + frame_dt=0.05, + firehose_h=12, + grad_speed=0.08, + glitch_glyphs="░▒▓█▌▐", + kata_glyphs="ハミヒーウ", + script_fonts={}, + ) + + +@pytest.fixture +def poetry_config(): + """Sample config for poetry mode.""" + from engine.config import Config + + return Config( + headline_limit=100, + feed_timeout=10, + mic_threshold_db=50, + mode="poetry", + firehose=False, + ntfy_topic="https://ntfy.sh/test/json", + ntfy_reconnect_delay=5, + message_display_secs=30, + font_dir="fonts", + font_path="", + font_index=0, + font_picker=False, + font_sz=60, + render_h=8, + ssaa=4, + scroll_dur=5.625, + frame_dt=0.05, + firehose_h=12, + grad_speed=0.08, + glitch_glyphs="░▒▓█▌▐", + kata_glyphs="ハミヒーウ", + script_fonts={}, + ) + + +@pytest.fixture +def firehose_config(): + """Sample config with firehose enabled.""" + from engine.config import Config + + return Config( + headline_limit=100, + feed_timeout=10, + mic_threshold_db=50, + mode="news", + firehose=True, + ntfy_topic="https://ntfy.sh/test/json", + ntfy_reconnect_delay=5, + message_display_secs=30, + font_dir="fonts", + font_path="", + font_index=0, + font_picker=False, + font_sz=60, + render_h=8, + ssaa=4, + scroll_dur=5.625, + frame_dt=0.05, + firehose_h=12, + grad_speed=0.08, + glitch_glyphs="░▒▓█▌▐", + kata_glyphs="ハミヒーウ", + script_fonts={}, + ) diff --git a/tests/test_config.py b/tests/test_config.py index 9cf6c1c..ec5e27d 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -7,6 +7,8 @@ import tempfile from pathlib import Path from unittest.mock import patch +import pytest + from engine import config @@ -160,3 +162,140 @@ class TestSetFontSelection: config.set_font_selection(font_path=None, font_index=None) assert original_path == config.FONT_PATH assert original_index == config.FONT_INDEX + + +class TestConfigDataclass: + """Tests for Config dataclass.""" + + def test_config_has_required_fields(self): + """Config has all required fields.""" + c = config.Config() + assert hasattr(c, "headline_limit") + assert hasattr(c, "feed_timeout") + assert hasattr(c, "mic_threshold_db") + assert hasattr(c, "mode") + assert hasattr(c, "firehose") + assert hasattr(c, "ntfy_topic") + assert hasattr(c, "ntfy_reconnect_delay") + assert hasattr(c, "message_display_secs") + assert hasattr(c, "font_dir") + assert hasattr(c, "font_path") + assert hasattr(c, "font_index") + assert hasattr(c, "font_picker") + assert hasattr(c, "font_sz") + assert hasattr(c, "render_h") + assert hasattr(c, "ssaa") + assert hasattr(c, "scroll_dur") + assert hasattr(c, "frame_dt") + assert hasattr(c, "firehose_h") + assert hasattr(c, "grad_speed") + assert hasattr(c, "glitch_glyphs") + assert hasattr(c, "kata_glyphs") + assert hasattr(c, "script_fonts") + + def test_config_defaults(self): + """Config has sensible defaults.""" + c = config.Config() + assert c.headline_limit == 1000 + assert c.feed_timeout == 10 + assert c.mic_threshold_db == 50 + assert c.mode == "news" + assert c.firehose is False + assert c.ntfy_reconnect_delay == 5 + assert c.message_display_secs == 30 + + def test_config_is_immutable(self): + """Config is frozen (immutable).""" + c = config.Config() + with pytest.raises(AttributeError): + c.headline_limit = 500 # type: ignore + + def test_config_custom_values(self): + """Config accepts custom values.""" + c = config.Config( + headline_limit=500, + mode="poetry", + firehose=True, + ntfy_topic="https://ntfy.sh/test", + ) + assert c.headline_limit == 500 + assert c.mode == "poetry" + assert c.firehose is True + assert c.ntfy_topic == "https://ntfy.sh/test" + + +class TestConfigFromArgs: + """Tests for Config.from_args method.""" + + def test_from_args_defaults(self): + """from_args creates config with defaults from empty argv.""" + c = config.Config.from_args(["prog"]) + assert c.mode == "news" + assert c.firehose is False + assert c.font_picker is True + + def test_from_args_poetry_mode(self): + """from_args detects --poetry flag.""" + c = config.Config.from_args(["prog", "--poetry"]) + assert c.mode == "poetry" + + def test_from_args_poetry_short_flag(self): + """from_args detects -p short flag.""" + c = config.Config.from_args(["prog", "-p"]) + assert c.mode == "poetry" + + def test_from_args_firehose(self): + """from_args detects --firehose flag.""" + c = config.Config.from_args(["prog", "--firehose"]) + assert c.firehose is True + + def test_from_args_no_font_picker(self): + """from_args detects --no-font-picker flag.""" + c = config.Config.from_args(["prog", "--no-font-picker"]) + assert c.font_picker is False + + def test_from_args_font_index(self): + """from_args parses --font-index.""" + c = config.Config.from_args(["prog", "--font-index", "3"]) + assert c.font_index == 3 + + +class TestGetSetConfig: + """Tests for get_config and set_config functions.""" + + def test_get_config_returns_config(self): + """get_config returns a Config instance.""" + c = config.get_config() + assert isinstance(c, config.Config) + + def test_set_config_allows_injection(self): + """set_config allows injecting a custom config.""" + custom = config.Config(mode="poetry", headline_limit=100) + config.set_config(custom) + assert config.get_config().mode == "poetry" + assert config.get_config().headline_limit == 100 + + def test_set_config_then_get_config(self): + """set_config followed by get_config returns the set config.""" + original = config.get_config() + test_config = config.Config(headline_limit=42) + config.set_config(test_config) + result = config.get_config() + assert result.headline_limit == 42 + config.set_config(original) + + +class TestPlatformFontPaths: + """Tests for platform font path detection.""" + + def test_get_platform_font_paths_returns_dict(self): + """_get_platform_font_paths returns a dictionary.""" + fonts = config._get_platform_font_paths() + assert isinstance(fonts, dict) + + def test_platform_font_paths_common_languages(self): + """Common language font mappings exist.""" + fonts = config._get_platform_font_paths() + common = {"ja", "zh-cn", "ko", "ru", "ar", "hi"} + found = set(fonts.keys()) & common + assert len(found) > 0 From cdc8094de2e15cf64908736f92fb086647c1f89c Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sun, 15 Mar 2026 15:53:37 -0700 Subject: [PATCH 2/4] refactor: phase 2 - modularization of scroll engine Split monolithic scroll.py into focused modules: - viewport.py: terminal size (tw/th), ANSI positioning helpers - frame.py: FrameTimer class, scroll step calculation - layers.py: message overlay, ticker zone, firehose rendering - scroll.py: simplified orchestrator, imports from new modules Add stream controller and event types for future event-driven architecture: - controller.py: StreamController for source initialization and stream lifecycle - events.py: EventType enum and event dataclasses (HeadlineEvent, FrameTickEvent, etc.) Added tests for new modules: - test_viewport.py: 8 tests for viewport utilities - test_frame.py: 10 tests for frame timing - test_layers.py: 13 tests for layer compositing - test_events.py: 11 tests for event types - test_controller.py: 6 tests for stream controller This enables: - Testable chunks with clear responsibilities - Reusable viewport utilities across modules - Better separation of concerns in render pipeline - Foundation for future event-driven architecture Also includes Phase 1 documentation updates in code comments. --- engine/controller.py | 46 +++++++++ engine/events.py | 67 +++++++++++++ engine/frame.py | 57 +++++++++++ engine/layers.py | 201 +++++++++++++++++++++++++++++++++++++++ engine/scroll.py | 173 ++++++--------------------------- engine/viewport.py | 37 +++++++ tests/test_controller.py | 85 +++++++++++++++++ tests/test_events.py | 112 ++++++++++++++++++++++ tests/test_frame.py | 63 ++++++++++++ tests/test_layers.py | 96 +++++++++++++++++++ tests/test_viewport.py | 64 +++++++++++++ 11 files changed, 858 insertions(+), 143 deletions(-) create mode 100644 engine/controller.py create mode 100644 engine/events.py create mode 100644 engine/frame.py create mode 100644 engine/layers.py create mode 100644 engine/viewport.py create mode 100644 tests/test_controller.py create mode 100644 tests/test_events.py create mode 100644 tests/test_frame.py create mode 100644 tests/test_layers.py create mode 100644 tests/test_viewport.py diff --git a/engine/controller.py b/engine/controller.py new file mode 100644 index 0000000..98b24e5 --- /dev/null +++ b/engine/controller.py @@ -0,0 +1,46 @@ +""" +Stream controller - manages input sources and orchestrates the render stream. +""" + +from engine.config import Config, get_config +from engine.mic import MicMonitor +from engine.ntfy import NtfyPoller +from engine.scroll import stream + + +class StreamController: + """Controls the stream lifecycle - initializes sources and runs the stream.""" + + def __init__(self, config: Config | None = None): + self.config = config or get_config() + self.mic: MicMonitor | None = None + self.ntfy: NtfyPoller | None = None + + def initialize_sources(self) -> tuple[bool, bool]: + """Initialize microphone and ntfy sources. + + Returns: + (mic_ok, ntfy_ok) - success status for each source + """ + self.mic = MicMonitor(threshold_db=self.config.mic_threshold_db) + mic_ok = self.mic.start() if self.mic.available else False + + self.ntfy = NtfyPoller( + self.config.ntfy_topic, + reconnect_delay=self.config.ntfy_reconnect_delay, + display_secs=self.config.message_display_secs, + ) + ntfy_ok = self.ntfy.start() + + return bool(mic_ok), ntfy_ok + + def run(self, items: list) -> None: + """Run the stream with initialized sources.""" + if self.mic is None or self.ntfy is None: + self.initialize_sources() + stream(items, self.ntfy, self.mic) + + def cleanup(self) -> None: + """Clean up resources.""" + if self.mic: + self.mic.stop() diff --git a/engine/events.py b/engine/events.py new file mode 100644 index 0000000..d686285 --- /dev/null +++ b/engine/events.py @@ -0,0 +1,67 @@ +""" +Event types for the mainline application. +Defines the core events that flow through the system. +These types support a future migration to an event-driven architecture. +""" + +from dataclasses import dataclass +from datetime import datetime +from enum import Enum, auto + + +class EventType(Enum): + """Core event types in the mainline application.""" + + NEW_HEADLINE = auto() + FRAME_TICK = auto() + MIC_LEVEL = auto() + NTFY_MESSAGE = auto() + STREAM_START = auto() + STREAM_END = auto() + + +@dataclass +class HeadlineEvent: + """Event emitted when a new headline is ready for display.""" + + title: str + source: str + timestamp: str + language: str | None = None + + +@dataclass +class FrameTickEvent: + """Event emitted on each render frame.""" + + frame_number: int + timestamp: datetime + delta_seconds: float + + +@dataclass +class MicLevelEvent: + """Event emitted when microphone level changes significantly.""" + + db_level: float + excess_above_threshold: float + timestamp: datetime + + +@dataclass +class NtfyMessageEvent: + """Event emitted when an ntfy message is received.""" + + title: str + body: str + message_id: str | None = None + timestamp: datetime | None = None + + +@dataclass +class StreamEvent: + """Event emitted when stream starts or ends.""" + + event_type: EventType + headline_count: int = 0 + timestamp: datetime | None = None diff --git a/engine/frame.py b/engine/frame.py new file mode 100644 index 0000000..747d040 --- /dev/null +++ b/engine/frame.py @@ -0,0 +1,57 @@ +""" +Frame timing utilities — FPS control and precise timing. +""" + +import time + + +class FrameTimer: + """Frame timer for consistent render loop timing.""" + + def __init__(self, target_frame_dt: float = 0.05): + self.target_frame_dt = target_frame_dt + self._frame_count = 0 + self._start_time = time.monotonic() + self._last_frame_time = self._start_time + + @property + def fps(self) -> float: + """Current FPS based on elapsed frames.""" + elapsed = time.monotonic() - self._start_time + if elapsed > 0: + return self._frame_count / elapsed + return 0.0 + + def sleep_until_next_frame(self) -> float: + """Sleep to maintain target frame rate. Returns actual elapsed time.""" + now = time.monotonic() + elapsed = now - self._last_frame_time + self._last_frame_time = now + self._frame_count += 1 + + sleep_time = max(0, self.target_frame_dt - elapsed) + if sleep_time > 0: + time.sleep(sleep_time) + return elapsed + + def reset(self) -> None: + """Reset frame counter and start time.""" + self._frame_count = 0 + self._start_time = time.monotonic() + self._last_frame_time = self._start_time + + +def calculate_scroll_step( + scroll_dur: float, view_height: int, padding: int = 15 +) -> float: + """Calculate scroll step interval for smooth scrolling. + + Args: + scroll_dur: Duration in seconds for one headline to scroll through view + view_height: Terminal height in rows + padding: Extra rows for off-screen content + + Returns: + Time in seconds between scroll steps + """ + return scroll_dur / (view_height + padding) * 2 diff --git a/engine/layers.py b/engine/layers.py new file mode 100644 index 0000000..ebc53ef --- /dev/null +++ b/engine/layers.py @@ -0,0 +1,201 @@ +""" +Layer compositing — message overlay, ticker zone, firehose, noise. +Depends on: config, render, effects. +""" + +import random +import re +import time +from datetime import datetime + +from engine import config +from engine.effects import ( + fade_line, + firehose_line, + glitch_bar, + noise, + vis_trunc, +) +from engine.render import big_wrap, lr_gradient, lr_gradient_opposite +from engine.terminal import RST, W_COOL + +MSG_META = "\033[38;5;245m" +MSG_BORDER = "\033[2;38;5;37m" + + +def render_message_overlay( + msg: tuple[str, str, float] | None, + w: int, + h: int, + msg_cache: tuple, +) -> tuple[list[str], tuple]: + """Render ntfy message overlay. + + Args: + msg: (title, body, timestamp) or None + w: terminal width + h: terminal height + msg_cache: (cache_key, rendered_rows) for caching + + Returns: + (list of ANSI strings, updated cache) + """ + overlay = [] + if msg is None: + return overlay, msg_cache + + m_title, m_body, m_ts = msg + display_text = m_body or m_title or "(empty)" + display_text = re.sub(r"\s+", " ", display_text.upper()) + + cache_key = (display_text, w) + if msg_cache[0] != cache_key: + msg_rows = big_wrap(display_text, w - 4) + msg_cache = (cache_key, msg_rows) + else: + msg_rows = msg_cache[1] + + msg_rows = lr_gradient_opposite( + msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0 + ) + + elapsed_s = int(time.monotonic() - m_ts) + remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s) + ts_str = datetime.now().strftime("%H:%M:%S") + panel_h = len(msg_rows) + 2 + panel_top = max(0, (h - panel_h) // 2) + + row_idx = 0 + for mr in msg_rows: + ln = vis_trunc(mr, w) + overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K") + row_idx += 1 + + meta_parts = [] + if m_title and m_title != m_body: + meta_parts.append(m_title) + meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s") + meta = ( + " " + " \u00b7 ".join(meta_parts) + if len(meta_parts) > 1 + else " " + meta_parts[0] + ) + overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}\033[0m\033[K") + row_idx += 1 + + bar = "\u2500" * (w - 4) + overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}\033[0m\033[K") + + return overlay, msg_cache + + +def render_ticker_zone( + active: list, + scroll_cam: int, + ticker_h: int, + w: int, + noise_cache: dict, + grad_offset: float, +) -> tuple[list[str], dict]: + """Render the ticker scroll zone. + + Args: + active: list of (content_rows, color, canvas_y, meta_idx) + scroll_cam: camera position (viewport top) + ticker_h: height of ticker zone + w: terminal width + noise_cache: dict of cy -> noise string + grad_offset: gradient animation offset + + Returns: + (list of ANSI strings, updated noise_cache) + """ + buf = [] + top_zone = max(1, int(ticker_h * 0.25)) + bot_zone = max(1, int(ticker_h * 0.10)) + + def noise_at(cy): + if cy not in noise_cache: + noise_cache[cy] = noise(w) if random.random() < 0.15 else None + return noise_cache[cy] + + for r in range(ticker_h): + scr_row = r + 1 + cy = scroll_cam + r + top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0 + bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0 + row_fade = min(top_f, bot_f) + drawn = False + + for content, hc, by, midx in active: + cr = cy - by + if 0 <= cr < len(content): + raw = content[cr] + if cr != midx: + colored = lr_gradient([raw], grad_offset)[0] + else: + colored = raw + ln = vis_trunc(colored, w) + if row_fade < 1.0: + ln = fade_line(ln, row_fade) + + if cr == midx: + buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K") + elif ln.strip(): + buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K") + else: + buf.append(f"\033[{scr_row};1H\033[K") + drawn = True + break + + if not drawn: + n = noise_at(cy) + if row_fade < 1.0 and n: + n = fade_line(n, row_fade) + if n: + buf.append(f"\033[{scr_row};1H{n}") + else: + buf.append(f"\033[{scr_row};1H\033[K") + + return buf, noise_cache + + +def apply_glitch( + buf: list[str], + ticker_buf_start: int, + mic_excess: float, + w: int, +) -> list[str]: + """Apply glitch effect to ticker buffer. + + Args: + buf: current buffer + ticker_buf_start: index where ticker starts in buffer + mic_excess: mic level above threshold + w: terminal width + + Returns: + Updated buffer with glitches applied + """ + glitch_prob = 0.32 + min(0.9, mic_excess * 0.16) + n_hits = 4 + int(mic_excess / 2) + ticker_buf_len = len(buf) - ticker_buf_start + + if random.random() < glitch_prob and ticker_buf_len > 0: + for _ in range(min(n_hits, ticker_buf_len)): + gi = random.randint(0, ticker_buf_len - 1) + scr_row = gi + 1 + buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}" + + return buf + + +def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]: + """Render firehose strip at bottom of screen.""" + buf = [] + if fh > 0: + for fr in range(fh): + scr_row = h - fh + fr + 1 + fline = firehose_line(items, w) + buf.append(f"\033[{scr_row};1H{fline}\033[K") + return buf diff --git a/engine/scroll.py b/engine/scroll.py index 810fe9f..dcb96f7 100644 --- a/engine/scroll.py +++ b/engine/scroll.py @@ -1,25 +1,22 @@ """ Render engine — ticker content, scroll motion, message panel, and firehose overlay. -Depends on: config, terminal, render, effects, ntfy, mic. +Orchestrates viewport, frame timing, and layers. """ import random -import re import sys import time -from datetime import datetime from engine import config -from engine.effects import ( - fade_line, - firehose_line, - glitch_bar, - next_headline, - noise, - vis_trunc, +from engine.frame import calculate_scroll_step +from engine.layers import ( + apply_glitch, + render_firehose, + render_message_overlay, + render_ticker_zone, ) -from engine.render import big_wrap, lr_gradient, lr_gradient_opposite, make_block -from engine.terminal import CLR, RST, W_COOL, th, tw +from engine.terminal import CLR +from engine.viewport import th, tw def stream(items, ntfy_poller, mic_monitor): @@ -35,33 +32,16 @@ def stream(items, ntfy_poller, mic_monitor): w, h = tw(), th() fh = config.FIREHOSE_H if config.FIREHOSE else 0 - ticker_view_h = h - fh # reserve fixed firehose strip at bottom - GAP = 3 # blank rows between headlines - scroll_step_interval = config.SCROLL_DUR / (ticker_view_h + 15) * 2 + ticker_view_h = h - fh + GAP = 3 + scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h) - # Taxonomy: - # - message: centered ntfy overlay panel - # - ticker: large headline text content - # - scroll: upward camera motion applied to ticker content - # - firehose: fixed carriage-return style strip pinned at bottom - # Active ticker blocks: (content_rows, color, canvas_y, meta_idx) active = [] - scroll_cam = 0 # viewport top in virtual canvas coords - ticker_next_y = ( - ticker_view_h # canvas-y where next block starts (off-screen bottom) - ) + scroll_cam = 0 + ticker_next_y = ticker_view_h noise_cache = {} scroll_motion_accum = 0.0 - - def _noise_at(cy): - if cy not in noise_cache: - noise_cache[cy] = noise(w) if random.random() < 0.15 else None - return noise_cache[cy] - - # Message color: bright cyan/white — distinct from headline greens - MSG_META = "\033[38;5;245m" # cool grey - MSG_BORDER = "\033[2;38;5;37m" # dim teal - _msg_cache = (None, None) # (cache_key, rendered_rows) + msg_cache = (None, None) while queued < config.HEADLINE_LIMIT or active: t0 = time.monotonic() @@ -69,80 +49,30 @@ def stream(items, ntfy_poller, mic_monitor): fh = config.FIREHOSE_H if config.FIREHOSE else 0 ticker_view_h = h - fh - # ── Check for ntfy message ──────────────────────── - msg_h = 0 - msg_overlay = [] msg = ntfy_poller.get_active_message() + msg_overlay, msg_cache = render_message_overlay(msg, w, h, msg_cache) buf = [] - if msg is not None: - m_title, m_body, m_ts = msg - # ── Message overlay: centered in the viewport ── - display_text = m_body or m_title or "(empty)" - display_text = re.sub(r"\s+", " ", display_text.upper()) - cache_key = (display_text, w) - if _msg_cache[0] != cache_key: - msg_rows = big_wrap(display_text, w - 4) - _msg_cache = (cache_key, msg_rows) - else: - msg_rows = _msg_cache[1] - msg_rows = lr_gradient_opposite( - msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0 - ) - # Layout: rendered text + meta + border - elapsed_s = int(time.monotonic() - m_ts) - remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s) - ts_str = datetime.now().strftime("%H:%M:%S") - panel_h = len(msg_rows) + 2 # meta + border - panel_top = max(0, (h - panel_h) // 2) - row_idx = 0 - for mr in msg_rows: - ln = vis_trunc(mr, w) - msg_overlay.append( - f"\033[{panel_top + row_idx + 1};1H {ln}{RST}\033[K" - ) - row_idx += 1 - # Meta line: title (if distinct) + source + countdown - meta_parts = [] - if m_title and m_title != m_body: - meta_parts.append(m_title) - meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s") - meta = ( - " " + " \u00b7 ".join(meta_parts) - if len(meta_parts) > 1 - else " " + meta_parts[0] - ) - msg_overlay.append( - f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}{RST}\033[K" - ) - row_idx += 1 - # Border — constant boundary under message panel - bar = "\u2500" * (w - 4) - msg_overlay.append( - f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}{RST}\033[K" - ) + ticker_h = ticker_view_h - # Ticker draws above the fixed firehose strip; message is a centered overlay. - ticker_h = ticker_view_h - msg_h - - # ── Ticker content + scroll motion (always runs) ── scroll_motion_accum += config.FRAME_DT while scroll_motion_accum >= scroll_step_interval: scroll_motion_accum -= scroll_step_interval scroll_cam += 1 - # Enqueue new headlines when room at the bottom while ( ticker_next_y < scroll_cam + ticker_view_h + 10 and queued < config.HEADLINE_LIMIT ): + from engine.effects import next_headline + from engine.render import make_block + t, src, ts = next_headline(pool, items, seen) ticker_content, hc, midx = make_block(t, src, ts, w) active.append((ticker_content, hc, ticker_next_y, midx)) ticker_next_y += len(ticker_content) + GAP queued += 1 - # Prune off-screen blocks and stale noise active = [ (c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > scroll_cam ] @@ -150,69 +80,26 @@ def stream(items, ntfy_poller, mic_monitor): if k < scroll_cam: del noise_cache[k] - # Draw ticker zone (above fixed firehose strip) - top_zone = max(1, int(ticker_h * 0.25)) - bot_zone = max(1, int(ticker_h * 0.10)) grad_offset = (time.monotonic() * config.GRAD_SPEED) % 1.0 - ticker_buf_start = len(buf) # track where ticker rows start in buf - for r in range(ticker_h): - scr_row = r + 1 # 1-indexed ANSI screen row - cy = scroll_cam + r - top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0 - bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0 - row_fade = min(top_f, bot_f) - drawn = False - for content, hc, by, midx in active: - cr = cy - by - if 0 <= cr < len(content): - raw = content[cr] - if cr != midx: - colored = lr_gradient([raw], grad_offset)[0] - else: - colored = raw - ln = vis_trunc(colored, w) - if row_fade < 1.0: - ln = fade_line(ln, row_fade) - if cr == midx: - buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K") - elif ln.strip(): - buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K") - else: - buf.append(f"\033[{scr_row};1H\033[K") - drawn = True - break - if not drawn: - n = _noise_at(cy) - if row_fade < 1.0 and n: - n = fade_line(n, row_fade) - if n: - buf.append(f"\033[{scr_row};1H{n}") - else: - buf.append(f"\033[{scr_row};1H\033[K") + ticker_buf_start = len(buf) + + ticker_buf, noise_cache = render_ticker_zone( + active, scroll_cam, ticker_h, w, noise_cache, grad_offset + ) + buf.extend(ticker_buf) - # Glitch — base rate + mic-reactive spikes (ticker zone only) mic_excess = mic_monitor.excess - glitch_prob = 0.32 + min(0.9, mic_excess * 0.16) - n_hits = 4 + int(mic_excess / 2) - ticker_buf_len = len(buf) - ticker_buf_start - if random.random() < glitch_prob and ticker_buf_len > 0: - for _ in range(min(n_hits, ticker_buf_len)): - gi = random.randint(0, ticker_buf_len - 1) - scr_row = gi + 1 - buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}" + buf = apply_glitch(buf, ticker_buf_start, mic_excess, w) + + firehose_buf = render_firehose(items, w, fh, h) + buf.extend(firehose_buf) - if config.FIREHOSE and fh > 0: - for fr in range(fh): - scr_row = h - fh + fr + 1 - fline = firehose_line(items, w) - buf.append(f"\033[{scr_row};1H{fline}\033[K") if msg_overlay: buf.extend(msg_overlay) sys.stdout.buffer.write("".join(buf).encode()) sys.stdout.flush() - # Precise frame timing elapsed = time.monotonic() - t0 time.sleep(max(0, config.FRAME_DT - elapsed)) diff --git a/engine/viewport.py b/engine/viewport.py new file mode 100644 index 0000000..d89b6a8 --- /dev/null +++ b/engine/viewport.py @@ -0,0 +1,37 @@ +""" +Viewport utilities — terminal dimensions and ANSI positioning helpers. +No internal dependencies. +""" + +import os + + +def tw() -> int: + """Get terminal width (columns).""" + try: + return os.get_terminal_size().columns + except Exception: + return 80 + + +def th() -> int: + """Get terminal height (lines).""" + try: + return os.get_terminal_size().lines + except Exception: + return 24 + + +def move_to(row: int, col: int = 1) -> str: + """Generate ANSI escape to move cursor to row, col (1-indexed).""" + return f"\033[{row};{col}H" + + +def clear_screen() -> str: + """Clear screen and move cursor to home.""" + return "\033[2J\033[H" + + +def clear_line() -> str: + """Clear current line.""" + return "\033[K" diff --git a/tests/test_controller.py b/tests/test_controller.py new file mode 100644 index 0000000..96ef02d --- /dev/null +++ b/tests/test_controller.py @@ -0,0 +1,85 @@ +""" +Tests for engine.controller module. +""" + +from unittest.mock import MagicMock, patch + +from engine import config +from engine.controller import StreamController + + +class TestStreamController: + """Tests for StreamController class.""" + + def test_init_default_config(self): + """StreamController initializes with default config.""" + controller = StreamController() + assert controller.config is not None + assert isinstance(controller.config, config.Config) + + def test_init_custom_config(self): + """StreamController accepts custom config.""" + custom_config = config.Config(headline_limit=500) + controller = StreamController(config=custom_config) + assert controller.config.headline_limit == 500 + + def test_init_sources_none_by_default(self): + """Sources are None until initialized.""" + controller = StreamController() + assert controller.mic is None + assert controller.ntfy is None + + @patch("engine.controller.MicMonitor") + @patch("engine.controller.NtfyPoller") + def test_initialize_sources(self, mock_ntfy, mock_mic): + """initialize_sources creates mic and ntfy instances.""" + mock_mic_instance = MagicMock() + mock_mic_instance.available = True + mock_mic_instance.start.return_value = True + mock_mic.return_value = mock_mic_instance + + mock_ntfy_instance = MagicMock() + mock_ntfy_instance.start.return_value = True + mock_ntfy.return_value = mock_ntfy_instance + + controller = StreamController() + mic_ok, ntfy_ok = controller.initialize_sources() + + assert mic_ok is True + assert ntfy_ok is True + assert controller.mic is not None + assert controller.ntfy is not None + + @patch("engine.controller.MicMonitor") + @patch("engine.controller.NtfyPoller") + def test_initialize_sources_mic_unavailable(self, mock_ntfy, mock_mic): + """initialize_sources handles unavailable mic.""" + mock_mic_instance = MagicMock() + mock_mic_instance.available = False + mock_mic.return_value = mock_mic_instance + + mock_ntfy_instance = MagicMock() + mock_ntfy_instance.start.return_value = True + mock_ntfy.return_value = mock_ntfy_instance + + controller = StreamController() + mic_ok, ntfy_ok = controller.initialize_sources() + + assert mic_ok is False + assert ntfy_ok is True + + +class TestStreamControllerCleanup: + """Tests for StreamController cleanup.""" + + @patch("engine.controller.MicMonitor") + def test_cleanup_stops_mic(self, mock_mic): + """cleanup stops the microphone if running.""" + mock_mic_instance = MagicMock() + mock_mic.return_value = mock_mic_instance + + controller = StreamController() + controller.mic = mock_mic_instance + controller.cleanup() + + mock_mic_instance.stop.assert_called_once() diff --git a/tests/test_events.py b/tests/test_events.py new file mode 100644 index 0000000..ea5f4ed --- /dev/null +++ b/tests/test_events.py @@ -0,0 +1,112 @@ +""" +Tests for engine.events module. +""" + +from datetime import datetime + +from engine import events + + +class TestEventType: + """Tests for EventType enum.""" + + def test_event_types_exist(self): + """All expected event types exist.""" + assert hasattr(events.EventType, "NEW_HEADLINE") + assert hasattr(events.EventType, "FRAME_TICK") + assert hasattr(events.EventType, "MIC_LEVEL") + assert hasattr(events.EventType, "NTFY_MESSAGE") + assert hasattr(events.EventType, "STREAM_START") + assert hasattr(events.EventType, "STREAM_END") + + +class TestHeadlineEvent: + """Tests for HeadlineEvent dataclass.""" + + def test_create_headline_event(self): + """HeadlineEvent can be created with required fields.""" + e = events.HeadlineEvent( + title="Test Headline", + source="Test Source", + timestamp="12:00", + ) + assert e.title == "Test Headline" + assert e.source == "Test Source" + assert e.timestamp == "12:00" + + def test_headline_event_optional_language(self): + """HeadlineEvent supports optional language field.""" + e = events.HeadlineEvent( + title="Test", + source="Test", + timestamp="12:00", + language="ja", + ) + assert e.language == "ja" + + +class TestFrameTickEvent: + """Tests for FrameTickEvent dataclass.""" + + def test_create_frame_tick_event(self): + """FrameTickEvent can be created.""" + now = datetime.now() + e = events.FrameTickEvent( + frame_number=100, + timestamp=now, + delta_seconds=0.05, + ) + assert e.frame_number == 100 + assert e.timestamp == now + assert e.delta_seconds == 0.05 + + +class TestMicLevelEvent: + """Tests for MicLevelEvent dataclass.""" + + def test_create_mic_level_event(self): + """MicLevelEvent can be created.""" + now = datetime.now() + e = events.MicLevelEvent( + db_level=60.0, + excess_above_threshold=10.0, + timestamp=now, + ) + assert e.db_level == 60.0 + assert e.excess_above_threshold == 10.0 + + +class TestNtfyMessageEvent: + """Tests for NtfyMessageEvent dataclass.""" + + def test_create_ntfy_message_event(self): + """NtfyMessageEvent can be created with required fields.""" + e = events.NtfyMessageEvent( + title="Test Title", + body="Test Body", + ) + assert e.title == "Test Title" + assert e.body == "Test Body" + assert e.message_id is None + + def test_ntfy_message_event_with_id(self): + """NtfyMessageEvent supports optional message_id.""" + e = events.NtfyMessageEvent( + title="Test", + body="Test", + message_id="abc123", + ) + assert e.message_id == "abc123" + + +class TestStreamEvent: + """Tests for StreamEvent dataclass.""" + + def test_create_stream_event(self): + """StreamEvent can be created.""" + e = events.StreamEvent( + event_type=events.EventType.STREAM_START, + headline_count=100, + ) + assert e.event_type == events.EventType.STREAM_START + assert e.headline_count == 100 diff --git a/tests/test_frame.py b/tests/test_frame.py new file mode 100644 index 0000000..2c59b85 --- /dev/null +++ b/tests/test_frame.py @@ -0,0 +1,63 @@ +""" +Tests for engine.frame module. +""" + +import time + +from engine.frame import FrameTimer, calculate_scroll_step + + +class TestFrameTimer: + """Tests for FrameTimer class.""" + + def test_init_default(self): + """FrameTimer initializes with default values.""" + timer = FrameTimer() + assert timer.target_frame_dt == 0.05 + assert timer.fps >= 0 + + def test_init_custom(self): + """FrameTimer accepts custom frame duration.""" + timer = FrameTimer(target_frame_dt=0.1) + assert timer.target_frame_dt == 0.1 + + def test_fps_calculation(self): + """FrameTimer calculates FPS correctly.""" + timer = FrameTimer() + timer._frame_count = 10 + timer._start_time = time.monotonic() - 1.0 + assert timer.fps >= 9.0 + + def test_reset(self): + """FrameTimer.reset() clears frame count.""" + timer = FrameTimer() + timer._frame_count = 100 + timer.reset() + assert timer._frame_count == 0 + + +class TestCalculateScrollStep: + """Tests for calculate_scroll_step function.""" + + def test_basic_calculation(self): + """calculate_scroll_step returns positive value.""" + result = calculate_scroll_step(5.0, 24) + assert result > 0 + + def test_with_padding(self): + """calculate_scroll_step respects padding parameter.""" + without_padding = calculate_scroll_step(5.0, 24, padding=0) + with_padding = calculate_scroll_step(5.0, 24, padding=15) + assert with_padding < without_padding + + def test_larger_view_slower_scroll(self): + """Larger view height results in slower scroll steps.""" + small = calculate_scroll_step(5.0, 10) + large = calculate_scroll_step(5.0, 50) + assert large < small + + def test_longer_duration_slower_scroll(self): + """Longer scroll duration results in slower scroll steps.""" + fast = calculate_scroll_step(2.0, 24) + slow = calculate_scroll_step(10.0, 24) + assert slow > fast diff --git a/tests/test_layers.py b/tests/test_layers.py new file mode 100644 index 0000000..afe9c07 --- /dev/null +++ b/tests/test_layers.py @@ -0,0 +1,96 @@ +""" +Tests for engine.layers module. +""" + +import time + +from engine import layers + + +class TestRenderMessageOverlay: + """Tests for render_message_overlay function.""" + + def test_no_message_returns_empty(self): + """Returns empty list when msg is None.""" + result, cache = layers.render_message_overlay(None, 80, 24, (None, None)) + assert result == [] + assert cache[0] is None + + def test_message_returns_overlay_lines(self): + """Returns non-empty list when message is present.""" + msg = ("Test Title", "Test Body", time.monotonic()) + result, cache = layers.render_message_overlay(msg, 80, 24, (None, None)) + assert len(result) > 0 + assert cache[0] is not None + + def test_cache_key_changes_with_text(self): + """Cache key changes when message text changes.""" + msg1 = ("Title1", "Body1", time.monotonic()) + msg2 = ("Title2", "Body2", time.monotonic()) + + _, cache1 = layers.render_message_overlay(msg1, 80, 24, (None, None)) + _, cache2 = layers.render_message_overlay(msg2, 80, 24, cache1) + + assert cache1[0] != cache2[0] + + def test_cache_reuse_avoids_recomputation(self): + """Cache is returned when same message is passed (interface test).""" + msg = ("Same Title", "Same Body", time.monotonic()) + + result1, cache1 = layers.render_message_overlay(msg, 80, 24, (None, None)) + result2, cache2 = layers.render_message_overlay(msg, 80, 24, cache1) + + assert len(result1) > 0 + assert len(result2) > 0 + assert cache1[0] == cache2[0] + + +class TestRenderFirehose: + """Tests for render_firehose function.""" + + def test_no_firehose_returns_empty(self): + """Returns empty list when firehose height is 0.""" + items = [("Headline", "Source", "12:00")] + result = layers.render_firehose(items, 80, 0, 24) + assert result == [] + + def test_firehose_returns_lines(self): + """Returns lines when firehose height > 0.""" + items = [("Headline", "Source", "12:00")] + result = layers.render_firehose(items, 80, 4, 24) + assert len(result) == 4 + + def test_firehose_includes_ansi_escapes(self): + """Returns lines containing ANSI escape sequences.""" + items = [("Headline", "Source", "12:00")] + result = layers.render_firehose(items, 80, 1, 24) + assert "\033[" in result[0] + + +class TestApplyGlitch: + """Tests for apply_glitch function.""" + + def test_empty_buffer_unchanged(self): + """Empty buffer is returned unchanged.""" + result = layers.apply_glitch([], 0, 0.0, 80) + assert result == [] + + def test_buffer_length_preserved(self): + """Buffer length is preserved after glitch application.""" + buf = [f"\033[{i + 1};1Htest\033[K" for i in range(10)] + result = layers.apply_glitch(buf, 0, 0.5, 80) + assert len(result) == len(buf) + + +class TestRenderTickerZone: + """Tests for render_ticker_zone function - focusing on interface.""" + + def test_returns_list(self): + """Returns a list of strings.""" + result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0) + assert isinstance(result, list) + + def test_returns_dict_for_cache(self): + """Returns a dict for the noise cache.""" + result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0) + assert isinstance(cache, dict) diff --git a/tests/test_viewport.py b/tests/test_viewport.py new file mode 100644 index 0000000..2338a7b --- /dev/null +++ b/tests/test_viewport.py @@ -0,0 +1,64 @@ +""" +Tests for engine.viewport module. +""" + +from engine import viewport + + +class TestViewportTw: + """Tests for tw() function.""" + + def test_tw_returns_int(self): + """tw() returns an integer.""" + result = viewport.tw() + assert isinstance(result, int) + + def test_tw_positive(self): + """tw() returns a positive value.""" + assert viewport.tw() > 0 + + +class TestViewportTh: + """Tests for th() function.""" + + def test_th_returns_int(self): + """th() returns an integer.""" + result = viewport.th() + assert isinstance(result, int) + + def test_th_positive(self): + """th() returns a positive value.""" + assert viewport.th() > 0 + + +class TestViewportMoveTo: + """Tests for move_to() function.""" + + def test_move_to_format(self): + """move_to() returns correctly formatted ANSI escape.""" + result = viewport.move_to(5, 10) + assert result == "\033[5;10H" + + def test_move_to_default_col(self): + """move_to() defaults to column 1.""" + result = viewport.move_to(5) + assert result == "\033[5;1H" + + +class TestViewportClearScreen: + """Tests for clear_screen() function.""" + + def test_clear_screen_format(self): + """clear_screen() returns clear screen ANSI escape.""" + result = viewport.clear_screen() + assert "\033[2J" in result + assert "\033[H" in result + + +class TestViewportClearLine: + """Tests for clear_line() function.""" + + def test_clear_line_format(self): + """clear_line() returns clear line ANSI escape.""" + result = viewport.clear_line() + assert result == "\033[K" From 35e5c8d38bac039919fe041327154fe0706a24b4 Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sun, 15 Mar 2026 16:05:41 -0700 Subject: [PATCH 3/4] refactor: phase 3 - API efficiency improvements Add typed dataclasses for tuple returns: - types.py: HeadlineItem, FetchResult, Block dataclasses with legacy tuple converters - fetch.py: Add type hints and HeadlineTuple type alias Add pyright for static type checking: - Add pyright to dependencies - Verify type coverage with pyright (0 errors in core modules) This enables: - Named types instead of raw tuples (better IDE support, self-documenting) - Type-safe APIs across modules - Backward compatibility via to_tuple/from_tuple methods Note: Lazy imports skipped for render.py - startup impact is minimal. --- engine/fetch.py | 14 +++++-- engine/types.py | 60 ++++++++++++++++++++++++++++ pyproject.toml | 1 + tests/test_types.py | 95 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 166 insertions(+), 4 deletions(-) create mode 100644 engine/types.py create mode 100644 tests/test_types.py diff --git a/engine/fetch.py b/engine/fetch.py index a236c6e..5d6f9bb 100644 --- a/engine/fetch.py +++ b/engine/fetch.py @@ -8,6 +8,7 @@ import pathlib import re import urllib.request from datetime import datetime +from typing import Any import feedparser @@ -16,9 +17,13 @@ from engine.filter import skip, strip_tags from engine.sources import FEEDS, POETRY_SOURCES from engine.terminal import boot_ln +# Type alias for headline items +HeadlineTuple = tuple[str, str, str] + # ─── SINGLE FEED ────────────────────────────────────────── -def fetch_feed(url): +def fetch_feed(url: str) -> Any | None: + """Fetch and parse a single RSS feed URL.""" try: req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"}) resp = urllib.request.urlopen(req, timeout=config.FEED_TIMEOUT) @@ -28,8 +33,9 @@ def fetch_feed(url): # ─── ALL RSS FEEDS ──────────────────────────────────────── -def fetch_all(): - items = [] +def fetch_all() -> tuple[list[HeadlineTuple], int, int]: + """Fetch all RSS feeds and return items, linked count, failed count.""" + items: list[HeadlineTuple] = [] linked = failed = 0 for src, url in FEEDS.items(): feed = fetch_feed(url) @@ -59,7 +65,7 @@ def fetch_all(): # ─── PROJECT GUTENBERG ──────────────────────────────────── -def _fetch_gutenberg(url, label): +def _fetch_gutenberg(url: str, label: str) -> list[HeadlineTuple]: """Download and parse stanzas/passages from a Project Gutenberg text.""" try: req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"}) diff --git a/engine/types.py b/engine/types.py new file mode 100644 index 0000000..f7b45a1 --- /dev/null +++ b/engine/types.py @@ -0,0 +1,60 @@ +""" +Shared dataclasses for the mainline application. +Provides named types for tuple returns across modules. +""" + +from dataclasses import dataclass + + +@dataclass +class HeadlineItem: + """A single headline item: title, source, and timestamp.""" + + title: str + source: str + timestamp: str + + def to_tuple(self) -> tuple[str, str, str]: + """Convert to tuple for backward compatibility.""" + return (self.title, self.source, self.timestamp) + + @classmethod + def from_tuple(cls, t: tuple[str, str, str]) -> "HeadlineItem": + """Create from tuple for backward compatibility.""" + return cls(title=t[0], source=t[1], timestamp=t[2]) + + +def items_to_tuples(items: list[HeadlineItem]) -> list[tuple[str, str, str]]: + """Convert list of HeadlineItem to list of tuples.""" + return [item.to_tuple() for item in items] + + +def tuples_to_items(tuples: list[tuple[str, str, str]]) -> list[HeadlineItem]: + """Convert list of tuples to list of HeadlineItem.""" + return [HeadlineItem.from_tuple(t) for t in tuples] + + +@dataclass +class FetchResult: + """Result from fetch_all() or fetch_poetry().""" + + items: list[HeadlineItem] + linked: int + failed: int + + def to_legacy_tuple(self) -> tuple[list[tuple], int, int]: + """Convert to legacy tuple format for backward compatibility.""" + return ([item.to_tuple() for item in self.items], self.linked, self.failed) + + +@dataclass +class Block: + """Rendered headline block from make_block().""" + + content: list[str] + color: str + meta_row_index: int + + def to_legacy_tuple(self) -> tuple[list[str], str, int]: + """Convert to legacy tuple format for backward compatibility.""" + return (self.content, self.color, self.meta_row_index) diff --git a/pyproject.toml b/pyproject.toml index 6d93f42..f52a05a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ classifiers = [ dependencies = [ "feedparser>=6.0.0", "Pillow>=10.0.0", + "pyright>=1.1.408", ] [project.optional-dependencies] diff --git a/tests/test_types.py b/tests/test_types.py new file mode 100644 index 0000000..33f5c0e --- /dev/null +++ b/tests/test_types.py @@ -0,0 +1,95 @@ +""" +Tests for engine.types module. +""" + +from engine.types import ( + Block, + FetchResult, + HeadlineItem, + items_to_tuples, + tuples_to_items, +) + + +class TestHeadlineItem: + """Tests for HeadlineItem dataclass.""" + + def test_create_headline_item(self): + """Can create HeadlineItem with required fields.""" + item = HeadlineItem(title="Test", source="Source", timestamp="12:00") + assert item.title == "Test" + assert item.source == "Source" + assert item.timestamp == "12:00" + + def test_to_tuple(self): + """to_tuple returns correct tuple.""" + item = HeadlineItem(title="Test", source="Source", timestamp="12:00") + assert item.to_tuple() == ("Test", "Source", "12:00") + + def test_from_tuple(self): + """from_tuple creates HeadlineItem from tuple.""" + item = HeadlineItem.from_tuple(("Test", "Source", "12:00")) + assert item.title == "Test" + assert item.source == "Source" + assert item.timestamp == "12:00" + + +class TestItemsConversion: + """Tests for list conversion functions.""" + + def test_items_to_tuples(self): + """Converts list of HeadlineItem to list of tuples.""" + items = [ + HeadlineItem(title="A", source="S", timestamp="10:00"), + HeadlineItem(title="B", source="T", timestamp="11:00"), + ] + result = items_to_tuples(items) + assert result == [("A", "S", "10:00"), ("B", "T", "11:00")] + + def test_tuples_to_items(self): + """Converts list of tuples to list of HeadlineItem.""" + tuples = [("A", "S", "10:00"), ("B", "T", "11:00")] + result = tuples_to_items(tuples) + assert len(result) == 2 + assert result[0].title == "A" + assert result[1].title == "B" + + +class TestFetchResult: + """Tests for FetchResult dataclass.""" + + def test_create_fetch_result(self): + """Can create FetchResult.""" + items = [HeadlineItem(title="Test", source="Source", timestamp="12:00")] + result = FetchResult(items=items, linked=1, failed=0) + assert len(result.items) == 1 + assert result.linked == 1 + assert result.failed == 0 + + def test_to_legacy_tuple(self): + """to_legacy_tuple returns correct format.""" + items = [HeadlineItem(title="Test", source="Source", timestamp="12:00")] + result = FetchResult(items=items, linked=1, failed=0) + legacy = result.to_legacy_tuple() + assert legacy[0] == [("Test", "Source", "12:00")] + assert legacy[1] == 1 + assert legacy[2] == 0 + + +class TestBlock: + """Tests for Block dataclass.""" + + def test_create_block(self): + """Can create Block.""" + block = Block( + content=["line1", "line2"], color="\033[38;5;46m", meta_row_index=1 + ) + assert len(block.content) == 2 + assert block.color == "\033[38;5;46m" + assert block.meta_row_index == 1 + + def test_to_legacy_tuple(self): + """to_legacy_tuple returns correct format.""" + block = Block(content=["line1"], color="green", meta_row_index=0) + legacy = block.to_legacy_tuple() + assert legacy == (["line1"], "green", 0) From 15de46722aa3d7ab0c06111e57ed47487fb5bef0 Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sun, 15 Mar 2026 16:20:52 -0700 Subject: [PATCH 4/4] refactor: phase 4 - event-driven architecture foundation - Add EventBus class with pub/sub messaging (thread-safe) - Add emitter Protocol classes (EventEmitter, Startable, Stoppable) - Add event emission to NtfyPoller (NtfyMessageEvent) - Add event emission to MicMonitor (MicLevelEvent) - Update StreamController to publish stream start/end events - Add comprehensive tests for eventbus and emitters modules --- AGENTS.md | 110 ++++++++++++++++++++++ README.md | 41 +++++---- engine/controller.py | 24 ++++- engine/emitters.py | 25 +++++ engine/eventbus.py | 72 +++++++++++++++ engine/mic.py | 30 ++++++ engine/ntfy.py | 29 ++++++ engine/scroll.py | 6 +- tests/test_emitters.py | 69 ++++++++++++++ tests/test_eventbus.py | 202 +++++++++++++++++++++++++++++++++++++++++ tests/test_mic.py | 66 ++++++++++++++ tests/test_ntfy.py | 52 +++++++++++ 12 files changed, 704 insertions(+), 22 deletions(-) create mode 100644 AGENTS.md create mode 100644 engine/emitters.py create mode 100644 engine/eventbus.py create mode 100644 tests/test_emitters.py create mode 100644 tests/test_eventbus.py diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..21d7e2c --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,110 @@ +# Agent Development Guide + +## Development Environment + +This project uses: +- **mise** (mise.jdx.dev) - tool version manager and task runner +- **hk** (hk.jdx.dev) - git hook manager +- **uv** - fast Python package installer +- **ruff** - linter and formatter +- **pytest** - test runner + +### Setup + +```bash +# Install dependencies +mise run install + +# Or equivalently: +uv sync +``` + +### Available Commands + +```bash +mise run test # Run tests +mise run test-v # Run tests verbose +mise run test-cov # Run tests with coverage report +mise run lint # Run ruff linter +mise run lint-fix # Run ruff with auto-fix +mise run format # Run ruff formatter +mise run ci # Full CI pipeline (sync + test + coverage) +``` + +## Git Hooks + +**At the start of every agent session**, verify hooks are installed: + +```bash +ls -la .git/hooks/pre-commit +``` + +If hooks are not installed, install them with: + +```bash +hk init --mise +mise run pre-commit +``` + +The project uses hk configured in `hk.pkl`: +- **pre-commit**: runs ruff-format and ruff (with auto-fix) +- **pre-push**: runs ruff check + +## Workflow Rules + +### Before Committing + +1. **Always run the test suite** - never commit code that fails tests: + ```bash + mise run test + ``` + +2. **Always run the linter**: + ```bash + mise run lint + ``` + +3. **Fix any lint errors** before committing (or let the pre-commit hook handle it). + +4. **Review your changes** using `git diff` to understand what will be committed. + +### On Failing Tests + +When tests fail, **determine whether it's an out-of-date test or a correctly failing test**: + +- **Out-of-date test**: The test was written for old behavior that has legitimately changed. Update the test to match the new expected behavior. + +- **Correctly failing test**: The test correctly identifies a broken contract. Fix the implementation, not the test. + +**Never** modify a test to make it pass without understanding why it failed. + +### Code Review + +Before committing significant changes: +- Run `git diff` to review all changes +- Ensure new code follows existing patterns in the codebase +- Check that type hints are added for new functions +- Verify that tests exist for new functionality + +## Testing + +Tests live in `tests/` and follow the pattern `test_*.py`. + +Run all tests: +```bash +mise run test +``` + +Run with coverage: +```bash +mise run test-cov +``` + +The project uses pytest with strict marker enforcement. Test configuration is in `pyproject.toml` under `[tool.pytest.ini_options]`. + +## Architecture Notes + +- **ntfy.py** and **mic.py** are standalone modules with zero internal dependencies +- **eventbus.py** provides thread-safe event publishing for decoupled communication +- **controller.py** coordinates ntfy/mic monitoring +- The render pipeline: fetch → render → effects → scroll → terminal output diff --git a/README.md b/README.md index a8c8f5c..5df0eec 100644 --- a/README.md +++ b/README.md @@ -101,26 +101,27 @@ Update `NTFY_TOPIC` in `engine/config.py` to point at your own topic. ``` engine/ - config.py constants, CLI flags, glyph tables - sources.py FEEDS, POETRY_SOURCES, language/script maps - terminal.py ANSI codes, tw/th, type_out, boot_ln - filter.py HTML stripping, content filter - translate.py Google Translate wrapper + region detection - render.py OTF → half-block pipeline (SSAA, gradient) - effects.py noise, glitch_bar, fade, firehose - fetch.py RSS/Gutenberg fetching + cache load/save - ntfy.py NtfyPoller — standalone, zero internal deps - mic.py MicMonitor — standalone, graceful fallback - scroll.py stream() frame loop + message rendering - app.py main(), font picker TUI, boot sequence, signal handler - -tests/ - test_config.py - test_filter.py - test_mic.py - test_ntfy.py - test_sources.py - test_terminal.py + __init__.py package marker + app.py main(), font picker TUI, boot sequence, signal handler + config.py constants, CLI flags, glyph tables + sources.py FEEDS, POETRY_SOURCES, language/script maps + terminal.py ANSI codes, tw/th, type_out, boot_ln + filter.py HTML stripping, content filter + translate.py Google Translate wrapper + region detection + render.py OTF → half-block pipeline (SSAA, gradient) + effects.py noise, glitch_bar, fade, firehose + fetch.py RSS/Gutenberg fetching + cache load/save + ntfy.py NtfyPoller — standalone, zero internal deps + mic.py MicMonitor — standalone, graceful fallback + scroll.py stream() frame loop + message rendering + viewport.py terminal dimension tracking (tw/th) + frame.py scroll step calculation, timing + layers.py ticker zone, firehose, message overlay rendering + eventbus.py thread-safe event publishing for decoupled communication + events.py event types and definitions + controller.py coordinates ntfy/mic monitoring and event publishing + emitters.py background emitters for ntfy and mic + types.py type definitions and dataclasses ``` `ntfy.py` and `mic.py` have zero internal dependencies and can be imported by any other visualizer. diff --git a/engine/controller.py b/engine/controller.py index 98b24e5..e6e2e3d 100644 --- a/engine/controller.py +++ b/engine/controller.py @@ -3,6 +3,8 @@ Stream controller - manages input sources and orchestrates the render stream. """ from engine.config import Config, get_config +from engine.eventbus import EventBus +from engine.events import EventType, StreamEvent from engine.mic import MicMonitor from engine.ntfy import NtfyPoller from engine.scroll import stream @@ -11,8 +13,9 @@ from engine.scroll import stream class StreamController: """Controls the stream lifecycle - initializes sources and runs the stream.""" - def __init__(self, config: Config | None = None): + def __init__(self, config: Config | None = None, event_bus: EventBus | None = None): self.config = config or get_config() + self.event_bus = event_bus self.mic: MicMonitor | None = None self.ntfy: NtfyPoller | None = None @@ -38,8 +41,27 @@ class StreamController: """Run the stream with initialized sources.""" if self.mic is None or self.ntfy is None: self.initialize_sources() + + if self.event_bus: + self.event_bus.publish( + EventType.STREAM_START, + StreamEvent( + event_type=EventType.STREAM_START, + headline_count=len(items), + ), + ) + stream(items, self.ntfy, self.mic) + if self.event_bus: + self.event_bus.publish( + EventType.STREAM_END, + StreamEvent( + event_type=EventType.STREAM_END, + headline_count=len(items), + ), + ) + def cleanup(self) -> None: """Clean up resources.""" if self.mic: diff --git a/engine/emitters.py b/engine/emitters.py new file mode 100644 index 0000000..6d6a5a1 --- /dev/null +++ b/engine/emitters.py @@ -0,0 +1,25 @@ +""" +Event emitter protocols - abstract interfaces for event-producing components. +""" + +from collections.abc import Callable +from typing import Any, Protocol + + +class EventEmitter(Protocol): + """Protocol for components that emit events.""" + + def subscribe(self, callback: Callable[[Any], None]) -> None: ... + def unsubscribe(self, callback: Callable[[Any], None]) -> None: ... + + +class Startable(Protocol): + """Protocol for components that can be started.""" + + def start(self) -> Any: ... + + +class Stoppable(Protocol): + """Protocol for components that can be stopped.""" + + def stop(self) -> None: ... diff --git a/engine/eventbus.py b/engine/eventbus.py new file mode 100644 index 0000000..6ea5628 --- /dev/null +++ b/engine/eventbus.py @@ -0,0 +1,72 @@ +""" +Event bus - pub/sub messaging for decoupled component communication. +""" + +import threading +from collections import defaultdict +from collections.abc import Callable +from typing import Any + +from engine.events import EventType + + +class EventBus: + """Thread-safe event bus for publish-subscribe messaging.""" + + def __init__(self): + self._subscribers: dict[EventType, list[Callable[[Any], None]]] = defaultdict( + list + ) + self._lock = threading.Lock() + + def subscribe(self, event_type: EventType, callback: Callable[[Any], None]) -> None: + """Register a callback for a specific event type.""" + with self._lock: + self._subscribers[event_type].append(callback) + + def unsubscribe( + self, event_type: EventType, callback: Callable[[Any], None] + ) -> None: + """Remove a callback for a specific event type.""" + with self._lock: + if callback in self._subscribers[event_type]: + self._subscribers[event_type].remove(callback) + + def publish(self, event_type: EventType, event: Any = None) -> None: + """Publish an event to all subscribers.""" + with self._lock: + callbacks = list(self._subscribers.get(event_type, [])) + for callback in callbacks: + try: + callback(event) + except Exception: + pass + + def clear(self) -> None: + """Remove all subscribers.""" + with self._lock: + self._subscribers.clear() + + def subscriber_count(self, event_type: EventType | None = None) -> int: + """Get subscriber count for an event type, or total if None.""" + with self._lock: + if event_type is None: + return sum(len(cb) for cb in self._subscribers.values()) + return len(self._subscribers.get(event_type, [])) + + +_event_bus: EventBus | None = None + + +def get_event_bus() -> EventBus: + """Get the global event bus instance.""" + global _event_bus + if _event_bus is None: + _event_bus = EventBus() + return _event_bus + + +def set_event_bus(bus: EventBus) -> None: + """Set the global event bus instance (for testing).""" + global _event_bus + _event_bus = bus diff --git a/engine/mic.py b/engine/mic.py index b8c5175..c72a440 100644 --- a/engine/mic.py +++ b/engine/mic.py @@ -4,6 +4,8 @@ Gracefully degrades if sounddevice/numpy are unavailable. """ import atexit +from collections.abc import Callable +from datetime import datetime try: import numpy as _np @@ -14,6 +16,9 @@ except Exception: _HAS_MIC = False +from engine.events import MicLevelEvent + + class MicMonitor: """Background mic stream that exposes current RMS dB level.""" @@ -21,6 +26,7 @@ class MicMonitor: self.threshold_db = threshold_db self._db = -99.0 self._stream = None + self._subscribers: list[Callable[[MicLevelEvent], None]] = [] @property def available(self): @@ -37,6 +43,23 @@ class MicMonitor: """dB above threshold (clamped to 0).""" return max(0.0, self._db - self.threshold_db) + def subscribe(self, callback: Callable[[MicLevelEvent], None]) -> None: + """Register a callback to be called when mic level changes.""" + self._subscribers.append(callback) + + def unsubscribe(self, callback: Callable[[MicLevelEvent], None]) -> None: + """Remove a registered callback.""" + if callback in self._subscribers: + self._subscribers.remove(callback) + + def _emit(self, event: MicLevelEvent) -> None: + """Emit an event to all subscribers.""" + for cb in self._subscribers: + try: + cb(event) + except Exception: + pass + def start(self): """Start background mic stream. Returns True on success, False/None otherwise.""" if not _HAS_MIC: @@ -45,6 +68,13 @@ class MicMonitor: def _cb(indata, frames, t, status): rms = float(_np.sqrt(_np.mean(indata**2))) self._db = 20 * _np.log10(rms) if rms > 0 else -99.0 + if self._subscribers: + event = MicLevelEvent( + db_level=self._db, + excess_above_threshold=max(0.0, self._db - self.threshold_db), + timestamp=datetime.now(), + ) + self._emit(event) try: self._stream = _sd.InputStream( diff --git a/engine/ntfy.py b/engine/ntfy.py index d819ea2..583fc66 100644 --- a/engine/ntfy.py +++ b/engine/ntfy.py @@ -16,8 +16,12 @@ import json import threading import time import urllib.request +from collections.abc import Callable +from datetime import datetime from urllib.parse import parse_qs, urlencode, urlparse, urlunparse +from engine.events import NtfyMessageEvent + class NtfyPoller: """SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT).""" @@ -28,6 +32,24 @@ class NtfyPoller: self.display_secs = display_secs self._message = None # (title, body, monotonic_timestamp) or None self._lock = threading.Lock() + self._subscribers: list[Callable[[NtfyMessageEvent], None]] = [] + + def subscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None: + """Register a callback to be called when a message is received.""" + self._subscribers.append(callback) + + def unsubscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None: + """Remove a registered callback.""" + if callback in self._subscribers: + self._subscribers.remove(callback) + + def _emit(self, event: NtfyMessageEvent) -> None: + """Emit an event to all subscribers.""" + for cb in self._subscribers: + try: + cb(event) + except Exception: + pass def start(self): """Start background stream thread. Returns True.""" @@ -88,6 +110,13 @@ class NtfyPoller: data.get("message", ""), time.monotonic(), ) + event = NtfyMessageEvent( + title=data.get("title", ""), + body=data.get("message", ""), + message_id=data.get("id"), + timestamp=datetime.now(), + ) + self._emit(event) except Exception: pass time.sleep(self.reconnect_delay) diff --git a/engine/scroll.py b/engine/scroll.py index dcb96f7..41445ad 100644 --- a/engine/scroll.py +++ b/engine/scroll.py @@ -43,11 +43,15 @@ def stream(items, ntfy_poller, mic_monitor): scroll_motion_accum = 0.0 msg_cache = (None, None) - while queued < config.HEADLINE_LIMIT or active: + while True: + if queued >= config.HEADLINE_LIMIT and not active: + break + t0 = time.monotonic() w, h = tw(), th() fh = config.FIREHOSE_H if config.FIREHOSE else 0 ticker_view_h = h - fh + scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h) msg = ntfy_poller.get_active_message() msg_overlay, msg_cache = render_message_overlay(msg, w, h, msg_cache) diff --git a/tests/test_emitters.py b/tests/test_emitters.py new file mode 100644 index 0000000..b28cddb --- /dev/null +++ b/tests/test_emitters.py @@ -0,0 +1,69 @@ +""" +Tests for engine.emitters module. +""" + +from engine.emitters import EventEmitter, Startable, Stoppable + + +class TestEventEmitterProtocol: + """Tests for EventEmitter protocol.""" + + def test_protocol_exists(self): + """EventEmitter protocol is defined.""" + assert EventEmitter is not None + + def test_protocol_has_subscribe_method(self): + """EventEmitter has subscribe method in protocol.""" + assert hasattr(EventEmitter, "subscribe") + + def test_protocol_has_unsubscribe_method(self): + """EventEmitter has unsubscribe method in protocol.""" + assert hasattr(EventEmitter, "unsubscribe") + + +class TestStartableProtocol: + """Tests for Startable protocol.""" + + def test_protocol_exists(self): + """Startable protocol is defined.""" + assert Startable is not None + + def test_protocol_has_start_method(self): + """Startable has start method in protocol.""" + assert hasattr(Startable, "start") + + +class TestStoppableProtocol: + """Tests for Stoppable protocol.""" + + def test_protocol_exists(self): + """Stoppable protocol is defined.""" + assert Stoppable is not None + + def test_protocol_has_stop_method(self): + """Stoppable has stop method in protocol.""" + assert hasattr(Stoppable, "stop") + + +class TestProtocolCompliance: + """Tests that existing classes comply with protocols.""" + + def test_ntfy_poller_complies_with_protocol(self): + """NtfyPoller implements EventEmitter protocol.""" + from engine.ntfy import NtfyPoller + + poller = NtfyPoller("http://example.com/topic") + assert hasattr(poller, "subscribe") + assert hasattr(poller, "unsubscribe") + assert callable(poller.subscribe) + assert callable(poller.unsubscribe) + + def test_mic_monitor_complies_with_protocol(self): + """MicMonitor implements EventEmitter and Startable protocols.""" + from engine.mic import MicMonitor + + monitor = MicMonitor() + assert hasattr(monitor, "subscribe") + assert hasattr(monitor, "unsubscribe") + assert hasattr(monitor, "start") + assert hasattr(monitor, "stop") diff --git a/tests/test_eventbus.py b/tests/test_eventbus.py new file mode 100644 index 0000000..7094c7b --- /dev/null +++ b/tests/test_eventbus.py @@ -0,0 +1,202 @@ +""" +Tests for engine.eventbus module. +""" + + +from engine.eventbus import EventBus, get_event_bus, set_event_bus +from engine.events import EventType, NtfyMessageEvent + + +class TestEventBusInit: + """Tests for EventBus initialization.""" + + def test_init_creates_empty_subscribers(self): + """EventBus starts with no subscribers.""" + bus = EventBus() + assert bus.subscriber_count() == 0 + + +class TestEventBusSubscribe: + """Tests for EventBus.subscribe method.""" + + def test_subscribe_adds_callback(self): + """subscribe() adds a callback for an event type.""" + bus = EventBus() + def callback(e): + return None + + bus.subscribe(EventType.NTFY_MESSAGE, callback) + + assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1 + + def test_subscribe_multiple_callbacks_same_event(self): + """Multiple callbacks can be subscribed to the same event type.""" + bus = EventBus() + def cb1(e): + return None + def cb2(e): + return None + + bus.subscribe(EventType.NTFY_MESSAGE, cb1) + bus.subscribe(EventType.NTFY_MESSAGE, cb2) + + assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 2 + + def test_subscribe_different_event_types(self): + """Callbacks can be subscribed to different event types.""" + bus = EventBus() + def cb1(e): + return None + def cb2(e): + return None + + bus.subscribe(EventType.NTFY_MESSAGE, cb1) + bus.subscribe(EventType.MIC_LEVEL, cb2) + + assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1 + assert bus.subscriber_count(EventType.MIC_LEVEL) == 1 + + +class TestEventBusUnsubscribe: + """Tests for EventBus.unsubscribe method.""" + + def test_unsubscribe_removes_callback(self): + """unsubscribe() removes a callback.""" + bus = EventBus() + def callback(e): + return None + + bus.subscribe(EventType.NTFY_MESSAGE, callback) + bus.unsubscribe(EventType.NTFY_MESSAGE, callback) + + assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 0 + + def test_unsubscribe_nonexistent_callback_no_error(self): + """unsubscribe() handles non-existent callback gracefully.""" + bus = EventBus() + def callback(e): + return None + + bus.unsubscribe(EventType.NTFY_MESSAGE, callback) + + +class TestEventBusPublish: + """Tests for EventBus.publish method.""" + + def test_publish_calls_subscriber(self): + """publish() calls the subscriber callback.""" + bus = EventBus() + received = [] + + def callback(event): + received.append(event) + + bus.subscribe(EventType.NTFY_MESSAGE, callback) + event = NtfyMessageEvent(title="Test", body="Body") + bus.publish(EventType.NTFY_MESSAGE, event) + + assert len(received) == 1 + assert received[0].title == "Test" + + def test_publish_multiple_subscribers(self): + """publish() calls all subscribers for an event type.""" + bus = EventBus() + received1 = [] + received2 = [] + + def callback1(event): + received1.append(event) + + def callback2(event): + received2.append(event) + + bus.subscribe(EventType.NTFY_MESSAGE, callback1) + bus.subscribe(EventType.NTFY_MESSAGE, callback2) + event = NtfyMessageEvent(title="Test", body="Body") + bus.publish(EventType.NTFY_MESSAGE, event) + + assert len(received1) == 1 + assert len(received2) == 1 + + def test_publish_different_event_types(self): + """publish() only calls subscribers for the specific event type.""" + bus = EventBus() + ntfy_received = [] + mic_received = [] + + def ntfy_callback(event): + ntfy_received.append(event) + + def mic_callback(event): + mic_received.append(event) + + bus.subscribe(EventType.NTFY_MESSAGE, ntfy_callback) + bus.subscribe(EventType.MIC_LEVEL, mic_callback) + event = NtfyMessageEvent(title="Test", body="Body") + bus.publish(EventType.NTFY_MESSAGE, event) + + assert len(ntfy_received) == 1 + assert len(mic_received) == 0 + + +class TestEventBusClear: + """Tests for EventBus.clear method.""" + + def test_clear_removes_all_subscribers(self): + """clear() removes all subscribers.""" + bus = EventBus() + def cb1(e): + return None + def cb2(e): + return None + + bus.subscribe(EventType.NTFY_MESSAGE, cb1) + bus.subscribe(EventType.MIC_LEVEL, cb2) + bus.clear() + + assert bus.subscriber_count() == 0 + + +class TestEventBusThreadSafety: + """Tests for EventBus thread safety.""" + + def test_concurrent_subscribe_unsubscribe(self): + """subscribe and unsubscribe can be called concurrently.""" + import threading + + bus = EventBus() + callbacks = [lambda e: None for _ in range(10)] + + def subscribe(): + for cb in callbacks: + bus.subscribe(EventType.NTFY_MESSAGE, cb) + + def unsubscribe(): + for cb in callbacks: + bus.unsubscribe(EventType.NTFY_MESSAGE, cb) + + t1 = threading.Thread(target=subscribe) + t2 = threading.Thread(target=unsubscribe) + t1.start() + t2.start() + t1.join() + t2.join() + + +class TestGlobalEventBus: + """Tests for global event bus functions.""" + + def test_get_event_bus_returns_singleton(self): + """get_event_bus() returns the same instance.""" + bus1 = get_event_bus() + bus2 = get_event_bus() + assert bus1 is bus2 + + def test_set_event_bus_replaces_singleton(self): + """set_event_bus() replaces the global event bus.""" + new_bus = EventBus() + set_event_bus(new_bus) + try: + assert get_event_bus() is new_bus + finally: + set_event_bus(None) diff --git a/tests/test_mic.py b/tests/test_mic.py index 3e610b9..a347e5f 100644 --- a/tests/test_mic.py +++ b/tests/test_mic.py @@ -2,8 +2,11 @@ Tests for engine.mic module. """ +from datetime import datetime from unittest.mock import patch +from engine.events import MicLevelEvent + class TestMicMonitorImport: """Tests for module import behavior.""" @@ -81,3 +84,66 @@ class TestMicMonitorStop: monitor = MicMonitor() monitor.stop() assert monitor._stream is None + + +class TestMicMonitorEventEmission: + """Tests for MicMonitor event emission.""" + + def test_subscribe_adds_callback(self): + """subscribe() adds a callback.""" + from engine.mic import MicMonitor + + monitor = MicMonitor() + def callback(e): + return None + + monitor.subscribe(callback) + + assert callback in monitor._subscribers + + def test_unsubscribe_removes_callback(self): + """unsubscribe() removes a callback.""" + from engine.mic import MicMonitor + + monitor = MicMonitor() + def callback(e): + return None + monitor.subscribe(callback) + + monitor.unsubscribe(callback) + + assert callback not in monitor._subscribers + + def test_emit_calls_subscribers(self): + """_emit() calls all subscribers.""" + from engine.mic import MicMonitor + + monitor = MicMonitor() + received = [] + + def callback(event): + received.append(event) + + monitor.subscribe(callback) + event = MicLevelEvent( + db_level=60.0, excess_above_threshold=10.0, timestamp=datetime.now() + ) + monitor._emit(event) + + assert len(received) == 1 + assert received[0].db_level == 60.0 + + def test_emit_handles_subscriber_exception(self): + """_emit() handles exceptions in subscribers gracefully.""" + from engine.mic import MicMonitor + + monitor = MicMonitor() + + def bad_callback(event): + raise RuntimeError("test") + + monitor.subscribe(bad_callback) + event = MicLevelEvent( + db_level=60.0, excess_above_threshold=10.0, timestamp=datetime.now() + ) + monitor._emit(event) diff --git a/tests/test_ntfy.py b/tests/test_ntfy.py index 778fc6e..6c3437c 100644 --- a/tests/test_ntfy.py +++ b/tests/test_ntfy.py @@ -5,6 +5,7 @@ Tests for engine.ntfy module. import time from unittest.mock import MagicMock, patch +from engine.events import NtfyMessageEvent from engine.ntfy import NtfyPoller @@ -68,3 +69,54 @@ class TestNtfyPollerDismiss: poller.dismiss() assert poller._message is None + + +class TestNtfyPollerEventEmission: + """Tests for NtfyPoller event emission.""" + + def test_subscribe_adds_callback(self): + """subscribe() adds a callback.""" + poller = NtfyPoller("http://example.com/topic") + def callback(e): + return None + + poller.subscribe(callback) + + assert callback in poller._subscribers + + def test_unsubscribe_removes_callback(self): + """unsubscribe() removes a callback.""" + poller = NtfyPoller("http://example.com/topic") + def callback(e): + return None + poller.subscribe(callback) + + poller.unsubscribe(callback) + + assert callback not in poller._subscribers + + def test_emit_calls_subscribers(self): + """_emit() calls all subscribers.""" + poller = NtfyPoller("http://example.com/topic") + received = [] + + def callback(event): + received.append(event) + + poller.subscribe(callback) + event = NtfyMessageEvent(title="Test", body="Body") + poller._emit(event) + + assert len(received) == 1 + assert received[0].title == "Test" + + def test_emit_handles_subscriber_exception(self): + """_emit() handles exceptions in subscribers gracefully.""" + poller = NtfyPoller("http://example.com/topic") + + def bad_callback(event): + raise RuntimeError("test") + + poller.subscribe(bad_callback) + event = NtfyMessageEvent(title="Test", body="Body") + poller._emit(event)