Files
Mainline/tests/test_sensors.py
David Gwilliam 2e96b7cd83 feat(sensors): add sensor framework for pipeline integration
- Add Sensor base class with value emission
- Add SensorRegistry for discovery
- Add SensorStage adapter for pipeline
- Add MicSensor (self-contained, no external deps)
- Add OscillatorSensor for testing
- Add sensor param bindings to effects
2026-03-16 13:55:47 -07:00

474 lines
13 KiB
Python

"""
Tests for the sensor framework.
"""
import time
from engine.sensors import Sensor, SensorRegistry, SensorStage, SensorValue
class TestSensorValue:
"""Tests for SensorValue dataclass."""
def test_create_sensor_value(self):
"""SensorValue stores sensor data correctly."""
value = SensorValue(
sensor_name="mic",
value=42.5,
timestamp=1234567890.0,
unit="dB",
)
assert value.sensor_name == "mic"
assert value.value == 42.5
assert value.timestamp == 1234567890.0
assert value.unit == "dB"
class DummySensor(Sensor):
"""Dummy sensor for testing."""
def __init__(self, name: str = "dummy", value: float = 1.0):
self.name = name
self.unit = "units"
self._value = value
def start(self) -> bool:
return True
def stop(self) -> None:
pass
def read(self) -> SensorValue | None:
return SensorValue(
sensor_name=self.name,
value=self._value,
timestamp=time.time(),
unit=self.unit,
)
class TestSensorRegistry:
"""Tests for SensorRegistry."""
def setup_method(self):
"""Clear registry before each test."""
SensorRegistry._sensors.clear()
SensorRegistry._started = False
def test_register_sensor(self):
"""SensorRegistry registers sensors."""
sensor = DummySensor()
SensorRegistry.register(sensor)
assert SensorRegistry.get("dummy") is sensor
def test_list_sensors(self):
"""SensorRegistry lists registered sensors."""
SensorRegistry.register(DummySensor("a"))
SensorRegistry.register(DummySensor("b"))
sensors = SensorRegistry.list_sensors()
assert "a" in sensors
assert "b" in sensors
def test_read_all(self):
"""SensorRegistry reads all sensor values."""
SensorRegistry.register(DummySensor("a", 1.0))
SensorRegistry.register(DummySensor("b", 2.0))
values = SensorRegistry.read_all()
assert values["a"] == 1.0
assert values["b"] == 2.0
class TestSensorStage:
"""Tests for SensorStage pipeline adapter."""
def setup_method(self):
SensorRegistry._sensors.clear()
SensorRegistry._started = False
def test_sensor_stage_capabilities(self):
"""SensorStage declares correct capabilities."""
sensor = DummySensor("mic")
stage = SensorStage(sensor)
assert "sensor.mic" in stage.capabilities
def test_sensor_stage_process(self):
"""SensorStage reads sensor and stores in context."""
from engine.pipeline.core import PipelineContext
sensor = DummySensor("test", 42.0)
stage = SensorStage(sensor, "test")
ctx = PipelineContext()
result = stage.process(None, ctx)
assert ctx.get_state("sensor.test") == 42.0
assert result is None
class TestApplyParamBindings:
"""Tests for sensor param bindings."""
def test_no_bindings_returns_original(self):
"""Effect without bindings returns original config."""
from engine.effects.types import (
EffectConfig,
EffectPlugin,
apply_param_bindings,
)
class TestEffect(EffectPlugin):
name = "test"
config = EffectConfig()
def process(self, buf, ctx):
return buf
def configure(self, config):
pass
effect = TestEffect()
ctx = object()
result = apply_param_bindings(effect, ctx)
assert result is effect.config
def test_bindings_read_sensor_values(self):
"""Param bindings read sensor values from context."""
from engine.effects.types import (
EffectConfig,
EffectPlugin,
apply_param_bindings,
)
class TestEffect(EffectPlugin):
name = "test"
config = EffectConfig(intensity=1.0)
param_bindings = {
"intensity": {"sensor": "mic", "transform": "linear"},
}
def process(self, buf, ctx):
return buf
def configure(self, config):
pass
from engine.effects.types import EffectContext
effect = TestEffect()
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
)
ctx.set_state("sensor.mic", 0.8)
result = apply_param_bindings(effect, ctx)
assert "intensity_sensor" in result.params
class TestSensorLifecycle:
"""Tests for sensor start/stop lifecycle."""
def setup_method(self):
SensorRegistry._sensors.clear()
SensorRegistry._started = False
def test_start_all(self):
"""SensorRegistry starts all sensors."""
started = []
class StatefulSensor(Sensor):
name = "stateful"
def start(self) -> bool:
started.append("start")
return True
def stop(self) -> None:
started.append("stop")
def read(self) -> SensorValue | None:
return SensorValue("stateful", 1.0, 0.0)
SensorRegistry.register(StatefulSensor())
SensorRegistry.start_all()
assert "start" in started
assert SensorRegistry._started is True
def test_stop_all(self):
"""SensorRegistry stops all sensors."""
stopped = []
class StatefulSensor(Sensor):
name = "stateful"
def start(self) -> bool:
return True
def stop(self) -> None:
stopped.append("stop")
def read(self) -> SensorValue | None:
return SensorValue("stateful", 1.0, 0.0)
SensorRegistry.register(StatefulSensor())
SensorRegistry.start_all()
SensorRegistry.stop_all()
assert "stop" in stopped
assert SensorRegistry._started is False
def test_unavailable_sensor(self):
"""Unavailable sensor returns None from read."""
class UnavailableSensor(Sensor):
name = "unavailable"
@property
def available(self) -> bool:
return False
def start(self) -> bool:
return False
def stop(self) -> None:
pass
def read(self) -> SensorValue | None:
return None
sensor = UnavailableSensor()
assert sensor.available is False
assert sensor.read() is None
class TestTransforms:
"""Tests for sensor value transforms."""
def test_exponential_transform(self):
"""Exponential transform squares the value."""
from engine.effects.types import (
EffectConfig,
EffectPlugin,
apply_param_bindings,
)
class TestEffect(EffectPlugin):
name = "test"
config = EffectConfig(intensity=1.0)
param_bindings = {
"intensity": {"sensor": "mic", "transform": "exponential"},
}
def process(self, buf, ctx):
return buf
def configure(self, config):
pass
from engine.effects.types import EffectContext
effect = TestEffect()
ctx = EffectContext(80, 24, 0, 20)
ctx.set_state("sensor.mic", 0.5)
result = apply_param_bindings(effect, ctx)
# 0.5^2 = 0.25, then scaled: 0.5 + 0.25*0.5 = 0.625
assert result.intensity != effect.config.intensity
def test_inverse_transform(self):
"""Inverse transform inverts the value."""
from engine.effects.types import (
EffectConfig,
EffectPlugin,
apply_param_bindings,
)
class TestEffect(EffectPlugin):
name = "test"
config = EffectConfig(intensity=1.0)
param_bindings = {
"intensity": {"sensor": "mic", "transform": "inverse"},
}
def process(self, buf, ctx):
return buf
def configure(self, config):
pass
from engine.effects.types import EffectContext
effect = TestEffect()
ctx = EffectContext(80, 24, 0, 20)
ctx.set_state("sensor.mic", 0.8)
result = apply_param_bindings(effect, ctx)
# 1.0 - 0.8 = 0.2
assert abs(result.params["intensity_sensor"] - 0.2) < 0.001
def test_threshold_transform(self):
"""Threshold transform applies binary threshold."""
from engine.effects.types import (
EffectConfig,
EffectPlugin,
apply_param_bindings,
)
class TestEffect(EffectPlugin):
name = "test"
config = EffectConfig(intensity=1.0)
param_bindings = {
"intensity": {
"sensor": "mic",
"transform": "threshold",
"threshold": 0.5,
},
}
def process(self, buf, ctx):
return buf
def configure(self, config):
pass
from engine.effects.types import EffectContext
effect = TestEffect()
ctx = EffectContext(80, 24, 0, 20)
# Above threshold
ctx.set_state("sensor.mic", 0.8)
result = apply_param_bindings(effect, ctx)
assert result.params["intensity_sensor"] == 1.0
# Below threshold
ctx.set_state("sensor.mic", 0.3)
result = apply_param_bindings(effect, ctx)
assert result.params["intensity_sensor"] == 0.0
class TestOscillatorSensor:
"""Tests for OscillatorSensor."""
def setup_method(self):
SensorRegistry._sensors.clear()
SensorRegistry._started = False
def test_sine_waveform(self):
"""Oscillator generates sine wave."""
from engine.sensors.oscillator import OscillatorSensor
osc = OscillatorSensor(name="test", waveform="sine", frequency=1.0)
osc.start()
values = [osc.read().value for _ in range(10)]
assert all(0 <= v <= 1 for v in values)
def test_square_waveform(self):
"""Oscillator generates square wave."""
from engine.sensors.oscillator import OscillatorSensor
osc = OscillatorSensor(name="test", waveform="square", frequency=10.0)
osc.start()
values = [osc.read().value for _ in range(10)]
assert all(v in (0.0, 1.0) for v in values)
def test_waveform_types(self):
"""All waveform types work."""
from engine.sensors.oscillator import OscillatorSensor
for wf in ["sine", "square", "sawtooth", "triangle", "noise"]:
osc = OscillatorSensor(name=wf, waveform=wf, frequency=1.0)
osc.start()
val = osc.read()
assert val is not None
assert 0 <= val.value <= 1
def test_invalid_waveform_raises(self):
"""Invalid waveform returns None."""
from engine.sensors.oscillator import OscillatorSensor
osc = OscillatorSensor(waveform="invalid")
osc.start()
val = osc.read()
assert val is None
def test_sensor_driven_oscillator(self):
"""Oscillator can be driven by another sensor."""
from engine.sensors.oscillator import OscillatorSensor
class ModSensor(Sensor):
name = "mod"
def start(self) -> bool:
return True
def stop(self) -> None:
pass
def read(self) -> SensorValue | None:
return SensorValue("mod", 0.5, 0.0)
SensorRegistry.register(ModSensor())
osc = OscillatorSensor(
name="lfo", waveform="sine", frequency=0.1, input_sensor="mod"
)
osc.start()
val = osc.read()
assert val is not None
assert 0 <= val.value <= 1
class TestMicSensor:
"""Tests for MicSensor."""
def setup_method(self):
SensorRegistry._sensors.clear()
SensorRegistry._started = False
def test_mic_sensor_creation(self):
"""MicSensor can be created."""
from engine.sensors.mic import MicSensor
sensor = MicSensor()
assert sensor.name == "mic"
assert sensor.unit == "dB"
def test_mic_sensor_custom_name(self):
"""MicSensor can have custom name."""
from engine.sensors.mic import MicSensor
sensor = MicSensor(name="my_mic")
assert sensor.name == "my_mic"
def test_mic_sensor_start(self):
"""MicSensor.start returns bool."""
from engine.sensors.mic import MicSensor
sensor = MicSensor()
result = sensor.start()
assert isinstance(result, bool)
def test_mic_sensor_read_returns_value_or_none(self):
"""MicSensor.read returns SensorValue or None."""
from engine.sensors.mic import MicSensor
sensor = MicSensor()
sensor.start()
# May be None if no mic available
result = sensor.read()
# Just check it doesn't raise - result depends on system
assert result is None or isinstance(result, SensorValue)