diff --git a/engine/pipeline/adapters.py b/engine/pipeline/adapters.py index 7a9ba3c..ac9196c 100644 --- a/engine/pipeline/adapters.py +++ b/engine/pipeline/adapters.py @@ -56,6 +56,18 @@ class EffectPluginStage(Stage): def dependencies(self) -> set[str]: return set() + @property + def inlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.TEXT_BUFFER} + + @property + def outlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.TEXT_BUFFER} + def process(self, data: Any, ctx: PipelineContext) -> Any: """Process data through the effect.""" if data is None: @@ -113,6 +125,18 @@ class DisplayStage(Stage): def dependencies(self) -> set[str]: return {"render.output"} # Display needs rendered content + @property + def inlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.TEXT_BUFFER} # Display consumes rendered text + + @property + def outlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.NONE} # Display is a terminal stage (no output) + def init(self, ctx: PipelineContext) -> bool: w = ctx.params.viewport_width if ctx.params else 80 h = ctx.params.viewport_height if ctx.params else 24 @@ -146,6 +170,18 @@ class DataSourceStage(Stage): def dependencies(self) -> set[str]: return set() + @property + def inlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.NONE} # Sources don't take input + + @property + def outlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.SOURCE_ITEMS} + def process(self, data: Any, ctx: PipelineContext) -> Any: """Fetch data from source.""" if hasattr(self._source, "get_items"): @@ -177,6 +213,18 @@ class PassthroughStage(Stage): def dependencies(self) -> set[str]: return {"source"} + @property + def inlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.SOURCE_ITEMS} + + @property + def outlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.SOURCE_ITEMS} + def process(self, data: Any, ctx: PipelineContext) -> Any: """Pass data through unchanged.""" return data @@ -206,6 +254,18 @@ class SourceItemsToBufferStage(Stage): def dependencies(self) -> set[str]: return {"source"} + @property + def inlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.SOURCE_ITEMS} + + @property + def outlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.TEXT_BUFFER} + def process(self, data: Any, ctx: PipelineContext) -> Any: """Convert SourceItem list to text buffer.""" if data is None: @@ -258,6 +318,18 @@ class CameraStage(Stage): "source" } # Prefix match any source (source.headlines, source.poetry, etc.) + @property + def inlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.TEXT_BUFFER} # Camera works on rendered text + + @property + def outlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.TEXT_BUFFER} + def process(self, data: Any, ctx: PipelineContext) -> Any: """Apply camera transformation to data.""" if data is None: @@ -317,6 +389,18 @@ class FontStage(Stage): def dependencies(self) -> set[str]: return {"source"} + @property + def inlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.SOURCE_ITEMS} + + @property + def outlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.TEXT_BUFFER} + def init(self, ctx: PipelineContext) -> bool: """Initialize font from config or path.""" from engine import config @@ -404,6 +488,12 @@ class ImageToTextStage(Stage): def stage_type(self) -> str: return "transform" + @property + def inlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.PIL_IMAGE} # Accepts PIL Image objects or ImageItem + @property def outlet_types(self) -> set: from engine.pipeline.core import DataType diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 65109ba..6717462 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -4,6 +4,8 @@ Tests for the new unified pipeline architecture. from unittest.mock import MagicMock +import pytest + from engine.pipeline import ( Pipeline, PipelineConfig, @@ -13,6 +15,7 @@ from engine.pipeline import ( create_default_pipeline, discover_stages, ) +from engine.pipeline.core import DataType, StageError class TestStageRegistry: @@ -1066,3 +1069,214 @@ class TestOverlayStages: pipeline.build() assert pipeline.get_render_order("test") == 42 + + +class TestInletOutletTypeValidation: + """Test type validation between connected stages.""" + + def test_type_mismatch_raises_error(self): + """Type mismatch between stages raises StageError.""" + + class ProducerStage(Stage): + name = "producer" + category = "test" + + @property + def inlet_types(self): + return {DataType.NONE} + + @property + def outlet_types(self): + return {DataType.SOURCE_ITEMS} + + def process(self, data, ctx): + return data + + class ConsumerStage(Stage): + name = "consumer" + category = "test" + + @property + def dependencies(self): + return {"test.producer"} + + @property + def inlet_types(self): + return {DataType.TEXT_BUFFER} # Incompatible! + + @property + def outlet_types(self): + return {DataType.TEXT_BUFFER} + + def process(self, data, ctx): + return data + + pipeline = Pipeline() + pipeline.add_stage("producer", ProducerStage()) + pipeline.add_stage("consumer", ConsumerStage()) + + with pytest.raises(StageError) as exc_info: + pipeline.build() + + assert "Type mismatch" in str(exc_info.value) + assert "TEXT_BUFFER" in str(exc_info.value) + assert "SOURCE_ITEMS" in str(exc_info.value) + + def test_compatible_types_pass_validation(self): + """Compatible types pass validation.""" + + class ProducerStage(Stage): + name = "producer" + category = "test" + + @property + def inlet_types(self): + return {DataType.NONE} + + @property + def outlet_types(self): + return {DataType.SOURCE_ITEMS} + + def process(self, data, ctx): + return data + + class ConsumerStage(Stage): + name = "consumer" + category = "test" + + @property + def dependencies(self): + return {"test.producer"} + + @property + def inlet_types(self): + return {DataType.SOURCE_ITEMS} # Compatible! + + @property + def outlet_types(self): + return {DataType.TEXT_BUFFER} + + def process(self, data, ctx): + return data + + pipeline = Pipeline() + pipeline.add_stage("producer", ProducerStage()) + pipeline.add_stage("consumer", ConsumerStage()) + + # Should not raise + pipeline.build() + + def test_any_type_accepts_everything(self): + """DataType.ANY accepts any upstream type.""" + + class ProducerStage(Stage): + name = "producer" + category = "test" + + @property + def inlet_types(self): + return {DataType.NONE} + + @property + def outlet_types(self): + return {DataType.SOURCE_ITEMS} + + def process(self, data, ctx): + return data + + class ConsumerStage(Stage): + name = "consumer" + category = "test" + + @property + def dependencies(self): + return {"test.producer"} + + @property + def inlet_types(self): + return {DataType.ANY} # Accepts anything + + @property + def outlet_types(self): + return {DataType.TEXT_BUFFER} + + def process(self, data, ctx): + return data + + pipeline = Pipeline() + pipeline.add_stage("producer", ProducerStage()) + pipeline.add_stage("consumer", ConsumerStage()) + + # Should not raise because consumer accepts ANY + pipeline.build() + + def test_multiple_compatible_types(self): + """Stage can declare multiple inlet types.""" + + class ProducerStage(Stage): + name = "producer" + category = "test" + + @property + def inlet_types(self): + return {DataType.NONE} + + @property + def outlet_types(self): + return {DataType.SOURCE_ITEMS} + + def process(self, data, ctx): + return data + + class ConsumerStage(Stage): + name = "consumer" + category = "test" + + @property + def dependencies(self): + return {"test.producer"} + + @property + def inlet_types(self): + return {DataType.SOURCE_ITEMS, DataType.TEXT_BUFFER} + + @property + def outlet_types(self): + return {DataType.TEXT_BUFFER} + + def process(self, data, ctx): + return data + + pipeline = Pipeline() + pipeline.add_stage("producer", ProducerStage()) + pipeline.add_stage("consumer", ConsumerStage()) + + # Should not raise because consumer accepts SOURCE_ITEMS + pipeline.build() + + def test_display_must_accept_text_buffer(self): + """Display stages must accept TEXT_BUFFER type.""" + + class BadDisplayStage(Stage): + name = "display" + category = "display" + + @property + def inlet_types(self): + return {DataType.SOURCE_ITEMS} # Wrong type for display! + + @property + def outlet_types(self): + return {DataType.NONE} + + def process(self, data, ctx): + return data + + pipeline = Pipeline() + pipeline.add_stage("display", BadDisplayStage()) + + with pytest.raises(StageError) as exc_info: + pipeline.build() + + assert "display" in str(exc_info.value).lower() + assert "TEXT_BUFFER" in str(exc_info.value)