diff --git a/engine/app.py b/engine/app.py index 847fa98..bb2fddd 100644 --- a/engine/app.py +++ b/engine/app.py @@ -18,7 +18,6 @@ from engine.pipeline import ( ) from engine.pipeline.adapters import ( SourceItemsToBufferStage, - create_items_stage, create_stage_from_display, create_stage_from_effect, ) @@ -147,7 +146,11 @@ def run_pipeline_mode(preset_name: str = "demo"): empty_source = EmptyDataSource(width=80, height=24) pipeline.add_stage("source", DataSourceStage(empty_source, name="empty")) else: - pipeline.add_stage("source", create_items_stage(items, preset.source)) + from engine.data_sources.sources import ListDataSource + from engine.pipeline.adapters import DataSourceStage + + list_source = ListDataSource(items, name=preset.source) + pipeline.add_stage("source", DataSourceStage(list_source, name=preset.source)) # Add render stage - convert items to buffer pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) diff --git a/engine/data_sources/sources.py b/engine/data_sources/sources.py index c7c1289..42f2d3e 100644 --- a/engine/data_sources/sources.py +++ b/engine/data_sources/sources.py @@ -116,6 +116,44 @@ class EmptyDataSource(DataSource): return [SourceItem(content=content, source="empty", timestamp="0")] +class ListDataSource(DataSource): + """Data source that wraps a pre-fetched list of items. + + Used for bootstrap loading when items are already available in memory. + This is a simple wrapper for already-fetched data. + """ + + def __init__(self, items, name: str = "list"): + self._items = items + self._name = name + + @property + def name(self) -> str: + return self._name + + @property + def is_dynamic(self) -> bool: + return False + + def fetch(self) -> list[SourceItem]: + # Convert tuple items to SourceItem if needed + result = [] + for item in self._items: + if isinstance(item, SourceItem): + result.append(item) + elif isinstance(item, tuple) and len(item) >= 3: + # Assume (content, source, timestamp) tuple format + result.append( + SourceItem(content=item[0], source=item[1], timestamp=str(item[2])) + ) + else: + # Fallback: treat as string content + result.append( + SourceItem(content=str(item), source="list", timestamp="0") + ) + return result + + class PoetryDataSource(DataSource): """Data source for Poetry DB.""" diff --git a/engine/pipeline/adapters.py b/engine/pipeline/adapters.py index 0f5ff38..99ae05a 100644 --- a/engine/pipeline/adapters.py +++ b/engine/pipeline/adapters.py @@ -5,136 +5,11 @@ This module provides adapters that wrap existing components (EffectPlugin, Display, DataSource, Camera) as Stage implementations. """ -import random from typing import Any from engine.pipeline.core import PipelineContext, Stage -class RenderStage(Stage): - """Stage that renders items to a text buffer for display. - - This mimics the old demo's render pipeline: - - Selects headlines and renders them to blocks - - Applies camera scroll position - - Adds firehose layer if enabled - - .. deprecated:: - RenderStage uses legacy rendering from engine.legacy.layers and engine.legacy.render. - This stage will be removed in a future version. For new code, use modern pipeline stages - like PassthroughStage with custom rendering stages instead. - """ - - def __init__( - self, - items: list, - width: int = 80, - height: int = 24, - camera_speed: float = 1.0, - camera_mode: str = "vertical", - firehose_enabled: bool = False, - name: str = "render", - ): - import warnings - - warnings.warn( - "RenderStage is deprecated. It uses legacy rendering code from engine.legacy.*. " - "This stage will be removed in a future version. " - "Use modern pipeline stages with PassthroughStage or create custom rendering stages instead.", - DeprecationWarning, - stacklevel=2, - ) - self.name = name - self.category = "render" - self.optional = False - self._items = items - self._width = width - self._height = height - self._camera_speed = camera_speed - self._camera_mode = camera_mode - self._firehose_enabled = firehose_enabled - - self._camera_y = 0.0 - self._camera_x = 0 - self._scroll_accum = 0.0 - self._ticker_next_y = 0 - self._active: list = [] - self._seen: set = set() - self._pool: list = list(items) - self._noise_cache: dict = {} - self._frame_count = 0 - - @property - def capabilities(self) -> set[str]: - return {"render.output"} - - @property - def dependencies(self) -> set[str]: - return {"source"} - - def init(self, ctx: PipelineContext) -> bool: - random.shuffle(self._pool) - return True - - def process(self, data: Any, ctx: PipelineContext) -> Any: - """Render items to a text buffer.""" - from engine.effects import next_headline - from engine.legacy.layers import render_firehose, render_ticker_zone - from engine.legacy.render import make_block - - items = data or self._items - w = ctx.params.viewport_width if ctx.params else self._width - h = ctx.params.viewport_height if ctx.params else self._height - camera_speed = ctx.params.camera_speed if ctx.params else self._camera_speed - firehose = ctx.params.firehose_enabled if ctx.params else self._firehose_enabled - - scroll_step = 0.5 / (camera_speed * 10) - self._scroll_accum += scroll_step - - GAP = 3 - - while self._scroll_accum >= scroll_step: - self._scroll_accum -= scroll_step - self._camera_y += 1.0 - - while ( - self._ticker_next_y < int(self._camera_y) + h + 10 - and len(self._active) < 50 - ): - t, src, ts = next_headline(self._pool, items, self._seen) - ticker_content, hc, midx = make_block(t, src, ts, w) - self._active.append((ticker_content, hc, self._ticker_next_y, midx)) - self._ticker_next_y += len(ticker_content) + GAP - - self._active = [ - (c, hc, by, mi) - for c, hc, by, mi in self._active - if by + len(c) > int(self._camera_y) - ] - for k in list(self._noise_cache): - if k < int(self._camera_y): - del self._noise_cache[k] - - grad_offset = (self._frame_count * 0.01) % 1.0 - - buf, self._noise_cache = render_ticker_zone( - self._active, - scroll_cam=int(self._camera_y), - camera_x=self._camera_x, - ticker_h=h, - w=w, - noise_cache=self._noise_cache, - grad_offset=grad_offset, - ) - - if firehose: - firehose_buf = render_firehose(items, w, 0, h) - buf.extend(firehose_buf) - - self._frame_count += 1 - return buf - - class EffectPluginStage(Stage): """Adapter wrapping EffectPlugin as a Stage.""" @@ -364,40 +239,6 @@ class SourceItemsToBufferStage(Stage): return [str(data)] -class ItemsStage(Stage): - """Stage that holds pre-fetched items and provides them to the pipeline. - - .. deprecated:: - Use DataSourceStage with a proper DataSource instead. - ItemsStage is a legacy bootstrap mechanism. - """ - - def __init__(self, items, name: str = "headlines"): - import warnings - - warnings.warn( - "ItemsStage is deprecated. Use DataSourceStage with a DataSource instead.", - DeprecationWarning, - stacklevel=2, - ) - self._items = items - self.name = name - self.category = "source" - self.optional = False - - @property - def capabilities(self) -> set[str]: - return {f"source.{self.name}"} - - @property - def dependencies(self) -> set[str]: - return set() - - def process(self, data: Any, ctx: PipelineContext) -> Any: - """Return the pre-fetched items.""" - return self._items - - class CameraStage(Stage): """Adapter wrapping Camera as a Stage.""" @@ -753,8 +594,3 @@ class CanvasStage(Stage): def cleanup(self) -> None: self._canvas = None - - -def create_items_stage(items, name: str = "headlines") -> ItemsStage: - """Create a Stage that holds pre-fetched items.""" - return ItemsStage(items, name) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 5aaf5ba..98e9a72 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -586,47 +586,6 @@ class TestPipelinePresets: class TestStageAdapters: """Tests for pipeline stage adapters.""" - def test_render_stage_capabilities(self): - """RenderStage declares correct capabilities.""" - from engine.pipeline.adapters import RenderStage - - stage = RenderStage(items=[], name="render") - assert "render.output" in stage.capabilities - - def test_render_stage_dependencies(self): - """RenderStage declares correct dependencies.""" - from engine.pipeline.adapters import RenderStage - - stage = RenderStage(items=[], name="render") - assert "source" in stage.dependencies - - def test_render_stage_process(self): - """RenderStage.process returns buffer.""" - from engine.pipeline.adapters import RenderStage - from engine.pipeline.core import PipelineContext - - items = [ - ("Test Headline", "test", 1234567890.0), - ] - stage = RenderStage(items=items, width=80, height=24) - ctx = PipelineContext() - - result = stage.process(None, ctx) - assert result is not None - assert isinstance(result, list) - - def test_items_stage(self): - """ItemsStage provides items to pipeline.""" - from engine.pipeline.adapters import ItemsStage - from engine.pipeline.core import PipelineContext - - items = [("Headline 1", "src1", 123.0), ("Headline 2", "src2", 124.0)] - stage = ItemsStage(items, name="headlines") - ctx = PipelineContext() - - result = stage.process(None, ctx) - assert result == items - def test_display_stage_init(self): """DisplayStage.init initializes display.""" from engine.display.backends.null import NullDisplay @@ -765,55 +724,6 @@ class TestEffectPluginStage: class TestFullPipeline: """End-to-end tests for the full pipeline.""" - def test_pipeline_with_items_and_effect(self): - """Pipeline executes items->effect flow.""" - from engine.effects.types import EffectConfig, EffectPlugin - from engine.pipeline.adapters import EffectPluginStage, ItemsStage - from engine.pipeline.controller import Pipeline, PipelineConfig - - class TestEffect(EffectPlugin): - name = "test" - config = EffectConfig() - - def process(self, buf, ctx): - return [f"processed: {line}" for line in buf] - - def configure(self, config): - pass - - pipeline = Pipeline(config=PipelineConfig(enable_metrics=False)) - - # Items stage - items = [("Headline 1", "src1", 123.0)] - pipeline.add_stage("source", ItemsStage(items, name="headlines")) - - # Effect stage - pipeline.add_stage("effect", EffectPluginStage(TestEffect(), name="test")) - - pipeline.build() - - result = pipeline.execute(None) - assert result.success is True - assert "processed:" in result.data[0] - - def test_pipeline_with_items_stage(self): - """Pipeline with ItemsStage provides items through pipeline.""" - from engine.pipeline.adapters import ItemsStage - from engine.pipeline.controller import Pipeline, PipelineConfig - - pipeline = Pipeline(config=PipelineConfig(enable_metrics=False)) - - # Items stage provides source - items = [("Headline 1", "src1", 123.0), ("Headline 2", "src2", 124.0)] - pipeline.add_stage("source", ItemsStage(items, name="headlines")) - - pipeline.build() - - result = pipeline.execute(None) - assert result.success is True - # Items are passed through - assert result.data == items - def test_pipeline_circular_dependency_detection(self): """Pipeline detects circular dependencies.""" from engine.pipeline.controller import Pipeline @@ -857,33 +767,6 @@ class TestFullPipeline: except Exception: pass - def test_datasource_stage_capabilities_match_render_deps(self): - """DataSourceStage provides capability that RenderStage can depend on.""" - from engine.data_sources.sources import HeadlinesDataSource - from engine.pipeline.adapters import DataSourceStage, RenderStage - - # DataSourceStage provides "source.headlines" - ds_stage = DataSourceStage(HeadlinesDataSource(), name="headlines") - assert "source.headlines" in ds_stage.capabilities - - # RenderStage depends on "source" - r_stage = RenderStage(items=[], width=80, height=24) - assert "source" in r_stage.dependencies - - # Test the capability matching directly - from engine.pipeline.controller import Pipeline, PipelineConfig - - pipeline = Pipeline(config=PipelineConfig(enable_metrics=False)) - pipeline.add_stage("source", ds_stage) - pipeline.add_stage("render", r_stage) - - # Build capability map and test matching - pipeline._capability_map = pipeline._build_capability_map() - - # "source" should match "source.headlines" - match = pipeline._find_stage_with_capability("source") - assert match == "source", f"Expected 'source', got {match}" - class TestPipelineMetrics: """Tests for pipeline metrics collection."""