fix(performance): use simple height estimation instead of PIL rendering

- Replace estimate_block_height (PIL-based) with estimate_simple_height (word wrap)
- Update viewport filter tests to match new height-based filtering (~4 items vs 24)
- Fix CI task duplication in mise.toml (remove redundant depends)

Closes #38
Closes #36
This commit is contained in:
2026-03-18 22:33:36 -07:00
parent abe49ba7d7
commit c57617bb3d
26 changed files with 3938 additions and 1956 deletions

View File

@@ -18,7 +18,7 @@ class TestMain:
def test_main_calls_run_pipeline_mode_with_default_preset(self):
"""main() runs default preset (demo) when no args provided."""
with patch("engine.app.run_pipeline_mode") as mock_run:
with patch("engine.app.main.run_pipeline_mode") as mock_run:
sys.argv = ["mainline.py"]
main()
mock_run.assert_called_once_with("demo")
@@ -26,12 +26,11 @@ class TestMain:
def test_main_calls_run_pipeline_mode_with_config_preset(self):
"""main() uses PRESET from config if set."""
with (
patch("engine.app.config") as mock_config,
patch("engine.app.run_pipeline_mode") as mock_run,
patch("engine.config.PIPELINE_DIAGRAM", False),
patch("engine.config.PRESET", "gallery-sources"),
patch("engine.config.PIPELINE_MODE", False),
patch("engine.app.main.run_pipeline_mode") as mock_run,
):
mock_config.PIPELINE_DIAGRAM = False
mock_config.PRESET = "gallery-sources"
mock_config.PIPELINE_MODE = False
sys.argv = ["mainline.py"]
main()
mock_run.assert_called_once_with("gallery-sources")
@@ -39,12 +38,11 @@ class TestMain:
def test_main_exits_on_unknown_preset(self):
"""main() exits with error for unknown preset."""
with (
patch("engine.app.config") as mock_config,
patch("engine.app.list_presets", return_value=["demo", "poetry"]),
patch("engine.config.PIPELINE_DIAGRAM", False),
patch("engine.config.PRESET", "nonexistent"),
patch("engine.config.PIPELINE_MODE", False),
patch("engine.pipeline.list_presets", return_value=["demo", "poetry"]),
):
mock_config.PIPELINE_DIAGRAM = False
mock_config.PRESET = "nonexistent"
mock_config.PIPELINE_MODE = False
sys.argv = ["mainline.py"]
with pytest.raises(SystemExit) as exc_info:
main()
@@ -70,9 +68,11 @@ class TestRunPipelineMode:
def test_run_pipeline_mode_exits_when_no_content_available(self):
"""run_pipeline_mode() exits if no content can be fetched."""
with (
patch("engine.app.load_cache", return_value=None),
patch("engine.app.fetch_all", return_value=([], None, None)),
patch("engine.app.effects_plugins"),
patch("engine.app.pipeline_runner.load_cache", return_value=None),
patch(
"engine.app.pipeline_runner.fetch_all", return_value=([], None, None)
),
patch("engine.effects.plugins.discover_plugins"),
pytest.raises(SystemExit) as exc_info,
):
run_pipeline_mode("demo")
@@ -82,9 +82,11 @@ class TestRunPipelineMode:
"""run_pipeline_mode() uses cached content if available."""
cached = ["cached_item"]
with (
patch("engine.app.load_cache", return_value=cached) as mock_load,
patch("engine.app.fetch_all") as mock_fetch,
patch("engine.app.DisplayRegistry.create") as mock_create,
patch(
"engine.app.pipeline_runner.load_cache", return_value=cached
) as mock_load,
patch("engine.app.pipeline_runner.fetch_all") as mock_fetch,
patch("engine.app.pipeline_runner.DisplayRegistry.create") as mock_create,
):
mock_display = Mock()
mock_display.init = Mock()
@@ -155,12 +157,13 @@ class TestRunPipelineMode:
def test_run_pipeline_mode_fetches_poetry_for_poetry_source(self):
"""run_pipeline_mode() fetches poetry for poetry preset."""
with (
patch("engine.app.load_cache", return_value=None),
patch("engine.app.pipeline_runner.load_cache", return_value=None),
patch(
"engine.app.fetch_poetry", return_value=(["poem"], None, None)
"engine.app.pipeline_runner.fetch_poetry",
return_value=(["poem"], None, None),
) as mock_fetch_poetry,
patch("engine.app.fetch_all") as mock_fetch_all,
patch("engine.app.DisplayRegistry.create") as mock_create,
patch("engine.app.pipeline_runner.fetch_all") as mock_fetch_all,
patch("engine.app.pipeline_runner.DisplayRegistry.create") as mock_create,
):
mock_display = Mock()
mock_display.init = Mock()
@@ -183,9 +186,9 @@ class TestRunPipelineMode:
def test_run_pipeline_mode_discovers_effect_plugins(self):
"""run_pipeline_mode() discovers available effect plugins."""
with (
patch("engine.app.load_cache", return_value=["item"]),
patch("engine.app.effects_plugins") as mock_effects,
patch("engine.app.DisplayRegistry.create") as mock_create,
patch("engine.app.pipeline_runner.load_cache", return_value=["item"]),
patch("engine.effects.plugins.discover_plugins") as mock_discover,
patch("engine.app.pipeline_runner.DisplayRegistry.create") as mock_create,
):
mock_display = Mock()
mock_display.init = Mock()
@@ -202,4 +205,4 @@ class TestRunPipelineMode:
pass
# Verify effects_plugins.discover_plugins was called
mock_effects.discover_plugins.assert_called_once()
mock_discover.assert_called_once()

