Merge branch 'main' into feat/code-scroll

This commit is contained in:
2026-03-16 09:16:36 +00:00
46 changed files with 4058 additions and 198 deletions

236
tests/fixtures/__init__.py vendored Normal file
View File

@@ -0,0 +1,236 @@
"""
Pytest fixtures for mocking external dependencies (network, filesystem).
"""
import json
from unittest.mock import MagicMock
import pytest
@pytest.fixture
def mock_feed_response():
"""Mock RSS feed response data."""
return b"""<?xml version="1.0" encoding="UTF-8" ?>
<rss version="2.0">
<channel>
<title>Test Feed</title>
<link>https://example.com</link>
<item>
<title>Test Headline One</title>
<pubDate>Sat, 15 Mar 2025 12:00:00 GMT</pubDate>
</item>
<item>
<title>Test Headline Two</title>
<pubDate>Sat, 15 Mar 2025 11:00:00 GMT</pubDate>
</item>
<item>
<title>Sports: Team Wins Championship</title>
<pubDate>Sat, 15 Mar 2025 10:00:00 GMT</pubDate>
</item>
</channel>
</rss>"""
@pytest.fixture
def mock_gutenberg_response():
"""Mock Project Gutenberg text response."""
return """Project Gutenberg's Collection, by Various
*** START OF SOME TEXT ***
This is a test poem with multiple lines
that should be parsed as stanzas.
Another stanza here with different content
and more lines to test the parsing logic.
Yet another stanza for variety
in the test data.
*** END OF SOME TEXT ***"""
@pytest.fixture
def mock_gutenberg_empty():
"""Mock Gutenberg response with no valid stanzas."""
return """Project Gutenberg's Collection
*** START OF TEXT ***
THIS IS ALL CAPS AND SHOULD BE SKIPPED
I.
*** END OF TEXT ***"""
@pytest.fixture
def mock_ntfy_message():
"""Mock ntfy.sh SSE message."""
return json.dumps(
{
"id": "test123",
"event": "message",
"title": "Test Title",
"message": "Test message body",
"time": 1234567890,
}
).encode()
@pytest.fixture
def mock_ntfy_keepalive():
"""Mock ntfy.sh keepalive message."""
return b'data: {"event":"keepalive"}\n\n'
@pytest.fixture
def mock_google_translate_response():
"""Mock Google Translate API response."""
return json.dumps(
[
[["Translated text", "Original text", None, 0.8], None, "en"],
None,
None,
[],
[],
[],
[],
]
)
@pytest.fixture
def mock_feedparser():
"""Create a mock feedparser.parse function."""
def _mock(data):
mock_result = MagicMock()
mock_result.bozo = False
mock_result.entries = [
{
"title": "Test Headline",
"published_parsed": (2025, 3, 15, 12, 0, 0, 0, 0, 0),
},
{
"title": "Another Headline",
"updated_parsed": (2025, 3, 15, 11, 0, 0, 0, 0, 0),
},
]
return mock_result
return _mock
@pytest.fixture
def mock_urllib_open(mock_feed_response):
"""Create a mock urllib.request.urlopen that returns feed data."""
def _mock(url):
mock_response = MagicMock()
mock_response.read.return_value = mock_feed_response
return mock_response
return _mock
@pytest.fixture
def sample_items():
"""Sample items as returned by fetch module (title, source, timestamp)."""
return [
("Headline One", "Test Source", "12:00"),
("Headline Two", "Another Source", "11:30"),
("Headline Three", "Third Source", "10:45"),
]
@pytest.fixture
def sample_config():
"""Sample config for testing."""
from engine.config import Config
return Config(
headline_limit=100,
feed_timeout=10,
mic_threshold_db=50,
mode="news",
firehose=False,
ntfy_topic="https://ntfy.sh/test/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir="fonts",
font_path="",
font_index=0,
font_picker=False,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐",
kata_glyphs="ハミヒーウ",
script_fonts={},
)
@pytest.fixture
def poetry_config():
"""Sample config for poetry mode."""
from engine.config import Config
return Config(
headline_limit=100,
feed_timeout=10,
mic_threshold_db=50,
mode="poetry",
firehose=False,
ntfy_topic="https://ntfy.sh/test/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir="fonts",
font_path="",
font_index=0,
font_picker=False,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐",
kata_glyphs="ハミヒーウ",
script_fonts={},
)
@pytest.fixture
def firehose_config():
"""Sample config with firehose enabled."""
from engine.config import Config
return Config(
headline_limit=100,
feed_timeout=10,
mic_threshold_db=50,
mode="news",
firehose=True,
ntfy_topic="https://ntfy.sh/test/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir="fonts",
font_path="",
font_index=0,
font_picker=False,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐",
kata_glyphs="ハミヒーウ",
script_fonts={},
)

View File

