"""
Pipeline metrics sensor - Exposes pipeline performance data as sensor values.

This sensor reads metrics from a Pipeline instance and provides them as
sensor values that can drive effect parameters.

Example:
    sensor = PipelineMetricsSensor(pipeline)
    sensor.read()  # Returns a SensorValue whose value is total_ms
"""

import logging
from typing import TYPE_CHECKING, Any

from engine.sensors import Sensor, SensorValue

if TYPE_CHECKING:
    from engine.pipeline.controller import Pipeline

logger = logging.getLogger(__name__)


class PipelineMetricsSensor(Sensor):
    """Sensor that reads metrics from a Pipeline instance.

    Provides real-time performance data taken from the pipeline's metrics
    summary:
    - total_ms: Total frame time in milliseconds (the sensor's primary value)
    - fps, avg_ms, min_ms, max_ms: cached on every successful read()
    - per-stage timings via get_stage_timing() / get_all_timings()

    Can be bound to effect parameters for reactive visuals.
    """

    # Summary keys that read() mirrors into ``_last_values``.
    _SUMMARY_KEYS = ("total_ms", "fps", "avg_ms", "min_ms", "max_ms")

    def __init__(self, pipeline: "Pipeline | None" = None, name: str = "pipeline"):
        """Create the sensor.

        Args:
            pipeline: Pipeline to read metrics from, or None to attach one
                later with set_pipeline().
            name: Sensor name reported in every SensorValue.
        """
        self._pipeline = pipeline
        self.name = name
        self.unit = "ms"
        # Most recent summary values seen by read(); all start at 0.0.
        self._last_values: dict[str, float] = dict.fromkeys(self._SUMMARY_KEYS, 0.0)

    @property
    def available(self) -> bool:
        """True when a pipeline has been attached."""
        return self._pipeline is not None

    def set_pipeline(self, pipeline: "Pipeline") -> None:
        """Set or update the pipeline to read metrics from."""
        self._pipeline = pipeline

    def _fetch_metrics(self) -> Any:
        """Return the pipeline's metrics summary, or None if unavailable.

        None is returned when no pipeline is attached or when
        ``get_metrics_summary()`` raises. The sensor is best-effort, so the
        failure is logged at debug level instead of propagating.
        """
        # Identity check (not truthiness) so this agrees with ``available``
        # even if the pipeline object happens to evaluate as falsy.
        if self._pipeline is None:
            return None
        try:
            return self._pipeline.get_metrics_summary()
        except Exception:
            logger.debug("Pipeline metrics summary unavailable", exc_info=True)
            return None

    def read(self) -> SensorValue | None:
        """Read current metrics from the pipeline.

        Returns:
            A SensorValue whose value is ``total_ms``, or None when no
            pipeline is attached, the summary could not be fetched, the
            summary is empty, or it contains an ``"error"`` key.
        """
        metrics = self._fetch_metrics()
        if not metrics or "error" in metrics:
            return None

        for key in self._SUMMARY_KEYS:
            self._last_values[key] = metrics.get(key, 0.0)

        # Provide total_ms as primary value (for LFO-style effects)
        # NOTE(review): timestamp is a constant 0.0 rather than a clock
        # reading - confirm consumers do not rely on it.
        return SensorValue(
            sensor_name=self.name,
            value=self._last_values["total_ms"],
            timestamp=0.0,
            unit=self.unit,
        )

    def get_stage_timing(self, stage_name: str) -> float:
        """Get the ``avg_ms`` timing for a specific stage.

        Returns 0.0 when no pipeline is attached, the summary cannot be
        fetched, or the stage (or its ``avg_ms`` entry) is missing.
        """
        metrics = self._fetch_metrics()
        if metrics is None:
            return 0.0
        try:
            stages = metrics.get("stages", {})
            return stages.get(stage_name, {}).get("avg_ms", 0.0)
        except Exception:
            # Best-effort: an unexpectedly shaped summary reads as "no data".
            return 0.0

    def get_all_timings(self) -> dict[str, Any]:
        """Get the ``"stages"`` mapping from the metrics summary.

        The mapping is returned as provided by the pipeline; its values
        appear to be per-stage dicts containing at least ``avg_ms`` (that is
        how get_stage_timing() reads them). Returns an empty dict when no
        data is available.
        """
        metrics = self._fetch_metrics()
        if metrics is None:
            return {}
        try:
            return metrics.get("stages", {})
        except Exception:
            # Best-effort: an unexpectedly shaped summary reads as "no data".
            return {}

    def get_frame_history(self) -> list[float]:
        """Get historical frame times for sparklines.

        Returns whatever ``get_frame_times()`` yields, or an empty list when
        no pipeline is attached or the call raises.
        """
        if self._pipeline is None:
            return []
        try:
            return self._pipeline.get_frame_times()
        except Exception:
            logger.debug("Pipeline frame times unavailable", exc_info=True)
            return []

    def start(self) -> bool:
        """Start the sensor (no-op for read-only metrics)."""
        return True

    def stop(self) -> None:
        """Stop the sensor (no-op for read-only metrics)."""