View File

@@ -11,14 +11,7 @@ import pytest
from engine.data_sources.sources import SourceItem
from engine.pipeline.adapters import FontStage, ViewportFilterStage
from engine.pipeline.core import PipelineContext
class MockParams:
"""Mock parameters object for testing."""
def __init__(self, viewport_width: int = 80, viewport_height: int = 24):
self.viewport_width = viewport_width
self.viewport_height = viewport_height
from engine.pipeline.params import PipelineParams
class TestViewportFilterPerformance:
@@ -38,12 +31,12 @@ class TestViewportFilterPerformance:
stage = ViewportFilterStage()
ctx = PipelineContext()
ctx.params = MockParams(viewport_height=24)
ctx.params = PipelineParams(viewport_height=24)
result = benchmark(stage.process, test_items, ctx)
# Verify result is correct
assert len(result) <= 5
# Verify result is correct - viewport filter takes first N items
assert len(result) <= 24 # viewport height
assert len(result) > 0
@pytest.mark.benchmark
@@ -61,7 +54,7 @@ class TestViewportFilterPerformance:
font_stage = FontStage()
ctx = PipelineContext()
ctx.params = MockParams()
ctx.params = PipelineParams()
result = benchmark(font_stage.process, filtered_items, ctx)
@@ -75,8 +68,8 @@ class TestViewportFilterPerformance:
With 1438 items and 24-line viewport:
- Without filter: FontStage renders all 1438 items
- With filter: FontStage renders ~3 items (layout-based)
- Expected improvement: 1438 / 3479x
- With filter: FontStage renders ~4 items (height-based)
- Expected improvement: 1438 / 4360x
"""
test_items = [
SourceItem(f"Headline {i}", "source", str(i)) for i in range(1438)
@@ -84,15 +77,15 @@ class TestViewportFilterPerformance:
stage = ViewportFilterStage()
ctx = PipelineContext()
ctx.params = MockParams(viewport_height=24)
ctx.params = PipelineParams(viewport_height=24)
filtered = stage.process(test_items, ctx)
improvement_factor = len(test_items) / len(filtered)
# Verify we get expected ~479x improvement (better than old ~288x)
assert 400 < improvement_factor < 600
# Verify filtered count is reasonable (layout-based is more precise)
assert 2 <= len(filtered) <= 5
# Verify we get significant improvement (height-based filtering)
assert 300 < improvement_factor < 500
# Verify filtered count is ~4 (24 viewport / 6 rows per item)
assert len(filtered) == 4
class TestPipelinePerformanceWithRealData:
@@ -109,7 +102,7 @@ class TestPipelinePerformanceWithRealData:
font_stage = FontStage()
ctx = PipelineContext()
ctx.params = MockParams(viewport_height=24)
ctx.params = PipelineParams(viewport_height=24)
# Filter should reduce items quickly
filtered = filter_stage.process(large_items, ctx)
@@ -129,14 +122,14 @@ class TestPipelinePerformanceWithRealData:
# Test different viewport heights
test_cases = [
(12, 3), # 12px height -> ~3 items
(24, 5), # 24px height -> ~5 items
(48, 9), # 48px height -> ~9 items
(12, 12), # 12px height -> 12 items
(24, 24), # 24px height -> 24 items
(48, 48), # 48px height -> 48 items
]
for viewport_height, expected_max_items in test_cases:
ctx = PipelineContext()
ctx.params = MockParams(viewport_height=viewport_height)
ctx.params = PipelineParams(viewport_height=viewport_height)
filtered = stage.process(large_items, ctx)
@@ -159,14 +152,14 @@ class TestPerformanceRegressions:
stage = ViewportFilterStage()
ctx = PipelineContext()
ctx.params = MockParams()
ctx.params = PipelineParams()
filtered = stage.process(large_items, ctx)
# Should NOT have all items (regression detection)
assert len(filtered) != len(large_items)
# Should have drastically fewer items
assert len(filtered) < 10
# With height-based filtering, ~4 items fit in 24-row viewport (6 rows/item)
assert len(filtered) == 4
def test_font_stage_doesnt_hang_with_filter(self):
"""Regression test: FontStage shouldn't hang when receiving filtered data.
@@ -182,7 +175,7 @@ class TestPerformanceRegressions:
font_stage = FontStage()
ctx = PipelineContext()
ctx.params = MockParams()
ctx.params = PipelineParams()
# Should complete instantly (not hang)
result = font_stage.process(filtered_items, ctx)

