"""
Tests for the new unified pipeline architecture.
"""

from unittest.mock import MagicMock

import pytest

from engine.pipeline import (
    Pipeline,
    PipelineConfig,
    PipelineContext,
    Stage,
    StageRegistry,
    create_default_pipeline,
    discover_stages,
)
from engine.pipeline.core import DataType, StageError


def _failing_process(data, ctx):
    """Stage ``process`` stand-in that always raises ``Exception("fail")``."""
    raise Exception("fail")


class TestStageRegistry:
    """Tests for StageRegistry."""

    def setup_method(self):
        """Reset registry before each test."""
        # The registry keeps class-level state, so clear it to keep tests
        # independent of each other.
        StageRegistry._discovered = False
        StageRegistry._categories.clear()
        StageRegistry._instances.clear()

    def test_discover_stages_registers_sources(self):
        """discover_stages registers source stages."""
        discover_stages()

        sources = StageRegistry.list("source")
        assert "HeadlinesDataSource" in sources
        assert "PoetryDataSource" in sources
        assert "PipelineIntrospectionSource" in sources

    def test_discover_stages_registers_displays(self):
        """discover_stages registers display stages."""
        discover_stages()

        displays = StageRegistry.list("display")
        assert "terminal" in displays
        assert "pygame" in displays
        assert "websocket" in displays
        assert "null" in displays

    def test_create_source_stage(self):
        """StageRegistry.create creates source stages."""
        discover_stages()

        source = StageRegistry.create("source", "HeadlinesDataSource")
        assert source is not None
        assert source.name == "headlines"

    def test_create_display_stage(self):
        """StageRegistry.create creates display stages."""
        discover_stages()

        display = StageRegistry.create("display", "terminal")
        assert display is not None
        assert hasattr(display, "_display")

    def test_create_display_stage_pygame(self):
        """StageRegistry.create creates pygame display stage."""
        discover_stages()

        display = StageRegistry.create("display", "pygame")
        assert display is not None


class TestPipeline:
    """Tests for Pipeline class."""

    def setup_method(self):
        """Reset registry before each test."""
        StageRegistry._discovered = False
        StageRegistry._categories.clear()
        StageRegistry._instances.clear()
        discover_stages()

    def test_create_pipeline(self):
        """Pipeline can be created with config."""
        config = PipelineConfig(source="headlines", display="terminal")
        pipeline = Pipeline(config=config)

        assert pipeline.config is not None
        assert pipeline.config.source == "headlines"
        assert pipeline.config.display == "terminal"

    def test_add_stage(self):
        """Pipeline.add_stage adds a stage."""
        pipeline = Pipeline()
        mock_stage = MagicMock(spec=Stage)
        mock_stage.name = "test_stage"
        mock_stage.category = "test"

        pipeline.add_stage("test", mock_stage)

        assert "test" in pipeline.stages

    def test_build_resolves_dependencies(self):
        """Pipeline.build resolves execution order."""
        pipeline = Pipeline()
        mock_source = MagicMock(spec=Stage)
        mock_source.name = "source"
        mock_source.category = "source"
        mock_source.stage_type = "source"
        mock_source.render_order = 0
        mock_source.is_overlay = False
        mock_source.inlet_types = {DataType.NONE}
        mock_source.outlet_types = {DataType.SOURCE_ITEMS}
        mock_source.dependencies = set()
        mock_source.capabilities = {"source"}

        mock_display = MagicMock(spec=Stage)
        mock_display.name = "display"
        mock_display.category = "display"
        mock_display.stage_type = "display"
        mock_display.render_order = 0
        mock_display.is_overlay = False
        mock_display.inlet_types = {DataType.ANY}  # Accept any type
        mock_display.outlet_types = {DataType.NONE}
        mock_display.dependencies = {"source"}
        mock_display.capabilities = {"display"}

        pipeline.add_stage("source", mock_source)
        pipeline.add_stage("display", mock_display)
        pipeline.build()

        assert pipeline._initialized is True
        assert "source" in pipeline.execution_order
        assert "display" in pipeline.execution_order

    def test_execute_runs_stages(self):
        """Pipeline.execute runs all stages in order."""
        pipeline = Pipeline()

        call_order = []

        mock_source = MagicMock(spec=Stage)
        mock_source.name = "source"
        mock_source.category = "source"
        mock_source.stage_type = "source"
        mock_source.render_order = 0
        mock_source.is_overlay = False
        mock_source.inlet_types = {DataType.NONE}
        mock_source.outlet_types = {DataType.SOURCE_ITEMS}
        mock_source.dependencies = set()
        mock_source.capabilities = {"source"}
        # list.append returns None, so `or` both records the call and
        # yields the stage's output value.
        mock_source.process = lambda data, ctx: call_order.append("source") or "data"

        mock_effect = MagicMock(spec=Stage)
        mock_effect.name = "effect"
        mock_effect.category = "effect"
        mock_effect.stage_type = "effect"
        mock_effect.render_order = 0
        mock_effect.is_overlay = False
        mock_effect.inlet_types = {DataType.SOURCE_ITEMS}
        mock_effect.outlet_types = {DataType.TEXT_BUFFER}
        mock_effect.dependencies = {"source"}
        mock_effect.capabilities = {"effect"}
        mock_effect.process = lambda data, ctx: call_order.append("effect") or data

        mock_display = MagicMock(spec=Stage)
        mock_display.name = "display"
        mock_display.category = "display"
        mock_display.stage_type = "display"
        mock_display.render_order = 0
        mock_display.is_overlay = False
        mock_display.inlet_types = {DataType.TEXT_BUFFER}
        mock_display.outlet_types = {DataType.NONE}
        mock_display.dependencies = {"effect"}
        mock_display.capabilities = {"display"}
        mock_display.process = lambda data, ctx: call_order.append("display") or data

        pipeline.add_stage("source", mock_source)
        pipeline.add_stage("effect", mock_effect)
        pipeline.add_stage("display", mock_display)
        pipeline.build()

        result = pipeline.execute(None)

        assert result.success is True
        assert call_order == ["source", "effect", "display"]

    def test_execute_handles_stage_failure(self):
        """Pipeline.execute handles stage failures."""
        pipeline = Pipeline()

        mock_source = MagicMock(spec=Stage)
        mock_source.name = "source"
        mock_source.category = "source"
        mock_source.stage_type = "source"
        mock_source.render_order = 0
        mock_source.is_overlay = False
        mock_source.dependencies = set()
        mock_source.capabilities = {"source"}
        mock_source.process = lambda data, ctx: "data"

        mock_failing = MagicMock(spec=Stage)
        mock_failing.name = "failing"
        mock_failing.category = "effect"
        mock_failing.stage_type = "effect"
        mock_failing.render_order = 0
        mock_failing.is_overlay = False
        mock_failing.dependencies = {"source"}
        mock_failing.capabilities = {"effect"}
        mock_failing.optional = False
        mock_failing.process = _failing_process

        pipeline.add_stage("source", mock_source)
        pipeline.add_stage("failing", mock_failing)
        pipeline.build()

        result = pipeline.execute(None)

        assert result.success is False
        assert result.error is not None

    def test_optional_stage_failure_continues(self):
        """Pipeline.execute continues on optional stage failure."""
        pipeline = Pipeline()

        mock_source = MagicMock(spec=Stage)
        mock_source.name = "source"
        mock_source.category = "source"
        mock_source.stage_type = "source"
        mock_source.render_order = 0
        mock_source.is_overlay = False
        mock_source.dependencies = set()
        mock_source.capabilities = {"source"}
        mock_source.process = lambda data, ctx: "data"

        mock_optional = MagicMock(spec=Stage)
        mock_optional.name = "optional"
        mock_optional.category = "effect"
        mock_optional.stage_type = "effect"
        mock_optional.render_order = 0
        mock_optional.is_overlay = False
        mock_optional.dependencies = {"source"}
        mock_optional.capabilities = {"effect"}
        mock_optional.optional = True
        mock_optional.process = _failing_process

        pipeline.add_stage("source", mock_source)
        pipeline.add_stage("optional", mock_optional)
        pipeline.build()

        result = pipeline.execute(None)

        assert result.success is True


