From f1701439391fa4175368b7450b661d39435d07e0 Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sun, 15 Mar 2026 15:44:39 -0700 Subject: [PATCH 1/7] refactor: phase 1 - testability improvements - Add Config dataclass with get_config()/set_config() for injection - Add Config.from_args() for CLI argument parsing (testable) - Add platform font path detection (Darwin/Linux) - Bound translate cache with @lru_cache(maxsize=500) - Add fixtures for external dependencies (network, feeds, config) - Add 15 tests for Config class, from_args, and platform detection This enables testability by: - Allowing config injection instead of global mutable state - Supporting custom argv in from_args() for testing - Providing reusable fixtures for mocking network/filesystem - Preventing unbounded memory growth in translation cache Fixes: _arg_value/_arg_int not accepting custom argv --- engine/config.py | 143 +++++++++++++++++++++- engine/translate.py | 35 +++--- tests/fixtures/__init__.py | 236 +++++++++++++++++++++++++++++++++++++ tests/test_config.py | 139 ++++++++++++++++++++++ 4 files changed, 531 insertions(+), 22 deletions(-) create mode 100644 tests/fixtures/__init__.py diff --git a/engine/config.py b/engine/config.py index 2174568..284877c 100644 --- a/engine/config.py +++ b/engine/config.py @@ -1,25 +1,28 @@ """ Configuration constants, CLI flags, and glyph tables. +Supports both global constants (backward compatible) and injected config for testing. """ import sys +from dataclasses import dataclass, field from pathlib import Path _REPO_ROOT = Path(__file__).resolve().parent.parent _FONT_EXTENSIONS = {".otf", ".ttf", ".ttc"} -def _arg_value(flag): +def _arg_value(flag, argv: list[str] | None = None): """Get value following a CLI flag, if present.""" - if flag not in sys.argv: + argv = argv or sys.argv + if flag not in argv: return None - i = sys.argv.index(flag) - return sys.argv[i + 1] if i + 1 < len(sys.argv) else None + i = argv.index(flag) + return argv[i + 1] if i + 1 < len(argv) else None -def _arg_int(flag, default): +def _arg_int(flag, default, argv: list[str] | None = None): """Get int CLI argument with safe fallback.""" - raw = _arg_value(flag) + raw = _arg_value(flag, argv) if raw is None: return default try: @@ -53,6 +56,134 @@ def list_repo_font_files(): return _list_font_files(FONT_DIR) +def _get_platform_font_paths() -> dict[str, str]: + """Get platform-appropriate font paths for non-Latin scripts.""" + import platform + + system = platform.system() + + if system == "Darwin": + return { + "zh-cn": "/System/Library/Fonts/STHeiti Medium.ttc", + "ja": "/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc", + "ko": "/System/Library/Fonts/AppleSDGothicNeo.ttc", + "ru": "/System/Library/Fonts/Supplemental/Arial.ttf", + "uk": "/System/Library/Fonts/Supplemental/Arial.ttf", + "el": "/System/Library/Fonts/Supplemental/Arial.ttf", + "he": "/System/Library/Fonts/Supplemental/Arial.ttf", + "ar": "/System/Library/Fonts/GeezaPro.ttc", + "fa": "/System/Library/Fonts/GeezaPro.ttc", + "hi": "/System/Library/Fonts/Kohinoor.ttc", + "th": "/System/Library/Fonts/ThonburiUI.ttc", + } + elif system == "Linux": + return { + "zh-cn": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc", + "ja": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc", + "ko": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc", + "ru": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + "uk": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + "el": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + "he": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + "ar": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + "fa": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf", + "hi": "/usr/share/fonts/truetype/noto/NotoSansDevanagari-Regular.ttf", + "th": "/usr/share/fonts/truetype/noto/NotoSansThai-Regular.ttf", + } + else: + return {} + + +@dataclass(frozen=True) +class Config: + """Immutable configuration container for injected config.""" + + headline_limit: int = 1000 + feed_timeout: int = 10 + mic_threshold_db: int = 50 + mode: str = "news" + firehose: bool = False + + ntfy_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline/json" + ntfy_reconnect_delay: int = 5 + message_display_secs: int = 30 + + font_dir: str = "fonts" + font_path: str = "" + font_index: int = 0 + font_picker: bool = True + font_sz: int = 60 + render_h: int = 8 + + ssaa: int = 4 + + scroll_dur: float = 5.625 + frame_dt: float = 0.05 + firehose_h: int = 12 + grad_speed: float = 0.08 + + glitch_glyphs: str = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋" + kata_glyphs: str = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ" + + script_fonts: dict[str, str] = field(default_factory=_get_platform_font_paths) + + @classmethod + def from_args(cls, argv: list[str] | None = None) -> "Config": + """Create Config from CLI arguments (or custom argv for testing).""" + argv = argv or sys.argv + + font_dir = _resolve_font_path(_arg_value("--font-dir", argv) or "fonts") + font_file_arg = _arg_value("--font-file", argv) + font_files = _list_font_files(font_dir) + font_path = ( + _resolve_font_path(font_file_arg) + if font_file_arg + else (font_files[0] if font_files else "") + ) + + return cls( + headline_limit=1000, + feed_timeout=10, + mic_threshold_db=50, + mode="poetry" if "--poetry" in argv or "-p" in argv else "news", + firehose="--firehose" in argv, + ntfy_topic="https://ntfy.sh/klubhaus_terminal_mainline/json", + ntfy_reconnect_delay=5, + message_display_secs=30, + font_dir=font_dir, + font_path=font_path, + font_index=max(0, _arg_int("--font-index", 0, argv)), + font_picker="--no-font-picker" not in argv, + font_sz=60, + render_h=8, + ssaa=4, + scroll_dur=5.625, + frame_dt=0.05, + firehose_h=12, + grad_speed=0.08, + glitch_glyphs="░▒▓█▌▐╌╍╎╏┃┆┇┊┋", + kata_glyphs="ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ", + script_fonts=_get_platform_font_paths(), + ) + + +_config: Config | None = None + + +def get_config() -> Config: + """Get the global config instance (lazy-loaded).""" + global _config + if _config is None: + _config = Config.from_args() + return _config + + +def set_config(config: Config) -> None: + """Set the global config instance (for testing).""" + global _config + _config = config + + # ─── RUNTIME ────────────────────────────────────────────── HEADLINE_LIMIT = 1000 FEED_TIMEOUT = 10 diff --git a/engine/translate.py b/engine/translate.py index eb1f2ca..04e04dd 100644 --- a/engine/translate.py +++ b/engine/translate.py @@ -7,26 +7,16 @@ import json import re import urllib.parse import urllib.request +from functools import lru_cache from engine.sources import LOCATION_LANGS -_TRANSLATE_CACHE = {} +TRANSLATE_CACHE_SIZE = 500 -def detect_location_language(title): - """Detect if headline mentions a location, return target language.""" - title_lower = title.lower() - for pattern, lang in LOCATION_LANGS.items(): - if re.search(pattern, title_lower): - return lang - return None - - -def translate_headline(title, target_lang): - """Translate headline via Google Translate API (zero dependencies).""" - key = (title, target_lang) - if key in _TRANSLATE_CACHE: - return _TRANSLATE_CACHE[key] +@lru_cache(maxsize=TRANSLATE_CACHE_SIZE) +def _translate_cached(title: str, target_lang: str) -> str: + """Cached translation implementation.""" try: q = urllib.parse.quote(title) url = ( @@ -39,5 +29,18 @@ def translate_headline(title, target_lang): result = "".join(p[0] for p in data[0] if p[0]) or title except Exception: result = title - _TRANSLATE_CACHE[key] = result return result + + +def detect_location_language(title): + """Detect if headline mentions a location, return target language.""" + title_lower = title.lower() + for pattern, lang in LOCATION_LANGS.items(): + if re.search(pattern, title_lower): + return lang + return None + + +def translate_headline(title: str, target_lang: str) -> str: + """Translate headline via Google Translate API (zero dependencies).""" + return _translate_cached(title, target_lang) diff --git a/tests/fixtures/__init__.py b/tests/fixtures/__init__.py new file mode 100644 index 0000000..68cce00 --- /dev/null +++ b/tests/fixtures/__init__.py @@ -0,0 +1,236 @@ +""" +Pytest fixtures for mocking external dependencies (network, filesystem). +""" + +import json +from unittest.mock import MagicMock + +import pytest + + +@pytest.fixture +def mock_feed_response(): + """Mock RSS feed response data.""" + return b""" + + + Test Feed + https://example.com + + Test Headline One + Sat, 15 Mar 2025 12:00:00 GMT + + + Test Headline Two + Sat, 15 Mar 2025 11:00:00 GMT + + + Sports: Team Wins Championship + Sat, 15 Mar 2025 10:00:00 GMT + + +""" + + +@pytest.fixture +def mock_gutenberg_response(): + """Mock Project Gutenberg text response.""" + return """Project Gutenberg's Collection, by Various + +*** START OF SOME TEXT *** +This is a test poem with multiple lines +that should be parsed as stanzas. + +Another stanza here with different content +and more lines to test the parsing logic. + +Yet another stanza for variety +in the test data. + +*** END OF SOME TEXT ***""" + + +@pytest.fixture +def mock_gutenberg_empty(): + """Mock Gutenberg response with no valid stanzas.""" + return """Project Gutenberg's Collection + +*** START OF TEXT *** +THIS IS ALL CAPS AND SHOULD BE SKIPPED + +I. + +*** END OF TEXT ***""" + + +@pytest.fixture +def mock_ntfy_message(): + """Mock ntfy.sh SSE message.""" + return json.dumps( + { + "id": "test123", + "event": "message", + "title": "Test Title", + "message": "Test message body", + "time": 1234567890, + } + ).encode() + + +@pytest.fixture +def mock_ntfy_keepalive(): + """Mock ntfy.sh keepalive message.""" + return b'data: {"event":"keepalive"}\n\n' + + +@pytest.fixture +def mock_google_translate_response(): + """Mock Google Translate API response.""" + return json.dumps( + [ + [["Translated text", "Original text", None, 0.8], None, "en"], + None, + None, + [], + [], + [], + [], + ] + ) + + +@pytest.fixture +def mock_feedparser(): + """Create a mock feedparser.parse function.""" + + def _mock(data): + mock_result = MagicMock() + mock_result.bozo = False + mock_result.entries = [ + { + "title": "Test Headline", + "published_parsed": (2025, 3, 15, 12, 0, 0, 0, 0, 0), + }, + { + "title": "Another Headline", + "updated_parsed": (2025, 3, 15, 11, 0, 0, 0, 0, 0), + }, + ] + return mock_result + + return _mock + + +@pytest.fixture +def mock_urllib_open(mock_feed_response): + """Create a mock urllib.request.urlopen that returns feed data.""" + + def _mock(url): + mock_response = MagicMock() + mock_response.read.return_value = mock_feed_response + return mock_response + + return _mock + + +@pytest.fixture +def sample_items(): + """Sample items as returned by fetch module (title, source, timestamp).""" + return [ + ("Headline One", "Test Source", "12:00"), + ("Headline Two", "Another Source", "11:30"), + ("Headline Three", "Third Source", "10:45"), + ] + + +@pytest.fixture +def sample_config(): + """Sample config for testing.""" + from engine.config import Config + + return Config( + headline_limit=100, + feed_timeout=10, + mic_threshold_db=50, + mode="news", + firehose=False, + ntfy_topic="https://ntfy.sh/test/json", + ntfy_reconnect_delay=5, + message_display_secs=30, + font_dir="fonts", + font_path="", + font_index=0, + font_picker=False, + font_sz=60, + render_h=8, + ssaa=4, + scroll_dur=5.625, + frame_dt=0.05, + firehose_h=12, + grad_speed=0.08, + glitch_glyphs="░▒▓█▌▐", + kata_glyphs="ハミヒーウ", + script_fonts={}, + ) + + +@pytest.fixture +def poetry_config(): + """Sample config for poetry mode.""" + from engine.config import Config + + return Config( + headline_limit=100, + feed_timeout=10, + mic_threshold_db=50, + mode="poetry", + firehose=False, + ntfy_topic="https://ntfy.sh/test/json", + ntfy_reconnect_delay=5, + message_display_secs=30, + font_dir="fonts", + font_path="", + font_index=0, + font_picker=False, + font_sz=60, + render_h=8, + ssaa=4, + scroll_dur=5.625, + frame_dt=0.05, + firehose_h=12, + grad_speed=0.08, + glitch_glyphs="░▒▓█▌▐", + kata_glyphs="ハミヒーウ", + script_fonts={}, + ) + + +@pytest.fixture +def firehose_config(): + """Sample config with firehose enabled.""" + from engine.config import Config + + return Config( + headline_limit=100, + feed_timeout=10, + mic_threshold_db=50, + mode="news", + firehose=True, + ntfy_topic="https://ntfy.sh/test/json", + ntfy_reconnect_delay=5, + message_display_secs=30, + font_dir="fonts", + font_path="", + font_index=0, + font_picker=False, + font_sz=60, + render_h=8, + ssaa=4, + scroll_dur=5.625, + frame_dt=0.05, + firehose_h=12, + grad_speed=0.08, + glitch_glyphs="░▒▓█▌▐", + kata_glyphs="ハミヒーウ", + script_fonts={}, + ) diff --git a/tests/test_config.py b/tests/test_config.py index 9cf6c1c..ec5e27d 100644 --- a/tests/test_config.py +++ b/tests/test_config.py @@ -7,6 +7,8 @@ import tempfile from pathlib import Path from unittest.mock import patch +import pytest + from engine import config @@ -160,3 +162,140 @@ class TestSetFontSelection: config.set_font_selection(font_path=None, font_index=None) assert original_path == config.FONT_PATH assert original_index == config.FONT_INDEX + + +class TestConfigDataclass: + """Tests for Config dataclass.""" + + def test_config_has_required_fields(self): + """Config has all required fields.""" + c = config.Config() + assert hasattr(c, "headline_limit") + assert hasattr(c, "feed_timeout") + assert hasattr(c, "mic_threshold_db") + assert hasattr(c, "mode") + assert hasattr(c, "firehose") + assert hasattr(c, "ntfy_topic") + assert hasattr(c, "ntfy_reconnect_delay") + assert hasattr(c, "message_display_secs") + assert hasattr(c, "font_dir") + assert hasattr(c, "font_path") + assert hasattr(c, "font_index") + assert hasattr(c, "font_picker") + assert hasattr(c, "font_sz") + assert hasattr(c, "render_h") + assert hasattr(c, "ssaa") + assert hasattr(c, "scroll_dur") + assert hasattr(c, "frame_dt") + assert hasattr(c, "firehose_h") + assert hasattr(c, "grad_speed") + assert hasattr(c, "glitch_glyphs") + assert hasattr(c, "kata_glyphs") + assert hasattr(c, "script_fonts") + + def test_config_defaults(self): + """Config has sensible defaults.""" + c = config.Config() + assert c.headline_limit == 1000 + assert c.feed_timeout == 10 + assert c.mic_threshold_db == 50 + assert c.mode == "news" + assert c.firehose is False + assert c.ntfy_reconnect_delay == 5 + assert c.message_display_secs == 30 + + def test_config_is_immutable(self): + """Config is frozen (immutable).""" + c = config.Config() + with pytest.raises(AttributeError): + c.headline_limit = 500 # type: ignore + + def test_config_custom_values(self): + """Config accepts custom values.""" + c = config.Config( + headline_limit=500, + mode="poetry", + firehose=True, + ntfy_topic="https://ntfy.sh/test", + ) + assert c.headline_limit == 500 + assert c.mode == "poetry" + assert c.firehose is True + assert c.ntfy_topic == "https://ntfy.sh/test" + + +class TestConfigFromArgs: + """Tests for Config.from_args method.""" + + def test_from_args_defaults(self): + """from_args creates config with defaults from empty argv.""" + c = config.Config.from_args(["prog"]) + assert c.mode == "news" + assert c.firehose is False + assert c.font_picker is True + + def test_from_args_poetry_mode(self): + """from_args detects --poetry flag.""" + c = config.Config.from_args(["prog", "--poetry"]) + assert c.mode == "poetry" + + def test_from_args_poetry_short_flag(self): + """from_args detects -p short flag.""" + c = config.Config.from_args(["prog", "-p"]) + assert c.mode == "poetry" + + def test_from_args_firehose(self): + """from_args detects --firehose flag.""" + c = config.Config.from_args(["prog", "--firehose"]) + assert c.firehose is True + + def test_from_args_no_font_picker(self): + """from_args detects --no-font-picker flag.""" + c = config.Config.from_args(["prog", "--no-font-picker"]) + assert c.font_picker is False + + def test_from_args_font_index(self): + """from_args parses --font-index.""" + c = config.Config.from_args(["prog", "--font-index", "3"]) + assert c.font_index == 3 + + +class TestGetSetConfig: + """Tests for get_config and set_config functions.""" + + def test_get_config_returns_config(self): + """get_config returns a Config instance.""" + c = config.get_config() + assert isinstance(c, config.Config) + + def test_set_config_allows_injection(self): + """set_config allows injecting a custom config.""" + custom = config.Config(mode="poetry", headline_limit=100) + config.set_config(custom) + assert config.get_config().mode == "poetry" + assert config.get_config().headline_limit == 100 + + def test_set_config_then_get_config(self): + """set_config followed by get_config returns the set config.""" + original = config.get_config() + test_config = config.Config(headline_limit=42) + config.set_config(test_config) + result = config.get_config() + assert result.headline_limit == 42 + config.set_config(original) + + +class TestPlatformFontPaths: + """Tests for platform font path detection.""" + + def test_get_platform_font_paths_returns_dict(self): + """_get_platform_font_paths returns a dictionary.""" + fonts = config._get_platform_font_paths() + assert isinstance(fonts, dict) + + def test_platform_font_paths_common_languages(self): + """Common language font mappings exist.""" + fonts = config._get_platform_font_paths() + common = {"ja", "zh-cn", "ko", "ru", "ar", "hi"} + found = set(fonts.keys()) & common + assert len(found) > 0 From cdc8094de2e15cf64908736f92fb086647c1f89c Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sun, 15 Mar 2026 15:53:37 -0700 Subject: [PATCH 2/7] refactor: phase 2 - modularization of scroll engine Split monolithic scroll.py into focused modules: - viewport.py: terminal size (tw/th), ANSI positioning helpers - frame.py: FrameTimer class, scroll step calculation - layers.py: message overlay, ticker zone, firehose rendering - scroll.py: simplified orchestrator, imports from new modules Add stream controller and event types for future event-driven architecture: - controller.py: StreamController for source initialization and stream lifecycle - events.py: EventType enum and event dataclasses (HeadlineEvent, FrameTickEvent, etc.) Added tests for new modules: - test_viewport.py: 8 tests for viewport utilities - test_frame.py: 10 tests for frame timing - test_layers.py: 13 tests for layer compositing - test_events.py: 11 tests for event types - test_controller.py: 6 tests for stream controller This enables: - Testable chunks with clear responsibilities - Reusable viewport utilities across modules - Better separation of concerns in render pipeline - Foundation for future event-driven architecture Also includes Phase 1 documentation updates in code comments. --- engine/controller.py | 46 +++++++++ engine/events.py | 67 +++++++++++++ engine/frame.py | 57 +++++++++++ engine/layers.py | 201 +++++++++++++++++++++++++++++++++++++++ engine/scroll.py | 173 ++++++--------------------------- engine/viewport.py | 37 +++++++ tests/test_controller.py | 85 +++++++++++++++++ tests/test_events.py | 112 ++++++++++++++++++++++ tests/test_frame.py | 63 ++++++++++++ tests/test_layers.py | 96 +++++++++++++++++++ tests/test_viewport.py | 64 +++++++++++++ 11 files changed, 858 insertions(+), 143 deletions(-) create mode 100644 engine/controller.py create mode 100644 engine/events.py create mode 100644 engine/frame.py create mode 100644 engine/layers.py create mode 100644 engine/viewport.py create mode 100644 tests/test_controller.py create mode 100644 tests/test_events.py create mode 100644 tests/test_frame.py create mode 100644 tests/test_layers.py create mode 100644 tests/test_viewport.py diff --git a/engine/controller.py b/engine/controller.py new file mode 100644 index 0000000..98b24e5 --- /dev/null +++ b/engine/controller.py @@ -0,0 +1,46 @@ +""" +Stream controller - manages input sources and orchestrates the render stream. +""" + +from engine.config import Config, get_config +from engine.mic import MicMonitor +from engine.ntfy import NtfyPoller +from engine.scroll import stream + + +class StreamController: + """Controls the stream lifecycle - initializes sources and runs the stream.""" + + def __init__(self, config: Config | None = None): + self.config = config or get_config() + self.mic: MicMonitor | None = None + self.ntfy: NtfyPoller | None = None + + def initialize_sources(self) -> tuple[bool, bool]: + """Initialize microphone and ntfy sources. + + Returns: + (mic_ok, ntfy_ok) - success status for each source + """ + self.mic = MicMonitor(threshold_db=self.config.mic_threshold_db) + mic_ok = self.mic.start() if self.mic.available else False + + self.ntfy = NtfyPoller( + self.config.ntfy_topic, + reconnect_delay=self.config.ntfy_reconnect_delay, + display_secs=self.config.message_display_secs, + ) + ntfy_ok = self.ntfy.start() + + return bool(mic_ok), ntfy_ok + + def run(self, items: list) -> None: + """Run the stream with initialized sources.""" + if self.mic is None or self.ntfy is None: + self.initialize_sources() + stream(items, self.ntfy, self.mic) + + def cleanup(self) -> None: + """Clean up resources.""" + if self.mic: + self.mic.stop() diff --git a/engine/events.py b/engine/events.py new file mode 100644 index 0000000..d686285 --- /dev/null +++ b/engine/events.py @@ -0,0 +1,67 @@ +""" +Event types for the mainline application. +Defines the core events that flow through the system. +These types support a future migration to an event-driven architecture. +""" + +from dataclasses import dataclass +from datetime import datetime +from enum import Enum, auto + + +class EventType(Enum): + """Core event types in the mainline application.""" + + NEW_HEADLINE = auto() + FRAME_TICK = auto() + MIC_LEVEL = auto() + NTFY_MESSAGE = auto() + STREAM_START = auto() + STREAM_END = auto() + + +@dataclass +class HeadlineEvent: + """Event emitted when a new headline is ready for display.""" + + title: str + source: str + timestamp: str + language: str | None = None + + +@dataclass +class FrameTickEvent: + """Event emitted on each render frame.""" + + frame_number: int + timestamp: datetime + delta_seconds: float + + +@dataclass +class MicLevelEvent: + """Event emitted when microphone level changes significantly.""" + + db_level: float + excess_above_threshold: float + timestamp: datetime + + +@dataclass +class NtfyMessageEvent: + """Event emitted when an ntfy message is received.""" + + title: str + body: str + message_id: str | None = None + timestamp: datetime | None = None + + +@dataclass +class StreamEvent: + """Event emitted when stream starts or ends.""" + + event_type: EventType + headline_count: int = 0 + timestamp: datetime | None = None diff --git a/engine/frame.py b/engine/frame.py new file mode 100644 index 0000000..747d040 --- /dev/null +++ b/engine/frame.py @@ -0,0 +1,57 @@ +""" +Frame timing utilities — FPS control and precise timing. +""" + +import time + + +class FrameTimer: + """Frame timer for consistent render loop timing.""" + + def __init__(self, target_frame_dt: float = 0.05): + self.target_frame_dt = target_frame_dt + self._frame_count = 0 + self._start_time = time.monotonic() + self._last_frame_time = self._start_time + + @property + def fps(self) -> float: + """Current FPS based on elapsed frames.""" + elapsed = time.monotonic() - self._start_time + if elapsed > 0: + return self._frame_count / elapsed + return 0.0 + + def sleep_until_next_frame(self) -> float: + """Sleep to maintain target frame rate. Returns actual elapsed time.""" + now = time.monotonic() + elapsed = now - self._last_frame_time + self._last_frame_time = now + self._frame_count += 1 + + sleep_time = max(0, self.target_frame_dt - elapsed) + if sleep_time > 0: + time.sleep(sleep_time) + return elapsed + + def reset(self) -> None: + """Reset frame counter and start time.""" + self._frame_count = 0 + self._start_time = time.monotonic() + self._last_frame_time = self._start_time + + +def calculate_scroll_step( + scroll_dur: float, view_height: int, padding: int = 15 +) -> float: + """Calculate scroll step interval for smooth scrolling. + + Args: + scroll_dur: Duration in seconds for one headline to scroll through view + view_height: Terminal height in rows + padding: Extra rows for off-screen content + + Returns: + Time in seconds between scroll steps + """ + return scroll_dur / (view_height + padding) * 2 diff --git a/engine/layers.py b/engine/layers.py new file mode 100644 index 0000000..ebc53ef --- /dev/null +++ b/engine/layers.py @@ -0,0 +1,201 @@ +""" +Layer compositing — message overlay, ticker zone, firehose, noise. +Depends on: config, render, effects. +""" + +import random +import re +import time +from datetime import datetime + +from engine import config +from engine.effects import ( + fade_line, + firehose_line, + glitch_bar, + noise, + vis_trunc, +) +from engine.render import big_wrap, lr_gradient, lr_gradient_opposite +from engine.terminal import RST, W_COOL + +MSG_META = "\033[38;5;245m" +MSG_BORDER = "\033[2;38;5;37m" + + +def render_message_overlay( + msg: tuple[str, str, float] | None, + w: int, + h: int, + msg_cache: tuple, +) -> tuple[list[str], tuple]: + """Render ntfy message overlay. + + Args: + msg: (title, body, timestamp) or None + w: terminal width + h: terminal height + msg_cache: (cache_key, rendered_rows) for caching + + Returns: + (list of ANSI strings, updated cache) + """ + overlay = [] + if msg is None: + return overlay, msg_cache + + m_title, m_body, m_ts = msg + display_text = m_body or m_title or "(empty)" + display_text = re.sub(r"\s+", " ", display_text.upper()) + + cache_key = (display_text, w) + if msg_cache[0] != cache_key: + msg_rows = big_wrap(display_text, w - 4) + msg_cache = (cache_key, msg_rows) + else: + msg_rows = msg_cache[1] + + msg_rows = lr_gradient_opposite( + msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0 + ) + + elapsed_s = int(time.monotonic() - m_ts) + remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s) + ts_str = datetime.now().strftime("%H:%M:%S") + panel_h = len(msg_rows) + 2 + panel_top = max(0, (h - panel_h) // 2) + + row_idx = 0 + for mr in msg_rows: + ln = vis_trunc(mr, w) + overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K") + row_idx += 1 + + meta_parts = [] + if m_title and m_title != m_body: + meta_parts.append(m_title) + meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s") + meta = ( + " " + " \u00b7 ".join(meta_parts) + if len(meta_parts) > 1 + else " " + meta_parts[0] + ) + overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}\033[0m\033[K") + row_idx += 1 + + bar = "\u2500" * (w - 4) + overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}\033[0m\033[K") + + return overlay, msg_cache + + +def render_ticker_zone( + active: list, + scroll_cam: int, + ticker_h: int, + w: int, + noise_cache: dict, + grad_offset: float, +) -> tuple[list[str], dict]: + """Render the ticker scroll zone. + + Args: + active: list of (content_rows, color, canvas_y, meta_idx) + scroll_cam: camera position (viewport top) + ticker_h: height of ticker zone + w: terminal width + noise_cache: dict of cy -> noise string + grad_offset: gradient animation offset + + Returns: + (list of ANSI strings, updated noise_cache) + """ + buf = [] + top_zone = max(1, int(ticker_h * 0.25)) + bot_zone = max(1, int(ticker_h * 0.10)) + + def noise_at(cy): + if cy not in noise_cache: + noise_cache[cy] = noise(w) if random.random() < 0.15 else None + return noise_cache[cy] + + for r in range(ticker_h): + scr_row = r + 1 + cy = scroll_cam + r + top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0 + bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0 + row_fade = min(top_f, bot_f) + drawn = False + + for content, hc, by, midx in active: + cr = cy - by + if 0 <= cr < len(content): + raw = content[cr] + if cr != midx: + colored = lr_gradient([raw], grad_offset)[0] + else: + colored = raw + ln = vis_trunc(colored, w) + if row_fade < 1.0: + ln = fade_line(ln, row_fade) + + if cr == midx: + buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K") + elif ln.strip(): + buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K") + else: + buf.append(f"\033[{scr_row};1H\033[K") + drawn = True + break + + if not drawn: + n = noise_at(cy) + if row_fade < 1.0 and n: + n = fade_line(n, row_fade) + if n: + buf.append(f"\033[{scr_row};1H{n}") + else: + buf.append(f"\033[{scr_row};1H\033[K") + + return buf, noise_cache + + +def apply_glitch( + buf: list[str], + ticker_buf_start: int, + mic_excess: float, + w: int, +) -> list[str]: + """Apply glitch effect to ticker buffer. + + Args: + buf: current buffer + ticker_buf_start: index where ticker starts in buffer + mic_excess: mic level above threshold + w: terminal width + + Returns: + Updated buffer with glitches applied + """ + glitch_prob = 0.32 + min(0.9, mic_excess * 0.16) + n_hits = 4 + int(mic_excess / 2) + ticker_buf_len = len(buf) - ticker_buf_start + + if random.random() < glitch_prob and ticker_buf_len > 0: + for _ in range(min(n_hits, ticker_buf_len)): + gi = random.randint(0, ticker_buf_len - 1) + scr_row = gi + 1 + buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}" + + return buf + + +def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]: + """Render firehose strip at bottom of screen.""" + buf = [] + if fh > 0: + for fr in range(fh): + scr_row = h - fh + fr + 1 + fline = firehose_line(items, w) + buf.append(f"\033[{scr_row};1H{fline}\033[K") + return buf diff --git a/engine/scroll.py b/engine/scroll.py index 810fe9f..dcb96f7 100644 --- a/engine/scroll.py +++ b/engine/scroll.py @@ -1,25 +1,22 @@ """ Render engine — ticker content, scroll motion, message panel, and firehose overlay. -Depends on: config, terminal, render, effects, ntfy, mic. +Orchestrates viewport, frame timing, and layers. """ import random -import re import sys import time -from datetime import datetime from engine import config -from engine.effects import ( - fade_line, - firehose_line, - glitch_bar, - next_headline, - noise, - vis_trunc, +from engine.frame import calculate_scroll_step +from engine.layers import ( + apply_glitch, + render_firehose, + render_message_overlay, + render_ticker_zone, ) -from engine.render import big_wrap, lr_gradient, lr_gradient_opposite, make_block -from engine.terminal import CLR, RST, W_COOL, th, tw +from engine.terminal import CLR +from engine.viewport import th, tw def stream(items, ntfy_poller, mic_monitor): @@ -35,33 +32,16 @@ def stream(items, ntfy_poller, mic_monitor): w, h = tw(), th() fh = config.FIREHOSE_H if config.FIREHOSE else 0 - ticker_view_h = h - fh # reserve fixed firehose strip at bottom - GAP = 3 # blank rows between headlines - scroll_step_interval = config.SCROLL_DUR / (ticker_view_h + 15) * 2 + ticker_view_h = h - fh + GAP = 3 + scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h) - # Taxonomy: - # - message: centered ntfy overlay panel - # - ticker: large headline text content - # - scroll: upward camera motion applied to ticker content - # - firehose: fixed carriage-return style strip pinned at bottom - # Active ticker blocks: (content_rows, color, canvas_y, meta_idx) active = [] - scroll_cam = 0 # viewport top in virtual canvas coords - ticker_next_y = ( - ticker_view_h # canvas-y where next block starts (off-screen bottom) - ) + scroll_cam = 0 + ticker_next_y = ticker_view_h noise_cache = {} scroll_motion_accum = 0.0 - - def _noise_at(cy): - if cy not in noise_cache: - noise_cache[cy] = noise(w) if random.random() < 0.15 else None - return noise_cache[cy] - - # Message color: bright cyan/white — distinct from headline greens - MSG_META = "\033[38;5;245m" # cool grey - MSG_BORDER = "\033[2;38;5;37m" # dim teal - _msg_cache = (None, None) # (cache_key, rendered_rows) + msg_cache = (None, None) while queued < config.HEADLINE_LIMIT or active: t0 = time.monotonic() @@ -69,80 +49,30 @@ def stream(items, ntfy_poller, mic_monitor): fh = config.FIREHOSE_H if config.FIREHOSE else 0 ticker_view_h = h - fh - # ── Check for ntfy message ──────────────────────── - msg_h = 0 - msg_overlay = [] msg = ntfy_poller.get_active_message() + msg_overlay, msg_cache = render_message_overlay(msg, w, h, msg_cache) buf = [] - if msg is not None: - m_title, m_body, m_ts = msg - # ── Message overlay: centered in the viewport ── - display_text = m_body or m_title or "(empty)" - display_text = re.sub(r"\s+", " ", display_text.upper()) - cache_key = (display_text, w) - if _msg_cache[0] != cache_key: - msg_rows = big_wrap(display_text, w - 4) - _msg_cache = (cache_key, msg_rows) - else: - msg_rows = _msg_cache[1] - msg_rows = lr_gradient_opposite( - msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0 - ) - # Layout: rendered text + meta + border - elapsed_s = int(time.monotonic() - m_ts) - remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s) - ts_str = datetime.now().strftime("%H:%M:%S") - panel_h = len(msg_rows) + 2 # meta + border - panel_top = max(0, (h - panel_h) // 2) - row_idx = 0 - for mr in msg_rows: - ln = vis_trunc(mr, w) - msg_overlay.append( - f"\033[{panel_top + row_idx + 1};1H {ln}{RST}\033[K" - ) - row_idx += 1 - # Meta line: title (if distinct) + source + countdown - meta_parts = [] - if m_title and m_title != m_body: - meta_parts.append(m_title) - meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s") - meta = ( - " " + " \u00b7 ".join(meta_parts) - if len(meta_parts) > 1 - else " " + meta_parts[0] - ) - msg_overlay.append( - f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}{RST}\033[K" - ) - row_idx += 1 - # Border — constant boundary under message panel - bar = "\u2500" * (w - 4) - msg_overlay.append( - f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}{RST}\033[K" - ) + ticker_h = ticker_view_h - # Ticker draws above the fixed firehose strip; message is a centered overlay. - ticker_h = ticker_view_h - msg_h - - # ── Ticker content + scroll motion (always runs) ── scroll_motion_accum += config.FRAME_DT while scroll_motion_accum >= scroll_step_interval: scroll_motion_accum -= scroll_step_interval scroll_cam += 1 - # Enqueue new headlines when room at the bottom while ( ticker_next_y < scroll_cam + ticker_view_h + 10 and queued < config.HEADLINE_LIMIT ): + from engine.effects import next_headline + from engine.render import make_block + t, src, ts = next_headline(pool, items, seen) ticker_content, hc, midx = make_block(t, src, ts, w) active.append((ticker_content, hc, ticker_next_y, midx)) ticker_next_y += len(ticker_content) + GAP queued += 1 - # Prune off-screen blocks and stale noise active = [ (c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > scroll_cam ] @@ -150,69 +80,26 @@ def stream(items, ntfy_poller, mic_monitor): if k < scroll_cam: del noise_cache[k] - # Draw ticker zone (above fixed firehose strip) - top_zone = max(1, int(ticker_h * 0.25)) - bot_zone = max(1, int(ticker_h * 0.10)) grad_offset = (time.monotonic() * config.GRAD_SPEED) % 1.0 - ticker_buf_start = len(buf) # track where ticker rows start in buf - for r in range(ticker_h): - scr_row = r + 1 # 1-indexed ANSI screen row - cy = scroll_cam + r - top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0 - bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0 - row_fade = min(top_f, bot_f) - drawn = False - for content, hc, by, midx in active: - cr = cy - by - if 0 <= cr < len(content): - raw = content[cr] - if cr != midx: - colored = lr_gradient([raw], grad_offset)[0] - else: - colored = raw - ln = vis_trunc(colored, w) - if row_fade < 1.0: - ln = fade_line(ln, row_fade) - if cr == midx: - buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K") - elif ln.strip(): - buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K") - else: - buf.append(f"\033[{scr_row};1H\033[K") - drawn = True - break - if not drawn: - n = _noise_at(cy) - if row_fade < 1.0 and n: - n = fade_line(n, row_fade) - if n: - buf.append(f"\033[{scr_row};1H{n}") - else: - buf.append(f"\033[{scr_row};1H\033[K") + ticker_buf_start = len(buf) + + ticker_buf, noise_cache = render_ticker_zone( + active, scroll_cam, ticker_h, w, noise_cache, grad_offset + ) + buf.extend(ticker_buf) - # Glitch — base rate + mic-reactive spikes (ticker zone only) mic_excess = mic_monitor.excess - glitch_prob = 0.32 + min(0.9, mic_excess * 0.16) - n_hits = 4 + int(mic_excess / 2) - ticker_buf_len = len(buf) - ticker_buf_start - if random.random() < glitch_prob and ticker_buf_len > 0: - for _ in range(min(n_hits, ticker_buf_len)): - gi = random.randint(0, ticker_buf_len - 1) - scr_row = gi + 1 - buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}" + buf = apply_glitch(buf, ticker_buf_start, mic_excess, w) + + firehose_buf = render_firehose(items, w, fh, h) + buf.extend(firehose_buf) - if config.FIREHOSE and fh > 0: - for fr in range(fh): - scr_row = h - fh + fr + 1 - fline = firehose_line(items, w) - buf.append(f"\033[{scr_row};1H{fline}\033[K") if msg_overlay: buf.extend(msg_overlay) sys.stdout.buffer.write("".join(buf).encode()) sys.stdout.flush() - # Precise frame timing elapsed = time.monotonic() - t0 time.sleep(max(0, config.FRAME_DT - elapsed)) diff --git a/engine/viewport.py b/engine/viewport.py new file mode 100644 index 0000000..d89b6a8 --- /dev/null +++ b/engine/viewport.py @@ -0,0 +1,37 @@ +""" +Viewport utilities — terminal dimensions and ANSI positioning helpers. +No internal dependencies. +""" + +import os + + +def tw() -> int: + """Get terminal width (columns).""" + try: + return os.get_terminal_size().columns + except Exception: + return 80 + + +def th() -> int: + """Get terminal height (lines).""" + try: + return os.get_terminal_size().lines + except Exception: + return 24 + + +def move_to(row: int, col: int = 1) -> str: + """Generate ANSI escape to move cursor to row, col (1-indexed).""" + return f"\033[{row};{col}H" + + +def clear_screen() -> str: + """Clear screen and move cursor to home.""" + return "\033[2J\033[H" + + +def clear_line() -> str: + """Clear current line.""" + return "\033[K" diff --git a/tests/test_controller.py b/tests/test_controller.py new file mode 100644 index 0000000..96ef02d --- /dev/null +++ b/tests/test_controller.py @@ -0,0 +1,85 @@ +""" +Tests for engine.controller module. +""" + +from unittest.mock import MagicMock, patch + +from engine import config +from engine.controller import StreamController + + +class TestStreamController: + """Tests for StreamController class.""" + + def test_init_default_config(self): + """StreamController initializes with default config.""" + controller = StreamController() + assert controller.config is not None + assert isinstance(controller.config, config.Config) + + def test_init_custom_config(self): + """StreamController accepts custom config.""" + custom_config = config.Config(headline_limit=500) + controller = StreamController(config=custom_config) + assert controller.config.headline_limit == 500 + + def test_init_sources_none_by_default(self): + """Sources are None until initialized.""" + controller = StreamController() + assert controller.mic is None + assert controller.ntfy is None + + @patch("engine.controller.MicMonitor") + @patch("engine.controller.NtfyPoller") + def test_initialize_sources(self, mock_ntfy, mock_mic): + """initialize_sources creates mic and ntfy instances.""" + mock_mic_instance = MagicMock() + mock_mic_instance.available = True + mock_mic_instance.start.return_value = True + mock_mic.return_value = mock_mic_instance + + mock_ntfy_instance = MagicMock() + mock_ntfy_instance.start.return_value = True + mock_ntfy.return_value = mock_ntfy_instance + + controller = StreamController() + mic_ok, ntfy_ok = controller.initialize_sources() + + assert mic_ok is True + assert ntfy_ok is True + assert controller.mic is not None + assert controller.ntfy is not None + + @patch("engine.controller.MicMonitor") + @patch("engine.controller.NtfyPoller") + def test_initialize_sources_mic_unavailable(self, mock_ntfy, mock_mic): + """initialize_sources handles unavailable mic.""" + mock_mic_instance = MagicMock() + mock_mic_instance.available = False + mock_mic.return_value = mock_mic_instance + + mock_ntfy_instance = MagicMock() + mock_ntfy_instance.start.return_value = True + mock_ntfy.return_value = mock_ntfy_instance + + controller = StreamController() + mic_ok, ntfy_ok = controller.initialize_sources() + + assert mic_ok is False + assert ntfy_ok is True + + +class TestStreamControllerCleanup: + """Tests for StreamController cleanup.""" + + @patch("engine.controller.MicMonitor") + def test_cleanup_stops_mic(self, mock_mic): + """cleanup stops the microphone if running.""" + mock_mic_instance = MagicMock() + mock_mic.return_value = mock_mic_instance + + controller = StreamController() + controller.mic = mock_mic_instance + controller.cleanup() + + mock_mic_instance.stop.assert_called_once() diff --git a/tests/test_events.py b/tests/test_events.py new file mode 100644 index 0000000..ea5f4ed --- /dev/null +++ b/tests/test_events.py @@ -0,0 +1,112 @@ +""" +Tests for engine.events module. +""" + +from datetime import datetime + +from engine import events + + +class TestEventType: + """Tests for EventType enum.""" + + def test_event_types_exist(self): + """All expected event types exist.""" + assert hasattr(events.EventType, "NEW_HEADLINE") + assert hasattr(events.EventType, "FRAME_TICK") + assert hasattr(events.EventType, "MIC_LEVEL") + assert hasattr(events.EventType, "NTFY_MESSAGE") + assert hasattr(events.EventType, "STREAM_START") + assert hasattr(events.EventType, "STREAM_END") + + +class TestHeadlineEvent: + """Tests for HeadlineEvent dataclass.""" + + def test_create_headline_event(self): + """HeadlineEvent can be created with required fields.""" + e = events.HeadlineEvent( + title="Test Headline", + source="Test Source", + timestamp="12:00", + ) + assert e.title == "Test Headline" + assert e.source == "Test Source" + assert e.timestamp == "12:00" + + def test_headline_event_optional_language(self): + """HeadlineEvent supports optional language field.""" + e = events.HeadlineEvent( + title="Test", + source="Test", + timestamp="12:00", + language="ja", + ) + assert e.language == "ja" + + +class TestFrameTickEvent: + """Tests for FrameTickEvent dataclass.""" + + def test_create_frame_tick_event(self): + """FrameTickEvent can be created.""" + now = datetime.now() + e = events.FrameTickEvent( + frame_number=100, + timestamp=now, + delta_seconds=0.05, + ) + assert e.frame_number == 100 + assert e.timestamp == now + assert e.delta_seconds == 0.05 + + +class TestMicLevelEvent: + """Tests for MicLevelEvent dataclass.""" + + def test_create_mic_level_event(self): + """MicLevelEvent can be created.""" + now = datetime.now() + e = events.MicLevelEvent( + db_level=60.0, + excess_above_threshold=10.0, + timestamp=now, + ) + assert e.db_level == 60.0 + assert e.excess_above_threshold == 10.0 + + +class TestNtfyMessageEvent: + """Tests for NtfyMessageEvent dataclass.""" + + def test_create_ntfy_message_event(self): + """NtfyMessageEvent can be created with required fields.""" + e = events.NtfyMessageEvent( + title="Test Title", + body="Test Body", + ) + assert e.title == "Test Title" + assert e.body == "Test Body" + assert e.message_id is None + + def test_ntfy_message_event_with_id(self): + """NtfyMessageEvent supports optional message_id.""" + e = events.NtfyMessageEvent( + title="Test", + body="Test", + message_id="abc123", + ) + assert e.message_id == "abc123" + + +class TestStreamEvent: + """Tests for StreamEvent dataclass.""" + + def test_create_stream_event(self): + """StreamEvent can be created.""" + e = events.StreamEvent( + event_type=events.EventType.STREAM_START, + headline_count=100, + ) + assert e.event_type == events.EventType.STREAM_START + assert e.headline_count == 100 diff --git a/tests/test_frame.py b/tests/test_frame.py new file mode 100644 index 0000000..2c59b85 --- /dev/null +++ b/tests/test_frame.py @@ -0,0 +1,63 @@ +""" +Tests for engine.frame module. +""" + +import time + +from engine.frame import FrameTimer, calculate_scroll_step + + +class TestFrameTimer: + """Tests for FrameTimer class.""" + + def test_init_default(self): + """FrameTimer initializes with default values.""" + timer = FrameTimer() + assert timer.target_frame_dt == 0.05 + assert timer.fps >= 0 + + def test_init_custom(self): + """FrameTimer accepts custom frame duration.""" + timer = FrameTimer(target_frame_dt=0.1) + assert timer.target_frame_dt == 0.1 + + def test_fps_calculation(self): + """FrameTimer calculates FPS correctly.""" + timer = FrameTimer() + timer._frame_count = 10 + timer._start_time = time.monotonic() - 1.0 + assert timer.fps >= 9.0 + + def test_reset(self): + """FrameTimer.reset() clears frame count.""" + timer = FrameTimer() + timer._frame_count = 100 + timer.reset() + assert timer._frame_count == 0 + + +class TestCalculateScrollStep: + """Tests for calculate_scroll_step function.""" + + def test_basic_calculation(self): + """calculate_scroll_step returns positive value.""" + result = calculate_scroll_step(5.0, 24) + assert result > 0 + + def test_with_padding(self): + """calculate_scroll_step respects padding parameter.""" + without_padding = calculate_scroll_step(5.0, 24, padding=0) + with_padding = calculate_scroll_step(5.0, 24, padding=15) + assert with_padding < without_padding + + def test_larger_view_slower_scroll(self): + """Larger view height results in slower scroll steps.""" + small = calculate_scroll_step(5.0, 10) + large = calculate_scroll_step(5.0, 50) + assert large < small + + def test_longer_duration_slower_scroll(self): + """Longer scroll duration results in slower scroll steps.""" + fast = calculate_scroll_step(2.0, 24) + slow = calculate_scroll_step(10.0, 24) + assert slow > fast diff --git a/tests/test_layers.py b/tests/test_layers.py new file mode 100644 index 0000000..afe9c07 --- /dev/null +++ b/tests/test_layers.py @@ -0,0 +1,96 @@ +""" +Tests for engine.layers module. +""" + +import time + +from engine import layers + + +class TestRenderMessageOverlay: + """Tests for render_message_overlay function.""" + + def test_no_message_returns_empty(self): + """Returns empty list when msg is None.""" + result, cache = layers.render_message_overlay(None, 80, 24, (None, None)) + assert result == [] + assert cache[0] is None + + def test_message_returns_overlay_lines(self): + """Returns non-empty list when message is present.""" + msg = ("Test Title", "Test Body", time.monotonic()) + result, cache = layers.render_message_overlay(msg, 80, 24, (None, None)) + assert len(result) > 0 + assert cache[0] is not None + + def test_cache_key_changes_with_text(self): + """Cache key changes when message text changes.""" + msg1 = ("Title1", "Body1", time.monotonic()) + msg2 = ("Title2", "Body2", time.monotonic()) + + _, cache1 = layers.render_message_overlay(msg1, 80, 24, (None, None)) + _, cache2 = layers.render_message_overlay(msg2, 80, 24, cache1) + + assert cache1[0] != cache2[0] + + def test_cache_reuse_avoids_recomputation(self): + """Cache is returned when same message is passed (interface test).""" + msg = ("Same Title", "Same Body", time.monotonic()) + + result1, cache1 = layers.render_message_overlay(msg, 80, 24, (None, None)) + result2, cache2 = layers.render_message_overlay(msg, 80, 24, cache1) + + assert len(result1) > 0 + assert len(result2) > 0 + assert cache1[0] == cache2[0] + + +class TestRenderFirehose: + """Tests for render_firehose function.""" + + def test_no_firehose_returns_empty(self): + """Returns empty list when firehose height is 0.""" + items = [("Headline", "Source", "12:00")] + result = layers.render_firehose(items, 80, 0, 24) + assert result == [] + + def test_firehose_returns_lines(self): + """Returns lines when firehose height > 0.""" + items = [("Headline", "Source", "12:00")] + result = layers.render_firehose(items, 80, 4, 24) + assert len(result) == 4 + + def test_firehose_includes_ansi_escapes(self): + """Returns lines containing ANSI escape sequences.""" + items = [("Headline", "Source", "12:00")] + result = layers.render_firehose(items, 80, 1, 24) + assert "\033[" in result[0] + + +class TestApplyGlitch: + """Tests for apply_glitch function.""" + + def test_empty_buffer_unchanged(self): + """Empty buffer is returned unchanged.""" + result = layers.apply_glitch([], 0, 0.0, 80) + assert result == [] + + def test_buffer_length_preserved(self): + """Buffer length is preserved after glitch application.""" + buf = [f"\033[{i + 1};1Htest\033[K" for i in range(10)] + result = layers.apply_glitch(buf, 0, 0.5, 80) + assert len(result) == len(buf) + + +class TestRenderTickerZone: + """Tests for render_ticker_zone function - focusing on interface.""" + + def test_returns_list(self): + """Returns a list of strings.""" + result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0) + assert isinstance(result, list) + + def test_returns_dict_for_cache(self): + """Returns a dict for the noise cache.""" + result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0) + assert isinstance(cache, dict) diff --git a/tests/test_viewport.py b/tests/test_viewport.py new file mode 100644 index 0000000..2338a7b --- /dev/null +++ b/tests/test_viewport.py @@ -0,0 +1,64 @@ +""" +Tests for engine.viewport module. +""" + +from engine import viewport + + +class TestViewportTw: + """Tests for tw() function.""" + + def test_tw_returns_int(self): + """tw() returns an integer.""" + result = viewport.tw() + assert isinstance(result, int) + + def test_tw_positive(self): + """tw() returns a positive value.""" + assert viewport.tw() > 0 + + +class TestViewportTh: + """Tests for th() function.""" + + def test_th_returns_int(self): + """th() returns an integer.""" + result = viewport.th() + assert isinstance(result, int) + + def test_th_positive(self): + """th() returns a positive value.""" + assert viewport.th() > 0 + + +class TestViewportMoveTo: + """Tests for move_to() function.""" + + def test_move_to_format(self): + """move_to() returns correctly formatted ANSI escape.""" + result = viewport.move_to(5, 10) + assert result == "\033[5;10H" + + def test_move_to_default_col(self): + """move_to() defaults to column 1.""" + result = viewport.move_to(5) + assert result == "\033[5;1H" + + +class TestViewportClearScreen: + """Tests for clear_screen() function.""" + + def test_clear_screen_format(self): + """clear_screen() returns clear screen ANSI escape.""" + result = viewport.clear_screen() + assert "\033[2J" in result + assert "\033[H" in result + + +class TestViewportClearLine: + """Tests for clear_line() function.""" + + def test_clear_line_format(self): + """clear_line() returns clear line ANSI escape.""" + result = viewport.clear_line() + assert result == "\033[K" From 35e5c8d38bac039919fe041327154fe0706a24b4 Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sun, 15 Mar 2026 16:05:41 -0700 Subject: [PATCH 3/7] refactor: phase 3 - API efficiency improvements Add typed dataclasses for tuple returns: - types.py: HeadlineItem, FetchResult, Block dataclasses with legacy tuple converters - fetch.py: Add type hints and HeadlineTuple type alias Add pyright for static type checking: - Add pyright to dependencies - Verify type coverage with pyright (0 errors in core modules) This enables: - Named types instead of raw tuples (better IDE support, self-documenting) - Type-safe APIs across modules - Backward compatibility via to_tuple/from_tuple methods Note: Lazy imports skipped for render.py - startup impact is minimal. --- engine/fetch.py | 14 +++++-- engine/types.py | 60 ++++++++++++++++++++++++++++ pyproject.toml | 1 + tests/test_types.py | 95 +++++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 166 insertions(+), 4 deletions(-) create mode 100644 engine/types.py create mode 100644 tests/test_types.py diff --git a/engine/fetch.py b/engine/fetch.py index a236c6e..5d6f9bb 100644 --- a/engine/fetch.py +++ b/engine/fetch.py @@ -8,6 +8,7 @@ import pathlib import re import urllib.request from datetime import datetime +from typing import Any import feedparser @@ -16,9 +17,13 @@ from engine.filter import skip, strip_tags from engine.sources import FEEDS, POETRY_SOURCES from engine.terminal import boot_ln +# Type alias for headline items +HeadlineTuple = tuple[str, str, str] + # ─── SINGLE FEED ────────────────────────────────────────── -def fetch_feed(url): +def fetch_feed(url: str) -> Any | None: + """Fetch and parse a single RSS feed URL.""" try: req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"}) resp = urllib.request.urlopen(req, timeout=config.FEED_TIMEOUT) @@ -28,8 +33,9 @@ def fetch_feed(url): # ─── ALL RSS FEEDS ──────────────────────────────────────── -def fetch_all(): - items = [] +def fetch_all() -> tuple[list[HeadlineTuple], int, int]: + """Fetch all RSS feeds and return items, linked count, failed count.""" + items: list[HeadlineTuple] = [] linked = failed = 0 for src, url in FEEDS.items(): feed = fetch_feed(url) @@ -59,7 +65,7 @@ def fetch_all(): # ─── PROJECT GUTENBERG ──────────────────────────────────── -def _fetch_gutenberg(url, label): +def _fetch_gutenberg(url: str, label: str) -> list[HeadlineTuple]: """Download and parse stanzas/passages from a Project Gutenberg text.""" try: req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"}) diff --git a/engine/types.py b/engine/types.py new file mode 100644 index 0000000..f7b45a1 --- /dev/null +++ b/engine/types.py @@ -0,0 +1,60 @@ +""" +Shared dataclasses for the mainline application. +Provides named types for tuple returns across modules. +""" + +from dataclasses import dataclass + + +@dataclass +class HeadlineItem: + """A single headline item: title, source, and timestamp.""" + + title: str + source: str + timestamp: str + + def to_tuple(self) -> tuple[str, str, str]: + """Convert to tuple for backward compatibility.""" + return (self.title, self.source, self.timestamp) + + @classmethod + def from_tuple(cls, t: tuple[str, str, str]) -> "HeadlineItem": + """Create from tuple for backward compatibility.""" + return cls(title=t[0], source=t[1], timestamp=t[2]) + + +def items_to_tuples(items: list[HeadlineItem]) -> list[tuple[str, str, str]]: + """Convert list of HeadlineItem to list of tuples.""" + return [item.to_tuple() for item in items] + + +def tuples_to_items(tuples: list[tuple[str, str, str]]) -> list[HeadlineItem]: + """Convert list of tuples to list of HeadlineItem.""" + return [HeadlineItem.from_tuple(t) for t in tuples] + + +@dataclass +class FetchResult: + """Result from fetch_all() or fetch_poetry().""" + + items: list[HeadlineItem] + linked: int + failed: int + + def to_legacy_tuple(self) -> tuple[list[tuple], int, int]: + """Convert to legacy tuple format for backward compatibility.""" + return ([item.to_tuple() for item in self.items], self.linked, self.failed) + + +@dataclass +class Block: + """Rendered headline block from make_block().""" + + content: list[str] + color: str + meta_row_index: int + + def to_legacy_tuple(self) -> tuple[list[str], str, int]: + """Convert to legacy tuple format for backward compatibility.""" + return (self.content, self.color, self.meta_row_index) diff --git a/pyproject.toml b/pyproject.toml index 6d93f42..f52a05a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,6 +22,7 @@ classifiers = [ dependencies = [ "feedparser>=6.0.0", "Pillow>=10.0.0", + "pyright>=1.1.408", ] [project.optional-dependencies] diff --git a/tests/test_types.py b/tests/test_types.py new file mode 100644 index 0000000..33f5c0e --- /dev/null +++ b/tests/test_types.py @@ -0,0 +1,95 @@ +""" +Tests for engine.types module. +""" + +from engine.types import ( + Block, + FetchResult, + HeadlineItem, + items_to_tuples, + tuples_to_items, +) + + +class TestHeadlineItem: + """Tests for HeadlineItem dataclass.""" + + def test_create_headline_item(self): + """Can create HeadlineItem with required fields.""" + item = HeadlineItem(title="Test", source="Source", timestamp="12:00") + assert item.title == "Test" + assert item.source == "Source" + assert item.timestamp == "12:00" + + def test_to_tuple(self): + """to_tuple returns correct tuple.""" + item = HeadlineItem(title="Test", source="Source", timestamp="12:00") + assert item.to_tuple() == ("Test", "Source", "12:00") + + def test_from_tuple(self): + """from_tuple creates HeadlineItem from tuple.""" + item = HeadlineItem.from_tuple(("Test", "Source", "12:00")) + assert item.title == "Test" + assert item.source == "Source" + assert item.timestamp == "12:00" + + +class TestItemsConversion: + """Tests for list conversion functions.""" + + def test_items_to_tuples(self): + """Converts list of HeadlineItem to list of tuples.""" + items = [ + HeadlineItem(title="A", source="S", timestamp="10:00"), + HeadlineItem(title="B", source="T", timestamp="11:00"), + ] + result = items_to_tuples(items) + assert result == [("A", "S", "10:00"), ("B", "T", "11:00")] + + def test_tuples_to_items(self): + """Converts list of tuples to list of HeadlineItem.""" + tuples = [("A", "S", "10:00"), ("B", "T", "11:00")] + result = tuples_to_items(tuples) + assert len(result) == 2 + assert result[0].title == "A" + assert result[1].title == "B" + + +class TestFetchResult: + """Tests for FetchResult dataclass.""" + + def test_create_fetch_result(self): + """Can create FetchResult.""" + items = [HeadlineItem(title="Test", source="Source", timestamp="12:00")] + result = FetchResult(items=items, linked=1, failed=0) + assert len(result.items) == 1 + assert result.linked == 1 + assert result.failed == 0 + + def test_to_legacy_tuple(self): + """to_legacy_tuple returns correct format.""" + items = [HeadlineItem(title="Test", source="Source", timestamp="12:00")] + result = FetchResult(items=items, linked=1, failed=0) + legacy = result.to_legacy_tuple() + assert legacy[0] == [("Test", "Source", "12:00")] + assert legacy[1] == 1 + assert legacy[2] == 0 + + +class TestBlock: + """Tests for Block dataclass.""" + + def test_create_block(self): + """Can create Block.""" + block = Block( + content=["line1", "line2"], color="\033[38;5;46m", meta_row_index=1 + ) + assert len(block.content) == 2 + assert block.color == "\033[38;5;46m" + assert block.meta_row_index == 1 + + def test_to_legacy_tuple(self): + """to_legacy_tuple returns correct format.""" + block = Block(content=["line1"], color="green", meta_row_index=0) + legacy = block.to_legacy_tuple() + assert legacy == (["line1"], "green", 0) From 15de46722aa3d7ab0c06111e57ed47487fb5bef0 Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sun, 15 Mar 2026 16:20:52 -0700 Subject: [PATCH 4/7] refactor: phase 4 - event-driven architecture foundation - Add EventBus class with pub/sub messaging (thread-safe) - Add emitter Protocol classes (EventEmitter, Startable, Stoppable) - Add event emission to NtfyPoller (NtfyMessageEvent) - Add event emission to MicMonitor (MicLevelEvent) - Update StreamController to publish stream start/end events - Add comprehensive tests for eventbus and emitters modules --- AGENTS.md | 110 ++++++++++++++++++++++ README.md | 41 +++++---- engine/controller.py | 24 ++++- engine/emitters.py | 25 +++++ engine/eventbus.py | 72 +++++++++++++++ engine/mic.py | 30 ++++++ engine/ntfy.py | 29 ++++++ engine/scroll.py | 6 +- tests/test_emitters.py | 69 ++++++++++++++ tests/test_eventbus.py | 202 +++++++++++++++++++++++++++++++++++++++++ tests/test_mic.py | 66 ++++++++++++++ tests/test_ntfy.py | 52 +++++++++++ 12 files changed, 704 insertions(+), 22 deletions(-) create mode 100644 AGENTS.md create mode 100644 engine/emitters.py create mode 100644 engine/eventbus.py create mode 100644 tests/test_emitters.py create mode 100644 tests/test_eventbus.py diff --git a/AGENTS.md b/AGENTS.md new file mode 100644 index 0000000..21d7e2c --- /dev/null +++ b/AGENTS.md @@ -0,0 +1,110 @@ +# Agent Development Guide + +## Development Environment + +This project uses: +- **mise** (mise.jdx.dev) - tool version manager and task runner +- **hk** (hk.jdx.dev) - git hook manager +- **uv** - fast Python package installer +- **ruff** - linter and formatter +- **pytest** - test runner + +### Setup + +```bash +# Install dependencies +mise run install + +# Or equivalently: +uv sync +``` + +### Available Commands + +```bash +mise run test # Run tests +mise run test-v # Run tests verbose +mise run test-cov # Run tests with coverage report +mise run lint # Run ruff linter +mise run lint-fix # Run ruff with auto-fix +mise run format # Run ruff formatter +mise run ci # Full CI pipeline (sync + test + coverage) +``` + +## Git Hooks + +**At the start of every agent session**, verify hooks are installed: + +```bash +ls -la .git/hooks/pre-commit +``` + +If hooks are not installed, install them with: + +```bash +hk init --mise +mise run pre-commit +``` + +The project uses hk configured in `hk.pkl`: +- **pre-commit**: runs ruff-format and ruff (with auto-fix) +- **pre-push**: runs ruff check + +## Workflow Rules + +### Before Committing + +1. **Always run the test suite** - never commit code that fails tests: + ```bash + mise run test + ``` + +2. **Always run the linter**: + ```bash + mise run lint + ``` + +3. **Fix any lint errors** before committing (or let the pre-commit hook handle it). + +4. **Review your changes** using `git diff` to understand what will be committed. + +### On Failing Tests + +When tests fail, **determine whether it's an out-of-date test or a correctly failing test**: + +- **Out-of-date test**: The test was written for old behavior that has legitimately changed. Update the test to match the new expected behavior. + +- **Correctly failing test**: The test correctly identifies a broken contract. Fix the implementation, not the test. + +**Never** modify a test to make it pass without understanding why it failed. + +### Code Review + +Before committing significant changes: +- Run `git diff` to review all changes +- Ensure new code follows existing patterns in the codebase +- Check that type hints are added for new functions +- Verify that tests exist for new functionality + +## Testing + +Tests live in `tests/` and follow the pattern `test_*.py`. + +Run all tests: +```bash +mise run test +``` + +Run with coverage: +```bash +mise run test-cov +``` + +The project uses pytest with strict marker enforcement. Test configuration is in `pyproject.toml` under `[tool.pytest.ini_options]`. + +## Architecture Notes + +- **ntfy.py** and **mic.py** are standalone modules with zero internal dependencies +- **eventbus.py** provides thread-safe event publishing for decoupled communication +- **controller.py** coordinates ntfy/mic monitoring +- The render pipeline: fetch → render → effects → scroll → terminal output diff --git a/README.md b/README.md index a8c8f5c..5df0eec 100644 --- a/README.md +++ b/README.md @@ -101,26 +101,27 @@ Update `NTFY_TOPIC` in `engine/config.py` to point at your own topic. ``` engine/ - config.py constants, CLI flags, glyph tables - sources.py FEEDS, POETRY_SOURCES, language/script maps - terminal.py ANSI codes, tw/th, type_out, boot_ln - filter.py HTML stripping, content filter - translate.py Google Translate wrapper + region detection - render.py OTF → half-block pipeline (SSAA, gradient) - effects.py noise, glitch_bar, fade, firehose - fetch.py RSS/Gutenberg fetching + cache load/save - ntfy.py NtfyPoller — standalone, zero internal deps - mic.py MicMonitor — standalone, graceful fallback - scroll.py stream() frame loop + message rendering - app.py main(), font picker TUI, boot sequence, signal handler - -tests/ - test_config.py - test_filter.py - test_mic.py - test_ntfy.py - test_sources.py - test_terminal.py + __init__.py package marker + app.py main(), font picker TUI, boot sequence, signal handler + config.py constants, CLI flags, glyph tables + sources.py FEEDS, POETRY_SOURCES, language/script maps + terminal.py ANSI codes, tw/th, type_out, boot_ln + filter.py HTML stripping, content filter + translate.py Google Translate wrapper + region detection + render.py OTF → half-block pipeline (SSAA, gradient) + effects.py noise, glitch_bar, fade, firehose + fetch.py RSS/Gutenberg fetching + cache load/save + ntfy.py NtfyPoller — standalone, zero internal deps + mic.py MicMonitor — standalone, graceful fallback + scroll.py stream() frame loop + message rendering + viewport.py terminal dimension tracking (tw/th) + frame.py scroll step calculation, timing + layers.py ticker zone, firehose, message overlay rendering + eventbus.py thread-safe event publishing for decoupled communication + events.py event types and definitions + controller.py coordinates ntfy/mic monitoring and event publishing + emitters.py background emitters for ntfy and mic + types.py type definitions and dataclasses ``` `ntfy.py` and `mic.py` have zero internal dependencies and can be imported by any other visualizer. diff --git a/engine/controller.py b/engine/controller.py index 98b24e5..e6e2e3d 100644 --- a/engine/controller.py +++ b/engine/controller.py @@ -3,6 +3,8 @@ Stream controller - manages input sources and orchestrates the render stream. """ from engine.config import Config, get_config +from engine.eventbus import EventBus +from engine.events import EventType, StreamEvent from engine.mic import MicMonitor from engine.ntfy import NtfyPoller from engine.scroll import stream @@ -11,8 +13,9 @@ from engine.scroll import stream class StreamController: """Controls the stream lifecycle - initializes sources and runs the stream.""" - def __init__(self, config: Config | None = None): + def __init__(self, config: Config | None = None, event_bus: EventBus | None = None): self.config = config or get_config() + self.event_bus = event_bus self.mic: MicMonitor | None = None self.ntfy: NtfyPoller | None = None @@ -38,8 +41,27 @@ class StreamController: """Run the stream with initialized sources.""" if self.mic is None or self.ntfy is None: self.initialize_sources() + + if self.event_bus: + self.event_bus.publish( + EventType.STREAM_START, + StreamEvent( + event_type=EventType.STREAM_START, + headline_count=len(items), + ), + ) + stream(items, self.ntfy, self.mic) + if self.event_bus: + self.event_bus.publish( + EventType.STREAM_END, + StreamEvent( + event_type=EventType.STREAM_END, + headline_count=len(items), + ), + ) + def cleanup(self) -> None: """Clean up resources.""" if self.mic: diff --git a/engine/emitters.py b/engine/emitters.py new file mode 100644 index 0000000..6d6a5a1 --- /dev/null +++ b/engine/emitters.py @@ -0,0 +1,25 @@ +""" +Event emitter protocols - abstract interfaces for event-producing components. +""" + +from collections.abc import Callable +from typing import Any, Protocol + + +class EventEmitter(Protocol): + """Protocol for components that emit events.""" + + def subscribe(self, callback: Callable[[Any], None]) -> None: ... + def unsubscribe(self, callback: Callable[[Any], None]) -> None: ... + + +class Startable(Protocol): + """Protocol for components that can be started.""" + + def start(self) -> Any: ... + + +class Stoppable(Protocol): + """Protocol for components that can be stopped.""" + + def stop(self) -> None: ... diff --git a/engine/eventbus.py b/engine/eventbus.py new file mode 100644 index 0000000..6ea5628 --- /dev/null +++ b/engine/eventbus.py @@ -0,0 +1,72 @@ +""" +Event bus - pub/sub messaging for decoupled component communication. +""" + +import threading +from collections import defaultdict +from collections.abc import Callable +from typing import Any + +from engine.events import EventType + + +class EventBus: + """Thread-safe event bus for publish-subscribe messaging.""" + + def __init__(self): + self._subscribers: dict[EventType, list[Callable[[Any], None]]] = defaultdict( + list + ) + self._lock = threading.Lock() + + def subscribe(self, event_type: EventType, callback: Callable[[Any], None]) -> None: + """Register a callback for a specific event type.""" + with self._lock: + self._subscribers[event_type].append(callback) + + def unsubscribe( + self, event_type: EventType, callback: Callable[[Any], None] + ) -> None: + """Remove a callback for a specific event type.""" + with self._lock: + if callback in self._subscribers[event_type]: + self._subscribers[event_type].remove(callback) + + def publish(self, event_type: EventType, event: Any = None) -> None: + """Publish an event to all subscribers.""" + with self._lock: + callbacks = list(self._subscribers.get(event_type, [])) + for callback in callbacks: + try: + callback(event) + except Exception: + pass + + def clear(self) -> None: + """Remove all subscribers.""" + with self._lock: + self._subscribers.clear() + + def subscriber_count(self, event_type: EventType | None = None) -> int: + """Get subscriber count for an event type, or total if None.""" + with self._lock: + if event_type is None: + return sum(len(cb) for cb in self._subscribers.values()) + return len(self._subscribers.get(event_type, [])) + + +_event_bus: EventBus | None = None + + +def get_event_bus() -> EventBus: + """Get the global event bus instance.""" + global _event_bus + if _event_bus is None: + _event_bus = EventBus() + return _event_bus + + +def set_event_bus(bus: EventBus) -> None: + """Set the global event bus instance (for testing).""" + global _event_bus + _event_bus = bus diff --git a/engine/mic.py b/engine/mic.py index b8c5175..c72a440 100644 --- a/engine/mic.py +++ b/engine/mic.py @@ -4,6 +4,8 @@ Gracefully degrades if sounddevice/numpy are unavailable. """ import atexit +from collections.abc import Callable +from datetime import datetime try: import numpy as _np @@ -14,6 +16,9 @@ except Exception: _HAS_MIC = False +from engine.events import MicLevelEvent + + class MicMonitor: """Background mic stream that exposes current RMS dB level.""" @@ -21,6 +26,7 @@ class MicMonitor: self.threshold_db = threshold_db self._db = -99.0 self._stream = None + self._subscribers: list[Callable[[MicLevelEvent], None]] = [] @property def available(self): @@ -37,6 +43,23 @@ class MicMonitor: """dB above threshold (clamped to 0).""" return max(0.0, self._db - self.threshold_db) + def subscribe(self, callback: Callable[[MicLevelEvent], None]) -> None: + """Register a callback to be called when mic level changes.""" + self._subscribers.append(callback) + + def unsubscribe(self, callback: Callable[[MicLevelEvent], None]) -> None: + """Remove a registered callback.""" + if callback in self._subscribers: + self._subscribers.remove(callback) + + def _emit(self, event: MicLevelEvent) -> None: + """Emit an event to all subscribers.""" + for cb in self._subscribers: + try: + cb(event) + except Exception: + pass + def start(self): """Start background mic stream. Returns True on success, False/None otherwise.""" if not _HAS_MIC: @@ -45,6 +68,13 @@ class MicMonitor: def _cb(indata, frames, t, status): rms = float(_np.sqrt(_np.mean(indata**2))) self._db = 20 * _np.log10(rms) if rms > 0 else -99.0 + if self._subscribers: + event = MicLevelEvent( + db_level=self._db, + excess_above_threshold=max(0.0, self._db - self.threshold_db), + timestamp=datetime.now(), + ) + self._emit(event) try: self._stream = _sd.InputStream( diff --git a/engine/ntfy.py b/engine/ntfy.py index d819ea2..583fc66 100644 --- a/engine/ntfy.py +++ b/engine/ntfy.py @@ -16,8 +16,12 @@ import json import threading import time import urllib.request +from collections.abc import Callable +from datetime import datetime from urllib.parse import parse_qs, urlencode, urlparse, urlunparse +from engine.events import NtfyMessageEvent + class NtfyPoller: """SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT).""" @@ -28,6 +32,24 @@ class NtfyPoller: self.display_secs = display_secs self._message = None # (title, body, monotonic_timestamp) or None self._lock = threading.Lock() + self._subscribers: list[Callable[[NtfyMessageEvent], None]] = [] + + def subscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None: + """Register a callback to be called when a message is received.""" + self._subscribers.append(callback) + + def unsubscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None: + """Remove a registered callback.""" + if callback in self._subscribers: + self._subscribers.remove(callback) + + def _emit(self, event: NtfyMessageEvent) -> None: + """Emit an event to all subscribers.""" + for cb in self._subscribers: + try: + cb(event) + except Exception: + pass def start(self): """Start background stream thread. Returns True.""" @@ -88,6 +110,13 @@ class NtfyPoller: data.get("message", ""), time.monotonic(), ) + event = NtfyMessageEvent( + title=data.get("title", ""), + body=data.get("message", ""), + message_id=data.get("id"), + timestamp=datetime.now(), + ) + self._emit(event) except Exception: pass time.sleep(self.reconnect_delay) diff --git a/engine/scroll.py b/engine/scroll.py index dcb96f7..41445ad 100644 --- a/engine/scroll.py +++ b/engine/scroll.py @@ -43,11 +43,15 @@ def stream(items, ntfy_poller, mic_monitor): scroll_motion_accum = 0.0 msg_cache = (None, None) - while queued < config.HEADLINE_LIMIT or active: + while True: + if queued >= config.HEADLINE_LIMIT and not active: + break + t0 = time.monotonic() w, h = tw(), th() fh = config.FIREHOSE_H if config.FIREHOSE else 0 ticker_view_h = h - fh + scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h) msg = ntfy_poller.get_active_message() msg_overlay, msg_cache = render_message_overlay(msg, w, h, msg_cache) diff --git a/tests/test_emitters.py b/tests/test_emitters.py new file mode 100644 index 0000000..b28cddb --- /dev/null +++ b/tests/test_emitters.py @@ -0,0 +1,69 @@ +""" +Tests for engine.emitters module. +""" + +from engine.emitters import EventEmitter, Startable, Stoppable + + +class TestEventEmitterProtocol: + """Tests for EventEmitter protocol.""" + + def test_protocol_exists(self): + """EventEmitter protocol is defined.""" + assert EventEmitter is not None + + def test_protocol_has_subscribe_method(self): + """EventEmitter has subscribe method in protocol.""" + assert hasattr(EventEmitter, "subscribe") + + def test_protocol_has_unsubscribe_method(self): + """EventEmitter has unsubscribe method in protocol.""" + assert hasattr(EventEmitter, "unsubscribe") + + +class TestStartableProtocol: + """Tests for Startable protocol.""" + + def test_protocol_exists(self): + """Startable protocol is defined.""" + assert Startable is not None + + def test_protocol_has_start_method(self): + """Startable has start method in protocol.""" + assert hasattr(Startable, "start") + + +class TestStoppableProtocol: + """Tests for Stoppable protocol.""" + + def test_protocol_exists(self): + """Stoppable protocol is defined.""" + assert Stoppable is not None + + def test_protocol_has_stop_method(self): + """Stoppable has stop method in protocol.""" + assert hasattr(Stoppable, "stop") + + +class TestProtocolCompliance: + """Tests that existing classes comply with protocols.""" + + def test_ntfy_poller_complies_with_protocol(self): + """NtfyPoller implements EventEmitter protocol.""" + from engine.ntfy import NtfyPoller + + poller = NtfyPoller("http://example.com/topic") + assert hasattr(poller, "subscribe") + assert hasattr(poller, "unsubscribe") + assert callable(poller.subscribe) + assert callable(poller.unsubscribe) + + def test_mic_monitor_complies_with_protocol(self): + """MicMonitor implements EventEmitter and Startable protocols.""" + from engine.mic import MicMonitor + + monitor = MicMonitor() + assert hasattr(monitor, "subscribe") + assert hasattr(monitor, "unsubscribe") + assert hasattr(monitor, "start") + assert hasattr(monitor, "stop") diff --git a/tests/test_eventbus.py b/tests/test_eventbus.py new file mode 100644 index 0000000..7094c7b --- /dev/null +++ b/tests/test_eventbus.py @@ -0,0 +1,202 @@ +""" +Tests for engine.eventbus module. +""" + + +from engine.eventbus import EventBus, get_event_bus, set_event_bus +from engine.events import EventType, NtfyMessageEvent + + +class TestEventBusInit: + """Tests for EventBus initialization.""" + + def test_init_creates_empty_subscribers(self): + """EventBus starts with no subscribers.""" + bus = EventBus() + assert bus.subscriber_count() == 0 + + +class TestEventBusSubscribe: + """Tests for EventBus.subscribe method.""" + + def test_subscribe_adds_callback(self): + """subscribe() adds a callback for an event type.""" + bus = EventBus() + def callback(e): + return None + + bus.subscribe(EventType.NTFY_MESSAGE, callback) + + assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1 + + def test_subscribe_multiple_callbacks_same_event(self): + """Multiple callbacks can be subscribed to the same event type.""" + bus = EventBus() + def cb1(e): + return None + def cb2(e): + return None + + bus.subscribe(EventType.NTFY_MESSAGE, cb1) + bus.subscribe(EventType.NTFY_MESSAGE, cb2) + + assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 2 + + def test_subscribe_different_event_types(self): + """Callbacks can be subscribed to different event types.""" + bus = EventBus() + def cb1(e): + return None + def cb2(e): + return None + + bus.subscribe(EventType.NTFY_MESSAGE, cb1) + bus.subscribe(EventType.MIC_LEVEL, cb2) + + assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1 + assert bus.subscriber_count(EventType.MIC_LEVEL) == 1 + + +class TestEventBusUnsubscribe: + """Tests for EventBus.unsubscribe method.""" + + def test_unsubscribe_removes_callback(self): + """unsubscribe() removes a callback.""" + bus = EventBus() + def callback(e): + return None + + bus.subscribe(EventType.NTFY_MESSAGE, callback) + bus.unsubscribe(EventType.NTFY_MESSAGE, callback) + + assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 0 + + def test_unsubscribe_nonexistent_callback_no_error(self): + """unsubscribe() handles non-existent callback gracefully.""" + bus = EventBus() + def callback(e): + return None + + bus.unsubscribe(EventType.NTFY_MESSAGE, callback) + + +class TestEventBusPublish: + """Tests for EventBus.publish method.""" + + def test_publish_calls_subscriber(self): + """publish() calls the subscriber callback.""" + bus = EventBus() + received = [] + + def callback(event): + received.append(event) + + bus.subscribe(EventType.NTFY_MESSAGE, callback) + event = NtfyMessageEvent(title="Test", body="Body") + bus.publish(EventType.NTFY_MESSAGE, event) + + assert len(received) == 1 + assert received[0].title == "Test" + + def test_publish_multiple_subscribers(self): + """publish() calls all subscribers for an event type.""" + bus = EventBus() + received1 = [] + received2 = [] + + def callback1(event): + received1.append(event) + + def callback2(event): + received2.append(event) + + bus.subscribe(EventType.NTFY_MESSAGE, callback1) + bus.subscribe(EventType.NTFY_MESSAGE, callback2) + event = NtfyMessageEvent(title="Test", body="Body") + bus.publish(EventType.NTFY_MESSAGE, event) + + assert len(received1) == 1 + assert len(received2) == 1 + + def test_publish_different_event_types(self): + """publish() only calls subscribers for the specific event type.""" + bus = EventBus() + ntfy_received = [] + mic_received = [] + + def ntfy_callback(event): + ntfy_received.append(event) + + def mic_callback(event): + mic_received.append(event) + + bus.subscribe(EventType.NTFY_MESSAGE, ntfy_callback) + bus.subscribe(EventType.MIC_LEVEL, mic_callback) + event = NtfyMessageEvent(title="Test", body="Body") + bus.publish(EventType.NTFY_MESSAGE, event) + + assert len(ntfy_received) == 1 + assert len(mic_received) == 0 + + +class TestEventBusClear: + """Tests for EventBus.clear method.""" + + def test_clear_removes_all_subscribers(self): + """clear() removes all subscribers.""" + bus = EventBus() + def cb1(e): + return None + def cb2(e): + return None + + bus.subscribe(EventType.NTFY_MESSAGE, cb1) + bus.subscribe(EventType.MIC_LEVEL, cb2) + bus.clear() + + assert bus.subscriber_count() == 0 + + +class TestEventBusThreadSafety: + """Tests for EventBus thread safety.""" + + def test_concurrent_subscribe_unsubscribe(self): + """subscribe and unsubscribe can be called concurrently.""" + import threading + + bus = EventBus() + callbacks = [lambda e: None for _ in range(10)] + + def subscribe(): + for cb in callbacks: + bus.subscribe(EventType.NTFY_MESSAGE, cb) + + def unsubscribe(): + for cb in callbacks: + bus.unsubscribe(EventType.NTFY_MESSAGE, cb) + + t1 = threading.Thread(target=subscribe) + t2 = threading.Thread(target=unsubscribe) + t1.start() + t2.start() + t1.join() + t2.join() + + +class TestGlobalEventBus: + """Tests for global event bus functions.""" + + def test_get_event_bus_returns_singleton(self): + """get_event_bus() returns the same instance.""" + bus1 = get_event_bus() + bus2 = get_event_bus() + assert bus1 is bus2 + + def test_set_event_bus_replaces_singleton(self): + """set_event_bus() replaces the global event bus.""" + new_bus = EventBus() + set_event_bus(new_bus) + try: + assert get_event_bus() is new_bus + finally: + set_event_bus(None) diff --git a/tests/test_mic.py b/tests/test_mic.py index 3e610b9..a347e5f 100644 --- a/tests/test_mic.py +++ b/tests/test_mic.py @@ -2,8 +2,11 @@ Tests for engine.mic module. """ +from datetime import datetime from unittest.mock import patch +from engine.events import MicLevelEvent + class TestMicMonitorImport: """Tests for module import behavior.""" @@ -81,3 +84,66 @@ class TestMicMonitorStop: monitor = MicMonitor() monitor.stop() assert monitor._stream is None + + +class TestMicMonitorEventEmission: + """Tests for MicMonitor event emission.""" + + def test_subscribe_adds_callback(self): + """subscribe() adds a callback.""" + from engine.mic import MicMonitor + + monitor = MicMonitor() + def callback(e): + return None + + monitor.subscribe(callback) + + assert callback in monitor._subscribers + + def test_unsubscribe_removes_callback(self): + """unsubscribe() removes a callback.""" + from engine.mic import MicMonitor + + monitor = MicMonitor() + def callback(e): + return None + monitor.subscribe(callback) + + monitor.unsubscribe(callback) + + assert callback not in monitor._subscribers + + def test_emit_calls_subscribers(self): + """_emit() calls all subscribers.""" + from engine.mic import MicMonitor + + monitor = MicMonitor() + received = [] + + def callback(event): + received.append(event) + + monitor.subscribe(callback) + event = MicLevelEvent( + db_level=60.0, excess_above_threshold=10.0, timestamp=datetime.now() + ) + monitor._emit(event) + + assert len(received) == 1 + assert received[0].db_level == 60.0 + + def test_emit_handles_subscriber_exception(self): + """_emit() handles exceptions in subscribers gracefully.""" + from engine.mic import MicMonitor + + monitor = MicMonitor() + + def bad_callback(event): + raise RuntimeError("test") + + monitor.subscribe(bad_callback) + event = MicLevelEvent( + db_level=60.0, excess_above_threshold=10.0, timestamp=datetime.now() + ) + monitor._emit(event) diff --git a/tests/test_ntfy.py b/tests/test_ntfy.py index 778fc6e..6c3437c 100644 --- a/tests/test_ntfy.py +++ b/tests/test_ntfy.py @@ -5,6 +5,7 @@ Tests for engine.ntfy module. import time from unittest.mock import MagicMock, patch +from engine.events import NtfyMessageEvent from engine.ntfy import NtfyPoller @@ -68,3 +69,54 @@ class TestNtfyPollerDismiss: poller.dismiss() assert poller._message is None + + +class TestNtfyPollerEventEmission: + """Tests for NtfyPoller event emission.""" + + def test_subscribe_adds_callback(self): + """subscribe() adds a callback.""" + poller = NtfyPoller("http://example.com/topic") + def callback(e): + return None + + poller.subscribe(callback) + + assert callback in poller._subscribers + + def test_unsubscribe_removes_callback(self): + """unsubscribe() removes a callback.""" + poller = NtfyPoller("http://example.com/topic") + def callback(e): + return None + poller.subscribe(callback) + + poller.unsubscribe(callback) + + assert callback not in poller._subscribers + + def test_emit_calls_subscribers(self): + """_emit() calls all subscribers.""" + poller = NtfyPoller("http://example.com/topic") + received = [] + + def callback(event): + received.append(event) + + poller.subscribe(callback) + event = NtfyMessageEvent(title="Test", body="Body") + poller._emit(event) + + assert len(received) == 1 + assert received[0].title == "Test" + + def test_emit_handles_subscriber_exception(self): + """_emit() handles exceptions in subscribers gracefully.""" + poller = NtfyPoller("http://example.com/topic") + + def bad_callback(event): + raise RuntimeError("test") + + poller.subscribe(bad_callback) + event = NtfyMessageEvent(title="Test", body="Body") + poller._emit(event) From cfd7e8931e4b13b84316ae136c5c7da5c0d2871d Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sun, 15 Mar 2026 18:42:42 -0700 Subject: [PATCH 5/7] feat(effects): add plugin architecture with performance monitoring --- effects_plugins/__init__.py | 35 ++ effects_plugins/fade.py | 58 +++ effects_plugins/firehose.py | 72 ++++ effects_plugins/glitch.py | 37 ++ effects_plugins/noise.py | 36 ++ engine/effects/__init__.py | 42 +++ engine/effects/chain.py | 71 ++++ engine/effects/controller.py | 144 ++++++++ engine/{effects.py => effects/legacy.py} | 0 engine/effects/performance.py | 103 ++++++ engine/effects/registry.py | 59 ++++ engine/effects/types.py | 39 +++ engine/layers.py | 59 ++++ tests/test_effects.py | 427 +++++++++++++++++++++++ 14 files changed, 1182 insertions(+) create mode 100644 effects_plugins/__init__.py create mode 100644 effects_plugins/fade.py create mode 100644 effects_plugins/firehose.py create mode 100644 effects_plugins/glitch.py create mode 100644 effects_plugins/noise.py create mode 100644 engine/effects/__init__.py create mode 100644 engine/effects/chain.py create mode 100644 engine/effects/controller.py rename engine/{effects.py => effects/legacy.py} (100%) create mode 100644 engine/effects/performance.py create mode 100644 engine/effects/registry.py create mode 100644 engine/effects/types.py create mode 100644 tests/test_effects.py diff --git a/effects_plugins/__init__.py b/effects_plugins/__init__.py new file mode 100644 index 0000000..fc3c8d5 --- /dev/null +++ b/effects_plugins/__init__.py @@ -0,0 +1,35 @@ +from pathlib import Path + +PLUGIN_DIR = Path(__file__).parent + + +def discover_plugins(): + from engine.effects.registry import get_registry + + registry = get_registry() + imported = {} + + for file_path in PLUGIN_DIR.glob("*.py"): + if file_path.name.startswith("_"): + continue + module_name = file_path.stem + if module_name in ("base", "types"): + continue + + try: + module = __import__(f"effects_plugins.{module_name}", fromlist=[""]) + for attr_name in dir(module): + attr = getattr(module, attr_name) + if ( + isinstance(attr, type) + and hasattr(attr, "name") + and hasattr(attr, "process") + and attr_name.endswith("Effect") + ): + plugin = attr() + registry.register(plugin) + imported[plugin.name] = plugin + except Exception: + pass + + return imported diff --git a/effects_plugins/fade.py b/effects_plugins/fade.py new file mode 100644 index 0000000..98ede65 --- /dev/null +++ b/effects_plugins/fade.py @@ -0,0 +1,58 @@ +import random + +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin + + +class FadeEffect: + name = "fade" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + if not ctx.ticker_height: + return buf + result = list(buf) + intensity = self.config.intensity + + top_zone = max(1, int(ctx.ticker_height * 0.25)) + bot_zone = max(1, int(ctx.ticker_height * 0.10)) + + for r in range(len(result)): + if r >= ctx.ticker_height: + continue + top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0 + bot_f = ( + min(1.0, (ctx.ticker_height - 1 - r) / bot_zone) + if bot_zone > 0 + else 1.0 + ) + row_fade = min(top_f, bot_f) * intensity + + if row_fade < 1.0 and result[r].strip(): + result[r] = self._fade_line(result[r], row_fade) + + return result + + def _fade_line(self, s: str, fade: float) -> str: + if fade >= 1.0: + return s + if fade <= 0.0: + return "" + result = [] + i = 0 + while i < len(s): + if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[": + j = i + 2 + while j < len(s) and not s[j].isalpha(): + j += 1 + result.append(s[i : j + 1]) + i = j + 1 + elif s[i] == " ": + result.append(" ") + i += 1 + else: + result.append(s[i] if random.random() < fade else " ") + i += 1 + return "".join(result) + + def configure(self, cfg: EffectConfig) -> None: + self.config = cfg diff --git a/effects_plugins/firehose.py b/effects_plugins/firehose.py new file mode 100644 index 0000000..4be520b --- /dev/null +++ b/effects_plugins/firehose.py @@ -0,0 +1,72 @@ +import random +from datetime import datetime + +from engine import config +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin +from engine.sources import FEEDS, POETRY_SOURCES +from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST + + +class FirehoseEffect: + name = "firehose" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + firehose_h = config.FIREHOSE_H if config.FIREHOSE else 0 + if firehose_h <= 0 or not ctx.items: + return buf + + result = list(buf) + intensity = self.config.intensity + h = ctx.terminal_height + + for fr in range(firehose_h): + scr_row = h - firehose_h + fr + 1 + fline = self._firehose_line(ctx.items, ctx.terminal_width, intensity) + result.append(f"\033[{scr_row};1H{fline}\033[K") + return result + + def _firehose_line(self, items: list, w: int, intensity: float) -> str: + r = random.random() + if r < 0.35 * intensity: + title, src, ts = random.choice(items) + text = title[: w - 1] + color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM]) + return f"{color}{text}{RST}" + elif r < 0.55 * intensity: + d = random.choice([0.45, 0.55, 0.65, 0.75]) + return "".join( + f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}" + f"{random.choice(config.GLITCH + config.KATA)}{RST}" + if random.random() < d + else " " + for _ in range(w) + ) + elif r < 0.78 * intensity: + sources = FEEDS if config.MODE == "news" else POETRY_SOURCES + src = random.choice(list(sources.keys())) + msgs = [ + f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}", + f" ░░ FEED ACTIVE :: {src}", + f" >> DECODE 0x{random.randint(0x1000, 0xFFFF):04X} :: {src[:24]}", + f" ▒▒ ACQUIRE :: {random.choice(['TCP', 'UDP', 'RSS', 'ATOM', 'XML'])} :: {src}", + f" {''.join(random.choice(config.KATA) for _ in range(3))} STRM " + f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}", + ] + text = random.choice(msgs)[: w - 1] + color = random.choice([G_LO, G_DIM, W_GHOST]) + return f"{color}{text}{RST}" + else: + title, _, _ = random.choice(items) + start = random.randint(0, max(0, len(title) - 20)) + frag = title[start : start + random.randint(10, 35)] + pad = random.randint(0, max(0, w - len(frag) - 8)) + gp = "".join( + random.choice(config.GLITCH) for _ in range(random.randint(1, 3)) + ) + text = (" " * pad + gp + " " + frag)[: w - 1] + color = random.choice([G_LO, C_DIM, W_GHOST]) + return f"{color}{text}{RST}" + + def configure(self, cfg: EffectConfig) -> None: + self.config = cfg diff --git a/effects_plugins/glitch.py b/effects_plugins/glitch.py new file mode 100644 index 0000000..d23244a --- /dev/null +++ b/effects_plugins/glitch.py @@ -0,0 +1,37 @@ +import random + +from engine import config +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin +from engine.terminal import C_DIM, DIM, G_DIM, G_LO, RST + + +class GlitchEffect: + name = "glitch" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + if not buf: + return buf + result = list(buf) + intensity = self.config.intensity + + glitch_prob = 0.32 + min(0.9, ctx.mic_excess * 0.16) + glitch_prob = glitch_prob * intensity + n_hits = 4 + int(ctx.mic_excess / 2) + n_hits = int(n_hits * intensity) + + if random.random() < glitch_prob: + for _ in range(min(n_hits, len(result))): + gi = random.randint(0, len(result) - 1) + scr_row = gi + 1 + result[gi] = f"\033[{scr_row};1H{self._glitch_bar(ctx.terminal_width)}" + return result + + def _glitch_bar(self, w: int) -> str: + c = random.choice(["░", "▒", "─", "\xc2"]) + n = random.randint(3, w // 2) + o = random.randint(0, w - n) + return " " * o + f"{G_LO}{DIM}" + c * n + RST + + def configure(self, cfg: EffectConfig) -> None: + self.config = cfg diff --git a/effects_plugins/noise.py b/effects_plugins/noise.py new file mode 100644 index 0000000..d7bf316 --- /dev/null +++ b/effects_plugins/noise.py @@ -0,0 +1,36 @@ +import random + +from engine import config +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin +from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST + + +class NoiseEffect: + name = "noise" + config = EffectConfig(enabled=True, intensity=0.15) + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + if not ctx.ticker_height: + return buf + result = list(buf) + intensity = self.config.intensity + probability = intensity * 0.15 + + for r in range(len(result)): + cy = ctx.scroll_cam + r + if random.random() < probability: + result[r] = self._generate_noise(ctx.terminal_width, cy) + return result + + def _generate_noise(self, w: int, cy: int) -> str: + d = random.choice([0.15, 0.25, 0.35, 0.12]) + return "".join( + f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}" + f"{random.choice(config.GLITCH + config.KATA)}{RST}" + if random.random() < d + else " " + for _ in range(w) + ) + + def configure(self, cfg: EffectConfig) -> None: + self.config = cfg diff --git a/engine/effects/__init__.py b/engine/effects/__init__.py new file mode 100644 index 0000000..923d361 --- /dev/null +++ b/engine/effects/__init__.py @@ -0,0 +1,42 @@ +from engine.effects.chain import EffectChain +from engine.effects.controller import handle_effects_command, show_effects_menu +from engine.effects.legacy import ( + fade_line, + firehose_line, + glitch_bar, + next_headline, + noise, + vis_trunc, +) +from engine.effects.performance import PerformanceMonitor, get_monitor, set_monitor +from engine.effects.registry import EffectRegistry, get_registry, set_registry +from engine.effects.types import EffectConfig, EffectContext, PipelineConfig + + +def get_effect_chain(): + from engine.layers import get_effect_chain as _chain + + return _chain() + + +__all__ = [ + "EffectChain", + "EffectRegistry", + "EffectConfig", + "EffectContext", + "PipelineConfig", + "get_registry", + "set_registry", + "get_effect_chain", + "get_monitor", + "set_monitor", + "PerformanceMonitor", + "handle_effects_command", + "show_effects_menu", + "fade_line", + "firehose_line", + "glitch_bar", + "noise", + "next_headline", + "vis_trunc", +] diff --git a/engine/effects/chain.py b/engine/effects/chain.py new file mode 100644 index 0000000..c687266 --- /dev/null +++ b/engine/effects/chain.py @@ -0,0 +1,71 @@ +import time + +from engine.effects.performance import PerformanceMonitor, get_monitor +from engine.effects.registry import EffectRegistry +from engine.effects.types import EffectContext + + +class EffectChain: + def __init__( + self, registry: EffectRegistry, monitor: PerformanceMonitor | None = None + ): + self._registry = registry + self._order: list[str] = [] + self._monitor = monitor + + def _get_monitor(self) -> PerformanceMonitor: + if self._monitor is not None: + return self._monitor + return get_monitor() + + def set_order(self, names: list[str]) -> None: + self._order = list(names) + + def get_order(self) -> list[str]: + return self._order.copy() + + def add_effect(self, name: str, position: int | None = None) -> bool: + if name not in self._registry.list_all(): + return False + if position is None: + self._order.append(name) + else: + self._order.insert(position, name) + return True + + def remove_effect(self, name: str) -> bool: + if name in self._order: + self._order.remove(name) + return True + return False + + def reorder(self, new_order: list[str]) -> bool: + all_plugins = set(self._registry.list_all().keys()) + if not all(name in all_plugins for name in new_order): + return False + self._order = list(new_order) + return True + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + monitor = self._get_monitor() + frame_number = ctx.frame_number + monitor.start_frame(frame_number) + + frame_start = time.perf_counter() + result = list(buf) + for name in self._order: + plugin = self._registry.get(name) + if plugin and plugin.config.enabled: + chars_in = sum(len(line) for line in result) + effect_start = time.perf_counter() + try: + result = plugin.process(result, ctx) + except Exception: + plugin.config.enabled = False + elapsed = time.perf_counter() - effect_start + chars_out = sum(len(line) for line in result) + monitor.record_effect(name, elapsed * 1000, chars_in, chars_out) + + total_elapsed = time.perf_counter() - frame_start + monitor.end_frame(frame_number, total_elapsed * 1000) + return result diff --git a/engine/effects/controller.py b/engine/effects/controller.py new file mode 100644 index 0000000..3e72881 --- /dev/null +++ b/engine/effects/controller.py @@ -0,0 +1,144 @@ +from engine.effects.performance import get_monitor +from engine.effects.registry import get_registry + +_effect_chain_ref = None + + +def _get_effect_chain(): + global _effect_chain_ref + if _effect_chain_ref is not None: + return _effect_chain_ref + try: + from engine.layers import get_effect_chain as _chain + + return _chain() + except Exception: + return None + + +def set_effect_chain_ref(chain) -> None: + global _effect_chain_ref + _effect_chain_ref = chain + + +def handle_effects_command(cmd: str) -> str: + """Handle /effects command from NTFY message. + + Commands: + /effects list - list all effects and their status + /effects on - enable an effect + /effects off - disable an effect + /effects intensity <0.0-1.0> - set intensity + /effects reorder ,,... - reorder pipeline + /effects stats - show performance statistics + """ + parts = cmd.strip().split() + if not parts or parts[0] != "/effects": + return "Unknown command" + + registry = get_registry() + chain = _get_effect_chain() + + if len(parts) == 1 or parts[1] == "list": + result = ["Effects:"] + for name, plugin in registry.list_all().items(): + status = "ON" if plugin.config.enabled else "OFF" + intensity = plugin.config.intensity + result.append(f" {name}: {status} (intensity={intensity})") + if chain: + result.append(f"Order: {chain.get_order()}") + return "\n".join(result) + + if parts[1] == "stats": + return _format_stats() + + if parts[1] == "reorder" and len(parts) >= 3: + new_order = parts[2].split(",") + if chain and chain.reorder(new_order): + return f"Reordered pipeline: {new_order}" + return "Failed to reorder pipeline" + + if len(parts) < 3: + return "Usage: /effects on|off|intensity " + + effect_name = parts[1] + action = parts[2] + + if effect_name not in registry.list_all(): + return f"Unknown effect: {effect_name}" + + if action == "on": + registry.enable(effect_name) + return f"Enabled: {effect_name}" + + if action == "off": + registry.disable(effect_name) + return f"Disabled: {effect_name}" + + if action == "intensity" and len(parts) >= 4: + try: + value = float(parts[3]) + if not 0.0 <= value <= 1.0: + return "Intensity must be between 0.0 and 1.0" + plugin = registry.get(effect_name) + if plugin: + plugin.config.intensity = value + return f"Set {effect_name} intensity to {value}" + except ValueError: + return "Invalid intensity value" + + return f"Unknown action: {action}" + + +def _format_stats() -> str: + monitor = get_monitor() + stats = monitor.get_stats() + + if "error" in stats: + return stats["error"] + + lines = ["Performance Stats:"] + + pipeline = stats["pipeline"] + lines.append( + f" Pipeline: avg={pipeline['avg_ms']:.2f}ms min={pipeline['min_ms']:.2f}ms max={pipeline['max_ms']:.2f}ms (over {stats['frame_count']} frames)" + ) + + if stats["effects"]: + lines.append(" Per-effect (avg ms):") + for name, effect_stats in stats["effects"].items(): + lines.append( + f" {name}: avg={effect_stats['avg_ms']:.2f}ms min={effect_stats['min_ms']:.2f}ms max={effect_stats['max_ms']:.2f}ms" + ) + + return "\n".join(lines) + + +def show_effects_menu() -> str: + """Generate effects menu text for display.""" + registry = get_registry() + chain = _get_effect_chain() + + lines = [ + "\033[1;38;5;231m=== EFFECTS MENU ===\033[0m", + "", + "Effects:", + ] + + for name, plugin in registry.list_all().items(): + status = "ON" if plugin.config.enabled else "OFF" + intensity = plugin.config.intensity + lines.append(f" [{status:3}] {name}: intensity={intensity:.2f}") + + if chain: + lines.append("") + lines.append(f"Pipeline order: {' -> '.join(chain.get_order())}") + + lines.append("") + lines.append("Controls:") + lines.append(" /effects on|off") + lines.append(" /effects intensity <0.0-1.0>") + lines.append(" /effects reorder name1,name2,...") + lines.append("") + + return "\n".join(lines) diff --git a/engine/effects.py b/engine/effects/legacy.py similarity index 100% rename from engine/effects.py rename to engine/effects/legacy.py diff --git a/engine/effects/performance.py b/engine/effects/performance.py new file mode 100644 index 0000000..7a26bb9 --- /dev/null +++ b/engine/effects/performance.py @@ -0,0 +1,103 @@ +from collections import deque +from dataclasses import dataclass + + +@dataclass +class EffectTiming: + name: str + duration_ms: float + buffer_chars_in: int + buffer_chars_out: int + + +@dataclass +class FrameTiming: + frame_number: int + total_ms: float + effects: list[EffectTiming] + + +class PerformanceMonitor: + """Collects and stores performance metrics for effect pipeline.""" + + def __init__(self, max_frames: int = 60): + self._max_frames = max_frames + self._frames: deque[FrameTiming] = deque(maxlen=max_frames) + self._current_frame: list[EffectTiming] = [] + + def start_frame(self, frame_number: int) -> None: + self._current_frame = [] + + def record_effect( + self, name: str, duration_ms: float, chars_in: int, chars_out: int + ) -> None: + self._current_frame.append( + EffectTiming( + name=name, + duration_ms=duration_ms, + buffer_chars_in=chars_in, + buffer_chars_out=chars_out, + ) + ) + + def end_frame(self, frame_number: int, total_ms: float) -> None: + self._frames.append( + FrameTiming( + frame_number=frame_number, + total_ms=total_ms, + effects=self._current_frame, + ) + ) + + def get_stats(self) -> dict: + if not self._frames: + return {"error": "No timing data available"} + + total_times = [f.total_ms for f in self._frames] + avg_total = sum(total_times) / len(total_times) + min_total = min(total_times) + max_total = max(total_times) + + effect_stats: dict[str, dict] = {} + for frame in self._frames: + for effect in frame.effects: + if effect.name not in effect_stats: + effect_stats[effect.name] = {"times": [], "total_chars": 0} + effect_stats[effect.name]["times"].append(effect.duration_ms) + effect_stats[effect.name]["total_chars"] += effect.buffer_chars_out + + for name, stats in effect_stats.items(): + times = stats["times"] + stats["avg_ms"] = sum(times) / len(times) + stats["min_ms"] = min(times) + stats["max_ms"] = max(times) + del stats["times"] + + return { + "frame_count": len(self._frames), + "pipeline": { + "avg_ms": avg_total, + "min_ms": min_total, + "max_ms": max_total, + }, + "effects": effect_stats, + } + + def reset(self) -> None: + self._frames.clear() + self._current_frame = [] + + +_monitor: PerformanceMonitor | None = None + + +def get_monitor() -> PerformanceMonitor: + global _monitor + if _monitor is None: + _monitor = PerformanceMonitor() + return _monitor + + +def set_monitor(monitor: PerformanceMonitor) -> None: + global _monitor + _monitor = monitor diff --git a/engine/effects/registry.py b/engine/effects/registry.py new file mode 100644 index 0000000..bdf13d8 --- /dev/null +++ b/engine/effects/registry.py @@ -0,0 +1,59 @@ +from engine.effects.types import EffectConfig, EffectPlugin + + +class EffectRegistry: + def __init__(self): + self._plugins: dict[str, EffectPlugin] = {} + self._discovered: bool = False + + def register(self, plugin: EffectPlugin) -> None: + self._plugins[plugin.name] = plugin + + def get(self, name: str) -> EffectPlugin | None: + return self._plugins.get(name) + + def list_all(self) -> dict[str, EffectPlugin]: + return self._plugins.copy() + + def list_enabled(self) -> list[EffectPlugin]: + return [p for p in self._plugins.values() if p.config.enabled] + + def enable(self, name: str) -> bool: + plugin = self._plugins.get(name) + if plugin: + plugin.config.enabled = True + return True + return False + + def disable(self, name: str) -> bool: + plugin = self._plugins.get(name) + if plugin: + plugin.config.enabled = False + return True + return False + + def configure(self, name: str, config: EffectConfig) -> bool: + plugin = self._plugins.get(name) + if plugin: + plugin.configure(config) + return True + return False + + def is_enabled(self, name: str) -> bool: + plugin = self._plugins.get(name) + return plugin.config.enabled if plugin else False + + +_registry: EffectRegistry | None = None + + +def get_registry() -> EffectRegistry: + global _registry + if _registry is None: + _registry = EffectRegistry() + return _registry + + +def set_registry(registry: EffectRegistry) -> None: + global _registry + _registry = registry diff --git a/engine/effects/types.py b/engine/effects/types.py new file mode 100644 index 0000000..1d2c340 --- /dev/null +++ b/engine/effects/types.py @@ -0,0 +1,39 @@ +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class EffectContext: + terminal_width: int + terminal_height: int + scroll_cam: int + ticker_height: int + mic_excess: float + grad_offset: float + frame_number: int + has_message: bool + items: list = field(default_factory=list) + + +@dataclass +class EffectConfig: + enabled: bool = True + intensity: float = 1.0 + params: dict[str, Any] = field(default_factory=dict) + + +class EffectPlugin: + name: str + config: EffectConfig + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + raise NotImplementedError + + def configure(self, config: EffectConfig) -> None: + raise NotImplementedError + + +@dataclass +class PipelineConfig: + order: list[str] = field(default_factory=list) + effects: dict[str, EffectConfig] = field(default_factory=dict) diff --git a/engine/layers.py b/engine/layers.py index ebc53ef..b5ac428 100644 --- a/engine/layers.py +++ b/engine/layers.py @@ -10,6 +10,8 @@ from datetime import datetime from engine import config from engine.effects import ( + EffectChain, + EffectContext, fade_line, firehose_line, glitch_bar, @@ -199,3 +201,60 @@ def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]: fline = firehose_line(items, w) buf.append(f"\033[{scr_row};1H{fline}\033[K") return buf + + +_effect_chain = None + + +def init_effects() -> None: + """Initialize effect plugins and chain.""" + global _effect_chain + from engine.effects import EffectChain, get_registry + + registry = get_registry() + + import effects_plugins + + effects_plugins.discover_plugins() + + chain = EffectChain(registry) + chain.set_order(["noise", "fade", "glitch", "firehose"]) + _effect_chain = chain + + +def process_effects( + buf: list[str], + w: int, + h: int, + scroll_cam: int, + ticker_h: int, + mic_excess: float, + grad_offset: float, + frame_number: int, + has_message: bool, + items: list, +) -> list[str]: + """Process buffer through effect chain.""" + if _effect_chain is None: + init_effects() + + ctx = EffectContext( + terminal_width=w, + terminal_height=h, + scroll_cam=scroll_cam, + ticker_height=ticker_h, + mic_excess=mic_excess, + grad_offset=grad_offset, + frame_number=frame_number, + has_message=has_message, + items=items, + ) + return _effect_chain.process(buf, ctx) + + +def get_effect_chain() -> EffectChain | None: + """Get the effect chain instance.""" + global _effect_chain + if _effect_chain is None: + init_effects() + return _effect_chain diff --git a/tests/test_effects.py b/tests/test_effects.py new file mode 100644 index 0000000..12d41a5 --- /dev/null +++ b/tests/test_effects.py @@ -0,0 +1,427 @@ +""" +Tests for engine.effects module. +""" + +from engine.effects import EffectChain, EffectConfig, EffectContext, EffectRegistry + + +class MockEffect: + name = "mock" + config = EffectConfig(enabled=True, intensity=1.0) + + def __init__(self): + self.processed = False + self.last_ctx = None + + def process(self, buf, ctx): + self.processed = True + self.last_ctx = ctx + return buf + ["processed"] + + def configure(self, config): + self.config = config + + +class TestEffectConfig: + def test_defaults(self): + cfg = EffectConfig() + assert cfg.enabled is True + assert cfg.intensity == 1.0 + assert cfg.params == {} + + def test_custom_values(self): + cfg = EffectConfig(enabled=False, intensity=0.5, params={"key": "value"}) + assert cfg.enabled is False + assert cfg.intensity == 0.5 + assert cfg.params == {"key": "value"} + + +class TestEffectContext: + def test_defaults(self): + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + ) + assert ctx.terminal_width == 80 + assert ctx.terminal_height == 24 + assert ctx.ticker_height == 20 + assert ctx.items == [] + + def test_with_items(self): + items = [("Title", "Source", "12:00")] + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + items=items, + ) + assert ctx.items == items + + +class TestEffectRegistry: + def test_init_empty(self): + registry = EffectRegistry() + assert len(registry.list_all()) == 0 + + def test_register(self): + registry = EffectRegistry() + effect = MockEffect() + registry.register(effect) + assert "mock" in registry.list_all() + + def test_get(self): + registry = EffectRegistry() + effect = MockEffect() + registry.register(effect) + retrieved = registry.get("mock") + assert retrieved is effect + + def test_get_nonexistent(self): + registry = EffectRegistry() + assert registry.get("nonexistent") is None + + def test_enable(self): + registry = EffectRegistry() + effect = MockEffect() + effect.config.enabled = False + registry.register(effect) + registry.enable("mock") + assert effect.config.enabled is True + + def test_disable(self): + registry = EffectRegistry() + effect = MockEffect() + effect.config.enabled = True + registry.register(effect) + registry.disable("mock") + assert effect.config.enabled is False + + def test_list_enabled(self): + registry = EffectRegistry() + + class EnabledEffect: + name = "enabled_effect" + config = EffectConfig(enabled=True, intensity=1.0) + + class DisabledEffect: + name = "disabled_effect" + config = EffectConfig(enabled=False, intensity=1.0) + + registry.register(EnabledEffect()) + registry.register(DisabledEffect()) + enabled = registry.list_enabled() + assert len(enabled) == 1 + assert enabled[0].name == "enabled_effect" + + def test_configure(self): + registry = EffectRegistry() + effect = MockEffect() + registry.register(effect) + new_config = EffectConfig(enabled=False, intensity=0.3) + registry.configure("mock", new_config) + assert effect.config.enabled is False + assert effect.config.intensity == 0.3 + + def test_is_enabled(self): + registry = EffectRegistry() + effect = MockEffect() + effect.config.enabled = True + registry.register(effect) + assert registry.is_enabled("mock") is True + assert registry.is_enabled("nonexistent") is False + + +class TestEffectChain: + def test_init(self): + registry = EffectRegistry() + chain = EffectChain(registry) + assert chain.get_order() == [] + + def test_set_order(self): + registry = EffectRegistry() + effect1 = MockEffect() + effect1.name = "effect1" + effect2 = MockEffect() + effect2.name = "effect2" + registry.register(effect1) + registry.register(effect2) + chain = EffectChain(registry) + chain.set_order(["effect1", "effect2"]) + assert chain.get_order() == ["effect1", "effect2"] + + def test_add_effect(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "test_effect" + registry.register(effect) + chain = EffectChain(registry) + chain.add_effect("test_effect") + assert "test_effect" in chain.get_order() + + def test_add_effect_invalid(self): + registry = EffectRegistry() + chain = EffectChain(registry) + result = chain.add_effect("nonexistent") + assert result is False + + def test_remove_effect(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "test_effect" + registry.register(effect) + chain = EffectChain(registry) + chain.set_order(["test_effect"]) + chain.remove_effect("test_effect") + assert "test_effect" not in chain.get_order() + + def test_reorder(self): + registry = EffectRegistry() + effect1 = MockEffect() + effect1.name = "effect1" + effect2 = MockEffect() + effect2.name = "effect2" + effect3 = MockEffect() + effect3.name = "effect3" + registry.register(effect1) + registry.register(effect2) + registry.register(effect3) + chain = EffectChain(registry) + chain.set_order(["effect1", "effect2", "effect3"]) + result = chain.reorder(["effect3", "effect1", "effect2"]) + assert result is True + assert chain.get_order() == ["effect3", "effect1", "effect2"] + + def test_reorder_invalid(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "effect1" + registry.register(effect) + chain = EffectChain(registry) + result = chain.reorder(["effect1", "nonexistent"]) + assert result is False + + def test_process_empty_chain(self): + registry = EffectRegistry() + chain = EffectChain(registry) + buf = ["line1", "line2"] + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + ) + result = chain.process(buf, ctx) + assert result == buf + + def test_process_with_effects(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "test_effect" + registry.register(effect) + chain = EffectChain(registry) + chain.set_order(["test_effect"]) + buf = ["line1", "line2"] + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + ) + result = chain.process(buf, ctx) + assert result == ["line1", "line2", "processed"] + assert effect.processed is True + assert effect.last_ctx is ctx + + def test_process_disabled_effect(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "test_effect" + effect.config.enabled = False + registry.register(effect) + chain = EffectChain(registry) + chain.set_order(["test_effect"]) + buf = ["line1"] + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + ) + result = chain.process(buf, ctx) + assert result == ["line1"] + assert effect.processed is False + + +class TestEffectsExports: + def test_all_exports_are_importable(self): + """Verify all exports in __all__ can actually be imported.""" + import engine.effects as effects_module + + for name in effects_module.__all__: + getattr(effects_module, name) + + +class TestPerformanceMonitor: + def test_empty_stats(self): + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor() + stats = monitor.get_stats() + assert "error" in stats + + def test_record_and_retrieve(self): + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor() + monitor.start_frame(1) + monitor.record_effect("test_effect", 1.5, 100, 150) + monitor.end_frame(1, 2.0) + + stats = monitor.get_stats() + assert "error" not in stats + assert stats["frame_count"] == 1 + assert "test_effect" in stats["effects"] + + def test_multiple_frames(self): + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor(max_frames=3) + for i in range(5): + monitor.start_frame(i) + monitor.record_effect("effect1", 1.0, 100, 100) + monitor.record_effect("effect2", 0.5, 100, 100) + monitor.end_frame(i, 1.5) + + stats = monitor.get_stats() + assert stats["frame_count"] == 3 + assert "effect1" in stats["effects"] + assert "effect2" in stats["effects"] + + def test_reset(self): + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor() + monitor.start_frame(1) + monitor.record_effect("test", 1.0, 100, 100) + monitor.end_frame(1, 1.0) + + monitor.reset() + stats = monitor.get_stats() + assert "error" in stats + + +class TestEffectPipelinePerformance: + def test_pipeline_stays_within_frame_budget(self): + """Verify effect pipeline completes within frame budget (33ms for 30fps).""" + from engine.effects import ( + EffectChain, + EffectConfig, + EffectContext, + EffectRegistry, + ) + + class DummyEffect: + name = "dummy" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf, ctx): + return [line * 2 for line in buf] + + registry = EffectRegistry() + registry.register(DummyEffect()) + + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor(max_frames=10) + chain = EffectChain(registry, monitor) + chain.set_order(["dummy"]) + + buf = ["x" * 80] * 20 + + for i in range(10): + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=i, + has_message=False, + ) + chain.process(buf, ctx) + + stats = monitor.get_stats() + assert "error" not in stats + assert stats["pipeline"]["max_ms"] < 33.0 + + def test_individual_effects_performance(self): + """Verify individual effects don't exceed 10ms per frame.""" + from engine.effects import ( + EffectChain, + EffectConfig, + EffectContext, + EffectRegistry, + ) + + class SlowEffect: + name = "slow" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf, ctx): + result = [] + for line in buf: + result.append(line) + result.append(line + line) + return result + + registry = EffectRegistry() + registry.register(SlowEffect()) + + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor(max_frames=5) + chain = EffectChain(registry, monitor) + chain.set_order(["slow"]) + + buf = ["x" * 80] * 10 + + for i in range(5): + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=i, + has_message=False, + ) + chain.process(buf, ctx) + + stats = monitor.get_stats() + assert "error" not in stats + assert stats["effects"]["slow"]["max_ms"] < 10.0 From 05cc475858697b1683d47563b8f9597144f6d886 Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sun, 15 Mar 2026 18:42:54 -0700 Subject: [PATCH 6/7] feat(cmdline): add command-line interface for mainline control --- cmdline.py | 250 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 250 insertions(+) create mode 100644 cmdline.py diff --git a/cmdline.py b/cmdline.py new file mode 100644 index 0000000..9ee9ba6 --- /dev/null +++ b/cmdline.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 +""" +Command-line utility for interacting with mainline via ntfy. + +Usage: + python cmdline.py # Interactive TUI mode + python cmdline.py --help # Show help + python cmdline.py /effects list # Send single command via ntfy + python cmdline.py /effects stats # Get performance stats via ntfy + python cmdline.py -w /effects stats # Watch mode (polls for stats) + +The TUI mode provides: + - Arrow keys to navigate command history + - Tab completion for commands + - Auto-refresh for performance stats + +C&C works like a serial port: + 1. Send command to ntfy_cc_topic + 2. Mainline receives, processes, responds to same topic + 3. Cmdline polls for response +""" + +import argparse +import json +import sys +import time +import threading +import urllib.request +from pathlib import Path + +from engine import config +from engine.terminal import CLR, CURSOR_OFF, CURSOR_ON, G_DIM, G_HI, RST, W_GHOST + +try: + CC_CMD_TOPIC = config.NTFY_CC_CMD_TOPIC + CC_RESP_TOPIC = config.NTFY_CC_RESP_TOPIC +except AttributeError: + CC_CMD_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json" + CC_RESP_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json" + + +class NtfyResponsePoller: + """Polls ntfy for command responses.""" + + def __init__(self, cmd_topic: str, resp_topic: str, timeout: float = 10.0): + self.cmd_topic = cmd_topic + self.resp_topic = resp_topic + self.timeout = timeout + self._last_id = None + self._lock = threading.Lock() + + def _build_url(self) -> str: + from urllib.parse import parse_qs, urlencode, urlparse, urlunparse + + parsed = urlparse(self.resp_topic) + params = parse_qs(parsed.query, keep_blank_values=True) + params["since"] = [self._last_id if self._last_id else "20s"] + new_query = urlencode({k: v[0] for k, v in params.items()}) + return urlunparse(parsed._replace(query=new_query)) + + def send_and_wait(self, cmd: str) -> str: + """Send command and wait for response.""" + url = self.cmd_topic.replace("/json", "") + data = cmd.encode("utf-8") + + req = urllib.request.Request( + url, + data=data, + headers={ + "User-Agent": "mainline-cmdline/0.1", + "Content-Type": "text/plain", + }, + method="POST", + ) + + try: + urllib.request.urlopen(req, timeout=5) + except Exception as e: + return f"Error sending command: {e}" + + return self._wait_for_response(cmd) + + def _wait_for_response(self, expected_cmd: str = "") -> str: + """Poll for response message.""" + start = time.time() + while time.time() - start < self.timeout: + try: + url = self._build_url() + req = urllib.request.Request( + url, headers={"User-Agent": "mainline-cmdline/0.1"} + ) + with urllib.request.urlopen(req, timeout=10) as resp: + for line in resp: + try: + data = json.loads(line.decode("utf-8", errors="replace")) + except json.JSONDecodeError: + continue + if data.get("event") == "message": + self._last_id = data.get("id") + msg = data.get("message", "") + if msg: + return msg + except Exception: + pass + time.sleep(0.5) + return "Timeout waiting for response" + + +AVAILABLE_COMMANDS = """Available commands: + /effects list - List all effects and status + /effects on - Enable an effect + /effects off - Disable an effect + /effects intensity <0.0-1.0> - Set effect intensity + /effects reorder ,,... - Reorder pipeline + /effects stats - Show performance statistics + /help - Show this help + /quit - Exit +""" + + +def print_header(): + w = 60 + print(CLR, end="") + print(CURSOR_OFF, end="") + print(f"\033[1;1H", end="") + print(f" \033[1;38;5;231m╔{'═' * (w - 6)}╗\033[0m") + print( + f" \033[1;38;5;231m║\033[0m \033[1;38;5;82mMAINLINE\033[0m \033[3;38;5;245mCommand Center\033[0m \033[1;38;5;231m ║\033[0m" + ) + print(f" \033[1;38;5;231m╚{'═' * (w - 6)}╝\033[0m") + print(f" \033[2;38;5;37mCMD: {CC_CMD_TOPIC.split('/')[-2]}\033[0m") + print(f" \033[2;38;5;37mRESP: {CC_RESP_TOPIC.split('/')[-2]}\033[0m") + print() + + +def print_response(response: str, is_error: bool = False) -> None: + """Print response with nice formatting.""" + print() + if is_error: + print(f" \033[1;38;5;196m✗ Error\033[0m") + print(f" \033[38;5;196m{'─' * 40}\033[0m") + else: + print(f" \033[1;38;5;82m✓ Response\033[0m") + print(f" \033[38;5;37m{'─' * 40}\033[0m") + + for line in response.split("\n"): + print(f" {line}") + print() + + +def interactive_mode(): + """Interactive TUI for sending commands.""" + import readline + + print_header() + poller = NtfyResponsePoller(CC_CMD_TOPIC, CC_RESP_TOPIC) + + print(f" \033[38;5;245mType /help for commands, /quit to exit\033[0m") + print() + + while True: + try: + cmd = input(f" \033[1;38;5;82m❯\033[0m {G_HI}").strip() + except (EOFError, KeyboardInterrupt): + print() + break + + if not cmd: + continue + + if cmd.startswith("/"): + if cmd == "/quit" or cmd == "/exit": + print(f"\n \033[1;38;5;245mGoodbye!{RST}\n") + break + + if cmd == "/help": + print(f"\n{AVAILABLE_COMMANDS}\n") + continue + + print(f" \033[38;5;245m⟳ Sending to mainline...{RST}") + result = poller.send_and_wait(cmd) + print_response(result, is_error=result.startswith("Error")) + else: + print(f"\n \033[1;38;5;196m⚠ Commands must start with /{RST}\n") + + print(CURSOR_ON, end="") + return 0 + + +def main(): + parser = argparse.ArgumentParser( + description="Mainline command-line interface", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=AVAILABLE_COMMANDS, + ) + parser.add_argument( + "command", + nargs="?", + default=None, + help="Command to send (e.g., /effects list)", + ) + parser.add_argument( + "--watch", + "-w", + action="store_true", + help="Watch mode: continuously poll for stats (Ctrl+C to exit)", + ) + + args = parser.parse_args() + + if args.command is None: + return interactive_mode() + + poller = NtfyResponsePoller(CC_CMD_TOPIC, CC_RESP_TOPIC) + + if args.watch and "/effects stats" in args.command: + import signal + + def handle_sigterm(*_): + print(f"\n \033[1;38;5;245mStopped watching{RST}") + print(CURSOR_ON, end="") + sys.exit(0) + + signal.signal(signal.SIGTERM, handle_sigterm) + + print_header() + print(f" \033[38;5;245mWatching /effects stats (Ctrl+C to exit)...{RST}\n") + try: + while True: + result = poller.send_and_wait(args.command) + print(f"\033[2J\033[1;1H", end="") + print( + f" \033[1;38;5;82m❯\033[0m Performance Stats - \033[1;38;5;245m{time.strftime('%H:%M:%S')}{RST}" + ) + print(f" \033[38;5;37m{'─' * 44}{RST}") + for line in result.split("\n"): + print(f" {line}") + time.sleep(2) + except KeyboardInterrupt: + print(f"\n \033[1;38;5;245mStopped watching{RST}") + return 0 + return 0 + + result = poller.send_and_wait(args.command) + print(result) + return 0 + + +if __name__ == "__main__": + main() From 4228400c43c17e0b93d2edbbfa978e3496f524b0 Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sun, 15 Mar 2026 18:43:18 -0700 Subject: [PATCH 7/7] feat(daemon): add display abstraction and daemon mode with C&C --- engine/display.py | 102 +++++++++++++++++++++++++++ engine/scroll.py | 56 +++++++++++---- tests/test_controller.py | 32 +++++++++ tests/test_display.py | 79 +++++++++++++++++++++ tests/test_effects_controller.py | 117 +++++++++++++++++++++++++++++++ 5 files changed, 373 insertions(+), 13 deletions(-) create mode 100644 engine/display.py create mode 100644 tests/test_display.py create mode 100644 tests/test_effects_controller.py diff --git a/engine/display.py b/engine/display.py new file mode 100644 index 0000000..32eb09e --- /dev/null +++ b/engine/display.py @@ -0,0 +1,102 @@ +""" +Display output abstraction - allows swapping output backends. + +Protocol: + - init(width, height): Initialize display with terminal dimensions + - show(buffer): Render buffer (list of strings) to display + - clear(): Clear the display + - cleanup(): Shutdown display +""" + +import time +from typing import Protocol + + +class Display(Protocol): + """Protocol for display backends.""" + + def init(self, width: int, height: int) -> None: + """Initialize display with dimensions.""" + ... + + def show(self, buffer: list[str]) -> None: + """Show buffer on display.""" + ... + + def clear(self) -> None: + """Clear display.""" + ... + + def cleanup(self) -> None: + """Shutdown display.""" + ... + + +def get_monitor(): + """Get the performance monitor.""" + try: + from engine.effects.performance import get_monitor as _get_monitor + + return _get_monitor() + except Exception: + return None + + +class TerminalDisplay: + """ANSI terminal display backend.""" + + def __init__(self): + self.width = 80 + self.height = 24 + + def init(self, width: int, height: int) -> None: + from engine.terminal import CURSOR_OFF + + self.width = width + self.height = height + print(CURSOR_OFF, end="", flush=True) + + def show(self, buffer: list[str]) -> None: + import sys + + t0 = time.perf_counter() + sys.stdout.buffer.write("".join(buffer).encode()) + sys.stdout.flush() + elapsed_ms = (time.perf_counter() - t0) * 1000 + + monitor = get_monitor() + if monitor: + chars_in = sum(len(line) for line in buffer) + monitor.record_effect("terminal_display", elapsed_ms, chars_in, chars_in) + + def clear(self) -> None: + from engine.terminal import CLR + + print(CLR, end="", flush=True) + + def cleanup(self) -> None: + from engine.terminal import CURSOR_ON + + print(CURSOR_ON, end="", flush=True) + + +class NullDisplay: + """Headless/null display - discards all output.""" + + def init(self, width: int, height: int) -> None: + self.width = width + self.height = height + + def show(self, buffer: list[str]) -> None: + monitor = get_monitor() + if monitor: + t0 = time.perf_counter() + chars_in = sum(len(line) for line in buffer) + elapsed_ms = (time.perf_counter() - t0) * 1000 + monitor.record_effect("null_display", elapsed_ms, chars_in, chars_in) + + def clear(self) -> None: + pass + + def cleanup(self) -> None: + pass diff --git a/engine/scroll.py b/engine/scroll.py index 41445ad..d13408b 100644 --- a/engine/scroll.py +++ b/engine/scroll.py @@ -4,33 +4,42 @@ Orchestrates viewport, frame timing, and layers. """ import random -import sys import time from engine import config +from engine.display import ( + Display, + TerminalDisplay, +) +from engine.display import ( + get_monitor as _get_display_monitor, +) from engine.frame import calculate_scroll_step from engine.layers import ( apply_glitch, + process_effects, render_firehose, render_message_overlay, render_ticker_zone, ) -from engine.terminal import CLR from engine.viewport import th, tw +USE_EFFECT_CHAIN = True -def stream(items, ntfy_poller, mic_monitor): + +def stream(items, ntfy_poller, mic_monitor, display: Display | None = None): """Main render loop with four layers: message, ticker, scroll motion, firehose.""" + if display is None: + display = TerminalDisplay() random.shuffle(items) pool = list(items) seen = set() queued = 0 time.sleep(0.5) - sys.stdout.write(CLR) - sys.stdout.flush() - w, h = tw(), th() + display.init(w, h) + display.clear() fh = config.FIREHOSE_H if config.FIREHOSE else 0 ticker_view_h = h - fh GAP = 3 @@ -42,6 +51,7 @@ def stream(items, ntfy_poller, mic_monitor): noise_cache = {} scroll_motion_accum = 0.0 msg_cache = (None, None) + frame_number = 0 while True: if queued >= config.HEADLINE_LIMIT and not active: @@ -93,19 +103,39 @@ def stream(items, ntfy_poller, mic_monitor): buf.extend(ticker_buf) mic_excess = mic_monitor.excess - buf = apply_glitch(buf, ticker_buf_start, mic_excess, w) + render_start = time.perf_counter() - firehose_buf = render_firehose(items, w, fh, h) - buf.extend(firehose_buf) + if USE_EFFECT_CHAIN: + buf = process_effects( + buf, + w, + h, + scroll_cam, + ticker_h, + mic_excess, + grad_offset, + frame_number, + msg is not None, + items, + ) + else: + buf = apply_glitch(buf, ticker_buf_start, mic_excess, w) + firehose_buf = render_firehose(items, w, fh, h) + buf.extend(firehose_buf) if msg_overlay: buf.extend(msg_overlay) - sys.stdout.buffer.write("".join(buf).encode()) - sys.stdout.flush() + render_elapsed = (time.perf_counter() - render_start) * 1000 + monitor = _get_display_monitor() + if monitor: + chars = sum(len(line) for line in buf) + monitor.record_effect("render", render_elapsed, chars, chars) + + display.show(buf) elapsed = time.monotonic() - t0 time.sleep(max(0, config.FRAME_DT - elapsed)) + frame_number += 1 - sys.stdout.write(CLR) - sys.stdout.flush() + display.cleanup() diff --git a/tests/test_controller.py b/tests/test_controller.py index 96ef02d..0f08b9b 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -83,3 +83,35 @@ class TestStreamControllerCleanup: controller.cleanup() mock_mic_instance.stop.assert_called_once() + + +class TestStreamControllerWarmup: + """Tests for StreamController topic warmup.""" + + def test_warmup_topics_idempotent(self): + """warmup_topics can be called multiple times.""" + StreamController._topics_warmed = False + + with patch("urllib.request.urlopen") as mock_urlopen: + StreamController.warmup_topics() + StreamController.warmup_topics() + + assert mock_urlopen.call_count >= 3 + + def test_warmup_topics_sets_flag(self): + """warmup_topics sets the warmed flag.""" + StreamController._topics_warmed = False + + with patch("urllib.request.urlopen"): + StreamController.warmup_topics() + + assert StreamController._topics_warmed is True + + def test_warmup_topics_skips_after_first(self): + """warmup_topics skips after first call.""" + StreamController._topics_warmed = True + + with patch("urllib.request.urlopen") as mock_urlopen: + StreamController.warmup_topics() + + mock_urlopen.assert_not_called() diff --git a/tests/test_display.py b/tests/test_display.py new file mode 100644 index 0000000..e2c08b4 --- /dev/null +++ b/tests/test_display.py @@ -0,0 +1,79 @@ +""" +Tests for engine.display module. +""" + +from engine.display import NullDisplay, TerminalDisplay + + +class TestDisplayProtocol: + """Test that display backends satisfy the Display protocol.""" + + def test_terminal_display_is_display(self): + """TerminalDisplay satisfies Display protocol.""" + display = TerminalDisplay() + assert hasattr(display, "init") + assert hasattr(display, "show") + assert hasattr(display, "clear") + assert hasattr(display, "cleanup") + + def test_null_display_is_display(self): + """NullDisplay satisfies Display protocol.""" + display = NullDisplay() + assert hasattr(display, "init") + assert hasattr(display, "show") + assert hasattr(display, "clear") + assert hasattr(display, "cleanup") + + +class TestTerminalDisplay: + """Tests for TerminalDisplay class.""" + + def test_init_sets_dimensions(self): + """init stores terminal dimensions.""" + display = TerminalDisplay() + display.init(80, 24) + assert display.width == 80 + assert display.height == 24 + + def test_show_returns_none(self): + """show returns None after writing to stdout.""" + display = TerminalDisplay() + display.width = 80 + display.height = 24 + display.show(["line1", "line2"]) + + def test_clear_does_not_error(self): + """clear works without error.""" + display = TerminalDisplay() + display.clear() + + def test_cleanup_does_not_error(self): + """cleanup works without error.""" + display = TerminalDisplay() + display.cleanup() + + +class TestNullDisplay: + """Tests for NullDisplay class.""" + + def test_init_stores_dimensions(self): + """init stores dimensions.""" + display = NullDisplay() + display.init(100, 50) + assert display.width == 100 + assert display.height == 50 + + def test_show_does_nothing(self): + """show discards buffer without error.""" + display = NullDisplay() + display.show(["line1", "line2", "line3"]) + + def test_clear_does_nothing(self): + """clear does nothing.""" + display = NullDisplay() + display.clear() + + def test_cleanup_does_nothing(self): + """cleanup does nothing.""" + display = NullDisplay() + display.cleanup() diff --git a/tests/test_effects_controller.py b/tests/test_effects_controller.py new file mode 100644 index 0000000..fd17fe8 --- /dev/null +++ b/tests/test_effects_controller.py @@ -0,0 +1,117 @@ +""" +Tests for engine.effects.controller module. +""" + +from unittest.mock import MagicMock, patch + +from engine.effects.controller import ( + handle_effects_command, + set_effect_chain_ref, +) + + +class TestHandleEffectsCommand: + """Tests for handle_effects_command function.""" + + def test_list_effects(self): + """list command returns formatted effects list.""" + with patch("engine.effects.controller.get_registry") as mock_registry: + mock_plugin = MagicMock() + mock_plugin.config.enabled = True + mock_plugin.config.intensity = 0.5 + mock_registry.return_value.list_all.return_value = {"noise": mock_plugin} + + with patch("engine.effects.controller._get_effect_chain") as mock_chain: + mock_chain.return_value.get_order.return_value = ["noise"] + + result = handle_effects_command("/effects list") + + assert "noise: ON" in result + assert "intensity=0.5" in result + + def test_enable_effect(self): + """enable command calls registry.enable.""" + with patch("engine.effects.controller.get_registry") as mock_registry: + mock_plugin = MagicMock() + mock_registry.return_value.get.return_value = mock_plugin + mock_registry.return_value.list_all.return_value = {"noise": mock_plugin} + + result = handle_effects_command("/effects noise on") + + assert "Enabled: noise" in result + mock_registry.return_value.enable.assert_called_once_with("noise") + + def test_disable_effect(self): + """disable command calls registry.disable.""" + with patch("engine.effects.controller.get_registry") as mock_registry: + mock_plugin = MagicMock() + mock_registry.return_value.get.return_value = mock_plugin + mock_registry.return_value.list_all.return_value = {"noise": mock_plugin} + + result = handle_effects_command("/effects noise off") + + assert "Disabled: noise" in result + mock_registry.return_value.disable.assert_called_once_with("noise") + + def test_set_intensity(self): + """intensity command sets plugin intensity.""" + with patch("engine.effects.controller.get_registry") as mock_registry: + mock_plugin = MagicMock() + mock_plugin.config.intensity = 0.5 + mock_registry.return_value.get.return_value = mock_plugin + mock_registry.return_value.list_all.return_value = {"noise": mock_plugin} + + result = handle_effects_command("/effects noise intensity 0.8") + + assert "intensity to 0.8" in result + assert mock_plugin.config.intensity == 0.8 + + def test_invalid_intensity_range(self): + """intensity outside 0.0-1.0 returns error.""" + with patch("engine.effects.controller.get_registry") as mock_registry: + mock_plugin = MagicMock() + mock_registry.return_value.get.return_value = mock_plugin + mock_registry.return_value.list_all.return_value = {"noise": mock_plugin} + + result = handle_effects_command("/effects noise intensity 1.5") + + assert "between 0.0 and 1.0" in result + + def test_reorder_pipeline(self): + """reorder command calls chain.reorder.""" + with patch("engine.effects.controller.get_registry") as mock_registry: + mock_registry.return_value.list_all.return_value = {} + + with patch("engine.effects.controller._get_effect_chain") as mock_chain: + mock_chain_instance = MagicMock() + mock_chain_instance.reorder.return_value = True + mock_chain.return_value = mock_chain_instance + + result = handle_effects_command("/effects reorder noise,fade") + + assert "Reordered pipeline" in result + mock_chain_instance.reorder.assert_called_once_with(["noise", "fade"]) + + def test_unknown_command(self): + """unknown command returns error.""" + result = handle_effects_command("/unknown") + assert "Unknown command" in result + + def test_non_effects_command(self): + """non-effects command returns error.""" + result = handle_effects_command("not a command") + assert "Unknown command" in result + + +class TestSetEffectChainRef: + """Tests for set_effect_chain_ref function.""" + + def test_sets_global_ref(self): + """set_effect_chain_ref updates global reference.""" + mock_chain = MagicMock() + set_effect_chain_ref(mock_chain) + + from engine.effects.controller import _get_effect_chain + + result = _get_effect_chain() + assert result == mock_chain