"""
Stage adapters - Bridge existing components to the Stage interface.

This module provides adapters that wrap existing components
(EffectPlugin, Display, DataSource, Camera) as Stage implementations.
"""

import random
from typing import Any

from engine.pipeline.core import PipelineContext, Stage


class RenderStage(Stage):
    """Stage that renders items to a text buffer for display.

    This mimics the old demo's render pipeline:
    - Selects headlines and renders them to blocks
    - Applies camera scroll position
    - Adds firehose layer if enabled
    """

    # Extra offset added after each block's length when placing the next one.
    _BLOCK_GAP = 3
    # Blocks are queued until they reach this far past the bottom of the view.
    _LOOKAHEAD_ROWS = 10
    # Upper bound on the number of simultaneously tracked blocks.
    _MAX_ACTIVE_BLOCKS = 50

    def __init__(
        self,
        items: list,
        width: int = 80,
        height: int = 24,
        camera_speed: float = 1.0,
        camera_mode: str = "vertical",
        firehose_enabled: bool = False,
        name: str = "render",
    ):
        """Store fallback render settings and reset all scroll state.

        Args:
            items: Items to render when ``process`` receives no data.
            width: Viewport width used when the context has no params.
            height: Viewport height used when the context has no params.
            camera_speed: Camera speed used when the context has no params.
            camera_mode: Stored on the instance; not read by this class.
            firehose_enabled: Firehose flag used when the context has no
                params.
            name: Stage name.
        """
        self.name = name
        self.category = "render"
        self.optional = False

        self._items = items
        self._width = width
        self._height = height
        self._camera_speed = camera_speed
        self._camera_mode = camera_mode
        self._firehose_enabled = firehose_enabled

        # Scroll / ticker state carried across process() calls.
        self._camera_y = 0.0
        self._camera_x = 0
        self._scroll_accum = 0.0
        self._ticker_next_y = 0
        self._active: list = []
        self._seen: set = set()
        # Private copy so shuffling in init() does not reorder the caller's list.
        self._pool: list = list(items)
        self._noise_cache: dict = {}
        self._frame_count = 0

    @property
    def capabilities(self) -> set[str]:
        """Capability names reported by this stage."""
        return {"render.output"}

    @property
    def dependencies(self) -> set[str]:
        """Capability names this stage declares as dependencies."""
        return {"source.items"}

    def init(self, ctx: PipelineContext) -> bool:
        """Shuffle the headline pool in place; always returns True."""
        random.shuffle(self._pool)
        return True

    def _advance_camera(self, camera_speed: float) -> None:
        """Advance ``_camera_y`` for one frame.

        A zero, negative or NaN speed leaves the camera where it is. The
        unguarded computation would divide by zero for ``0`` and would never
        leave the accumulator loop for a negative speed (the step is negative,
        so subtracting it only grows the accumulator). A step that degenerates
        to ``0.0`` or ``inf`` is skipped for the same reason.
        """
        if not camera_speed > 0:  # also rejects NaN
            return

        scroll_step = 0.5 / (camera_speed * 10)
        if not 0 < scroll_step < float("inf"):
            return

        # NOTE(review): the accumulator is fed the step itself, so for every
        # valid speed this loop runs exactly once and the camera moves one row
        # per call regardless of the speed's magnitude. Presumably elapsed
        # frame time was meant to be accumulated here - confirm before
        # changing, since callers may rely on the current pacing.
        self._scroll_accum += scroll_step
        while self._scroll_accum >= scroll_step:
            self._scroll_accum -= scroll_step
            self._camera_y += 1.0

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Render items to a text buffer.

        Args:
            data: Items to render; when falsy, the items given to the
                constructor are used instead.
            ctx: Pipeline context. When ``ctx.params`` is truthy its viewport
                size, camera speed and firehose flag override the constructor
                values.

        Returns:
            The buffer produced by ``render_ticker_zone``, extended with the
            output of ``render_firehose`` when the firehose is enabled.
        """
        # Imported lazily, as in the original implementation.
        from engine.effects import next_headline
        from engine.layers import render_firehose, render_ticker_zone
        from engine.render import make_block

        items = data or self._items
        params = ctx.params
        w = params.viewport_width if params else self._width
        h = params.viewport_height if params else self._height
        camera_speed = params.camera_speed if params else self._camera_speed
        firehose = params.firehose_enabled if params else self._firehose_enabled

        self._advance_camera(camera_speed)
        cam_row = int(self._camera_y)

        # Queue new blocks until the lookahead region below the view is
        # filled or the active-block cap is reached.
        while (
            self._ticker_next_y < cam_row + h + self._LOOKAHEAD_ROWS
            and len(self._active) < self._MAX_ACTIVE_BLOCKS
        ):
            t, src, ts = next_headline(self._pool, items, self._seen)
            ticker_content, hc, midx = make_block(t, src, ts, w)
            self._active.append((ticker_content, hc, self._ticker_next_y, midx))
            self._ticker_next_y += len(ticker_content) + self._BLOCK_GAP

        # Drop blocks whose last row lies at or above the camera row.
        self._active = [
            (c, hc, by, mi)
            for c, hc, by, mi in self._active
            if by + len(c) > cam_row
        ]

        # Evict noise-cache entries keyed below the camera row.
        for k in list(self._noise_cache):
            if k < cam_row:
                del self._noise_cache[k]

        grad_offset = (self._frame_count * 0.01) % 1.0

        buf, self._noise_cache = render_ticker_zone(
            self._active,
            scroll_cam=cam_row,
            camera_x=self._camera_x,
            ticker_h=h,
            w=w,
            noise_cache=self._noise_cache,
            grad_offset=grad_offset,
        )

        if firehose:
            firehose_buf = render_firehose(items, w, 0, h)
            buf.extend(firehose_buf)

        self._frame_count += 1
        return buf


class EffectPluginStage(Stage):
    """Adapter wrapping EffectPlugin as a Stage."""

    def __init__(self, effect_plugin, name: str = "effect"):
        """Wrap ``effect_plugin`` under the given stage name."""
        self._effect = effect_plugin
        self.name = name
        self.category = "effect"
        self.optional = False

    @property
    def capabilities(self) -> set[str]:
        """Capability names reported by this stage (``effect.<name>``)."""
        return {f"effect.{self.name}"}

    @property
    def dependencies(self) -> set[str]:
        """This stage declares no dependencies."""
        return set()

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Process data through the effect.

        Returns ``None`` unchanged when there is no data. Otherwise builds an
        ``EffectContext`` from ``ctx.params`` (falling back to an 80x24
        viewport at frame 0 when params are absent) and returns the result of
        the wrapped effect's ``process``.
        """
        if data is None:
            return None

        # Imported lazily, as in the original implementation.
        from engine.effects import EffectContext

        params = ctx.params
        w = params.viewport_width if params else 80
        h = params.viewport_height if params else 24
        frame = params.frame_number if params else 0

        effect_ctx = EffectContext(
            terminal_width=w,
            terminal_height=h,
            scroll_cam=0,
            ticker_height=h,
            camera_x=0,
            mic_excess=0.0,
            grad_offset=(frame * 0.01) % 1.0,
            frame_number=frame,
            has_message=False,
            items=ctx.get("items", []),
        )
        return self._effect.process(data, effect_ctx)


