Files
Mainline/engine/pipeline/adapters.py
David Gwilliam 4c97cfe6aa fix: Implement ViewportFilterStage to prevent FontStage performance regression with large datasets
## Summary

Fixed critical performance issue where demo/poetry presets would hang for 10+ seconds due to FontStage rendering all 1438+ headline items instead of just the visible ~5 items.

## Changes

### Core Fix: ViewportFilterStage
- New pipeline stage that filters items to only those fitting in the viewport
- Reduces 1438 items → ~5 items (288x reduction) before FontStage
- Prevents expensive PIL font rendering operations on items that won't be displayed
- Located: engine/pipeline/adapters.py:348-403

### Pipeline Integration
- Updated app.py to add ViewportFilterStage before FontStage for headlines/poetry sources
- Ensures correct data flow: source → viewport_filter → font → camera → effects → display
- ViewportFilterStage depends on 'source' capability, providing pass-through filtering

### Display Protocol Enhancement
- Added is_quit_requested() and clear_quit_request() method signatures to Display protocol
- Documented as optional methods for backends supporting keyboard input
- Already implemented by pygame backend, now formally part of protocol

### Debug Infrastructure
- Added MAINLINE_DEBUG_DATAFLOW environment variable logging throughout pipeline
- Logs stage input/output types and data sizes to stderr (when flag enabled)
- Verified working: 1438 → 5 item reduction shown in debug output

### Performance Testing
- Added pytest-benchmark (v5.2.3) as dev dependency for statistical benchmarking
- Created comprehensive performance regression tests (tests/test_performance_regression.py)
- Tests verify:
  - ViewportFilterStage filters 2000 items efficiently (<1ms)
  - FontStage processes filtered items quickly (<50ms)
  - 288x performance improvement ratio maintained
  - Pipeline doesn't hang with large datasets
- All 523 tests passing, including 7 new performance tests

## Performance Impact

**Before:** FontStage renders all 1438 items per frame → 10+ second hang
**After:** FontStage renders ~5 items per frame → sub-second execution

Real-world impact: Demo preset now responsive and usable with news sources.

## Testing

- Unit tests: 523 passed, 16 skipped
- Regression tests: Catch performance degradation with large datasets
- E2E verification: Debug logging confirms correct pipeline flow
- Benchmark suite: Statistical performance tracking enabled
2026-03-16 22:43:53 -07:00

751 lines
22 KiB
Python

