forked from genewildish/Mainline
- Add ~20 gallery presets covering sources, effects, cameras, displays - Add MultiDisplay support with --display multi:terminal,pygame syntax - Fix ViewportFilterStage to recompute layout on viewport_width change - Add benchmark.py module for hook-based performance testing - Add viewport resize tests to test_viewport_filter_performance.py
843 lines
26 KiB
Python
843 lines
26 KiB
Python
"""
|
|
Stage adapters - Bridge existing components to the Stage interface.
|
|
|
|
This module provides adapters that wrap existing components
|
|
(EffectPlugin, Display, DataSource, Camera) as Stage implementations.
|
|
"""
|
|
|
|
from typing import Any
|
|
|
|
from engine.pipeline.core import PipelineContext, Stage
|
|
|
|
|
|
class EffectPluginStage(Stage):
    """Stage adapter that runs an EffectPlugin inside the pipeline.

    The effect named ``"hud"`` is special-cased: it reports itself as an
    overlay with a high render order instead of a regular effect.
    """

    def __init__(self, effect_plugin, name: str = "effect"):
        self.name = name
        self.category = "effect"
        self.optional = False
        self._effect = effect_plugin

    @property
    def stage_type(self) -> str:
        """Stage type: ``"overlay"`` for the HUD, otherwise the category."""
        return "overlay" if self.name == "hud" else self.category

    @property
    def render_order(self) -> int:
        """Render order: 100 for the HUD so it lands on top, 0 otherwise."""
        return 100 if self.name == "hud" else 0

    @property
    def is_overlay(self) -> bool:
        """Whether this stage is the HUD overlay.

        An overlay composes on top of the buffer rather than transforming
        it for the next stage.
        """
        return self.name == "hud"

    @property
    def capabilities(self) -> set[str]:
        """Capability advertised by this stage: ``effect.<name>``."""
        return {f"effect.{self.name}"}

    @property
    def dependencies(self) -> set[str]:
        """Effects declare no dependencies."""
        return set()

    @property
    def inlet_types(self) -> set:
        """Accepted input data types (text buffer only)."""
        from engine.pipeline.core import DataType

        return {DataType.TEXT_BUFFER}

    @property
    def outlet_types(self) -> set:
        """Produced output data types (text buffer only)."""
        from engine.pipeline.core import DataType

        return {DataType.TEXT_BUFFER}

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Run the wrapped effect over ``data``.

        Builds an EffectContext from the pipeline parameters (falling back
        to 80x24, frame 0 when ``ctx.params`` is falsy), forwards sensor
        state and metrics, applies param bindings if the effect has any,
        and returns whatever the effect's ``process`` returns.
        ``None`` input is returned unchanged.
        """
        if data is None:
            return None

        from engine.effects.types import EffectContext, apply_param_bindings

        params = ctx.params
        width = params.viewport_width if params else 80
        height = params.viewport_height if params else 24
        frame_no = params.frame_number if params else 0

        effect_ctx = EffectContext(
            terminal_width=width,
            terminal_height=height,
            scroll_cam=0,
            ticker_height=height,
            camera_x=0,
            mic_excess=0.0,
            grad_offset=(frame_no * 0.01) % 1.0,
            frame_number=frame_no,
            has_message=False,
            items=ctx.get("items", []),
        )

        # Forward every "sensor.*" entry of the pipeline state to the effect.
        for state_key, state_value in ctx.state.items():
            if state_key.startswith("sensor."):
                effect_ctx.set_state(state_key, state_value)

        # Forward pipeline metrics when present.
        if "metrics" in ctx.state:
            effect_ctx.set_state("metrics", ctx.state["metrics"])

        # Reconfigure the effect from its sensor bindings, if it has any.
        if getattr(self._effect, "param_bindings", None):
            self._effect.configure(apply_param_bindings(self._effect, effect_ctx))

        return self._effect.process(data, effect_ctx)
|
|
|
|
|
|
class DisplayStage(Stage):
    """Stage adapter that pushes the rendered buffer to a Display."""

    def __init__(self, display, name: str = "terminal"):
        self.name = name
        self.category = "display"
        self.optional = False
        self._display = display

    @property
    def capabilities(self) -> set[str]:
        """Capability advertised by every display stage."""
        return {"display.output"}

    @property
    def dependencies(self) -> set[str]:
        """A display requires rendered content upstream."""
        return {"render.output"}

    @property
    def inlet_types(self) -> set:
        """Accepted input data types (rendered text buffer)."""
        from engine.pipeline.core import DataType

        return {DataType.TEXT_BUFFER}

    @property
    def outlet_types(self) -> set:
        """Declared output type: none, the display ends the pipeline."""
        from engine.pipeline.core import DataType

        return {DataType.NONE}

    def init(self, ctx: PipelineContext) -> bool:
        """Initialise the display at the viewport size (80x24 fallback).

        Returns ``False`` only when the display's ``init`` returns exactly
        ``False``; any other result (including ``None``) counts as success.
        """
        params = ctx.params
        width = params.viewport_width if params else 80
        height = params.viewport_height if params else 24
        return self._display.init(width, height, reuse=False) is not False

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Show ``data`` on the display (skipped for ``None``) and return it."""
        if data is None:
            return data
        self._display.show(data)
        return data

    def cleanup(self) -> None:
        """Delegate cleanup to the wrapped display."""
        self._display.cleanup()