@@ -7,6 +7,8 @@ import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from engine import config
@@ -160,3 +162,140 @@ class TestSetFontSelection:
config.set_font_selection(font_path=None, font_index=None)
assert original_path == config.FONT_PATH
assert original_index == config.FONT_INDEX
class TestConfigDataclass:
"""Tests for Config dataclass."""
def test_config_has_required_fields(self):
"""Config has all required fields."""
c = config.Config()
assert hasattr(c, "headline_limit")
assert hasattr(c, "feed_timeout")
assert hasattr(c, "mic_threshold_db")
assert hasattr(c, "mode")
assert hasattr(c, "firehose")
assert hasattr(c, "ntfy_topic")
assert hasattr(c, "ntfy_reconnect_delay")
assert hasattr(c, "message_display_secs")
assert hasattr(c, "font_dir")
assert hasattr(c, "font_path")
assert hasattr(c, "font_index")
assert hasattr(c, "font_picker")
assert hasattr(c, "font_sz")
assert hasattr(c, "render_h")
assert hasattr(c, "ssaa")
assert hasattr(c, "scroll_dur")
assert hasattr(c, "frame_dt")
assert hasattr(c, "firehose_h")
assert hasattr(c, "grad_speed")
assert hasattr(c, "glitch_glyphs")
assert hasattr(c, "kata_glyphs")
assert hasattr(c, "script_fonts")
def test_config_defaults(self):
"""Config has sensible defaults."""
c = config.Config()
assert c.headline_limit == 1000
assert c.feed_timeout == 10
assert c.mic_threshold_db == 50
assert c.mode == "news"
assert c.firehose is False
assert c.ntfy_reconnect_delay == 5
assert c.message_display_secs == 30
def test_config_is_immutable(self):
"""Config is frozen (immutable)."""
c = config.Config()
with pytest.raises(AttributeError):
c.headline_limit = 500 # type: ignore
def test_config_custom_values(self):
"""Config accepts custom values."""
c = config.Config(
headline_limit=500,
mode="poetry",
firehose=True,
ntfy_topic="https://ntfy.sh/test",
)
assert c.headline_limit == 500
assert c.mode == "poetry"
assert c.firehose is True
assert c.ntfy_topic == "https://ntfy.sh/test"
class TestConfigFromArgs:
"""Tests for Config.from_args method."""
def test_from_args_defaults(self):
"""from_args creates config with defaults from empty argv."""
c = config.Config.from_args(["prog"])
assert c.mode == "news"
assert c.firehose is False
assert c.font_picker is True
def test_from_args_poetry_mode(self):
"""from_args detects --poetry flag."""
c = config.Config.from_args(["prog", "--poetry"])
assert c.mode == "poetry"
def test_from_args_poetry_short_flag(self):
"""from_args detects -p short flag."""
c = config.Config.from_args(["prog", "-p"])
assert c.mode == "poetry"
def test_from_args_firehose(self):
"""from_args detects --firehose flag."""
c = config.Config.from_args(["prog", "--firehose"])
assert c.firehose is True
def test_from_args_no_font_picker(self):
"""from_args detects --no-font-picker flag."""
c = config.Config.from_args(["prog", "--no-font-picker"])
assert c.font_picker is False
def test_from_args_font_index(self):
"""from_args parses --font-index."""
c = config.Config.from_args(["prog", "--font-index", "3"])
assert c.font_index == 3
class TestGetSetConfig:
"""Tests for get_config and set_config functions."""
def test_get_config_returns_config(self):
"""get_config returns a Config instance."""
c = config.get_config()
assert isinstance(c, config.Config)
def test_set_config_allows_injection(self):
"""set_config allows injecting a custom config."""
custom = config.Config(mode="poetry", headline_limit=100)
config.set_config(custom)
assert config.get_config().mode == "poetry"
assert config.get_config().headline_limit == 100
def test_set_config_then_get_config(self):
"""set_config followed by get_config returns the set config."""
original = config.get_config()
test_config = config.Config(headline_limit=42)
config.set_config(test_config)
result = config.get_config()
assert result.headline_limit == 42
config.set_config(original)
class TestPlatformFontPaths:
"""Tests for platform font path detection."""
def test_get_platform_font_paths_returns_dict(self):
"""_get_platform_font_paths returns a dictionary."""
fonts = config._get_platform_font_paths()
assert isinstance(fonts, dict)
def test_platform_font_paths_common_languages(self):
"""Common language font mappings exist."""
fonts = config._get_platform_font_paths()
common = {"ja", "zh-cn", "ko", "ru", "ar", "hi"}
found = set(fonts.keys()) & common
assert len(found) > 0

117
tests/test_controller.py Normal file
View File

@@ -0,0 +1,117 @@
"""
Tests for engine.controller module.
"""
from unittest.mock import MagicMock, patch
from engine import config
from engine.controller import StreamController
class TestStreamController:
"""Tests for StreamController class."""
def test_init_default_config(self):
"""StreamController initializes with default config."""
controller = StreamController()
assert controller.config is not None
assert isinstance(controller.config, config.Config)
def test_init_custom_config(self):
"""StreamController accepts custom config."""
custom_config = config.Config(headline_limit=500)
controller = StreamController(config=custom_config)
assert controller.config.headline_limit == 500
def test_init_sources_none_by_default(self):
"""Sources are None until initialized."""
controller = StreamController()
assert controller.mic is None
assert controller.ntfy is None
@patch("engine.controller.MicMonitor")
@patch("engine.controller.NtfyPoller")
def test_initialize_sources(self, mock_ntfy, mock_mic):
"""initialize_sources creates mic and ntfy instances."""
mock_mic_instance = MagicMock()
mock_mic_instance.available = True
mock_mic_instance.start.return_value = True
mock_mic.return_value = mock_mic_instance
mock_ntfy_instance = MagicMock()
mock_ntfy_instance.start.return_value = True
mock_ntfy.return_value = mock_ntfy_instance
controller = StreamController()
mic_ok, ntfy_ok = controller.initialize_sources()
assert mic_ok is True
assert ntfy_ok is True
assert controller.mic is not None
assert controller.ntfy is not None
@patch("engine.controller.MicMonitor")
@patch("engine.controller.NtfyPoller")
def test_initialize_sources_mic_unavailable(self, mock_ntfy, mock_mic):
"""initialize_sources handles unavailable mic."""
mock_mic_instance = MagicMock()
mock_mic_instance.available = False
mock_mic.return_value = mock_mic_instance
mock_ntfy_instance = MagicMock()
mock_ntfy_instance.start.return_value = True
mock_ntfy.return_value = mock_ntfy_instance
controller = StreamController()
mic_ok, ntfy_ok = controller.initialize_sources()
assert mic_ok is False
assert ntfy_ok is True
class TestStreamControllerCleanup:
"""Tests for StreamController cleanup."""
@patch("engine.controller.MicMonitor")
def test_cleanup_stops_mic(self, mock_mic):
"""cleanup stops the microphone if running."""
mock_mic_instance = MagicMock()
mock_mic.return_value = mock_mic_instance
controller = StreamController()
controller.mic = mock_mic_instance
controller.cleanup()
mock_mic_instance.stop.assert_called_once()
class TestStreamControllerWarmup:
"""Tests for StreamController topic warmup."""
def test_warmup_topics_idempotent(self):
"""warmup_topics can be called multiple times."""
StreamController._topics_warmed = False
with patch("urllib.request.urlopen") as mock_urlopen:
StreamController.warmup_topics()
StreamController.warmup_topics()
assert mock_urlopen.call_count >= 3
def test_warmup_topics_sets_flag(self):
"""warmup_topics sets the warmed flag."""
StreamController._topics_warmed = False
with patch("urllib.request.urlopen"):
StreamController.warmup_topics()
assert StreamController._topics_warmed is True
def test_warmup_topics_skips_after_first(self):
"""warmup_topics skips after first call."""
StreamController._topics_warmed = True
with patch("urllib.request.urlopen") as mock_urlopen:
StreamController.warmup_topics()
mock_urlopen.assert_not_called()

