From f638fb759728f633be6244a0986e64d95dcf773b Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Mon, 16 Mar 2026 16:55:57 -0700 Subject: [PATCH] feat: add pipeline introspection demo mode MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add PipelineIntrospectionSource that renders live ASCII DAG with metrics - Add PipelineMetricsSensor exposing pipeline performance as sensor values - Add PipelineIntrospectionDemo controller with 3-phase animation: - Phase 1: Toggle effects one at a time (3s each) - Phase 2: LFO drives intensity default→max→min→default - Phase 3: All effects with shared LFO (infinite loop) - Add pipeline-inspect preset - Add get_frame_times() to Pipeline for sparkline data - Add tests for new components - Update mise.toml with pipeline-inspect preset task --- AGENTS.md | 30 +- engine/pipeline/controller.py | 4 + .../pipeline/pipeline_introspection_demo.py | 300 ++++++++++++++++++ engine/pipeline/registry.py | 16 +- engine/pipeline_sources/__init__.py | 7 + .../pipeline_introspection.py | 273 ++++++++++++++++ engine/sensors/pipeline_metrics.py | 114 +++++++ engine/sources_v2.py | 35 -- mise.toml | 2 +- presets.toml | 22 +- tests/test_pipeline.py | 2 +- tests/test_pipeline_introspection.py | 156 +++++++++ tests/test_pipeline_introspection_demo.py | 167 ++++++++++ tests/test_pipeline_metrics_sensor.py | 113 +++++++ 14 files changed, 1186 insertions(+), 55 deletions(-) create mode 100644 engine/pipeline/pipeline_introspection_demo.py create mode 100644 engine/pipeline_sources/__init__.py create mode 100644 engine/pipeline_sources/pipeline_introspection.py create mode 100644 engine/sensors/pipeline_metrics.py create mode 100644 tests/test_pipeline_introspection.py create mode 100644 tests/test_pipeline_introspection_demo.py create mode 100644 tests/test_pipeline_metrics_sensor.py diff --git a/AGENTS.md b/AGENTS.md index f38d2c8..0351b32 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -161,7 +161,7 @@ The project uses pytest with strict marker enforcement. Test configuration is in ### Test Coverage Strategy -Current coverage: 56% (433 tests) +Current coverage: 56% (434 tests) Key areas with lower coverage (acceptable for now): - **app.py** (8%): Main entry point - integration heavy, requires terminal @@ -186,11 +186,18 @@ Performance regression tests are in `tests/test_benchmark.py` with `@pytest.mark ## Architecture Notes -- **ntfy.py** and **mic.py** are standalone modules with zero internal dependencies +- **ntfy.py** - standalone notification poller with zero internal dependencies +- **sensors/** - Sensor framework (MicSensor, OscillatorSensor) for real-time input - **eventbus.py** provides thread-safe event publishing for decoupled communication -- **controller.py** coordinates ntfy/mic monitoring and event publishing - **effects/** - plugin architecture with performance monitoring -- The render pipeline: fetch → render → effects → scroll → terminal output +- The new pipeline architecture: source → render → effects → display + +#### Canvas & Camera + +- **Canvas** (`engine/canvas.py`): 2D rendering surface with dirty region tracking +- **Camera** (`engine/camera.py`): Viewport controller for scrolling content + +The Canvas tracks dirty regions automatically when content is written (via `put_region`, `put_text`, `fill`), enabling partial buffer updates for optimized effect processing. ### Pipeline Architecture @@ -214,9 +221,24 @@ Stages declare capabilities (what they provide) and dependencies (what they need - **SensorStage**: Pipeline adapter that provides sensor values to effects - **MicSensor** (`engine/sensors/mic.py`): Self-contained microphone input - **OscillatorSensor** (`engine/sensors/oscillator.py`): Test sensor for development +- **PipelineMetricsSensor** (`engine/sensors/pipeline_metrics.py`): Exposes pipeline metrics as sensor values Sensors support param bindings to drive effect parameters in real-time. +#### Pipeline Introspection + +- **PipelineIntrospectionSource** (`engine/pipeline_sources/pipeline_introspection.py`): Renders live ASCII visualization of pipeline DAG with metrics +- **PipelineIntrospectionDemo** (`engine/pipeline/pipeline_introspection_demo.py`): 3-phase demo controller for effect animation + +Preset: `pipeline-inspect` - Live pipeline introspection with DAG and performance metrics + +#### Partial Update Support + +Effect plugins can opt-in to partial buffer updates for performance optimization: +- Set `supports_partial_updates = True` on the effect class +- Implement `process_partial(buf, ctx, partial)` method +- The `PartialUpdate` dataclass indicates which regions changed + ### Preset System Presets use TOML format (no external dependencies): diff --git a/engine/pipeline/controller.py b/engine/pipeline/controller.py index e21f1d6..ff6dbd7 100644 --- a/engine/pipeline/controller.py +++ b/engine/pipeline/controller.py @@ -466,6 +466,10 @@ class Pipeline: self._frame_metrics.clear() self._current_frame_number = 0 + def get_frame_times(self) -> list[float]: + """Get historical frame times for sparklines/charts.""" + return [f.total_ms for f in self._frame_metrics] + class PipelineRunner: """High-level pipeline runner with animation support.""" diff --git a/engine/pipeline/pipeline_introspection_demo.py b/engine/pipeline/pipeline_introspection_demo.py new file mode 100644 index 0000000..358b5d1 --- /dev/null +++ b/engine/pipeline/pipeline_introspection_demo.py @@ -0,0 +1,300 @@ +""" +Pipeline introspection demo controller - 3-phase animation system. + +Phase 1: Toggle each effect on/off one at a time (3s each, 1s gap) +Phase 2: LFO drives intensity default → max → min → default for each effect +Phase 3: All effects with shared LFO driving full waveform + +This controller manages the animation and updates the pipeline accordingly. +""" + +import time +from dataclasses import dataclass +from enum import Enum, auto +from typing import Any + +from engine.effects import get_registry +from engine.sensors.oscillator import OscillatorSensor + + +class DemoPhase(Enum): + """The three phases of the pipeline introspection demo.""" + + PHASE_1_TOGGLE = auto() # Toggle each effect on/off + PHASE_2_LFO = auto() # LFO drives intensity up/down + PHASE_3_SHARED_LFO = auto() # All effects with shared LFO + + +@dataclass +class PhaseState: + """State for a single phase of the demo.""" + + phase: DemoPhase + start_time: float + current_effect_index: int = 0 + effect_start_time: float = 0.0 + lfo_phase: float = 0.0 # 0.0 to 1.0 + + +@dataclass +class DemoConfig: + """Configuration for the demo animation.""" + + effect_cycle_duration: float = 3.0 # seconds per effect + gap_duration: float = 1.0 # seconds between effects + lfo_duration: float = ( + 4.0 # seconds for full LFO cycle (default → max → min → default) + ) + phase_2_effect_duration: float = 4.0 # seconds per effect in phase 2 + phase_3_lfo_duration: float = 6.0 # seconds for full waveform in phase 3 + + +class PipelineIntrospectionDemo: + """Controller for the 3-phase pipeline introspection demo. + + Manages effect toggling and LFO modulation across the pipeline. + """ + + def __init__( + self, + pipeline: Any, + effect_names: list[str] | None = None, + config: DemoConfig | None = None, + ): + self._pipeline = pipeline + self._config = config or DemoConfig() + self._effect_names = effect_names or ["noise", "fade", "glitch", "firehose"] + self._phase = DemoPhase.PHASE_1_TOGGLE + self._phase_state = PhaseState( + phase=DemoPhase.PHASE_1_TOGGLE, + start_time=time.time(), + ) + self._shared_oscillator: OscillatorSensor | None = None + self._frame = 0 + + # Register shared oscillator for phase 3 + self._shared_oscillator = OscillatorSensor( + name="demo-lfo", + waveform="sine", + frequency=1.0 / self._config.phase_3_lfo_duration, + ) + + @property + def phase(self) -> DemoPhase: + return self._phase + + @property + def phase_display(self) -> str: + """Get a human-readable phase description.""" + phase_num = { + DemoPhase.PHASE_1_TOGGLE: 1, + DemoPhase.PHASE_2_LFO: 2, + DemoPhase.PHASE_3_SHARED_LFO: 3, + } + return f"Phase {phase_num[self._phase]}" + + @property + def effect_names(self) -> list[str]: + return self._effect_names + + @property + def shared_oscillator(self) -> OscillatorSensor | None: + return self._shared_oscillator + + def update(self) -> dict[str, Any]: + """Update the demo state and return current parameters. + + Returns: + dict with current effect settings for the pipeline + """ + self._frame += 1 + current_time = time.time() + elapsed = current_time - self._phase_state.start_time + + # Phase transition logic + phase_duration = self._get_phase_duration() + if elapsed >= phase_duration: + self._advance_phase() + + # Update based on current phase + if self._phase == DemoPhase.PHASE_1_TOGGLE: + return self._update_phase_1(current_time) + elif self._phase == DemoPhase.PHASE_2_LFO: + return self._update_phase_2(current_time) + else: + return self._update_phase_3(current_time) + + def _get_phase_duration(self) -> float: + """Get duration of current phase in seconds.""" + if self._phase == DemoPhase.PHASE_1_TOGGLE: + # Duration = (effect_time + gap) * num_effects + final_gap + return ( + self._config.effect_cycle_duration + self._config.gap_duration + ) * len(self._effect_names) + self._config.gap_duration + elif self._phase == DemoPhase.PHASE_2_LFO: + return self._config.phase_2_effect_duration * len(self._effect_names) + else: + # Phase 3 runs indefinitely + return float("inf") + + def _advance_phase(self) -> None: + """Advance to the next phase.""" + if self._phase == DemoPhase.PHASE_1_TOGGLE: + self._phase = DemoPhase.PHASE_2_LFO + elif self._phase == DemoPhase.PHASE_2_LFO: + self._phase = DemoPhase.PHASE_3_SHARED_LFO + # Start the shared oscillator + if self._shared_oscillator: + self._shared_oscillator.start() + else: + # Phase 3 loops indefinitely - reset for demo replay after long time + self._phase = DemoPhase.PHASE_1_TOGGLE + + self._phase_state = PhaseState( + phase=self._phase, + start_time=time.time(), + ) + + def _update_phase_1(self, current_time: float) -> dict[str, Any]: + """Phase 1: Toggle each effect on/off one at a time.""" + effect_time = current_time - self._phase_state.effect_start_time + + # Check if we should move to next effect + cycle_time = self._config.effect_cycle_duration + self._config.gap_duration + effect_index = int((current_time - self._phase_state.start_time) / cycle_time) + + # Clamp to valid range + if effect_index >= len(self._effect_names): + effect_index = len(self._effect_names) - 1 + + # Calculate current effect state + in_gap = effect_time >= self._config.effect_cycle_duration + + # Build effect states + effect_states: dict[str, dict[str, Any]] = {} + for i, name in enumerate(self._effect_names): + if i < effect_index: + # Past effects - leave at default + effect_states[name] = {"enabled": False, "intensity": 0.5} + elif i == effect_index: + # Current effect - toggle on/off + if in_gap: + effect_states[name] = {"enabled": False, "intensity": 0.5} + else: + effect_states[name] = {"enabled": True, "intensity": 1.0} + else: + # Future effects - off + effect_states[name] = {"enabled": False, "intensity": 0.5} + + # Apply to effect registry + self._apply_effect_states(effect_states) + + return { + "phase": "PHASE_1_TOGGLE", + "phase_display": self.phase_display, + "current_effect": self._effect_names[effect_index] + if effect_index < len(self._effect_names) + else None, + "effect_states": effect_states, + "frame": self._frame, + } + + def _update_phase_2(self, current_time: float) -> dict[str, Any]: + """Phase 2: LFO drives intensity default → max → min → default.""" + elapsed = current_time - self._phase_state.start_time + effect_index = int(elapsed / self._config.phase_2_effect_duration) + effect_index = min(effect_index, len(self._effect_names) - 1) + + # Calculate LFO position (0 → 1 → 0) + effect_elapsed = elapsed % self._config.phase_2_effect_duration + lfo_position = effect_elapsed / self._config.phase_2_effect_duration + + # LFO: 0 → 1 → 0 (triangle wave) + if lfo_position < 0.5: + lfo_value = lfo_position * 2 # 0 → 1 + else: + lfo_value = 2 - lfo_position * 2 # 1 → 0 + + # Map to intensity: 0.3 (default) → 1.0 (max) → 0.0 (min) → 0.3 (default) + if lfo_position < 0.25: + # 0.3 → 1.0 + intensity = 0.3 + (lfo_position / 0.25) * 0.7 + elif lfo_position < 0.75: + # 1.0 → 0.0 + intensity = 1.0 - ((lfo_position - 0.25) / 0.5) * 1.0 + else: + # 0.0 → 0.3 + intensity = ((lfo_position - 0.75) / 0.25) * 0.3 + + # Build effect states + effect_states: dict[str, dict[str, Any]] = {} + for i, name in enumerate(self._effect_names): + if i < effect_index: + # Past effects - default + effect_states[name] = {"enabled": True, "intensity": 0.5} + elif i == effect_index: + # Current effect - LFO modulated + effect_states[name] = {"enabled": True, "intensity": intensity} + else: + # Future effects - off + effect_states[name] = {"enabled": False, "intensity": 0.5} + + # Apply to effect registry + self._apply_effect_states(effect_states) + + return { + "phase": "PHASE_2_LFO", + "phase_display": self.phase_display, + "current_effect": self._effect_names[effect_index], + "lfo_value": lfo_value, + "intensity": intensity, + "effect_states": effect_states, + "frame": self._frame, + } + + def _update_phase_3(self, current_time: float) -> dict[str, Any]: + """Phase 3: All effects with shared LFO driving full waveform.""" + # Read shared oscillator + lfo_value = 0.5 # Default + if self._shared_oscillator: + sensor_val = self._shared_oscillator.read() + if sensor_val: + lfo_value = sensor_val.value + + # All effects enabled with shared LFO + effect_states: dict[str, dict[str, Any]] = {} + for name in self._effect_names: + effect_states[name] = {"enabled": True, "intensity": lfo_value} + + # Apply to effect registry + self._apply_effect_states(effect_states) + + return { + "phase": "PHASE_3_SHARED_LFO", + "phase_display": self.phase_display, + "lfo_value": lfo_value, + "effect_states": effect_states, + "frame": self._frame, + } + + def _apply_effect_states(self, effect_states: dict[str, dict[str, Any]]) -> None: + """Apply effect states to the effect registry.""" + try: + registry = get_registry() + for name, state in effect_states.items(): + effect = registry.get(name) + if effect: + effect.config.enabled = state["enabled"] + effect.config.intensity = state["intensity"] + except Exception: + pass # Silently fail if registry not available + + def cleanup(self) -> None: + """Clean up resources.""" + if self._shared_oscillator: + self._shared_oscillator.stop() + + # Reset all effects to default + self._apply_effect_states( + {name: {"enabled": False, "intensity": 0.5} for name in self._effect_names} + ) diff --git a/engine/pipeline/registry.py b/engine/pipeline/registry.py index ee528fc..e06b90a 100644 --- a/engine/pipeline/registry.py +++ b/engine/pipeline/registry.py @@ -89,17 +89,27 @@ def discover_stages() -> None: try: from engine.sources_v2 import ( HeadlinesDataSource, - PipelineDataSource, PoetryDataSource, ) StageRegistry.register("source", HeadlinesDataSource) StageRegistry.register("source", PoetryDataSource) - StageRegistry.register("source", PipelineDataSource) StageRegistry._categories["source"]["headlines"] = HeadlinesDataSource StageRegistry._categories["source"]["poetry"] = PoetryDataSource - StageRegistry._categories["source"]["pipeline"] = PipelineDataSource + except ImportError: + pass + + # Register pipeline introspection source + try: + from engine.pipeline_sources.pipeline_introspection import ( + PipelineIntrospectionSource, + ) + + StageRegistry.register("source", PipelineIntrospectionSource) + StageRegistry._categories["source"]["pipeline-inspect"] = ( + PipelineIntrospectionSource + ) except ImportError: pass diff --git a/engine/pipeline_sources/__init__.py b/engine/pipeline_sources/__init__.py new file mode 100644 index 0000000..47a1ce4 --- /dev/null +++ b/engine/pipeline_sources/__init__.py @@ -0,0 +1,7 @@ +""" +Data source implementations for the pipeline architecture. +""" + +from engine.pipeline_sources.pipeline_introspection import PipelineIntrospectionSource + +__all__ = ["PipelineIntrospectionSource"] diff --git a/engine/pipeline_sources/pipeline_introspection.py b/engine/pipeline_sources/pipeline_introspection.py new file mode 100644 index 0000000..6761f9b --- /dev/null +++ b/engine/pipeline_sources/pipeline_introspection.py @@ -0,0 +1,273 @@ +""" +Pipeline introspection source - Renders live visualization of pipeline DAG and metrics. + +This DataSource introspects one or more Pipeline instances and renders +an ASCII visualization showing: +- Stage DAG with signal flow connections +- Per-stage execution times +- Sparkline of frame times +- Stage breakdown bars + +Example: + source = PipelineIntrospectionSource(pipelines=[my_pipeline]) + items = source.fetch() # Returns ASCII visualization +""" + +from typing import TYPE_CHECKING + +from engine.sources_v2 import DataSource, SourceItem + +if TYPE_CHECKING: + from engine.pipeline.controller import Pipeline + + +SPARKLINE_CHARS = " ▁▂▃▄▅▆▇█" +BAR_CHARS = " ▁▂▃▄▅▆▇█" + + +class PipelineIntrospectionSource(DataSource): + """Data source that renders live pipeline introspection visualization. + + Renders: + - DAG of stages with signal flow + - Per-stage execution times + - Sparkline of frame history + - Stage breakdown bars + """ + + def __init__( + self, + pipelines: list["Pipeline"] | None = None, + viewport_width: int = 100, + viewport_height: int = 35, + ): + self._pipelines = pipelines or [] + self.viewport_width = viewport_width + self.viewport_height = viewport_height + self.frame = 0 + + @property + def name(self) -> str: + return "pipeline-inspect" + + @property + def is_dynamic(self) -> bool: + return True + + @property + def inlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.NONE} + + @property + def outlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.SOURCE_ITEMS} + + def add_pipeline(self, pipeline: "Pipeline") -> None: + """Add a pipeline to visualize.""" + if pipeline not in self._pipelines: + self._pipelines.append(pipeline) + + def remove_pipeline(self, pipeline: "Pipeline") -> None: + """Remove a pipeline from visualization.""" + if pipeline in self._pipelines: + self._pipelines.remove(pipeline) + + def fetch(self) -> list[SourceItem]: + """Fetch the introspection visualization.""" + lines = self._render() + self.frame += 1 + content = "\n".join(lines) + return [ + SourceItem( + content=content, source="pipeline-inspect", timestamp=f"f{self.frame}" + ) + ] + + def get_items(self) -> list[SourceItem]: + return self.fetch() + + def _render(self) -> list[str]: + """Render the full visualization.""" + lines: list[str] = [] + + # Header + lines.extend(self._render_header()) + + if not self._pipelines: + lines.append(" No pipelines to visualize") + return lines + + # Render each pipeline's DAG + for i, pipeline in enumerate(self._pipelines): + if len(self._pipelines) > 1: + lines.append(f" Pipeline {i + 1}:") + lines.extend(self._render_pipeline(pipeline)) + + # Footer with sparkline + lines.extend(self._render_footer()) + + return lines + + def _render_header(self) -> list[str]: + """Render the header with frame info and metrics summary.""" + lines: list[str] = [] + + if not self._pipelines: + return ["┌─ PIPELINE INTROSPECTION ──────────────────────────────┐"] + + # Get aggregate metrics + total_ms = 0.0 + fps = 0.0 + frame_count = 0 + + for pipeline in self._pipelines: + try: + metrics = pipeline.get_metrics_summary() + if metrics and "error" not in metrics: + total_ms = max(total_ms, metrics.get("avg_ms", 0)) + fps = max(fps, metrics.get("fps", 0)) + frame_count = max(frame_count, metrics.get("frame_count", 0)) + except Exception: + pass + + header = f"┌─ PIPELINE INTROSPECTION ── frame: {self.frame} ─ avg: {total_ms:.1f}ms ─ fps: {fps:.1f} ─┐" + lines.append(header) + + return lines + + def _render_pipeline(self, pipeline: "Pipeline") -> list[str]: + """Render a single pipeline's DAG.""" + lines: list[str] = [] + + stages = pipeline.stages + execution_order = pipeline.execution_order + + if not stages: + lines.append(" (no stages)") + return lines + + # Build stage info + stage_infos: list[dict] = [] + for name in execution_order: + stage = stages.get(name) + if not stage: + continue + + try: + metrics = pipeline.get_metrics_summary() + stage_ms = metrics.get("stages", {}).get(name, {}).get("avg_ms", 0.0) + except Exception: + stage_ms = 0.0 + + stage_infos.append( + { + "name": name, + "category": stage.category, + "ms": stage_ms, + } + ) + + # Calculate total time for percentages + total_time = sum(s["ms"] for s in stage_infos) or 1.0 + + # Render DAG - group by category + lines.append("│") + lines.append("│ Signal Flow:") + + # Group stages by category for display + categories: dict[str, list[dict]] = {} + for info in stage_infos: + cat = info["category"] + if cat not in categories: + categories[cat] = [] + categories[cat].append(info) + + # Render categories in order + cat_order = ["source", "render", "effect", "overlay", "display", "system"] + + for cat in cat_order: + if cat not in categories: + continue + + cat_stages = categories[cat] + cat_names = [s["name"] for s in cat_stages] + lines.append(f"│ {cat}: {' → '.join(cat_names)}") + + # Render timing breakdown + lines.append("│") + lines.append("│ Stage Timings:") + + for info in stage_infos: + name = info["name"] + ms = info["ms"] + pct = (ms / total_time) * 100 + bar = self._render_bar(pct, 20) + lines.append(f"│ {name:12s} {ms:6.2f}ms {bar} {pct:5.1f}%") + + lines.append("│") + + return lines + + def _render_footer(self) -> list[str]: + """Render the footer with sparkline.""" + lines: list[str] = [] + + # Get frame history from first pipeline + if self._pipelines: + try: + frame_times = self._pipelines[0].get_frame_times() + except Exception: + frame_times = [] + else: + frame_times = [] + + if frame_times: + sparkline = self._render_sparkline(frame_times[-60:], 50) + lines.append( + f"├─ Frame Time History (last {len(frame_times[-60:])} frames) ─────────────────────────────┤" + ) + lines.append(f"│{sparkline}│") + else: + lines.append( + "├─ Frame Time History ─────────────────────────────────────────┤" + ) + lines.append( + "│ (collecting data...) │" + ) + + lines.append( + "└────────────────────────────────────────────────────────────────┘" + ) + + return lines + + def _render_bar(self, percentage: float, width: int) -> str: + """Render a horizontal bar for percentage.""" + filled = int((percentage / 100.0) * width) + bar = "█" * filled + "░" * (width - filled) + return bar + + def _render_sparkline(self, values: list[float], width: int) -> str: + """Render a sparkline from values.""" + if not values: + return " " * width + + min_val = min(values) + max_val = max(values) + range_val = max_val - min_val or 1.0 + + result = [] + for v in values[-width:]: + normalized = (v - min_val) / range_val + idx = int(normalized * (len(SPARKLINE_CHARS) - 1)) + idx = max(0, min(idx, len(SPARKLINE_CHARS) - 1)) + result.append(SPARKLINE_CHARS[idx]) + + # Pad to width + while len(result) < width: + result.insert(0, " ") + return "".join(result[:width]) diff --git a/engine/sensors/pipeline_metrics.py b/engine/sensors/pipeline_metrics.py new file mode 100644 index 0000000..98f2793 --- /dev/null +++ b/engine/sensors/pipeline_metrics.py @@ -0,0 +1,114 @@ +""" +Pipeline metrics sensor - Exposes pipeline performance data as sensor values. + +This sensor reads metrics from a Pipeline instance and provides them +as sensor values that can drive effect parameters. + +Example: + sensor = PipelineMetricsSensor(pipeline) + sensor.read() # Returns SensorValue with total_ms, fps, etc. +""" + +from typing import TYPE_CHECKING + +from engine.sensors import Sensor, SensorValue + +if TYPE_CHECKING: + from engine.pipeline.controller import Pipeline + + +class PipelineMetricsSensor(Sensor): + """Sensor that reads metrics from a Pipeline instance. + + Provides real-time performance data: + - total_ms: Total frame time in milliseconds + - fps: Calculated frames per second + - stage_timings: Dict of stage name -> duration_ms + + Can be bound to effect parameters for reactive visuals. + """ + + def __init__(self, pipeline: "Pipeline | None" = None, name: str = "pipeline"): + self._pipeline = pipeline + self.name = name + self.unit = "ms" + self._last_values: dict[str, float] = { + "total_ms": 0.0, + "fps": 0.0, + "avg_ms": 0.0, + "min_ms": 0.0, + "max_ms": 0.0, + } + + @property + def available(self) -> bool: + return self._pipeline is not None + + def set_pipeline(self, pipeline: "Pipeline") -> None: + """Set or update the pipeline to read metrics from.""" + self._pipeline = pipeline + + def read(self) -> SensorValue | None: + """Read current metrics from the pipeline.""" + if not self._pipeline: + return None + + try: + metrics = self._pipeline.get_metrics_summary() + except Exception: + return None + + if not metrics or "error" in metrics: + return None + + self._last_values["total_ms"] = metrics.get("total_ms", 0.0) + self._last_values["fps"] = metrics.get("fps", 0.0) + self._last_values["avg_ms"] = metrics.get("avg_ms", 0.0) + self._last_values["min_ms"] = metrics.get("min_ms", 0.0) + self._last_values["max_ms"] = metrics.get("max_ms", 0.0) + + # Provide total_ms as primary value (for LFO-style effects) + return SensorValue( + sensor_name=self.name, + value=self._last_values["total_ms"], + timestamp=0.0, + unit=self.unit, + ) + + def get_stage_timing(self, stage_name: str) -> float: + """Get timing for a specific stage.""" + if not self._pipeline: + return 0.0 + try: + metrics = self._pipeline.get_metrics_summary() + stages = metrics.get("stages", {}) + return stages.get(stage_name, {}).get("avg_ms", 0.0) + except Exception: + return 0.0 + + def get_all_timings(self) -> dict[str, float]: + """Get all stage timings as a dict.""" + if not self._pipeline: + return {} + try: + metrics = self._pipeline.get_metrics_summary() + return metrics.get("stages", {}) + except Exception: + return {} + + def get_frame_history(self) -> list[float]: + """Get historical frame times for sparklines.""" + if not self._pipeline: + return [] + try: + return self._pipeline.get_frame_times() + except Exception: + return [] + + def start(self) -> bool: + """Start the sensor (no-op for read-only metrics).""" + return True + + def stop(self) -> None: + """Stop the sensor (no-op for read-only metrics).""" + pass diff --git a/engine/sources_v2.py b/engine/sources_v2.py index 9fc7652..dcd0afa 100644 --- a/engine/sources_v2.py +++ b/engine/sources_v2.py @@ -94,38 +94,6 @@ class PoetryDataSource(DataSource): return [SourceItem(content=t, source=s, timestamp=ts) for t, s, ts in items] -class PipelineDataSource(DataSource): - """Data source for pipeline visualization (demo mode). Dynamic - updates every frame.""" - - def __init__(self, viewport_width: int = 80, viewport_height: int = 24): - self.viewport_width = viewport_width - self.viewport_height = viewport_height - self.frame = 0 - - @property - def name(self) -> str: - return "pipeline" - - @property - def is_dynamic(self) -> bool: - return True - - def fetch(self) -> list[SourceItem]: - from engine.pipeline_viz import generate_large_network_viewport - - buffer = generate_large_network_viewport( - self.viewport_width, self.viewport_height, self.frame - ) - self.frame += 1 - content = "\n".join(buffer) - return [ - SourceItem(content=content, source="pipeline", timestamp=f"f{self.frame}") - ] - - def get_items(self) -> list[SourceItem]: - return self.fetch() - - class MetricsDataSource(DataSource): """Data source that renders live pipeline metrics as ASCII art. @@ -340,9 +308,6 @@ class SourceRegistry: def create_poetry(self) -> PoetryDataSource: return PoetryDataSource() - def create_pipeline(self, width: int = 80, height: int = 24) -> PipelineDataSource: - return PipelineDataSource(width, height) - _global_registry: SourceRegistry | None = None diff --git a/mise.toml b/mise.toml index c4e89a6..7d76427 100644 --- a/mise.toml +++ b/mise.toml @@ -54,7 +54,7 @@ run-pipeline-firehose = { run = "uv run mainline.py --pipeline --pipeline-preset # ===================== run-preset-demo = { run = "uv run mainline.py --preset demo --display pygame", depends = ["sync-all"] } -run-preset-pipeline = { run = "uv run mainline.py --preset pipeline --display pygame", depends = ["sync-all"] } +run-preset-pipeline-inspect = { run = "uv run mainline.py --preset pipeline-inspect --display terminal", depends = ["sync-all"] } # ===================== # Command & Control diff --git a/presets.toml b/presets.toml index 864e320..ea97fa3 100644 --- a/presets.toml +++ b/presets.toml @@ -30,17 +30,6 @@ viewport_height = 24 camera_speed = 0.5 firehose_enabled = false -[presets.pipeline] -description = "Pipeline visualization mode" -source = "pipeline" -display = "terminal" -camera = "trace" -effects = ["hud"] -viewport_width = 80 -viewport_height = 24 -camera_speed = 1.0 -firehose_enabled = false - [presets.websocket] description = "WebSocket display mode" source = "headlines" @@ -74,6 +63,17 @@ viewport_height = 24 camera_speed = 2.0 firehose_enabled = true +[presets.pipeline-inspect] +description = "Live pipeline introspection with DAG and performance metrics" +source = "pipeline-inspect" +display = "terminal" +camera = "vertical" +effects = ["hud"] +viewport_width = 100 +viewport_height = 35 +camera_speed = 0.3 +firehose_enabled = false + # Sensor configuration (for future use with param bindings) [sensors.mic] enabled = false diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 3bca468..9495655 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -31,7 +31,7 @@ class TestStageRegistry: sources = StageRegistry.list("source") assert "HeadlinesDataSource" in sources assert "PoetryDataSource" in sources - assert "PipelineDataSource" in sources + assert "PipelineIntrospectionSource" in sources def test_discover_stages_registers_displays(self): """discover_stages registers display stages.""" diff --git a/tests/test_pipeline_introspection.py b/tests/test_pipeline_introspection.py new file mode 100644 index 0000000..aa09c75 --- /dev/null +++ b/tests/test_pipeline_introspection.py @@ -0,0 +1,156 @@ +""" +Tests for PipelineIntrospectionSource. +""" + +from engine.pipeline_sources.pipeline_introspection import PipelineIntrospectionSource + + +class TestPipelineIntrospectionSource: + """Tests for PipelineIntrospectionSource.""" + + def test_basic_init(self): + """Source initializes with defaults.""" + source = PipelineIntrospectionSource() + assert source.name == "pipeline-inspect" + assert source.is_dynamic is True + assert source.frame == 0 + + def test_init_with_pipelines(self): + """Source initializes with custom pipelines list.""" + source = PipelineIntrospectionSource( + pipelines=[], viewport_width=100, viewport_height=40 + ) + assert source.viewport_width == 100 + assert source.viewport_height == 40 + + def test_inlet_outlet_types(self): + """Source has correct inlet/outlet types.""" + source = PipelineIntrospectionSource() + # inlet should be NONE (source), outlet should be SOURCE_ITEMS + from engine.pipeline.core import DataType + + assert DataType.NONE in source.inlet_types + assert DataType.SOURCE_ITEMS in source.outlet_types + + def test_fetch_returns_items(self): + """fetch() returns SourceItem list.""" + source = PipelineIntrospectionSource() + items = source.fetch() + assert len(items) == 1 + assert items[0].source == "pipeline-inspect" + + def test_fetch_increments_frame(self): + """fetch() increments frame counter.""" + source = PipelineIntrospectionSource() + assert source.frame == 0 + source.fetch() + assert source.frame == 1 + source.fetch() + assert source.frame == 2 + + def test_get_items(self): + """get_items() returns list of SourceItems.""" + source = PipelineIntrospectionSource() + items = source.get_items() + assert isinstance(items, list) + assert len(items) > 0 + assert items[0].source == "pipeline-inspect" + + def test_add_pipeline(self): + """add_pipeline() adds pipeline to list.""" + source = PipelineIntrospectionSource() + mock_pipeline = object() + source.add_pipeline(mock_pipeline) + assert mock_pipeline in source._pipelines + + def test_remove_pipeline(self): + """remove_pipeline() removes pipeline from list.""" + source = PipelineIntrospectionSource() + mock_pipeline = object() + source.add_pipeline(mock_pipeline) + source.remove_pipeline(mock_pipeline) + assert mock_pipeline not in source._pipelines + + +class TestPipelineIntrospectionRender: + """Tests for rendering methods.""" + + def test_render_header_no_pipelines(self): + """_render_header returns default when no pipelines.""" + source = PipelineIntrospectionSource() + lines = source._render_header() + assert len(lines) == 1 + assert "PIPELINE INTROSPECTION" in lines[0] + + def test_render_bar(self): + """_render_bar creates correct bar.""" + source = PipelineIntrospectionSource() + bar = source._render_bar(50, 10) + assert len(bar) == 10 + assert bar.count("█") == 5 + assert bar.count("░") == 5 + + def test_render_bar_zero(self): + """_render_bar handles zero percentage.""" + source = PipelineIntrospectionSource() + bar = source._render_bar(0, 10) + assert bar == "░" * 10 + + def test_render_bar_full(self): + """_render_bar handles 100%.""" + source = PipelineIntrospectionSource() + bar = source._render_bar(100, 10) + assert bar == "█" * 10 + + def test_render_sparkline(self): + """_render_sparkline creates sparkline.""" + source = PipelineIntrospectionSource() + values = [1.0, 2.0, 3.0, 4.0, 5.0] + sparkline = source._render_sparkline(values, 10) + assert len(sparkline) == 10 + + def test_render_sparkline_empty(self): + """_render_sparkline handles empty values.""" + source = PipelineIntrospectionSource() + sparkline = source._render_sparkline([], 10) + assert sparkline == " " * 10 + + def test_render_footer_no_pipelines(self): + """_render_footer shows collecting data when no pipelines.""" + source = PipelineIntrospectionSource() + lines = source._render_footer() + assert len(lines) >= 2 + assert "collecting data" in lines[1] or "Frame Time" in lines[0] + + +class TestPipelineIntrospectionFull: + """Integration tests.""" + + def test_render_empty(self): + """_render works with no pipelines.""" + source = PipelineIntrospectionSource() + lines = source._render() + assert len(lines) > 0 + assert "PIPELINE INTROSPECTION" in lines[0] + + def test_render_with_mock_pipeline(self): + """_render works with mock pipeline.""" + source = PipelineIntrospectionSource() + + class MockStage: + category = "source" + name = "test" + + class MockPipeline: + stages = {"test": MockStage()} + execution_order = ["test"] + + def get_metrics_summary(self): + return {"stages": {"test": {"avg_ms": 1.5}}, "avg_ms": 2.0, "fps": 60} + + def get_frame_times(self): + return [1.0, 2.0, 3.0] + + source.add_pipeline(MockPipeline()) + lines = source._render() + assert len(lines) > 0 diff --git a/tests/test_pipeline_introspection_demo.py b/tests/test_pipeline_introspection_demo.py new file mode 100644 index 0000000..735f114 --- /dev/null +++ b/tests/test_pipeline_introspection_demo.py @@ -0,0 +1,167 @@ +""" +Tests for PipelineIntrospectionDemo. +""" + +from engine.pipeline.pipeline_introspection_demo import ( + DemoConfig, + DemoPhase, + PhaseState, + PipelineIntrospectionDemo, +) + + +class MockPipeline: + """Mock pipeline for testing.""" + + pass + + +class MockEffectConfig: + """Mock effect config.""" + + def __init__(self): + self.enabled = False + self.intensity = 0.5 + + +class MockEffect: + """Mock effect for testing.""" + + def __init__(self, name): + self.name = name + self.config = MockEffectConfig() + + +class MockRegistry: + """Mock effect registry.""" + + def __init__(self, effects): + self._effects = {e.name: e for e in effects} + + def get(self, name): + return self._effects.get(name) + + +class TestDemoPhase: + """Tests for DemoPhase enum.""" + + def test_phases_exist(self): + """All three phases exist.""" + assert DemoPhase.PHASE_1_TOGGLE is not None + assert DemoPhase.PHASE_2_LFO is not None + assert DemoPhase.PHASE_3_SHARED_LFO is not None + + +class TestDemoConfig: + """Tests for DemoConfig.""" + + def test_defaults(self): + """Default config has sensible values.""" + config = DemoConfig() + assert config.effect_cycle_duration == 3.0 + assert config.gap_duration == 1.0 + assert config.lfo_duration == 4.0 + assert config.phase_2_effect_duration == 4.0 + assert config.phase_3_lfo_duration == 6.0 + + +class TestPhaseState: + """Tests for PhaseState.""" + + def test_defaults(self): + """PhaseState initializes correctly.""" + state = PhaseState(phase=DemoPhase.PHASE_1_TOGGLE, start_time=0.0) + assert state.phase == DemoPhase.PHASE_1_TOGGLE + assert state.start_time == 0.0 + assert state.current_effect_index == 0 + + +class TestPipelineIntrospectionDemo: + """Tests for PipelineIntrospectionDemo.""" + + def test_basic_init(self): + """Demo initializes with defaults.""" + demo = PipelineIntrospectionDemo(pipeline=None) + assert demo.phase == DemoPhase.PHASE_1_TOGGLE + assert demo.effect_names == ["noise", "fade", "glitch", "firehose"] + + def test_init_with_custom_effects(self): + """Demo initializes with custom effects.""" + demo = PipelineIntrospectionDemo(pipeline=None, effect_names=["noise", "fade"]) + assert demo.effect_names == ["noise", "fade"] + + def test_phase_display(self): + """phase_display returns correct string.""" + demo = PipelineIntrospectionDemo(pipeline=None) + assert "Phase 1" in demo.phase_display + + def test_shared_oscillator_created(self): + """Shared oscillator is created.""" + demo = PipelineIntrospectionDemo(pipeline=None) + assert demo.shared_oscillator is not None + assert demo.shared_oscillator.name == "demo-lfo" + + +class TestPipelineIntrospectionDemoUpdate: + """Tests for update method.""" + + def test_update_returns_dict(self): + """update() returns a dict with expected keys.""" + demo = PipelineIntrospectionDemo(pipeline=None) + result = demo.update() + assert "phase" in result + assert "phase_display" in result + assert "effect_states" in result + + def test_update_phase_1_structure(self): + """Phase 1 has correct structure.""" + demo = PipelineIntrospectionDemo(pipeline=None) + result = demo.update() + assert result["phase"] == "PHASE_1_TOGGLE" + assert "current_effect" in result + + def test_effect_states_structure(self): + """effect_states has correct structure.""" + demo = PipelineIntrospectionDemo(pipeline=None) + result = demo.update() + states = result["effect_states"] + for name in demo.effect_names: + assert name in states + assert "enabled" in states[name] + assert "intensity" in states[name] + + +class TestPipelineIntrospectionDemoPhases: + """Tests for phase transitions.""" + + def test_phase_1_initial(self): + """Starts in phase 1.""" + demo = PipelineIntrospectionDemo(pipeline=None) + assert demo.phase == DemoPhase.PHASE_1_TOGGLE + + def test_shared_oscillator_not_started_initially(self): + """Shared oscillator not started in phase 1.""" + demo = PipelineIntrospectionDemo(pipeline=None) + assert demo.shared_oscillator is not None + # The oscillator.start() is called when transitioning to phase 3 + + +class TestPipelineIntrospectionDemoCleanup: + """Tests for cleanup method.""" + + def test_cleanup_no_error(self): + """cleanup() runs without error.""" + demo = PipelineIntrospectionDemo(pipeline=None) + demo.cleanup() # Should not raise + + def test_cleanup_resets_effects(self): + """cleanup() resets effects.""" + demo = PipelineIntrospectionDemo(pipeline=None) + demo._apply_effect_states( + { + "noise": {"enabled": True, "intensity": 1.0}, + "fade": {"enabled": True, "intensity": 1.0}, + } + ) + demo.cleanup() + # If we had a mock registry, we could verify effects were reset diff --git a/tests/test_pipeline_metrics_sensor.py b/tests/test_pipeline_metrics_sensor.py new file mode 100644 index 0000000..8af380b --- /dev/null +++ b/tests/test_pipeline_metrics_sensor.py @@ -0,0 +1,113 @@ +""" +Tests for PipelineMetricsSensor. +""" + +from engine.sensors.pipeline_metrics import PipelineMetricsSensor + + +class MockPipeline: + """Mock pipeline for testing.""" + + def __init__(self, metrics=None): + self._metrics = metrics or {} + + def get_metrics_summary(self): + return self._metrics + + +class TestPipelineMetricsSensor: + """Tests for PipelineMetricsSensor.""" + + def test_basic_init(self): + """Sensor initializes with defaults.""" + sensor = PipelineMetricsSensor() + assert sensor.name == "pipeline" + assert sensor.available is False + + def test_init_with_pipeline(self): + """Sensor initializes with pipeline.""" + mock = MockPipeline() + sensor = PipelineMetricsSensor(mock) + assert sensor.available is True + + def test_set_pipeline(self): + """set_pipeline() updates pipeline.""" + sensor = PipelineMetricsSensor() + assert sensor.available is False + sensor.set_pipeline(MockPipeline()) + assert sensor.available is True + + def test_read_no_pipeline(self): + """read() returns None when no pipeline.""" + sensor = PipelineMetricsSensor() + assert sensor.read() is None + + def test_read_with_metrics(self): + """read() returns sensor value with metrics.""" + mock = MockPipeline( + { + "total_ms": 18.5, + "fps": 54.0, + "avg_ms": 18.5, + "min_ms": 15.0, + "max_ms": 22.0, + "stages": {"render": {"avg_ms": 12.0}, "noise": {"avg_ms": 3.0}}, + } + ) + sensor = PipelineMetricsSensor(mock) + val = sensor.read() + assert val is not None + assert val.sensor_name == "pipeline" + assert val.value == 18.5 + + def test_read_with_error(self): + """read() returns None when metrics have error.""" + mock = MockPipeline({"error": "No metrics collected"}) + sensor = PipelineMetricsSensor(mock) + assert sensor.read() is None + + def test_get_stage_timing(self): + """get_stage_timing() returns stage timing.""" + mock = MockPipeline( + { + "stages": {"render": {"avg_ms": 12.0}, "noise": {"avg_ms": 3.0}}, + } + ) + sensor = PipelineMetricsSensor(mock) + assert sensor.get_stage_timing("render") == 12.0 + assert sensor.get_stage_timing("noise") == 3.0 + assert sensor.get_stage_timing("nonexistent") == 0.0 + + def test_get_stage_timing_no_pipeline(self): + """get_stage_timing() returns 0 when no pipeline.""" + sensor = PipelineMetricsSensor() + assert sensor.get_stage_timing("test") == 0.0 + + def test_get_all_timings(self): + """get_all_timings() returns all stage timings.""" + mock = MockPipeline( + { + "stages": {"render": {"avg_ms": 12.0}, "noise": {"avg_ms": 3.0}}, + } + ) + sensor = PipelineMetricsSensor(mock) + timings = sensor.get_all_timings() + assert timings == {"render": {"avg_ms": 12.0}, "noise": {"avg_ms": 3.0}} + + def test_get_frame_history(self): + """get_frame_history() returns frame times.""" + MockPipeline() + + class MockPipelineWithFrames: + def get_frame_times(self): + return [1.0, 2.0, 3.0] + + sensor = PipelineMetricsSensor(MockPipelineWithFrames()) + history = sensor.get_frame_history() + assert history == [1.0, 2.0, 3.0] + + def test_start_stop(self): + """start() and stop() work.""" + sensor = PipelineMetricsSensor() + assert sensor.start() is True + sensor.stop() # Should not raise