|
|
|
|
|
|
class DataSourceStage(Stage):
    """Stage adapter that pulls items from a DataSource."""

    def __init__(self, data_source, name: str = "headlines"):
        self.name = name
        self.category = "source"
        self.optional = False
        self._source = data_source

    @property
    def capabilities(self) -> set[str]:
        """Capability advertised by this stage: ``source.<name>``."""
        return {f"source.{self.name}"}

    @property
    def dependencies(self) -> set[str]:
        """Sources declare no dependencies."""
        return set()

    @property
    def inlet_types(self) -> set:
        """Declared input type: none, sources take no input."""
        from engine.pipeline.core import DataType

        return {DataType.NONE}

    @property
    def outlet_types(self) -> set:
        """Produced output data types (source items)."""
        from engine.pipeline.core import DataType

        return {DataType.SOURCE_ITEMS}

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Return the source's items.

        If the wrapped source has no ``get_items`` attribute, the incoming
        ``data`` is passed through unchanged.
        """
        if not hasattr(self._source, "get_items"):
            return data
        return self._source.get_items()
|
|
|
|
|
|
class PassthroughStage(Stage):
    """No-op render stage: hands its input to the next stage untouched.

    Intended for sources whose output is already in the right format
    (e.g., pipeline introspection that outputs text directly).
    """

    def __init__(self, name: str = "passthrough"):
        self.category = "render"
        self.optional = True
        self.name = name

    @property
    def stage_type(self) -> str:
        """Stage type is always ``"render"``."""
        return "render"

    @property
    def capabilities(self) -> set[str]:
        """Advertises ``render.output`` so downstream stages can depend on it."""
        return {"render.output"}

    @property
    def dependencies(self) -> set[str]:
        """Requires a source upstream."""
        return {"source"}

    @property
    def inlet_types(self) -> set:
        """Accepted input data types (source items)."""
        from engine.pipeline.core import DataType

        return {DataType.SOURCE_ITEMS}

    @property
    def outlet_types(self) -> set:
        """Produced output data types (source items, unchanged)."""
        from engine.pipeline.core import DataType

        return {DataType.SOURCE_ITEMS}

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Return ``data`` exactly as received."""
        return data