79
tests/test_display.py Normal file
View File

@@ -0,0 +1,79 @@
"""
Tests for engine.display module.
"""
from engine.display import NullDisplay, TerminalDisplay
class TestDisplayProtocol:
"""Test that display backends satisfy the Display protocol."""
def test_terminal_display_is_display(self):
"""TerminalDisplay satisfies Display protocol."""
display = TerminalDisplay()
assert hasattr(display, "init")
assert hasattr(display, "show")
assert hasattr(display, "clear")
assert hasattr(display, "cleanup")
def test_null_display_is_display(self):
"""NullDisplay satisfies Display protocol."""
display = NullDisplay()
assert hasattr(display, "init")
assert hasattr(display, "show")
assert hasattr(display, "clear")
assert hasattr(display, "cleanup")
class TestTerminalDisplay:
"""Tests for TerminalDisplay class."""
def test_init_sets_dimensions(self):
"""init stores terminal dimensions."""
display = TerminalDisplay()
display.init(80, 24)
assert display.width == 80
assert display.height == 24
def test_show_returns_none(self):
"""show returns None after writing to stdout."""
display = TerminalDisplay()
display.width = 80
display.height = 24
display.show(["line1", "line2"])
def test_clear_does_not_error(self):
"""clear works without error."""
display = TerminalDisplay()
display.clear()
def test_cleanup_does_not_error(self):
"""cleanup works without error."""
display = TerminalDisplay()
display.cleanup()
class TestNullDisplay:
"""Tests for NullDisplay class."""
def test_init_stores_dimensions(self):
"""init stores dimensions."""
display = NullDisplay()
display.init(100, 50)
assert display.width == 100
assert display.height == 50
def test_show_does_nothing(self):
"""show discards buffer without error."""
display = NullDisplay()
display.show(["line1", "line2", "line3"])
def test_clear_does_nothing(self):
"""clear does nothing."""
display = NullDisplay()
display.clear()
def test_cleanup_does_nothing(self):
"""cleanup does nothing."""
display = NullDisplay()
display.cleanup()

427
tests/test_effects.py Normal file
View File

