""" Stage adapters - Bridge existing components to the Stage interface. This module provides adapters that wrap existing components (EffectPlugin, Display, DataSource, Camera) as Stage implementations. """ import random from typing import Any from engine.pipeline.core import PipelineContext, Stage class RenderStage(Stage): """Stage that renders items to a text buffer for display. This mimics the old demo's render pipeline: - Selects headlines and renders them to blocks - Applies camera scroll position - Adds firehose layer if enabled """ def __init__( self, items: list, width: int = 80, height: int = 24, camera_speed: float = 1.0, camera_mode: str = "vertical", firehose_enabled: bool = False, name: str = "render", ): self.name = name self.category = "render" self.optional = False self._items = items self._width = width self._height = height self._camera_speed = camera_speed self._camera_mode = camera_mode self._firehose_enabled = firehose_enabled self._camera_y = 0.0 self._camera_x = 0 self._scroll_accum = 0.0 self._ticker_next_y = 0 self._active: list = [] self._seen: set = set() self._pool: list = list(items) self._noise_cache: dict = {} self._frame_count = 0 @property def capabilities(self) -> set[str]: return {"render.output"} @property def dependencies(self) -> set[str]: return {"source"} def init(self, ctx: PipelineContext) -> bool: random.shuffle(self._pool) return True def process(self, data: Any, ctx: PipelineContext) -> Any: """Render items to a text buffer.""" from engine.effects import next_headline from engine.layers import render_firehose, render_ticker_zone from engine.render import make_block items = data or self._items w = ctx.params.viewport_width if ctx.params else self._width h = ctx.params.viewport_height if ctx.params else self._height camera_speed = ctx.params.camera_speed if ctx.params else self._camera_speed firehose = ctx.params.firehose_enabled if ctx.params else self._firehose_enabled scroll_step = 0.5 / (camera_speed * 10) self._scroll_accum += scroll_step GAP = 3 while self._scroll_accum >= scroll_step: self._scroll_accum -= scroll_step self._camera_y += 1.0 while ( self._ticker_next_y < int(self._camera_y) + h + 10 and len(self._active) < 50 ): t, src, ts = next_headline(self._pool, items, self._seen) ticker_content, hc, midx = make_block(t, src, ts, w) self._active.append((ticker_content, hc, self._ticker_next_y, midx)) self._ticker_next_y += len(ticker_content) + GAP self._active = [ (c, hc, by, mi) for c, hc, by, mi in self._active if by + len(c) > int(self._camera_y) ] for k in list(self._noise_cache): if k < int(self._camera_y): del self._noise_cache[k] grad_offset = (self._frame_count * 0.01) % 1.0 buf, self._noise_cache = render_ticker_zone( self._active, scroll_cam=int(self._camera_y), camera_x=self._camera_x, ticker_h=h, w=w, noise_cache=self._noise_cache, grad_offset=grad_offset, ) if firehose: firehose_buf = render_firehose(items, w, 0, h) buf.extend(firehose_buf) self._frame_count += 1 return buf class EffectPluginStage(Stage): """Adapter wrapping EffectPlugin as a Stage.""" def __init__(self, effect_plugin, name: str = "effect"): self._effect = effect_plugin self.name = name self.category = "effect" self.optional = False @property def stage_type(self) -> str: """Return stage_type based on effect name. HUD effects are overlays. """ if self.name == "hud": return "overlay" return self.category @property def render_order(self) -> int: """Return render_order based on effect type. HUD effects have high render_order to appear on top. """ if self.name == "hud": return 100 # High order for overlays return 0 @property def is_overlay(self) -> bool: """Return True for HUD effects. HUD is an overlay - it composes on top of the buffer rather than transforming it for the next stage. """ return self.name == "hud" @property def capabilities(self) -> set[str]: return {f"effect.{self.name}"} @property def dependencies(self) -> set[str]: return set() def process(self, data: Any, ctx: PipelineContext) -> Any: """Process data through the effect.""" if data is None: return None from engine.effects.types import EffectContext, apply_param_bindings w = ctx.params.viewport_width if ctx.params else 80 h = ctx.params.viewport_height if ctx.params else 24 frame = ctx.params.frame_number if ctx.params else 0 effect_ctx = EffectContext( terminal_width=w, terminal_height=h, scroll_cam=0, ticker_height=h, camera_x=0, mic_excess=0.0, grad_offset=(frame * 0.01) % 1.0, frame_number=frame, has_message=False, items=ctx.get("items", []), ) # Copy sensor state from PipelineContext to EffectContext for key, value in ctx.state.items(): if key.startswith("sensor."): effect_ctx.set_state(key, value) # Copy metrics from PipelineContext to EffectContext if "metrics" in ctx.state: effect_ctx.set_state("metrics", ctx.state["metrics"]) # Apply sensor param bindings if effect has them if hasattr(self._effect, "param_bindings") and self._effect.param_bindings: bound_config = apply_param_bindings(self._effect, effect_ctx) self._effect.configure(bound_config) return self._effect.process(data, effect_ctx) class DisplayStage(Stage): """Adapter wrapping Display as a Stage.""" def __init__(self, display, name: str = "terminal"): self._display = display self.name = name self.category = "display" self.optional = False @property def capabilities(self) -> set[str]: return {"display.output"} @property def dependencies(self) -> set[str]: return set() def init(self, ctx: PipelineContext) -> bool: w = ctx.params.viewport_width if ctx.params else 80 h = ctx.params.viewport_height if ctx.params else 24 result = self._display.init(w, h, reuse=False) return result is not False def process(self, data: Any, ctx: PipelineContext) -> Any: """Output data to display.""" if data is not None: self._display.show(data) return data def cleanup(self) -> None: self._display.cleanup() class DataSourceStage(Stage): """Adapter wrapping DataSource as a Stage.""" def __init__(self, data_source, name: str = "headlines"): self._source = data_source self.name = name self.category = "source" self.optional = False @property def capabilities(self) -> set[str]: return {f"source.{self.name}"} @property def dependencies(self) -> set[str]: return set() def process(self, data: Any, ctx: PipelineContext) -> Any: """Fetch data from source.""" if hasattr(self._source, "get_items"): return self._source.get_items() return data class ItemsStage(Stage): """Stage that holds pre-fetched items and provides them to the pipeline. .. deprecated:: Use DataSourceStage with a proper DataSource instead. ItemsStage is a legacy bootstrap mechanism. """ def __init__(self, items, name: str = "headlines"): import warnings warnings.warn( "ItemsStage is deprecated. Use DataSourceStage with a DataSource instead.", DeprecationWarning, stacklevel=2, ) self._items = items self.name = name self.category = "source" self.optional = False @property def capabilities(self) -> set[str]: return {f"source.{self.name}"} @property def dependencies(self) -> set[str]: return set() def process(self, data: Any, ctx: PipelineContext) -> Any: """Return the pre-fetched items.""" return self._items class CameraStage(Stage): """Adapter wrapping Camera as a Stage.""" def __init__(self, camera, name: str = "vertical"): self._camera = camera self.name = name self.category = "camera" self.optional = True @property def capabilities(self) -> set[str]: return {"camera"} @property def dependencies(self) -> set[str]: return {"source.items"} def process(self, data: Any, ctx: PipelineContext) -> Any: """Apply camera transformation to data.""" if data is None: return None if hasattr(self._camera, "apply"): return self._camera.apply( data, ctx.params.viewport_width if ctx.params else 80 ) return data def cleanup(self) -> None: if hasattr(self._camera, "reset"): self._camera.reset() class FontStage(Stage): """Stage that applies font rendering to content. FontStage is a Transform that takes raw content (text, headlines) and renders it to an ANSI-formatted buffer using the configured font. This decouples font rendering from data sources, allowing: - Different fonts per source - Runtime font swapping - Font as a pipeline stage Attributes: font_path: Path to font file (None = use config default) font_size: Font size in points (None = use config default) font_ref: Reference name for registered font ("default", "cjk", etc.) """ def __init__( self, font_path: str | None = None, font_size: int | None = None, font_ref: str | None = "default", name: str = "font", ): self.name = name self.category = "transform" self.optional = False self._font_path = font_path self._font_size = font_size self._font_ref = font_ref self._font = None @property def stage_type(self) -> str: return "transform" @property def capabilities(self) -> set[str]: return {f"transform.{self.name}", "render.output"} @property def dependencies(self) -> set[str]: return {"source"} def init(self, ctx: PipelineContext) -> bool: """Initialize font from config or path.""" from engine import config if self._font_path: try: from PIL import ImageFont size = self._font_size or config.FONT_SZ self._font = ImageFont.truetype(self._font_path, size) except Exception: return False return True def process(self, data: Any, ctx: PipelineContext) -> Any: """Render content with font to buffer.""" if data is None: return None from engine.render import make_block w = ctx.params.viewport_width if ctx.params else 80 # If data is already a list of strings (buffer), return as-is if isinstance(data, list) and data and isinstance(data[0], str): return data # If data is a list of items, render each with font if isinstance(data, list): result = [] for item in data: # Handle SourceItem or tuple (title, source, timestamp) if hasattr(item, "content"): title = item.content src = getattr(item, "source", "unknown") ts = getattr(item, "timestamp", "0") elif isinstance(item, tuple): title = item[0] if len(item) > 0 else "" src = item[1] if len(item) > 1 else "unknown" ts = str(item[2]) if len(item) > 2 else "0" else: title = str(item) src = "unknown" ts = "0" try: block = make_block(title, src, ts, w) result.extend(block) except Exception: result.append(title) return result return data def create_stage_from_display(display, name: str = "terminal") -> DisplayStage: """Create a Stage from a Display instance.""" return DisplayStage(display, name) def create_stage_from_effect(effect_plugin, name: str) -> EffectPluginStage: """Create a Stage from an EffectPlugin.""" return EffectPluginStage(effect_plugin, name) def create_stage_from_source(data_source, name: str = "headlines") -> DataSourceStage: """Create a Stage from a DataSource.""" return DataSourceStage(data_source, name) def create_stage_from_camera(camera, name: str = "vertical") -> CameraStage: """Create a Stage from a Camera.""" return CameraStage(camera, name) def create_stage_from_font( font_path: str | None = None, font_size: int | None = None, font_ref: str | None = "default", name: str = "font", ) -> FontStage: """Create a FontStage for rendering content with fonts.""" return FontStage( font_path=font_path, font_size=font_size, font_ref=font_ref, name=name ) class CanvasStage(Stage): """Stage that manages a Canvas for rendering. CanvasStage creates and manages a 2D canvas that can hold rendered content. Other stages can write to and read from the canvas via the pipeline context. This enables: - Pre-rendering content off-screen - Multiple cameras viewing different regions - Smooth scrolling (camera moves, content stays) - Layer compositing Usage: - Add CanvasStage to pipeline - Other stages access canvas via: ctx.get("canvas") """ def __init__( self, width: int = 80, height: int = 24, name: str = "canvas", ): self.name = name self.category = "system" self.optional = True self._width = width self._height = height self._canvas = None @property def stage_type(self) -> str: return "system" @property def capabilities(self) -> set[str]: return {"canvas"} @property def dependencies(self) -> set[str]: return set() @property def inlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.ANY} @property def outlet_types(self) -> set: from engine.pipeline.core import DataType return {DataType.ANY} def init(self, ctx: PipelineContext) -> bool: from engine.canvas import Canvas self._canvas = Canvas(width=self._width, height=self._height) ctx.set("canvas", self._canvas) return True def process(self, data: Any, ctx: PipelineContext) -> Any: """Pass through data but ensure canvas is in context.""" if self._canvas is None: from engine.canvas import Canvas self._canvas = Canvas(width=self._width, height=self._height) ctx.set("canvas", self._canvas) return data def get_canvas(self): """Get the canvas instance.""" return self._canvas def cleanup(self) -> None: self._canvas = None def create_items_stage(items, name: str = "headlines") -> ItemsStage: """Create a Stage that holds pre-fetched items.""" return ItemsStage(items, name)