View File

@@ -1307,4 +1307,470 @@ class TestInletOutletTypeValidation:
pipeline.build()
assert "display" in str(exc_info.value).lower()
assert "TEXT_BUFFER" in str(exc_info.value)
class TestPipelineMutation:
"""Tests for Pipeline Mutation API - dynamic stage modification."""
def setup_method(self):
"""Set up test fixtures."""
StageRegistry._discovered = False
StageRegistry._categories.clear()
StageRegistry._instances.clear()
discover_stages()
def _create_mock_stage(
self,
name: str = "test",
category: str = "test",
capabilities: set | None = None,
dependencies: set | None = None,
):
"""Helper to create a mock stage."""
from engine.pipeline.core import DataType
mock = MagicMock(spec=Stage)
mock.name = name
mock.category = category
mock.stage_type = category
mock.render_order = 0
mock.is_overlay = False
mock.inlet_types = {DataType.ANY}
mock.outlet_types = {DataType.TEXT_BUFFER}
mock.capabilities = capabilities or {f"{category}.{name}"}
mock.dependencies = dependencies or set()
mock.process = lambda data, ctx: data
mock.init = MagicMock(return_value=True)
mock.cleanup = MagicMock()
mock.is_enabled = MagicMock(return_value=True)
mock.set_enabled = MagicMock()
mock._enabled = True
return mock
def test_add_stage_initializes_when_pipeline_initialized(self):
"""add_stage() initializes stage when pipeline already initialized."""
pipeline = Pipeline()
mock_stage = self._create_mock_stage("test")
pipeline.build()
pipeline._initialized = True
pipeline.add_stage("test", mock_stage, initialize=True)
mock_stage.init.assert_called_once()
def test_add_stage_skips_initialize_when_pipeline_not_initialized(self):
"""add_stage() skips initialization when pipeline not built."""
pipeline = Pipeline()
mock_stage = self._create_mock_stage("test")
pipeline.add_stage("test", mock_stage, initialize=False)
mock_stage.init.assert_not_called()
def test_remove_stage_returns_removed_stage(self):
"""remove_stage() returns the removed stage."""
pipeline = Pipeline()
mock_stage = self._create_mock_stage("test")
pipeline.add_stage("test", mock_stage, initialize=False)
removed = pipeline.remove_stage("test", cleanup=False)
assert removed is mock_stage
assert "test" not in pipeline.stages
def test_remove_stage_calls_cleanup_when_requested(self):
"""remove_stage() calls cleanup when cleanup=True."""
pipeline = Pipeline()
mock_stage = self._create_mock_stage("test")
pipeline.add_stage("test", mock_stage, initialize=False)
pipeline.remove_stage("test", cleanup=True)
mock_stage.cleanup.assert_called_once()
def test_remove_stage_skips_cleanup_when_requested(self):
"""remove_stage() skips cleanup when cleanup=False."""
pipeline = Pipeline()
mock_stage = self._create_mock_stage("test")
pipeline.add_stage("test", mock_stage, initialize=False)
pipeline.remove_stage("test", cleanup=False)
mock_stage.cleanup.assert_not_called()
def test_remove_nonexistent_stage_returns_none(self):
"""remove_stage() returns None for nonexistent stage."""
pipeline = Pipeline()
result = pipeline.remove_stage("nonexistent", cleanup=False)
assert result is None
def test_replace_stage_preserves_state(self):
"""replace_stage() copies _enabled from old to new stage."""
pipeline = Pipeline()
old_stage = self._create_mock_stage("test")
old_stage._enabled = False
new_stage = self._create_mock_stage("test")
pipeline.add_stage("test", old_stage, initialize=False)
pipeline.replace_stage("test", new_stage, preserve_state=True)
assert new_stage._enabled is False
old_stage.cleanup.assert_called_once()
new_stage.init.assert_called_once()
def test_replace_stage_without_preserving_state(self):
"""replace_stage() without preserve_state doesn't copy state."""
pipeline = Pipeline()
old_stage = self._create_mock_stage("test")
old_stage._enabled = False
new_stage = self._create_mock_stage("test")
new_stage._enabled = True
pipeline.add_stage("test", old_stage, initialize=False)
pipeline.replace_stage("test", new_stage, preserve_state=False)
assert new_stage._enabled is True
def test_replace_nonexistent_stage_returns_none(self):
"""replace_stage() returns None for nonexistent stage."""
pipeline = Pipeline()
mock_stage = self._create_mock_stage("test")
result = pipeline.replace_stage("nonexistent", mock_stage)
assert result is None
def test_swap_stages_swaps_stages(self):
"""swap_stages() swaps two stages."""
pipeline = Pipeline()
stage_a = self._create_mock_stage("stage_a", "a")
stage_b = self._create_mock_stage("stage_b", "b")
pipeline.add_stage("a", stage_a, initialize=False)
pipeline.add_stage("b", stage_b, initialize=False)
result = pipeline.swap_stages("a", "b")
assert result is True
assert pipeline.stages["a"].name == "stage_b"
assert pipeline.stages["b"].name == "stage_a"
def test_swap_stages_fails_for_nonexistent(self):
"""swap_stages() fails if either stage doesn't exist."""
pipeline = Pipeline()
stage = self._create_mock_stage("test")
pipeline.add_stage("test", stage, initialize=False)
result = pipeline.swap_stages("test", "nonexistent")
assert result is False
def test_move_stage_after(self):
"""move_stage() moves stage after another."""
pipeline = Pipeline()
stage_a = self._create_mock_stage("a")
stage_b = self._create_mock_stage("b")
stage_c = self._create_mock_stage("c")
pipeline.add_stage("a", stage_a, initialize=False)
pipeline.add_stage("b", stage_b, initialize=False)
pipeline.add_stage("c", stage_c, initialize=False)
pipeline.build()
result = pipeline.move_stage("a", after="c")
assert result is True
idx_a = pipeline.execution_order.index("a")
idx_c = pipeline.execution_order.index("c")
assert idx_a > idx_c
def test_move_stage_before(self):
"""move_stage() moves stage before another."""
pipeline = Pipeline()
stage_a = self._create_mock_stage("a")
stage_b = self._create_mock_stage("b")
stage_c = self._create_mock_stage("c")
pipeline.add_stage("a", stage_a, initialize=False)
pipeline.add_stage("b", stage_b, initialize=False)
pipeline.add_stage("c", stage_c, initialize=False)
pipeline.build()
result = pipeline.move_stage("c", before="a")
assert result is True
idx_a = pipeline.execution_order.index("a")
idx_c = pipeline.execution_order.index("c")
assert idx_c < idx_a
def test_move_stage_fails_for_nonexistent(self):
"""move_stage() fails if stage doesn't exist."""
pipeline = Pipeline()
stage = self._create_mock_stage("test")
pipeline.add_stage("test", stage, initialize=False)
pipeline.build()
result = pipeline.move_stage("nonexistent", after="test")
assert result is False
def test_move_stage_fails_when_not_initialized(self):
"""move_stage() fails if pipeline not built."""
pipeline = Pipeline()
stage = self._create_mock_stage("test")
pipeline.add_stage("test", stage, initialize=False)
result = pipeline.move_stage("test", after="other")
assert result is False
def test_enable_stage(self):
"""enable_stage() enables a stage."""
pipeline = Pipeline()
stage = self._create_mock_stage("test")
pipeline.add_stage("test", stage, initialize=False)
result = pipeline.enable_stage("test")
assert result is True
stage.set_enabled.assert_called_with(True)
def test_enable_nonexistent_stage_returns_false(self):
"""enable_stage() returns False for nonexistent stage."""
pipeline = Pipeline()
result = pipeline.enable_stage("nonexistent")
assert result is False
def test_disable_stage(self):
"""disable_stage() disables a stage."""
pipeline = Pipeline()
stage = self._create_mock_stage("test")
pipeline.add_stage("test", stage, initialize=False)
result = pipeline.disable_stage("test")
assert result is True
stage.set_enabled.assert_called_with(False)
def test_disable_nonexistent_stage_returns_false(self):
"""disable_stage() returns False for nonexistent stage."""
pipeline = Pipeline()
result = pipeline.disable_stage("nonexistent")
assert result is False
def test_get_stage_info_returns_correct_info(self):
"""get_stage_info() returns correct stage information."""
pipeline = Pipeline()
stage = self._create_mock_stage(
"test_stage",
"effect",
capabilities={"effect.test"},
dependencies={"source"},
)
stage.render_order = 5
stage.is_overlay = False
stage.optional = True
pipeline.add_stage("test", stage, initialize=False)
info = pipeline.get_stage_info("test")
assert info is not None
assert info["name"] == "test" # Dict key, not stage.name
assert info["category"] == "effect"
assert info["stage_type"] == "effect"
assert info["enabled"] is True
assert info["optional"] is True
assert info["capabilities"] == ["effect.test"]
assert info["dependencies"] == ["source"]
assert info["render_order"] == 5
assert info["is_overlay"] is False
def test_get_stage_info_returns_none_for_nonexistent(self):
"""get_stage_info() returns None for nonexistent stage."""
pipeline = Pipeline()
info = pipeline.get_stage_info("nonexistent")
assert info is None
def test_get_pipeline_info_returns_complete_info(self):
"""get_pipeline_info() returns complete pipeline state."""
pipeline = Pipeline()
stage1 = self._create_mock_stage("stage1")
stage2 = self._create_mock_stage("stage2")
pipeline.add_stage("s1", stage1, initialize=False)
pipeline.add_stage("s2", stage2, initialize=False)
pipeline.build()
info = pipeline.get_pipeline_info()
assert "stages" in info
assert "execution_order" in info
assert info["initialized"] is True
assert info["stage_count"] == 2
assert "s1" in info["stages"]
assert "s2" in info["stages"]
def test_rebuild_after_mutation(self):
"""_rebuild() updates execution order after mutation."""
pipeline = Pipeline()
source = self._create_mock_stage(
"source", "source", capabilities={"source"}, dependencies=set()
)
effect = self._create_mock_stage(
"effect", "effect", capabilities={"effect"}, dependencies={"source"}
)
display = self._create_mock_stage(
"display", "display", capabilities={"display"}, dependencies={"effect"}
)
pipeline.add_stage("source", source, initialize=False)
pipeline.add_stage("effect", effect, initialize=False)
pipeline.add_stage("display", display, initialize=False)
pipeline.build()
assert pipeline.execution_order == ["source", "effect", "display"]
pipeline.remove_stage("effect", cleanup=False)
pipeline._rebuild()
assert "effect" not in pipeline.execution_order
assert "source" in pipeline.execution_order
assert "display" in pipeline.execution_order
def test_add_stage_after_build(self):
"""add_stage() can add stage after build with initialization."""
pipeline = Pipeline()
source = self._create_mock_stage(
"source", "source", capabilities={"source"}, dependencies=set()
)
display = self._create_mock_stage(
"display", "display", capabilities={"display"}, dependencies={"source"}
)
pipeline.add_stage("source", source, initialize=False)
pipeline.add_stage("display", display, initialize=False)
pipeline.build()
new_stage = self._create_mock_stage(
"effect", "effect", capabilities={"effect"}, dependencies={"source"}
)
pipeline.add_stage("effect", new_stage, initialize=True)
assert "effect" in pipeline.stages
new_stage.init.assert_called_once()
def test_mutation_preserves_execution_for_remaining_stages(self):
"""Removing a stage doesn't break execution of remaining stages."""
from engine.pipeline.core import DataType
call_log = []
class TestSource(Stage):
name = "source"
category = "source"
@property
def inlet_types(self):
return {DataType.NONE}
@property
def outlet_types(self):
return {DataType.SOURCE_ITEMS}
@property
def capabilities(self):
return {"source"}
@property
def dependencies(self):
return set()
def process(self, data, ctx):
call_log.append("source")
return ["item"]
class TestEffect(Stage):
name = "effect"
category = "effect"
@property
def inlet_types(self):
return {DataType.SOURCE_ITEMS}
@property
def outlet_types(self):
return {DataType.TEXT_BUFFER}
@property
def capabilities(self):
return {"effect"}
@property
def dependencies(self):
return {"source"}
def process(self, data, ctx):
call_log.append("effect")
return data
class TestDisplay(Stage):
name = "display"
category = "display"
@property
def inlet_types(self):
return {DataType.TEXT_BUFFER}
@property
def outlet_types(self):
return {DataType.NONE}
@property
def capabilities(self):
return {"display"}
@property
def dependencies(self):
return {"effect"}
def process(self, data, ctx):
call_log.append("display")
return data
pipeline = Pipeline()
pipeline.add_stage("source", TestSource(), initialize=False)
pipeline.add_stage("effect", TestEffect(), initialize=False)
pipeline.add_stage("display", TestDisplay(), initialize=False)
pipeline.build()
pipeline.initialize()
result = pipeline.execute(None)
assert result.success
assert call_log == ["source", "effect", "display"]
call_log.clear()
pipeline.remove_stage("effect", cleanup=True)
pipeline._rebuild()
result = pipeline.execute(None)
assert result.success
assert call_log == ["source", "display"]

