diff --git a/engine/controller.py b/engine/controller.py new file mode 100644 index 0000000..98b24e5 --- /dev/null +++ b/engine/controller.py @@ -0,0 +1,46 @@ +""" +Stream controller - manages input sources and orchestrates the render stream. +""" + +from engine.config import Config, get_config +from engine.mic import MicMonitor +from engine.ntfy import NtfyPoller +from engine.scroll import stream + + +class StreamController: + """Controls the stream lifecycle - initializes sources and runs the stream.""" + + def __init__(self, config: Config | None = None): + self.config = config or get_config() + self.mic: MicMonitor | None = None + self.ntfy: NtfyPoller | None = None + + def initialize_sources(self) -> tuple[bool, bool]: + """Initialize microphone and ntfy sources. + + Returns: + (mic_ok, ntfy_ok) - success status for each source + """ + self.mic = MicMonitor(threshold_db=self.config.mic_threshold_db) + mic_ok = self.mic.start() if self.mic.available else False + + self.ntfy = NtfyPoller( + self.config.ntfy_topic, + reconnect_delay=self.config.ntfy_reconnect_delay, + display_secs=self.config.message_display_secs, + ) + ntfy_ok = self.ntfy.start() + + return bool(mic_ok), ntfy_ok + + def run(self, items: list) -> None: + """Run the stream with initialized sources.""" + if self.mic is None or self.ntfy is None: + self.initialize_sources() + stream(items, self.ntfy, self.mic) + + def cleanup(self) -> None: + """Clean up resources.""" + if self.mic: + self.mic.stop() diff --git a/engine/events.py b/engine/events.py new file mode 100644 index 0000000..d686285 --- /dev/null +++ b/engine/events.py @@ -0,0 +1,67 @@ +""" +Event types for the mainline application. +Defines the core events that flow through the system. +These types support a future migration to an event-driven architecture. +""" + +from dataclasses import dataclass +from datetime import datetime +from enum import Enum, auto + + +class EventType(Enum): + """Core event types in the mainline application.""" + + NEW_HEADLINE = auto() + FRAME_TICK = auto() + MIC_LEVEL = auto() + NTFY_MESSAGE = auto() + STREAM_START = auto() + STREAM_END = auto() + + +@dataclass +class HeadlineEvent: + """Event emitted when a new headline is ready for display.""" + + title: str + source: str + timestamp: str + language: str | None = None + + +@dataclass +class FrameTickEvent: + """Event emitted on each render frame.""" + + frame_number: int + timestamp: datetime + delta_seconds: float + + +@dataclass +class MicLevelEvent: + """Event emitted when microphone level changes significantly.""" + + db_level: float + excess_above_threshold: float + timestamp: datetime + + +@dataclass +class NtfyMessageEvent: + """Event emitted when an ntfy message is received.""" + + title: str + body: str + message_id: str | None = None + timestamp: datetime | None = None + + +@dataclass +class StreamEvent: + """Event emitted when stream starts or ends.""" + + event_type: EventType + headline_count: int = 0 + timestamp: datetime | None = None diff --git a/engine/frame.py b/engine/frame.py new file mode 100644 index 0000000..747d040 --- /dev/null +++ b/engine/frame.py @@ -0,0 +1,57 @@ +""" +Frame timing utilities — FPS control and precise timing. +""" + +import time + + +class FrameTimer: + """Frame timer for consistent render loop timing.""" + + def __init__(self, target_frame_dt: float = 0.05): + self.target_frame_dt = target_frame_dt + self._frame_count = 0 + self._start_time = time.monotonic() + self._last_frame_time = self._start_time + + @property + def fps(self) -> float: + """Current FPS based on elapsed frames.""" + elapsed = time.monotonic() - self._start_time + if elapsed > 0: + return self._frame_count / elapsed + return 0.0 + + def sleep_until_next_frame(self) -> float: + """Sleep to maintain target frame rate. Returns actual elapsed time.""" + now = time.monotonic() + elapsed = now - self._last_frame_time + self._last_frame_time = now + self._frame_count += 1 + + sleep_time = max(0, self.target_frame_dt - elapsed) + if sleep_time > 0: + time.sleep(sleep_time) + return elapsed + + def reset(self) -> None: + """Reset frame counter and start time.""" + self._frame_count = 0 + self._start_time = time.monotonic() + self._last_frame_time = self._start_time + + +def calculate_scroll_step( + scroll_dur: float, view_height: int, padding: int = 15 +) -> float: + """Calculate scroll step interval for smooth scrolling. + + Args: + scroll_dur: Duration in seconds for one headline to scroll through view + view_height: Terminal height in rows + padding: Extra rows for off-screen content + + Returns: + Time in seconds between scroll steps + """ + return scroll_dur / (view_height + padding) * 2 diff --git a/engine/layers.py b/engine/layers.py new file mode 100644 index 0000000..ebc53ef --- /dev/null +++ b/engine/layers.py @@ -0,0 +1,201 @@ +""" +Layer compositing — message overlay, ticker zone, firehose, noise. +Depends on: config, render, effects. +""" + +import random +import re +import time +from datetime import datetime + +from engine import config +from engine.effects import ( + fade_line, + firehose_line, + glitch_bar, + noise, + vis_trunc, +) +from engine.render import big_wrap, lr_gradient, lr_gradient_opposite +from engine.terminal import RST, W_COOL + +MSG_META = "\033[38;5;245m" +MSG_BORDER = "\033[2;38;5;37m" + + +def render_message_overlay( + msg: tuple[str, str, float] | None, + w: int, + h: int, + msg_cache: tuple, +) -> tuple[list[str], tuple]: + """Render ntfy message overlay. + + Args: + msg: (title, body, timestamp) or None + w: terminal width + h: terminal height + msg_cache: (cache_key, rendered_rows) for caching + + Returns: + (list of ANSI strings, updated cache) + """ + overlay = [] + if msg is None: + return overlay, msg_cache + + m_title, m_body, m_ts = msg + display_text = m_body or m_title or "(empty)" + display_text = re.sub(r"\s+", " ", display_text.upper()) + + cache_key = (display_text, w) + if msg_cache[0] != cache_key: + msg_rows = big_wrap(display_text, w - 4) + msg_cache = (cache_key, msg_rows) + else: + msg_rows = msg_cache[1] + + msg_rows = lr_gradient_opposite( + msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0 + ) + + elapsed_s = int(time.monotonic() - m_ts) + remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s) + ts_str = datetime.now().strftime("%H:%M:%S") + panel_h = len(msg_rows) + 2 + panel_top = max(0, (h - panel_h) // 2) + + row_idx = 0 + for mr in msg_rows: + ln = vis_trunc(mr, w) + overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K") + row_idx += 1 + + meta_parts = [] + if m_title and m_title != m_body: + meta_parts.append(m_title) + meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s") + meta = ( + " " + " \u00b7 ".join(meta_parts) + if len(meta_parts) > 1 + else " " + meta_parts[0] + ) + overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}\033[0m\033[K") + row_idx += 1 + + bar = "\u2500" * (w - 4) + overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}\033[0m\033[K") + + return overlay, msg_cache + + +def render_ticker_zone( + active: list, + scroll_cam: int, + ticker_h: int, + w: int, + noise_cache: dict, + grad_offset: float, +) -> tuple[list[str], dict]: + """Render the ticker scroll zone. + + Args: + active: list of (content_rows, color, canvas_y, meta_idx) + scroll_cam: camera position (viewport top) + ticker_h: height of ticker zone + w: terminal width + noise_cache: dict of cy -> noise string + grad_offset: gradient animation offset + + Returns: + (list of ANSI strings, updated noise_cache) + """ + buf = [] + top_zone = max(1, int(ticker_h * 0.25)) + bot_zone = max(1, int(ticker_h * 0.10)) + + def noise_at(cy): + if cy not in noise_cache: + noise_cache[cy] = noise(w) if random.random() < 0.15 else None + return noise_cache[cy] + + for r in range(ticker_h): + scr_row = r + 1 + cy = scroll_cam + r + top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0 + bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0 + row_fade = min(top_f, bot_f) + drawn = False + + for content, hc, by, midx in active: + cr = cy - by + if 0 <= cr < len(content): + raw = content[cr] + if cr != midx: + colored = lr_gradient([raw], grad_offset)[0] + else: + colored = raw + ln = vis_trunc(colored, w) + if row_fade < 1.0: + ln = fade_line(ln, row_fade) + + if cr == midx: + buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K") + elif ln.strip(): + buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K") + else: + buf.append(f"\033[{scr_row};1H\033[K") + drawn = True + break + + if not drawn: + n = noise_at(cy) + if row_fade < 1.0 and n: + n = fade_line(n, row_fade) + if n: + buf.append(f"\033[{scr_row};1H{n}") + else: + buf.append(f"\033[{scr_row};1H\033[K") + + return buf, noise_cache + + +def apply_glitch( + buf: list[str], + ticker_buf_start: int, + mic_excess: float, + w: int, +) -> list[str]: + """Apply glitch effect to ticker buffer. + + Args: + buf: current buffer + ticker_buf_start: index where ticker starts in buffer + mic_excess: mic level above threshold + w: terminal width + + Returns: + Updated buffer with glitches applied + """ + glitch_prob = 0.32 + min(0.9, mic_excess * 0.16) + n_hits = 4 + int(mic_excess / 2) + ticker_buf_len = len(buf) - ticker_buf_start + + if random.random() < glitch_prob and ticker_buf_len > 0: + for _ in range(min(n_hits, ticker_buf_len)): + gi = random.randint(0, ticker_buf_len - 1) + scr_row = gi + 1 + buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}" + + return buf + + +def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]: + """Render firehose strip at bottom of screen.""" + buf = [] + if fh > 0: + for fr in range(fh): + scr_row = h - fh + fr + 1 + fline = firehose_line(items, w) + buf.append(f"\033[{scr_row};1H{fline}\033[K") + return buf diff --git a/engine/scroll.py b/engine/scroll.py index 4f190f9..dcb96f7 100644 --- a/engine/scroll.py +++ b/engine/scroll.py @@ -1,25 +1,22 @@ """ Render engine — ticker content, scroll motion, message panel, and firehose overlay. -Depends on: config, terminal, render, effects, ntfy, mic. +Orchestrates viewport, frame timing, and layers. """ import random -import re import sys import time -from datetime import datetime from engine import config -from engine.effects import ( - fade_line, - firehose_line, - glitch_bar, - next_headline, - noise, - vis_trunc, +from engine.frame import calculate_scroll_step +from engine.layers import ( + apply_glitch, + render_firehose, + render_message_overlay, + render_ticker_zone, ) -from engine.render import big_wrap, lr_gradient, make_block, msg_gradient -from engine.terminal import CLR, RST, W_COOL, th, tw +from engine.terminal import CLR +from engine.viewport import th, tw def stream(items, ntfy_poller, mic_monitor): @@ -35,33 +32,16 @@ def stream(items, ntfy_poller, mic_monitor): w, h = tw(), th() fh = config.FIREHOSE_H if config.FIREHOSE else 0 - ticker_view_h = h - fh # reserve fixed firehose strip at bottom - GAP = 3 # blank rows between headlines - scroll_step_interval = config.SCROLL_DUR / (ticker_view_h + 15) * 2 + ticker_view_h = h - fh + GAP = 3 + scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h) - # Taxonomy: - # - message: centered ntfy overlay panel - # - ticker: large headline text content - # - scroll: upward camera motion applied to ticker content - # - firehose: fixed carriage-return style strip pinned at bottom - # Active ticker blocks: (content_rows, color, canvas_y, meta_idx) active = [] - scroll_cam = 0 # viewport top in virtual canvas coords - ticker_next_y = ( - ticker_view_h # canvas-y where next block starts (off-screen bottom) - ) + scroll_cam = 0 + ticker_next_y = ticker_view_h noise_cache = {} scroll_motion_accum = 0.0 - - def _noise_at(cy): - if cy not in noise_cache: - noise_cache[cy] = noise(w) if random.random() < 0.15 else None - return noise_cache[cy] - - # Message color: bright cyan/white — distinct from headline greens - MSG_META = "\033[38;5;245m" # cool grey - MSG_BORDER = "\033[2;38;5;37m" # dim teal - _msg_cache = (None, None) # (cache_key, rendered_rows) + msg_cache = (None, None) while queued < config.HEADLINE_LIMIT or active: t0 = time.monotonic() @@ -69,80 +49,30 @@ def stream(items, ntfy_poller, mic_monitor): fh = config.FIREHOSE_H if config.FIREHOSE else 0 ticker_view_h = h - fh - # ── Check for ntfy message ──────────────────────── - msg_h = 0 - msg_overlay = [] msg = ntfy_poller.get_active_message() + msg_overlay, msg_cache = render_message_overlay(msg, w, h, msg_cache) buf = [] - if msg is not None: - m_title, m_body, m_ts = msg - # ── Message overlay: centered in the viewport ── - display_text = m_body or m_title or "(empty)" - display_text = re.sub(r"\s+", " ", display_text.upper()) - cache_key = (display_text, w) - if _msg_cache[0] != cache_key: - msg_rows = big_wrap(display_text, w - 4) - _msg_cache = (cache_key, msg_rows) - else: - msg_rows = _msg_cache[1] - msg_rows = msg_gradient( - msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0 - ) - # Layout: rendered text + meta + border - elapsed_s = int(time.monotonic() - m_ts) - remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s) - ts_str = datetime.now().strftime("%H:%M:%S") - panel_h = len(msg_rows) + 2 # meta + border - panel_top = max(0, (h - panel_h) // 2) - row_idx = 0 - for mr in msg_rows: - ln = vis_trunc(mr, w) - msg_overlay.append( - f"\033[{panel_top + row_idx + 1};1H {ln}{RST}\033[K" - ) - row_idx += 1 - # Meta line: title (if distinct) + source + countdown - meta_parts = [] - if m_title and m_title != m_body: - meta_parts.append(m_title) - meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s") - meta = ( - " " + " \u00b7 ".join(meta_parts) - if len(meta_parts) > 1 - else " " + meta_parts[0] - ) - msg_overlay.append( - f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}{RST}\033[K" - ) - row_idx += 1 - # Border — constant boundary under message panel - bar = "\u2500" * (w - 4) - msg_overlay.append( - f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}{RST}\033[K" - ) + ticker_h = ticker_view_h - # Ticker draws above the fixed firehose strip; message is a centered overlay. - ticker_h = ticker_view_h - msg_h - - # ── Ticker content + scroll motion (always runs) ── scroll_motion_accum += config.FRAME_DT while scroll_motion_accum >= scroll_step_interval: scroll_motion_accum -= scroll_step_interval scroll_cam += 1 - # Enqueue new headlines when room at the bottom while ( ticker_next_y < scroll_cam + ticker_view_h + 10 and queued < config.HEADLINE_LIMIT ): + from engine.effects import next_headline + from engine.render import make_block + t, src, ts = next_headline(pool, items, seen) ticker_content, hc, midx = make_block(t, src, ts, w) active.append((ticker_content, hc, ticker_next_y, midx)) ticker_next_y += len(ticker_content) + GAP queued += 1 - # Prune off-screen blocks and stale noise active = [ (c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > scroll_cam ] @@ -150,69 +80,26 @@ def stream(items, ntfy_poller, mic_monitor): if k < scroll_cam: del noise_cache[k] - # Draw ticker zone (above fixed firehose strip) - top_zone = max(1, int(ticker_h * 0.25)) - bot_zone = max(1, int(ticker_h * 0.10)) grad_offset = (time.monotonic() * config.GRAD_SPEED) % 1.0 - ticker_buf_start = len(buf) # track where ticker rows start in buf - for r in range(ticker_h): - scr_row = r + 1 # 1-indexed ANSI screen row - cy = scroll_cam + r - top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0 - bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0 - row_fade = min(top_f, bot_f) - drawn = False - for content, hc, by, midx in active: - cr = cy - by - if 0 <= cr < len(content): - raw = content[cr] - if cr != midx: - colored = lr_gradient([raw], grad_offset)[0] - else: - colored = raw - ln = vis_trunc(colored, w) - if row_fade < 1.0: - ln = fade_line(ln, row_fade) - if cr == midx: - buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K") - elif ln.strip(): - buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K") - else: - buf.append(f"\033[{scr_row};1H\033[K") - drawn = True - break - if not drawn: - n = _noise_at(cy) - if row_fade < 1.0 and n: - n = fade_line(n, row_fade) - if n: - buf.append(f"\033[{scr_row};1H{n}") - else: - buf.append(f"\033[{scr_row};1H\033[K") + ticker_buf_start = len(buf) + + ticker_buf, noise_cache = render_ticker_zone( + active, scroll_cam, ticker_h, w, noise_cache, grad_offset + ) + buf.extend(ticker_buf) - # Glitch — base rate + mic-reactive spikes (ticker zone only) mic_excess = mic_monitor.excess - glitch_prob = 0.32 + min(0.9, mic_excess * 0.16) - n_hits = 4 + int(mic_excess / 2) - ticker_buf_len = len(buf) - ticker_buf_start - if random.random() < glitch_prob and ticker_buf_len > 0: - for _ in range(min(n_hits, ticker_buf_len)): - gi = random.randint(0, ticker_buf_len - 1) - scr_row = gi + 1 - buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}" + buf = apply_glitch(buf, ticker_buf_start, mic_excess, w) + + firehose_buf = render_firehose(items, w, fh, h) + buf.extend(firehose_buf) - if config.FIREHOSE and fh > 0: - for fr in range(fh): - scr_row = h - fh + fr + 1 - fline = firehose_line(items, w) - buf.append(f"\033[{scr_row};1H{fline}\033[K") if msg_overlay: buf.extend(msg_overlay) sys.stdout.buffer.write("".join(buf).encode()) sys.stdout.flush() - # Precise frame timing elapsed = time.monotonic() - t0 time.sleep(max(0, config.FRAME_DT - elapsed)) diff --git a/engine/viewport.py b/engine/viewport.py new file mode 100644 index 0000000..d89b6a8 --- /dev/null +++ b/engine/viewport.py @@ -0,0 +1,37 @@ +""" +Viewport utilities — terminal dimensions and ANSI positioning helpers. +No internal dependencies. +""" + +import os + + +def tw() -> int: + """Get terminal width (columns).""" + try: + return os.get_terminal_size().columns + except Exception: + return 80 + + +def th() -> int: + """Get terminal height (lines).""" + try: + return os.get_terminal_size().lines + except Exception: + return 24 + + +def move_to(row: int, col: int = 1) -> str: + """Generate ANSI escape to move cursor to row, col (1-indexed).""" + return f"\033[{row};{col}H" + + +def clear_screen() -> str: + """Clear screen and move cursor to home.""" + return "\033[2J\033[H" + + +def clear_line() -> str: + """Clear current line.""" + return "\033[K" diff --git a/tests/test_controller.py b/tests/test_controller.py new file mode 100644 index 0000000..96ef02d --- /dev/null +++ b/tests/test_controller.py @@ -0,0 +1,85 @@ +""" +Tests for engine.controller module. +""" + +from unittest.mock import MagicMock, patch + +from engine import config +from engine.controller import StreamController + + +class TestStreamController: + """Tests for StreamController class.""" + + def test_init_default_config(self): + """StreamController initializes with default config.""" + controller = StreamController() + assert controller.config is not None + assert isinstance(controller.config, config.Config) + + def test_init_custom_config(self): + """StreamController accepts custom config.""" + custom_config = config.Config(headline_limit=500) + controller = StreamController(config=custom_config) + assert controller.config.headline_limit == 500 + + def test_init_sources_none_by_default(self): + """Sources are None until initialized.""" + controller = StreamController() + assert controller.mic is None + assert controller.ntfy is None + + @patch("engine.controller.MicMonitor") + @patch("engine.controller.NtfyPoller") + def test_initialize_sources(self, mock_ntfy, mock_mic): + """initialize_sources creates mic and ntfy instances.""" + mock_mic_instance = MagicMock() + mock_mic_instance.available = True + mock_mic_instance.start.return_value = True + mock_mic.return_value = mock_mic_instance + + mock_ntfy_instance = MagicMock() + mock_ntfy_instance.start.return_value = True + mock_ntfy.return_value = mock_ntfy_instance + + controller = StreamController() + mic_ok, ntfy_ok = controller.initialize_sources() + + assert mic_ok is True + assert ntfy_ok is True + assert controller.mic is not None + assert controller.ntfy is not None + + @patch("engine.controller.MicMonitor") + @patch("engine.controller.NtfyPoller") + def test_initialize_sources_mic_unavailable(self, mock_ntfy, mock_mic): + """initialize_sources handles unavailable mic.""" + mock_mic_instance = MagicMock() + mock_mic_instance.available = False + mock_mic.return_value = mock_mic_instance + + mock_ntfy_instance = MagicMock() + mock_ntfy_instance.start.return_value = True + mock_ntfy.return_value = mock_ntfy_instance + + controller = StreamController() + mic_ok, ntfy_ok = controller.initialize_sources() + + assert mic_ok is False + assert ntfy_ok is True + + +class TestStreamControllerCleanup: + """Tests for StreamController cleanup.""" + + @patch("engine.controller.MicMonitor") + def test_cleanup_stops_mic(self, mock_mic): + """cleanup stops the microphone if running.""" + mock_mic_instance = MagicMock() + mock_mic.return_value = mock_mic_instance + + controller = StreamController() + controller.mic = mock_mic_instance + controller.cleanup() + + mock_mic_instance.stop.assert_called_once() diff --git a/tests/test_events.py b/tests/test_events.py new file mode 100644 index 0000000..ea5f4ed --- /dev/null +++ b/tests/test_events.py @@ -0,0 +1,112 @@ +""" +Tests for engine.events module. +""" + +from datetime import datetime + +from engine import events + + +class TestEventType: + """Tests for EventType enum.""" + + def test_event_types_exist(self): + """All expected event types exist.""" + assert hasattr(events.EventType, "NEW_HEADLINE") + assert hasattr(events.EventType, "FRAME_TICK") + assert hasattr(events.EventType, "MIC_LEVEL") + assert hasattr(events.EventType, "NTFY_MESSAGE") + assert hasattr(events.EventType, "STREAM_START") + assert hasattr(events.EventType, "STREAM_END") + + +class TestHeadlineEvent: + """Tests for HeadlineEvent dataclass.""" + + def test_create_headline_event(self): + """HeadlineEvent can be created with required fields.""" + e = events.HeadlineEvent( + title="Test Headline", + source="Test Source", + timestamp="12:00", + ) + assert e.title == "Test Headline" + assert e.source == "Test Source" + assert e.timestamp == "12:00" + + def test_headline_event_optional_language(self): + """HeadlineEvent supports optional language field.""" + e = events.HeadlineEvent( + title="Test", + source="Test", + timestamp="12:00", + language="ja", + ) + assert e.language == "ja" + + +class TestFrameTickEvent: + """Tests for FrameTickEvent dataclass.""" + + def test_create_frame_tick_event(self): + """FrameTickEvent can be created.""" + now = datetime.now() + e = events.FrameTickEvent( + frame_number=100, + timestamp=now, + delta_seconds=0.05, + ) + assert e.frame_number == 100 + assert e.timestamp == now + assert e.delta_seconds == 0.05 + + +class TestMicLevelEvent: + """Tests for MicLevelEvent dataclass.""" + + def test_create_mic_level_event(self): + """MicLevelEvent can be created.""" + now = datetime.now() + e = events.MicLevelEvent( + db_level=60.0, + excess_above_threshold=10.0, + timestamp=now, + ) + assert e.db_level == 60.0 + assert e.excess_above_threshold == 10.0 + + +class TestNtfyMessageEvent: + """Tests for NtfyMessageEvent dataclass.""" + + def test_create_ntfy_message_event(self): + """NtfyMessageEvent can be created with required fields.""" + e = events.NtfyMessageEvent( + title="Test Title", + body="Test Body", + ) + assert e.title == "Test Title" + assert e.body == "Test Body" + assert e.message_id is None + + def test_ntfy_message_event_with_id(self): + """NtfyMessageEvent supports optional message_id.""" + e = events.NtfyMessageEvent( + title="Test", + body="Test", + message_id="abc123", + ) + assert e.message_id == "abc123" + + +class TestStreamEvent: + """Tests for StreamEvent dataclass.""" + + def test_create_stream_event(self): + """StreamEvent can be created.""" + e = events.StreamEvent( + event_type=events.EventType.STREAM_START, + headline_count=100, + ) + assert e.event_type == events.EventType.STREAM_START + assert e.headline_count == 100 diff --git a/tests/test_frame.py b/tests/test_frame.py new file mode 100644 index 0000000..2c59b85 --- /dev/null +++ b/tests/test_frame.py @@ -0,0 +1,63 @@ +""" +Tests for engine.frame module. +""" + +import time + +from engine.frame import FrameTimer, calculate_scroll_step + + +class TestFrameTimer: + """Tests for FrameTimer class.""" + + def test_init_default(self): + """FrameTimer initializes with default values.""" + timer = FrameTimer() + assert timer.target_frame_dt == 0.05 + assert timer.fps >= 0 + + def test_init_custom(self): + """FrameTimer accepts custom frame duration.""" + timer = FrameTimer(target_frame_dt=0.1) + assert timer.target_frame_dt == 0.1 + + def test_fps_calculation(self): + """FrameTimer calculates FPS correctly.""" + timer = FrameTimer() + timer._frame_count = 10 + timer._start_time = time.monotonic() - 1.0 + assert timer.fps >= 9.0 + + def test_reset(self): + """FrameTimer.reset() clears frame count.""" + timer = FrameTimer() + timer._frame_count = 100 + timer.reset() + assert timer._frame_count == 0 + + +class TestCalculateScrollStep: + """Tests for calculate_scroll_step function.""" + + def test_basic_calculation(self): + """calculate_scroll_step returns positive value.""" + result = calculate_scroll_step(5.0, 24) + assert result > 0 + + def test_with_padding(self): + """calculate_scroll_step respects padding parameter.""" + without_padding = calculate_scroll_step(5.0, 24, padding=0) + with_padding = calculate_scroll_step(5.0, 24, padding=15) + assert with_padding < without_padding + + def test_larger_view_slower_scroll(self): + """Larger view height results in slower scroll steps.""" + small = calculate_scroll_step(5.0, 10) + large = calculate_scroll_step(5.0, 50) + assert large < small + + def test_longer_duration_slower_scroll(self): + """Longer scroll duration results in slower scroll steps.""" + fast = calculate_scroll_step(2.0, 24) + slow = calculate_scroll_step(10.0, 24) + assert slow > fast diff --git a/tests/test_layers.py b/tests/test_layers.py new file mode 100644 index 0000000..afe9c07 --- /dev/null +++ b/tests/test_layers.py @@ -0,0 +1,96 @@ +""" +Tests for engine.layers module. +""" + +import time + +from engine import layers + + +class TestRenderMessageOverlay: + """Tests for render_message_overlay function.""" + + def test_no_message_returns_empty(self): + """Returns empty list when msg is None.""" + result, cache = layers.render_message_overlay(None, 80, 24, (None, None)) + assert result == [] + assert cache[0] is None + + def test_message_returns_overlay_lines(self): + """Returns non-empty list when message is present.""" + msg = ("Test Title", "Test Body", time.monotonic()) + result, cache = layers.render_message_overlay(msg, 80, 24, (None, None)) + assert len(result) > 0 + assert cache[0] is not None + + def test_cache_key_changes_with_text(self): + """Cache key changes when message text changes.""" + msg1 = ("Title1", "Body1", time.monotonic()) + msg2 = ("Title2", "Body2", time.monotonic()) + + _, cache1 = layers.render_message_overlay(msg1, 80, 24, (None, None)) + _, cache2 = layers.render_message_overlay(msg2, 80, 24, cache1) + + assert cache1[0] != cache2[0] + + def test_cache_reuse_avoids_recomputation(self): + """Cache is returned when same message is passed (interface test).""" + msg = ("Same Title", "Same Body", time.monotonic()) + + result1, cache1 = layers.render_message_overlay(msg, 80, 24, (None, None)) + result2, cache2 = layers.render_message_overlay(msg, 80, 24, cache1) + + assert len(result1) > 0 + assert len(result2) > 0 + assert cache1[0] == cache2[0] + + +class TestRenderFirehose: + """Tests for render_firehose function.""" + + def test_no_firehose_returns_empty(self): + """Returns empty list when firehose height is 0.""" + items = [("Headline", "Source", "12:00")] + result = layers.render_firehose(items, 80, 0, 24) + assert result == [] + + def test_firehose_returns_lines(self): + """Returns lines when firehose height > 0.""" + items = [("Headline", "Source", "12:00")] + result = layers.render_firehose(items, 80, 4, 24) + assert len(result) == 4 + + def test_firehose_includes_ansi_escapes(self): + """Returns lines containing ANSI escape sequences.""" + items = [("Headline", "Source", "12:00")] + result = layers.render_firehose(items, 80, 1, 24) + assert "\033[" in result[0] + + +class TestApplyGlitch: + """Tests for apply_glitch function.""" + + def test_empty_buffer_unchanged(self): + """Empty buffer is returned unchanged.""" + result = layers.apply_glitch([], 0, 0.0, 80) + assert result == [] + + def test_buffer_length_preserved(self): + """Buffer length is preserved after glitch application.""" + buf = [f"\033[{i + 1};1Htest\033[K" for i in range(10)] + result = layers.apply_glitch(buf, 0, 0.5, 80) + assert len(result) == len(buf) + + +class TestRenderTickerZone: + """Tests for render_ticker_zone function - focusing on interface.""" + + def test_returns_list(self): + """Returns a list of strings.""" + result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0) + assert isinstance(result, list) + + def test_returns_dict_for_cache(self): + """Returns a dict for the noise cache.""" + result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0) + assert isinstance(cache, dict) diff --git a/tests/test_viewport.py b/tests/test_viewport.py new file mode 100644 index 0000000..2338a7b --- /dev/null +++ b/tests/test_viewport.py @@ -0,0 +1,64 @@ +""" +Tests for engine.viewport module. +""" + +from engine import viewport + + +class TestViewportTw: + """Tests for tw() function.""" + + def test_tw_returns_int(self): + """tw() returns an integer.""" + result = viewport.tw() + assert isinstance(result, int) + + def test_tw_positive(self): + """tw() returns a positive value.""" + assert viewport.tw() > 0 + + +class TestViewportTh: + """Tests for th() function.""" + + def test_th_returns_int(self): + """th() returns an integer.""" + result = viewport.th() + assert isinstance(result, int) + + def test_th_positive(self): + """th() returns a positive value.""" + assert viewport.th() > 0 + + +class TestViewportMoveTo: + """Tests for move_to() function.""" + + def test_move_to_format(self): + """move_to() returns correctly formatted ANSI escape.""" + result = viewport.move_to(5, 10) + assert result == "\033[5;10H" + + def test_move_to_default_col(self): + """move_to() defaults to column 1.""" + result = viewport.move_to(5) + assert result == "\033[5;1H" + + +class TestViewportClearScreen: + """Tests for clear_screen() function.""" + + def test_clear_screen_format(self): + """clear_screen() returns clear screen ANSI escape.""" + result = viewport.clear_screen() + assert "\033[2J" in result + assert "\033[H" in result + + +class TestViewportClearLine: + """Tests for clear_line() function.""" + + def test_clear_line_format(self): + """clear_line() returns clear line ANSI escape.""" + result = viewport.clear_line() + assert result == "\033[K"