forked from genewildish/Mainline
Two critical fixes: 1. ListDataSource Cache Bug - Previously, ListDataSource.__init__ cached raw tuples directly - get_items() would return cached raw tuples without converting to SourceItem - This caused SourceItemsToBufferStage to receive tuples and stringify them - Results: ugly tuple representations in terminal/pygame instead of formatted text - Fix: Store raw items in _raw_items, let fetch() convert to SourceItem - Cache now contains proper SourceItem objects 2. Camera Dependency Resolution - CameraStage declared dependency on 'source.items' exactly - DataSourceStage provides 'source.headlines' (or 'source.poetry', etc.) - Capability matching didn't trigger prefix match for exact dependency - Fix: Change CameraStage dependency to 'source' for prefix matching 3. Added app.py Camera Stage Support - Pipeline now adds camera stage from preset.camera config - Supports vertical, horizontal, omni, floating, bounce modes - Tests now passing with proper data flow through all stages Tests: All 502 tests passing, 16 skipped
527 lines
18 KiB
Python
527 lines
18 KiB
Python
"""
|
|
End-to-end pipeline integration tests.
|
|
|
|
Verifies that data actually flows through every pipeline stage
|
|
(source -> render -> effects -> display) using a queue-backed
|
|
stub display to capture output frames.
|
|
|
|
These tests catch dead-code paths and wiring bugs that unit tests miss.
|
|
"""
|
|
|
|
import queue
|
|
from unittest.mock import patch
|
|
|
|
from engine.data_sources.sources import ListDataSource, SourceItem
|
|
from engine.effects import EffectContext
|
|
from engine.effects.types import EffectPlugin
|
|
from engine.pipeline import Pipeline, PipelineConfig
|
|
from engine.pipeline.adapters import (
|
|
DataSourceStage,
|
|
DisplayStage,
|
|
EffectPluginStage,
|
|
FontStage,
|
|
SourceItemsToBufferStage,
|
|
)
|
|
from engine.pipeline.core import PipelineContext
|
|
from engine.pipeline.params import PipelineParams
|
|
|
|
# ─── FIXTURES ────────────────────────────────────────────
|
|
|
|
|
|
class QueueDisplay:
|
|
"""Stub display that captures every frame into a queue.
|
|
|
|
Acts as a FIFO sink so tests can inspect exactly what
|
|
the pipeline produced without any terminal or network I/O.
|
|
"""
|
|
|
|
def __init__(self):
|
|
self.frames: queue.Queue[list[str]] = queue.Queue()
|
|
self.width = 80
|
|
self.height = 24
|
|
self._init_called = False
|
|
|
|
def init(self, width: int, height: int, reuse: bool = False) -> None:
|
|
self.width = width
|
|
self.height = height
|
|
self._init_called = True
|
|
|
|
def show(self, buffer: list[str], border: bool = False) -> None:
|
|
# Deep copy to prevent later mutations
|
|
self.frames.put(list(buffer))
|
|
|
|
def clear(self) -> None:
|
|
pass
|
|
|
|
def cleanup(self) -> None:
|
|
pass
|
|
|
|
def get_dimensions(self) -> tuple[int, int]:
|
|
return (self.width, self.height)
|
|
|
|
|
|
class MarkerEffect(EffectPlugin):
|
|
"""Effect that prepends a marker line to prove it ran.
|
|
|
|
Each MarkerEffect adds a unique tag so tests can verify
|
|
which effects executed and in what order.
|
|
"""
|
|
|
|
def __init__(self, tag: str = "MARKER"):
|
|
self._tag = tag
|
|
self.call_count = 0
|
|
super().__init__()
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return f"marker-{self._tag}"
|
|
|
|
def configure(self, config: dict) -> None:
|
|
pass
|
|
|
|
def process(self, buffer: list[str], ctx: EffectContext) -> list[str]:
|
|
self.call_count += 1
|
|
if buffer is None:
|
|
return [f"[{self._tag}:EMPTY]"]
|
|
return [f"[{self._tag}]"] + list(buffer)
|
|
|
|
|
|
# ─── HELPERS ─────────────────────────────────────────────
|
|
|
|
|
|
def _build_pipeline(
|
|
items: list,
|
|
effects: list[tuple[str, EffectPlugin]] | None = None,
|
|
use_font_stage: bool = False,
|
|
width: int = 80,
|
|
height: int = 24,
|
|
) -> tuple[Pipeline, QueueDisplay, PipelineContext]:
|
|
"""Build a fully-wired pipeline with a QueueDisplay sink.
|
|
|
|
Args:
|
|
items: Content items to feed into the source.
|
|
effects: Optional list of (name, EffectPlugin) to add.
|
|
use_font_stage: Use FontStage instead of SourceItemsToBufferStage.
|
|
width: Viewport width.
|
|
height: Viewport height.
|
|
|
|
Returns:
|
|
(pipeline, queue_display, context) tuple.
|
|
"""
|
|
display = QueueDisplay()
|
|
|
|
ctx = PipelineContext()
|
|
params = PipelineParams()
|
|
params.viewport_width = width
|
|
params.viewport_height = height
|
|
params.frame_number = 0
|
|
ctx.params = params
|
|
ctx.set("items", items)
|
|
|
|
pipeline = Pipeline(
|
|
config=PipelineConfig(enable_metrics=True),
|
|
context=ctx,
|
|
)
|
|
|
|
# Source stage
|
|
source = ListDataSource(items, name="test-source")
|
|
pipeline.add_stage("source", DataSourceStage(source, name="test-source"))
|
|
|
|
# Render stage
|
|
if use_font_stage:
|
|
pipeline.add_stage("render", FontStage(name="font"))
|
|
else:
|
|
pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
|
|
|
|
# Effect stages
|
|
if effects:
|
|
for effect_name, effect_plugin in effects:
|
|
pipeline.add_stage(
|
|
f"effect_{effect_name}",
|
|
EffectPluginStage(effect_plugin, name=effect_name),
|
|
)
|
|
|
|
# Display stage
|
|
pipeline.add_stage("display", DisplayStage(display, name="queue"))
|
|
|
|
pipeline.build()
|
|
pipeline.initialize()
|
|
|
|
return pipeline, display, ctx
|
|
|
|
|
|
# ─── TESTS: HAPPY PATH ──────────────────────────────────
|
|
|
|
|
|
class TestPipelineE2EHappyPath:
|
|
"""End-to-end: data flows source -> render -> display."""
|
|
|
|
def test_items_reach_display(self):
|
|
"""Content items fed to source must appear in the display output."""
|
|
items = [
|
|
SourceItem(content="Hello World", source="test", timestamp="now"),
|
|
SourceItem(content="Second Item", source="test", timestamp="now"),
|
|
]
|
|
pipeline, display, ctx = _build_pipeline(items)
|
|
|
|
result = pipeline.execute(items)
|
|
|
|
assert result.success, f"Pipeline failed: {result.error}"
|
|
frame = display.frames.get(timeout=1)
|
|
text = "\n".join(frame)
|
|
assert "Hello World" in text
|
|
assert "Second Item" in text
|
|
|
|
def test_pipeline_output_is_list_of_strings(self):
|
|
"""Display must receive list[str], not raw SourceItems."""
|
|
items = [SourceItem(content="Line one", source="s", timestamp="t")]
|
|
pipeline, display, ctx = _build_pipeline(items)
|
|
|
|
result = pipeline.execute(items)
|
|
|
|
assert result.success
|
|
frame = display.frames.get(timeout=1)
|
|
assert isinstance(frame, list)
|
|
for line in frame:
|
|
assert isinstance(line, str), f"Expected str, got {type(line)}: {line!r}"
|
|
|
|
def test_multiline_items_are_split(self):
|
|
"""Items with newlines should be split into individual buffer lines."""
|
|
items = [
|
|
SourceItem(content="Line A\nLine B\nLine C", source="s", timestamp="t")
|
|
]
|
|
pipeline, display, ctx = _build_pipeline(items)
|
|
|
|
result = pipeline.execute(items)
|
|
|
|
assert result.success
|
|
frame = display.frames.get(timeout=1)
|
|
assert "Line A" in frame
|
|
assert "Line B" in frame
|
|
assert "Line C" in frame
|
|
|
|
def test_empty_source_produces_empty_buffer(self):
|
|
"""An empty source should produce an empty (or blank) frame."""
|
|
items = []
|
|
pipeline, display, ctx = _build_pipeline(items)
|
|
|
|
result = pipeline.execute(items)
|
|
|
|
assert result.success
|
|
frame = display.frames.get(timeout=1)
|
|
assert isinstance(frame, list)
|
|
|
|
def test_multiple_frames_are_independent(self):
|
|
"""Each execute() call should produce a distinct frame."""
|
|
items = [SourceItem(content="frame-content", source="s", timestamp="t")]
|
|
pipeline, display, ctx = _build_pipeline(items)
|
|
|
|
pipeline.execute(items)
|
|
pipeline.execute(items)
|
|
|
|
f1 = display.frames.get(timeout=1)
|
|
f2 = display.frames.get(timeout=1)
|
|
assert f1 == f2 # Same input => same output
|
|
assert display.frames.empty() # Exactly 2 frames
|
|
|
|
|
|
# ─── TESTS: EFFECTS IN THE PIPELINE ─────────────────────
|
|
|
|
|
|
class TestPipelineE2EEffects:
|
|
"""End-to-end: effects process the buffer between render and display."""
|
|
|
|
def test_single_effect_modifies_output(self):
|
|
"""A single effect should visibly modify the output frame."""
|
|
items = [SourceItem(content="Original", source="s", timestamp="t")]
|
|
marker = MarkerEffect("FX1")
|
|
pipeline, display, ctx = _build_pipeline(items, effects=[("marker", marker)])
|
|
|
|
result = pipeline.execute(items)
|
|
|
|
assert result.success
|
|
frame = display.frames.get(timeout=1)
|
|
assert "[FX1]" in frame, f"Marker not found in frame: {frame}"
|
|
assert "Original" in "\n".join(frame)
|
|
|
|
def test_effect_chain_ordering(self):
|
|
"""Multiple effects execute in the order they were added."""
|
|
items = [SourceItem(content="data", source="s", timestamp="t")]
|
|
fx_a = MarkerEffect("A")
|
|
fx_b = MarkerEffect("B")
|
|
pipeline, display, ctx = _build_pipeline(
|
|
items, effects=[("alpha", fx_a), ("beta", fx_b)]
|
|
)
|
|
|
|
result = pipeline.execute(items)
|
|
|
|
assert result.success
|
|
frame = display.frames.get(timeout=1)
|
|
text = "\n".join(frame)
|
|
# B runs after A, so B's marker is prepended last => appears first
|
|
idx_a = text.index("[A]")
|
|
idx_b = text.index("[B]")
|
|
assert idx_b < idx_a, f"Expected [B] before [A], got: {frame}"
|
|
|
|
def test_effect_receives_list_of_strings(self):
|
|
"""Effects must receive list[str] from the render stage."""
|
|
items = [SourceItem(content="check-type", source="s", timestamp="t")]
|
|
received_types = []
|
|
|
|
class TypeCheckEffect(EffectPlugin):
|
|
@property
|
|
def name(self):
|
|
return "typecheck"
|
|
|
|
def configure(self, config):
|
|
pass
|
|
|
|
def process(self, buffer, ctx):
|
|
received_types.append(type(buffer).__name__)
|
|
if isinstance(buffer, list):
|
|
for item in buffer:
|
|
received_types.append(type(item).__name__)
|
|
return buffer
|
|
|
|
pipeline, display, ctx = _build_pipeline(
|
|
items, effects=[("typecheck", TypeCheckEffect())]
|
|
)
|
|
|
|
pipeline.execute(items)
|
|
|
|
assert received_types[0] == "list", f"Buffer type: {received_types[0]}"
|
|
# All elements should be strings
|
|
for t in received_types[1:]:
|
|
assert t == "str", f"Buffer element type: {t}"
|
|
|
|
def test_disabled_effect_is_skipped(self):
|
|
"""A disabled effect should not process data."""
|
|
items = [SourceItem(content="data", source="s", timestamp="t")]
|
|
marker = MarkerEffect("DISABLED")
|
|
pipeline, display, ctx = _build_pipeline(
|
|
items, effects=[("disabled-fx", marker)]
|
|
)
|
|
|
|
# Disable the effect stage
|
|
stage = pipeline.get_stage("effect_disabled-fx")
|
|
stage.set_enabled(False)
|
|
|
|
result = pipeline.execute(items)
|
|
|
|
assert result.success
|
|
frame = display.frames.get(timeout=1)
|
|
assert "[DISABLED]" not in frame, "Disabled effect should not run"
|
|
assert marker.call_count == 0
|
|
|
|
|
|
# ─── TESTS: STAGE EXECUTION ORDER & METRICS ─────────────
|
|
|
|
|
|
class TestPipelineE2EStageOrder:
|
|
"""Verify all stages execute and metrics are collected."""
|
|
|
|
def test_all_stages_appear_in_execution_order(self):
|
|
"""Pipeline build must include source, render, and display."""
|
|
items = [SourceItem(content="x", source="s", timestamp="t")]
|
|
pipeline, display, ctx = _build_pipeline(items)
|
|
|
|
order = pipeline.execution_order
|
|
assert "source" in order
|
|
assert "render" in order
|
|
assert "display" in order
|
|
|
|
def test_execution_order_is_source_render_display(self):
|
|
"""Source must come before render, render before display."""
|
|
items = [SourceItem(content="x", source="s", timestamp="t")]
|
|
pipeline, display, ctx = _build_pipeline(items)
|
|
|
|
order = pipeline.execution_order
|
|
assert order.index("source") < order.index("render")
|
|
assert order.index("render") < order.index("display")
|
|
|
|
def test_effects_between_render_and_display(self):
|
|
"""Effects must execute after render and before display."""
|
|
items = [SourceItem(content="x", source="s", timestamp="t")]
|
|
marker = MarkerEffect("MID")
|
|
pipeline, display, ctx = _build_pipeline(items, effects=[("mid", marker)])
|
|
|
|
order = pipeline.execution_order
|
|
render_idx = order.index("render")
|
|
display_idx = order.index("display")
|
|
effect_idx = order.index("effect_mid")
|
|
assert render_idx < effect_idx < display_idx
|
|
|
|
def test_metrics_collected_for_all_stages(self):
|
|
"""After execution, metrics should exist for every active stage."""
|
|
items = [SourceItem(content="x", source="s", timestamp="t")]
|
|
marker = MarkerEffect("M")
|
|
pipeline, display, ctx = _build_pipeline(items, effects=[("m", marker)])
|
|
|
|
pipeline.execute(items)
|
|
|
|
summary = pipeline.get_metrics_summary()
|
|
assert "stages" in summary
|
|
stage_names = set(summary["stages"].keys())
|
|
# All regular (non-overlay) stages should have metrics
|
|
assert "source" in stage_names
|
|
assert "render" in stage_names
|
|
assert "display" in stage_names
|
|
assert "effect_m" in stage_names
|
|
|
|
|
|
# ─── TESTS: FONT STAGE DATAFLOW ─────────────────────────
|
|
|
|
|
|
class TestFontStageDataflow:
|
|
"""Verify FontStage correctly renders content through make_block.
|
|
|
|
These tests expose the tuple-unpacking bug in FontStage.process()
|
|
where make_block returns (lines, color, meta_idx) but the code
|
|
does result.extend(block) instead of result.extend(block[0]).
|
|
"""
|
|
|
|
def test_font_stage_unpacks_make_block_correctly(self):
|
|
"""FontStage must produce list[str] output, not mixed types."""
|
|
items = [
|
|
SourceItem(content="Test Headline", source="test-src", timestamp="12345")
|
|
]
|
|
|
|
# Mock make_block to return its documented signature
|
|
mock_lines = [" RENDERED LINE 1", " RENDERED LINE 2", "", " meta info"]
|
|
mock_return = (mock_lines, "\033[38;5;46m", 3)
|
|
|
|
with patch("engine.render.make_block", return_value=mock_return):
|
|
pipeline, display, ctx = _build_pipeline(items, use_font_stage=True)
|
|
|
|
result = pipeline.execute(items)
|
|
|
|
assert result.success, f"Pipeline failed: {result.error}"
|
|
frame = display.frames.get(timeout=1)
|
|
|
|
# Every element in the frame must be a string
|
|
for i, line in enumerate(frame):
|
|
assert isinstance(line, str), (
|
|
f"Frame line {i} is {type(line).__name__}: {line!r} "
|
|
f"(FontStage likely extended with raw tuple)"
|
|
)
|
|
|
|
def test_font_stage_output_contains_rendered_content(self):
|
|
"""FontStage output should contain the rendered lines, not color codes."""
|
|
items = [SourceItem(content="My Headline", source="src", timestamp="0")]
|
|
|
|
mock_lines = [" BIG BLOCK TEXT", " MORE TEXT", "", " ░ src · 0"]
|
|
mock_return = (mock_lines, "\033[38;5;46m", 3)
|
|
|
|
with patch("engine.render.make_block", return_value=mock_return):
|
|
pipeline, display, ctx = _build_pipeline(items, use_font_stage=True)
|
|
|
|
result = pipeline.execute(items)
|
|
|
|
assert result.success
|
|
frame = display.frames.get(timeout=1)
|
|
text = "\n".join(frame)
|
|
assert "BIG BLOCK TEXT" in text
|
|
assert "MORE TEXT" in text
|
|
|
|
def test_font_stage_does_not_leak_color_codes_as_lines(self):
|
|
"""The ANSI color code from make_block must NOT appear as a frame line."""
|
|
items = [SourceItem(content="Headline", source="s", timestamp="0")]
|
|
|
|
color_code = "\033[38;5;46m"
|
|
mock_return = ([" rendered"], color_code, 0)
|
|
|
|
with patch("engine.render.make_block", return_value=mock_return):
|
|
pipeline, display, ctx = _build_pipeline(items, use_font_stage=True)
|
|
|
|
result = pipeline.execute(items)
|
|
|
|
assert result.success
|
|
frame = display.frames.get(timeout=1)
|
|
# The color code itself should not be a standalone line
|
|
assert color_code not in frame, (
|
|
f"Color code leaked as a frame line: {frame}"
|
|
)
|
|
# The meta_row_index (int) should not be a line either
|
|
for line in frame:
|
|
assert not isinstance(line, int), f"Integer leaked into frame: {line}"
|
|
|
|
def test_font_stage_handles_multiple_items(self):
|
|
"""FontStage should render each item through make_block."""
|
|
items = [
|
|
SourceItem(content="First", source="a", timestamp="1"),
|
|
SourceItem(content="Second", source="b", timestamp="2"),
|
|
]
|
|
|
|
call_count = 0
|
|
|
|
def mock_make_block(title, src, ts, w):
|
|
nonlocal call_count
|
|
call_count += 1
|
|
return ([f" [{title}]"], "\033[0m", 0)
|
|
|
|
with patch("engine.render.make_block", side_effect=mock_make_block):
|
|
pipeline, display, ctx = _build_pipeline(items, use_font_stage=True)
|
|
|
|
result = pipeline.execute(items)
|
|
|
|
assert result.success
|
|
assert call_count == 2, f"make_block called {call_count} times, expected 2"
|
|
frame = display.frames.get(timeout=1)
|
|
text = "\n".join(frame)
|
|
assert "[First]" in text
|
|
assert "[Second]" in text
|
|
|
|
|
|
# ─── TESTS: MIRROR OF app.py ASSEMBLY ───────────────────
|
|
|
|
|
|
class TestAppPipelineAssembly:
|
|
"""Verify the pipeline as assembled by app.py works end-to-end.
|
|
|
|
This mirrors how run_pipeline_mode() builds the pipeline but
|
|
without any network or terminal dependencies.
|
|
"""
|
|
|
|
def test_demo_preset_pipeline_produces_output(self):
|
|
"""Simulates the 'demo' preset pipeline with stub data."""
|
|
# Simulate what app.py does for the demo preset
|
|
items = [
|
|
("Breaking: Test passes", "UnitTest", "1234567890"),
|
|
("Update: Coverage improves", "CI", "1234567891"),
|
|
]
|
|
|
|
display = QueueDisplay()
|
|
ctx = PipelineContext()
|
|
params = PipelineParams()
|
|
params.viewport_width = 80
|
|
params.viewport_height = 24
|
|
params.frame_number = 0
|
|
ctx.params = params
|
|
ctx.set("items", items)
|
|
|
|
pipeline = Pipeline(
|
|
config=PipelineConfig(enable_metrics=True),
|
|
context=ctx,
|
|
)
|
|
|
|
# Mirror app.py: ListDataSource -> SourceItemsToBufferStage -> display
|
|
source = ListDataSource(items, name="headlines")
|
|
pipeline.add_stage("source", DataSourceStage(source, name="headlines"))
|
|
pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
|
|
pipeline.add_stage("display", DisplayStage(display, name="queue"))
|
|
|
|
pipeline.build()
|
|
pipeline.initialize()
|
|
|
|
result = pipeline.execute(items)
|
|
|
|
assert result.success, f"Pipeline failed: {result.error}"
|
|
assert not display.frames.empty(), "Display received no frames"
|
|
|
|
frame = display.frames.get(timeout=1)
|
|
assert isinstance(frame, list)
|
|
assert len(frame) > 0
|
|
# All lines must be strings
|
|
for line in frame:
|
|
assert isinstance(line, str)
|