class TestCapabilityBasedDependencies:
    """Tests for capability-based dependency resolution."""

    def test_capability_wildcard_resolution(self):
        """Pipeline resolves dependencies using wildcard capabilities."""
        from engine.pipeline.controller import Pipeline
        from engine.pipeline.core import Stage

        class SourceStage(Stage):
            name = "headlines"
            category = "source"

            @property
            def capabilities(self):
                return {"source.headlines"}

            @property
            def dependencies(self):
                return set()

            def process(self, data, ctx):
                return data

        class RenderStage(Stage):
            name = "render"
            category = "render"

            @property
            def capabilities(self):
                return {"render.output"}

            @property
            def dependencies(self):
                return {"source.*"}

            def process(self, data, ctx):
                return data

        pipeline = Pipeline()
        pipeline.add_stage("headlines", SourceStage())
        pipeline.add_stage("render", RenderStage())
        pipeline.build()

        assert "headlines" in pipeline.execution_order
        assert "render" in pipeline.execution_order
        assert pipeline.execution_order.index(
            "headlines"
        ) < pipeline.execution_order.index("render")

    def test_missing_capability_raises_error(self):
        """Pipeline raises error when capability is missing."""
        from engine.pipeline.controller import Pipeline
        from engine.pipeline.core import Stage, StageError

        class RenderStage(Stage):
            name = "render"
            category = "render"

            @property
            def capabilities(self):
                return {"render.output"}

            @property
            def dependencies(self):
                return {"source.headlines"}

            def process(self, data, ctx):
                return data

        pipeline = Pipeline()
        pipeline.add_stage("render", RenderStage())

        with pytest.raises(StageError) as exc_info:
            pipeline.build()

        assert "Missing capabilities" in exc_info.value.message
        assert "source.headlines" in exc_info.value.message

    def test_multiple_stages_same_capability(self):
        """Pipeline uses first registered stage for capability."""
        from engine.pipeline.controller import Pipeline
        from engine.pipeline.core import Stage

        class SourceA(Stage):
            name = "headlines"
            category = "source"

            @property
            def capabilities(self):
                return {"source"}

            @property
            def dependencies(self):
                return set()

            def process(self, data, ctx):
                return "A"

        class SourceB(Stage):
            name = "poetry"
            category = "source"

            @property
            def capabilities(self):
                return {"source"}

            @property
            def dependencies(self):
                return set()

            def process(self, data, ctx):
                return "B"

        class DisplayStage(Stage):
            name = "display"
            category = "display"

            @property
            def capabilities(self):
                return {"display"}

            @property
            def dependencies(self):
                return {"source"}

            def process(self, data, ctx):
                return data

        pipeline = Pipeline()
        pipeline.add_stage("headlines", SourceA())
        pipeline.add_stage("poetry", SourceB())
        pipeline.add_stage("display", DisplayStage())
        pipeline.build()

        assert pipeline.execution_order[0] == "headlines"


class TestPipelineContext:
    """Tests for PipelineContext."""

    def test_init_empty(self):
        """PipelineContext initializes with empty services and state."""
        ctx = PipelineContext()

        assert ctx.services == {}
        assert ctx.state == {}

    def test_init_with_services(self):
        """PipelineContext accepts initial services."""
        ctx = PipelineContext(services={"display": MagicMock()})

        assert "display" in ctx.services

    def test_init_with_state(self):
        """PipelineContext accepts initial state."""
        ctx = PipelineContext(initial_state={"count": 42})

        assert ctx.get_state("count") == 42

    def test_get_set_services(self):
        """PipelineContext can get/set services."""
        ctx = PipelineContext()
        mock_service = MagicMock()

        ctx.set("test_service", mock_service)

        assert ctx.get("test_service") == mock_service

    def test_get_set_state(self):
        """PipelineContext can get/set state."""
        ctx = PipelineContext()

        ctx.set_state("counter", 100)

        assert ctx.get_state("counter") == 100

    def test_lazy_resolver(self):
        """PipelineContext resolves lazy services."""
        ctx = PipelineContext()

        # "config" was never set explicitly, so a non-None result has to come
        # from the context's own resolution.
        config = ctx.get("config")
        assert config is not None

    def test_has_capability(self):
        """PipelineContext.has_capability checks for services."""
        ctx = PipelineContext(services={"display.output": MagicMock()})

        assert ctx.has_capability("display.output") is True
        assert ctx.has_capability("missing") is False


