""" End-to-end pipeline integration tests. Verifies that data actually flows through every pipeline stage (source -> render -> effects -> display) using a queue-backed stub display to capture output frames. These tests catch dead-code paths and wiring bugs that unit tests miss. """ import queue from unittest.mock import patch from engine.data_sources.sources import ListDataSource, SourceItem from engine.effects import EffectContext from engine.effects.types import EffectPlugin from engine.pipeline import Pipeline, PipelineConfig from engine.pipeline.adapters import ( DataSourceStage, DisplayStage, EffectPluginStage, FontStage, SourceItemsToBufferStage, ViewportFilterStage, ) from engine.pipeline.core import PipelineContext from engine.pipeline.params import PipelineParams # ─── FIXTURES ──────────────────────────────────────────── class QueueDisplay: """Stub display that captures every frame into a queue. Acts as a FIFO sink so tests can inspect exactly what the pipeline produced without any terminal or network I/O. """ def __init__(self): self.frames: queue.Queue[list[str]] = queue.Queue() self.width = 80 self.height = 24 self._init_called = False def init(self, width: int, height: int, reuse: bool = False) -> None: self.width = width self.height = height self._init_called = True def show(self, buffer: list[str], border: bool = False) -> None: # Deep copy to prevent later mutations self.frames.put(list(buffer)) def clear(self) -> None: pass def cleanup(self) -> None: pass def get_dimensions(self) -> tuple[int, int]: return (self.width, self.height) class MarkerEffect(EffectPlugin): """Effect that prepends a marker line to prove it ran. Each MarkerEffect adds a unique tag so tests can verify which effects executed and in what order. """ def __init__(self, tag: str = "MARKER"): self._tag = tag self.call_count = 0 super().__init__() @property def name(self) -> str: return f"marker-{self._tag}" def configure(self, config: dict) -> None: pass def process(self, buffer: list[str], ctx: EffectContext) -> list[str]: self.call_count += 1 if buffer is None: return [f"[{self._tag}:EMPTY]"] return [f"[{self._tag}]"] + list(buffer) # ─── HELPERS ───────────────────────────────────────────── def _build_pipeline( items: list, effects: list[tuple[str, EffectPlugin]] | None = None, use_font_stage: bool = False, width: int = 80, height: int = 24, ) -> tuple[Pipeline, QueueDisplay, PipelineContext]: """Build a fully-wired pipeline with a QueueDisplay sink. Args: items: Content items to feed into the source. effects: Optional list of (name, EffectPlugin) to add. use_font_stage: Use FontStage instead of SourceItemsToBufferStage. width: Viewport width. height: Viewport height. Returns: (pipeline, queue_display, context) tuple. """ display = QueueDisplay() ctx = PipelineContext() params = PipelineParams() params.viewport_width = width params.viewport_height = height params.frame_number = 0 ctx.params = params ctx.set("items", items) pipeline = Pipeline( config=PipelineConfig(enable_metrics=True), context=ctx, ) # Source stage source = ListDataSource(items, name="test-source") pipeline.add_stage("source", DataSourceStage(source, name="test-source")) # Render stage if use_font_stage: # FontStage requires viewport_filter stage which requires camera state from engine.camera import Camera from engine.pipeline.adapters import CameraClockStage, CameraStage camera = Camera.scroll(speed=0.0) camera.set_canvas_size(200, 200) # CameraClockStage updates camera state, must come before viewport_filter pipeline.add_stage( "camera_update", CameraClockStage(camera, name="camera-clock") ) # ViewportFilterStage requires camera.state pipeline.add_stage( "viewport_filter", ViewportFilterStage(name="viewport-filter") ) # FontStage converts items to buffer pipeline.add_stage("render", FontStage(name="font")) # CameraStage applies viewport transformation to rendered buffer pipeline.add_stage("camera", CameraStage(camera, name="static")) else: pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) # Effect stages if effects: for effect_name, effect_plugin in effects: pipeline.add_stage( f"effect_{effect_name}", EffectPluginStage(effect_plugin, name=effect_name), ) # Display stage pipeline.add_stage("display", DisplayStage(display, name="queue")) pipeline.build() pipeline.initialize() return pipeline, display, ctx # ─── TESTS: HAPPY PATH ────────────────────────────────── class TestPipelineE2EHappyPath: """End-to-end: data flows source -> render -> display.""" def test_items_reach_display(self): """Content items fed to source must appear in the display output.""" items = [ SourceItem(content="Hello World", source="test", timestamp="now"), SourceItem(content="Second Item", source="test", timestamp="now"), ] pipeline, display, ctx = _build_pipeline(items) result = pipeline.execute(items) assert result.success, f"Pipeline failed: {result.error}" frame = display.frames.get(timeout=1) text = "\n".join(frame) assert "Hello World" in text assert "Second Item" in text def test_pipeline_output_is_list_of_strings(self): """Display must receive list[str], not raw SourceItems.""" items = [SourceItem(content="Line one", source="s", timestamp="t")] pipeline, display, ctx = _build_pipeline(items) result = pipeline.execute(items) assert result.success frame = display.frames.get(timeout=1) assert isinstance(frame, list) for line in frame: assert isinstance(line, str), f"Expected str, got {type(line)}: {line!r}" def test_multiline_items_are_split(self): """Items with newlines should be split into individual buffer lines.""" items = [ SourceItem(content="Line A\nLine B\nLine C", source="s", timestamp="t") ] pipeline, display, ctx = _build_pipeline(items) result = pipeline.execute(items) assert result.success frame = display.frames.get(timeout=1) # Camera stage pads lines to viewport width, so check for substring match assert any("Line A" in line for line in frame) assert any("Line B" in line for line in frame) assert any("Line C" in line for line in frame) def test_empty_source_produces_empty_buffer(self): """An empty source should produce an empty (or blank) frame.""" items = [] pipeline, display, ctx = _build_pipeline(items) result = pipeline.execute(items) assert result.success frame = display.frames.get(timeout=1) assert isinstance(frame, list) def test_multiple_frames_are_independent(self): """Each execute() call should produce a distinct frame.""" items = [SourceItem(content="frame-content", source="s", timestamp="t")] pipeline, display, ctx = _build_pipeline(items) pipeline.execute(items) pipeline.execute(items) f1 = display.frames.get(timeout=1) f2 = display.frames.get(timeout=1) assert f1 == f2 # Same input => same output assert display.frames.empty() # Exactly 2 frames # ─── TESTS: EFFECTS IN THE PIPELINE ───────────────────── class TestPipelineE2EEffects: """End-to-end: effects process the buffer between render and display.""" def test_single_effect_modifies_output(self): """A single effect should visibly modify the output frame.""" items = [SourceItem(content="Original", source="s", timestamp="t")] marker = MarkerEffect("FX1") pipeline, display, ctx = _build_pipeline(items, effects=[("marker", marker)]) result = pipeline.execute(items) assert result.success frame = display.frames.get(timeout=1) # Camera stage pads lines to viewport width, so check for substring match assert any("[FX1]" in line for line in frame), ( f"Marker not found in frame: {frame}" ) assert "Original" in "\n".join(frame) def test_effect_chain_ordering(self): """Multiple effects execute in the order they were added.""" items = [SourceItem(content="data", source="s", timestamp="t")] fx_a = MarkerEffect("A") fx_b = MarkerEffect("B") pipeline, display, ctx = _build_pipeline( items, effects=[("alpha", fx_a), ("beta", fx_b)] ) result = pipeline.execute(items) assert result.success frame = display.frames.get(timeout=1) text = "\n".join(frame) # B runs after A, so B's marker is prepended last => appears first idx_a = text.index("[A]") idx_b = text.index("[B]") assert idx_b < idx_a, f"Expected [B] before [A], got: {frame}" def test_effect_receives_list_of_strings(self): """Effects must receive list[str] from the render stage.""" items = [SourceItem(content="check-type", source="s", timestamp="t")] received_types = [] class TypeCheckEffect(EffectPlugin): @property def name(self): return "typecheck" def configure(self, config): pass def process(self, buffer, ctx): received_types.append(type(buffer).__name__) if isinstance(buffer, list): for item in buffer: received_types.append(type(item).__name__) return buffer pipeline, display, ctx = _build_pipeline( items, effects=[("typecheck", TypeCheckEffect())] ) pipeline.execute(items) assert received_types[0] == "list", f"Buffer type: {received_types[0]}" # All elements should be strings for t in received_types[1:]: assert t == "str", f"Buffer element type: {t}" def test_disabled_effect_is_skipped(self): """A disabled effect should not process data.""" items = [SourceItem(content="data", source="s", timestamp="t")] marker = MarkerEffect("DISABLED") pipeline, display, ctx = _build_pipeline( items, effects=[("disabled-fx", marker)] ) # Disable the effect stage stage = pipeline.get_stage("effect_disabled-fx") stage.set_enabled(False) result = pipeline.execute(items) assert result.success frame = display.frames.get(timeout=1) assert "[DISABLED]" not in frame, "Disabled effect should not run" assert marker.call_count == 0 # ─── TESTS: STAGE EXECUTION ORDER & METRICS ───────────── class TestPipelineE2EStageOrder: """Verify all stages execute and metrics are collected.""" def test_all_stages_appear_in_execution_order(self): """Pipeline build must include source, render, and display.""" items = [SourceItem(content="x", source="s", timestamp="t")] pipeline, display, ctx = _build_pipeline(items) order = pipeline.execution_order assert "source" in order assert "render" in order assert "display" in order def test_execution_order_is_source_render_display(self): """Source must come before render, render before display.""" items = [SourceItem(content="x", source="s", timestamp="t")] pipeline, display, ctx = _build_pipeline(items) order = pipeline.execution_order assert order.index("source") < order.index("render") assert order.index("render") < order.index("display") def test_effects_between_render_and_display(self): """Effects must execute after render and before display.""" items = [SourceItem(content="x", source="s", timestamp="t")] marker = MarkerEffect("MID") pipeline, display, ctx = _build_pipeline(items, effects=[("mid", marker)]) order = pipeline.execution_order render_idx = order.index("render") display_idx = order.index("display") effect_idx = order.index("effect_mid") assert render_idx < effect_idx < display_idx def test_metrics_collected_for_all_stages(self): """After execution, metrics should exist for every active stage.""" items = [SourceItem(content="x", source="s", timestamp="t")] marker = MarkerEffect("M") pipeline, display, ctx = _build_pipeline(items, effects=[("m", marker)]) pipeline.execute(items) summary = pipeline.get_metrics_summary() assert "stages" in summary stage_names = set(summary["stages"].keys()) # All regular (non-overlay) stages should have metrics assert "source" in stage_names assert "render" in stage_names assert "queue" in stage_names # Display stage is named "queue" in the test assert "effect_m" in stage_names # ─── TESTS: FONT STAGE DATAFLOW ───────────────────────── class TestFontStageDataflow: """Verify FontStage correctly renders content through make_block. These tests expose the tuple-unpacking bug in FontStage.process() where make_block returns (lines, color, meta_idx) but the code does result.extend(block) instead of result.extend(block[0]). """ def test_font_stage_unpacks_make_block_correctly(self): """FontStage must produce list[str] output, not mixed types.""" items = [ SourceItem(content="Test Headline", source="test-src", timestamp="12345") ] # Mock make_block to return its documented signature mock_lines = [" RENDERED LINE 1", " RENDERED LINE 2", "", " meta info"] mock_return = (mock_lines, "\033[38;5;46m", 3) with patch("engine.render.make_block", return_value=mock_return): pipeline, display, ctx = _build_pipeline(items, use_font_stage=True) result = pipeline.execute(items) assert result.success, f"Pipeline failed: {result.error}" frame = display.frames.get(timeout=1) # Every element in the frame must be a string for i, line in enumerate(frame): assert isinstance(line, str), ( f"Frame line {i} is {type(line).__name__}: {line!r} " f"(FontStage likely extended with raw tuple)" ) def test_font_stage_output_contains_rendered_content(self): """FontStage output should contain the rendered lines, not color codes.""" items = [SourceItem(content="My Headline", source="src", timestamp="0")] mock_lines = [" BIG BLOCK TEXT", " MORE TEXT", "", " ░ src · 0"] mock_return = (mock_lines, "\033[38;5;46m", 3) with patch("engine.render.make_block", return_value=mock_return): pipeline, display, ctx = _build_pipeline(items, use_font_stage=True) result = pipeline.execute(items) assert result.success frame = display.frames.get(timeout=1) text = "\n".join(frame) assert "BIG BLOCK TEXT" in text assert "MORE TEXT" in text def test_font_stage_does_not_leak_color_codes_as_lines(self): """The ANSI color code from make_block must NOT appear as a frame line.""" items = [SourceItem(content="Headline", source="s", timestamp="0")] color_code = "\033[38;5;46m" mock_return = ([" rendered"], color_code, 0) with patch("engine.render.make_block", return_value=mock_return): pipeline, display, ctx = _build_pipeline(items, use_font_stage=True) result = pipeline.execute(items) assert result.success frame = display.frames.get(timeout=1) # The color code itself should not be a standalone line assert color_code not in frame, ( f"Color code leaked as a frame line: {frame}" ) # The meta_row_index (int) should not be a line either for line in frame: assert not isinstance(line, int), f"Integer leaked into frame: {line}" def test_font_stage_handles_multiple_items(self): """FontStage should render each item through make_block.""" items = [ SourceItem(content="First", source="a", timestamp="1"), SourceItem(content="Second", source="b", timestamp="2"), ] call_count = 0 def mock_make_block(title, src, ts, w): nonlocal call_count call_count += 1 return ([f" [{title}]"], "\033[0m", 0) with patch("engine.render.make_block", side_effect=mock_make_block): pipeline, display, ctx = _build_pipeline(items, use_font_stage=True) result = pipeline.execute(items) assert result.success assert call_count == 2, f"make_block called {call_count} times, expected 2" frame = display.frames.get(timeout=1) text = "\n".join(frame) assert "[First]" in text assert "[Second]" in text # ─── TESTS: MIRROR OF app.py ASSEMBLY ─────────────────── class TestAppPipelineAssembly: """Verify the pipeline as assembled by app.py works end-to-end. This mirrors how run_pipeline_mode() builds the pipeline but without any network or terminal dependencies. """ def test_demo_preset_pipeline_produces_output(self): """Simulates the 'demo' preset pipeline with stub data.""" # Simulate what app.py does for the demo preset items = [ ("Breaking: Test passes", "UnitTest", "1234567890"), ("Update: Coverage improves", "CI", "1234567891"), ] display = QueueDisplay() ctx = PipelineContext() params = PipelineParams() params.viewport_width = 80 params.viewport_height = 24 params.frame_number = 0 ctx.params = params ctx.set("items", items) pipeline = Pipeline( config=PipelineConfig(enable_metrics=True), context=ctx, ) # Mirror app.py: ListDataSource -> SourceItemsToBufferStage -> display source = ListDataSource(items, name="headlines") pipeline.add_stage("source", DataSourceStage(source, name="headlines")) pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) pipeline.add_stage("display", DisplayStage(display, name="queue")) pipeline.build() pipeline.initialize() result = pipeline.execute(items) assert result.success, f"Pipeline failed: {result.error}" assert not display.frames.empty(), "Display received no frames" frame = display.frames.get(timeout=1) assert isinstance(frame, list) assert len(frame) > 0 # All lines must be strings for line in frame: assert isinstance(line, str)