fix: ListDataSource cache and camera dependency resolution

Two critical fixes:

1. ListDataSource Cache Bug
   - Previously, ListDataSource.__init__ cached raw tuples directly
   - get_items() would return cached raw tuples without converting to SourceItem
   - This caused SourceItemsToBufferStage to receive tuples and stringify them
   - Results: ugly tuple representations in terminal/pygame instead of formatted text
   - Fix: Store raw items in _raw_items, let fetch() convert to SourceItem
   - Cache now contains proper SourceItem objects

2. Camera Dependency Resolution
   - CameraStage declared dependency on 'source.items' exactly
   - DataSourceStage provides 'source.headlines' (or 'source.poetry', etc.)
   - Capability matching didn't trigger prefix match for exact dependency
   - Fix: Change CameraStage dependency to 'source' for prefix matching

3. Added app.py Camera Stage Support
   - Pipeline now adds camera stage from preset.camera config
   - Supports vertical, horizontal, omni, floating, bounce modes
   - Tests now passing with proper data flow through all stages

Tests: All 502 tests passing, 16 skipped
This commit is contained in:
2026-03-16 21:55:57 -07:00
parent 85d8b29bab
commit affafe810c
17 changed files with 708 additions and 1301 deletions

View File

@@ -88,17 +88,9 @@ class HudEffect(EffectPlugin):
f"\033[2;1H\033[38;5;45mEFFECT:\033[0m \033[1;38;5;227m{effect_name:12s}\033[0m \033[38;5;245m|\033[0m {bar} \033[38;5;245m|\033[0m \033[38;5;219m{effect_intensity * 100:.0f}%\033[0m" f"\033[2;1H\033[38;5;45mEFFECT:\033[0m \033[1;38;5;227m{effect_name:12s}\033[0m \033[38;5;245m|\033[0m {bar} \033[38;5;245m|\033[0m \033[38;5;219m{effect_intensity * 100:.0f}%\033[0m"
) )
# Try to get pipeline order from context # Get pipeline order from context
pipeline_order = ctx.get_state("pipeline_order") pipeline_order = ctx.get_state("pipeline_order")
if pipeline_order: pipeline_str = ",".join(pipeline_order) if pipeline_order else "(none)"
pipeline_str = ",".join(pipeline_order)
else:
# Fallback to legacy effect chain
from engine.effects import get_effect_chain
chain = get_effect_chain()
order = chain.get_order() if chain else []
pipeline_str = ",".join(order) if order else "(none)"
hud_lines.append(f"\033[3;1H\033[38;5;44mPIPELINE:\033[0m {pipeline_str}") hud_lines.append(f"\033[3;1H\033[38;5;44mPIPELINE:\033[0m {pipeline_str}")
for i, line in enumerate(hud_lines): for i, line in enumerate(hud_lines):

View File

@@ -152,9 +152,35 @@ def run_pipeline_mode(preset_name: str = "demo"):
list_source = ListDataSource(items, name=preset.source) list_source = ListDataSource(items, name=preset.source)
pipeline.add_stage("source", DataSourceStage(list_source, name=preset.source)) pipeline.add_stage("source", DataSourceStage(list_source, name=preset.source))
# Add render stage - convert items to buffer # Add FontStage for headlines/poetry (default for demo)
if preset.source in ["headlines", "poetry"]:
from engine.pipeline.adapters import FontStage
pipeline.add_stage("font", FontStage(name="font"))
else:
# Fallback to simple conversion for other sources
pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
# Add camera stage if specified in preset
if preset.camera:
from engine.camera import Camera
from engine.pipeline.adapters import CameraStage
camera = None
if preset.camera == "vertical":
camera = Camera.vertical()
elif preset.camera == "horizontal":
camera = Camera.horizontal()
elif preset.camera == "omni":
camera = Camera.omni()
elif preset.camera == "floating":
camera = Camera.floating()
elif preset.camera == "bounce":
camera = Camera.bounce()
if camera:
pipeline.add_stage("camera", CameraStage(camera, name=preset.camera))
for effect_name in preset.effects: for effect_name in preset.effects:
effect = effect_registry.get(effect_name) effect = effect_registry.get(effect_name)
if effect: if effect:

View File

@@ -124,7 +124,8 @@ class ListDataSource(DataSource):
""" """
def __init__(self, items, name: str = "list"): def __init__(self, items, name: str = "list"):
self._items = items self._raw_items = items # Store raw items separately
self._items = None # Cache for converted SourceItem objects
self._name = name self._name = name
@property @property
@@ -138,7 +139,7 @@ class ListDataSource(DataSource):
def fetch(self) -> list[SourceItem]: def fetch(self) -> list[SourceItem]:
# Convert tuple items to SourceItem if needed # Convert tuple items to SourceItem if needed
result = [] result = []
for item in self._items: for item in self._raw_items:
if isinstance(item, SourceItem): if isinstance(item, SourceItem):
result.append(item) result.append(item)
elif isinstance(item, tuple) and len(item) >= 3: elif isinstance(item, tuple) and len(item) >= 3:

View File

