diff --git a/engine/config.py b/engine/config.py index bf227ba..f7f86a3 100644 --- a/engine/config.py +++ b/engine/config.py @@ -129,7 +129,7 @@ class Config: script_fonts: dict[str, str] = field(default_factory=_get_platform_font_paths) - display: str = "terminal" + display: str = "pygame" websocket: bool = False websocket_port: int = 8765 @@ -237,7 +237,7 @@ GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋" KATA = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ" # ─── WEBSOCKET ───────────────────────────────────────────── -DISPLAY = _arg_value("--display", sys.argv) or "terminal" +DISPLAY = _arg_value("--display", sys.argv) or "pygame" WEBSOCKET = "--websocket" in sys.argv WEBSOCKET_PORT = _arg_int("--websocket-port", 8765) diff --git a/engine/pipeline/adapters.py b/engine/pipeline/adapters.py index 6636760..35a0cab 100644 --- a/engine/pipeline/adapters.py +++ b/engine/pipeline/adapters.py @@ -183,7 +183,7 @@ class DisplayStage(Stage): def init(self, ctx: PipelineContext) -> bool: w = ctx.params.viewport_width if ctx.params else 80 h = ctx.params.viewport_height if ctx.params else 24 - result = self._display.init(w, h) + result = self._display.init(w, h, reuse=False) return result is not False def process(self, data: Any, ctx: PipelineContext) -> Any: diff --git a/engine/pipeline/controller.py b/engine/pipeline/controller.py index 11e39d1..b9bc92a 100644 --- a/engine/pipeline/controller.py +++ b/engine/pipeline/controller.py @@ -303,18 +303,14 @@ def create_pipeline_from_params(params: PipelineParams) -> Pipeline: def create_default_pipeline() -> Pipeline: """Create a default pipeline with all standard components.""" + from engine.pipeline.adapters import DataSourceStage + from engine.sources_v2 import HeadlinesDataSource + pipeline = Pipeline() - # Add source stage - source = StageRegistry.create("source", "headlines") - if source: - pipeline.add_stage("source", source) - - # Add effect stages - for effect_name in ["noise", "fade", "glitch", "firehose", "hud"]: - effect = StageRegistry.create("effect", effect_name) - if effect: - pipeline.add_stage(f"effect_{effect_name}", effect) + # Add source stage (wrapped as Stage) + source = HeadlinesDataSource() + pipeline.add_stage("source", DataSourceStage(source, name="headlines")) # Add display stage display = StageRegistry.create("display", "terminal") diff --git a/engine/pipeline/registry.py b/engine/pipeline/registry.py index 4e0f969..e0b4423 100644 --- a/engine/pipeline/registry.py +++ b/engine/pipeline/registry.py @@ -28,7 +28,7 @@ class StageRegistry: cls._categories[category] = {} # Use class name as key - key = stage_class.__name__ + key = getattr(stage_class, "__name__", stage_class.__class__.__name__) cls._categories[category][key] = stage_class @classmethod @@ -90,6 +90,10 @@ def discover_stages() -> None: StageRegistry.register("source", HeadlinesDataSource) StageRegistry.register("source", PoetryDataSource) StageRegistry.register("source", PipelineDataSource) + + StageRegistry._categories["source"]["headlines"] = HeadlinesDataSource + StageRegistry._categories["source"]["poetry"] = PoetryDataSource + StageRegistry._categories["source"]["pipeline"] = PipelineDataSource except ImportError: pass @@ -98,14 +102,48 @@ def discover_stages() -> None: except ImportError: pass - try: - from engine.display import Display # noqa: F401 - except ImportError: - pass + # Register display stages + _register_display_stages() StageRegistry._discovered = True +def _register_display_stages() -> None: + """Register display backends as stages.""" + try: + from engine.display import DisplayRegistry + except ImportError: + return + + DisplayRegistry.initialize() + + for backend_name in DisplayRegistry.list_backends(): + factory = _DisplayStageFactory(backend_name) + StageRegistry._categories.setdefault("display", {})[backend_name] = factory + + +class _DisplayStageFactory: + """Factory that creates DisplayStage instances for a specific backend.""" + + def __init__(self, backend_name: str): + self._backend_name = backend_name + + def __call__(self): + from engine.display import DisplayRegistry + from engine.pipeline.adapters import DisplayStage + + display = DisplayRegistry.create(self._backend_name) + if display is None: + raise RuntimeError( + f"Failed to create display backend: {self._backend_name}" + ) + return DisplayStage(display, name=self._backend_name) + + @property + def __name__(self) -> str: + return self._backend_name.capitalize() + "Stage" + + # Convenience functions def register_source(stage_class: type[Stage]) -> None: """Register a source stage.""" diff --git a/engine/sources_v2.py b/engine/sources_v2.py index 9a3aa67..9fc7652 100644 --- a/engine/sources_v2.py +++ b/engine/sources_v2.py @@ -9,6 +9,7 @@ Each data source implements a common interface: """ from abc import ABC, abstractmethod +from collections.abc import Callable from dataclasses import dataclass from typing import Any @@ -125,6 +126,115 @@ class PipelineDataSource(DataSource): return self.fetch() +class MetricsDataSource(DataSource): + """Data source that renders live pipeline metrics as ASCII art. + + Wraps a Pipeline and displays active stages with their average execution + time and approximate FPS impact. Updates lazily when camera is about to + focus on a new node (frame % 15 == 12). + """ + + def __init__( + self, + pipeline: Any, + viewport_width: int = 80, + viewport_height: int = 24, + ): + self.pipeline = pipeline + self.viewport_width = viewport_width + self.viewport_height = viewport_height + self.frame = 0 + self._cached_metrics: dict | None = None + + @property + def name(self) -> str: + return "metrics" + + @property + def is_dynamic(self) -> bool: + return True + + def fetch(self) -> list[SourceItem]: + if self.frame % 15 == 12: + self._cached_metrics = None + + if self._cached_metrics is None: + self._cached_metrics = self._fetch_metrics() + + buffer = self._render_metrics(self._cached_metrics) + self.frame += 1 + content = "\n".join(buffer) + return [ + SourceItem(content=content, source="metrics", timestamp=f"f{self.frame}") + ] + + def _fetch_metrics(self) -> dict: + if hasattr(self.pipeline, "get_metrics_summary"): + metrics = self.pipeline.get_metrics_summary() + if "error" not in metrics: + return metrics + return {"stages": {}, "pipeline": {"avg_ms": 0}} + + def _render_metrics(self, metrics: dict) -> list[str]: + stages = metrics.get("stages", {}) + + if not stages: + return self._render_empty() + + active_stages = { + name: stats for name, stats in stages.items() if stats.get("avg_ms", 0) > 0 + } + + if not active_stages: + return self._render_empty() + + total_avg = sum(s["avg_ms"] for s in active_stages.values()) + if total_avg == 0: + total_avg = 1 + + lines: list[str] = [] + lines.append("═" * self.viewport_width) + lines.append(" PIPELINE METRICS ".center(self.viewport_width, "─")) + lines.append("─" * self.viewport_width) + + header = f"{'STAGE':<20} {'AVG_MS':>8} {'FPS %':>8}" + lines.append(header) + lines.append("─" * self.viewport_width) + + for name, stats in sorted(active_stages.items()): + avg_ms = stats.get("avg_ms", 0) + fps_impact = (avg_ms / 16.67) * 100 if avg_ms > 0 else 0 + + row = f"{name:<20} {avg_ms:>7.2f} {fps_impact:>7.1f}%" + lines.append(row[: self.viewport_width]) + + lines.append("─" * self.viewport_width) + total_row = ( + f"{'TOTAL':<20} {total_avg:>7.2f} {(total_avg / 16.67) * 100:>7.1f}%" + ) + lines.append(total_row[: self.viewport_width]) + lines.append("─" * self.viewport_width) + lines.append( + f" Frame:{self.frame:04d} Cache:{'HIT' if self._cached_metrics else 'MISS'}" + ) + + while len(lines) < self.viewport_height: + lines.append(" " * self.viewport_width) + + return lines[: self.viewport_height] + + def _render_empty(self) -> list[str]: + lines = [" " * self.viewport_width for _ in range(self.viewport_height)] + msg = "No metrics available" + y = self.viewport_height // 2 + x = (self.viewport_width - len(msg)) // 2 + lines[y] = " " * x + msg + " " * (self.viewport_width - x - len(msg)) + return lines + + def get_items(self) -> list[SourceItem]: + return self.fetch() + + class CachedDataSource(DataSource): """Data source that wraps another source with caching.""" @@ -146,6 +256,44 @@ class CachedDataSource(DataSource): return self._items +class TransformDataSource(DataSource): + """Data source that transforms items from another source. + + Applies optional filter and map functions to each item. + This enables chaining: source → transform → transformed output. + + Args: + source: The source to fetch items from + filter_fn: Optional function(item: SourceItem) -> bool + map_fn: Optional function(item: SourceItem) -> SourceItem + """ + + def __init__( + self, + source: DataSource, + filter_fn: Callable[[SourceItem], bool] | None = None, + map_fn: Callable[[SourceItem], SourceItem] | None = None, + ): + self.source = source + self.filter_fn = filter_fn + self.map_fn = map_fn + + @property + def name(self) -> str: + return f"transform:{self.source.name}" + + def fetch(self) -> list[SourceItem]: + items = self.source.fetch() + + if self.filter_fn: + items = [item for item in items if self.filter_fn(item)] + + if self.map_fn: + items = [self.map_fn(item) for item in items] + + return items + + class CompositeDataSource(DataSource): """Data source that combines multiple sources.""" diff --git a/mise.toml b/mise.toml index 17ebbdb..c4e89a6 100644 --- a/mise.toml +++ b/mise.toml @@ -38,19 +38,16 @@ run-kitty = { run = "uv run mainline.py --display kitty", depends = ["sync-all"] run-pygame = { run = "uv run mainline.py --display pygame", depends = ["sync-all"] } run-both = { run = "uv run mainline.py --display both", depends = ["sync-all"] } run-client = { run = "mise run run-both & sleep 2 && $(open http://localhost:8766 2>/dev/null || xdg-open http://localhost:8766 2>/dev/null || echo 'Open http://localhost:8766 manually'); wait", depends = ["sync-all"] } -run-demo = { run = "uv run mainline.py --demo --display pygame", depends = ["sync-all"] } -run-pipeline = "uv run mainline.py --pipeline-diagram" -run-pipeline-demo = { run = "uv run mainline.py --pipeline-demo --display pygame", depends = ["sync-all"] } # ===================== -# New Pipeline Architecture (unified Stage-based) +# Pipeline Architecture (unified Stage-based) # ===================== -run-v2 = { run = "uv run mainline.py --pipeline --display pygame", depends = ["sync-all"] } -run-v2-demo = { run = "uv run mainline.py --pipeline --pipeline-preset demo --display pygame", depends = ["sync-all"] } -run-v2-poetry = { run = "uv run mainline.py --pipeline --pipeline-preset poetry --display pygame", depends = ["sync-all"] } -run-v2-websocket = { run = "uv run mainline.py --pipeline --pipeline-preset websocket", depends = ["sync-all"] } -run-v2-firehose = { run = "uv run mainline.py --pipeline --pipeline-preset firehose --display pygame", depends = ["sync-all"] } +run-pipeline = { run = "uv run mainline.py --pipeline --display pygame", depends = ["sync-all"] } +run-pipeline-demo = { run = "uv run mainline.py --pipeline --pipeline-preset demo --display pygame", depends = ["sync-all"] } +run-pipeline-poetry = { run = "uv run mainline.py --pipeline --pipeline-preset poetry --display pygame", depends = ["sync-all"] } +run-pipeline-websocket = { run = "uv run mainline.py --pipeline --pipeline-preset websocket", depends = ["sync-all"] } +run-pipeline-firehose = { run = "uv run mainline.py --pipeline --pipeline-preset firehose --display pygame", depends = ["sync-all"] } # ===================== # Presets (Animation-controlled modes) diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py new file mode 100644 index 0000000..aab2099 --- /dev/null +++ b/tests/test_pipeline.py @@ -0,0 +1,281 @@ +""" +Tests for the new unified pipeline architecture. +""" + +from unittest.mock import MagicMock + +from engine.pipeline import ( + Pipeline, + PipelineConfig, + PipelineContext, + Stage, + StageRegistry, + create_default_pipeline, + discover_stages, +) + + +class TestStageRegistry: + """Tests for StageRegistry.""" + + def setup_method(self): + """Reset registry before each test.""" + StageRegistry._discovered = False + StageRegistry._categories.clear() + StageRegistry._instances.clear() + + def test_discover_stages_registers_sources(self): + """discover_stages registers source stages.""" + discover_stages() + + sources = StageRegistry.list("source") + assert "HeadlinesDataSource" in sources + assert "PoetryDataSource" in sources + assert "PipelineDataSource" in sources + + def test_discover_stages_registers_displays(self): + """discover_stages registers display stages.""" + discover_stages() + + displays = StageRegistry.list("display") + assert "terminal" in displays + assert "pygame" in displays + assert "websocket" in displays + assert "null" in displays + assert "sixel" in displays + + def test_create_source_stage(self): + """StageRegistry.create creates source stages.""" + discover_stages() + + source = StageRegistry.create("source", "HeadlinesDataSource") + assert source is not None + assert source.name == "headlines" + + def test_create_display_stage(self): + """StageRegistry.create creates display stages.""" + discover_stages() + + display = StageRegistry.create("display", "terminal") + assert display is not None + assert hasattr(display, "_display") + + def test_create_display_stage_pygame(self): + """StageRegistry.create creates pygame display stage.""" + discover_stages() + + display = StageRegistry.create("display", "pygame") + assert display is not None + + +class TestPipeline: + """Tests for Pipeline class.""" + + def setup_method(self): + """Reset registry before each test.""" + StageRegistry._discovered = False + StageRegistry._categories.clear() + StageRegistry._instances.clear() + discover_stages() + + def test_create_pipeline(self): + """Pipeline can be created with config.""" + config = PipelineConfig(source="headlines", display="terminal") + pipeline = Pipeline(config=config) + + assert pipeline.config is not None + assert pipeline.config.source == "headlines" + assert pipeline.config.display == "terminal" + + def test_add_stage(self): + """Pipeline.add_stage adds a stage.""" + pipeline = Pipeline() + mock_stage = MagicMock(spec=Stage) + mock_stage.name = "test_stage" + mock_stage.category = "test" + + pipeline.add_stage("test", mock_stage) + + assert "test" in pipeline.stages + + def test_build_resolves_dependencies(self): + """Pipeline.build resolves execution order.""" + pipeline = Pipeline() + mock_source = MagicMock(spec=Stage) + mock_source.name = "source" + mock_source.category = "source" + mock_source.dependencies = set() + + mock_display = MagicMock(spec=Stage) + mock_display.name = "display" + mock_display.category = "display" + mock_display.dependencies = {"source"} + + pipeline.add_stage("source", mock_source) + pipeline.add_stage("display", mock_display) + pipeline.build() + + assert pipeline._initialized is True + assert "source" in pipeline.execution_order + assert "display" in pipeline.execution_order + + def test_execute_runs_stages(self): + """Pipeline.execute runs all stages in order.""" + pipeline = Pipeline() + + call_order = [] + + mock_source = MagicMock(spec=Stage) + mock_source.name = "source" + mock_source.category = "source" + mock_source.dependencies = set() + mock_source.process = lambda data, ctx: call_order.append("source") or "data" + + mock_effect = MagicMock(spec=Stage) + mock_effect.name = "effect" + mock_effect.category = "effect" + mock_effect.dependencies = {"source"} + mock_effect.process = lambda data, ctx: call_order.append("effect") or data + + mock_display = MagicMock(spec=Stage) + mock_display.name = "display" + mock_display.category = "display" + mock_display.dependencies = {"effect"} + mock_display.process = lambda data, ctx: call_order.append("display") or data + + pipeline.add_stage("source", mock_source) + pipeline.add_stage("effect", mock_effect) + pipeline.add_stage("display", mock_display) + pipeline.build() + + result = pipeline.execute(None) + + assert result.success is True + assert call_order == ["source", "effect", "display"] + + def test_execute_handles_stage_failure(self): + """Pipeline.execute handles stage failures.""" + pipeline = Pipeline() + + mock_source = MagicMock(spec=Stage) + mock_source.name = "source" + mock_source.category = "source" + mock_source.dependencies = set() + mock_source.process = lambda data, ctx: "data" + + mock_failing = MagicMock(spec=Stage) + mock_failing.name = "failing" + mock_failing.category = "effect" + mock_failing.dependencies = {"source"} + mock_failing.optional = False + mock_failing.process = lambda data, ctx: (_ for _ in ()).throw( + Exception("fail") + ) + + pipeline.add_stage("source", mock_source) + pipeline.add_stage("failing", mock_failing) + pipeline.build() + + result = pipeline.execute(None) + + assert result.success is False + assert result.error is not None + + def test_optional_stage_failure_continues(self): + """Pipeline.execute continues on optional stage failure.""" + pipeline = Pipeline() + + mock_source = MagicMock(spec=Stage) + mock_source.name = "source" + mock_source.category = "source" + mock_source.dependencies = set() + mock_source.process = lambda data, ctx: "data" + + mock_optional = MagicMock(spec=Stage) + mock_optional.name = "optional" + mock_optional.category = "effect" + mock_optional.dependencies = {"source"} + mock_optional.optional = True + mock_optional.process = lambda data, ctx: (_ for _ in ()).throw( + Exception("fail") + ) + + pipeline.add_stage("source", mock_source) + pipeline.add_stage("optional", mock_optional) + pipeline.build() + + result = pipeline.execute(None) + + assert result.success is True + + +class TestPipelineContext: + """Tests for PipelineContext.""" + + def test_init_empty(self): + """PipelineContext initializes with empty services and state.""" + ctx = PipelineContext() + + assert ctx.services == {} + assert ctx.state == {} + + def test_init_with_services(self): + """PipelineContext accepts initial services.""" + ctx = PipelineContext(services={"display": MagicMock()}) + + assert "display" in ctx.services + + def test_init_with_state(self): + """PipelineContext accepts initial state.""" + ctx = PipelineContext(initial_state={"count": 42}) + + assert ctx.get_state("count") == 42 + + def test_get_set_services(self): + """PipelineContext can get/set services.""" + ctx = PipelineContext() + mock_service = MagicMock() + + ctx.set("test_service", mock_service) + + assert ctx.get("test_service") == mock_service + + def test_get_set_state(self): + """PipelineContext can get/set state.""" + ctx = PipelineContext() + + ctx.set_state("counter", 100) + + assert ctx.get_state("counter") == 100 + + def test_lazy_resolver(self): + """PipelineContext resolves lazy services.""" + ctx = PipelineContext() + + config = ctx.get("config") + assert config is not None + + def test_has_capability(self): + """PipelineContext.has_capability checks for services.""" + ctx = PipelineContext(services={"display.output": MagicMock()}) + + assert ctx.has_capability("display.output") is True + assert ctx.has_capability("missing") is False + + +class TestCreateDefaultPipeline: + """Tests for create_default_pipeline function.""" + + def setup_method(self): + """Reset registry before each test.""" + StageRegistry._discovered = False + StageRegistry._categories.clear() + StageRegistry._instances.clear() + discover_stages() + + def test_create_default_pipeline(self): + """create_default_pipeline creates a working pipeline.""" + pipeline = create_default_pipeline() + + assert pipeline is not None + assert "display" in pipeline.stages