|
|
|
|
|
|
class SourceItemsToBufferStage(Stage):
    """Render stage that flattens SourceItem objects into a text buffer.

    Each item's content is split on newlines and the resulting lines are
    concatenated into one list of strings.
    """

    def __init__(self, name: str = "items-to-buffer"):
        self.category = "render"
        self.optional = True
        self.name = name

    @property
    def stage_type(self) -> str:
        """Stage type is always ``"render"``."""
        return "render"

    @property
    def capabilities(self) -> set[str]:
        """Advertises ``render.output`` so downstream stages can depend on it."""
        return {"render.output"}

    @property
    def dependencies(self) -> set[str]:
        """Requires a source upstream."""
        return {"source"}

    @property
    def inlet_types(self) -> set:
        """Accepted input data types (source items)."""
        from engine.pipeline.core import DataType

        return {DataType.SOURCE_ITEMS}

    @property
    def outlet_types(self) -> set:
        """Produced output data types (text buffer)."""
        from engine.pipeline.core import DataType

        return {DataType.TEXT_BUFFER}

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Turn ``data`` into a list of text lines.

        - ``None`` yields an empty list.
        - A non-empty list whose first element is a string is returned as-is.
        - Any other list is flattened: SourceItem content and ``.content``
          attributes are split on newlines; other entries are stringified.
        - A single SourceItem yields its content split on newlines.
        - Anything else yields ``[str(data)]``.
        """
        if data is None:
            return []

        # Already a text buffer: nothing to convert.
        if isinstance(data, list) and data and isinstance(data[0], str):
            return data

        from engine.data_sources import SourceItem

        # Single (non-list) value.
        if not isinstance(data, list):
            if isinstance(data, SourceItem):
                return data.content.split("\n")
            return [str(data)]

        buffer_lines = []
        for entry in data:
            if isinstance(entry, SourceItem):
                buffer_lines.extend(entry.content.split("\n"))
            elif hasattr(entry, "content"):
                # Duck-typed item: content may not be a string yet.
                buffer_lines.extend(str(entry.content).split("\n"))
            else:
                buffer_lines.append(str(entry))
        return buffer_lines
|
|
|
|
|
|
class CameraStage(Stage):
    """Adapter wrapping Camera as a Stage.

    The stage keeps the camera's canvas sized to the full layout (so it can
    scroll through all content), advances the camera once per call, and then
    slices the partial buffer it received by temporarily translating the
    camera into that buffer's local coordinates.
    """

    def __init__(self, camera, name: str = "vertical"):
        self._camera = camera
        self.name = name
        self.category = "camera"
        self.optional = True

    @property
    def capabilities(self) -> set[str]:
        """Capability advertised by every camera stage."""
        return {"camera"}

    @property
    def dependencies(self) -> set[str]:
        """Depends on rendered output from a font or render stage."""
        return {"render.output"}

    @property
    def inlet_types(self) -> set:
        """Accepted input data types (rendered text buffer)."""
        from engine.pipeline.core import DataType

        return {DataType.TEXT_BUFFER}

    @property
    def outlet_types(self) -> set:
        """Produced output data types (text buffer)."""
        from engine.pipeline.core import DataType

        return {DataType.TEXT_BUFFER}

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Apply camera transformation to data.

        Reads ``total_layout_height`` and ``render_offset_y`` from ``ctx``
        (defaulting to the buffer height and 0) and writes ``camera_y``.

        Returns:
            ``None`` for ``None`` input; ``data`` unchanged when the camera
            has no ``apply`` method; otherwise the result of
            ``camera.apply(data, viewport_width, viewport_height)``.

        The camera's global canvas size and ``y`` position are restored even
        if ``apply`` raises, so a failed frame cannot leave the camera stuck
        in buffer-local coordinates.
        """
        if data is None:
            return None
        if not hasattr(self._camera, "apply"):
            return data

        viewport_width = ctx.params.viewport_width if ctx.params else 80
        viewport_height = ctx.params.viewport_height if ctx.params else 24
        buffer_height = len(data) if isinstance(data, list) else 0

        # Global layout height for the canvas (enables full scrolling range).
        total_layout_height = ctx.get("total_layout_height", buffer_height)

        # Preserve the camera's configured canvas width, but never go below
        # the viewport width, so horizontal-style cameras can still scroll.
        canvas_width = max(
            viewport_width, getattr(self._camera, "canvas_width", viewport_width)
        )

        # Tell the camera its actual viewport bounds, if it tracks them.
        if hasattr(self._camera, "viewport_width"):
            self._camera.viewport_width = viewport_width
            self._camera.viewport_height = viewport_height

        # Canvas spans the full layout so the camera can reach all content.
        self._camera.set_canvas_size(width=canvas_width, height=total_layout_height)

        # Advance the camera by a fixed 1/60 step per processed frame.
        if hasattr(self._camera, "update"):
            self._camera.update(1 / 60)

        # Publish the global y position for ViewportFilterStage.
        ctx.set("camera_y", self._camera.y)

        # The buffer we received starts at render_offset_y in global
        # coordinates, so apply() must run in buffer-local coordinates.
        render_offset_y = ctx.get("render_offset_y", 0)
        real_y = self._camera.y
        local_y = max(0, real_y - render_offset_y)

        # Temporarily shrink the canvas to the local buffer and shift the
        # camera; the finally clause undoes both no matter what happens.
        self._camera.set_canvas_size(width=canvas_width, height=buffer_height)
        try:
            self._camera.y = local_y
            return self._camera.apply(data, viewport_width, viewport_height)
        finally:
            # Restore global canvas and camera position for the next frame.
            self._camera.set_canvas_size(
                width=canvas_width, height=total_layout_height
            )
            self._camera.y = real_y

    def cleanup(self) -> None:
        """Reset the camera if it supports ``reset``."""
        if hasattr(self._camera, "reset"):
            self._camera.reset()
