Files
Mainline/engine/sensors/pipeline_metrics.py
David Gwilliam f638fb7597 feat: add pipeline introspection demo mode
- Add PipelineIntrospectionSource that renders live ASCII DAG with metrics
- Add PipelineMetricsSensor exposing pipeline performance as sensor values
- Add PipelineIntrospectionDemo controller with 3-phase animation:
  - Phase 1: Toggle effects one at a time (3s each)
  - Phase 2: LFO drives intensity default→max→min→default
  - Phase 3: All effects with shared LFO (infinite loop)
- Add pipeline-inspect preset
- Add get_frame_times() to Pipeline for sparkline data
- Add tests for new components
- Update mise.toml with pipeline-inspect preset task
2026-03-16 16:55:57 -07:00

115 lines
3.5 KiB
Python

"""
Pipeline metrics sensor - Exposes pipeline performance data as sensor values.
This sensor reads metrics from a Pipeline instance and provides them
as sensor values that can drive effect parameters.
Example:
    sensor = PipelineMetricsSensor(pipeline)
    sensor.read()  # Returns a SensorValue whose value is total_ms (None if unavailable)
"""
import time
from typing import TYPE_CHECKING

from engine.sensors import Sensor, SensorValue

if TYPE_CHECKING:
    from engine.pipeline.controller import Pipeline
class PipelineMetricsSensor(Sensor):
    """Sensor that reads metrics from a Pipeline instance.

    Each successful ``read()`` caches the following keys from the pipeline's
    metrics summary (missing keys default to ``0.0``):

    - total_ms: Total frame time in milliseconds
    - fps: Calculated frames per second
    - avg_ms / min_ms / max_ms: Aggregate frame timings

    ``total_ms`` is the primary value carried by the returned ``SensorValue``.
    Per-stage timings and frame history are available through the dedicated
    accessor methods.

    Can be bound to effect parameters for reactive visuals.
    """

    # Keys copied from the metrics summary into ``_last_values`` on each read.
    _METRIC_KEYS = ("total_ms", "fps", "avg_ms", "min_ms", "max_ms")

    def __init__(self, pipeline: "Pipeline | None" = None, name: str = "pipeline"):
        """Create the sensor.

        Args:
            pipeline: Pipeline to read metrics from; may be attached later
                via ``set_pipeline``.
            name: Sensor name reported in every ``SensorValue``.
        """
        self._pipeline = pipeline
        self.name = name
        self.unit = "ms"
        self._last_values: dict[str, float] = dict.fromkeys(self._METRIC_KEYS, 0.0)

    @property
    def available(self) -> bool:
        """Whether a pipeline is attached."""
        return self._pipeline is not None

    def set_pipeline(self, pipeline: "Pipeline") -> None:
        """Set or update the pipeline to read metrics from."""
        self._pipeline = pipeline

    def _fetch_metrics(self) -> dict | None:
        """Return the pipeline's metrics summary, or None if unavailable.

        None is returned when no pipeline is attached, when the pipeline
        raises while building the summary, or when the summary is empty.
        """
        # Identity check (not truthiness) so this agrees with ``available``
        # even if the pipeline object happens to be falsy.
        if self._pipeline is None:
            return None
        try:
            metrics = self._pipeline.get_metrics_summary()
        except Exception:
            # Best effort: a metrics failure must not propagate to the caller.
            return None
        return metrics or None

    def read(self) -> SensorValue | None:
        """Read current metrics from the pipeline.

        Returns:
            A ``SensorValue`` whose value is ``total_ms``, or None when no
            pipeline is attached, the summary cannot be obtained, or the
            summary contains an ``"error"`` key.
        """
        metrics = self._fetch_metrics()
        if metrics is None or "error" in metrics:
            return None

        for key in self._METRIC_KEYS:
            self._last_values[key] = metrics.get(key, 0.0)

        # Provide total_ms as primary value (for LFO-style effects)
        return SensorValue(
            sensor_name=self.name,
            value=self._last_values["total_ms"],
            # Wall-clock time of the reading rather than a constant 0.0, so
            # successive readings can be told apart.
            timestamp=time.time(),
            unit=self.unit,
        )

    def get_stage_timing(self, stage_name: str) -> float:
        """Get the ``avg_ms`` timing for a specific stage.

        Returns ``0.0`` when the pipeline, the stage, or its ``avg_ms``
        entry is unavailable.
        """
        metrics = self._fetch_metrics()
        if metrics is None:
            return 0.0
        try:
            stages = metrics.get("stages", {})
            return stages.get(stage_name, {}).get("avg_ms", 0.0)
        except (AttributeError, TypeError):
            # Summary or stage entry is not shaped like a mapping.
            return 0.0

    def get_all_timings(self) -> dict[str, dict[str, float]]:
        """Get the ``stages`` section of the metrics summary.

        Entries are passed through unchanged from the pipeline.
        ``get_stage_timing`` reads an ``avg_ms`` key from each entry, so
        entries are expected to be per-stage mappings rather than bare
        floats (NOTE(review): confirm against Pipeline.get_metrics_summary).

        Returns an empty dict when no metrics are available.
        """
        metrics = self._fetch_metrics()
        if metrics is None:
            return {}
        try:
            return metrics.get("stages", {})
        except (AttributeError, TypeError):
            return {}

    def get_frame_history(self) -> list[float]:
        """Get historical frame times for sparklines.

        Returns a new list each call, or an empty list when no pipeline is
        attached or the pipeline cannot supply frame times.
        """
        if self._pipeline is None:
            return []
        try:
            # Copy so the result is always a list, independent of whatever
            # container the pipeline returns.
            return list(self._pipeline.get_frame_times())
        except Exception:
            return []

    def start(self) -> bool:
        """Start the sensor (no-op for read-only metrics)."""
        return True

    def stop(self) -> None:
        """Stop the sensor (no-op for read-only metrics)."""
        pass