class DisplayStage(Stage):
    """Adapter wrapping Display as a Stage."""

    def __init__(self, display, name: str = "terminal"):
        """Wrap ``display`` under the given stage name."""
        self._display = display
        self.name = name
        self.category = "display"
        self.optional = False

    @property
    def capabilities(self) -> set[str]:
        """Capability names reported by this stage."""
        return {"display.output"}

    @property
    def dependencies(self) -> set[str]:
        """This stage declares no dependencies."""
        return set()

    def init(self, ctx: PipelineContext) -> bool:
        """Initialise the wrapped display with the viewport size.

        Uses 80x24 when ``ctx.params`` is absent. Only an explicit ``False``
        from the display's ``init`` counts as failure; any other return value
        (including ``None``) is reported as success.
        """
        params = ctx.params
        w = params.viewport_width if params else 80
        h = params.viewport_height if params else 24
        result = self._display.init(w, h, reuse=False)
        return result is not False

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Output data to display and return it unchanged.

        ``None`` data is not shown.
        """
        if data is not None:
            self._display.show(data)
        return data

    def cleanup(self) -> None:
        """Delegate cleanup to the wrapped display."""
        self._display.cleanup()


class DataSourceStage(Stage):
    """Adapter wrapping DataSource as a Stage."""

    def __init__(self, data_source, name: str = "headlines"):
        """Wrap ``data_source`` under the given stage name."""
        self._source = data_source
        self.name = name
        self.category = "source"
        self.optional = False

    @property
    def capabilities(self) -> set[str]:
        """Capability names reported by this stage (``source.<name>``)."""
        return {f"source.{self.name}"}

    @property
    def dependencies(self) -> set[str]:
        """This stage declares no dependencies."""
        return set()

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Fetch data from source.

        Returns ``get_items()`` from the wrapped source when it has that
        method; otherwise passes the incoming data through unchanged.
        """
        if hasattr(self._source, "get_items"):
            return self._source.get_items()
        return data