|
|
|
|
|
|
class ViewportFilterStage(Stage):
    """Stage that limits items based on layout calculation.

    Computes cumulative y-offsets for all items using cheap height estimation,
    then returns only items that overlap the camera's viewport window.
    This prevents FontStage from rendering thousands of items when only a few
    are visible, while still allowing camera scrolling through all content.

    The layout is cached and rebuilt only when the item count or the viewport
    width changes.
    NOTE(review): the cache is keyed on item count and width only, so a list
    of the same length with different content reuses the old layout; confirm
    this is acceptable for the sources in use.
    """

    def __init__(self, name: str = "viewport-filter"):
        self.name = name
        self.category = "filter"
        self.optional = False
        # Cache keys for the layout: item count and viewport width.
        self._cached_count = 0
        self._cached_width: int | None = None
        # One (start_y, height) pair per item, in item order.
        self._layout: list[tuple[int, int]] = []

    @property
    def stage_type(self) -> str:
        """Stage type is always ``"filter"``."""
        return "filter"

    @property
    def capabilities(self) -> set[str]:
        """Capability advertised by this stage: ``filter.<name>``."""
        return {f"filter.{self.name}"}

    @property
    def dependencies(self) -> set[str]:
        """Requires a source upstream."""
        return {"source"}

    @property
    def inlet_types(self) -> set:
        """Accepted input data types (source items)."""
        from engine.pipeline.core import DataType

        return {DataType.SOURCE_ITEMS}

    @property
    def outlet_types(self) -> set:
        """Produced output data types (source items)."""
        from engine.pipeline.core import DataType

        return {DataType.SOURCE_ITEMS}

    def _rebuild_layout(self, data: list, viewport_width: int) -> None:
        """Recompute (start_y, height) for every item and refresh the cache keys."""
        from engine.render.blocks import estimate_block_height

        layout: list[tuple[int, int]] = []
        y = 0
        for item in data:
            if hasattr(item, "content"):
                title = item.content
            elif isinstance(item, tuple):
                title = str(item[0]) if item else ""
            else:
                title = str(item)
            h = estimate_block_height(title, viewport_width)
            layout.append((y, h))
            y += h

        # Publish only once the whole layout is built, so a failure above
        # leaves the previous cache intact and triggers a retry next call.
        self._layout = layout
        self._cached_count = len(data)
        self._cached_width = viewport_width

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Filter items based on layout and camera position.

        Reads ``camera_y`` from ``ctx`` (default 0) and writes
        ``render_offset_y`` (global y of the first returned item, 0 if none
        is visible) and ``total_layout_height`` for CameraStage.

        Returns:
            ``data`` unchanged when it is not a list; otherwise the items
            overlapping the window
            ``[camera_y - viewport_height, camera_y + 2 * viewport_height)``,
            or ``data[:1]`` when nothing overlaps.
        """
        if data is None or not isinstance(data, list):
            return data

        viewport_height = ctx.params.viewport_height if ctx.params else 24
        viewport_width = ctx.params.viewport_width if ctx.params else 80
        camera_y = ctx.get("camera_y", 0)

        # Recompute layout when item count OR viewport width changes.
        if len(data) != self._cached_count or self._cached_width != viewport_width:
            self._rebuild_layout(data, viewport_width)

        # Keep one extra viewport of items above and below the visible area.
        buffer_zone = viewport_height
        vis_start = max(0, camera_y - buffer_zone)
        vis_end = camera_y + viewport_height + buffer_zone

        visible_items = []
        render_offset_y = 0
        # The layout has exactly one entry per item (rebuilt above whenever
        # the count changes), so the two sequences pair up one-to-one.
        for item, (start_y, height) in zip(data, self._layout):
            if start_y + height > vis_start and start_y < vis_end:
                if not visible_items:
                    # Global y of the first item handed downstream.
                    render_offset_y = start_y
                visible_items.append(item)

        # Total layout height is where the last item ends.
        total_layout_height = 0
        if self._layout:
            last_start, last_height = self._layout[-1]
            total_layout_height = last_start + last_height

        # Store metadata for CameraStage.
        ctx.set("render_offset_y", render_offset_y)
        ctx.set("total_layout_height", total_layout_height)

        # Always return at least one item to avoid empty buffer errors.
        return visible_items if visible_items else data[:1]
|
|
|
|
|
|
class FontStage(Stage):
    """Stage that applies font rendering to content.

    FontStage is a Transform that takes raw content (text, headlines)
    and renders it to an ANSI-formatted buffer using the configured font.

    This decouples font rendering from data sources, allowing:
    - Different fonts per source
    - Runtime font swapping
    - Font as a pipeline stage

    Attributes:
        font_path: Path to font file (None = use config default)
        font_size: Font size in points (None = use config default)
        font_ref: Reference name for registered font ("default", "cjk", etc.)

    Rendered blocks are memoised per ``(title, source, timestamp, width)``.
    The cache is never evicted by this class.
    """

    def __init__(
        self,
        font_path: str | None = None,
        font_size: int | None = None,
        font_ref: str | None = "default",
        name: str = "font",
    ):
        self.name = name
        self.category = "transform"
        self.optional = False
        self._font_path = font_path
        self._font_size = font_size
        self._font_ref = font_ref
        # Loaded by init() when a font path is given; process() renders via
        # make_block and does not reference this attribute.
        self._font = None
        # Key is (title, source, timestamp, viewport_width); the first three
        # come straight from the item and are not guaranteed to be strings.
        self._render_cache: dict[tuple[Any, Any, Any, int], list[str]] = {}

    @property
    def stage_type(self) -> str:
        """Stage type is always ``"transform"``."""
        return "transform"

    @property
    def capabilities(self) -> set[str]:
        """Advertises ``transform.<name>`` and ``render.output``."""
        return {f"transform.{self.name}", "render.output"}

    @property
    def dependencies(self) -> set[str]:
        """Requires a source upstream."""
        return {"source"}

    @property
    def inlet_types(self) -> set:
        """Accepted input data types (source items)."""
        from engine.pipeline.core import DataType

        return {DataType.SOURCE_ITEMS}

    @property
    def outlet_types(self) -> set:
        """Produced output data types (text buffer)."""
        from engine.pipeline.core import DataType

        return {DataType.TEXT_BUFFER}

    def init(self, ctx: PipelineContext) -> bool:
        """Initialize font from config or path.

        When a font path was given, loads it with PIL at the configured size
        (``font_size`` or ``config.FONT_SZ``).

        Returns:
            ``False`` if loading the font raised; ``True`` otherwise,
            including when no font path was given.
        """
        from engine import config

        if self._font_path:
            try:
                from PIL import ImageFont

                size = self._font_size or config.FONT_SZ
                self._font = ImageFont.truetype(self._font_path, size)
            except Exception:
                # Best-effort: report failure to the pipeline, don't raise.
                return False
        return True

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Render content with font to buffer.

        Returns:
            ``None`` for ``None`` input; ``data`` unchanged when it is already
            a non-empty list of strings or is not a list; otherwise the
            concatenated rendered lines of every item.
        """
        if data is None:
            return None

        from engine.render import make_block

        w = ctx.params.viewport_width if ctx.params else 80

        # If data is already a list of strings (buffer), return as-is.
        if isinstance(data, list) and data and isinstance(data[0], str):
            return data

        if not isinstance(data, list):
            return data

        result = []
        for item in data:
            # Handle SourceItem-like objects or (title, source, timestamp) tuples.
            if hasattr(item, "content"):
                title = item.content
                src = getattr(item, "source", "unknown")
                ts = getattr(item, "timestamp", "0")
            elif isinstance(item, tuple):
                title = item[0] if len(item) > 0 else ""
                src = item[1] if len(item) > 1 else "unknown"
                ts = str(item[2]) if len(item) > 2 else "0"
            else:
                title = str(item)
                src = "unknown"
                ts = "0"

            # Width is part of the key so a resize re-renders the block.
            cache_key = (title, src, ts, w)
            if cache_key in self._render_cache:
                result.extend(self._render_cache[cache_key])
                continue

            try:
                # Only the rendered lines are used; the other two values
                # returned by make_block are ignored here.
                block_lines, _color_code, _meta_idx = make_block(title, src, ts, w)
                self._render_cache[cache_key] = block_lines
                result.extend(block_lines)
            except Exception:
                # Deliberate best-effort fallback: show the raw title rather
                # than dropping the item or aborting the frame.
                result.append(title)

        return result
