""" Stage adapters - Bridge existing components to the Stage interface. This module provides adapters that wrap existing components (EffectPlugin, Display, DataSource, Camera) as Stage implementations. """ from typing import Any from engine.pipeline.core import PipelineContext, Stage class EffectPluginStage(Stage): """Adapter wrapping EffectPlugin as a Stage.""" def __init__(self, effect_plugin, name: str = "effect"): self._effect = effect_plugin self.name = name self.category = "effect" self.optional = False @property def stage_type(self) -> str: """Return stage_type based on effect name. HUD effects are overlays. """ if self.name == "hud": return "overlay" return self.category @property def render_order(self) -> int: """Return render_order based on effect type. HUD effects have high render_order to appear on top. """ if self.name == "hud": return 100 # High order for overlays return 0 @property def is_overlay(self) -> bool: """Return True for HUD effects. HUD is an overlay - it composes on top of the buffer rather than transforming it for the next stage. """ return self.name == "hud" @property def capabilities(self) -> set[str]: return {f"effect.{self.name}"} @property def dependencies(self) -> set[str]: return set() def process(self, data: Any, ctx: PipelineContext) -> Any: """Process data through the effect.""" if data is None: return None from engine.effects.types import EffectContext, apply_param_bindings w = ctx.params.viewport_width if ctx.params else 80 h = ctx.params.viewport_height if ctx.params else 24 frame = ctx.params.frame_number if ctx.params else 0 effect_ctx = EffectContext( terminal_width=w, terminal_height=h, scroll_cam=0, ticker_height=h, camera_x=0, mic_excess=0.0, grad_offset=(frame * 0.01) % 1.0, frame_number=frame, has_message=False, items=ctx.get("items", []), ) # Copy sensor state from PipelineContext to EffectContext for key, value in ctx.state.items(): if key.startswith("sensor."): effect_ctx.set_state(key, value) # Copy metrics from PipelineContext to EffectContext if "metrics" in ctx.state: effect_ctx.set_state("metrics", ctx.state["metrics"]) # Apply sensor param bindings if effect has them if hasattr(self._effect, "param_bindings") and self._effect.param_bindings: bound_config = apply_param_bindings(self._effect, effect_ctx) self._effect.configure(bound_config) return self._effect.process(data, effect_ctx) class DisplayStage(Stage): """Adapter wrapping Display as a Stage.""" def __init__(self, display, name: str = "terminal"): self._display = display self.name = name self.category = "display" self.optional = False @property def capabilities(self) -> set[str]: return {"display.output"} @property def dependencies(self) -> set[str]: return set() def init(self, ctx: PipelineContext) -> bool: w = ctx.params.viewport_width if ctx.params else 80 h = ctx.params.viewport_height if ctx.params else 24 result = self._display.init(w, h, reuse=False) return result is not False def process(self, data: Any, ctx: PipelineContext) -> Any: """Output data to display.""" if data is not None: self._display.show(data) return data def cleanup(self) -> None: self._display.cleanup() class DataSourceStage(Stage): """Adapter wrapping DataSource as a Stage.""" def __init__(self, data_source, name: str = "headlines"): self._source = data_source self.name = name self.category = "source" self.optional = False @property def capabilities(self) -> set[str]: return {f"source.{self.name}"} @property def dependencies(self) -> set[str]: return set() def process(self, data: Any, ctx: PipelineContext) -> Any: """Fetch data from source.""" if hasattr(self._source, "get_items"): return self._source.get_items() return data class PassthroughStage(Stage): """Simple stage that passes data through unchanged. Used for sources that already provide the data in the correct format (e.g., pipeline introspection that outputs text directly). """ def __init__(self, name: str = "passthrough"): self.name = name self.category = "render" self.optional = True @property def stage_type(self) -> str: return "render" @property def capabilities(self) -> set[str]: return {"render.output"} @property def dependencies(self) -> set[str]: return {"source"} def process(self, data: Any, ctx: PipelineContext) -> Any: """Pass data through unchanged.""" return data class SourceItemsToBufferStage(Stage): """Convert SourceItem objects to text buffer. Takes a list of SourceItem objects and extracts their content, splitting on newlines to create a proper text buffer for display. """ def __init__(self, name: str = "items-to-buffer"): self.name = name self.category = "render" self.optional = True @property def stage_type(self) -> str: return "render" @property def capabilities(self) -> set[str]: return {"render.output"} @property def dependencies(self) -> set[str]: return {"source"} def process(self, data: Any, ctx: PipelineContext) -> Any: """Convert SourceItem list to text buffer.""" if data is None: return [] # If already a list of strings, return as-is if isinstance(data, list) and data and isinstance(data[0], str): return data # If it's a list of SourceItem, extract content from engine.data_sources import SourceItem if isinstance(data, list): result = [] for item in data: if isinstance(item, SourceItem): # Split content by newline to get individual lines lines = item.content.split("\n") result.extend(lines) elif hasattr(item, "content"): # Has content attribute lines = str(item.content).split("\n") result.extend(lines) else: result.append(str(item)) return result # Single item if isinstance(data, SourceItem): return data.content.split("\n") return [str(data)] class CameraStage(Stage): """Adapter wrapping Camera as a Stage.""" def __init__(self, camera, name: str = "vertical"): self._camera = camera self.name = name self.category = "camera" self.optional = True @property def capabilities(self) -> set[str]: return {"camera"} @property def dependencies(self) -> set[str]: return {"source.items"} def process(self, data: Any, ctx: PipelineContext) -> Any: """Apply camera transformation to data.""" if data is None: return None if hasattr(self._camera, "apply"): return self._camera.apply( data, ctx.params.viewport_width if ctx.params else 80 ) return data def cleanup(self) -> None: if hasattr(self._camera, "reset"): self._camera.reset() class FontStage(Stage): """Stage that applies font rendering to content. FontStage is a Transform that takes raw content (text, headlines) and renders it to an ANSI-formatted buffer using the configured font. This decouples font rendering from data sources, allowing: - Different fonts per source - Runtime font swapping - Font as a pipeline stage Attributes: font_path: Path to font file (None = use config default) font_size: Font size in points (None = use config default) font_ref: Reference name for registered font ("default", "cjk", etc.) """ def __init__( self, font_path: str | None = None, font_size: int | None = None, font_ref: str | None = "default", name: str = "font", ): self.name = name self.category = "transform" self.optional = False self._font_path = font_path self._font_size = font_size self._font_ref = font_ref self._font = None @property def stage_type(self) -> str: return "transform" @property def capabilities(self) -> set[str]: return {f"transform.{self.name}", "render.output"} @property def dependencies(self) -> set[str]: return {"source"} def init(self, ctx: PipelineContext) -> bool: """Initialize font from config or path.""" from engine import config if self._font_path: try: from PIL import ImageFont size = self._font_size or config.FONT_SZ self._font = ImageFont.truetype(self._font_path, size) except Exception: return False return True def process(self, data: Any, ctx: PipelineContext) -> Any: """Render content with font to buffer.""" if data is None: return None from engine.legacy.render import make_block w = ctx.params.viewport_width if ctx.params else 80 # If data is already a list of strings (buffer), return as-is if isinstance(data, list) and data and isinstance(data[0], str): return data # If data is a list of items, render each with font if isinstance(data, list): result = [] for item in data: # Handle SourceItem or tuple (title, source, timestamp) if hasattr(item, "content"): title = item.content src = getattr(item, "source", "unknown") ts = getattr(item, "timestamp", "0") elif isinstance(item, tuple): title = item[0] if len(item) > 0 else "" src = item[1] if len(item) > 1 else "unknown" ts = str(item[2]) if len(item) > 2 else "0" else: title = str(item) src = "unknown" ts = "0" try: block = make_block(title, src, ts, w) result.extend(block) except Exception: result.append(title) return result return data class ImageToTextStage(Stage): """Transform that converts PIL Image to ASCII text buffer. Takes an ImageItem or PIL Image and converts it to a text buffer using ASCII character density mapping. The output can be displayed directly or further processed by effects. Attributes: width: Output width in characters height: Output height in characters charset: Character set for density mapping (default: simple ASCII) """ def __init__( self, width: int = 80, height: int = 24, charset: str = " .:-=+*#%@", name: str = "image-to-text", ): self.name = name self.category = "transform" self.optional = False self.width = width self.height = height self.charset = charset @property def stage_type(self) -> str: return "transform" @property def capabilities(self) -> set[str]: from engine.pipeline.core import DataType return {f"transform.{self.name}", DataType.TEXT_BUFFER} @property def dependencies(self) -> set[str]: return {"source"} def process(self, data: Any, ctx: PipelineContext) -> Any: """Convert PIL Image to text buffer.""" if data is None: return None from engine.data_sources.sources import ImageItem # Extract PIL Image from various input types pil_image = None if isinstance(data, ImageItem) or hasattr(data, "image"): pil_image = data.image else: # Assume it's already a PIL Image pil_image = data # Check if it's a PIL Image if not hasattr(pil_image, "resize"): # Not a PIL Image, return as-is return data if isinstance(data, list) else [str(data)] # Convert to grayscale and resize try: if pil_image.mode != "L": pil_image = pil_image.convert("L") except Exception: return ["[image conversion error]"] # Calculate cell aspect ratio correction (characters are taller than wide) aspect_ratio = 0.5 target_w = self.width target_h = int(self.height * aspect_ratio) # Resize image to target dimensions try: resized = pil_image.resize((target_w, target_h)) except Exception: return ["[image resize error]"] # Map pixels to characters result = [] pixels = list(resized.getdata()) for row in range(target_h): line = "" for col in range(target_w): idx = row * target_w + col if idx < len(pixels): brightness = pixels[idx] char_idx = int((brightness / 255) * (len(self.charset) - 1)) line += self.charset[char_idx] else: line += " " result.append(line) # Pad or trim to exact height while len(result) < self.height: result.append(" " * self.width) result = result[: self.height] # Pad lines to width result = [line.ljust(self.width) for line in result] return result def create_stage_from_display(display, name: str = "terminal") -> DisplayStage: """Create a Stage from a Display instance.""" return DisplayStage(display, name) def create_stage_from_effect(effect_plugin, name: str) -> EffectPluginStage: """Create a Stage from an EffectPlugin.""" return EffectPluginStage(effect_plugin, name) def create_stage_from_source(data_source, name: str = "headlines") -> DataSourceStage: """Create a Stage from a DataSource.""" return DataSourceStage(data_source, name) def create_stage_from_camera(camera, name: str = "vertical") -> CameraStage: """Create a Stage from a Camera.""" return CameraStage(camera, name) def create_stage_from_font( font_path: str | None = None, font_size: int | None = None, font_ref: str | None = "default", name: str = "font", ) -> FontStage: """Create a FontStage for rendering content with fonts.""" return FontStage( font_path=font_path, font_size=font_size, font_ref=font_ref, name=name ) class CanvasStage(Stage): """Stage that manages a Canvas for rendering. CanvasStage creates and manages a 2D canvas that can hold rendered content. Other stages can write to and read from the canvas via the pipeline context. This enables: - Pre-rendering content off-screen - Multiple cameras viewing different regions - Smooth scrolling (camera moves, content stays) - Layer compositing Usage: - Add CanvasStage to pipeline - Other stages access canvas via: ctx.get("canvas") """ def __init__( self, width: int = 80, height: int = 24, name: str = "canvas", ): self.name = name self.category = "system" self.optional = True self._width = width self._height = height self._canvas = None @property def stage_type(self) -> str: return "system" @property def capabilities(self) -> set[str]: return {"canvas"} @property def dependencies(self) -> set[str]: return set() @property def inlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.ANY} @property def outlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.ANY} def init(self, ctx: PipelineContext) -> bool: from engine.canvas import Canvas self._canvas = Canvas(width=self._width, height=self._height) ctx.set("canvas", self._canvas) return True def process(self, data: Any, ctx: PipelineContext) -> Any: """Pass through data but ensure canvas is in context.""" if self._canvas is None: from engine.canvas import Canvas self._canvas = Canvas(width=self._width, height=self._height) ctx.set("canvas", self._canvas) # Get dirty regions from canvas and expose via context # Effects can access via ctx.get_state("canvas.dirty_rows") if self._canvas.is_dirty(): dirty_rows = self._canvas.get_dirty_rows() ctx.set_state("canvas.dirty_rows", dirty_rows) ctx.set_state("canvas.dirty_regions", self._canvas.get_dirty_regions()) return data def get_canvas(self): """Get the canvas instance.""" return self._canvas def cleanup(self) -> None: self._canvas = None