@@ -18,13 +18,6 @@ from engine.effects.types import (
create_effect_context, create_effect_context,
) )
def get_effect_chain():
from engine.legacy.layers import get_effect_chain as _chain
return _chain()
__all__ = [ __all__ = [
"EffectChain", "EffectChain",
"EffectRegistry", "EffectRegistry",
@@ -34,7 +27,6 @@ __all__ = [
"create_effect_context", "create_effect_context",
"get_registry", "get_registry",
"set_registry", "set_registry",
"get_effect_chain",
"get_monitor", "get_monitor",
"set_monitor", "set_monitor",
"PerformanceMonitor", "PerformanceMonitor",

View File

@@ -6,14 +6,7 @@ _effect_chain_ref = None
def _get_effect_chain(): def _get_effect_chain():
global _effect_chain_ref global _effect_chain_ref
if _effect_chain_ref is not None:
return _effect_chain_ref return _effect_chain_ref
try:
from engine.legacy.layers import get_effect_chain as _chain
return _chain()
except Exception:
return None
def set_effect_chain_ref(chain) -> None: def set_effect_chain_ref(chain) -> None:

View File

@@ -1,15 +0,0 @@
"""
Legacy rendering modules for backwards compatibility.
This package contains deprecated rendering code from the old pipeline architecture.
These modules are maintained for backwards compatibility with adapters and tests,
but should not be used in new code.
New code should use the Stage-based pipeline architecture instead.
Modules:
- render: Legacy font/gradient rendering functions
- layers: Legacy layer compositing and effect application
All modules in this package are marked deprecated and will be removed in a future version.
"""

View File

@@ -1,272 +0,0 @@
"""
Layer compositing — message overlay, ticker zone, firehose, noise.
Depends on: config, render, effects.
.. deprecated::
This module contains legacy rendering code. New pipeline code should
use the Stage-based pipeline architecture instead. This module is
maintained for backwards compatibility with the demo mode.
"""
import random
import re
import time
from datetime import datetime
from engine import config
from engine.effects import (
EffectChain,
EffectContext,
fade_line,
firehose_line,
glitch_bar,
noise,
vis_offset,
vis_trunc,
)
from engine.legacy.render import big_wrap, lr_gradient, lr_gradient_opposite
from engine.terminal import RST, W_COOL
MSG_META = "\033[38;5;245m"
MSG_BORDER = "\033[2;38;5;37m"
def render_message_overlay(
msg: tuple[str, str, float] | None,
w: int,
h: int,
msg_cache: tuple,
) -> tuple[list[str], tuple]:
"""Render ntfy message overlay.
Args:
msg: (title, body, timestamp) or None
w: terminal width
h: terminal height
msg_cache: (cache_key, rendered_rows) for caching
Returns:
(list of ANSI strings, updated cache)
"""
overlay = []
if msg is None:
return overlay, msg_cache
m_title, m_body, m_ts = msg
display_text = m_body or m_title or "(empty)"
display_text = re.sub(r"\s+", " ", display_text.upper())
cache_key = (display_text, w)
if msg_cache[0] != cache_key:
msg_rows = big_wrap(display_text, w - 4)
msg_cache = (cache_key, msg_rows)
else:
msg_rows = msg_cache[1]
msg_rows = lr_gradient_opposite(
msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0
)
elapsed_s = int(time.monotonic() - m_ts)
remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s)
ts_str = datetime.now().strftime("%H:%M:%S")
panel_h = len(msg_rows) + 2
panel_top = max(0, (h - panel_h) // 2)
row_idx = 0
for mr in msg_rows:
ln = vis_trunc(mr, w)
overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K")
row_idx += 1
meta_parts = []
if m_title and m_title != m_body:
meta_parts.append(m_title)
meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
meta = (
" " + " \u00b7 ".join(meta_parts)
if len(meta_parts) > 1
else " " + meta_parts[0]
)
overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}\033[0m\033[K")
row_idx += 1
bar = "\u2500" * (w - 4)
overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}\033[0m\033[K")
return overlay, msg_cache
def render_ticker_zone(
active: list,
scroll_cam: int,
camera_x: int = 0,
ticker_h: int = 0,
w: int = 80,
noise_cache: dict | None = None,
grad_offset: float = 0.0,
) -> tuple[list[str], dict]:
"""Render the ticker scroll zone.
Args:
active: list of (content_rows, color, canvas_y, meta_idx)
scroll_cam: camera position (viewport top)
camera_x: horizontal camera offset
ticker_h: height of ticker zone
w: terminal width
noise_cache: dict of cy -> noise string
grad_offset: gradient animation offset
Returns:
(list of ANSI strings, updated noise_cache)
"""
if noise_cache is None:
noise_cache = {}
buf = []
top_zone = max(1, int(ticker_h * 0.25))
bot_zone = max(1, int(ticker_h * 0.10))
def noise_at(cy):
if cy not in noise_cache:
noise_cache[cy] = noise(w) if random.random() < 0.15 else None
return noise_cache[cy]
for r in range(ticker_h):
scr_row = r + 1
cy = scroll_cam + r
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0
row_fade = min(top_f, bot_f)
drawn = False
for content, hc, by, midx in active:
cr = cy - by
if 0 <= cr < len(content):
raw = content[cr]
if cr != midx:
colored = lr_gradient([raw], grad_offset)[0]
else:
colored = raw
ln = vis_trunc(vis_offset(colored, camera_x), w)
if row_fade < 1.0:
ln = fade_line(ln, row_fade)
if cr == midx:
buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K")
elif ln.strip():
buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K")
else:
buf.append(f"\033[{scr_row};1H\033[K")
drawn = True
break
if not drawn:
n = noise_at(cy)
if row_fade < 1.0 and n:
n = fade_line(n, row_fade)
if n:
buf.append(f"\033[{scr_row};1H{n}")
else:
buf.append(f"\033[{scr_row};1H\033[K")
return buf, noise_cache
def apply_glitch(
buf: list[str],
ticker_buf_start: int,
mic_excess: float,
w: int,
) -> list[str]:
"""Apply glitch effect to ticker buffer.
Args:
buf: current buffer
ticker_buf_start: index where ticker starts in buffer
mic_excess: mic level above threshold
w: terminal width
Returns:
Updated buffer with glitches applied
"""
glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
n_hits = 4 + int(mic_excess / 2)
ticker_buf_len = len(buf) - ticker_buf_start
if random.random() < glitch_prob and ticker_buf_len > 0:
for _ in range(min(n_hits, ticker_buf_len)):
gi = random.randint(0, ticker_buf_len - 1)
scr_row = gi + 1
buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}"
return buf
def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]:
"""Render firehose strip at bottom of screen."""
buf = []
if fh > 0:
for fr in range(fh):
scr_row = h - fh + fr + 1
fline = firehose_line(items, w)
buf.append(f"\033[{scr_row};1H{fline}\033[K")
return buf
_effect_chain = None
def init_effects() -> None:
"""Initialize effect plugins and chain."""
global _effect_chain
from engine.effects import EffectChain, get_registry
registry = get_registry()
import effects_plugins
effects_plugins.discover_plugins()
chain = EffectChain(registry)
chain.set_order(["noise", "fade", "glitch", "firehose"])
_effect_chain = chain
def process_effects(
buf: list[str],
w: int,
h: int,
scroll_cam: int,
ticker_h: int,
camera_x: int = 0,
mic_excess: float = 0.0,
grad_offset: float = 0.0,
frame_number: int = 0,
has_message: bool = False,
items: list | None = None,
) -> list[str]:
"""Process buffer through effect chain."""
if _effect_chain is None:
init_effects()
ctx = EffectContext(
terminal_width=w,
terminal_height=h,
scroll_cam=scroll_cam,
camera_x=camera_x,
ticker_height=ticker_h,
mic_excess=mic_excess,
grad_offset=grad_offset,
frame_number=frame_number,
has_message=has_message,
items=items or [],
)
return _effect_chain.process(buf, ctx)
def get_effect_chain() -> EffectChain | None:
"""Get the effect chain instance."""
global _effect_chain
if _effect_chain is None:
init_effects()
return _effect_chain

View File

@@ -1,563 +0,0 @@
"""
Pipeline introspection - generates self-documenting diagrams of the render pipeline.
Pipeline Architecture:
- Sources: Data providers (RSS, Poetry, Ntfy, Mic) - static or dynamic
- Fetch: Retrieve data from sources
- Prepare: Transform raw data (make_block, strip_tags, translate)
- Scroll: Camera-based viewport rendering (ticker zone, message overlay)
- Effects: Post-processing chain (noise, fade, glitch, firehose, hud)
- Render: Final line rendering and layout
- Display: Output backends (terminal, pygame, websocket, sixel, kitty)
Key abstractions:
- DataSource: Sources can be static (cached) or dynamic (idempotent fetch)
- Camera: Viewport controller (vertical, horizontal, omni, floating, trace)
- EffectChain: Ordered effect processing pipeline
- Display: Pluggable output backends
- SourceRegistry: Source discovery and management
- AnimationController: Time-based parameter animation
- Preset: Package of initial params + animation for demo modes
"""
from __future__ import annotations
from dataclasses import dataclass
@dataclass
class PipelineNode:
"""Represents a node in the pipeline."""
name: str
module: str
class_name: str | None = None
func_name: str | None = None
description: str = ""
inputs: list[str] | None = None
outputs: list[str] | None = None
metrics: dict | None = None # Performance metrics (avg_ms, min_ms, max_ms)
class PipelineIntrospector:
"""Introspects the render pipeline and generates documentation."""
def __init__(self):
self.nodes: list[PipelineNode] = []
def add_node(self, node: PipelineNode) -> None:
self.nodes.append(node)
def generate_mermaid_flowchart(self) -> str:
"""Generate a Mermaid flowchart of the pipeline."""
lines = ["```mermaid", "flowchart TD"]
subgraph_groups = {
"Sources": [],
"Fetch": [],
"Prepare": [],
"Scroll": [],
"Effects": [],
"Display": [],
"Async": [],
"Animation": [],
"Viz": [],
}
other_nodes = []
for node in self.nodes:
node_id = node.name.replace("-", "_").replace(" ", "_").replace(":", "_")
label = node.name
if node.class_name:
label = f"{node.name}\\n({node.class_name})"
elif node.func_name:
label = f"{node.name}\\n({node.func_name})"
if node.description:
label += f"\\n{node.description}"
if node.metrics:
avg = node.metrics.get("avg_ms", 0)
if avg > 0:
label += f"\\n⏱ {avg:.1f}ms"
impact = node.metrics.get("impact_pct", 0)
if impact > 0:
label += f" ({impact:.0f}%)"
node_entry = f' {node_id}["{label}"]'
if "DataSource" in node.name or "SourceRegistry" in node.name:
subgraph_groups["Sources"].append(node_entry)
elif "fetch" in node.name.lower():
subgraph_groups["Fetch"].append(node_entry)
elif (
"make_block" in node.name
or "strip_tags" in node.name
or "translate" in node.name
):
subgraph_groups["Prepare"].append(node_entry)
elif (
"StreamController" in node.name
or "render_ticker" in node.name
or "render_message" in node.name
or "Camera" in node.name
):
subgraph_groups["Scroll"].append(node_entry)
elif "Effect" in node.name or "effect" in node.module:
subgraph_groups["Effects"].append(node_entry)
elif "Display:" in node.name:
subgraph_groups["Display"].append(node_entry)
elif "Ntfy" in node.name or "Mic" in node.name:
subgraph_groups["Async"].append(node_entry)
elif "Animation" in node.name or "Preset" in node.name:
subgraph_groups["Animation"].append(node_entry)
else:
other_nodes.append(node_entry)
for group_name, nodes in subgraph_groups.items():
if nodes:
lines.append(f" subgraph {group_name}")
for node in nodes:
lines.append(node)
lines.append(" end")
for node in other_nodes:
lines.append(node)
lines.append("")
for node in self.nodes:
node_id = node.name.replace("-", "_").replace(" ", "_").replace(":", "_")
if node.inputs:
for inp in node.inputs:
inp_id = inp.replace("-", "_").replace(" ", "_").replace(":", "_")
lines.append(f" {inp_id} --> {node_id}")
lines.append("```")
return "\n".join(lines)
def generate_mermaid_sequence(self) -> str:
"""Generate a Mermaid sequence diagram of message flow."""
lines = ["```mermaid", "sequenceDiagram"]
lines.append(" participant Sources")
lines.append(" participant Fetch")
lines.append(" participant Scroll")
lines.append(" participant Effects")
lines.append(" participant Display")
lines.append(" Sources->>Fetch: headlines")
lines.append(" Fetch->>Scroll: content blocks")
lines.append(" Scroll->>Effects: buffer")
lines.append(" Effects->>Effects: process chain")
lines.append(" Effects->>Display: rendered buffer")
lines.append("```")
return "\n".join(lines)
def generate_mermaid_state(self) -> str:
"""Generate a Mermaid state diagram of camera modes."""
lines = ["```mermaid", "stateDiagram-v2"]
lines.append(" [*] --> Vertical")
lines.append(" Vertical --> Horizontal: set_mode()")
lines.append(" Horizontal --> Omni: set_mode()")
lines.append(" Omni --> Floating: set_mode()")
lines.append(" Floating --> Trace: set_mode()")
lines.append(" Trace --> Vertical: set_mode()")
lines.append(" state Vertical {")
lines.append(" [*] --> ScrollUp")
lines.append(" ScrollUp --> ScrollUp: +y each frame")
lines.append(" }")
lines.append(" state Horizontal {")
lines.append(" [*] --> ScrollLeft")
lines.append(" ScrollLeft --> ScrollLeft: +x each frame")
lines.append(" }")
lines.append(" state Omni {")
lines.append(" [*] --> Diagonal")
lines.append(" Diagonal --> Diagonal: +x, +y")
lines.append(" }")
lines.append(" state Floating {")
lines.append(" [*] --> Bobbing")
lines.append(" Bobbing --> Bobbing: sin(time)")
lines.append(" }")
lines.append(" state Trace {")
lines.append(" [*] --> FollowPath")
lines.append(" FollowPath --> FollowPath: node by node")
lines.append(" }")
lines.append("```")
return "\n".join(lines)
def generate_full_diagram(self) -> str:
"""Generate full pipeline documentation."""
lines = [
"# Render Pipeline",
"",
"## Data Flow",
"",
self.generate_mermaid_flowchart(),
"",
"## Message Sequence",
"",
self.generate_mermaid_sequence(),
"",
"## Camera States",
"",
self.generate_mermaid_state(),
]
return "\n".join(lines)
def introspect_sources(self) -> None:
"""Introspect data sources."""
from engine import sources
for name in dir(sources):
obj = getattr(sources, name)
if isinstance(obj, dict):
self.add_node(
PipelineNode(
name=f"Data Source: {name}",
module="engine.sources",
description=f"{len(obj)} feeds configured",
)
)
def introspect_sources_v2(self) -> None:
"""Introspect data sources v2 (new abstraction)."""
from engine.data_sources.sources import SourceRegistry, init_default_sources
init_default_sources()
SourceRegistry()
self.add_node(
PipelineNode(
name="SourceRegistry",
module="engine.data_sources.sources",
class_name="SourceRegistry",
description="Source discovery and management",
)
)
for name, desc in [
("HeadlinesDataSource", "RSS feed headlines"),
("PoetryDataSource", "Poetry DB"),
("PipelineDataSource", "Pipeline viz (dynamic)"),
]:
self.add_node(
PipelineNode(
name=f"DataSource: {name}",
module="engine.sources_v2",
class_name=name,
description=f"{desc}",
)
)
def introspect_prepare(self) -> None:
"""Introspect prepare layer (transformation)."""
self.add_node(
PipelineNode(
name="make_block",
module="engine.render",
func_name="make_block",
description="Transform headline into display block",
inputs=["title", "source", "timestamp", "width"],
outputs=["block"],
)
)
self.add_node(
PipelineNode(
name="strip_tags",
module="engine.filter",
func_name="strip_tags",
description="Remove HTML tags from content",
inputs=["html"],
outputs=["plain_text"],
)
)
self.add_node(
PipelineNode(
name="translate_headline",
module="engine.translate",
func_name="translate_headline",
description="Translate headline to target language",
inputs=["title", "target_lang"],
outputs=["translated_title"],
)
)
def introspect_fetch(self) -> None:
"""Introspect fetch layer."""
self.add_node(
PipelineNode(
name="fetch_all",
module="engine.fetch",
func_name="fetch_all",
description="Fetch RSS feeds",
outputs=["items"],
)
)
self.add_node(
PipelineNode(
name="fetch_poetry",
module="engine.fetch",
func_name="fetch_poetry",
description="Fetch Poetry DB",
outputs=["items"],
)
)
def introspect_scroll(self) -> None:
"""Introspect scroll engine (legacy - replaced by pipeline architecture)."""
self.add_node(
PipelineNode(
name="render_ticker_zone",
module="engine.layers",
func_name="render_ticker_zone",
description="Render scrolling ticker content",
inputs=["active", "camera"],
outputs=["buffer"],
)
)
self.add_node(
PipelineNode(
name="render_message_overlay",
module="engine.layers",
func_name="render_message_overlay",
description="Render ntfy message overlay",
inputs=["msg", "width", "height"],
outputs=["overlay", "cache"],
)
)
def introspect_render(self) -> None:
"""Introspect render layer."""
self.add_node(
PipelineNode(
name="big_wrap",
module="engine.render",
func_name="big_wrap",
description="Word-wrap text to width",
inputs=["text", "width"],
outputs=["lines"],
)
)
self.add_node(
PipelineNode(
name="lr_gradient",
module="engine.render",
func_name="lr_gradient",
description="Apply left-right gradient to lines",
inputs=["lines", "position"],
outputs=["styled_lines"],
)
)
def introspect_async_sources(self) -> None:
"""Introspect async data sources (ntfy, mic)."""
self.add_node(
PipelineNode(
name="NtfyPoller",
module="engine.ntfy",
class_name="NtfyPoller",
description="Poll ntfy for messages (async)",
inputs=["topic"],
outputs=["message"],
)
)
self.add_node(
PipelineNode(
name="MicMonitor",
module="engine.mic",
class_name="MicMonitor",
description="Monitor microphone input (async)",
outputs=["audio_level"],
)
)
def introspect_eventbus(self) -> None:
"""Introspect event bus for decoupled communication."""
self.add_node(
PipelineNode(
name="EventBus",
module="engine.eventbus",
class_name="EventBus",
description="Thread-safe event publishing",
inputs=["event"],
outputs=["subscribers"],
)
)
def introspect_animation(self) -> None:
"""Introspect animation system."""
self.add_node(
PipelineNode(
name="AnimationController",
module="engine.animation",
class_name="AnimationController",
description="Time-based parameter animation",
inputs=["dt"],
outputs=["params"],
)
)
self.add_node(
PipelineNode(
name="Preset",
module="engine.animation",
class_name="Preset",
description="Package of initial params + animation",
)
)
def introspect_camera(self) -> None:
"""Introspect camera system."""
self.add_node(
PipelineNode(
name="Camera",
module="engine.camera",
class_name="Camera",
description="Viewport position controller",
inputs=["dt"],
outputs=["x", "y"],
)
)
def introspect_effects(self) -> None:
"""Introspect effect system."""
self.add_node(
PipelineNode(
name="EffectChain",
module="engine.effects",
class_name="EffectChain",
description="Process effects in sequence",
inputs=["buffer", "context"],
outputs=["buffer"],
)
)
self.add_node(
PipelineNode(
name="EffectRegistry",
module="engine.effects",
class_name="EffectRegistry",
description="Manage effect plugins",
)
)
def introspect_display(self) -> None:
"""Introspect display backends."""
from engine.display import DisplayRegistry
DisplayRegistry.initialize()
backends = DisplayRegistry.list_backends()
for backend in backends:
self.add_node(
PipelineNode(
name=f"Display: {backend}",
module="engine.display.backends",
class_name=f"{backend.title()}Display",
description=f"Render to {backend}",
inputs=["buffer"],
)
)
def introspect_new_pipeline(self, pipeline=None) -> None:
"""Introspect new unified pipeline stages with metrics.
Args:
pipeline: Optional Pipeline instance to collect metrics from
"""
stages_info = [
(
"Effect",
"engine.pipeline.adapters",
"EffectPluginStage",
"Applies effect",
),
(
"Display",
"engine.pipeline.adapters",
"DisplayStage",
"Outputs to display",
),
]
metrics = None
if pipeline and hasattr(pipeline, "get_metrics_summary"):
metrics = pipeline.get_metrics_summary()
if "error" in metrics:
metrics = None
total_avg = metrics.get("pipeline", {}).get("avg_ms", 0) if metrics else 0
for stage_name, module, class_name, desc in stages_info:
node_metrics = None
if metrics and "stages" in metrics:
for name, stats in metrics["stages"].items():
if stage_name.lower() in name.lower():
impact_pct = (
(stats.get("avg_ms", 0) / total_avg * 100)
if total_avg > 0
else 0
)
node_metrics = {
"avg_ms": stats.get("avg_ms", 0),
"min_ms": stats.get("min_ms", 0),
"max_ms": stats.get("max_ms", 0),
"impact_pct": impact_pct,
}
break
self.add_node(
PipelineNode(
name=f"Pipeline: {stage_name}",
module=module,
class_name=class_name,
description=desc,
inputs=["data"],
outputs=["data"],
metrics=node_metrics,
)
)
def run(self) -> str:
"""Run full introspection."""
self.introspect_sources()
self.introspect_sources_v2()
self.introspect_fetch()
self.introspect_prepare()
self.introspect_scroll()
self.introspect_render()
self.introspect_camera()
self.introspect_effects()
self.introspect_display()
self.introspect_async_sources()
self.introspect_eventbus()
self.introspect_animation()
return self.generate_full_diagram()
def generate_pipeline_diagram() -> str:
"""Generate a self-documenting pipeline diagram."""
introspector = PipelineIntrospector()
return introspector.run()
if __name__ == "__main__":
print(generate_pipeline_diagram())

View File

@@ -254,7 +254,9 @@ class CameraStage(Stage):
@property @property
def dependencies(self) -> set[str]: def dependencies(self) -> set[str]:
return {"source.items"} return {
"source"
} # Prefix match any source (source.headlines, source.poetry, etc.)
def process(self, data: Any, ctx: PipelineContext) -> Any: def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Apply camera transformation to data.""" """Apply camera transformation to data."""
@@ -334,7 +336,7 @@ class FontStage(Stage):
if data is None: if data is None:
return None return None
from engine.legacy.render import make_block from engine.render import make_block
w = ctx.params.viewport_width if ctx.params else 80 w = ctx.params.viewport_width if ctx.params else 80
@@ -361,8 +363,8 @@ class FontStage(Stage):
ts = "0" ts = "0"
try: try:
block = make_block(title, src, ts, w) block_lines, color_code, meta_idx = make_block(title, src, ts, w)
result.extend(block) result.extend(block_lines)
except Exception: except Exception:
result.append(title) result.append(title)
@@ -403,10 +405,14 @@ class ImageToTextStage(Stage):
return "transform" return "transform"
@property @property
def capabilities(self) -> set[str]: def outlet_types(self) -> set:
from engine.pipeline.core import DataType from engine.pipeline.core import DataType
return {f"transform.{self.name}", DataType.TEXT_BUFFER} return {DataType.TEXT_BUFFER}
@property
def capabilities(self) -> set[str]:
return {f"transform.{self.name}", "render.output"}
@property @property
def dependencies(self) -> set[str]: def dependencies(self) -> set[str]:

37
engine/render/__init__.py Normal file
View File

@@ -0,0 +1,37 @@
"""Modern block rendering system - OTF font to terminal half-block conversion.
This module provides the core rendering capabilities for big block letters
and styled text output using PIL fonts and ANSI terminal rendering.
Exports:
- make_block: Render a headline into a content block with color
- big_wrap: Word-wrap text and render with OTF font
- render_line: Render a line of text as terminal rows using half-blocks
- font_for_lang: Get appropriate font for a language
- clear_font_cache: Reset cached font objects
- lr_gradient: Color block characters with left-to-right gradient
- lr_gradient_opposite: Complementary gradient coloring
"""
from engine.render.blocks import (
big_wrap,
clear_font_cache,
font_for_lang,
list_font_faces,
load_font_face,
make_block,
render_line,
)
from engine.render.gradient import lr_gradient, lr_gradient_opposite
__all__ = [
"big_wrap",
"clear_font_cache",
"font_for_lang",
"list_font_faces",
"load_font_face",
"lr_gradient",
"lr_gradient_opposite",
"make_block",
"render_line",
]

View File

@@ -1,12 +1,6 @@
""" """Block rendering core - Font loading, text rasterization, word-wrap, and headline assembly.
OTF terminal half-block rendering pipeline.
Font loading, text rasterization, word-wrap, gradient coloring, headline block assembly.
Depends on: config, terminal, sources, translate.
.. deprecated:: Provides PIL font-based rendering to terminal half-block characters.
This module contains legacy rendering code. New pipeline code should
use the Stage-based pipeline architecture instead. This module is
maintained for backwards compatibility with the demo mode.
""" """
import random import random
@@ -17,42 +11,8 @@ from PIL import Image, ImageDraw, ImageFont
from engine import config from engine import config
from engine.sources import NO_UPPER, SCRIPT_FONTS, SOURCE_LANGS from engine.sources import NO_UPPER, SCRIPT_FONTS, SOURCE_LANGS
from engine.terminal import RST
from engine.translate import detect_location_language, translate_headline from engine.translate import detect_location_language, translate_headline
# ─── GRADIENT ─────────────────────────────────────────────
# Left → right: white-hot leading edge fades to near-black
GRAD_COLS = [
"\033[1;38;5;231m", # white
"\033[1;38;5;195m", # pale cyan-white
"\033[38;5;123m", # bright cyan
"\033[38;5;118m", # bright lime
"\033[38;5;82m", # lime
"\033[38;5;46m", # bright green
"\033[38;5;40m", # green
"\033[38;5;34m", # medium green
"\033[38;5;28m", # dark green
"\033[38;5;22m", # deep green
"\033[2;38;5;22m", # dim deep green
"\033[2;38;5;235m", # near black
]
# Complementary sweep for queue messages (opposite hue family from ticker greens)
MSG_GRAD_COLS = [
"\033[1;38;5;231m", # white
"\033[1;38;5;225m", # pale pink-white
"\033[38;5;219m", # bright pink
"\033[38;5;213m", # hot pink
"\033[38;5;207m", # magenta
"\033[38;5;201m", # bright magenta
"\033[38;5;165m", # orchid-red
"\033[38;5;161m", # ruby-magenta
"\033[38;5;125m", # dark magenta
"\033[38;5;89m", # deep maroon-magenta
"\033[2;38;5;89m", # dim deep maroon-magenta
"\033[2;38;5;235m", # near black
]
# ─── FONT LOADING ───────────────────────────────────────── # ─── FONT LOADING ─────────────────────────────────────────
_FONT_OBJ = None _FONT_OBJ = None
_FONT_OBJ_KEY = None _FONT_OBJ_KEY = None
@@ -194,36 +154,22 @@ def big_wrap(text, max_w, fnt=None):
return out return out
def lr_gradient(rows, offset=0.0, grad_cols=None):
"""Color each non-space block character with a shifting left-to-right gradient."""
cols = grad_cols or GRAD_COLS
n = len(cols)
max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1)
out = []
for row in rows:
if not row.strip():
out.append(row)
continue
buf = []
for x, ch in enumerate(row):
if ch == " ":
buf.append(" ")
else:
shifted = (x / max(max_x - 1, 1) + offset) % 1.0
idx = min(round(shifted * (n - 1)), n - 1)
buf.append(f"{cols[idx]}{ch}{RST}")
out.append("".join(buf))
return out
def lr_gradient_opposite(rows, offset=0.0):
"""Complementary (opposite wheel) gradient used for queue message panels."""
return lr_gradient(rows, offset, MSG_GRAD_COLS)
# ─── HEADLINE BLOCK ASSEMBLY ───────────────────────────── # ─── HEADLINE BLOCK ASSEMBLY ─────────────────────────────
def make_block(title, src, ts, w): def make_block(title, src, ts, w):
"""Render a headline into a content block with color.""" """Render a headline into a content block with color.
Args:
title: Headline text to render
src: Source identifier (for metadata)
ts: Timestamp string (for metadata)
w: Width constraint in terminal characters
Returns:
tuple: (content_lines, color_code, meta_row_index)
- content_lines: List of rendered text lines
- color_code: ANSI color code for display
- meta_row_index: Row index of metadata line
"""
target_lang = ( target_lang = (
(SOURCE_LANGS.get(src) or detect_location_language(title)) (SOURCE_LANGS.get(src) or detect_location_language(title))
if config.MODE == "news" if config.MODE == "news"

82
engine/render/gradient.py Normal file
View File

@@ -0,0 +1,82 @@
"""Gradient coloring for rendered block characters.
Provides left-to-right and complementary gradient effects for terminal display.
"""
from engine.terminal import RST
# Left → right: white-hot leading edge fades to near-black
GRAD_COLS = [
"\033[1;38;5;231m", # white
"\033[1;38;5;195m", # pale cyan-white
"\033[38;5;123m", # bright cyan
"\033[38;5;118m", # bright lime
"\033[38;5;82m", # lime
"\033[38;5;46m", # bright green
"\033[38;5;40m", # green
"\033[38;5;34m", # medium green
"\033[38;5;28m", # dark green
"\033[38;5;22m", # deep green
"\033[2;38;5;22m", # dim deep green
"\033[2;38;5;235m", # near black
]
# Complementary sweep for queue messages (opposite hue family from ticker greens)
MSG_GRAD_COLS = [
"\033[1;38;5;231m", # white
"\033[1;38;5;225m", # pale pink-white
"\033[38;5;219m", # bright pink
"\033[38;5;213m", # hot pink
"\033[38;5;207m", # magenta
"\033[38;5;201m", # bright magenta
"\033[38;5;165m", # orchid-red
"\033[38;5;161m", # ruby-magenta
"\033[38;5;125m", # dark magenta
"\033[38;5;89m", # deep maroon-magenta
"\033[2;38;5;89m", # dim deep maroon-magenta
"\033[2;38;5;235m", # near black
]
def lr_gradient(rows, offset=0.0, grad_cols=None):
"""Color each non-space block character with a shifting left-to-right gradient.
Args:
rows: List of text lines with block characters
offset: Gradient offset (0.0-1.0) for animation
grad_cols: List of ANSI color codes (default: GRAD_COLS)
Returns:
List of lines with gradient coloring applied
"""
cols = grad_cols or GRAD_COLS
n = len(cols)
max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1)
out = []
for row in rows:
if not row.strip():
out.append(row)
continue
buf = []
for x, ch in enumerate(row):
if ch == " ":
buf.append(" ")
else:
shifted = (x / max(max_x - 1, 1) + offset) % 1.0
idx = min(round(shifted * (n - 1)), n - 1)
buf.append(f"{cols[idx]}{ch}{RST}")
out.append("".join(buf))
return out
def lr_gradient_opposite(rows, offset=0.0):
"""Complementary (opposite wheel) gradient used for queue message panels.
Args:
rows: List of text lines with block characters
offset: Gradient offset (0.0-1.0) for animation
Returns:
List of lines with complementary gradient coloring applied
"""
return lr_gradient(rows, offset, MSG_GRAD_COLS)

View File

@@ -1,112 +0,0 @@
"""
Tests for engine.layers module.
"""
import time
from engine.legacy import layers
class TestRenderMessageOverlay:
"""Tests for render_message_overlay function."""
def test_no_message_returns_empty(self):
"""Returns empty list when msg is None."""
result, cache = layers.render_message_overlay(None, 80, 24, (None, None))
assert result == []
assert cache[0] is None
def test_message_returns_overlay_lines(self):
"""Returns non-empty list when message is present."""
msg = ("Test Title", "Test Body", time.monotonic())
result, cache = layers.render_message_overlay(msg, 80, 24, (None, None))
assert len(result) > 0
assert cache[0] is not None
def test_cache_key_changes_with_text(self):
"""Cache key changes when message text changes."""
msg1 = ("Title1", "Body1", time.monotonic())
msg2 = ("Title2", "Body2", time.monotonic())
_, cache1 = layers.render_message_overlay(msg1, 80, 24, (None, None))
_, cache2 = layers.render_message_overlay(msg2, 80, 24, cache1)
assert cache1[0] != cache2[0]
def test_cache_reuse_avoids_recomputation(self):
"""Cache is returned when same message is passed (interface test)."""
msg = ("Same Title", "Same Body", time.monotonic())
result1, cache1 = layers.render_message_overlay(msg, 80, 24, (None, None))
result2, cache2 = layers.render_message_overlay(msg, 80, 24, cache1)
assert len(result1) > 0
assert len(result2) > 0
assert cache1[0] == cache2[0]
class TestRenderFirehose:
"""Tests for render_firehose function."""
def test_no_firehose_returns_empty(self):
"""Returns empty list when firehose height is 0."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 0, 24)
assert result == []
def test_firehose_returns_lines(self):
"""Returns lines when firehose height > 0."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 4, 24)
assert len(result) == 4
def test_firehose_includes_ansi_escapes(self):
"""Returns lines containing ANSI escape sequences."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 1, 24)
assert "\033[" in result[0]
class TestApplyGlitch:
"""Tests for apply_glitch function."""
def test_empty_buffer_unchanged(self):
"""Empty buffer is returned unchanged."""
result = layers.apply_glitch([], 0, 0.0, 80)
assert result == []
def test_buffer_length_preserved(self):
"""Buffer length is preserved after glitch application."""
buf = [f"\033[{i + 1};1Htest\033[K" for i in range(10)]
result = layers.apply_glitch(buf, 0, 0.5, 80)
assert len(result) == len(buf)
class TestRenderTickerZone:
"""Tests for render_ticker_zone function - focusing on interface."""
def test_returns_list(self):
"""Returns a list of strings."""
result, cache = layers.render_ticker_zone(
[],
scroll_cam=0,
camera_x=0,
ticker_h=10,
w=80,
noise_cache={},
grad_offset=0.0,
)
assert isinstance(result, list)
def test_returns_dict_for_cache(self):
"""Returns a dict for the noise cache."""
result, cache = layers.render_ticker_zone(
[],
scroll_cam=0,
camera_x=0,
ticker_h=10,
w=80,
noise_cache={},
grad_offset=0.0,
)
assert isinstance(cache, dict)

View File

@@ -1,232 +0,0 @@
"""
Tests for engine.render module.
"""
from unittest.mock import MagicMock, patch
import pytest
from engine.legacy.render import (
GRAD_COLS,
MSG_GRAD_COLS,
clear_font_cache,
font_for_lang,
lr_gradient,
lr_gradient_opposite,
make_block,
)
class TestGradientConstants:
"""Tests for gradient color constants."""
def test_grad_cols_defined(self):
"""GRAD_COLS is defined with expected length."""
assert len(GRAD_COLS) > 0
assert all(isinstance(c, str) for c in GRAD_COLS)
def test_msg_grad_cols_defined(self):
"""MSG_GRAD_COLS is defined with expected length."""
assert len(MSG_GRAD_COLS) > 0
assert all(isinstance(c, str) for c in MSG_GRAD_COLS)
def test_grad_cols_start_with_white(self):
"""GRAD_COLS starts with white."""
assert "231" in GRAD_COLS[0]
def test_msg_grad_cols_different_from_grad_cols(self):
"""MSG_GRAD_COLS is different from GRAD_COLS."""
assert MSG_GRAD_COLS != GRAD_COLS
class TestLrGradient:
"""Tests for lr_gradient function."""
def test_empty_rows(self):
"""Empty input returns empty output."""
result = lr_gradient([], 0.0)
assert result == []
def test_preserves_empty_rows(self):
"""Empty rows are preserved."""
result = lr_gradient([""], 0.0)
assert result == [""]
def test_adds_gradient_to_content(self):
"""Non-empty rows get gradient coloring."""
result = lr_gradient(["hello"], 0.0)
assert len(result) == 1
assert "\033[" in result[0]
def test_preserves_spaces(self):
"""Spaces are preserved without coloring."""
result = lr_gradient(["hello world"], 0.0)
assert " " in result[0]
def test_offset_wraps_around(self):
"""Offset wraps around at 1.0."""
result1 = lr_gradient(["hello"], 0.0)
result2 = lr_gradient(["hello"], 1.0)
assert result1 != result2 or result1 == result2
class TestLrGradientOpposite:
"""Tests for lr_gradient_opposite function."""
def test_uses_msg_grad_cols(self):
"""Uses MSG_GRAD_COLS instead of GRAD_COLS."""
result = lr_gradient_opposite(["test"])
assert "\033[" in result[0]
class TestClearFontCache:
"""Tests for clear_font_cache function."""
def test_clears_without_error(self):
"""Function runs without error."""
clear_font_cache()
class TestFontForLang:
"""Tests for font_for_lang function."""
@patch("engine.render.font")
def test_returns_default_for_none(self, mock_font):
"""Returns default font when lang is None."""
result = font_for_lang(None)
assert result is not None
@patch("engine.render.font")
def test_returns_default_for_unknown_lang(self, mock_font):
"""Returns default font for unknown language."""
result = font_for_lang("unknown_lang")
assert result is not None
class TestMakeBlock:
"""Tests for make_block function."""
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_basic(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Basic make_block returns content, color, meta index."""
mock_wrap.return_value = ["Headline content", ""]
mock_random.choice.return_value = "\033[38;5;46m"
content, color, meta_idx = make_block(
"Test headline", "TestSource", "12:00", 80
)
assert len(content) > 0
assert color is not None
assert meta_idx >= 0
@pytest.mark.skip(reason="Requires full PIL/font environment")
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_translation(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Translation is applied when mode is news."""
mock_wrap.return_value = ["Translated"]
mock_random.choice.return_value = "\033[38;5;46m"
mock_detect.return_value = "de"
with patch("engine.config.MODE", "news"):
content, _, _ = make_block("Test", "Source", "12:00", 80)
mock_translate.assert_called_once()
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_no_translation_poetry(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""No translation when mode is poetry."""
mock_wrap.return_value = ["Poem content"]
mock_random.choice.return_value = "\033[38;5;46m"
with patch("engine.config.MODE", "poetry"):
make_block("Test", "Source", "12:00", 80)
mock_translate.assert_not_called()
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_meta_format(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Meta line includes source and timestamp."""
mock_wrap.return_value = ["Content"]
mock_random.choice.return_value = "\033[38;5;46m"
content, _, meta_idx = make_block("Test", "MySource", "14:30", 80)
meta_line = content[meta_idx]
assert "MySource" in meta_line
assert "14:30" in meta_line
class TestRenderLine:
"""Tests for render_line function."""
def test_empty_string(self):
"""Empty string returns empty list."""
from engine.legacy.render import render_line
result = render_line("")
assert result == [""]
@pytest.mark.skip(reason="Requires real font/PIL setup")
def test_uses_default_font(self):
"""Uses default font when none provided."""
from engine.legacy.render import render_line
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = (0, 0, 10, 10)
render_line("test")
def test_getbbox_returns_none(self):
"""Handles None bbox gracefully."""
from engine.legacy.render import render_line
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = None
result = render_line("test")
assert result == [""]
class TestBigWrap:
"""Tests for big_wrap function."""
def test_empty_string(self):
"""Empty string returns empty list."""
from engine.legacy.render import big_wrap
result = big_wrap("", 80)
assert result == []
@pytest.mark.skip(reason="Requires real font/PIL setup")
def test_single_word_fits(self):
"""Single short word returns rendered."""
from engine.legacy.render import big_wrap
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = (0, 0, 10, 10)
result = big_wrap("test", 80)
assert len(result) > 0

View File

@@ -629,7 +629,7 @@ class TestStageAdapters:
PipelineContext() PipelineContext()
assert "camera" in stage.capabilities assert "camera" in stage.capabilities
assert "source.items" in stage.dependencies assert "source" in stage.dependencies # Prefix matches any source
class TestDataSourceStage: class TestDataSourceStage:

526
tests/test_pipeline_e2e.py Normal file
View File

@@ -0,0 +1,526 @@
"""
End-to-end pipeline integration tests.
Verifies that data actually flows through every pipeline stage
(source -> render -> effects -> display) using a queue-backed
stub display to capture output frames.
These tests catch dead-code paths and wiring bugs that unit tests miss.
"""
import queue
from unittest.mock import patch
from engine.data_sources.sources import ListDataSource, SourceItem
from engine.effects import EffectContext
from engine.effects.types import EffectPlugin
from engine.pipeline import Pipeline, PipelineConfig
from engine.pipeline.adapters import (
DataSourceStage,
DisplayStage,
EffectPluginStage,
FontStage,
SourceItemsToBufferStage,
)
from engine.pipeline.core import PipelineContext
from engine.pipeline.params import PipelineParams
# ─── FIXTURES ────────────────────────────────────────────
class QueueDisplay:
"""Stub display that captures every frame into a queue.
Acts as a FIFO sink so tests can inspect exactly what
the pipeline produced without any terminal or network I/O.
"""
def __init__(self):
self.frames: queue.Queue[list[str]] = queue.Queue()
self.width = 80
self.height = 24
self._init_called = False
def init(self, width: int, height: int, reuse: bool = False) -> None:
self.width = width
self.height = height
self._init_called = True
def show(self, buffer: list[str], border: bool = False) -> None:
# Deep copy to prevent later mutations
self.frames.put(list(buffer))
def clear(self) -> None:
pass
def cleanup(self) -> None:
pass
def get_dimensions(self) -> tuple[int, int]:
return (self.width, self.height)
class MarkerEffect(EffectPlugin):
"""Effect that prepends a marker line to prove it ran.
Each MarkerEffect adds a unique tag so tests can verify
which effects executed and in what order.
"""
def __init__(self, tag: str = "MARKER"):
self._tag = tag
self.call_count = 0
super().__init__()
@property
def name(self) -> str:
return f"marker-{self._tag}"
def configure(self, config: dict) -> None:
pass
def process(self, buffer: list[str], ctx: EffectContext) -> list[str]:
self.call_count += 1
if buffer is None:
return [f"[{self._tag}:EMPTY]"]
return [f"[{self._tag}]"] + list(buffer)
# ─── HELPERS ─────────────────────────────────────────────
def _build_pipeline(
items: list,
effects: list[tuple[str, EffectPlugin]] | None = None,
use_font_stage: bool = False,
width: int = 80,
height: int = 24,
) -> tuple[Pipeline, QueueDisplay, PipelineContext]:
"""Build a fully-wired pipeline with a QueueDisplay sink.
Args:
items: Content items to feed into the source.
effects: Optional list of (name, EffectPlugin) to add.
use_font_stage: Use FontStage instead of SourceItemsToBufferStage.
width: Viewport width.
height: Viewport height.
Returns:
(pipeline, queue_display, context) tuple.
"""
display = QueueDisplay()
ctx = PipelineContext()
params = PipelineParams()
params.viewport_width = width
params.viewport_height = height
params.frame_number = 0
ctx.params = params
ctx.set("items", items)
pipeline = Pipeline(
config=PipelineConfig(enable_metrics=True),
context=ctx,
)
# Source stage
source = ListDataSource(items, name="test-source")
pipeline.add_stage("source", DataSourceStage(source, name="test-source"))
# Render stage
if use_font_stage:
pipeline.add_stage("render", FontStage(name="font"))
else:
pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
# Effect stages
if effects:
for effect_name, effect_plugin in effects:
pipeline.add_stage(
f"effect_{effect_name}",
EffectPluginStage(effect_plugin, name=effect_name),
)
# Display stage
pipeline.add_stage("display", DisplayStage(display, name="queue"))
pipeline.build()
pipeline.initialize()
return pipeline, display, ctx
# ─── TESTS: HAPPY PATH ──────────────────────────────────
class TestPipelineE2EHappyPath:
"""End-to-end: data flows source -> render -> display."""
def test_items_reach_display(self):
"""Content items fed to source must appear in the display output."""
items = [
SourceItem(content="Hello World", source="test", timestamp="now"),
SourceItem(content="Second Item", source="test", timestamp="now"),
]
pipeline, display, ctx = _build_pipeline(items)
result = pipeline.execute(items)
assert result.success, f"Pipeline failed: {result.error}"
frame = display.frames.get(timeout=1)
text = "\n".join(frame)
assert "Hello World" in text
assert "Second Item" in text
def test_pipeline_output_is_list_of_strings(self):
"""Display must receive list[str], not raw SourceItems."""
items = [SourceItem(content="Line one", source="s", timestamp="t")]
pipeline, display, ctx = _build_pipeline(items)
result = pipeline.execute(items)
assert result.success
frame = display.frames.get(timeout=1)
assert isinstance(frame, list)
for line in frame:
assert isinstance(line, str), f"Expected str, got {type(line)}: {line!r}"
def test_multiline_items_are_split(self):
"""Items with newlines should be split into individual buffer lines."""
items = [
SourceItem(content="Line A\nLine B\nLine C", source="s", timestamp="t")
]
pipeline, display, ctx = _build_pipeline(items)
result = pipeline.execute(items)
assert result.success
frame = display.frames.get(timeout=1)
assert "Line A" in frame
assert "Line B" in frame
assert "Line C" in frame
def test_empty_source_produces_empty_buffer(self):
"""An empty source should produce an empty (or blank) frame."""
items = []
pipeline, display, ctx = _build_pipeline(items)
result = pipeline.execute(items)
assert result.success
frame = display.frames.get(timeout=1)
assert isinstance(frame, list)
def test_multiple_frames_are_independent(self):
"""Each execute() call should produce a distinct frame."""
items = [SourceItem(content="frame-content", source="s", timestamp="t")]
pipeline, display, ctx = _build_pipeline(items)
pipeline.execute(items)
pipeline.execute(items)
f1 = display.frames.get(timeout=1)
f2 = display.frames.get(timeout=1)
assert f1 == f2 # Same input => same output
assert display.frames.empty() # Exactly 2 frames
# ─── TESTS: EFFECTS IN THE PIPELINE ─────────────────────
class TestPipelineE2EEffects:
"""End-to-end: effects process the buffer between render and display."""
def test_single_effect_modifies_output(self):
"""A single effect should visibly modify the output frame."""
items = [SourceItem(content="Original", source="s", timestamp="t")]
marker = MarkerEffect("FX1")
pipeline, display, ctx = _build_pipeline(items, effects=[("marker", marker)])
result = pipeline.execute(items)
assert result.success
frame = display.frames.get(timeout=1)
assert "[FX1]" in frame, f"Marker not found in frame: {frame}"
assert "Original" in "\n".join(frame)
def test_effect_chain_ordering(self):
"""Multiple effects execute in the order they were added."""
items = [SourceItem(content="data", source="s", timestamp="t")]
fx_a = MarkerEffect("A")
fx_b = MarkerEffect("B")
pipeline, display, ctx = _build_pipeline(
items, effects=[("alpha", fx_a), ("beta", fx_b)]
)
result = pipeline.execute(items)
assert result.success
frame = display.frames.get(timeout=1)
text = "\n".join(frame)
# B runs after A, so B's marker is prepended last => appears first
idx_a = text.index("[A]")
idx_b = text.index("[B]")
assert idx_b < idx_a, f"Expected [B] before [A], got: {frame}"
def test_effect_receives_list_of_strings(self):
"""Effects must receive list[str] from the render stage."""
items = [SourceItem(content="check-type", source="s", timestamp="t")]
received_types = []
class TypeCheckEffect(EffectPlugin):
@property
def name(self):
return "typecheck"
def configure(self, config):
pass
def process(self, buffer, ctx):
received_types.append(type(buffer).__name__)
if isinstance(buffer, list):
for item in buffer:
received_types.append(type(item).__name__)
return buffer
pipeline, display, ctx = _build_pipeline(
items, effects=[("typecheck", TypeCheckEffect())]
)
pipeline.execute(items)
assert received_types[0] == "list", f"Buffer type: {received_types[0]}"
# All elements should be strings
for t in received_types[1:]:
assert t == "str", f"Buffer element type: {t}"
def test_disabled_effect_is_skipped(self):
"""A disabled effect should not process data."""
items = [SourceItem(content="data", source="s", timestamp="t")]
marker = MarkerEffect("DISABLED")
pipeline, display, ctx = _build_pipeline(
items, effects=[("disabled-fx", marker)]
)
# Disable the effect stage
stage = pipeline.get_stage("effect_disabled-fx")
stage.set_enabled(False)
result = pipeline.execute(items)
assert result.success
frame = display.frames.get(timeout=1)
assert "[DISABLED]" not in frame, "Disabled effect should not run"
assert marker.call_count == 0
# ─── TESTS: STAGE EXECUTION ORDER & METRICS ─────────────
class TestPipelineE2EStageOrder:
"""Verify all stages execute and metrics are collected."""
def test_all_stages_appear_in_execution_order(self):
"""Pipeline build must include source, render, and display."""
items = [SourceItem(content="x", source="s", timestamp="t")]
pipeline, display, ctx = _build_pipeline(items)
order = pipeline.execution_order
assert "source" in order
assert "render" in order
assert "display" in order
def test_execution_order_is_source_render_display(self):
"""Source must come before render, render before display."""
items = [SourceItem(content="x", source="s", timestamp="t")]
pipeline, display, ctx = _build_pipeline(items)
order = pipeline.execution_order
assert order.index("source") < order.index("render")
assert order.index("render") < order.index("display")
def test_effects_between_render_and_display(self):
"""Effects must execute after render and before display."""
items = [SourceItem(content="x", source="s", timestamp="t")]
marker = MarkerEffect("MID")
pipeline, display, ctx = _build_pipeline(items, effects=[("mid", marker)])
order = pipeline.execution_order
render_idx = order.index("render")
display_idx = order.index("display")
effect_idx = order.index("effect_mid")
assert render_idx < effect_idx < display_idx
def test_metrics_collected_for_all_stages(self):
"""After execution, metrics should exist for every active stage."""
items = [SourceItem(content="x", source="s", timestamp="t")]
marker = MarkerEffect("M")
pipeline, display, ctx = _build_pipeline(items, effects=[("m", marker)])
pipeline.execute(items)
summary = pipeline.get_metrics_summary()
assert "stages" in summary
stage_names = set(summary["stages"].keys())
# All regular (non-overlay) stages should have metrics
assert "source" in stage_names
assert "render" in stage_names
assert "display" in stage_names
assert "effect_m" in stage_names
# ─── TESTS: FONT STAGE DATAFLOW ─────────────────────────
class TestFontStageDataflow:
"""Verify FontStage correctly renders content through make_block.
These tests expose the tuple-unpacking bug in FontStage.process()
where make_block returns (lines, color, meta_idx) but the code
does result.extend(block) instead of result.extend(block[0]).
"""
def test_font_stage_unpacks_make_block_correctly(self):
"""FontStage must produce list[str] output, not mixed types."""
items = [
SourceItem(content="Test Headline", source="test-src", timestamp="12345")
]
# Mock make_block to return its documented signature
mock_lines = [" RENDERED LINE 1", " RENDERED LINE 2", "", " meta info"]
mock_return = (mock_lines, "\033[38;5;46m", 3)
with patch("engine.render.make_block", return_value=mock_return):
pipeline, display, ctx = _build_pipeline(items, use_font_stage=True)
result = pipeline.execute(items)
assert result.success, f"Pipeline failed: {result.error}"
frame = display.frames.get(timeout=1)
# Every element in the frame must be a string
for i, line in enumerate(frame):
assert isinstance(line, str), (
f"Frame line {i} is {type(line).__name__}: {line!r} "
f"(FontStage likely extended with raw tuple)"
)
def test_font_stage_output_contains_rendered_content(self):
"""FontStage output should contain the rendered lines, not color codes."""
items = [SourceItem(content="My Headline", source="src", timestamp="0")]
mock_lines = [" BIG BLOCK TEXT", " MORE TEXT", "", " ░ src · 0"]
mock_return = (mock_lines, "\033[38;5;46m", 3)
with patch("engine.render.make_block", return_value=mock_return):
pipeline, display, ctx = _build_pipeline(items, use_font_stage=True)
result = pipeline.execute(items)
assert result.success
frame = display.frames.get(timeout=1)
text = "\n".join(frame)
assert "BIG BLOCK TEXT" in text
assert "MORE TEXT" in text
def test_font_stage_does_not_leak_color_codes_as_lines(self):
"""The ANSI color code from make_block must NOT appear as a frame line."""
items = [SourceItem(content="Headline", source="s", timestamp="0")]
color_code = "\033[38;5;46m"
mock_return = ([" rendered"], color_code, 0)
with patch("engine.render.make_block", return_value=mock_return):
pipeline, display, ctx = _build_pipeline(items, use_font_stage=True)
result = pipeline.execute(items)
assert result.success
frame = display.frames.get(timeout=1)
# The color code itself should not be a standalone line
assert color_code not in frame, (
f"Color code leaked as a frame line: {frame}"
)
# The meta_row_index (int) should not be a line either
for line in frame:
assert not isinstance(line, int), f"Integer leaked into frame: {line}"
def test_font_stage_handles_multiple_items(self):
"""FontStage should render each item through make_block."""
items = [
SourceItem(content="First", source="a", timestamp="1"),
SourceItem(content="Second", source="b", timestamp="2"),
]
call_count = 0
def mock_make_block(title, src, ts, w):
nonlocal call_count
call_count += 1
return ([f" [{title}]"], "\033[0m", 0)
with patch("engine.render.make_block", side_effect=mock_make_block):
pipeline, display, ctx = _build_pipeline(items, use_font_stage=True)
result = pipeline.execute(items)
assert result.success
assert call_count == 2, f"make_block called {call_count} times, expected 2"
frame = display.frames.get(timeout=1)
text = "\n".join(frame)
assert "[First]" in text
assert "[Second]" in text
# ─── TESTS: MIRROR OF app.py ASSEMBLY ───────────────────
class TestAppPipelineAssembly:
"""Verify the pipeline as assembled by app.py works end-to-end.
This mirrors how run_pipeline_mode() builds the pipeline but
without any network or terminal dependencies.
"""
def test_demo_preset_pipeline_produces_output(self):
"""Simulates the 'demo' preset pipeline with stub data."""
# Simulate what app.py does for the demo preset
items = [
("Breaking: Test passes", "UnitTest", "1234567890"),
("Update: Coverage improves", "CI", "1234567891"),
]
display = QueueDisplay()
ctx = PipelineContext()
params = PipelineParams()
params.viewport_width = 80
params.viewport_height = 24
params.frame_number = 0
ctx.params = params
ctx.set("items", items)
pipeline = Pipeline(
config=PipelineConfig(enable_metrics=True),
context=ctx,
)
# Mirror app.py: ListDataSource -> SourceItemsToBufferStage -> display
source = ListDataSource(items, name="headlines")
pipeline.add_stage("source", DataSourceStage(source, name="headlines"))
pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
pipeline.add_stage("display", DisplayStage(display, name="queue"))
pipeline.build()
pipeline.initialize()
result = pipeline.execute(items)
assert result.success, f"Pipeline failed: {result.error}"
assert not display.frames.empty(), "Display received no frames"
frame = display.frames.get(timeout=1)
assert isinstance(frame, list)
assert len(frame) > 0
# All lines must be strings
for line in frame:
assert isinstance(line, str)