diff --git a/effects_plugins/hud.py b/effects_plugins/hud.py index dcc5677..ad5d2d3 100644 --- a/effects_plugins/hud.py +++ b/effects_plugins/hud.py @@ -88,17 +88,9 @@ class HudEffect(EffectPlugin): f"\033[2;1H\033[38;5;45mEFFECT:\033[0m \033[1;38;5;227m{effect_name:12s}\033[0m \033[38;5;245m|\033[0m {bar} \033[38;5;245m|\033[0m \033[38;5;219m{effect_intensity * 100:.0f}%\033[0m" ) - # Try to get pipeline order from context + # Get pipeline order from context pipeline_order = ctx.get_state("pipeline_order") - if pipeline_order: - pipeline_str = ",".join(pipeline_order) - else: - # Fallback to legacy effect chain - from engine.effects import get_effect_chain - - chain = get_effect_chain() - order = chain.get_order() if chain else [] - pipeline_str = ",".join(order) if order else "(none)" + pipeline_str = ",".join(pipeline_order) if pipeline_order else "(none)" hud_lines.append(f"\033[3;1H\033[38;5;44mPIPELINE:\033[0m {pipeline_str}") for i, line in enumerate(hud_lines): diff --git a/engine/app.py b/engine/app.py index bb2fddd..9048ad2 100644 --- a/engine/app.py +++ b/engine/app.py @@ -152,8 +152,34 @@ def run_pipeline_mode(preset_name: str = "demo"): list_source = ListDataSource(items, name=preset.source) pipeline.add_stage("source", DataSourceStage(list_source, name=preset.source)) - # Add render stage - convert items to buffer - pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) + # Add FontStage for headlines/poetry (default for demo) + if preset.source in ["headlines", "poetry"]: + from engine.pipeline.adapters import FontStage + + pipeline.add_stage("font", FontStage(name="font")) + else: + # Fallback to simple conversion for other sources + pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) + + # Add camera stage if specified in preset + if preset.camera: + from engine.camera import Camera + from engine.pipeline.adapters import CameraStage + + camera = None + if preset.camera == "vertical": + camera = Camera.vertical() + elif preset.camera == "horizontal": + camera = Camera.horizontal() + elif preset.camera == "omni": + camera = Camera.omni() + elif preset.camera == "floating": + camera = Camera.floating() + elif preset.camera == "bounce": + camera = Camera.bounce() + + if camera: + pipeline.add_stage("camera", CameraStage(camera, name=preset.camera)) for effect_name in preset.effects: effect = effect_registry.get(effect_name) diff --git a/engine/data_sources/sources.py b/engine/data_sources/sources.py index 42f2d3e..f1717ee 100644 --- a/engine/data_sources/sources.py +++ b/engine/data_sources/sources.py @@ -124,7 +124,8 @@ class ListDataSource(DataSource): """ def __init__(self, items, name: str = "list"): - self._items = items + self._raw_items = items # Store raw items separately + self._items = None # Cache for converted SourceItem objects self._name = name @property @@ -138,7 +139,7 @@ class ListDataSource(DataSource): def fetch(self) -> list[SourceItem]: # Convert tuple items to SourceItem if needed result = [] - for item in self._items: + for item in self._raw_items: if isinstance(item, SourceItem): result.append(item) elif isinstance(item, tuple) and len(item) >= 3: diff --git a/engine/effects/__init__.py b/engine/effects/__init__.py index 039d1d3..4ee702d 100644 --- a/engine/effects/__init__.py +++ b/engine/effects/__init__.py @@ -18,13 +18,6 @@ from engine.effects.types import ( create_effect_context, ) - -def get_effect_chain(): - from engine.legacy.layers import get_effect_chain as _chain - - return _chain() - - __all__ = [ "EffectChain", "EffectRegistry", @@ -34,7 +27,6 @@ __all__ = [ "create_effect_context", "get_registry", "set_registry", - "get_effect_chain", "get_monitor", "set_monitor", "PerformanceMonitor", diff --git a/engine/effects/controller.py b/engine/effects/controller.py index fdc12dd..8f9141f 100644 --- a/engine/effects/controller.py +++ b/engine/effects/controller.py @@ -6,14 +6,7 @@ _effect_chain_ref = None def _get_effect_chain(): global _effect_chain_ref - if _effect_chain_ref is not None: - return _effect_chain_ref - try: - from engine.legacy.layers import get_effect_chain as _chain - - return _chain() - except Exception: - return None + return _effect_chain_ref def set_effect_chain_ref(chain) -> None: diff --git a/engine/legacy/__init__.py b/engine/legacy/__init__.py deleted file mode 100644 index 4d2e91b..0000000 --- a/engine/legacy/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -""" -Legacy rendering modules for backwards compatibility. - -This package contains deprecated rendering code from the old pipeline architecture. -These modules are maintained for backwards compatibility with adapters and tests, -but should not be used in new code. - -New code should use the Stage-based pipeline architecture instead. - -Modules: -- render: Legacy font/gradient rendering functions -- layers: Legacy layer compositing and effect application - -All modules in this package are marked deprecated and will be removed in a future version. -""" diff --git a/engine/legacy/layers.py b/engine/legacy/layers.py deleted file mode 100644 index dff972c..0000000 --- a/engine/legacy/layers.py +++ /dev/null @@ -1,272 +0,0 @@ -""" -Layer compositing — message overlay, ticker zone, firehose, noise. -Depends on: config, render, effects. - -.. deprecated:: - This module contains legacy rendering code. New pipeline code should - use the Stage-based pipeline architecture instead. This module is - maintained for backwards compatibility with the demo mode. -""" - -import random -import re -import time -from datetime import datetime - -from engine import config -from engine.effects import ( - EffectChain, - EffectContext, - fade_line, - firehose_line, - glitch_bar, - noise, - vis_offset, - vis_trunc, -) -from engine.legacy.render import big_wrap, lr_gradient, lr_gradient_opposite -from engine.terminal import RST, W_COOL - -MSG_META = "\033[38;5;245m" -MSG_BORDER = "\033[2;38;5;37m" - - -def render_message_overlay( - msg: tuple[str, str, float] | None, - w: int, - h: int, - msg_cache: tuple, -) -> tuple[list[str], tuple]: - """Render ntfy message overlay. - - Args: - msg: (title, body, timestamp) or None - w: terminal width - h: terminal height - msg_cache: (cache_key, rendered_rows) for caching - - Returns: - (list of ANSI strings, updated cache) - """ - overlay = [] - if msg is None: - return overlay, msg_cache - - m_title, m_body, m_ts = msg - display_text = m_body or m_title or "(empty)" - display_text = re.sub(r"\s+", " ", display_text.upper()) - - cache_key = (display_text, w) - if msg_cache[0] != cache_key: - msg_rows = big_wrap(display_text, w - 4) - msg_cache = (cache_key, msg_rows) - else: - msg_rows = msg_cache[1] - - msg_rows = lr_gradient_opposite( - msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0 - ) - - elapsed_s = int(time.monotonic() - m_ts) - remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s) - ts_str = datetime.now().strftime("%H:%M:%S") - panel_h = len(msg_rows) + 2 - panel_top = max(0, (h - panel_h) // 2) - - row_idx = 0 - for mr in msg_rows: - ln = vis_trunc(mr, w) - overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K") - row_idx += 1 - - meta_parts = [] - if m_title and m_title != m_body: - meta_parts.append(m_title) - meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s") - meta = ( - " " + " \u00b7 ".join(meta_parts) - if len(meta_parts) > 1 - else " " + meta_parts[0] - ) - overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}\033[0m\033[K") - row_idx += 1 - - bar = "\u2500" * (w - 4) - overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}\033[0m\033[K") - - return overlay, msg_cache - - -def render_ticker_zone( - active: list, - scroll_cam: int, - camera_x: int = 0, - ticker_h: int = 0, - w: int = 80, - noise_cache: dict | None = None, - grad_offset: float = 0.0, -) -> tuple[list[str], dict]: - """Render the ticker scroll zone. - - Args: - active: list of (content_rows, color, canvas_y, meta_idx) - scroll_cam: camera position (viewport top) - camera_x: horizontal camera offset - ticker_h: height of ticker zone - w: terminal width - noise_cache: dict of cy -> noise string - grad_offset: gradient animation offset - - Returns: - (list of ANSI strings, updated noise_cache) - """ - if noise_cache is None: - noise_cache = {} - buf = [] - top_zone = max(1, int(ticker_h * 0.25)) - bot_zone = max(1, int(ticker_h * 0.10)) - - def noise_at(cy): - if cy not in noise_cache: - noise_cache[cy] = noise(w) if random.random() < 0.15 else None - return noise_cache[cy] - - for r in range(ticker_h): - scr_row = r + 1 - cy = scroll_cam + r - top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0 - bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0 - row_fade = min(top_f, bot_f) - drawn = False - - for content, hc, by, midx in active: - cr = cy - by - if 0 <= cr < len(content): - raw = content[cr] - if cr != midx: - colored = lr_gradient([raw], grad_offset)[0] - else: - colored = raw - ln = vis_trunc(vis_offset(colored, camera_x), w) - if row_fade < 1.0: - ln = fade_line(ln, row_fade) - - if cr == midx: - buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K") - elif ln.strip(): - buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K") - else: - buf.append(f"\033[{scr_row};1H\033[K") - drawn = True - break - - if not drawn: - n = noise_at(cy) - if row_fade < 1.0 and n: - n = fade_line(n, row_fade) - if n: - buf.append(f"\033[{scr_row};1H{n}") - else: - buf.append(f"\033[{scr_row};1H\033[K") - - return buf, noise_cache - - -def apply_glitch( - buf: list[str], - ticker_buf_start: int, - mic_excess: float, - w: int, -) -> list[str]: - """Apply glitch effect to ticker buffer. - - Args: - buf: current buffer - ticker_buf_start: index where ticker starts in buffer - mic_excess: mic level above threshold - w: terminal width - - Returns: - Updated buffer with glitches applied - """ - glitch_prob = 0.32 + min(0.9, mic_excess * 0.16) - n_hits = 4 + int(mic_excess / 2) - ticker_buf_len = len(buf) - ticker_buf_start - - if random.random() < glitch_prob and ticker_buf_len > 0: - for _ in range(min(n_hits, ticker_buf_len)): - gi = random.randint(0, ticker_buf_len - 1) - scr_row = gi + 1 - buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}" - - return buf - - -def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]: - """Render firehose strip at bottom of screen.""" - buf = [] - if fh > 0: - for fr in range(fh): - scr_row = h - fh + fr + 1 - fline = firehose_line(items, w) - buf.append(f"\033[{scr_row};1H{fline}\033[K") - return buf - - -_effect_chain = None - - -def init_effects() -> None: - """Initialize effect plugins and chain.""" - global _effect_chain - from engine.effects import EffectChain, get_registry - - registry = get_registry() - - import effects_plugins - - effects_plugins.discover_plugins() - - chain = EffectChain(registry) - chain.set_order(["noise", "fade", "glitch", "firehose"]) - _effect_chain = chain - - -def process_effects( - buf: list[str], - w: int, - h: int, - scroll_cam: int, - ticker_h: int, - camera_x: int = 0, - mic_excess: float = 0.0, - grad_offset: float = 0.0, - frame_number: int = 0, - has_message: bool = False, - items: list | None = None, -) -> list[str]: - """Process buffer through effect chain.""" - if _effect_chain is None: - init_effects() - - ctx = EffectContext( - terminal_width=w, - terminal_height=h, - scroll_cam=scroll_cam, - camera_x=camera_x, - ticker_height=ticker_h, - mic_excess=mic_excess, - grad_offset=grad_offset, - frame_number=frame_number, - has_message=has_message, - items=items or [], - ) - return _effect_chain.process(buf, ctx) - - -def get_effect_chain() -> EffectChain | None: - """Get the effect chain instance.""" - global _effect_chain - if _effect_chain is None: - init_effects() - return _effect_chain diff --git a/engine/pipeline.py b/engine/pipeline.py deleted file mode 100644 index 2047f4f..0000000 --- a/engine/pipeline.py +++ /dev/null @@ -1,563 +0,0 @@ -""" -Pipeline introspection - generates self-documenting diagrams of the render pipeline. - -Pipeline Architecture: -- Sources: Data providers (RSS, Poetry, Ntfy, Mic) - static or dynamic -- Fetch: Retrieve data from sources -- Prepare: Transform raw data (make_block, strip_tags, translate) -- Scroll: Camera-based viewport rendering (ticker zone, message overlay) -- Effects: Post-processing chain (noise, fade, glitch, firehose, hud) -- Render: Final line rendering and layout -- Display: Output backends (terminal, pygame, websocket, sixel, kitty) - -Key abstractions: -- DataSource: Sources can be static (cached) or dynamic (idempotent fetch) -- Camera: Viewport controller (vertical, horizontal, omni, floating, trace) -- EffectChain: Ordered effect processing pipeline -- Display: Pluggable output backends -- SourceRegistry: Source discovery and management -- AnimationController: Time-based parameter animation -- Preset: Package of initial params + animation for demo modes -""" - -from __future__ import annotations - -from dataclasses import dataclass - - -@dataclass -class PipelineNode: - """Represents a node in the pipeline.""" - - name: str - module: str - class_name: str | None = None - func_name: str | None = None - description: str = "" - inputs: list[str] | None = None - outputs: list[str] | None = None - metrics: dict | None = None # Performance metrics (avg_ms, min_ms, max_ms) - - -class PipelineIntrospector: - """Introspects the render pipeline and generates documentation.""" - - def __init__(self): - self.nodes: list[PipelineNode] = [] - - def add_node(self, node: PipelineNode) -> None: - self.nodes.append(node) - - def generate_mermaid_flowchart(self) -> str: - """Generate a Mermaid flowchart of the pipeline.""" - lines = ["```mermaid", "flowchart TD"] - - subgraph_groups = { - "Sources": [], - "Fetch": [], - "Prepare": [], - "Scroll": [], - "Effects": [], - "Display": [], - "Async": [], - "Animation": [], - "Viz": [], - } - - other_nodes = [] - - for node in self.nodes: - node_id = node.name.replace("-", "_").replace(" ", "_").replace(":", "_") - label = node.name - if node.class_name: - label = f"{node.name}\\n({node.class_name})" - elif node.func_name: - label = f"{node.name}\\n({node.func_name})" - - if node.description: - label += f"\\n{node.description}" - - if node.metrics: - avg = node.metrics.get("avg_ms", 0) - if avg > 0: - label += f"\\n⏱ {avg:.1f}ms" - impact = node.metrics.get("impact_pct", 0) - if impact > 0: - label += f" ({impact:.0f}%)" - - node_entry = f' {node_id}["{label}"]' - - if "DataSource" in node.name or "SourceRegistry" in node.name: - subgraph_groups["Sources"].append(node_entry) - elif "fetch" in node.name.lower(): - subgraph_groups["Fetch"].append(node_entry) - elif ( - "make_block" in node.name - or "strip_tags" in node.name - or "translate" in node.name - ): - subgraph_groups["Prepare"].append(node_entry) - elif ( - "StreamController" in node.name - or "render_ticker" in node.name - or "render_message" in node.name - or "Camera" in node.name - ): - subgraph_groups["Scroll"].append(node_entry) - elif "Effect" in node.name or "effect" in node.module: - subgraph_groups["Effects"].append(node_entry) - elif "Display:" in node.name: - subgraph_groups["Display"].append(node_entry) - elif "Ntfy" in node.name or "Mic" in node.name: - subgraph_groups["Async"].append(node_entry) - elif "Animation" in node.name or "Preset" in node.name: - subgraph_groups["Animation"].append(node_entry) - else: - other_nodes.append(node_entry) - - for group_name, nodes in subgraph_groups.items(): - if nodes: - lines.append(f" subgraph {group_name}") - for node in nodes: - lines.append(node) - lines.append(" end") - - for node in other_nodes: - lines.append(node) - - lines.append("") - - for node in self.nodes: - node_id = node.name.replace("-", "_").replace(" ", "_").replace(":", "_") - if node.inputs: - for inp in node.inputs: - inp_id = inp.replace("-", "_").replace(" ", "_").replace(":", "_") - lines.append(f" {inp_id} --> {node_id}") - - lines.append("```") - return "\n".join(lines) - - def generate_mermaid_sequence(self) -> str: - """Generate a Mermaid sequence diagram of message flow.""" - lines = ["```mermaid", "sequenceDiagram"] - - lines.append(" participant Sources") - lines.append(" participant Fetch") - lines.append(" participant Scroll") - lines.append(" participant Effects") - lines.append(" participant Display") - - lines.append(" Sources->>Fetch: headlines") - lines.append(" Fetch->>Scroll: content blocks") - lines.append(" Scroll->>Effects: buffer") - lines.append(" Effects->>Effects: process chain") - lines.append(" Effects->>Display: rendered buffer") - - lines.append("```") - return "\n".join(lines) - - def generate_mermaid_state(self) -> str: - """Generate a Mermaid state diagram of camera modes.""" - lines = ["```mermaid", "stateDiagram-v2"] - - lines.append(" [*] --> Vertical") - lines.append(" Vertical --> Horizontal: set_mode()") - lines.append(" Horizontal --> Omni: set_mode()") - lines.append(" Omni --> Floating: set_mode()") - lines.append(" Floating --> Trace: set_mode()") - lines.append(" Trace --> Vertical: set_mode()") - - lines.append(" state Vertical {") - lines.append(" [*] --> ScrollUp") - lines.append(" ScrollUp --> ScrollUp: +y each frame") - lines.append(" }") - - lines.append(" state Horizontal {") - lines.append(" [*] --> ScrollLeft") - lines.append(" ScrollLeft --> ScrollLeft: +x each frame") - lines.append(" }") - - lines.append(" state Omni {") - lines.append(" [*] --> Diagonal") - lines.append(" Diagonal --> Diagonal: +x, +y") - lines.append(" }") - - lines.append(" state Floating {") - lines.append(" [*] --> Bobbing") - lines.append(" Bobbing --> Bobbing: sin(time)") - lines.append(" }") - - lines.append(" state Trace {") - lines.append(" [*] --> FollowPath") - lines.append(" FollowPath --> FollowPath: node by node") - lines.append(" }") - - lines.append("```") - return "\n".join(lines) - - def generate_full_diagram(self) -> str: - """Generate full pipeline documentation.""" - lines = [ - "# Render Pipeline", - "", - "## Data Flow", - "", - self.generate_mermaid_flowchart(), - "", - "## Message Sequence", - "", - self.generate_mermaid_sequence(), - "", - "## Camera States", - "", - self.generate_mermaid_state(), - ] - return "\n".join(lines) - - def introspect_sources(self) -> None: - """Introspect data sources.""" - from engine import sources - - for name in dir(sources): - obj = getattr(sources, name) - if isinstance(obj, dict): - self.add_node( - PipelineNode( - name=f"Data Source: {name}", - module="engine.sources", - description=f"{len(obj)} feeds configured", - ) - ) - - def introspect_sources_v2(self) -> None: - """Introspect data sources v2 (new abstraction).""" - from engine.data_sources.sources import SourceRegistry, init_default_sources - - init_default_sources() - SourceRegistry() - - self.add_node( - PipelineNode( - name="SourceRegistry", - module="engine.data_sources.sources", - class_name="SourceRegistry", - description="Source discovery and management", - ) - ) - - for name, desc in [ - ("HeadlinesDataSource", "RSS feed headlines"), - ("PoetryDataSource", "Poetry DB"), - ("PipelineDataSource", "Pipeline viz (dynamic)"), - ]: - self.add_node( - PipelineNode( - name=f"DataSource: {name}", - module="engine.sources_v2", - class_name=name, - description=f"{desc}", - ) - ) - - def introspect_prepare(self) -> None: - """Introspect prepare layer (transformation).""" - self.add_node( - PipelineNode( - name="make_block", - module="engine.render", - func_name="make_block", - description="Transform headline into display block", - inputs=["title", "source", "timestamp", "width"], - outputs=["block"], - ) - ) - - self.add_node( - PipelineNode( - name="strip_tags", - module="engine.filter", - func_name="strip_tags", - description="Remove HTML tags from content", - inputs=["html"], - outputs=["plain_text"], - ) - ) - - self.add_node( - PipelineNode( - name="translate_headline", - module="engine.translate", - func_name="translate_headline", - description="Translate headline to target language", - inputs=["title", "target_lang"], - outputs=["translated_title"], - ) - ) - - def introspect_fetch(self) -> None: - """Introspect fetch layer.""" - self.add_node( - PipelineNode( - name="fetch_all", - module="engine.fetch", - func_name="fetch_all", - description="Fetch RSS feeds", - outputs=["items"], - ) - ) - - self.add_node( - PipelineNode( - name="fetch_poetry", - module="engine.fetch", - func_name="fetch_poetry", - description="Fetch Poetry DB", - outputs=["items"], - ) - ) - - def introspect_scroll(self) -> None: - """Introspect scroll engine (legacy - replaced by pipeline architecture).""" - self.add_node( - PipelineNode( - name="render_ticker_zone", - module="engine.layers", - func_name="render_ticker_zone", - description="Render scrolling ticker content", - inputs=["active", "camera"], - outputs=["buffer"], - ) - ) - - self.add_node( - PipelineNode( - name="render_message_overlay", - module="engine.layers", - func_name="render_message_overlay", - description="Render ntfy message overlay", - inputs=["msg", "width", "height"], - outputs=["overlay", "cache"], - ) - ) - - def introspect_render(self) -> None: - """Introspect render layer.""" - self.add_node( - PipelineNode( - name="big_wrap", - module="engine.render", - func_name="big_wrap", - description="Word-wrap text to width", - inputs=["text", "width"], - outputs=["lines"], - ) - ) - - self.add_node( - PipelineNode( - name="lr_gradient", - module="engine.render", - func_name="lr_gradient", - description="Apply left-right gradient to lines", - inputs=["lines", "position"], - outputs=["styled_lines"], - ) - ) - - def introspect_async_sources(self) -> None: - """Introspect async data sources (ntfy, mic).""" - self.add_node( - PipelineNode( - name="NtfyPoller", - module="engine.ntfy", - class_name="NtfyPoller", - description="Poll ntfy for messages (async)", - inputs=["topic"], - outputs=["message"], - ) - ) - - self.add_node( - PipelineNode( - name="MicMonitor", - module="engine.mic", - class_name="MicMonitor", - description="Monitor microphone input (async)", - outputs=["audio_level"], - ) - ) - - def introspect_eventbus(self) -> None: - """Introspect event bus for decoupled communication.""" - self.add_node( - PipelineNode( - name="EventBus", - module="engine.eventbus", - class_name="EventBus", - description="Thread-safe event publishing", - inputs=["event"], - outputs=["subscribers"], - ) - ) - - def introspect_animation(self) -> None: - """Introspect animation system.""" - self.add_node( - PipelineNode( - name="AnimationController", - module="engine.animation", - class_name="AnimationController", - description="Time-based parameter animation", - inputs=["dt"], - outputs=["params"], - ) - ) - - self.add_node( - PipelineNode( - name="Preset", - module="engine.animation", - class_name="Preset", - description="Package of initial params + animation", - ) - ) - - def introspect_camera(self) -> None: - """Introspect camera system.""" - self.add_node( - PipelineNode( - name="Camera", - module="engine.camera", - class_name="Camera", - description="Viewport position controller", - inputs=["dt"], - outputs=["x", "y"], - ) - ) - - def introspect_effects(self) -> None: - """Introspect effect system.""" - self.add_node( - PipelineNode( - name="EffectChain", - module="engine.effects", - class_name="EffectChain", - description="Process effects in sequence", - inputs=["buffer", "context"], - outputs=["buffer"], - ) - ) - - self.add_node( - PipelineNode( - name="EffectRegistry", - module="engine.effects", - class_name="EffectRegistry", - description="Manage effect plugins", - ) - ) - - def introspect_display(self) -> None: - """Introspect display backends.""" - from engine.display import DisplayRegistry - - DisplayRegistry.initialize() - backends = DisplayRegistry.list_backends() - - for backend in backends: - self.add_node( - PipelineNode( - name=f"Display: {backend}", - module="engine.display.backends", - class_name=f"{backend.title()}Display", - description=f"Render to {backend}", - inputs=["buffer"], - ) - ) - - def introspect_new_pipeline(self, pipeline=None) -> None: - """Introspect new unified pipeline stages with metrics. - - Args: - pipeline: Optional Pipeline instance to collect metrics from - """ - - stages_info = [ - ( - "Effect", - "engine.pipeline.adapters", - "EffectPluginStage", - "Applies effect", - ), - ( - "Display", - "engine.pipeline.adapters", - "DisplayStage", - "Outputs to display", - ), - ] - - metrics = None - if pipeline and hasattr(pipeline, "get_metrics_summary"): - metrics = pipeline.get_metrics_summary() - if "error" in metrics: - metrics = None - - total_avg = metrics.get("pipeline", {}).get("avg_ms", 0) if metrics else 0 - - for stage_name, module, class_name, desc in stages_info: - node_metrics = None - if metrics and "stages" in metrics: - for name, stats in metrics["stages"].items(): - if stage_name.lower() in name.lower(): - impact_pct = ( - (stats.get("avg_ms", 0) / total_avg * 100) - if total_avg > 0 - else 0 - ) - node_metrics = { - "avg_ms": stats.get("avg_ms", 0), - "min_ms": stats.get("min_ms", 0), - "max_ms": stats.get("max_ms", 0), - "impact_pct": impact_pct, - } - break - - self.add_node( - PipelineNode( - name=f"Pipeline: {stage_name}", - module=module, - class_name=class_name, - description=desc, - inputs=["data"], - outputs=["data"], - metrics=node_metrics, - ) - ) - - def run(self) -> str: - """Run full introspection.""" - self.introspect_sources() - self.introspect_sources_v2() - self.introspect_fetch() - self.introspect_prepare() - self.introspect_scroll() - self.introspect_render() - self.introspect_camera() - self.introspect_effects() - self.introspect_display() - self.introspect_async_sources() - self.introspect_eventbus() - self.introspect_animation() - - return self.generate_full_diagram() - - -def generate_pipeline_diagram() -> str: - """Generate a self-documenting pipeline diagram.""" - introspector = PipelineIntrospector() - return introspector.run() - - -if __name__ == "__main__": - print(generate_pipeline_diagram()) diff --git a/engine/pipeline/adapters.py b/engine/pipeline/adapters.py index 99ae05a..4b96789 100644 --- a/engine/pipeline/adapters.py +++ b/engine/pipeline/adapters.py @@ -254,7 +254,9 @@ class CameraStage(Stage): @property def dependencies(self) -> set[str]: - return {"source.items"} + return { + "source" + } # Prefix match any source (source.headlines, source.poetry, etc.) def process(self, data: Any, ctx: PipelineContext) -> Any: """Apply camera transformation to data.""" @@ -334,7 +336,7 @@ class FontStage(Stage): if data is None: return None - from engine.legacy.render import make_block + from engine.render import make_block w = ctx.params.viewport_width if ctx.params else 80 @@ -361,8 +363,8 @@ class FontStage(Stage): ts = "0" try: - block = make_block(title, src, ts, w) - result.extend(block) + block_lines, color_code, meta_idx = make_block(title, src, ts, w) + result.extend(block_lines) except Exception: result.append(title) @@ -403,10 +405,14 @@ class ImageToTextStage(Stage): return "transform" @property - def capabilities(self) -> set[str]: + def outlet_types(self) -> set: from engine.pipeline.core import DataType - return {f"transform.{self.name}", DataType.TEXT_BUFFER} + return {DataType.TEXT_BUFFER} + + @property + def capabilities(self) -> set[str]: + return {f"transform.{self.name}", "render.output"} @property def dependencies(self) -> set[str]: diff --git a/engine/render/__init__.py b/engine/render/__init__.py new file mode 100644 index 0000000..8db2d73 --- /dev/null +++ b/engine/render/__init__.py @@ -0,0 +1,37 @@ +"""Modern block rendering system - OTF font to terminal half-block conversion. + +This module provides the core rendering capabilities for big block letters +and styled text output using PIL fonts and ANSI terminal rendering. + +Exports: + - make_block: Render a headline into a content block with color + - big_wrap: Word-wrap text and render with OTF font + - render_line: Render a line of text as terminal rows using half-blocks + - font_for_lang: Get appropriate font for a language + - clear_font_cache: Reset cached font objects + - lr_gradient: Color block characters with left-to-right gradient + - lr_gradient_opposite: Complementary gradient coloring +""" + +from engine.render.blocks import ( + big_wrap, + clear_font_cache, + font_for_lang, + list_font_faces, + load_font_face, + make_block, + render_line, +) +from engine.render.gradient import lr_gradient, lr_gradient_opposite + +__all__ = [ + "big_wrap", + "clear_font_cache", + "font_for_lang", + "list_font_faces", + "load_font_face", + "lr_gradient", + "lr_gradient_opposite", + "make_block", + "render_line", +] diff --git a/engine/legacy/render.py b/engine/render/blocks.py similarity index 71% rename from engine/legacy/render.py rename to engine/render/blocks.py index 5c2a728..02cefc4 100644 --- a/engine/legacy/render.py +++ b/engine/render/blocks.py @@ -1,12 +1,6 @@ -""" -OTF → terminal half-block rendering pipeline. -Font loading, text rasterization, word-wrap, gradient coloring, headline block assembly. -Depends on: config, terminal, sources, translate. +"""Block rendering core - Font loading, text rasterization, word-wrap, and headline assembly. -.. deprecated:: - This module contains legacy rendering code. New pipeline code should - use the Stage-based pipeline architecture instead. This module is - maintained for backwards compatibility with the demo mode. +Provides PIL font-based rendering to terminal half-block characters. """ import random @@ -17,42 +11,8 @@ from PIL import Image, ImageDraw, ImageFont from engine import config from engine.sources import NO_UPPER, SCRIPT_FONTS, SOURCE_LANGS -from engine.terminal import RST from engine.translate import detect_location_language, translate_headline -# ─── GRADIENT ───────────────────────────────────────────── -# Left → right: white-hot leading edge fades to near-black -GRAD_COLS = [ - "\033[1;38;5;231m", # white - "\033[1;38;5;195m", # pale cyan-white - "\033[38;5;123m", # bright cyan - "\033[38;5;118m", # bright lime - "\033[38;5;82m", # lime - "\033[38;5;46m", # bright green - "\033[38;5;40m", # green - "\033[38;5;34m", # medium green - "\033[38;5;28m", # dark green - "\033[38;5;22m", # deep green - "\033[2;38;5;22m", # dim deep green - "\033[2;38;5;235m", # near black -] - -# Complementary sweep for queue messages (opposite hue family from ticker greens) -MSG_GRAD_COLS = [ - "\033[1;38;5;231m", # white - "\033[1;38;5;225m", # pale pink-white - "\033[38;5;219m", # bright pink - "\033[38;5;213m", # hot pink - "\033[38;5;207m", # magenta - "\033[38;5;201m", # bright magenta - "\033[38;5;165m", # orchid-red - "\033[38;5;161m", # ruby-magenta - "\033[38;5;125m", # dark magenta - "\033[38;5;89m", # deep maroon-magenta - "\033[2;38;5;89m", # dim deep maroon-magenta - "\033[2;38;5;235m", # near black -] - # ─── FONT LOADING ───────────────────────────────────────── _FONT_OBJ = None _FONT_OBJ_KEY = None @@ -194,36 +154,22 @@ def big_wrap(text, max_w, fnt=None): return out -def lr_gradient(rows, offset=0.0, grad_cols=None): - """Color each non-space block character with a shifting left-to-right gradient.""" - cols = grad_cols or GRAD_COLS - n = len(cols) - max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1) - out = [] - for row in rows: - if not row.strip(): - out.append(row) - continue - buf = [] - for x, ch in enumerate(row): - if ch == " ": - buf.append(" ") - else: - shifted = (x / max(max_x - 1, 1) + offset) % 1.0 - idx = min(round(shifted * (n - 1)), n - 1) - buf.append(f"{cols[idx]}{ch}{RST}") - out.append("".join(buf)) - return out - - -def lr_gradient_opposite(rows, offset=0.0): - """Complementary (opposite wheel) gradient used for queue message panels.""" - return lr_gradient(rows, offset, MSG_GRAD_COLS) - - # ─── HEADLINE BLOCK ASSEMBLY ───────────────────────────── def make_block(title, src, ts, w): - """Render a headline into a content block with color.""" + """Render a headline into a content block with color. + + Args: + title: Headline text to render + src: Source identifier (for metadata) + ts: Timestamp string (for metadata) + w: Width constraint in terminal characters + + Returns: + tuple: (content_lines, color_code, meta_row_index) + - content_lines: List of rendered text lines + - color_code: ANSI color code for display + - meta_row_index: Row index of metadata line + """ target_lang = ( (SOURCE_LANGS.get(src) or detect_location_language(title)) if config.MODE == "news" diff --git a/engine/render/gradient.py b/engine/render/gradient.py new file mode 100644 index 0000000..14a6c5a --- /dev/null +++ b/engine/render/gradient.py @@ -0,0 +1,82 @@ +"""Gradient coloring for rendered block characters. + +Provides left-to-right and complementary gradient effects for terminal display. +""" + +from engine.terminal import RST + +# Left → right: white-hot leading edge fades to near-black +GRAD_COLS = [ + "\033[1;38;5;231m", # white + "\033[1;38;5;195m", # pale cyan-white + "\033[38;5;123m", # bright cyan + "\033[38;5;118m", # bright lime + "\033[38;5;82m", # lime + "\033[38;5;46m", # bright green + "\033[38;5;40m", # green + "\033[38;5;34m", # medium green + "\033[38;5;28m", # dark green + "\033[38;5;22m", # deep green + "\033[2;38;5;22m", # dim deep green + "\033[2;38;5;235m", # near black +] + +# Complementary sweep for queue messages (opposite hue family from ticker greens) +MSG_GRAD_COLS = [ + "\033[1;38;5;231m", # white + "\033[1;38;5;225m", # pale pink-white + "\033[38;5;219m", # bright pink + "\033[38;5;213m", # hot pink + "\033[38;5;207m", # magenta + "\033[38;5;201m", # bright magenta + "\033[38;5;165m", # orchid-red + "\033[38;5;161m", # ruby-magenta + "\033[38;5;125m", # dark magenta + "\033[38;5;89m", # deep maroon-magenta + "\033[2;38;5;89m", # dim deep maroon-magenta + "\033[2;38;5;235m", # near black +] + + +def lr_gradient(rows, offset=0.0, grad_cols=None): + """Color each non-space block character with a shifting left-to-right gradient. + + Args: + rows: List of text lines with block characters + offset: Gradient offset (0.0-1.0) for animation + grad_cols: List of ANSI color codes (default: GRAD_COLS) + + Returns: + List of lines with gradient coloring applied + """ + cols = grad_cols or GRAD_COLS + n = len(cols) + max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1) + out = [] + for row in rows: + if not row.strip(): + out.append(row) + continue + buf = [] + for x, ch in enumerate(row): + if ch == " ": + buf.append(" ") + else: + shifted = (x / max(max_x - 1, 1) + offset) % 1.0 + idx = min(round(shifted * (n - 1)), n - 1) + buf.append(f"{cols[idx]}{ch}{RST}") + out.append("".join(buf)) + return out + + +def lr_gradient_opposite(rows, offset=0.0): + """Complementary (opposite wheel) gradient used for queue message panels. + + Args: + rows: List of text lines with block characters + offset: Gradient offset (0.0-1.0) for animation + + Returns: + List of lines with complementary gradient coloring applied + """ + return lr_gradient(rows, offset, MSG_GRAD_COLS) diff --git a/tests/legacy/__init__.py b/tests/legacy/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/legacy/test_layers.py b/tests/legacy/test_layers.py deleted file mode 100644 index 4bd7a38..0000000 --- a/tests/legacy/test_layers.py +++ /dev/null @@ -1,112 +0,0 @@ -""" -Tests for engine.layers module. -""" - -import time - -from engine.legacy import layers - - -class TestRenderMessageOverlay: - """Tests for render_message_overlay function.""" - - def test_no_message_returns_empty(self): - """Returns empty list when msg is None.""" - result, cache = layers.render_message_overlay(None, 80, 24, (None, None)) - assert result == [] - assert cache[0] is None - - def test_message_returns_overlay_lines(self): - """Returns non-empty list when message is present.""" - msg = ("Test Title", "Test Body", time.monotonic()) - result, cache = layers.render_message_overlay(msg, 80, 24, (None, None)) - assert len(result) > 0 - assert cache[0] is not None - - def test_cache_key_changes_with_text(self): - """Cache key changes when message text changes.""" - msg1 = ("Title1", "Body1", time.monotonic()) - msg2 = ("Title2", "Body2", time.monotonic()) - - _, cache1 = layers.render_message_overlay(msg1, 80, 24, (None, None)) - _, cache2 = layers.render_message_overlay(msg2, 80, 24, cache1) - - assert cache1[0] != cache2[0] - - def test_cache_reuse_avoids_recomputation(self): - """Cache is returned when same message is passed (interface test).""" - msg = ("Same Title", "Same Body", time.monotonic()) - - result1, cache1 = layers.render_message_overlay(msg, 80, 24, (None, None)) - result2, cache2 = layers.render_message_overlay(msg, 80, 24, cache1) - - assert len(result1) > 0 - assert len(result2) > 0 - assert cache1[0] == cache2[0] - - -class TestRenderFirehose: - """Tests for render_firehose function.""" - - def test_no_firehose_returns_empty(self): - """Returns empty list when firehose height is 0.""" - items = [("Headline", "Source", "12:00")] - result = layers.render_firehose(items, 80, 0, 24) - assert result == [] - - def test_firehose_returns_lines(self): - """Returns lines when firehose height > 0.""" - items = [("Headline", "Source", "12:00")] - result = layers.render_firehose(items, 80, 4, 24) - assert len(result) == 4 - - def test_firehose_includes_ansi_escapes(self): - """Returns lines containing ANSI escape sequences.""" - items = [("Headline", "Source", "12:00")] - result = layers.render_firehose(items, 80, 1, 24) - assert "\033[" in result[0] - - -class TestApplyGlitch: - """Tests for apply_glitch function.""" - - def test_empty_buffer_unchanged(self): - """Empty buffer is returned unchanged.""" - result = layers.apply_glitch([], 0, 0.0, 80) - assert result == [] - - def test_buffer_length_preserved(self): - """Buffer length is preserved after glitch application.""" - buf = [f"\033[{i + 1};1Htest\033[K" for i in range(10)] - result = layers.apply_glitch(buf, 0, 0.5, 80) - assert len(result) == len(buf) - - -class TestRenderTickerZone: - """Tests for render_ticker_zone function - focusing on interface.""" - - def test_returns_list(self): - """Returns a list of strings.""" - result, cache = layers.render_ticker_zone( - [], - scroll_cam=0, - camera_x=0, - ticker_h=10, - w=80, - noise_cache={}, - grad_offset=0.0, - ) - assert isinstance(result, list) - - def test_returns_dict_for_cache(self): - """Returns a dict for the noise cache.""" - result, cache = layers.render_ticker_zone( - [], - scroll_cam=0, - camera_x=0, - ticker_h=10, - w=80, - noise_cache={}, - grad_offset=0.0, - ) - assert isinstance(cache, dict) diff --git a/tests/legacy/test_render.py b/tests/legacy/test_render.py deleted file mode 100644 index e7f10f7..0000000 --- a/tests/legacy/test_render.py +++ /dev/null @@ -1,232 +0,0 @@ -""" -Tests for engine.render module. -""" - -from unittest.mock import MagicMock, patch - -import pytest - -from engine.legacy.render import ( - GRAD_COLS, - MSG_GRAD_COLS, - clear_font_cache, - font_for_lang, - lr_gradient, - lr_gradient_opposite, - make_block, -) - - -class TestGradientConstants: - """Tests for gradient color constants.""" - - def test_grad_cols_defined(self): - """GRAD_COLS is defined with expected length.""" - assert len(GRAD_COLS) > 0 - assert all(isinstance(c, str) for c in GRAD_COLS) - - def test_msg_grad_cols_defined(self): - """MSG_GRAD_COLS is defined with expected length.""" - assert len(MSG_GRAD_COLS) > 0 - assert all(isinstance(c, str) for c in MSG_GRAD_COLS) - - def test_grad_cols_start_with_white(self): - """GRAD_COLS starts with white.""" - assert "231" in GRAD_COLS[0] - - def test_msg_grad_cols_different_from_grad_cols(self): - """MSG_GRAD_COLS is different from GRAD_COLS.""" - assert MSG_GRAD_COLS != GRAD_COLS - - -class TestLrGradient: - """Tests for lr_gradient function.""" - - def test_empty_rows(self): - """Empty input returns empty output.""" - result = lr_gradient([], 0.0) - assert result == [] - - def test_preserves_empty_rows(self): - """Empty rows are preserved.""" - result = lr_gradient([""], 0.0) - assert result == [""] - - def test_adds_gradient_to_content(self): - """Non-empty rows get gradient coloring.""" - result = lr_gradient(["hello"], 0.0) - assert len(result) == 1 - assert "\033[" in result[0] - - def test_preserves_spaces(self): - """Spaces are preserved without coloring.""" - result = lr_gradient(["hello world"], 0.0) - assert " " in result[0] - - def test_offset_wraps_around(self): - """Offset wraps around at 1.0.""" - result1 = lr_gradient(["hello"], 0.0) - result2 = lr_gradient(["hello"], 1.0) - assert result1 != result2 or result1 == result2 - - -class TestLrGradientOpposite: - """Tests for lr_gradient_opposite function.""" - - def test_uses_msg_grad_cols(self): - """Uses MSG_GRAD_COLS instead of GRAD_COLS.""" - result = lr_gradient_opposite(["test"]) - assert "\033[" in result[0] - - -class TestClearFontCache: - """Tests for clear_font_cache function.""" - - def test_clears_without_error(self): - """Function runs without error.""" - clear_font_cache() - - -class TestFontForLang: - """Tests for font_for_lang function.""" - - @patch("engine.render.font") - def test_returns_default_for_none(self, mock_font): - """Returns default font when lang is None.""" - result = font_for_lang(None) - assert result is not None - - @patch("engine.render.font") - def test_returns_default_for_unknown_lang(self, mock_font): - """Returns default font for unknown language.""" - result = font_for_lang("unknown_lang") - assert result is not None - - -class TestMakeBlock: - """Tests for make_block function.""" - - @patch("engine.translate.translate_headline") - @patch("engine.translate.detect_location_language") - @patch("engine.render.font_for_lang") - @patch("engine.render.big_wrap") - @patch("engine.render.random") - def test_make_block_basic( - self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate - ): - """Basic make_block returns content, color, meta index.""" - mock_wrap.return_value = ["Headline content", ""] - mock_random.choice.return_value = "\033[38;5;46m" - - content, color, meta_idx = make_block( - "Test headline", "TestSource", "12:00", 80 - ) - - assert len(content) > 0 - assert color is not None - assert meta_idx >= 0 - - @pytest.mark.skip(reason="Requires full PIL/font environment") - @patch("engine.translate.translate_headline") - @patch("engine.translate.detect_location_language") - @patch("engine.render.font_for_lang") - @patch("engine.render.big_wrap") - @patch("engine.render.random") - def test_make_block_translation( - self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate - ): - """Translation is applied when mode is news.""" - mock_wrap.return_value = ["Translated"] - mock_random.choice.return_value = "\033[38;5;46m" - mock_detect.return_value = "de" - - with patch("engine.config.MODE", "news"): - content, _, _ = make_block("Test", "Source", "12:00", 80) - mock_translate.assert_called_once() - - @patch("engine.translate.translate_headline") - @patch("engine.translate.detect_location_language") - @patch("engine.render.font_for_lang") - @patch("engine.render.big_wrap") - @patch("engine.render.random") - def test_make_block_no_translation_poetry( - self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate - ): - """No translation when mode is poetry.""" - mock_wrap.return_value = ["Poem content"] - mock_random.choice.return_value = "\033[38;5;46m" - - with patch("engine.config.MODE", "poetry"): - make_block("Test", "Source", "12:00", 80) - mock_translate.assert_not_called() - - @patch("engine.translate.translate_headline") - @patch("engine.translate.detect_location_language") - @patch("engine.render.font_for_lang") - @patch("engine.render.big_wrap") - @patch("engine.render.random") - def test_make_block_meta_format( - self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate - ): - """Meta line includes source and timestamp.""" - mock_wrap.return_value = ["Content"] - mock_random.choice.return_value = "\033[38;5;46m" - - content, _, meta_idx = make_block("Test", "MySource", "14:30", 80) - - meta_line = content[meta_idx] - assert "MySource" in meta_line - assert "14:30" in meta_line - - -class TestRenderLine: - """Tests for render_line function.""" - - def test_empty_string(self): - """Empty string returns empty list.""" - from engine.legacy.render import render_line - - result = render_line("") - assert result == [""] - - @pytest.mark.skip(reason="Requires real font/PIL setup") - def test_uses_default_font(self): - """Uses default font when none provided.""" - from engine.legacy.render import render_line - - with patch("engine.render.font") as mock_font: - mock_font.return_value = MagicMock() - mock_font.return_value.getbbox.return_value = (0, 0, 10, 10) - render_line("test") - - def test_getbbox_returns_none(self): - """Handles None bbox gracefully.""" - from engine.legacy.render import render_line - - with patch("engine.render.font") as mock_font: - mock_font.return_value = MagicMock() - mock_font.return_value.getbbox.return_value = None - result = render_line("test") - assert result == [""] - - -class TestBigWrap: - """Tests for big_wrap function.""" - - def test_empty_string(self): - """Empty string returns empty list.""" - from engine.legacy.render import big_wrap - - result = big_wrap("", 80) - assert result == [] - - @pytest.mark.skip(reason="Requires real font/PIL setup") - def test_single_word_fits(self): - """Single short word returns rendered.""" - from engine.legacy.render import big_wrap - - with patch("engine.render.font") as mock_font: - mock_font.return_value = MagicMock() - mock_font.return_value.getbbox.return_value = (0, 0, 10, 10) - result = big_wrap("test", 80) - assert len(result) > 0 diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 98e9a72..65109ba 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -629,7 +629,7 @@ class TestStageAdapters: PipelineContext() assert "camera" in stage.capabilities - assert "source.items" in stage.dependencies + assert "source" in stage.dependencies # Prefix matches any source class TestDataSourceStage: diff --git a/tests/test_pipeline_e2e.py b/tests/test_pipeline_e2e.py new file mode 100644 index 0000000..6bbd897 --- /dev/null +++ b/tests/test_pipeline_e2e.py @@ -0,0 +1,526 @@ +""" +End-to-end pipeline integration tests. + +Verifies that data actually flows through every pipeline stage +(source -> render -> effects -> display) using a queue-backed +stub display to capture output frames. + +These tests catch dead-code paths and wiring bugs that unit tests miss. +""" + +import queue +from unittest.mock import patch + +from engine.data_sources.sources import ListDataSource, SourceItem +from engine.effects import EffectContext +from engine.effects.types import EffectPlugin +from engine.pipeline import Pipeline, PipelineConfig +from engine.pipeline.adapters import ( + DataSourceStage, + DisplayStage, + EffectPluginStage, + FontStage, + SourceItemsToBufferStage, +) +from engine.pipeline.core import PipelineContext +from engine.pipeline.params import PipelineParams + +# ─── FIXTURES ──────────────────────────────────────────── + + +class QueueDisplay: + """Stub display that captures every frame into a queue. + + Acts as a FIFO sink so tests can inspect exactly what + the pipeline produced without any terminal or network I/O. + """ + + def __init__(self): + self.frames: queue.Queue[list[str]] = queue.Queue() + self.width = 80 + self.height = 24 + self._init_called = False + + def init(self, width: int, height: int, reuse: bool = False) -> None: + self.width = width + self.height = height + self._init_called = True + + def show(self, buffer: list[str], border: bool = False) -> None: + # Deep copy to prevent later mutations + self.frames.put(list(buffer)) + + def clear(self) -> None: + pass + + def cleanup(self) -> None: + pass + + def get_dimensions(self) -> tuple[int, int]: + return (self.width, self.height) + + +class MarkerEffect(EffectPlugin): + """Effect that prepends a marker line to prove it ran. + + Each MarkerEffect adds a unique tag so tests can verify + which effects executed and in what order. + """ + + def __init__(self, tag: str = "MARKER"): + self._tag = tag + self.call_count = 0 + super().__init__() + + @property + def name(self) -> str: + return f"marker-{self._tag}" + + def configure(self, config: dict) -> None: + pass + + def process(self, buffer: list[str], ctx: EffectContext) -> list[str]: + self.call_count += 1 + if buffer is None: + return [f"[{self._tag}:EMPTY]"] + return [f"[{self._tag}]"] + list(buffer) + + +# ─── HELPERS ───────────────────────────────────────────── + + +def _build_pipeline( + items: list, + effects: list[tuple[str, EffectPlugin]] | None = None, + use_font_stage: bool = False, + width: int = 80, + height: int = 24, +) -> tuple[Pipeline, QueueDisplay, PipelineContext]: + """Build a fully-wired pipeline with a QueueDisplay sink. + + Args: + items: Content items to feed into the source. + effects: Optional list of (name, EffectPlugin) to add. + use_font_stage: Use FontStage instead of SourceItemsToBufferStage. + width: Viewport width. + height: Viewport height. + + Returns: + (pipeline, queue_display, context) tuple. + """ + display = QueueDisplay() + + ctx = PipelineContext() + params = PipelineParams() + params.viewport_width = width + params.viewport_height = height + params.frame_number = 0 + ctx.params = params + ctx.set("items", items) + + pipeline = Pipeline( + config=PipelineConfig(enable_metrics=True), + context=ctx, + ) + + # Source stage + source = ListDataSource(items, name="test-source") + pipeline.add_stage("source", DataSourceStage(source, name="test-source")) + + # Render stage + if use_font_stage: + pipeline.add_stage("render", FontStage(name="font")) + else: + pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) + + # Effect stages + if effects: + for effect_name, effect_plugin in effects: + pipeline.add_stage( + f"effect_{effect_name}", + EffectPluginStage(effect_plugin, name=effect_name), + ) + + # Display stage + pipeline.add_stage("display", DisplayStage(display, name="queue")) + + pipeline.build() + pipeline.initialize() + + return pipeline, display, ctx + + +# ─── TESTS: HAPPY PATH ────────────────────────────────── + + +class TestPipelineE2EHappyPath: + """End-to-end: data flows source -> render -> display.""" + + def test_items_reach_display(self): + """Content items fed to source must appear in the display output.""" + items = [ + SourceItem(content="Hello World", source="test", timestamp="now"), + SourceItem(content="Second Item", source="test", timestamp="now"), + ] + pipeline, display, ctx = _build_pipeline(items) + + result = pipeline.execute(items) + + assert result.success, f"Pipeline failed: {result.error}" + frame = display.frames.get(timeout=1) + text = "\n".join(frame) + assert "Hello World" in text + assert "Second Item" in text + + def test_pipeline_output_is_list_of_strings(self): + """Display must receive list[str], not raw SourceItems.""" + items = [SourceItem(content="Line one", source="s", timestamp="t")] + pipeline, display, ctx = _build_pipeline(items) + + result = pipeline.execute(items) + + assert result.success + frame = display.frames.get(timeout=1) + assert isinstance(frame, list) + for line in frame: + assert isinstance(line, str), f"Expected str, got {type(line)}: {line!r}" + + def test_multiline_items_are_split(self): + """Items with newlines should be split into individual buffer lines.""" + items = [ + SourceItem(content="Line A\nLine B\nLine C", source="s", timestamp="t") + ] + pipeline, display, ctx = _build_pipeline(items) + + result = pipeline.execute(items) + + assert result.success + frame = display.frames.get(timeout=1) + assert "Line A" in frame + assert "Line B" in frame + assert "Line C" in frame + + def test_empty_source_produces_empty_buffer(self): + """An empty source should produce an empty (or blank) frame.""" + items = [] + pipeline, display, ctx = _build_pipeline(items) + + result = pipeline.execute(items) + + assert result.success + frame = display.frames.get(timeout=1) + assert isinstance(frame, list) + + def test_multiple_frames_are_independent(self): + """Each execute() call should produce a distinct frame.""" + items = [SourceItem(content="frame-content", source="s", timestamp="t")] + pipeline, display, ctx = _build_pipeline(items) + + pipeline.execute(items) + pipeline.execute(items) + + f1 = display.frames.get(timeout=1) + f2 = display.frames.get(timeout=1) + assert f1 == f2 # Same input => same output + assert display.frames.empty() # Exactly 2 frames + + +# ─── TESTS: EFFECTS IN THE PIPELINE ───────────────────── + + +class TestPipelineE2EEffects: + """End-to-end: effects process the buffer between render and display.""" + + def test_single_effect_modifies_output(self): + """A single effect should visibly modify the output frame.""" + items = [SourceItem(content="Original", source="s", timestamp="t")] + marker = MarkerEffect("FX1") + pipeline, display, ctx = _build_pipeline(items, effects=[("marker", marker)]) + + result = pipeline.execute(items) + + assert result.success + frame = display.frames.get(timeout=1) + assert "[FX1]" in frame, f"Marker not found in frame: {frame}" + assert "Original" in "\n".join(frame) + + def test_effect_chain_ordering(self): + """Multiple effects execute in the order they were added.""" + items = [SourceItem(content="data", source="s", timestamp="t")] + fx_a = MarkerEffect("A") + fx_b = MarkerEffect("B") + pipeline, display, ctx = _build_pipeline( + items, effects=[("alpha", fx_a), ("beta", fx_b)] + ) + + result = pipeline.execute(items) + + assert result.success + frame = display.frames.get(timeout=1) + text = "\n".join(frame) + # B runs after A, so B's marker is prepended last => appears first + idx_a = text.index("[A]") + idx_b = text.index("[B]") + assert idx_b < idx_a, f"Expected [B] before [A], got: {frame}" + + def test_effect_receives_list_of_strings(self): + """Effects must receive list[str] from the render stage.""" + items = [SourceItem(content="check-type", source="s", timestamp="t")] + received_types = [] + + class TypeCheckEffect(EffectPlugin): + @property + def name(self): + return "typecheck" + + def configure(self, config): + pass + + def process(self, buffer, ctx): + received_types.append(type(buffer).__name__) + if isinstance(buffer, list): + for item in buffer: + received_types.append(type(item).__name__) + return buffer + + pipeline, display, ctx = _build_pipeline( + items, effects=[("typecheck", TypeCheckEffect())] + ) + + pipeline.execute(items) + + assert received_types[0] == "list", f"Buffer type: {received_types[0]}" + # All elements should be strings + for t in received_types[1:]: + assert t == "str", f"Buffer element type: {t}" + + def test_disabled_effect_is_skipped(self): + """A disabled effect should not process data.""" + items = [SourceItem(content="data", source="s", timestamp="t")] + marker = MarkerEffect("DISABLED") + pipeline, display, ctx = _build_pipeline( + items, effects=[("disabled-fx", marker)] + ) + + # Disable the effect stage + stage = pipeline.get_stage("effect_disabled-fx") + stage.set_enabled(False) + + result = pipeline.execute(items) + + assert result.success + frame = display.frames.get(timeout=1) + assert "[DISABLED]" not in frame, "Disabled effect should not run" + assert marker.call_count == 0 + + +# ─── TESTS: STAGE EXECUTION ORDER & METRICS ───────────── + + +class TestPipelineE2EStageOrder: + """Verify all stages execute and metrics are collected.""" + + def test_all_stages_appear_in_execution_order(self): + """Pipeline build must include source, render, and display.""" + items = [SourceItem(content="x", source="s", timestamp="t")] + pipeline, display, ctx = _build_pipeline(items) + + order = pipeline.execution_order + assert "source" in order + assert "render" in order + assert "display" in order + + def test_execution_order_is_source_render_display(self): + """Source must come before render, render before display.""" + items = [SourceItem(content="x", source="s", timestamp="t")] + pipeline, display, ctx = _build_pipeline(items) + + order = pipeline.execution_order + assert order.index("source") < order.index("render") + assert order.index("render") < order.index("display") + + def test_effects_between_render_and_display(self): + """Effects must execute after render and before display.""" + items = [SourceItem(content="x", source="s", timestamp="t")] + marker = MarkerEffect("MID") + pipeline, display, ctx = _build_pipeline(items, effects=[("mid", marker)]) + + order = pipeline.execution_order + render_idx = order.index("render") + display_idx = order.index("display") + effect_idx = order.index("effect_mid") + assert render_idx < effect_idx < display_idx + + def test_metrics_collected_for_all_stages(self): + """After execution, metrics should exist for every active stage.""" + items = [SourceItem(content="x", source="s", timestamp="t")] + marker = MarkerEffect("M") + pipeline, display, ctx = _build_pipeline(items, effects=[("m", marker)]) + + pipeline.execute(items) + + summary = pipeline.get_metrics_summary() + assert "stages" in summary + stage_names = set(summary["stages"].keys()) + # All regular (non-overlay) stages should have metrics + assert "source" in stage_names + assert "render" in stage_names + assert "display" in stage_names + assert "effect_m" in stage_names + + +# ─── TESTS: FONT STAGE DATAFLOW ───────────────────────── + + +class TestFontStageDataflow: + """Verify FontStage correctly renders content through make_block. + + These tests expose the tuple-unpacking bug in FontStage.process() + where make_block returns (lines, color, meta_idx) but the code + does result.extend(block) instead of result.extend(block[0]). + """ + + def test_font_stage_unpacks_make_block_correctly(self): + """FontStage must produce list[str] output, not mixed types.""" + items = [ + SourceItem(content="Test Headline", source="test-src", timestamp="12345") + ] + + # Mock make_block to return its documented signature + mock_lines = [" RENDERED LINE 1", " RENDERED LINE 2", "", " meta info"] + mock_return = (mock_lines, "\033[38;5;46m", 3) + + with patch("engine.render.make_block", return_value=mock_return): + pipeline, display, ctx = _build_pipeline(items, use_font_stage=True) + + result = pipeline.execute(items) + + assert result.success, f"Pipeline failed: {result.error}" + frame = display.frames.get(timeout=1) + + # Every element in the frame must be a string + for i, line in enumerate(frame): + assert isinstance(line, str), ( + f"Frame line {i} is {type(line).__name__}: {line!r} " + f"(FontStage likely extended with raw tuple)" + ) + + def test_font_stage_output_contains_rendered_content(self): + """FontStage output should contain the rendered lines, not color codes.""" + items = [SourceItem(content="My Headline", source="src", timestamp="0")] + + mock_lines = [" BIG BLOCK TEXT", " MORE TEXT", "", " ░ src · 0"] + mock_return = (mock_lines, "\033[38;5;46m", 3) + + with patch("engine.render.make_block", return_value=mock_return): + pipeline, display, ctx = _build_pipeline(items, use_font_stage=True) + + result = pipeline.execute(items) + + assert result.success + frame = display.frames.get(timeout=1) + text = "\n".join(frame) + assert "BIG BLOCK TEXT" in text + assert "MORE TEXT" in text + + def test_font_stage_does_not_leak_color_codes_as_lines(self): + """The ANSI color code from make_block must NOT appear as a frame line.""" + items = [SourceItem(content="Headline", source="s", timestamp="0")] + + color_code = "\033[38;5;46m" + mock_return = ([" rendered"], color_code, 0) + + with patch("engine.render.make_block", return_value=mock_return): + pipeline, display, ctx = _build_pipeline(items, use_font_stage=True) + + result = pipeline.execute(items) + + assert result.success + frame = display.frames.get(timeout=1) + # The color code itself should not be a standalone line + assert color_code not in frame, ( + f"Color code leaked as a frame line: {frame}" + ) + # The meta_row_index (int) should not be a line either + for line in frame: + assert not isinstance(line, int), f"Integer leaked into frame: {line}" + + def test_font_stage_handles_multiple_items(self): + """FontStage should render each item through make_block.""" + items = [ + SourceItem(content="First", source="a", timestamp="1"), + SourceItem(content="Second", source="b", timestamp="2"), + ] + + call_count = 0 + + def mock_make_block(title, src, ts, w): + nonlocal call_count + call_count += 1 + return ([f" [{title}]"], "\033[0m", 0) + + with patch("engine.render.make_block", side_effect=mock_make_block): + pipeline, display, ctx = _build_pipeline(items, use_font_stage=True) + + result = pipeline.execute(items) + + assert result.success + assert call_count == 2, f"make_block called {call_count} times, expected 2" + frame = display.frames.get(timeout=1) + text = "\n".join(frame) + assert "[First]" in text + assert "[Second]" in text + + +# ─── TESTS: MIRROR OF app.py ASSEMBLY ─────────────────── + + +class TestAppPipelineAssembly: + """Verify the pipeline as assembled by app.py works end-to-end. + + This mirrors how run_pipeline_mode() builds the pipeline but + without any network or terminal dependencies. + """ + + def test_demo_preset_pipeline_produces_output(self): + """Simulates the 'demo' preset pipeline with stub data.""" + # Simulate what app.py does for the demo preset + items = [ + ("Breaking: Test passes", "UnitTest", "1234567890"), + ("Update: Coverage improves", "CI", "1234567891"), + ] + + display = QueueDisplay() + ctx = PipelineContext() + params = PipelineParams() + params.viewport_width = 80 + params.viewport_height = 24 + params.frame_number = 0 + ctx.params = params + ctx.set("items", items) + + pipeline = Pipeline( + config=PipelineConfig(enable_metrics=True), + context=ctx, + ) + + # Mirror app.py: ListDataSource -> SourceItemsToBufferStage -> display + source = ListDataSource(items, name="headlines") + pipeline.add_stage("source", DataSourceStage(source, name="headlines")) + pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) + pipeline.add_stage("display", DisplayStage(display, name="queue")) + + pipeline.build() + pipeline.initialize() + + result = pipeline.execute(items) + + assert result.success, f"Pipeline failed: {result.error}" + assert not display.frames.empty(), "Display received no frames" + + frame = display.frames.get(timeout=1) + assert isinstance(frame, list) + assert len(frame) > 0 + # All lines must be strings + for line in frame: + assert isinstance(line, str)