""" Stage adapters - Bridge existing components to the Stage interface. This module provides adapters that wrap existing components (EffectPlugin, Display, DataSource, Camera) as Stage implementations. """ from typing import Any from engine.pipeline.core import PipelineContext, Stage class EffectPluginStage(Stage): """Adapter wrapping EffectPlugin as a Stage.""" def __init__(self, effect_plugin, name: str = "effect"): self._effect = effect_plugin self.name = name self.category = "effect" self.optional = False @property def stage_type(self) -> str: """Return stage_type based on effect name. HUD effects are overlays. """ if self.name == "hud": return "overlay" return self.category @property def render_order(self) -> int: """Return render_order based on effect type. HUD effects have high render_order to appear on top. """ if self.name == "hud": return 100 # High order for overlays return 0 @property def is_overlay(self) -> bool: """Return True for HUD effects. HUD is an overlay - it composes on top of the buffer rather than transforming it for the next stage. """ return self.name == "hud" @property def capabilities(self) -> set[str]: return {f"effect.{self.name}"} @property def dependencies(self) -> set[str]: return set() @property def inlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.TEXT_BUFFER} @property def outlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.TEXT_BUFFER} def process(self, data: Any, ctx: PipelineContext) -> Any: """Process data through the effect.""" if data is None: return None from engine.effects.types import EffectContext, apply_param_bindings w = ctx.params.viewport_width if ctx.params else 80 h = ctx.params.viewport_height if ctx.params else 24 frame = ctx.params.frame_number if ctx.params else 0 effect_ctx = EffectContext( terminal_width=w, terminal_height=h, scroll_cam=0, ticker_height=h, camera_x=0, mic_excess=0.0, grad_offset=(frame * 0.01) % 1.0, frame_number=frame, has_message=False, items=ctx.get("items", []), ) # Copy sensor state from PipelineContext to EffectContext for key, value in ctx.state.items(): if key.startswith("sensor."): effect_ctx.set_state(key, value) # Copy metrics from PipelineContext to EffectContext if "metrics" in ctx.state: effect_ctx.set_state("metrics", ctx.state["metrics"]) # Apply sensor param bindings if effect has them if hasattr(self._effect, "param_bindings") and self._effect.param_bindings: bound_config = apply_param_bindings(self._effect, effect_ctx) self._effect.configure(bound_config) return self._effect.process(data, effect_ctx) class DisplayStage(Stage): """Adapter wrapping Display as a Stage.""" def __init__(self, display, name: str = "terminal"): self._display = display self.name = name self.category = "display" self.optional = False @property def capabilities(self) -> set[str]: return {"display.output"} @property def dependencies(self) -> set[str]: return {"render.output"} # Display needs rendered content @property def inlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.TEXT_BUFFER} # Display consumes rendered text @property def outlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.NONE} # Display is a terminal stage (no output) def init(self, ctx: PipelineContext) -> bool: w = ctx.params.viewport_width if ctx.params else 80 h = ctx.params.viewport_height if ctx.params else 24 result = self._display.init(w, h, reuse=False) return result is not False def process(self, data: Any, ctx: PipelineContext) -> Any: """Output data to display.""" if data is not None: self._display.show(data) return data def cleanup(self) -> None: self._display.cleanup() class DataSourceStage(Stage): """Adapter wrapping DataSource as a Stage.""" def __init__(self, data_source, name: str = "headlines"): self._source = data_source self.name = name self.category = "source" self.optional = False @property def capabilities(self) -> set[str]: return {f"source.{self.name}"} @property def dependencies(self) -> set[str]: return set() @property def inlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.NONE} # Sources don't take input @property def outlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.SOURCE_ITEMS} def process(self, data: Any, ctx: PipelineContext) -> Any: """Fetch data from source.""" if hasattr(self._source, "get_items"): return self._source.get_items() return data class PassthroughStage(Stage): """Simple stage that passes data through unchanged. Used for sources that already provide the data in the correct format (e.g., pipeline introspection that outputs text directly). """ def __init__(self, name: str = "passthrough"): self.name = name self.category = "render" self.optional = True @property def stage_type(self) -> str: return "render" @property def capabilities(self) -> set[str]: return {"render.output"} @property def dependencies(self) -> set[str]: return {"source"} @property def inlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.SOURCE_ITEMS} @property def outlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.SOURCE_ITEMS} def process(self, data: Any, ctx: PipelineContext) -> Any: """Pass data through unchanged.""" return data class SourceItemsToBufferStage(Stage): """Convert SourceItem objects to text buffer. Takes a list of SourceItem objects and extracts their content, splitting on newlines to create a proper text buffer for display. """ def __init__(self, name: str = "items-to-buffer"): self.name = name self.category = "render" self.optional = True @property def stage_type(self) -> str: return "render" @property def capabilities(self) -> set[str]: return {"render.output"} @property def dependencies(self) -> set[str]: return {"source"} @property def inlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.SOURCE_ITEMS} @property def outlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.TEXT_BUFFER} def process(self, data: Any, ctx: PipelineContext) -> Any: """Convert SourceItem list to text buffer.""" if data is None: return [] # If already a list of strings, return as-is if isinstance(data, list) and data and isinstance(data[0], str): return data # If it's a list of SourceItem, extract content from engine.data_sources import SourceItem if isinstance(data, list): result = [] for item in data: if isinstance(item, SourceItem): # Split content by newline to get individual lines lines = item.content.split("\n") result.extend(lines) elif hasattr(item, "content"): # Has content attribute lines = str(item.content).split("\n") result.extend(lines) else: result.append(str(item)) return result # Single item if isinstance(data, SourceItem): return data.content.split("\n") return [str(data)] class CameraStage(Stage): """Adapter wrapping Camera as a Stage.""" def __init__(self, camera, name: str = "vertical"): self._camera = camera self.name = name self.category = "camera" self.optional = True @property def capabilities(self) -> set[str]: return {"camera"} @property def dependencies(self) -> set[str]: return {"render.output"} # Depend on rendered output from font or render stage @property def inlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.TEXT_BUFFER} # Camera works on rendered text @property def outlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.TEXT_BUFFER} def process(self, data: Any, ctx: PipelineContext) -> Any: """Apply camera transformation to data.""" if data is None or (isinstance(data, list) and len(data) == 0): return data if hasattr(self._camera, "apply"): viewport_width = ctx.params.viewport_width if ctx.params else 80 viewport_height = ctx.params.viewport_height if ctx.params else 24 buffer_height = len(data) if isinstance(data, list) else 0 # Get global layout height for canvas (enables full scrolling range) total_layout_height = ctx.get("total_layout_height", buffer_height) # Preserve camera's configured canvas width, but ensure it's at least viewport_width # This allows horizontal/omni/floating/bounce cameras to scroll properly canvas_width = max( viewport_width, getattr(self._camera, "canvas_width", viewport_width) ) # Update camera's viewport dimensions so it knows its actual bounds if hasattr(self._camera, "viewport_width"): self._camera.viewport_width = viewport_width self._camera.viewport_height = viewport_height # Set canvas to full layout height so camera can scroll through all content self._camera.set_canvas_size(width=canvas_width, height=total_layout_height) # Update camera position (scroll) - uses global canvas for clamping if hasattr(self._camera, "update"): self._camera.update(1 / 60) # Store camera_y in context for ViewportFilterStage (global y position) ctx.set("camera_y", self._camera.y) # Apply camera viewport slicing to the partial buffer # The buffer starts at render_offset_y in global coordinates render_offset_y = ctx.get("render_offset_y", 0) # Temporarily shift camera to local buffer coordinates for apply() real_y = self._camera.y local_y = max(0, real_y - render_offset_y) # Temporarily shrink canvas to local buffer size so apply() works correctly self._camera.set_canvas_size(width=canvas_width, height=buffer_height) self._camera.y = local_y # Apply slicing result = self._camera.apply(data, viewport_width, viewport_height) # Restore global canvas and camera position for next frame self._camera.set_canvas_size(width=canvas_width, height=total_layout_height) self._camera.y = real_y return result return data def cleanup(self) -> None: if hasattr(self._camera, "reset"): self._camera.reset() class ViewportFilterStage(Stage): """Stage that limits items based on layout calculation. Computes cumulative y-offsets for all items using cheap height estimation, then returns only items that overlap the camera's viewport window. This prevents FontStage from rendering thousands of items when only a few are visible, while still allowing camera scrolling through all content. """ def __init__(self, name: str = "viewport-filter"): self.name = name self.category = "filter" self.optional = False self._cached_count = 0 self._layout: list[tuple[int, int]] = [] @property def stage_type(self) -> str: return "filter" @property def capabilities(self) -> set[str]: return {f"filter.{self.name}"} @property def dependencies(self) -> set[str]: return {"source"} @property def inlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.SOURCE_ITEMS} @property def outlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.SOURCE_ITEMS} def process(self, data: Any, ctx: PipelineContext) -> Any: """Filter items based on layout and camera position.""" if data is None or not isinstance(data, list): return data viewport_height = ctx.params.viewport_height if ctx.params else 24 viewport_width = ctx.params.viewport_width if ctx.params else 80 camera_y = ctx.get("camera_y", 0) # Recompute layout when item count OR viewport width changes cached_width = getattr(self, "_cached_width", None) if len(data) != self._cached_count or cached_width != viewport_width: self._layout = [] y = 0 from engine.render.blocks import estimate_block_height for item in data: if hasattr(item, "content"): title = item.content elif isinstance(item, tuple): title = str(item[0]) if item else "" else: title = str(item) h = estimate_block_height(title, viewport_width) self._layout.append((y, h)) y += h self._cached_count = len(data) self._cached_width = viewport_width # Find items visible in [camera_y - buffer, camera_y + viewport_height + buffer] buffer_zone = viewport_height vis_start = max(0, camera_y - buffer_zone) vis_end = camera_y + viewport_height + buffer_zone visible_items = [] render_offset_y = 0 first_visible_found = False for i, (start_y, height) in enumerate(self._layout): item_end = start_y + height if item_end > vis_start and start_y < vis_end: if not first_visible_found: render_offset_y = start_y first_visible_found = True visible_items.append(data[i]) # Compute total layout height for the canvas total_layout_height = 0 if self._layout: last_start, last_height = self._layout[-1] total_layout_height = last_start + last_height # Store metadata for CameraStage ctx.set("render_offset_y", render_offset_y) ctx.set("total_layout_height", total_layout_height) # Always return at least one item to avoid empty buffer errors return visible_items if visible_items else data[:1] class FontStage(Stage): """Stage that applies font rendering to content. FontStage is a Transform that takes raw content (text, headlines) and renders it to an ANSI-formatted buffer using the configured font. This decouples font rendering from data sources, allowing: - Different fonts per source - Runtime font swapping - Font as a pipeline stage Attributes: font_path: Path to font file (None = use config default) font_size: Font size in points (None = use config default) font_ref: Reference name for registered font ("default", "cjk", etc.) """ def __init__( self, font_path: str | None = None, font_size: int | None = None, font_ref: str | None = "default", name: str = "font", ): self.name = name self.category = "transform" self.optional = False self._font_path = font_path self._font_size = font_size self._font_ref = font_ref self._font = None self._render_cache: dict[tuple[str, str, str, int], list[str]] = {} @property def stage_type(self) -> str: return "transform" @property def capabilities(self) -> set[str]: return {f"transform.{self.name}", "render.output"} @property def dependencies(self) -> set[str]: return {"source"} @property def inlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.SOURCE_ITEMS} @property def outlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.TEXT_BUFFER} def init(self, ctx: PipelineContext) -> bool: """Initialize font from config or path.""" from engine import config if self._font_path: try: from PIL import ImageFont size = self._font_size or config.FONT_SZ self._font = ImageFont.truetype(self._font_path, size) except Exception: return False return True def process(self, data: Any, ctx: PipelineContext) -> Any: """Render content with font to buffer.""" if data is None: return None from engine.render import make_block w = ctx.params.viewport_width if ctx.params else 80 # If data is already a list of strings (buffer), return as-is if isinstance(data, list) and data and isinstance(data[0], str): return data # If data is a list of items, render each with font if isinstance(data, list): result = [] for item in data: # Handle SourceItem or tuple (title, source, timestamp) if hasattr(item, "content"): title = item.content src = getattr(item, "source", "unknown") ts = getattr(item, "timestamp", "0") elif isinstance(item, tuple): title = item[0] if len(item) > 0 else "" src = item[1] if len(item) > 1 else "unknown" ts = str(item[2]) if len(item) > 2 else "0" else: title = str(item) src = "unknown" ts = "0" # Check cache first cache_key = (title, src, ts, w) if cache_key in self._render_cache: result.extend(self._render_cache[cache_key]) continue try: block_lines, color_code, meta_idx = make_block(title, src, ts, w) self._render_cache[cache_key] = block_lines result.extend(block_lines) except Exception: result.append(title) return result return data class ImageToTextStage(Stage): """Transform that converts PIL Image to ASCII text buffer. Takes an ImageItem or PIL Image and converts it to a text buffer using ASCII character density mapping. The output can be displayed directly or further processed by effects. Attributes: width: Output width in characters height: Output height in characters charset: Character set for density mapping (default: simple ASCII) """ def __init__( self, width: int = 80, height: int = 24, charset: str = " .:-=+*#%@", name: str = "image-to-text", ): self.name = name self.category = "transform" self.optional = False self.width = width self.height = height self.charset = charset @property def stage_type(self) -> str: return "transform" @property def inlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.PIL_IMAGE} # Accepts PIL Image objects or ImageItem @property def outlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.TEXT_BUFFER} @property def capabilities(self) -> set[str]: return {f"transform.{self.name}", "render.output"} @property def dependencies(self) -> set[str]: return {"source"} def process(self, data: Any, ctx: PipelineContext) -> Any: """Convert PIL Image to text buffer.""" if data is None: return None from engine.data_sources.sources import ImageItem # Extract PIL Image from various input types pil_image = None if isinstance(data, ImageItem) or hasattr(data, "image"): pil_image = data.image else: # Assume it's already a PIL Image pil_image = data # Check if it's a PIL Image if not hasattr(pil_image, "resize"): # Not a PIL Image, return as-is return data if isinstance(data, list) else [str(data)] # Convert to grayscale and resize try: if pil_image.mode != "L": pil_image = pil_image.convert("L") except Exception: return ["[image conversion error]"] # Calculate cell aspect ratio correction (characters are taller than wide) aspect_ratio = 0.5 target_w = self.width target_h = int(self.height * aspect_ratio) # Resize image to target dimensions try: resized = pil_image.resize((target_w, target_h)) except Exception: return ["[image resize error]"] # Map pixels to characters result = [] pixels = list(resized.getdata()) for row in range(target_h): line = "" for col in range(target_w): idx = row * target_w + col if idx < len(pixels): brightness = pixels[idx] char_idx = int((brightness / 255) * (len(self.charset) - 1)) line += self.charset[char_idx] else: line += " " result.append(line) # Pad or trim to exact height while len(result) < self.height: result.append(" " * self.width) result = result[: self.height] # Pad lines to width result = [line.ljust(self.width) for line in result] return result def create_stage_from_display(display, name: str = "terminal") -> DisplayStage: """Create a Stage from a Display instance.""" return DisplayStage(display, name) def create_stage_from_effect(effect_plugin, name: str) -> EffectPluginStage: """Create a Stage from an EffectPlugin.""" return EffectPluginStage(effect_plugin, name) def create_stage_from_source(data_source, name: str = "headlines") -> DataSourceStage: """Create a Stage from a DataSource.""" return DataSourceStage(data_source, name) def create_stage_from_camera(camera, name: str = "vertical") -> CameraStage: """Create a Stage from a Camera.""" return CameraStage(camera, name) def create_stage_from_font( font_path: str | None = None, font_size: int | None = None, font_ref: str | None = "default", name: str = "font", ) -> FontStage: """Create a FontStage for rendering content with fonts.""" return FontStage( font_path=font_path, font_size=font_size, font_ref=font_ref, name=name ) class CanvasStage(Stage): """Stage that manages a Canvas for rendering. CanvasStage creates and manages a 2D canvas that can hold rendered content. Other stages can write to and read from the canvas via the pipeline context. This enables: - Pre-rendering content off-screen - Multiple cameras viewing different regions - Smooth scrolling (camera moves, content stays) - Layer compositing Usage: - Add CanvasStage to pipeline - Other stages access canvas via: ctx.get("canvas") """ def __init__( self, width: int = 80, height: int = 24, name: str = "canvas", ): self.name = name self.category = "system" self.optional = True self._width = width self._height = height self._canvas = None @property def stage_type(self) -> str: return "system" @property def capabilities(self) -> set[str]: return {"canvas"} @property def dependencies(self) -> set[str]: return set() @property def inlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.ANY} @property def outlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.ANY} def init(self, ctx: PipelineContext) -> bool: from engine.canvas import Canvas self._canvas = Canvas(width=self._width, height=self._height) ctx.set("canvas", self._canvas) return True def process(self, data: Any, ctx: PipelineContext) -> Any: """Pass through data but ensure canvas is in context.""" if self._canvas is None: from engine.canvas import Canvas self._canvas = Canvas(width=self._width, height=self._height) ctx.set("canvas", self._canvas) # Get dirty regions from canvas and expose via context # Effects can access via ctx.get_state("canvas.dirty_rows") if self._canvas.is_dirty(): dirty_rows = self._canvas.get_dirty_rows() ctx.set_state("canvas.dirty_rows", dirty_rows) ctx.set_state("canvas.dirty_regions", self._canvas.get_dirty_regions()) return data def get_canvas(self): """Get the canvas instance.""" return self._canvas def cleanup(self) -> None: self._canvas = None