class ItemsStage(Stage):
    """Stage that holds pre-fetched items and provides them to the pipeline."""

    def __init__(self, items, name: str = "headlines"):
        """Hold ``items`` under the given stage name."""
        self._items = items
        self.name = name
        self.category = "source"
        self.optional = False

    @property
    def capabilities(self) -> set[str]:
        """Capability names reported by this stage (``source.<name>``)."""
        return {f"source.{self.name}"}

    @property
    def dependencies(self) -> set[str]:
        """This stage declares no dependencies."""
        return set()

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Return the pre-fetched items, ignoring the incoming data."""
        return self._items


class CameraStage(Stage):
    """Adapter wrapping Camera as a Stage."""

    def __init__(self, camera, name: str = "vertical"):
        """Wrap ``camera`` under the given stage name."""
        self._camera = camera
        self.name = name
        self.category = "camera"
        self.optional = True

    @property
    def capabilities(self) -> set[str]:
        """Capability names reported by this stage."""
        return {"camera"}

    @property
    def dependencies(self) -> set[str]:
        """Capability names this stage declares as dependencies."""
        return {"source.items"}

    def process(self, data: Any, ctx: PipelineContext) -> Any:
        """Apply camera transformation to data.

        Returns ``None`` unchanged when there is no data. When the wrapped
        camera has an ``apply`` method it is called with the data and the
        viewport width (80 when ``ctx.params`` is absent); otherwise the data
        passes through unchanged.
        """
        if data is None:
            return None
        if hasattr(self._camera, "apply"):
            return self._camera.apply(
                data, ctx.params.viewport_width if ctx.params else 80
            )
        return data

    def cleanup(self) -> None:
        """Reset the wrapped camera when it exposes a ``reset`` method."""
        if hasattr(self._camera, "reset"):
            self._camera.reset()


def create_stage_from_display(display, name: str = "terminal") -> DisplayStage:
    """Create a Stage from a Display instance."""
    return DisplayStage(display, name)


def create_stage_from_effect(effect_plugin, name: str = "effect") -> EffectPluginStage:
    """Create a Stage from an EffectPlugin.

    ``name`` defaults to ``"effect"``, the same default used by
    ``EffectPluginStage``.
    """
    return EffectPluginStage(effect_plugin, name)


def create_stage_from_source(data_source, name: str = "headlines") -> DataSourceStage:
    """Create a Stage from a DataSource."""
    return DataSourceStage(data_source, name)


def create_stage_from_camera(camera, name: str = "vertical") -> CameraStage:
    """Create a Stage from a Camera."""
    return CameraStage(camera, name)


def create_items_stage(items, name: str = "headlines") -> ItemsStage:
    """Create a Stage that holds pre-fetched items."""
    return ItemsStage(items, name)