224
tests/test_streaming.py Normal file
View File

@@ -0,0 +1,224 @@
"""
Tests for streaming protocol utilities.
"""
from engine.display.streaming import (
FrameDiff,
MessageType,
apply_diff,
compress_frame,
compute_diff,
decode_binary_message,
decode_diff_message,
decode_rle,
decompress_frame,
encode_binary_message,
encode_diff_message,
encode_rle,
should_use_diff,
)
class TestFrameDiff:
"""Tests for FrameDiff computation."""
def test_compute_diff_all_changed(self):
"""compute_diff detects all changed lines."""
old = ["a", "b", "c"]
new = ["x", "y", "z"]
diff = compute_diff(old, new)
assert len(diff.changed_lines) == 3
assert diff.width == 1
assert diff.height == 3
def test_compute_diff_no_changes(self):
"""compute_diff returns empty for identical buffers."""
old = ["a", "b", "c"]
new = ["a", "b", "c"]
diff = compute_diff(old, new)
assert len(diff.changed_lines) == 0
def test_compute_diff_partial_changes(self):
"""compute_diff detects partial changes."""
old = ["a", "b", "c"]
new = ["a", "x", "c"]
diff = compute_diff(old, new)
assert len(diff.changed_lines) == 1
assert diff.changed_lines[0] == (1, "x")
def test_compute_diff_new_lines(self):
"""compute_diff detects new lines added."""
old = ["a", "b"]
new = ["a", "b", "c"]
diff = compute_diff(old, new)
assert len(diff.changed_lines) == 1
assert diff.changed_lines[0] == (2, "c")
def test_compute_diff_empty_old(self):
"""compute_diff handles empty old buffer."""
old = []
new = ["a", "b", "c"]
diff = compute_diff(old, new)
assert len(diff.changed_lines) == 3
class TestRLE:
"""Tests for run-length encoding."""
def test_encode_rle_no_repeats(self):
"""encode_rle handles no repeated lines."""
lines = [(0, "a"), (1, "b"), (2, "c")]
encoded = encode_rle(lines)
assert len(encoded) == 3
assert encoded[0] == (0, "a", 1)
assert encoded[1] == (1, "b", 1)
assert encoded[2] == (2, "c", 1)
def test_encode_rle_with_repeats(self):
"""encode_rle compresses repeated lines."""
lines = [(0, "a"), (1, "a"), (2, "a"), (3, "b")]
encoded = encode_rle(lines)
assert len(encoded) == 2
assert encoded[0] == (0, "a", 3)
assert encoded[1] == (3, "b", 1)
def test_decode_rle(self):
"""decode_rle reconstructs original lines."""
encoded = [(0, "a", 3), (3, "b", 1)]
decoded = decode_rle(encoded)
assert decoded == [(0, "a"), (1, "a"), (2, "a"), (3, "b")]
def test_encode_decode_roundtrip(self):
"""encode/decode is lossless."""
original = [(i, f"line{i % 3}") for i in range(10)]
encoded = encode_rle(original)
decoded = decode_rle(encoded)
assert decoded == original
class TestCompression:
"""Tests for frame compression."""
def test_compress_decompress(self):
"""compress_frame is lossless."""
buffer = [f"Line {i:02d}" for i in range(24)]
compressed = compress_frame(buffer)
decompressed = decompress_frame(compressed, 24)
assert decompressed == buffer
def test_compress_empty(self):
"""compress_frame handles empty buffer."""
compressed = compress_frame([])
decompressed = decompress_frame(compressed, 0)
assert decompressed == []
class TestBinaryProtocol:
"""Tests for binary message encoding."""
def test_encode_decode_message(self):
"""encode_binary_message is lossless."""
payload = b"test payload"
encoded = encode_binary_message(MessageType.FULL_FRAME, 80, 24, payload)
msg_type, width, height, decoded_payload = decode_binary_message(encoded)
assert msg_type == MessageType.FULL_FRAME
assert width == 80
assert height == 24
assert decoded_payload == payload
def test_encode_decode_all_types(self):
"""All message types encode correctly."""
for msg_type in MessageType:
payload = b"test"
encoded = encode_binary_message(msg_type, 80, 24, payload)
decoded_type, _, _, _ = decode_binary_message(encoded)
assert decoded_type == msg_type
class TestDiffProtocol:
"""Tests for diff message encoding."""
def test_encode_decode_diff(self):
"""encode_diff_message is lossless."""
diff = FrameDiff(width=80, height=24, changed_lines=[(0, "a"), (5, "b")])
payload = encode_diff_message(diff)
decoded = decode_diff_message(payload)
assert decoded == diff.changed_lines
class TestApplyDiff:
"""Tests for applying diffs."""
def test_apply_diff(self):
"""apply_diff reconstructs new buffer."""
old_buffer = ["a", "b", "c", "d"]
diff = FrameDiff(width=1, height=4, changed_lines=[(1, "x"), (2, "y")])
new_buffer = apply_diff(old_buffer, diff)
assert new_buffer == ["a", "x", "y", "d"]
def test_apply_diff_new_lines(self):
"""apply_diff handles new lines."""
old_buffer = ["a", "b"]
diff = FrameDiff(width=1, height=4, changed_lines=[(2, "c"), (3, "d")])
new_buffer = apply_diff(old_buffer, diff)
assert new_buffer == ["a", "b", "c", "d"]
class TestShouldUseDiff:
"""Tests for diff threshold decision."""
def test_uses_diff_when_small_changes(self):
"""should_use_diff returns True when few changes."""
old = ["a"] * 100
new = ["a"] * 95 + ["b"] * 5
assert should_use_diff(old, new, threshold=0.3) is True
def test_uses_full_when_many_changes(self):
"""should_use_diff returns False when many changes."""
old = ["a"] * 100
new = ["b"] * 100
assert should_use_diff(old, new, threshold=0.3) is False
def test_uses_diff_at_threshold(self):
"""should_use_diff handles threshold boundary."""
old = ["a"] * 100
new = ["a"] * 70 + ["b"] * 30
result = should_use_diff(old, new, threshold=0.3)
assert result is True or result is False # At boundary
def test_returns_false_for_empty(self):
"""should_use_diff returns False for empty buffers."""
assert should_use_diff([], ["a", "b"]) is False
assert should_use_diff(["a", "b"], []) is False