|
|
|
|
|
|
class ImageToTextStage(Stage):
    """Transform stage that renders a PIL Image as an ASCII text buffer.

    Accepts an ImageItem, any object with an ``image`` attribute, or a PIL
    Image, and maps pixel brightness onto a character ramp. The resulting
    buffer can be displayed directly or processed further by effects.

    Attributes:
        width: Output width in characters
        height: Output height in characters
        charset: Character ramp used for the brightness mapping
    """

    def __init__(
        self,
        width: int = 80,
        height: int = 24,
        charset: str = " .:-=+*#%@",
        name: str = "image-to-text",
    ):
        self.width = width
        self.height = height
        self.charset = charset
        self.name = name
        self.category = "transform"
        self.optional = False

    @property
    def stage_type(self) -> str:
        """Stage type is always ``"transform"``."""
        return "transform"

    @property
    def inlet_types(self) -> set:
        """Accepted input data types (PIL image or ImageItem)."""
        from engine.pipeline.core import DataType

        return {DataType.PIL_IMAGE}

    @property
    def outlet_types(self) -> set:
        """Produced output data types (text buffer)."""
        from engine.pipeline.core import DataType

        return {DataType.TEXT_BUFFER}

    @property
    def capabilities(self) -> set[str]:
        """Advertises ``transform.<name>`` and ``render.output``."""
        return {f"transform.{self.name}", "render.output"}

    @property
    def dependencies(self) -> set[str]:
        """Requires a source upstream."""
        return {"source"}

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Convert an image into ``height`` lines of ``width`` characters.

        Returns:
            ``None`` for ``None`` input. If the input is not image-like (no
            ``resize`` attribute), a list input is returned as-is and
            anything else becomes ``[str(data)]``. A failed grayscale
            conversion or resize yields a one-line error buffer.
        """
        if data is None:
            return None

        from engine.data_sources.sources import ImageItem

        # Unwrap the image from an item, or assume data is the image itself.
        if isinstance(data, ImageItem) or hasattr(data, "image"):
            image = data.image
        else:
            image = data

        # Anything without resize() is not treated as an image.
        if not hasattr(image, "resize"):
            return data if isinstance(data, list) else [str(data)]

        # Brightness mapping works on single-channel ("L") images.
        try:
            if image.mode != "L":
                image = image.convert("L")
        except Exception:
            return ["[image conversion error]"]

        # Characters are taller than wide, so only half the rows are sampled.
        aspect_ratio = 0.5
        cols = self.width
        rows = int(self.height * aspect_ratio)

        try:
            scaled = image.resize((cols, rows))
        except Exception:
            return ["[image resize error]"]

        pixels = list(scaled.getdata())
        pixel_count = len(pixels)
        ramp = self.charset
        top_index = len(ramp) - 1

        # One string per sampled row; positions past the pixel data are blank.
        buffer = []
        for row in range(rows):
            first = row * cols
            buffer.append(
                "".join(
                    ramp[int((pixels[i] / 255) * top_index)]
                    if i < pixel_count
                    else " "
                    for i in range(first, first + cols)
                )
            )

        # Pad with blank rows, or trim, to exactly `height` rows.
        buffer.extend(" " * self.width for _ in range(self.height - len(buffer)))
        buffer = buffer[: self.height]

        # Pad every row to the full width.
        return [row_text.ljust(self.width) for row_text in buffer]
|
|
|
|
|
|
def create_stage_from_display(display, name: str = "terminal") -> DisplayStage:
    """Wrap a Display instance in a DisplayStage with the given name."""
    stage = DisplayStage(display, name)
    return stage
|
|
|
|
|
|
def create_stage_from_effect(effect_plugin, name: str) -> EffectPluginStage:
    """Wrap an EffectPlugin in an EffectPluginStage with the given name."""
    stage = EffectPluginStage(effect_plugin, name)
    return stage
|
|
|
|
|
|
def create_stage_from_source(data_source, name: str = "headlines") -> DataSourceStage:
    """Wrap a DataSource in a DataSourceStage with the given name."""
    stage = DataSourceStage(data_source, name)
    return stage
|
|
|
|
|
|
def create_stage_from_camera(camera, name: str = "vertical") -> CameraStage:
    """Wrap a Camera in a CameraStage with the given name."""
    stage = CameraStage(camera, name)
    return stage
|
|
|
|
|
|
def create_stage_from_font(
    font_path: str | None = None,
    font_size: int | None = None,
    font_ref: str | None = "default",
    name: str = "font",
) -> FontStage:
    """Build a FontStage, forwarding every argument unchanged."""
    stage = FontStage(
        font_path=font_path,
        font_size=font_size,
        font_ref=font_ref,
        name=name,
    )
    return stage
|
|
|
|
|
|
class CanvasStage(Stage):
    """System stage that owns a Canvas and shares it through the context.

    The stage creates a 2D canvas and registers it under the ``"canvas"``
    context key so that other stages can write to and read from it.

    This enables:
    - Pre-rendering content off-screen
    - Multiple cameras viewing different regions
    - Smooth scrolling (camera moves, content stays)
    - Layer compositing

    Usage:
    - Add CanvasStage to pipeline
    - Other stages access canvas via: ctx.get("canvas")
    """

    def __init__(
        self,
        width: int = 80,
        height: int = 24,
        name: str = "canvas",
    ):
        self._width = width
        self._height = height
        self._canvas = None
        self.name = name
        self.category = "system"
        self.optional = True

    @property
    def stage_type(self) -> str:
        """Stage type is always ``"system"``."""
        return "system"

    @property
    def capabilities(self) -> set[str]:
        """Capability advertised by every canvas stage."""
        return {"canvas"}

    @property
    def dependencies(self) -> set[str]:
        """The canvas declares no dependencies."""
        return set()

    @property
    def inlet_types(self) -> set:
        """Accepts any data type."""
        from engine.pipeline.core import DataType

        return {DataType.ANY}

    @property
    def outlet_types(self) -> set:
        """Emits any data type (input is passed through)."""
        from engine.pipeline.core import DataType

        return {DataType.ANY}

    def _create_and_publish_canvas(self, ctx: PipelineContext):
        """Create a fresh Canvas, store it, and register it in ``ctx``."""
        from engine.canvas import Canvas

        self._canvas = Canvas(width=self._width, height=self._height)
        ctx.set("canvas", self._canvas)
        return self._canvas

    def init(self, ctx: PipelineContext) -> bool:
        """Create a new canvas and publish it under ``"canvas"``; always True."""
        self._create_and_publish_canvas(ctx)
        return True

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Return ``data`` unchanged while keeping the canvas available.

        Creates and publishes the canvas if none exists yet. When the canvas
        reports itself dirty, its dirty rows and regions are exposed as the
        ``canvas.dirty_rows`` and ``canvas.dirty_regions`` state entries.
        """
        canvas = self._canvas
        if canvas is None:
            canvas = self._create_and_publish_canvas(ctx)

        # Expose dirty tracking so effects can read it from the state.
        if canvas.is_dirty():
            ctx.set_state("canvas.dirty_rows", canvas.get_dirty_rows())
            ctx.set_state("canvas.dirty_regions", canvas.get_dirty_regions())

        return data

    def get_canvas(self):
        """Return the current canvas, or ``None`` if none has been created."""
        return self._canvas

    def cleanup(self) -> None:
        """Drop the reference to the canvas."""
        self._canvas = None
|