class TestCreateDefaultPipeline:
    """Tests for create_default_pipeline function."""

    def setup_method(self):
        """Reset registry before each test."""
        StageRegistry._discovered = False
        StageRegistry._categories.clear()
        StageRegistry._instances.clear()
        discover_stages()

    def test_create_default_pipeline(self):
        """create_default_pipeline creates a working pipeline."""
        pipeline = create_default_pipeline()

        assert pipeline is not None
        assert "display" in pipeline.stages


class TestPipelineParams:
    """Tests for PipelineParams."""

    def test_default_values(self):
        """PipelineParams has correct defaults."""
        from engine.pipeline.params import PipelineParams

        params = PipelineParams()

        assert params.source == "headlines"
        assert params.display == "terminal"
        assert params.camera_mode == "vertical"
        assert params.effect_order == ["noise", "fade", "glitch", "firehose"]

    def test_effect_config(self):
        """PipelineParams effect config methods work."""
        from engine.pipeline.params import PipelineParams

        params = PipelineParams()

        enabled, intensity = params.get_effect_config("noise")
        assert enabled is True
        assert intensity == 1.0

        params.set_effect_config("noise", False, 0.5)
        enabled, intensity = params.get_effect_config("noise")
        assert enabled is False
        assert intensity == 0.5

    def test_is_effect_enabled(self):
        """PipelineParams is_effect_enabled works."""
        from engine.pipeline.params import PipelineParams

        params = PipelineParams()

        assert params.is_effect_enabled("noise") is True

        params.effect_enabled["noise"] = False
        assert params.is_effect_enabled("noise") is False

    def test_to_dict_from_dict(self):
        """PipelineParams serialization roundtrip works."""
        from engine.pipeline.params import PipelineParams

        params = PipelineParams()
        params.viewport_width = 100
        params.viewport_height = 50

        data = params.to_dict()
        restored = PipelineParams.from_dict(data)

        assert restored.viewport_width == 100
        assert restored.viewport_height == 50

    def test_copy(self):
        """PipelineParams copy works."""
        from engine.pipeline.params import PipelineParams

        params = PipelineParams()
        params.viewport_width = 100
        params.effect_enabled["noise"] = False

        copy = params.copy()

        assert copy.viewport_width == 100
        assert copy.effect_enabled["noise"] is False


class TestPipelinePresets:
    """Tests for pipeline presets."""

    def test_presets_defined(self):
        """All expected presets are defined."""
        from engine.pipeline.presets import (
            DEMO_PRESET,
            FIREHOSE_PRESET,
            PIPELINE_VIZ_PRESET,
            POETRY_PRESET,
            UI_PRESET,
            WEBSOCKET_PRESET,
        )

        assert DEMO_PRESET.name == "demo"
        assert POETRY_PRESET.name == "poetry"
        assert FIREHOSE_PRESET.name == "firehose"
        assert PIPELINE_VIZ_PRESET.name == "pipeline"
        assert WEBSOCKET_PRESET.name == "websocket"
        assert UI_PRESET.name == "ui"

    def test_preset_to_params(self):
        """Presets convert to PipelineParams correctly."""
        from engine.pipeline.presets import DEMO_PRESET

        params = DEMO_PRESET.to_params()

        assert params.source == "headlines"
        assert params.display == "pygame"
        assert "noise" in params.effect_order

    def test_list_presets(self):
        """list_presets returns all presets."""
        from engine.pipeline.presets import list_presets

        presets = list_presets()

        assert "demo" in presets
        assert "poetry" in presets
        assert "firehose" in presets

    def test_get_preset(self):
        """get_preset returns correct preset."""
        from engine.pipeline.presets import get_preset

        preset = get_preset("demo")
        assert preset is not None
        assert preset.name == "demo"

        assert get_preset("nonexistent") is None


class TestStageAdapters:
    """Tests for pipeline stage adapters."""

    def test_display_stage_init(self):
        """DisplayStage.init initializes display."""
        from engine.display.backends.null import NullDisplay
        from engine.pipeline.adapters import DisplayStage
        from engine.pipeline.core import PipelineContext
        from engine.pipeline.params import PipelineParams

        display = NullDisplay()
        stage = DisplayStage(display, name="null")

        ctx = PipelineContext()
        ctx.params = PipelineParams()

        result = stage.init(ctx)

        assert result is True

    def test_display_stage_process(self):
        """DisplayStage.process forwards to display."""
        from engine.display.backends.null import NullDisplay
        from engine.pipeline.adapters import DisplayStage
        from engine.pipeline.core import PipelineContext
        from engine.pipeline.params import PipelineParams

        display = NullDisplay()
        stage = DisplayStage(display, name="null")

        ctx = PipelineContext()
        ctx.params = PipelineParams()

        stage.init(ctx)

        buffer = ["line1", "line2"]
        result = stage.process(buffer, ctx)

        assert result == buffer

    def test_camera_stage(self):
        """CameraStage applies camera transform."""
        from engine.camera import Camera, CameraMode
        from engine.pipeline.adapters import CameraStage

        camera = Camera(mode=CameraMode.FEED)
        stage = CameraStage(camera, name="vertical")

        assert "camera" in stage.capabilities
        assert "render.output" in stage.dependencies  # Depends on rendered content

    def test_camera_stage_does_not_error_on_process(self):
        """CameraStage.process should not error when setting viewport.

        Regression test: Previously CameraStage tried to set viewport_width
        and viewport_height as writable properties, but they are computed
        from canvas_size / zoom. This caused an AttributeError each frame.
        """
        from engine.camera import Camera, CameraMode
        from engine.pipeline.adapters import CameraStage
        from engine.pipeline.core import PipelineContext
        from engine.pipeline.params import PipelineParams

        camera = Camera(mode=CameraMode.FEED)
        stage = CameraStage(camera, name="vertical")

        ctx = PipelineContext()
        ctx.params = PipelineParams(viewport_width=80, viewport_height=24)

        buffer = ["line" + str(i) for i in range(24)]

        # This should not raise AttributeError
        result = stage.process(buffer, ctx)

        # Should return the buffer (unchanged for FEED mode)
        assert result is not None
        assert len(result) == 24