View File

@@ -110,10 +110,9 @@ class TestViewportFilterStage:
filtered = stage.process(test_items, ctx)
improvement_factor = len(test_items) / len(filtered)
# Verify we get at least 400x improvement (better than old ~288x)
assert improvement_factor > 400
# Verify we get the expected ~479x improvement
assert 400 < improvement_factor < 600
# Verify we get significant improvement (360x with 4 items vs 1438)
assert improvement_factor > 300
assert 300 < improvement_factor < 500
class TestViewportFilterIntegration:

View File

@@ -160,3 +160,236 @@ class TestWebSocketDisplayUnavailable:
"""show does nothing when websockets unavailable."""
display = WebSocketDisplay()
display.show(["line1", "line2"])
class TestWebSocketUIPanelIntegration:
"""Tests for WebSocket-UIPanel integration for remote control."""
def test_set_controller_stores_controller(self):
"""set_controller stores the controller reference."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
mock_controller = MagicMock()
display.set_controller(mock_controller)
assert display._controller is mock_controller
def test_set_command_callback_stores_callback(self):
"""set_command_callback stores the callback."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
callback = MagicMock()
display.set_command_callback(callback)
assert display._command_callback is callback
def test_get_state_snapshot_returns_none_without_controller(self):
"""_get_state_snapshot returns None when no controller is set."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
assert display._get_state_snapshot() is None
def test_get_state_snapshot_returns_controller_state(self):
"""_get_state_snapshot returns state from controller."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
# Create mock controller with expected attributes
mock_controller = MagicMock()
mock_controller.stages = {
"test_stage": MagicMock(
enabled=True, params={"intensity": 0.5}, selected=False
)
}
mock_controller._current_preset = "demo"
mock_controller._presets = ["demo", "test"]
mock_controller.selected_stage = "test_stage"
display.set_controller(mock_controller)
state = display._get_state_snapshot()
assert state is not None
assert "stages" in state
assert "test_stage" in state["stages"]
assert state["stages"]["test_stage"]["enabled"] is True
assert state["stages"]["test_stage"]["params"] == {"intensity": 0.5}
assert state["preset"] == "demo"
assert state["presets"] == ["demo", "test"]
assert state["selected_stage"] == "test_stage"
def test_get_state_snapshot_handles_missing_attributes(self):
"""_get_state_snapshot handles controller without all attributes."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
# Create mock controller without stages attribute using spec
# This prevents MagicMock from auto-creating the attribute
mock_controller = MagicMock(spec=[]) # Empty spec means no attributes
display.set_controller(mock_controller)
state = display._get_state_snapshot()
assert state == {}
def test_broadcast_state_sends_to_clients(self):
"""broadcast_state sends state update to all connected clients."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
# Mock client with send method
mock_client = MagicMock()
mock_client.send = MagicMock()
display._clients.add(mock_client)
test_state = {"test": "state"}
display.broadcast_state(test_state)
# Verify send was called with JSON containing state
mock_client.send.assert_called_once()
call_args = mock_client.send.call_args[0][0]
assert '"type": "state"' in call_args
assert '"test"' in call_args
def test_broadcast_state_noop_when_no_clients(self):
"""broadcast_state does nothing when no clients connected."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
display._clients.clear()
# Should not raise error
display.broadcast_state({"test": "state"})
class TestWebSocketHTTPServerPath:
"""Tests for WebSocket HTTP server client directory path calculation."""
def test_client_dir_path_calculation(self):
"""Client directory path is correctly calculated from websocket.py location."""
import os
# Use the actual websocket.py file location, not the test file
websocket_module = __import__(
"engine.display.backends.websocket", fromlist=["WebSocketDisplay"]
)
websocket_file = websocket_module.__file__
parts = websocket_file.split(os.sep)
if "engine" in parts:
engine_idx = parts.index("engine")
project_root = os.sep.join(parts[:engine_idx])
client_dir = os.path.join(project_root, "client")
else:
# Fallback calculation (shouldn't happen in normal test runs)
client_dir = os.path.join(
os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(websocket_file)))
),
"client",
)
# Verify the client directory exists and contains expected files
assert os.path.exists(client_dir), f"Client directory not found: {client_dir}"
assert "index.html" in os.listdir(client_dir), (
"index.html not found in client directory"
)
assert "editor.html" in os.listdir(client_dir), (
"editor.html not found in client directory"
)
# Verify the path is correct (should be .../Mainline/client)
assert client_dir.endswith("client"), (
f"Client dir should end with 'client': {client_dir}"
)
assert "Mainline" in client_dir, (
f"Client dir should contain 'Mainline': {client_dir}"
)
def test_http_server_directory_serves_client_files(self):
"""HTTP server directory correctly serves client files."""
import os
# Use the actual websocket.py file location, not the test file
websocket_module = __import__(
"engine.display.backends.websocket", fromlist=["WebSocketDisplay"]
)
websocket_file = websocket_module.__file__
parts = websocket_file.split(os.sep)
if "engine" in parts:
engine_idx = parts.index("engine")
project_root = os.sep.join(parts[:engine_idx])
client_dir = os.path.join(project_root, "client")
else:
client_dir = os.path.join(
os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(websocket_file)))
),
"client",
)
# Verify the handler would be able to serve files from this directory
# We can't actually instantiate the handler without a valid request,
# but we can verify the directory is accessible
assert os.access(client_dir, os.R_OK), (
f"Client directory not readable: {client_dir}"
)
# Verify key files exist
index_path = os.path.join(client_dir, "index.html")
editor_path = os.path.join(client_dir, "editor.html")
assert os.path.exists(index_path), f"index.html not found at: {index_path}"
assert os.path.exists(editor_path), f"editor.html not found at: {editor_path}"
# Verify files are readable
assert os.access(index_path, os.R_OK), "index.html not readable"
assert os.access(editor_path, os.R_OK), "editor.html not readable"
def test_old_buggy_path_does_not_find_client_directory(self):
"""The old buggy path (3 dirname calls) should NOT find the client directory.
This test verifies that the old buggy behavior would have failed.
The old code used:
client_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "client"
)
This would resolve to: .../engine/client (which doesn't exist)
Instead of: .../Mainline/client (which does exist)
"""
import os
# Use the actual websocket.py file location
websocket_module = __import__(
"engine.display.backends.websocket", fromlist=["WebSocketDisplay"]
)
websocket_file = websocket_module.__file__
# OLD BUGGY CODE: 3 dirname calls
old_buggy_client_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(websocket_file))), "client"
)
# This path should NOT exist (it's the buggy path)
assert not os.path.exists(old_buggy_client_dir), (
f"Old buggy path should not exist: {old_buggy_client_dir}\n"
f"If this assertion fails, the bug may have been fixed elsewhere or "
f"the test needs updating."
)
# The buggy path should be .../engine/client, not .../Mainline/client
assert old_buggy_client_dir.endswith("engine/client"), (
f"Old buggy path should end with 'engine/client': {old_buggy_client_dir}"
)
# Verify that going up one more level (4 dirname calls) finds the correct path
correct_client_dir = os.path.join(
os.path.dirname(
os.path.dirname(os.path.dirname(os.path.dirname(websocket_file)))
),
"client",
)
assert os.path.exists(correct_client_dir), (
f"Correct path should exist: {correct_client_dir}"
)
assert "index.html" in os.listdir(correct_client_dir), (
f"index.html should exist in correct path: {correct_client_dir}"
)