diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index aab2099..b1fd931 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -105,11 +105,13 @@ class TestPipeline: mock_source.name = "source" mock_source.category = "source" mock_source.dependencies = set() + mock_source.capabilities = {"source"} mock_display = MagicMock(spec=Stage) mock_display.name = "display" mock_display.category = "display" mock_display.dependencies = {"source"} + mock_display.capabilities = {"display"} pipeline.add_stage("source", mock_source) pipeline.add_stage("display", mock_display) @@ -129,18 +131,21 @@ class TestPipeline: mock_source.name = "source" mock_source.category = "source" mock_source.dependencies = set() + mock_source.capabilities = {"source"} mock_source.process = lambda data, ctx: call_order.append("source") or "data" mock_effect = MagicMock(spec=Stage) mock_effect.name = "effect" mock_effect.category = "effect" mock_effect.dependencies = {"source"} + mock_effect.capabilities = {"effect"} mock_effect.process = lambda data, ctx: call_order.append("effect") or data mock_display = MagicMock(spec=Stage) mock_display.name = "display" mock_display.category = "display" mock_display.dependencies = {"effect"} + mock_display.capabilities = {"display"} mock_display.process = lambda data, ctx: call_order.append("display") or data pipeline.add_stage("source", mock_source) @@ -161,12 +166,14 @@ class TestPipeline: mock_source.name = "source" mock_source.category = "source" mock_source.dependencies = set() + mock_source.capabilities = {"source"} mock_source.process = lambda data, ctx: "data" mock_failing = MagicMock(spec=Stage) mock_failing.name = "failing" mock_failing.category = "effect" mock_failing.dependencies = {"source"} + mock_failing.capabilities = {"effect"} mock_failing.optional = False mock_failing.process = lambda data, ctx: (_ for _ in ()).throw( Exception("fail") @@ -189,12 +196,14 @@ class TestPipeline: mock_source.name = "source" mock_source.category = "source" mock_source.dependencies = set() + mock_source.capabilities = {"source"} mock_source.process = lambda data, ctx: "data" mock_optional = MagicMock(spec=Stage) mock_optional.name = "optional" mock_optional.category = "effect" mock_optional.dependencies = {"source"} + mock_optional.capabilities = {"effect"} mock_optional.optional = True mock_optional.process = lambda data, ctx: (_ for _ in ()).throw( Exception("fail") @@ -209,6 +218,144 @@ class TestPipeline: assert result.success is True +class TestCapabilityBasedDependencies: + """Tests for capability-based dependency resolution.""" + + def test_capability_wildcard_resolution(self): + """Pipeline resolves dependencies using wildcard capabilities.""" + from engine.pipeline.controller import Pipeline + from engine.pipeline.core import Stage + + class SourceStage(Stage): + name = "headlines" + category = "source" + + @property + def capabilities(self): + return {"source.headlines"} + + @property + def dependencies(self): + return set() + + def process(self, data, ctx): + return data + + class RenderStage(Stage): + name = "render" + category = "render" + + @property + def capabilities(self): + return {"render.output"} + + @property + def dependencies(self): + return {"source.*"} + + def process(self, data, ctx): + return data + + pipeline = Pipeline() + pipeline.add_stage("headlines", SourceStage()) + pipeline.add_stage("render", RenderStage()) + pipeline.build() + + assert "headlines" in pipeline.execution_order + assert "render" in pipeline.execution_order + assert pipeline.execution_order.index( + "headlines" + ) < pipeline.execution_order.index("render") + + def test_missing_capability_raises_error(self): + """Pipeline raises error when capability is missing.""" + from engine.pipeline.controller import Pipeline + from engine.pipeline.core import Stage, StageError + + class RenderStage(Stage): + name = "render" + category = "render" + + @property + def capabilities(self): + return {"render.output"} + + @property + def dependencies(self): + return {"source.headlines"} + + def process(self, data, ctx): + return data + + pipeline = Pipeline() + pipeline.add_stage("render", RenderStage()) + + try: + pipeline.build() + raise AssertionError("Should have raised StageError") + except StageError as e: + assert "Missing capabilities" in e.message + assert "source.headlines" in e.message + + def test_multiple_stages_same_capability(self): + """Pipeline uses first registered stage for capability.""" + from engine.pipeline.controller import Pipeline + from engine.pipeline.core import Stage + + class SourceA(Stage): + name = "headlines" + category = "source" + + @property + def capabilities(self): + return {"source"} + + @property + def dependencies(self): + return set() + + def process(self, data, ctx): + return "A" + + class SourceB(Stage): + name = "poetry" + category = "source" + + @property + def capabilities(self): + return {"source"} + + @property + def dependencies(self): + return set() + + def process(self, data, ctx): + return "B" + + class DisplayStage(Stage): + name = "display" + category = "display" + + @property + def capabilities(self): + return {"display"} + + @property + def dependencies(self): + return {"source"} + + def process(self, data, ctx): + return data + + pipeline = Pipeline() + pipeline.add_stage("headlines", SourceA()) + pipeline.add_stage("poetry", SourceB()) + pipeline.add_stage("display", DisplayStage()) + pipeline.build() + + assert pipeline.execution_order[0] == "headlines" + + class TestPipelineContext: """Tests for PipelineContext.""" @@ -279,3 +426,495 @@ class TestCreateDefaultPipeline: assert pipeline is not None assert "display" in pipeline.stages + + +class TestPipelineParams: + """Tests for PipelineParams.""" + + def test_default_values(self): + """PipelineParams has correct defaults.""" + from engine.pipeline.params import PipelineParams + + params = PipelineParams() + assert params.source == "headlines" + assert params.display == "terminal" + assert params.camera_mode == "vertical" + assert params.effect_order == ["noise", "fade", "glitch", "firehose", "hud"] + + def test_effect_config(self): + """PipelineParams effect config methods work.""" + from engine.pipeline.params import PipelineParams + + params = PipelineParams() + enabled, intensity = params.get_effect_config("noise") + assert enabled is True + assert intensity == 1.0 + + params.set_effect_config("noise", False, 0.5) + enabled, intensity = params.get_effect_config("noise") + assert enabled is False + assert intensity == 0.5 + + def test_is_effect_enabled(self): + """PipelineParams is_effect_enabled works.""" + from engine.pipeline.params import PipelineParams + + params = PipelineParams() + assert params.is_effect_enabled("noise") is True + + params.effect_enabled["noise"] = False + assert params.is_effect_enabled("noise") is False + + def test_to_dict_from_dict(self): + """PipelineParams serialization roundtrip works.""" + from engine.pipeline.params import PipelineParams + + params = PipelineParams() + params.viewport_width = 100 + params.viewport_height = 50 + + data = params.to_dict() + restored = PipelineParams.from_dict(data) + + assert restored.viewport_width == 100 + assert restored.viewport_height == 50 + + def test_copy(self): + """PipelineParams copy works.""" + from engine.pipeline.params import PipelineParams + + params = PipelineParams() + params.viewport_width = 100 + params.effect_enabled["noise"] = False + + copy = params.copy() + assert copy.viewport_width == 100 + assert copy.effect_enabled["noise"] is False + + +class TestPipelinePresets: + """Tests for pipeline presets.""" + + def test_presets_defined(self): + """All expected presets are defined.""" + from engine.pipeline.presets import ( + DEMO_PRESET, + FIREHOSE_PRESET, + PIPELINE_VIZ_PRESET, + POETRY_PRESET, + SIXEL_PRESET, + WEBSOCKET_PRESET, + ) + + assert DEMO_PRESET.name == "demo" + assert POETRY_PRESET.name == "poetry" + assert FIREHOSE_PRESET.name == "firehose" + assert PIPELINE_VIZ_PRESET.name == "pipeline" + assert SIXEL_PRESET.name == "sixel" + assert WEBSOCKET_PRESET.name == "websocket" + + def test_preset_to_params(self): + """Presets convert to PipelineParams correctly.""" + from engine.pipeline.presets import DEMO_PRESET + + params = DEMO_PRESET.to_params() + assert params.source == "headlines" + assert params.display == "pygame" + assert "noise" in params.effect_order + + def test_list_presets(self): + """list_presets returns all presets.""" + from engine.pipeline.presets import list_presets + + presets = list_presets() + assert "demo" in presets + assert "poetry" in presets + assert "firehose" in presets + + def test_get_preset(self): + """get_preset returns correct preset.""" + from engine.pipeline.presets import get_preset + + preset = get_preset("demo") + assert preset is not None + assert preset.name == "demo" + + assert get_preset("nonexistent") is None + + +class TestStageAdapters: + """Tests for pipeline stage adapters.""" + + def test_render_stage_capabilities(self): + """RenderStage declares correct capabilities.""" + from engine.pipeline.adapters import RenderStage + + stage = RenderStage(items=[], name="render") + assert "render.output" in stage.capabilities + + def test_render_stage_dependencies(self): + """RenderStage declares correct dependencies.""" + from engine.pipeline.adapters import RenderStage + + stage = RenderStage(items=[], name="render") + assert "source" in stage.dependencies + + def test_render_stage_process(self): + """RenderStage.process returns buffer.""" + from engine.pipeline.adapters import RenderStage + from engine.pipeline.core import PipelineContext + + items = [ + ("Test Headline", "test", 1234567890.0), + ] + stage = RenderStage(items=items, width=80, height=24) + ctx = PipelineContext() + + result = stage.process(None, ctx) + assert result is not None + assert isinstance(result, list) + + def test_items_stage(self): + """ItemsStage provides items to pipeline.""" + from engine.pipeline.adapters import ItemsStage + from engine.pipeline.core import PipelineContext + + items = [("Headline 1", "src1", 123.0), ("Headline 2", "src2", 124.0)] + stage = ItemsStage(items, name="headlines") + ctx = PipelineContext() + + result = stage.process(None, ctx) + assert result == items + + def test_display_stage_init(self): + """DisplayStage.init initializes display.""" + from engine.display.backends.null import NullDisplay + from engine.pipeline.adapters import DisplayStage + from engine.pipeline.core import PipelineContext + from engine.pipeline.params import PipelineParams + + display = NullDisplay() + stage = DisplayStage(display, name="null") + ctx = PipelineContext() + ctx.params = PipelineParams() + + result = stage.init(ctx) + assert result is True + + def test_display_stage_process(self): + """DisplayStage.process forwards to display.""" + from engine.display.backends.null import NullDisplay + from engine.pipeline.adapters import DisplayStage + from engine.pipeline.core import PipelineContext + from engine.pipeline.params import PipelineParams + + display = NullDisplay() + stage = DisplayStage(display, name="null") + ctx = PipelineContext() + ctx.params = PipelineParams() + + stage.init(ctx) + buffer = ["line1", "line2"] + result = stage.process(buffer, ctx) + assert result == buffer + + def test_camera_stage(self): + """CameraStage applies camera transform.""" + from engine.camera import Camera, CameraMode + from engine.pipeline.adapters import CameraStage + from engine.pipeline.core import PipelineContext + + camera = Camera(mode=CameraMode.VERTICAL) + stage = CameraStage(camera, name="vertical") + PipelineContext() + + assert "camera" in stage.capabilities + assert "source.items" in stage.dependencies + + +class TestDataSourceStage: + """Tests for DataSourceStage adapter.""" + + def test_datasource_stage_capabilities(self): + """DataSourceStage declares correct capabilities.""" + from engine.pipeline.adapters import DataSourceStage + from engine.sources_v2 import HeadlinesDataSource + + source = HeadlinesDataSource() + stage = DataSourceStage(source, name="headlines") + + assert "source.headlines" in stage.capabilities + + def test_datasource_stage_process(self): + """DataSourceStage fetches from DataSource.""" + from unittest.mock import patch + + from engine.pipeline.adapters import DataSourceStage + from engine.pipeline.core import PipelineContext + from engine.sources_v2 import HeadlinesDataSource + + mock_items = [ + ("Test Headline 1", "TestSource", "12:00"), + ("Test Headline 2", "TestSource", "12:01"), + ] + + with patch("engine.fetch.fetch_all", return_value=(mock_items, 1, 0)): + source = HeadlinesDataSource() + stage = DataSourceStage(source, name="headlines") + + result = stage.process(None, PipelineContext()) + + assert result is not None + assert isinstance(result, list) + + +class TestEffectPluginStage: + """Tests for EffectPluginStage adapter.""" + + def test_effect_stage_capabilities(self): + """EffectPluginStage declares correct capabilities.""" + from engine.effects.types import EffectPlugin + from engine.pipeline.adapters import EffectPluginStage + + class TestEffect(EffectPlugin): + name = "test" + + def process(self, buf, ctx): + return buf + + def configure(self, config): + pass + + effect = TestEffect() + stage = EffectPluginStage(effect, name="test") + + assert "effect.test" in stage.capabilities + + def test_effect_stage_with_sensor_bindings(self): + """EffectPluginStage applies sensor param bindings.""" + from engine.effects.types import EffectConfig, EffectPlugin + from engine.pipeline.adapters import EffectPluginStage + from engine.pipeline.core import PipelineContext + from engine.pipeline.params import PipelineParams + + class SensorDrivenEffect(EffectPlugin): + name = "sensor_effect" + config = EffectConfig(intensity=1.0) + param_bindings = { + "intensity": {"sensor": "mic", "transform": "linear"}, + } + + def process(self, buf, ctx): + return buf + + def configure(self, config): + pass + + effect = SensorDrivenEffect() + stage = EffectPluginStage(effect, name="sensor_effect") + + ctx = PipelineContext() + ctx.params = PipelineParams() + ctx.set_state("sensor.mic", 0.5) + + result = stage.process(["test"], ctx) + assert result == ["test"] + + +class TestFullPipeline: + """End-to-end tests for the full pipeline.""" + + def test_pipeline_with_items_and_effect(self): + """Pipeline executes items->effect flow.""" + from engine.effects.types import EffectConfig, EffectPlugin + from engine.pipeline.adapters import EffectPluginStage, ItemsStage + from engine.pipeline.controller import Pipeline, PipelineConfig + + class TestEffect(EffectPlugin): + name = "test" + config = EffectConfig() + + def process(self, buf, ctx): + return [f"processed: {line}" for line in buf] + + def configure(self, config): + pass + + pipeline = Pipeline(config=PipelineConfig(enable_metrics=False)) + + # Items stage + items = [("Headline 1", "src1", 123.0)] + pipeline.add_stage("source", ItemsStage(items, name="headlines")) + + # Effect stage + pipeline.add_stage("effect", EffectPluginStage(TestEffect(), name="test")) + + pipeline.build() + + result = pipeline.execute(None) + assert result.success is True + assert "processed:" in result.data[0] + + def test_pipeline_with_items_stage(self): + """Pipeline with ItemsStage provides items through pipeline.""" + from engine.pipeline.adapters import ItemsStage + from engine.pipeline.controller import Pipeline, PipelineConfig + + pipeline = Pipeline(config=PipelineConfig(enable_metrics=False)) + + # Items stage provides source + items = [("Headline 1", "src1", 123.0), ("Headline 2", "src2", 124.0)] + pipeline.add_stage("source", ItemsStage(items, name="headlines")) + + pipeline.build() + + result = pipeline.execute(None) + assert result.success is True + # Items are passed through + assert result.data == items + + def test_pipeline_circular_dependency_detection(self): + """Pipeline detects circular dependencies.""" + from engine.pipeline.controller import Pipeline + from engine.pipeline.core import Stage + + class StageA(Stage): + name = "a" + + @property + def capabilities(self): + return {"a"} + + @property + def dependencies(self): + return {"b"} + + def process(self, data, ctx): + return data + + class StageB(Stage): + name = "b" + + @property + def capabilities(self): + return {"b"} + + @property + def dependencies(self): + return {"a"} + + def process(self, data, ctx): + return data + + pipeline = Pipeline() + pipeline.add_stage("a", StageA()) + pipeline.add_stage("b", StageB()) + + try: + pipeline.build() + raise AssertionError("Should detect circular dependency") + except Exception: + pass + + def test_datasource_stage_capabilities_match_render_deps(self): + """DataSourceStage provides capability that RenderStage can depend on.""" + from engine.pipeline.adapters import DataSourceStage, RenderStage + from engine.sources_v2 import HeadlinesDataSource + + # DataSourceStage provides "source.headlines" + ds_stage = DataSourceStage(HeadlinesDataSource(), name="headlines") + assert "source.headlines" in ds_stage.capabilities + + # RenderStage depends on "source" + r_stage = RenderStage(items=[], width=80, height=24) + assert "source" in r_stage.dependencies + + # Test the capability matching directly + from engine.pipeline.controller import Pipeline, PipelineConfig + + pipeline = Pipeline(config=PipelineConfig(enable_metrics=False)) + pipeline.add_stage("source", ds_stage) + pipeline.add_stage("render", r_stage) + + # Build capability map and test matching + pipeline._capability_map = pipeline._build_capability_map() + + # "source" should match "source.headlines" + match = pipeline._find_stage_with_capability("source") + assert match == "source", f"Expected 'source', got {match}" + + +class TestPipelineMetrics: + """Tests for pipeline metrics collection.""" + + def test_metrics_collected(self): + """Pipeline collects metrics when enabled.""" + from engine.pipeline.controller import Pipeline, PipelineConfig + from engine.pipeline.core import Stage + + class DummyStage(Stage): + name = "dummy" + category = "test" + + def process(self, data, ctx): + return data + + config = PipelineConfig(enable_metrics=True) + pipeline = Pipeline(config=config) + pipeline.add_stage("dummy", DummyStage()) + pipeline.build() + + pipeline.execute("test_data") + + summary = pipeline.get_metrics_summary() + assert "pipeline" in summary + assert summary["frame_count"] == 1 + + def test_metrics_disabled(self): + """Pipeline skips metrics when disabled.""" + from engine.pipeline.controller import Pipeline, PipelineConfig + from engine.pipeline.core import Stage + + class DummyStage(Stage): + name = "dummy" + category = "test" + + def process(self, data, ctx): + return data + + config = PipelineConfig(enable_metrics=False) + pipeline = Pipeline(config=config) + pipeline.add_stage("dummy", DummyStage()) + pipeline.build() + + pipeline.execute("test_data") + + summary = pipeline.get_metrics_summary() + assert "error" in summary + + def test_reset_metrics(self): + """Pipeline.reset_metrics clears collected metrics.""" + from engine.pipeline.controller import Pipeline, PipelineConfig + from engine.pipeline.core import Stage + + class DummyStage(Stage): + name = "dummy" + category = "test" + + def process(self, data, ctx): + return data + + config = PipelineConfig(enable_metrics=True) + pipeline = Pipeline(config=config) + pipeline.add_stage("dummy", DummyStage()) + pipeline.build() + + pipeline.execute("test1") + pipeline.execute("test2") + + assert pipeline.get_metrics_summary()["frame_count"] == 2 + + pipeline.reset_metrics() + # After reset, metrics collection starts fresh + pipeline.execute("test3") + assert pipeline.get_metrics_summary()["frame_count"] == 1