class TestDataSourceStage:
    """Tests for DataSourceStage adapter."""

    def test_datasource_stage_capabilities(self):
        """DataSourceStage declares correct capabilities."""
        from engine.data_sources.sources import HeadlinesDataSource
        from engine.pipeline.adapters import DataSourceStage

        source = HeadlinesDataSource()
        stage = DataSourceStage(source, name="headlines")

        assert "source.headlines" in stage.capabilities

    def test_datasource_stage_process(self):
        """DataSourceStage fetches from DataSource."""
        from unittest.mock import patch

        from engine.data_sources.sources import HeadlinesDataSource
        from engine.pipeline.adapters import DataSourceStage
        from engine.pipeline.core import PipelineContext

        mock_items = [
            ("Test Headline 1", "TestSource", "12:00"),
            ("Test Headline 2", "TestSource", "12:01"),
        ]

        # Patch engine.fetch.fetch_all so the test uses the canned items
        # above instead of whatever the real function would return.
        with patch("engine.fetch.fetch_all", return_value=(mock_items, 1, 0)):
            source = HeadlinesDataSource()
            stage = DataSourceStage(source, name="headlines")

            result = stage.process(None, PipelineContext())

            assert result is not None
            assert isinstance(result, list)


class TestEffectPluginStage:
    """Tests for EffectPluginStage adapter."""

    def test_effect_stage_capabilities(self):
        """EffectPluginStage declares correct capabilities."""
        from engine.effects.types import EffectPlugin
        from engine.pipeline.adapters import EffectPluginStage

        class TestEffect(EffectPlugin):
            name = "test"

            def process(self, buf, ctx):
                return buf

            def configure(self, config):
                pass

        effect = TestEffect()
        stage = EffectPluginStage(effect, name="test")

        assert "effect.test" in stage.capabilities

    def test_effect_stage_with_sensor_bindings(self):
        """EffectPluginStage applies sensor param bindings."""
        from engine.effects.types import EffectConfig, EffectPlugin
        from engine.pipeline.adapters import EffectPluginStage
        from engine.pipeline.core import PipelineContext
        from engine.pipeline.params import PipelineParams

        class SensorDrivenEffect(EffectPlugin):
            name = "sensor_effect"
            config = EffectConfig(intensity=1.0)
            param_bindings = {
                "intensity": {"sensor": "mic", "transform": "linear"},
            }

            def process(self, buf, ctx):
                return buf

            def configure(self, config):
                pass

        effect = SensorDrivenEffect()
        stage = EffectPluginStage(effect, name="sensor_effect")

        ctx = PipelineContext()
        ctx.params = PipelineParams()
        ctx.set_state("sensor.mic", 0.5)

        result = stage.process(["test"], ctx)

        assert result == ["test"]


class TestFullPipeline:
    """End-to-end tests for the full pipeline."""

    def test_pipeline_circular_dependency_detection(self):
        """Pipeline detects circular dependencies."""
        from engine.pipeline.controller import Pipeline
        from engine.pipeline.core import Stage

        class StageA(Stage):
            name = "a"

            @property
            def capabilities(self):
                return {"a"}

            @property
            def dependencies(self):
                return {"b"}

            def process(self, data, ctx):
                return data

        class StageB(Stage):
            name = "b"

            @property
            def capabilities(self):
                return {"b"}

            @property
            def dependencies(self):
                return {"a"}

            def process(self, data, ctx):
                return data

        pipeline = Pipeline()
        pipeline.add_stage("a", StageA())
        pipeline.add_stage("b", StageB())

        # The earlier try/except form raised its own AssertionError inside
        # the `try`, where `except Exception: pass` swallowed it, so the test
        # could never fail. pytest.raises fails when build() does not raise.
        # NOTE(review): the exact exception type build() raises for a cycle
        # is not visible from this file; narrow `Exception` once confirmed.
        with pytest.raises(Exception):
            pipeline.build()


class TestPipelineMetrics:
    """Tests for pipeline metrics collection."""

    def test_metrics_collected(self):
        """Pipeline collects metrics when enabled."""
        from engine.pipeline.controller import Pipeline, PipelineConfig
        from engine.pipeline.core import Stage

        class DummyStage(Stage):
            name = "dummy"
            category = "test"

            def process(self, data, ctx):
                return data

        config = PipelineConfig(enable_metrics=True)
        pipeline = Pipeline(config=config)
        pipeline.add_stage("dummy", DummyStage())
        pipeline.build()

        pipeline.execute("test_data")

        summary = pipeline.get_metrics_summary()
        assert "pipeline" in summary
        assert summary["frame_count"] == 1

    def test_metrics_disabled(self):
        """Pipeline skips metrics when disabled."""
        from engine.pipeline.controller import Pipeline, PipelineConfig
        from engine.pipeline.core import Stage

        class DummyStage(Stage):
            name = "dummy"
            category = "test"

            def process(self, data, ctx):
                return data

        config = PipelineConfig(enable_metrics=False)
        pipeline = Pipeline(config=config)
        pipeline.add_stage("dummy", DummyStage())
        pipeline.build()

        pipeline.execute("test_data")

        summary = pipeline.get_metrics_summary()
        assert "error" in summary

    def test_reset_metrics(self):
        """Pipeline.reset_metrics clears collected metrics."""
        from engine.pipeline.controller import Pipeline, PipelineConfig
        from engine.pipeline.core import Stage

        class DummyStage(Stage):
            name = "dummy"
            category = "test"

            def process(self, data, ctx):
                return data

        config = PipelineConfig(enable_metrics=True)
        pipeline = Pipeline(config=config)
        pipeline.add_stage("dummy", DummyStage())
        pipeline.build()

        pipeline.execute("test1")
        pipeline.execute("test2")

        assert pipeline.get_metrics_summary()["frame_count"] == 2

        pipeline.reset_metrics()

        # After reset, metrics collection starts fresh
        pipeline.execute("test3")
        assert pipeline.get_metrics_summary()["frame_count"] == 1


