forked from genewildish/Mainline
- Add PipelineIntrospectionSource that renders live ASCII DAG with metrics - Add PipelineMetricsSensor exposing pipeline performance as sensor values - Add PipelineIntrospectionDemo controller with 3-phase animation: - Phase 1: Toggle effects one at a time (3s each) - Phase 2: LFO drives intensity default→max→min→default - Phase 3: All effects with shared LFO (infinite loop) - Add pipeline-inspect preset - Add get_frame_times() to Pipeline for sparkline data - Add tests for new components - Update mise.toml with pipeline-inspect preset task
114 lines
3.6 KiB
Python
114 lines
3.6 KiB
Python
"""
|
|
Tests for PipelineMetricsSensor.
|
|
"""
|
|
|
|
from engine.sensors.pipeline_metrics import PipelineMetricsSensor
|
|
|
|
|
|
class MockPipeline:
|
|
"""Mock pipeline for testing."""
|
|
|
|
def __init__(self, metrics=None):
|
|
self._metrics = metrics or {}
|
|
|
|
def get_metrics_summary(self):
|
|
return self._metrics
|
|
|
|
|
|
class TestPipelineMetricsSensor:
|
|
"""Tests for PipelineMetricsSensor."""
|
|
|
|
def test_basic_init(self):
|
|
"""Sensor initializes with defaults."""
|
|
sensor = PipelineMetricsSensor()
|
|
assert sensor.name == "pipeline"
|
|
assert sensor.available is False
|
|
|
|
def test_init_with_pipeline(self):
|
|
"""Sensor initializes with pipeline."""
|
|
mock = MockPipeline()
|
|
sensor = PipelineMetricsSensor(mock)
|
|
assert sensor.available is True
|
|
|
|
def test_set_pipeline(self):
|
|
"""set_pipeline() updates pipeline."""
|
|
sensor = PipelineMetricsSensor()
|
|
assert sensor.available is False
|
|
sensor.set_pipeline(MockPipeline())
|
|
assert sensor.available is True
|
|
|
|
def test_read_no_pipeline(self):
|
|
"""read() returns None when no pipeline."""
|
|
sensor = PipelineMetricsSensor()
|
|
assert sensor.read() is None
|
|
|
|
def test_read_with_metrics(self):
|
|
"""read() returns sensor value with metrics."""
|
|
mock = MockPipeline(
|
|
{
|
|
"total_ms": 18.5,
|
|
"fps": 54.0,
|
|
"avg_ms": 18.5,
|
|
"min_ms": 15.0,
|
|
"max_ms": 22.0,
|
|
"stages": {"render": {"avg_ms": 12.0}, "noise": {"avg_ms": 3.0}},
|
|
}
|
|
)
|
|
sensor = PipelineMetricsSensor(mock)
|
|
val = sensor.read()
|
|
assert val is not None
|
|
assert val.sensor_name == "pipeline"
|
|
assert val.value == 18.5
|
|
|
|
def test_read_with_error(self):
|
|
"""read() returns None when metrics have error."""
|
|
mock = MockPipeline({"error": "No metrics collected"})
|
|
sensor = PipelineMetricsSensor(mock)
|
|
assert sensor.read() is None
|
|
|
|
def test_get_stage_timing(self):
|
|
"""get_stage_timing() returns stage timing."""
|
|
mock = MockPipeline(
|
|
{
|
|
"stages": {"render": {"avg_ms": 12.0}, "noise": {"avg_ms": 3.0}},
|
|
}
|
|
)
|
|
sensor = PipelineMetricsSensor(mock)
|
|
assert sensor.get_stage_timing("render") == 12.0
|
|
assert sensor.get_stage_timing("noise") == 3.0
|
|
assert sensor.get_stage_timing("nonexistent") == 0.0
|
|
|
|
def test_get_stage_timing_no_pipeline(self):
|
|
"""get_stage_timing() returns 0 when no pipeline."""
|
|
sensor = PipelineMetricsSensor()
|
|
assert sensor.get_stage_timing("test") == 0.0
|
|
|
|
def test_get_all_timings(self):
|
|
"""get_all_timings() returns all stage timings."""
|
|
mock = MockPipeline(
|
|
{
|
|
"stages": {"render": {"avg_ms": 12.0}, "noise": {"avg_ms": 3.0}},
|
|
}
|
|
)
|
|
sensor = PipelineMetricsSensor(mock)
|
|
timings = sensor.get_all_timings()
|
|
assert timings == {"render": {"avg_ms": 12.0}, "noise": {"avg_ms": 3.0}}
|
|
|
|
def test_get_frame_history(self):
|
|
"""get_frame_history() returns frame times."""
|
|
MockPipeline()
|
|
|
|
class MockPipelineWithFrames:
|
|
def get_frame_times(self):
|
|
return [1.0, 2.0, 3.0]
|
|
|
|
sensor = PipelineMetricsSensor(MockPipelineWithFrames())
|
|
history = sensor.get_frame_history()
|
|
assert history == [1.0, 2.0, 3.0]
|
|
|
|
def test_start_stop(self):
|
|
"""start() and stop() work."""
|
|
sensor = PipelineMetricsSensor()
|
|
assert sensor.start() is True
|
|
sensor.stop() # Should not raise
|