@@ -0,0 +1,427 @@
"""
Tests for engine.effects module.
"""
from engine.effects import EffectChain, EffectConfig, EffectContext, EffectRegistry
class MockEffect:
name = "mock"
config = EffectConfig(enabled=True, intensity=1.0)
def __init__(self):
self.processed = False
self.last_ctx = None
def process(self, buf, ctx):
self.processed = True
self.last_ctx = ctx
return buf + ["processed"]
def configure(self, config):
self.config = config
class TestEffectConfig:
def test_defaults(self):
cfg = EffectConfig()
assert cfg.enabled is True
assert cfg.intensity == 1.0
assert cfg.params == {}
def test_custom_values(self):
cfg = EffectConfig(enabled=False, intensity=0.5, params={"key": "value"})
assert cfg.enabled is False
assert cfg.intensity == 0.5
assert cfg.params == {"key": "value"}
class TestEffectContext:
def test_defaults(self):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
assert ctx.terminal_width == 80
assert ctx.terminal_height == 24
assert ctx.ticker_height == 20
assert ctx.items == []
def test_with_items(self):
items = [("Title", "Source", "12:00")]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
items=items,
)
assert ctx.items == items
class TestEffectRegistry:
def test_init_empty(self):
registry = EffectRegistry()
assert len(registry.list_all()) == 0
def test_register(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
assert "mock" in registry.list_all()
def test_get(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
retrieved = registry.get("mock")
assert retrieved is effect
def test_get_nonexistent(self):
registry = EffectRegistry()
assert registry.get("nonexistent") is None
def test_enable(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = False
registry.register(effect)
registry.enable("mock")
assert effect.config.enabled is True
def test_disable(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = True
registry.register(effect)
registry.disable("mock")
assert effect.config.enabled is False
def test_list_enabled(self):
registry = EffectRegistry()
class EnabledEffect:
name = "enabled_effect"
config = EffectConfig(enabled=True, intensity=1.0)
class DisabledEffect:
name = "disabled_effect"
config = EffectConfig(enabled=False, intensity=1.0)
registry.register(EnabledEffect())
registry.register(DisabledEffect())
enabled = registry.list_enabled()
assert len(enabled) == 1
assert enabled[0].name == "enabled_effect"
def test_configure(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
new_config = EffectConfig(enabled=False, intensity=0.3)
registry.configure("mock", new_config)
assert effect.config.enabled is False
assert effect.config.intensity == 0.3
def test_is_enabled(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = True
registry.register(effect)
assert registry.is_enabled("mock") is True
assert registry.is_enabled("nonexistent") is False
class TestEffectChain:
def test_init(self):
registry = EffectRegistry()
chain = EffectChain(registry)
assert chain.get_order() == []
def test_set_order(self):
registry = EffectRegistry()
effect1 = MockEffect()
effect1.name = "effect1"
effect2 = MockEffect()
effect2.name = "effect2"
registry.register(effect1)
registry.register(effect2)
chain = EffectChain(registry)
chain.set_order(["effect1", "effect2"])
assert chain.get_order() == ["effect1", "effect2"]
def test_add_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.add_effect("test_effect")
assert "test_effect" in chain.get_order()
def test_add_effect_invalid(self):
registry = EffectRegistry()
chain = EffectChain(registry)
result = chain.add_effect("nonexistent")
assert result is False
def test_remove_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
chain.remove_effect("test_effect")
assert "test_effect" not in chain.get_order()
def test_reorder(self):
registry = EffectRegistry()
effect1 = MockEffect()
effect1.name = "effect1"
effect2 = MockEffect()
effect2.name = "effect2"
effect3 = MockEffect()
effect3.name = "effect3"
registry.register(effect1)
registry.register(effect2)
registry.register(effect3)
chain = EffectChain(registry)
chain.set_order(["effect1", "effect2", "effect3"])
result = chain.reorder(["effect3", "effect1", "effect2"])
assert result is True
assert chain.get_order() == ["effect3", "effect1", "effect2"]
def test_reorder_invalid(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "effect1"
registry.register(effect)
chain = EffectChain(registry)
result = chain.reorder(["effect1", "nonexistent"])
assert result is False
def test_process_empty_chain(self):
registry = EffectRegistry()
chain = EffectChain(registry)
buf = ["line1", "line2"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == buf
def test_process_with_effects(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
buf = ["line1", "line2"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == ["line1", "line2", "processed"]
assert effect.processed is True
assert effect.last_ctx is ctx
def test_process_disabled_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
effect.config.enabled = False
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
buf = ["line1"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == ["line1"]
assert effect.processed is False
class TestEffectsExports:
def test_all_exports_are_importable(self):
"""Verify all exports in __all__ can actually be imported."""
import engine.effects as effects_module
for name in effects_module.__all__:
getattr(effects_module, name)
class TestPerformanceMonitor:
def test_empty_stats(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
stats = monitor.get_stats()
assert "error" in stats
def test_record_and_retrieve(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
monitor.start_frame(1)
monitor.record_effect("test_effect", 1.5, 100, 150)
monitor.end_frame(1, 2.0)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["frame_count"] == 1
assert "test_effect" in stats["effects"]
def test_multiple_frames(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=3)
for i in range(5):
monitor.start_frame(i)
monitor.record_effect("effect1", 1.0, 100, 100)
monitor.record_effect("effect2", 0.5, 100, 100)
monitor.end_frame(i, 1.5)
stats = monitor.get_stats()
assert stats["frame_count"] == 3
assert "effect1" in stats["effects"]
assert "effect2" in stats["effects"]
def test_reset(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
monitor.start_frame(1)
monitor.record_effect("test", 1.0, 100, 100)
monitor.end_frame(1, 1.0)
monitor.reset()
stats = monitor.get_stats()
assert "error" in stats
class TestEffectPipelinePerformance:
def test_pipeline_stays_within_frame_budget(self):
"""Verify effect pipeline completes within frame budget (33ms for 30fps)."""
from engine.effects import (
EffectChain,
EffectConfig,
EffectContext,
EffectRegistry,
)
class DummyEffect:
name = "dummy"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf, ctx):
return [line * 2 for line in buf]
registry = EffectRegistry()
registry.register(DummyEffect())
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=10)
chain = EffectChain(registry, monitor)
chain.set_order(["dummy"])
buf = ["x" * 80] * 20
for i in range(10):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=i,
has_message=False,
)
chain.process(buf, ctx)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["pipeline"]["max_ms"] < 33.0
def test_individual_effects_performance(self):
"""Verify individual effects don't exceed 10ms per frame."""
from engine.effects import (
EffectChain,
EffectConfig,
EffectContext,
EffectRegistry,
)
class SlowEffect:
name = "slow"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf, ctx):
result = []
for line in buf:
result.append(line)
result.append(line + line)
return result
registry = EffectRegistry()
registry.register(SlowEffect())
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=5)
chain = EffectChain(registry, monitor)
chain.set_order(["slow"])
buf = ["x" * 80] * 10
for i in range(5):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=i,
has_message=False,
)
chain.process(buf, ctx)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["effects"]["slow"]["max_ms"] < 10.0

View File

@@ -0,0 +1,117 @@
"""
Tests for engine.effects.controller module.
"""
from unittest.mock import MagicMock, patch
from engine.effects.controller import (
handle_effects_command,
set_effect_chain_ref,
)
class TestHandleEffectsCommand:
"""Tests for handle_effects_command function."""
def test_list_effects(self):
"""list command returns formatted effects list."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_plugin.config.enabled = True
mock_plugin.config.intensity = 0.5
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
with patch("engine.effects.controller._get_effect_chain") as mock_chain:
mock_chain.return_value.get_order.return_value = ["noise"]
result = handle_effects_command("/effects list")
assert "noise: ON" in result
assert "intensity=0.5" in result
def test_enable_effect(self):
"""enable command calls registry.enable."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise on")
assert "Enabled: noise" in result
mock_registry.return_value.enable.assert_called_once_with("noise")
def test_disable_effect(self):
"""disable command calls registry.disable."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise off")
assert "Disabled: noise" in result
mock_registry.return_value.disable.assert_called_once_with("noise")
def test_set_intensity(self):
"""intensity command sets plugin intensity."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_plugin.config.intensity = 0.5
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise intensity 0.8")
assert "intensity to 0.8" in result
assert mock_plugin.config.intensity == 0.8
def test_invalid_intensity_range(self):
"""intensity outside 0.0-1.0 returns error."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise intensity 1.5")
assert "between 0.0 and 1.0" in result
def test_reorder_pipeline(self):
"""reorder command calls chain.reorder."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_registry.return_value.list_all.return_value = {}
with patch("engine.effects.controller._get_effect_chain") as mock_chain:
mock_chain_instance = MagicMock()
mock_chain_instance.reorder.return_value = True
mock_chain.return_value = mock_chain_instance
result = handle_effects_command("/effects reorder noise,fade")
assert "Reordered pipeline" in result
mock_chain_instance.reorder.assert_called_once_with(["noise", "fade"])
def test_unknown_command(self):
"""unknown command returns error."""
result = handle_effects_command("/unknown")
assert "Unknown command" in result
def test_non_effects_command(self):
"""non-effects command returns error."""
result = handle_effects_command("not a command")
assert "Unknown command" in result
class TestSetEffectChainRef:
"""Tests for set_effect_chain_ref function."""
def test_sets_global_ref(self):
"""set_effect_chain_ref updates global reference."""
mock_chain = MagicMock()
set_effect_chain_ref(mock_chain)
from engine.effects.controller import _get_effect_chain
result = _get_effect_chain()
assert result == mock_chain

69
tests/test_emitters.py Normal file
View File

@@ -0,0 +1,69 @@
"""
Tests for engine.emitters module.
"""
from engine.emitters import EventEmitter, Startable, Stoppable
class TestEventEmitterProtocol:
"""Tests for EventEmitter protocol."""
def test_protocol_exists(self):
"""EventEmitter protocol is defined."""
assert EventEmitter is not None
def test_protocol_has_subscribe_method(self):
"""EventEmitter has subscribe method in protocol."""
assert hasattr(EventEmitter, "subscribe")
def test_protocol_has_unsubscribe_method(self):
"""EventEmitter has unsubscribe method in protocol."""
assert hasattr(EventEmitter, "unsubscribe")
class TestStartableProtocol:
"""Tests for Startable protocol."""
def test_protocol_exists(self):
"""Startable protocol is defined."""
assert Startable is not None
def test_protocol_has_start_method(self):
"""Startable has start method in protocol."""
assert hasattr(Startable, "start")
class TestStoppableProtocol:
"""Tests for Stoppable protocol."""
def test_protocol_exists(self):
"""Stoppable protocol is defined."""
assert Stoppable is not None
def test_protocol_has_stop_method(self):
"""Stoppable has stop method in protocol."""
assert hasattr(Stoppable, "stop")
class TestProtocolCompliance:
"""Tests that existing classes comply with protocols."""
def test_ntfy_poller_complies_with_protocol(self):
"""NtfyPoller implements EventEmitter protocol."""
from engine.ntfy import NtfyPoller
poller = NtfyPoller("http://example.com/topic")
assert hasattr(poller, "subscribe")
assert hasattr(poller, "unsubscribe")
assert callable(poller.subscribe)
assert callable(poller.unsubscribe)
def test_mic_monitor_complies_with_protocol(self):
"""MicMonitor implements EventEmitter and Startable protocols."""
from engine.mic import MicMonitor
monitor = MicMonitor()
assert hasattr(monitor, "subscribe")
assert hasattr(monitor, "unsubscribe")
assert hasattr(monitor, "start")
assert hasattr(monitor, "stop")

202
tests/test_eventbus.py Normal file
View File

@@ -0,0 +1,202 @@
"""
Tests for engine.eventbus module.
"""
from engine.eventbus import EventBus, get_event_bus, set_event_bus
from engine.events import EventType, NtfyMessageEvent
class TestEventBusInit:
"""Tests for EventBus initialization."""
def test_init_creates_empty_subscribers(self):
"""EventBus starts with no subscribers."""
bus = EventBus()
assert bus.subscriber_count() == 0
class TestEventBusSubscribe:
"""Tests for EventBus.subscribe method."""
def test_subscribe_adds_callback(self):
"""subscribe() adds a callback for an event type."""
bus = EventBus()
def callback(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, callback)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1
def test_subscribe_multiple_callbacks_same_event(self):
"""Multiple callbacks can be subscribed to the same event type."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.NTFY_MESSAGE, cb2)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 2
def test_subscribe_different_event_types(self):
"""Callbacks can be subscribed to different event types."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.MIC_LEVEL, cb2)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1
assert bus.subscriber_count(EventType.MIC_LEVEL) == 1
class TestEventBusUnsubscribe:
"""Tests for EventBus.unsubscribe method."""
def test_unsubscribe_removes_callback(self):
"""unsubscribe() removes a callback."""
bus = EventBus()
def callback(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, callback)
bus.unsubscribe(EventType.NTFY_MESSAGE, callback)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 0
def test_unsubscribe_nonexistent_callback_no_error(self):
"""unsubscribe() handles non-existent callback gracefully."""
bus = EventBus()
def callback(e):
return None
bus.unsubscribe(EventType.NTFY_MESSAGE, callback)
class TestEventBusPublish:
"""Tests for EventBus.publish method."""
def test_publish_calls_subscriber(self):
"""publish() calls the subscriber callback."""
bus = EventBus()
received = []
def callback(event):
received.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, callback)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(received) == 1
assert received[0].title == "Test"
def test_publish_multiple_subscribers(self):
"""publish() calls all subscribers for an event type."""
bus = EventBus()
received1 = []
received2 = []
def callback1(event):
received1.append(event)
def callback2(event):
received2.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, callback1)
bus.subscribe(EventType.NTFY_MESSAGE, callback2)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(received1) == 1
assert len(received2) == 1
def test_publish_different_event_types(self):
"""publish() only calls subscribers for the specific event type."""
bus = EventBus()
ntfy_received = []
mic_received = []
def ntfy_callback(event):
ntfy_received.append(event)
def mic_callback(event):
mic_received.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, ntfy_callback)
bus.subscribe(EventType.MIC_LEVEL, mic_callback)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(ntfy_received) == 1
assert len(mic_received) == 0
class TestEventBusClear:
"""Tests for EventBus.clear method."""
def test_clear_removes_all_subscribers(self):
"""clear() removes all subscribers."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.MIC_LEVEL, cb2)
bus.clear()
assert bus.subscriber_count() == 0
class TestEventBusThreadSafety:
"""Tests for EventBus thread safety."""
def test_concurrent_subscribe_unsubscribe(self):
"""subscribe and unsubscribe can be called concurrently."""
import threading
bus = EventBus()
callbacks = [lambda e: None for _ in range(10)]
def subscribe():
for cb in callbacks:
bus.subscribe(EventType.NTFY_MESSAGE, cb)
def unsubscribe():
for cb in callbacks:
bus.unsubscribe(EventType.NTFY_MESSAGE, cb)
t1 = threading.Thread(target=subscribe)
t2 = threading.Thread(target=unsubscribe)
t1.start()
t2.start()
t1.join()
t2.join()
class TestGlobalEventBus:
"""Tests for global event bus functions."""
def test_get_event_bus_returns_singleton(self):
"""get_event_bus() returns the same instance."""
bus1 = get_event_bus()
bus2 = get_event_bus()
assert bus1 is bus2
def test_set_event_bus_replaces_singleton(self):
"""set_event_bus() replaces the global event bus."""
new_bus = EventBus()
set_event_bus(new_bus)
try:
assert get_event_bus() is new_bus
finally:
set_event_bus(None)

112
tests/test_events.py Normal file
View File

@@ -0,0 +1,112 @@
"""
Tests for engine.events module.
"""
from datetime import datetime
from engine import events
class TestEventType:
"""Tests for EventType enum."""
def test_event_types_exist(self):
"""All expected event types exist."""
assert hasattr(events.EventType, "NEW_HEADLINE")
assert hasattr(events.EventType, "FRAME_TICK")
assert hasattr(events.EventType, "MIC_LEVEL")
assert hasattr(events.EventType, "NTFY_MESSAGE")
assert hasattr(events.EventType, "STREAM_START")
assert hasattr(events.EventType, "STREAM_END")
class TestHeadlineEvent:
"""Tests for HeadlineEvent dataclass."""
def test_create_headline_event(self):
"""HeadlineEvent can be created with required fields."""
e = events.HeadlineEvent(
title="Test Headline",
source="Test Source",
timestamp="12:00",
)
assert e.title == "Test Headline"
assert e.source == "Test Source"
assert e.timestamp == "12:00"
def test_headline_event_optional_language(self):
"""HeadlineEvent supports optional language field."""
e = events.HeadlineEvent(
title="Test",
source="Test",
timestamp="12:00",
language="ja",
)
assert e.language == "ja"
class TestFrameTickEvent:
"""Tests for FrameTickEvent dataclass."""
def test_create_frame_tick_event(self):
"""FrameTickEvent can be created."""
now = datetime.now()
e = events.FrameTickEvent(
frame_number=100,
timestamp=now,
delta_seconds=0.05,
)
assert e.frame_number == 100
assert e.timestamp == now
assert e.delta_seconds == 0.05
class TestMicLevelEvent:
"""Tests for MicLevelEvent dataclass."""
def test_create_mic_level_event(self):
"""MicLevelEvent can be created."""
now = datetime.now()
e = events.MicLevelEvent(
db_level=60.0,
excess_above_threshold=10.0,
timestamp=now,
)
assert e.db_level == 60.0
assert e.excess_above_threshold == 10.0
class TestNtfyMessageEvent:
"""Tests for NtfyMessageEvent dataclass."""
def test_create_ntfy_message_event(self):
"""NtfyMessageEvent can be created with required fields."""
e = events.NtfyMessageEvent(
title="Test Title",
body="Test Body",
)
assert e.title == "Test Title"
assert e.body == "Test Body"
assert e.message_id is None
def test_ntfy_message_event_with_id(self):
"""NtfyMessageEvent supports optional message_id."""
e = events.NtfyMessageEvent(
title="Test",
body="Test",
message_id="abc123",
)
assert e.message_id == "abc123"
class TestStreamEvent:
"""Tests for StreamEvent dataclass."""
def test_create_stream_event(self):
"""StreamEvent can be created."""
e = events.StreamEvent(
event_type=events.EventType.STREAM_START,
headline_count=100,
)
assert e.event_type == events.EventType.STREAM_START
assert e.headline_count == 100

63
tests/test_frame.py Normal file
View File

@@ -0,0 +1,63 @@
"""
Tests for engine.frame module.
"""
import time
from engine.frame import FrameTimer, calculate_scroll_step
class TestFrameTimer:
"""Tests for FrameTimer class."""
def test_init_default(self):
"""FrameTimer initializes with default values."""
timer = FrameTimer()
assert timer.target_frame_dt == 0.05
assert timer.fps >= 0
def test_init_custom(self):
"""FrameTimer accepts custom frame duration."""
timer = FrameTimer(target_frame_dt=0.1)
assert timer.target_frame_dt == 0.1
def test_fps_calculation(self):
"""FrameTimer calculates FPS correctly."""
timer = FrameTimer()
timer._frame_count = 10
timer._start_time = time.monotonic() - 1.0
assert timer.fps >= 9.0
def test_reset(self):
"""FrameTimer.reset() clears frame count."""
timer = FrameTimer()
timer._frame_count = 100
timer.reset()
assert timer._frame_count == 0
class TestCalculateScrollStep:
"""Tests for calculate_scroll_step function."""
def test_basic_calculation(self):
"""calculate_scroll_step returns positive value."""
result = calculate_scroll_step(5.0, 24)
assert result > 0
def test_with_padding(self):
"""calculate_scroll_step respects padding parameter."""
without_padding = calculate_scroll_step(5.0, 24, padding=0)
with_padding = calculate_scroll_step(5.0, 24, padding=15)
assert with_padding < without_padding
def test_larger_view_slower_scroll(self):
"""Larger view height results in slower scroll steps."""
small = calculate_scroll_step(5.0, 10)
large = calculate_scroll_step(5.0, 50)
assert large < small
def test_longer_duration_slower_scroll(self):
"""Longer scroll duration results in slower scroll steps."""
fast = calculate_scroll_step(2.0, 24)
slow = calculate_scroll_step(10.0, 24)
assert slow > fast

96
tests/test_layers.py Normal file
View File

@@ -0,0 +1,96 @@
"""
Tests for engine.layers module.
"""
import time
from engine import layers
class TestRenderMessageOverlay:
"""Tests for render_message_overlay function."""
def test_no_message_returns_empty(self):
"""Returns empty list when msg is None."""
result, cache = layers.render_message_overlay(None, 80, 24, (None, None))
assert result == []
assert cache[0] is None
def test_message_returns_overlay_lines(self):
"""Returns non-empty list when message is present."""
msg = ("Test Title", "Test Body", time.monotonic())
result, cache = layers.render_message_overlay(msg, 80, 24, (None, None))
assert len(result) > 0
assert cache[0] is not None
def test_cache_key_changes_with_text(self):
"""Cache key changes when message text changes."""
msg1 = ("Title1", "Body1", time.monotonic())
msg2 = ("Title2", "Body2", time.monotonic())
_, cache1 = layers.render_message_overlay(msg1, 80, 24, (None, None))
_, cache2 = layers.render_message_overlay(msg2, 80, 24, cache1)
assert cache1[0] != cache2[0]
def test_cache_reuse_avoids_recomputation(self):
"""Cache is returned when same message is passed (interface test)."""
msg = ("Same Title", "Same Body", time.monotonic())
result1, cache1 = layers.render_message_overlay(msg, 80, 24, (None, None))
result2, cache2 = layers.render_message_overlay(msg, 80, 24, cache1)
assert len(result1) > 0
assert len(result2) > 0
assert cache1[0] == cache2[0]
class TestRenderFirehose:
"""Tests for render_firehose function."""
def test_no_firehose_returns_empty(self):
"""Returns empty list when firehose height is 0."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 0, 24)
assert result == []
def test_firehose_returns_lines(self):
"""Returns lines when firehose height > 0."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 4, 24)
assert len(result) == 4
def test_firehose_includes_ansi_escapes(self):
"""Returns lines containing ANSI escape sequences."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 1, 24)
assert "\033[" in result[0]
class TestApplyGlitch:
"""Tests for apply_glitch function."""
def test_empty_buffer_unchanged(self):
"""Empty buffer is returned unchanged."""
result = layers.apply_glitch([], 0, 0.0, 80)
assert result == []
def test_buffer_length_preserved(self):
"""Buffer length is preserved after glitch application."""
buf = [f"\033[{i + 1};1Htest\033[K" for i in range(10)]
result = layers.apply_glitch(buf, 0, 0.5, 80)
assert len(result) == len(buf)
class TestRenderTickerZone:
"""Tests for render_ticker_zone function - focusing on interface."""
def test_returns_list(self):
"""Returns a list of strings."""
result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0)
assert isinstance(result, list)
def test_returns_dict_for_cache(self):
"""Returns a dict for the noise cache."""
result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0)
assert isinstance(cache, dict)

View File

@@ -2,8 +2,11 @@
Tests for engine.mic module.
"""
from datetime import datetime
from unittest.mock import patch
from engine.events import MicLevelEvent
class TestMicMonitorImport:
"""Tests for module import behavior."""
@@ -81,3 +84,66 @@ class TestMicMonitorStop:
monitor = MicMonitor()
monitor.stop()
assert monitor._stream is None
class TestMicMonitorEventEmission:
"""Tests for MicMonitor event emission."""
def test_subscribe_adds_callback(self):
"""subscribe() adds a callback."""
from engine.mic import MicMonitor
monitor = MicMonitor()
def callback(e):
return None
monitor.subscribe(callback)
assert callback in monitor._subscribers
def test_unsubscribe_removes_callback(self):
"""unsubscribe() removes a callback."""
from engine.mic import MicMonitor
monitor = MicMonitor()
def callback(e):
return None
monitor.subscribe(callback)
monitor.unsubscribe(callback)
assert callback not in monitor._subscribers
def test_emit_calls_subscribers(self):
"""_emit() calls all subscribers."""
from engine.mic import MicMonitor
monitor = MicMonitor()
received = []
def callback(event):
received.append(event)
monitor.subscribe(callback)
event = MicLevelEvent(
db_level=60.0, excess_above_threshold=10.0, timestamp=datetime.now()
)
monitor._emit(event)
assert len(received) == 1
assert received[0].db_level == 60.0
def test_emit_handles_subscriber_exception(self):
"""_emit() handles exceptions in subscribers gracefully."""
from engine.mic import MicMonitor
monitor = MicMonitor()
def bad_callback(event):
raise RuntimeError("test")
monitor.subscribe(bad_callback)
event = MicLevelEvent(
db_level=60.0, excess_above_threshold=10.0, timestamp=datetime.now()
)
monitor._emit(event)

View File

@@ -5,6 +5,7 @@ Tests for engine.ntfy module.
import time
from unittest.mock import MagicMock, patch
from engine.events import NtfyMessageEvent
from engine.ntfy import NtfyPoller
@@ -68,3 +69,54 @@ class TestNtfyPollerDismiss:
poller.dismiss()
assert poller._message is None
class TestNtfyPollerEventEmission:
"""Tests for NtfyPoller event emission."""
def test_subscribe_adds_callback(self):
"""subscribe() adds a callback."""
poller = NtfyPoller("http://example.com/topic")
def callback(e):
return None
poller.subscribe(callback)
assert callback in poller._subscribers
def test_unsubscribe_removes_callback(self):
"""unsubscribe() removes a callback."""
poller = NtfyPoller("http://example.com/topic")
def callback(e):
return None
poller.subscribe(callback)
poller.unsubscribe(callback)
assert callback not in poller._subscribers
def test_emit_calls_subscribers(self):
"""_emit() calls all subscribers."""
poller = NtfyPoller("http://example.com/topic")
received = []
def callback(event):
received.append(event)
poller.subscribe(callback)
event = NtfyMessageEvent(title="Test", body="Body")
poller._emit(event)
assert len(received) == 1
assert received[0].title == "Test"
def test_emit_handles_subscriber_exception(self):
"""_emit() handles exceptions in subscribers gracefully."""
poller = NtfyPoller("http://example.com/topic")
def bad_callback(event):
raise RuntimeError("test")
poller.subscribe(bad_callback)
event = NtfyMessageEvent(title="Test", body="Body")
poller._emit(event)

95
tests/test_types.py Normal file
View File

@@ -0,0 +1,95 @@
"""
Tests for engine.types module.
"""
from engine.types import (
Block,
FetchResult,
HeadlineItem,
items_to_tuples,
tuples_to_items,
)
class TestHeadlineItem:
"""Tests for HeadlineItem dataclass."""
def test_create_headline_item(self):
"""Can create HeadlineItem with required fields."""
item = HeadlineItem(title="Test", source="Source", timestamp="12:00")
assert item.title == "Test"
assert item.source == "Source"
assert item.timestamp == "12:00"
def test_to_tuple(self):
"""to_tuple returns correct tuple."""
item = HeadlineItem(title="Test", source="Source", timestamp="12:00")
assert item.to_tuple() == ("Test", "Source", "12:00")
def test_from_tuple(self):
"""from_tuple creates HeadlineItem from tuple."""
item = HeadlineItem.from_tuple(("Test", "Source", "12:00"))
assert item.title == "Test"
assert item.source == "Source"
assert item.timestamp == "12:00"
class TestItemsConversion:
"""Tests for list conversion functions."""
def test_items_to_tuples(self):
"""Converts list of HeadlineItem to list of tuples."""
items = [
HeadlineItem(title="A", source="S", timestamp="10:00"),
HeadlineItem(title="B", source="T", timestamp="11:00"),
]
result = items_to_tuples(items)
assert result == [("A", "S", "10:00"), ("B", "T", "11:00")]
def test_tuples_to_items(self):
"""Converts list of tuples to list of HeadlineItem."""
tuples = [("A", "S", "10:00"), ("B", "T", "11:00")]
result = tuples_to_items(tuples)
assert len(result) == 2
assert result[0].title == "A"
assert result[1].title == "B"
class TestFetchResult:
"""Tests for FetchResult dataclass."""
def test_create_fetch_result(self):
"""Can create FetchResult."""
items = [HeadlineItem(title="Test", source="Source", timestamp="12:00")]
result = FetchResult(items=items, linked=1, failed=0)
assert len(result.items) == 1
assert result.linked == 1
assert result.failed == 0
def test_to_legacy_tuple(self):
"""to_legacy_tuple returns correct format."""
items = [HeadlineItem(title="Test", source="Source", timestamp="12:00")]
result = FetchResult(items=items, linked=1, failed=0)
legacy = result.to_legacy_tuple()
assert legacy[0] == [("Test", "Source", "12:00")]
assert legacy[1] == 1
assert legacy[2] == 0
class TestBlock:
"""Tests for Block dataclass."""
def test_create_block(self):
"""Can create Block."""
block = Block(
content=["line1", "line2"], color="\033[38;5;46m", meta_row_index=1
)
assert len(block.content) == 2
assert block.color == "\033[38;5;46m"
assert block.meta_row_index == 1
def test_to_legacy_tuple(self):
"""to_legacy_tuple returns correct format."""
block = Block(content=["line1"], color="green", meta_row_index=0)
legacy = block.to_legacy_tuple()
assert legacy == (["line1"], "green", 0)

64
tests/test_viewport.py Normal file
View File

@@ -0,0 +1,64 @@
"""
Tests for engine.viewport module.
"""
from engine import viewport
class TestViewportTw:
"""Tests for tw() function."""
def test_tw_returns_int(self):
"""tw() returns an integer."""
result = viewport.tw()
assert isinstance(result, int)
def test_tw_positive(self):
"""tw() returns a positive value."""
assert viewport.tw() > 0
class TestViewportTh:
"""Tests for th() function."""
def test_th_returns_int(self):
"""th() returns an integer."""
result = viewport.th()
assert isinstance(result, int)
def test_th_positive(self):
"""th() returns a positive value."""
assert viewport.th() > 0
class TestViewportMoveTo:
"""Tests for move_to() function."""
def test_move_to_format(self):
"""move_to() returns correctly formatted ANSI escape."""
result = viewport.move_to(5, 10)
assert result == "\033[5;10H"
def test_move_to_default_col(self):
"""move_to() defaults to column 1."""
result = viewport.move_to(5)
assert result == "\033[5;1H"
class TestViewportClearScreen:
"""Tests for clear_screen() function."""
def test_clear_screen_format(self):
"""clear_screen() returns clear screen ANSI escape."""
result = viewport.clear_screen()
assert "\033[2J" in result
assert "\033[H" in result
class TestViewportClearLine:
"""Tests for clear_line() function."""
def test_clear_line_format(self):
"""clear_line() returns clear line ANSI escape."""
result = viewport.clear_line()
assert result == "\033[K"