class TestOverlayStages:
    """Tests for overlay stage support."""

    def test_stage_is_overlay_property(self):
        """Stage has is_overlay property defaulting to False."""
        from engine.pipeline.core import Stage

        class TestStage(Stage):
            name = "test"
            category = "effect"

            def process(self, data, ctx):
                return data

        stage = TestStage()
        assert stage.is_overlay is False

    def test_stage_render_order_property(self):
        """Stage has render_order property defaulting to 0."""
        from engine.pipeline.core import Stage

        class TestStage(Stage):
            name = "test"
            category = "effect"

            def process(self, data, ctx):
                return data

        stage = TestStage()
        assert stage.render_order == 0

    def test_stage_stage_type_property(self):
        """Stage has stage_type property defaulting to category."""
        from engine.pipeline.core import Stage

        class TestStage(Stage):
            name = "test"
            category = "effect"

            def process(self, data, ctx):
                return data

        stage = TestStage()
        assert stage.stage_type == "effect"

    def test_pipeline_get_overlay_stages(self):
        """Pipeline.get_overlay_stages returns overlay stages sorted by render_order."""
        from engine.pipeline.controller import Pipeline
        from engine.pipeline.core import Stage

        class OverlayStageA(Stage):
            name = "overlay_a"
            category = "overlay"

            @property
            def is_overlay(self):
                return True

            @property
            def render_order(self):
                return 10

            def process(self, data, ctx):
                return data

        class OverlayStageB(Stage):
            name = "overlay_b"
            category = "overlay"

            @property
            def is_overlay(self):
                return True

            @property
            def render_order(self):
                return 5

            def process(self, data, ctx):
                return data

        class RegularStage(Stage):
            name = "regular"
            category = "effect"

            def process(self, data, ctx):
                return data

        pipeline = Pipeline()
        pipeline.add_stage("overlay_a", OverlayStageA())
        pipeline.add_stage("overlay_b", OverlayStageB())
        pipeline.add_stage("regular", RegularStage())
        pipeline.build()

        overlays = pipeline.get_overlay_stages()
        assert len(overlays) == 2
        # Should be sorted by render_order
        assert overlays[0].name == "overlay_b"  # render_order=5
        assert overlays[1].name == "overlay_a"  # render_order=10

    def test_pipeline_executes_overlays_after_regular(self):
        """Pipeline executes overlays after regular stages."""
        from engine.pipeline.controller import Pipeline
        from engine.pipeline.core import Stage

        call_order = []

        class RegularStage(Stage):
            name = "regular"
            category = "effect"

            def process(self, data, ctx):
                call_order.append("regular")
                return data

        class OverlayStage(Stage):
            name = "overlay"
            category = "overlay"

            @property
            def is_overlay(self):
                return True

            @property
            def render_order(self):
                return 100

            def process(self, data, ctx):
                call_order.append("overlay")
                return data

        pipeline = Pipeline()
        pipeline.add_stage("regular", RegularStage())
        pipeline.add_stage("overlay", OverlayStage())
        pipeline.build()

        pipeline.execute("data")

        assert call_order == ["regular", "overlay"]

    def test_effect_plugin_stage_hud_is_overlay(self):
        """EffectPluginStage marks HUD as overlay."""
        from engine.effects.types import EffectConfig, EffectPlugin
        from engine.pipeline.adapters import EffectPluginStage

        class HudEffect(EffectPlugin):
            name = "hud"
            config = EffectConfig(enabled=True)

            def process(self, buf, ctx):
                return buf

            def configure(self, config):
                pass

        stage = EffectPluginStage(HudEffect(), name="hud")

        assert stage.is_overlay is True
        assert stage.stage_type == "overlay"
        assert stage.render_order == 100

    def test_effect_plugin_stage_non_hud_not_overlay(self):
        """EffectPluginStage marks non-HUD effects as not overlay."""
        from engine.effects.types import EffectConfig, EffectPlugin
        from engine.pipeline.adapters import EffectPluginStage

        class FadeEffect(EffectPlugin):
            name = "fade"
            config = EffectConfig(enabled=True)

            def process(self, buf, ctx):
                return buf

            def configure(self, config):
                pass

        stage = EffectPluginStage(FadeEffect(), name="fade")

        assert stage.is_overlay is False
        assert stage.stage_type == "effect"
        assert stage.render_order == 0

    def test_pipeline_get_stage_type(self):
        """Pipeline.get_stage_type returns stage_type for a stage."""
        from engine.pipeline.controller import Pipeline
        from engine.pipeline.core import Stage

        class TestStage(Stage):
            name = "test"
            category = "effect"

            @property
            def stage_type(self):
                return "overlay"

            def process(self, data, ctx):
                return data

        pipeline = Pipeline()
        pipeline.add_stage("test", TestStage())
        pipeline.build()

        assert pipeline.get_stage_type("test") == "overlay"

    def test_pipeline_get_render_order(self):
        """Pipeline.get_render_order returns render_order for a stage."""
        from engine.pipeline.controller import Pipeline
        from engine.pipeline.core import Stage

        class TestStage(Stage):
            name = "test"
            category = "effect"

            @property
            def render_order(self):
                return 42

            def process(self, data, ctx):
                return data

        pipeline = Pipeline()
        pipeline.add_stage("test", TestStage())
        pipeline.build()

        assert pipeline.get_render_order("test") == 42