"""
Stage adapters - Bridge existing components to the Stage interface.
This module provides adapters that wrap existing components
(EffectPlugin, Display, DataSource, Camera) as Stage implementations.
"""
from typing import Any
from engine.pipeline.core import PipelineContext, Stage
class EffectPluginStage(Stage):
"""Adapter wrapping EffectPlugin as a Stage."""
def __init__(self, effect_plugin, name: str = "effect"):
self._effect = effect_plugin
self.name = name
self.category = "effect"
self.optional = False
@property
def stage_type(self) -> str:
"""Return stage_type based on effect name.
HUD effects are overlays.
"""
if self.name == "hud":
return "overlay"
return self.category
@property
def render_order(self) -> int:
"""Return render_order based on effect type.
HUD effects have high render_order to appear on top.
"""
if self.name == "hud":
return 100 # High order for overlays
return 0
@property
def is_overlay(self) -> bool:
"""Return True for HUD effects.
HUD is an overlay - it composes on top of the buffer
rather than transforming it for the next stage.
"""
return self.name == "hud"
@property
def capabilities(self) -> set[str]:
return {f"effect.{self.name}"}
@property
def dependencies(self) -> set[str]:
return set()
@property
def inlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.TEXT_BUFFER}
@property
def outlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.TEXT_BUFFER}
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Process data through the effect."""
if data is None:
return None
from engine.effects.types import EffectContext, apply_param_bindings
w = ctx.params.viewport_width if ctx.params else 80
h = ctx.params.viewport_height if ctx.params else 24
frame = ctx.params.frame_number if ctx.params else 0
effect_ctx = EffectContext(
terminal_width=w,
terminal_height=h,
scroll_cam=0,
ticker_height=h,
camera_x=0,
mic_excess=0.0,
grad_offset=(frame * 0.01) % 1.0,
frame_number=frame,
has_message=False,
items=ctx.get("items", []),
)
# Copy sensor state from PipelineContext to EffectContext
for key, value in ctx.state.items():
if key.startswith("sensor."):
effect_ctx.set_state(key, value)
# Copy metrics from PipelineContext to EffectContext
if "metrics" in ctx.state:
effect_ctx.set_state("metrics", ctx.state["metrics"])
# Apply sensor param bindings if effect has them
if hasattr(self._effect, "param_bindings") and self._effect.param_bindings:
bound_config = apply_param_bindings(self._effect, effect_ctx)
self._effect.configure(bound_config)
return self._effect.process(data, effect_ctx)
class DisplayStage(Stage):
"""Adapter wrapping Display as a Stage."""
def __init__(self, display, name: str = "terminal"):
self._display = display
self.name = name
self.category = "display"
self.optional = False
@property
def capabilities(self) -> set[str]:
return {"display.output"}
@property
def dependencies(self) -> set[str]:
return {"render.output"} # Display needs rendered content
@property
def inlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.TEXT_BUFFER} # Display consumes rendered text
@property
def outlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.NONE} # Display is a terminal stage (no output)
def init(self, ctx: PipelineContext) -> bool:
w = ctx.params.viewport_width if ctx.params else 80
h = ctx.params.viewport_height if ctx.params else 24
result = self._display.init(w, h, reuse=False)
return result is not False
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Output data to display."""
if data is not None:
self._display.show(data)
return data
def cleanup(self) -> None:
self._display.cleanup()
class DataSourceStage(Stage):
"""Adapter wrapping DataSource as a Stage."""
def __init__(self, data_source, name: str = "headlines"):
self._source = data_source
self.name = name
self.category = "source"
self.optional = False
@property
def capabilities(self) -> set[str]:
return {f"source.{self.name}"}
@property
def dependencies(self) -> set[str]:
return set()
@property
def inlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.NONE} # Sources don't take input
@property
def outlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.SOURCE_ITEMS}
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Fetch data from source."""
if hasattr(self._source, "get_items"):
return self._source.get_items()
return data
class PassthroughStage(Stage):
"""Simple stage that passes data through unchanged.
Used for sources that already provide the data in the correct format
(e.g., pipeline introspection that outputs text directly).
"""
def __init__(self, name: str = "passthrough"):
self.name = name
self.category = "render"
self.optional = True
@property
def stage_type(self) -> str:
return "render"
@property
def capabilities(self) -> set[str]:
return {"render.output"}
@property
def dependencies(self) -> set[str]:
return {"source"}
@property
def inlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.SOURCE_ITEMS}
@property
def outlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.SOURCE_ITEMS}
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Pass data through unchanged."""
return data
class SourceItemsToBufferStage(Stage):
"""Convert SourceItem objects to text buffer.
Takes a list of SourceItem objects and extracts their content,
splitting on newlines to create a proper text buffer for display.
"""
def __init__(self, name: str = "items-to-buffer"):
self.name = name
self.category = "render"
self.optional = True
@property
def stage_type(self) -> str:
return "render"
@property
def capabilities(self) -> set[str]:
return {"render.output"}
@property
def dependencies(self) -> set[str]:
return {"source"}
@property
def inlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.SOURCE_ITEMS}
@property
def outlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.TEXT_BUFFER}
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Convert SourceItem list to text buffer."""
if data is None:
return []
# If already a list of strings, return as-is
if isinstance(data, list) and data and isinstance(data[0], str):
return data
# If it's a list of SourceItem, extract content
from engine.data_sources import SourceItem
if isinstance(data, list):
result = []
for item in data:
if isinstance(item, SourceItem):
# Split content by newline to get individual lines
lines = item.content.split("\n")
result.extend(lines)
elif hasattr(item, "content"): # Has content attribute
lines = str(item.content).split("\n")
result.extend(lines)
else:
result.append(str(item))
return result
# Single item
if isinstance(data, SourceItem):
return data.content.split("\n")
return [str(data)]
class CameraStage(Stage):
"""Adapter wrapping Camera as a Stage."""
def __init__(self, camera, name: str = "vertical"):
self._camera = camera
self.name = name
self.category = "camera"
self.optional = True
@property
def capabilities(self) -> set[str]:
return {"camera"}
@property
def dependencies(self) -> set[str]:
return {
"source"
} # Prefix match any source (source.headlines, source.poetry, etc.)
@property
def inlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.TEXT_BUFFER} # Camera works on rendered text
@property
def outlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.TEXT_BUFFER}
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Apply camera transformation to data."""
if data is None:
return None
if hasattr(self._camera, "apply"):
return self._camera.apply(
data, ctx.params.viewport_width if ctx.params else 80
)
return data
def cleanup(self) -> None:
if hasattr(self._camera, "reset"):
self._camera.reset()
class ViewportFilterStage(Stage):
"""Stage that limits items to fit in viewport.
Filters the input list of items to only include as many as can fit
in the visible viewport. This prevents FontStage from rendering
thousands of items when only a few are visible, reducing processing time.
Estimate: each rendered item typically takes 5-8 terminal lines.
For a 24-line viewport, we limit to ~4 items (conservative estimate).
"""
def __init__(self, name: str = "viewport-filter"):
self.name = name
self.category = "filter"
self.optional = False
@property
def stage_type(self) -> str:
return "filter"
@property
def capabilities(self) -> set[str]:
return {f"filter.{self.name}"}
@property
def dependencies(self) -> set[str]:
return {"source"}
@property
def inlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.SOURCE_ITEMS}
@property
def outlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.SOURCE_ITEMS}
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Filter items to viewport-fitting count."""
if data is None or not isinstance(data, list):
return data
# Estimate: each rendered headline takes 5-8 lines
# Use a conservative factor to ensure we don't run out of space
lines_per_item = 6
viewport_height = ctx.params.viewport_height if ctx.params else 24
# Calculate how many items we need to fill the viewport
# Add 1 extra to account for padding/spacing
max_items = max(1, viewport_height // lines_per_item + 1)
# Return only the items that fit in the viewport
return data[:max_items]
class FontStage(Stage):
"""Stage that applies font rendering to content.
FontStage is a Transform that takes raw content (text, headlines)
and renders it to an ANSI-formatted buffer using the configured font.
This decouples font rendering from data sources, allowing:
- Different fonts per source
- Runtime font swapping
- Font as a pipeline stage
Attributes:
font_path: Path to font file (None = use config default)
font_size: Font size in points (None = use config default)
font_ref: Reference name for registered font ("default", "cjk", etc.)
"""
def __init__(
self,
font_path: str | None = None,
font_size: int | None = None,
font_ref: str | None = "default",
name: str = "font",
):
self.name = name
self.category = "transform"
self.optional = False
self._font_path = font_path
self._font_size = font_size
self._font_ref = font_ref
self._font = None
@property
def stage_type(self) -> str:
return "transform"
@property
def capabilities(self) -> set[str]:
return {f"transform.{self.name}", "render.output"}
@property
def dependencies(self) -> set[str]:
return {"source"}
@property
def inlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.SOURCE_ITEMS}
@property
def outlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.TEXT_BUFFER}
def init(self, ctx: PipelineContext) -> bool:
"""Initialize font from config or path."""
from engine import config
if self._font_path:
try:
from PIL import ImageFont
size = self._font_size or config.FONT_SZ
self._font = ImageFont.truetype(self._font_path, size)
except Exception:
return False
return True
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Render content with font to buffer."""
if data is None:
return None
from engine.render import make_block
w = ctx.params.viewport_width if ctx.params else 80
# If data is already a list of strings (buffer), return as-is
if isinstance(data, list) and data and isinstance(data[0], str):
return data
# If data is a list of items, render each with font
if isinstance(data, list):
result = []
for item in data:
# Handle SourceItem or tuple (title, source, timestamp)
if hasattr(item, "content"):
title = item.content
src = getattr(item, "source", "unknown")
ts = getattr(item, "timestamp", "0")
elif isinstance(item, tuple):
title = item[0] if len(item) > 0 else ""
src = item[1] if len(item) > 1 else "unknown"
ts = str(item[2]) if len(item) > 2 else "0"
else:
title = str(item)
src = "unknown"
ts = "0"
try:
block_lines, color_code, meta_idx = make_block(title, src, ts, w)
result.extend(block_lines)
except Exception:
result.append(title)
return result
return data
class ImageToTextStage(Stage):
"""Transform that converts PIL Image to ASCII text buffer.
Takes an ImageItem or PIL Image and converts it to a text buffer
using ASCII character density mapping. The output can be displayed
directly or further processed by effects.
Attributes:
width: Output width in characters
height: Output height in characters
charset: Character set for density mapping (default: simple ASCII)
"""
def __init__(
self,
width: int = 80,
height: int = 24,
charset: str = " .:-=+*#%@",
name: str = "image-to-text",
):
self.name = name
self.category = "transform"
self.optional = False
self.width = width
self.height = height
self.charset = charset
@property
def stage_type(self) -> str:
return "transform"
@property
def inlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.PIL_IMAGE} # Accepts PIL Image objects or ImageItem
@property
def outlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.TEXT_BUFFER}
@property
def capabilities(self) -> set[str]:
return {f"transform.{self.name}", "render.output"}
@property
def dependencies(self) -> set[str]:
return {"source"}
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Convert PIL Image to text buffer."""
if data is None:
return None
from engine.data_sources.sources import ImageItem
# Extract PIL Image from various input types
pil_image = None
if isinstance(data, ImageItem) or hasattr(data, "image"):
pil_image = data.image
else:
# Assume it's already a PIL Image
pil_image = data
# Check if it's a PIL Image
if not hasattr(pil_image, "resize"):
# Not a PIL Image, return as-is
return data if isinstance(data, list) else [str(data)]
# Convert to grayscale and resize
try:
if pil_image.mode != "L":
pil_image = pil_image.convert("L")
except Exception:
return ["[image conversion error]"]
# Calculate cell aspect ratio correction (characters are taller than wide)
aspect_ratio = 0.5
target_w = self.width
target_h = int(self.height * aspect_ratio)
# Resize image to target dimensions
try:
resized = pil_image.resize((target_w, target_h))
except Exception:
return ["[image resize error]"]
# Map pixels to characters
result = []
pixels = list(resized.getdata())
for row in range(target_h):
line = ""
for col in range(target_w):
idx = row * target_w + col
if idx < len(pixels):
brightness = pixels[idx]
char_idx = int((brightness / 255) * (len(self.charset) - 1))
line += self.charset[char_idx]
else:
line += " "
result.append(line)
# Pad or trim to exact height
while len(result) < self.height:
result.append(" " * self.width)
result = result[: self.height]
# Pad lines to width
result = [line.ljust(self.width) for line in result]
return result
def create_stage_from_display(display, name: str = "terminal") -> DisplayStage:
"""Create a Stage from a Display instance."""
return DisplayStage(display, name)
def create_stage_from_effect(effect_plugin, name: str) -> EffectPluginStage:
"""Create a Stage from an EffectPlugin."""
return EffectPluginStage(effect_plugin, name)
def create_stage_from_source(data_source, name: str = "headlines") -> DataSourceStage:
"""Create a Stage from a DataSource."""
return DataSourceStage(data_source, name)
def create_stage_from_camera(camera, name: str = "vertical") -> CameraStage:
"""Create a Stage from a Camera."""
return CameraStage(camera, name)
def create_stage_from_font(
font_path: str | None = None,
font_size: int | None = None,
font_ref: str | None = "default",
name: str = "font",
) -> FontStage:
"""Create a FontStage for rendering content with fonts."""
return FontStage(
font_path=font_path, font_size=font_size, font_ref=font_ref, name=name
)
class CanvasStage(Stage):
"""Stage that manages a Canvas for rendering.
CanvasStage creates and manages a 2D canvas that can hold rendered content.
Other stages can write to and read from the canvas via the pipeline context.
This enables:
- Pre-rendering content off-screen
- Multiple cameras viewing different regions
- Smooth scrolling (camera moves, content stays)
- Layer compositing
Usage:
- Add CanvasStage to pipeline
- Other stages access canvas via: ctx.get("canvas")
"""
def __init__(
self,
width: int = 80,
height: int = 24,
name: str = "canvas",
):
self.name = name
self.category = "system"
self.optional = True
self._width = width
self._height = height
self._canvas = None
@property
def stage_type(self) -> str:
return "system"
@property
def capabilities(self) -> set[str]:
return {"canvas"}
@property
def dependencies(self) -> set[str]:
return set()
@property
def inlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.ANY}
@property
def outlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.ANY}
def init(self, ctx: PipelineContext) -> bool:
from engine.canvas import Canvas
self._canvas = Canvas(width=self._width, height=self._height)
ctx.set("canvas", self._canvas)
return True
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Pass through data but ensure canvas is in context."""
if self._canvas is None:
from engine.canvas import Canvas
self._canvas = Canvas(width=self._width, height=self._height)
ctx.set("canvas", self._canvas)
# Get dirty regions from canvas and expose via context
# Effects can access via ctx.get_state("canvas.dirty_rows")
if self._canvas.is_dirty():
dirty_rows = self._canvas.get_dirty_rows()
ctx.set_state("canvas.dirty_rows", dirty_rows)
ctx.set_state("canvas.dirty_regions", self._canvas.get_dirty_regions())
return data
def get_canvas(self):
"""Get the canvas instance."""
return self._canvas
def cleanup(self) -> None:
self._canvas = None