class TestInletOutletTypeValidation:
    """Test type validation between connected stages."""

    def test_type_mismatch_raises_error(self):
        """Type mismatch between stages raises StageError."""

        class ProducerStage(Stage):
            name = "producer"
            category = "test"

            @property
            def inlet_types(self):
                return {DataType.NONE}

            @property
            def outlet_types(self):
                return {DataType.SOURCE_ITEMS}

            def process(self, data, ctx):
                return data

        class ConsumerStage(Stage):
            name = "consumer"
            category = "test"

            @property
            def dependencies(self):
                return {"test.producer"}

            @property
            def inlet_types(self):
                return {DataType.TEXT_BUFFER}  # Incompatible!

            @property
            def outlet_types(self):
                return {DataType.TEXT_BUFFER}

            def process(self, data, ctx):
                return data

        pipeline = Pipeline()
        pipeline.add_stage("producer", ProducerStage())
        pipeline.add_stage("consumer", ConsumerStage())

        with pytest.raises(StageError) as exc_info:
            pipeline.build()

        assert "Type mismatch" in str(exc_info.value)
        assert "TEXT_BUFFER" in str(exc_info.value)
        assert "SOURCE_ITEMS" in str(exc_info.value)

    def test_compatible_types_pass_validation(self):
        """Compatible types pass validation."""

        class ProducerStage(Stage):
            name = "producer"
            category = "test"

            @property
            def inlet_types(self):
                return {DataType.NONE}

            @property
            def outlet_types(self):
                return {DataType.SOURCE_ITEMS}

            def process(self, data, ctx):
                return data

        class ConsumerStage(Stage):
            name = "consumer"
            category = "test"

            @property
            def dependencies(self):
                return {"test.producer"}

            @property
            def inlet_types(self):
                return {DataType.SOURCE_ITEMS}  # Compatible!

            @property
            def outlet_types(self):
                return {DataType.TEXT_BUFFER}

            def process(self, data, ctx):
                return data

        pipeline = Pipeline()
        pipeline.add_stage("producer", ProducerStage())
        pipeline.add_stage("consumer", ConsumerStage())

        # Should not raise
        pipeline.build()

    def test_any_type_accepts_everything(self):
        """DataType.ANY accepts any upstream type."""

        class ProducerStage(Stage):
            name = "producer"
            category = "test"

            @property
            def inlet_types(self):
                return {DataType.NONE}

            @property
            def outlet_types(self):
                return {DataType.SOURCE_ITEMS}

            def process(self, data, ctx):
                return data

        class ConsumerStage(Stage):
            name = "consumer"
            category = "test"

            @property
            def dependencies(self):
                return {"test.producer"}

            @property
            def inlet_types(self):
                return {DataType.ANY}  # Accepts anything

            @property
            def outlet_types(self):
                return {DataType.TEXT_BUFFER}

            def process(self, data, ctx):
                return data

        pipeline = Pipeline()
        pipeline.add_stage("producer", ProducerStage())
        pipeline.add_stage("consumer", ConsumerStage())

        # Should not raise because consumer accepts ANY
        pipeline.build()

# NOTE(review): the two functions below take ``self`` and presumably belong to
# a test class whose header lies above this chunk; indent them to match it.
def test_multiple_compatible_types(self):
    """Stage can declare multiple inlet types."""

    class ProducerStage(Stage):
        name = "producer"
        category = "test"

        @property
        def inlet_types(self):
            return {DataType.NONE}

        @property
        def outlet_types(self):
            return {DataType.SOURCE_ITEMS}

        def process(self, data, ctx):
            return data

    class ConsumerStage(Stage):
        name = "consumer"
        category = "test"

        @property
        def dependencies(self):
            return {"test.producer"}

        @property
        def inlet_types(self):
            return {DataType.SOURCE_ITEMS, DataType.TEXT_BUFFER}

        @property
        def outlet_types(self):
            return {DataType.TEXT_BUFFER}

        def process(self, data, ctx):
            return data

    pipeline = Pipeline()
    pipeline.add_stage("producer", ProducerStage())
    pipeline.add_stage("consumer", ConsumerStage())

    # Should not raise because consumer accepts SOURCE_ITEMS
    pipeline.build()


def test_display_must_accept_text_buffer(self):
    """Display stages must accept TEXT_BUFFER type."""

    class BadDisplayStage(Stage):
        name = "display"
        category = "display"

        @property
        def inlet_types(self):
            return {DataType.SOURCE_ITEMS}  # Wrong type for display!

        @property
        def outlet_types(self):
            return {DataType.NONE}

        def process(self, data, ctx):
            return data

    pipeline = Pipeline()
    pipeline.add_stage("display", BadDisplayStage())

    with pytest.raises(StageError) as exc_info:
        pipeline.build()

    # Checked outside the ``with`` block so it runs after the error is caught.
    assert "display" in str(exc_info.value).lower()


class TestPipelineMutation:
    """Tests for Pipeline Mutation API - dynamic stage modification."""

    def setup_method(self):
        """Reset the stage registry and re-run discovery before each test."""
        StageRegistry._discovered = False
        StageRegistry._categories.clear()
        StageRegistry._instances.clear()
        discover_stages()

    def _create_mock_stage(
        self,
        name: str = "test",
        category: str = "test",
        capabilities: set[str] | None = None,
        dependencies: set[str] | None = None,
    ) -> MagicMock:
        """Build a ``MagicMock(spec=Stage)`` configured as a pass-through stage.

        Args:
            name: Value assigned to the mock's ``name`` attribute.
            category: Value assigned to both ``category`` and ``stage_type``.
            capabilities: Capabilities to expose. ``None`` selects the default
                ``{f"{category}.{name}"}``; an explicitly passed set (even an
                empty one) is used as given.
            dependencies: Dependencies to expose. ``None`` selects an empty
                set; an explicitly passed set is used as given.

        Returns:
            The configured mock. ``init``, ``cleanup``, ``is_enabled`` and
            ``set_enabled`` are ``MagicMock`` objects so tests can assert on
            their calls; ``process`` returns its ``data`` argument unchanged.
        """
        mock = MagicMock(spec=Stage)
        mock.name = name
        mock.category = category
        mock.stage_type = category
        mock.render_order = 0
        mock.is_overlay = False
        mock.inlet_types = {DataType.ANY}
        mock.outlet_types = {DataType.TEXT_BUFFER}
        # Compare against None (not truthiness) so an explicit empty set is
        # respected instead of being silently replaced by the default.
        mock.capabilities = (
            {f"{category}.{name}"} if capabilities is None else capabilities
        )
        mock.dependencies = set() if dependencies is None else dependencies
        mock.process = lambda data, ctx: data
        mock.init = MagicMock(return_value=True)
        mock.cleanup = MagicMock()
        mock.is_enabled = MagicMock(return_value=True)
        mock.set_enabled = MagicMock()
        mock._enabled = True
        return mock

    def test_add_stage_initializes_when_pipeline_initialized(self):
        """add_stage() initializes stage when pipeline already initialized."""
        pipeline = Pipeline()
        mock_stage = self._create_mock_stage("test")
        pipeline.build()
        # Set the flag explicitly so the test does not rely on build() doing it.
        pipeline._initialized = True

        pipeline.add_stage("test", mock_stage, initialize=True)

        mock_stage.init.assert_called_once()

    def test_add_stage_skips_initialize_when_pipeline_not_initialized(self):
        """add_stage() skips initialization when pipeline not built."""
        pipeline = Pipeline()
        mock_stage = self._create_mock_stage("test")

        pipeline.add_stage("test", mock_stage, initialize=False)

        mock_stage.init.assert_not_called()

    def test_remove_stage_returns_removed_stage(self):
        """remove_stage() returns the removed stage."""
        pipeline = Pipeline()
        mock_stage = self._create_mock_stage("test")
        pipeline.add_stage("test", mock_stage, initialize=False)

        removed = pipeline.remove_stage("test", cleanup=False)

        assert removed is mock_stage
        assert "test" not in pipeline.stages

    def test_remove_stage_calls_cleanup_when_requested(self):
        """remove_stage() calls cleanup when cleanup=True."""
        pipeline = Pipeline()
        mock_stage = self._create_mock_stage("test")
        pipeline.add_stage("test", mock_stage, initialize=False)

        pipeline.remove_stage("test", cleanup=True)

        mock_stage.cleanup.assert_called_once()

    def test_remove_stage_skips_cleanup_when_requested(self):
        """remove_stage() skips cleanup when cleanup=False."""
        pipeline = Pipeline()
        mock_stage = self._create_mock_stage("test")
        pipeline.add_stage("test", mock_stage, initialize=False)

        pipeline.remove_stage("test", cleanup=False)

        mock_stage.cleanup.assert_not_called()

    def test_remove_nonexistent_stage_returns_none(self):
        """remove_stage() returns None for nonexistent stage."""
        pipeline = Pipeline()

        result = pipeline.remove_stage("nonexistent", cleanup=False)

        assert result is None

    def test_replace_stage_preserves_state(self):
        """replace_stage() copies _enabled from old to new stage."""
        pipeline = Pipeline()
        old_stage = self._create_mock_stage("test")
        old_stage._enabled = False
        # The helper creates new_stage with _enabled = True, so a False value
        # afterwards can only have come from old_stage.
        new_stage = self._create_mock_stage("test")
        pipeline.add_stage("test", old_stage, initialize=False)

        pipeline.replace_stage("test", new_stage, preserve_state=True)

        assert new_stage._enabled is False
        old_stage.cleanup.assert_called_once()
        new_stage.init.assert_called_once()

    def test_replace_stage_without_preserving_state(self):
        """replace_stage() without preserve_state doesn't copy state."""
        pipeline = Pipeline()
        old_stage = self._create_mock_stage("test")
        old_stage._enabled = False
        new_stage = self._create_mock_stage("test")
        new_stage._enabled = True
        pipeline.add_stage("test", old_stage, initialize=False)

        pipeline.replace_stage("test", new_stage, preserve_state=False)

        assert new_stage._enabled is True

    def test_replace_nonexistent_stage_returns_none(self):
        """replace_stage() returns None for nonexistent stage."""
        pipeline = Pipeline()
        mock_stage = self._create_mock_stage("test")

        result = pipeline.replace_stage("nonexistent", mock_stage)

        assert result is None

    def test_swap_stages_swaps_stages(self):
        """swap_stages() swaps two stages."""
        pipeline = Pipeline()
        stage_a = self._create_mock_stage("stage_a", "a")
        stage_b = self._create_mock_stage("stage_b", "b")
        pipeline.add_stage("a", stage_a, initialize=False)
        pipeline.add_stage("b", stage_b, initialize=False)

        result = pipeline.swap_stages("a", "b")

        assert result is True
        # Each registration key must now map to the other stage object.
        assert pipeline.stages["a"].name == "stage_b"
        assert pipeline.stages["b"].name == "stage_a"

    def test_swap_stages_fails_for_nonexistent(self):
        """swap_stages() fails if either stage doesn't exist."""
        pipeline = Pipeline()
        stage = self._create_mock_stage("test")
        pipeline.add_stage("test", stage, initialize=False)

        result = pipeline.swap_stages("test", "nonexistent")

        assert result is False

    def test_move_stage_after(self):
        """move_stage() moves stage after another."""
        pipeline = Pipeline()
        stage_a = self._create_mock_stage("a")
        stage_b = self._create_mock_stage("b")
        stage_c = self._create_mock_stage("c")
        pipeline.add_stage("a", stage_a, initialize=False)
        pipeline.add_stage("b", stage_b, initialize=False)
        pipeline.add_stage("c", stage_c, initialize=False)
        pipeline.build()

        result = pipeline.move_stage("a", after="c")

        assert result is True
        idx_a = pipeline.execution_order.index("a")
        idx_c = pipeline.execution_order.index("c")
        assert idx_a > idx_c

    def test_move_stage_before(self):
        """move_stage() moves stage before another."""
        pipeline = Pipeline()
        stage_a = self._create_mock_stage("a")
        stage_b = self._create_mock_stage("b")
        stage_c = self._create_mock_stage("c")
        pipeline.add_stage("a", stage_a, initialize=False)
        pipeline.add_stage("b", stage_b, initialize=False)
        pipeline.add_stage("c", stage_c, initialize=False)
        pipeline.build()

        result = pipeline.move_stage("c", before="a")

        assert result is True
        idx_a = pipeline.execution_order.index("a")
        idx_c = pipeline.execution_order.index("c")
        assert idx_c < idx_a

    def test_move_stage_fails_for_nonexistent(self):
        """move_stage() fails if stage doesn't exist."""
        pipeline = Pipeline()
        stage = self._create_mock_stage("test")
        pipeline.add_stage("test", stage, initialize=False)
        pipeline.build()

        result = pipeline.move_stage("nonexistent", after="test")

        assert result is False

    def test_move_stage_fails_when_not_initialized(self):
        """move_stage() fails if pipeline not built."""
        pipeline = Pipeline()
        stage = self._create_mock_stage("test")
        pipeline.add_stage("test", stage, initialize=False)

        # build() is deliberately not called here.
        result = pipeline.move_stage("test", after="other")

        assert result is False

    def test_enable_stage(self):
        """enable_stage() enables a stage."""
        pipeline = Pipeline()
        stage = self._create_mock_stage("test")
        pipeline.add_stage("test", stage, initialize=False)

        result = pipeline.enable_stage("test")

        assert result is True
        stage.set_enabled.assert_called_with(True)

    def test_enable_nonexistent_stage_returns_false(self):
        """enable_stage() returns False for nonexistent stage."""
        pipeline = Pipeline()

        result = pipeline.enable_stage("nonexistent")

        assert result is False

    def test_disable_stage(self):
        """disable_stage() disables a stage."""
        pipeline = Pipeline()
        stage = self._create_mock_stage("test")
        pipeline.add_stage("test", stage, initialize=False)

        result = pipeline.disable_stage("test")

        assert result is True
        stage.set_enabled.assert_called_with(False)

    def test_disable_nonexistent_stage_returns_false(self):
        """disable_stage() returns False for nonexistent stage."""
        pipeline = Pipeline()

        result = pipeline.disable_stage("nonexistent")

        assert result is False

    def test_get_stage_info_returns_correct_info(self):
        """get_stage_info() returns correct stage information."""
        pipeline = Pipeline()
        stage = self._create_mock_stage(
            "test_stage",
            "effect",
            capabilities={"effect.test"},
            dependencies={"source"},
        )
        stage.render_order = 5
        stage.is_overlay = False
        stage.optional = True
        pipeline.add_stage("test", stage, initialize=False)

        info = pipeline.get_stage_info("test")

        assert info is not None
        assert info["name"] == "test"  # Dict key, not stage.name
        assert info["category"] == "effect"
        assert info["stage_type"] == "effect"
        assert info["enabled"] is True
        assert info["optional"] is True
        assert info["capabilities"] == ["effect.test"]
        assert info["dependencies"] == ["source"]
        assert info["render_order"] == 5
        assert info["is_overlay"] is False

    def test_get_stage_info_returns_none_for_nonexistent(self):
        """get_stage_info() returns None for nonexistent stage."""
        pipeline = Pipeline()

        info = pipeline.get_stage_info("nonexistent")

        assert info is None

    def test_get_pipeline_info_returns_complete_info(self):
        """get_pipeline_info() returns complete pipeline state."""
        pipeline = Pipeline()
        stage1 = self._create_mock_stage("stage1")
        stage2 = self._create_mock_stage("stage2")
        pipeline.add_stage("s1", stage1, initialize=False)
        pipeline.add_stage("s2", stage2, initialize=False)
        pipeline.build()

        info = pipeline.get_pipeline_info()

        assert "stages" in info
        assert "execution_order" in info
        assert info["initialized"] is True
        assert info["stage_count"] == 2
        assert "s1" in info["stages"]
        assert "s2" in info["stages"]

    def test_rebuild_after_mutation(self):
        """_rebuild() updates execution order after mutation."""
        pipeline = Pipeline()
        source = self._create_mock_stage(
            "source", "source", capabilities={"source"}, dependencies=set()
        )
        effect = self._create_mock_stage(
            "effect", "effect", capabilities={"effect"}, dependencies={"source"}
        )
        display = self._create_mock_stage(
            "display", "display", capabilities={"display"}, dependencies={"effect"}
        )
        pipeline.add_stage("source", source, initialize=False)
        pipeline.add_stage("effect", effect, initialize=False)
        pipeline.add_stage("display", display, initialize=False)
        pipeline.build()
        assert pipeline.execution_order == ["source", "effect", "display"]

        pipeline.remove_stage("effect", cleanup=False)
        pipeline._rebuild()

        assert "effect" not in pipeline.execution_order
        assert "source" in pipeline.execution_order
        assert "display" in pipeline.execution_order

    def test_add_stage_after_build(self):
        """add_stage() can add stage after build with initialization."""
        pipeline = Pipeline()
        source = self._create_mock_stage(
            "source", "source", capabilities={"source"}, dependencies=set()
        )
        display = self._create_mock_stage(
            "display", "display", capabilities={"display"}, dependencies={"source"}
        )
        pipeline.add_stage("source", source, initialize=False)
        pipeline.add_stage("display", display, initialize=False)
        pipeline.build()

        new_stage = self._create_mock_stage(
            "effect", "effect", capabilities={"effect"}, dependencies={"source"}
        )
        pipeline.add_stage("effect", new_stage, initialize=True)

        assert "effect" in pipeline.stages
        new_stage.init.assert_called_once()

    def test_mutation_preserves_execution_for_remaining_stages(self):
        """Removing a stage doesn't break execution of remaining stages."""
        # Each stage's process() appends its name so the run order is visible.
        call_log = []

        class TestSource(Stage):
            name = "source"
            category = "source"

            @property
            def inlet_types(self):
                return {DataType.NONE}

            @property
            def outlet_types(self):
                return {DataType.SOURCE_ITEMS}

            @property
            def capabilities(self):
                return {"source"}

            @property
            def dependencies(self):
                return set()

            def process(self, data, ctx):
                call_log.append("source")
                return ["item"]

        class TestEffect(Stage):
            name = "effect"
            category = "effect"

            @property
            def inlet_types(self):
                return {DataType.SOURCE_ITEMS}

            @property
            def outlet_types(self):
                return {DataType.TEXT_BUFFER}

            @property
            def capabilities(self):
                return {"effect"}

            @property
            def dependencies(self):
                return {"source"}

            def process(self, data, ctx):
                call_log.append("effect")
                return data

        class TestDisplay(Stage):
            name = "display"
            category = "display"

            @property
            def inlet_types(self):
                return {DataType.TEXT_BUFFER}

            @property
            def outlet_types(self):
                return {DataType.NONE}

            @property
            def capabilities(self):
                return {"display"}

            @property
            def dependencies(self):
                return {"effect"}

            def process(self, data, ctx):
                call_log.append("display")
                return data

        pipeline = Pipeline()
        pipeline.add_stage("source", TestSource(), initialize=False)
        pipeline.add_stage("effect", TestEffect(), initialize=False)
        pipeline.add_stage("display", TestDisplay(), initialize=False)
        pipeline.build()
        pipeline.initialize()

        result = pipeline.execute(None)
        assert result.success
        assert call_log == ["source", "effect", "display"]

        # Drop the middle stage and confirm the rest still runs in order.
        call_log.clear()
        pipeline.remove_stage("effect", cleanup=True)
        pipeline._rebuild()

        result = pipeline.execute(None)
        assert result.success
        assert call_log == ["source", "display"]