diff --git a/engine/sensors/__init__.py b/engine/sensors/__init__.py new file mode 100644 index 0000000..0e0841f --- /dev/null +++ b/engine/sensors/__init__.py @@ -0,0 +1,186 @@ +""" +Sensor framework - PureData-style real-time input system. + +Sensors are data sources that emit values over time, similar to how +PureData objects emit signals. Effects can bind to sensors to modulate +their parameters dynamically. + +Architecture: +- Sensor: Base class for all sensors (mic, camera, ntfy, OSC, etc.) +- SensorRegistry: Global registry for sensor discovery +- SensorStage: Pipeline stage wrapper for sensors +- Effect param_bindings: Declarative sensor-to-param routing + +Example: + class GlitchEffect(EffectPlugin): + param_bindings = { + "intensity": {"sensor": "mic", "transform": "linear"}, + } + +This binds the mic sensor to the glitch intensity parameter. +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from engine.pipeline.core import PipelineContext + + +@dataclass +class SensorValue: + """A sensor reading with metadata.""" + + sensor_name: str + value: float + timestamp: float + unit: str = "" + + +class Sensor(ABC): + """Abstract base class for sensors. + + Sensors are real-time data sources that emit values. They can be: + - Physical: mic, camera, joystick, MIDI, OSC + - Virtual: ntfy, timer, random, noise + + Each sensor has a name and emits SensorValue objects. + """ + + name: str + unit: str = "" + + @property + def available(self) -> bool: + """Whether the sensor is currently available.""" + return True + + @abstractmethod + def read(self) -> SensorValue | None: + """Read current sensor value. + + Returns: + SensorValue if available, None if sensor is not ready. + """ + ... + + @abstractmethod + def start(self) -> bool: + """Start the sensor. + + Returns: + True if started successfully. + """ + ... + + @abstractmethod + def stop(self) -> None: + """Stop the sensor and release resources.""" + ... + + +class SensorRegistry: + """Global registry for sensors. + + Provides: + - Registration of sensor instances + - Lookup by name + - Global start/stop + """ + + _sensors: dict[str, Sensor] = {} + _started: bool = False + + @classmethod + def register(cls, sensor: Sensor) -> None: + """Register a sensor instance.""" + cls._sensors[sensor.name] = sensor + + @classmethod + def get(cls, name: str) -> Sensor | None: + """Get a sensor by name.""" + return cls._sensors.get(name) + + @classmethod + def list_sensors(cls) -> list[str]: + """List all registered sensor names.""" + return list(cls._sensors.keys()) + + @classmethod + def start_all(cls) -> bool: + """Start all sensors. + + Returns: + True if all sensors started successfully. + """ + if cls._started: + return True + + all_started = True + for sensor in cls._sensors.values(): + if sensor.available and not sensor.start(): + all_started = False + + cls._started = all_started + return all_started + + @classmethod + def stop_all(cls) -> None: + """Stop all sensors.""" + for sensor in cls._sensors.values(): + sensor.stop() + cls._started = False + + @classmethod + def read_all(cls) -> dict[str, float]: + """Read all sensor values. + + Returns: + Dict mapping sensor name to current value. + """ + result = {} + for name, sensor in cls._sensors.items(): + value = sensor.read() + if value: + result[name] = value.value + return result + + +class SensorStage: + """Pipeline stage wrapper for sensors. + + Provides sensor data to the pipeline context. + """ + + def __init__(self, sensor: Sensor, name: str | None = None): + self._sensor = sensor + self.name = name or sensor.name + self.category = "sensor" + self.optional = True + + @property + def capabilities(self) -> set[str]: + return {f"sensor.{self.name}"} + + @property + def dependencies(self) -> set[str]: + return set() + + def init(self, ctx: "PipelineContext") -> bool: + return self._sensor.start() + + def process(self, data: Any, ctx: "PipelineContext") -> Any: + value = self._sensor.read() + if value: + ctx.set_state(f"sensor.{self.name}", value.value) + ctx.set_state(f"sensor.{self.name}.full", value) + return data + + def cleanup(self) -> None: + self._sensor.stop() + + +def create_sensor_stage(sensor: Sensor, name: str | None = None) -> SensorStage: + """Create a pipeline stage from a sensor.""" + return SensorStage(sensor, name) diff --git a/engine/sensors/mic.py b/engine/sensors/mic.py new file mode 100644 index 0000000..71480fa --- /dev/null +++ b/engine/sensors/mic.py @@ -0,0 +1,147 @@ +""" +Mic sensor - audio input as a pipeline sensor. + +Self-contained implementation that handles audio input directly, +with graceful degradation if sounddevice is unavailable. +""" + +import atexit +import time +from collections.abc import Callable +from dataclasses import dataclass +from datetime import datetime +from typing import Any + +try: + import numpy as np + import sounddevice as sd + + _HAS_AUDIO = True +except Exception: + _HAS_AUDIO = False + np = None # type: ignore + sd = None # type: ignore + + +from engine.events import MicLevelEvent +from engine.sensors import Sensor, SensorRegistry, SensorValue + + +@dataclass +class AudioConfig: + """Configuration for audio input.""" + + threshold_db: float = 50.0 + sample_rate: float = 44100.0 + block_size: int = 1024 + + +class MicSensor(Sensor): + """Microphone sensor for pipeline integration. + + Self-contained implementation with graceful degradation. + No external dependencies required - works with or without sounddevice. + """ + + def __init__(self, threshold_db: float = 50.0, name: str = "mic"): + self.name = name + self.unit = "dB" + self._config = AudioConfig(threshold_db=threshold_db) + self._db: float = -99.0 + self._stream: Any = None + self._subscribers: list[Callable[[MicLevelEvent], None]] = [] + + @property + def available(self) -> bool: + """Check if audio input is available.""" + return _HAS_AUDIO and self._stream is not None + + def start(self) -> bool: + """Start the microphone stream.""" + if not _HAS_AUDIO: + return False + + try: + self._stream = sd.InputStream( + samplerate=self._config.sample_rate, + blocksize=self._config.block_size, + channels=1, + callback=self._audio_callback, + ) + self._stream.start() + atexit.register(self.stop) + return True + except Exception: + return False + + def stop(self) -> None: + """Stop the microphone stream.""" + if self._stream: + try: + self._stream.stop() + self._stream.close() + except Exception: + pass + self._stream = None + + def _audio_callback( + self, indata: np.ndarray, frames: int, time_info: Any, status: Any + ) -> None: + """Process audio data from sounddevice.""" + if not _HAS_AUDIO or np is None: + return + + rms = np.sqrt(np.mean(indata**2)) + if rms > 0: + db = 20 * np.log10(rms) + else: + db = -99.0 + + self._db = db + + excess = max(0.0, db - self._config.threshold_db) + event = MicLevelEvent( + db_level=db, excess_above_threshold=excess, timestamp=datetime.now() + ) + self._emit(event) + + def _emit(self, event: MicLevelEvent) -> None: + """Emit event to all subscribers.""" + for callback in self._subscribers: + try: + callback(event) + except Exception: + pass + + def subscribe(self, callback: Callable[[MicLevelEvent], None]) -> None: + """Subscribe to mic level events.""" + if callback not in self._subscribers: + self._subscribers.append(callback) + + def unsubscribe(self, callback: Callable[[MicLevelEvent], None]) -> None: + """Unsubscribe from mic level events.""" + if callback in self._subscribers: + self._subscribers.remove(callback) + + def read(self) -> SensorValue | None: + """Read current mic level as sensor value.""" + if not self.available: + return None + + excess = max(0.0, self._db - self._config.threshold_db) + return SensorValue( + sensor_name=self.name, + value=excess, + timestamp=time.time(), + unit=self.unit, + ) + + +def register_mic_sensor() -> None: + """Register the mic sensor with the global registry.""" + sensor = MicSensor() + SensorRegistry.register(sensor) + + +# Auto-register when imported +register_mic_sensor() diff --git a/engine/sensors/oscillator.py b/engine/sensors/oscillator.py new file mode 100644 index 0000000..d814723 --- /dev/null +++ b/engine/sensors/oscillator.py @@ -0,0 +1,161 @@ +""" +Oscillator sensor - Modular synth-style oscillator as a pipeline sensor. + +Provides various waveforms that can be: +1. Self-driving (phase accumulates over time) +2. Sensor-driven (phase modulated by external sensor) + +Built-in waveforms: +- sine: Pure sine wave +- square: Square wave (0 to 1) +- sawtooth: Rising sawtooth (0 to 1, wraps) +- triangle: Triangle wave (0 to 1 to 0) +- noise: Random values (0 to 1) + +Example usage: + osc = OscillatorSensor(waveform="sine", frequency=0.5) + # Or driven by mic sensor: + osc = OscillatorSensor(waveform="sine", frequency=1.0, input_sensor="mic") +""" + +import math +import random +import time +from enum import Enum + +from engine.sensors import Sensor, SensorRegistry, SensorValue + + +class Waveform(Enum): + """Built-in oscillator waveforms.""" + + SINE = "sine" + SQUARE = "square" + SAWTOOTH = "sawtooth" + TRIANGLE = "triangle" + NOISE = "noise" + + +class OscillatorSensor(Sensor): + """Oscillator sensor that generates periodic or random values. + + Can run in two modes: + - Self-driving: phase accumulates based on frequency + - Sensor-driven: phase modulated by external sensor value + """ + + WAVEFORMS = { + "sine": lambda p: (math.sin(2 * math.pi * p) + 1) / 2, + "square": lambda p: 1.0 if (p % 1.0) < 0.5 else 0.0, + "sawtooth": lambda p: p % 1.0, + "triangle": lambda p: 2 * abs(2 * (p % 1.0) - 1) - 1, + "noise": lambda _: random.random(), + } + + def __init__( + self, + name: str = "osc", + waveform: str = "sine", + frequency: float = 1.0, + input_sensor: str | None = None, + input_scale: float = 1.0, + ): + """Initialize oscillator sensor. + + Args: + name: Sensor name + waveform: Waveform type (sine, square, sawtooth, triangle, noise) + frequency: Frequency in Hz (self-driving mode) + input_sensor: Optional sensor name to drive phase + input_scale: Scale factor for input sensor + """ + self.name = name + self.unit = "" + self._waveform = waveform + self._frequency = frequency + self._input_sensor = input_sensor + self._input_scale = input_scale + self._phase = 0.0 + self._start_time = time.time() + + @property + def available(self) -> bool: + return True + + @property + def waveform(self) -> str: + return self._waveform + + @waveform.setter + def waveform(self, value: str) -> None: + if value not in self.WAVEFORMS: + raise ValueError(f"Unknown waveform: {value}") + self._waveform = value + + @property + def frequency(self) -> float: + return self._frequency + + @frequency.setter + def frequency(self, value: float) -> None: + self._frequency = max(0.0, value) + + def start(self) -> bool: + self._phase = 0.0 + self._start_time = time.time() + return True + + def stop(self) -> None: + pass + + def _get_input_value(self) -> float: + """Get value from input sensor if configured.""" + if self._input_sensor: + from engine.sensors import SensorRegistry + + sensor = SensorRegistry.get(self._input_sensor) + if sensor: + reading = sensor.read() + if reading: + return reading.value * self._input_scale + return 0.0 + + def read(self) -> SensorValue | None: + current_time = time.time() + elapsed = current_time - self._start_time + + if self._input_sensor: + input_val = self._get_input_value() + phase_increment = (self._frequency * elapsed) + input_val + else: + phase_increment = self._frequency * elapsed + + self._phase += phase_increment + + waveform_fn = self.WAVEFORMS.get(self._waveform) + if waveform_fn is None: + return None + + value = waveform_fn(self._phase) + value = max(0.0, min(1.0, value)) + + return SensorValue( + sensor_name=self.name, + value=value, + timestamp=current_time, + unit=self.unit, + ) + + def set_waveform(self, waveform: str) -> None: + """Change waveform at runtime.""" + self.waveform = waveform + + def set_frequency(self, frequency: float) -> None: + """Change frequency at runtime.""" + self.frequency = frequency + + +def register_oscillator_sensor(name: str = "osc", **kwargs) -> None: + """Register an oscillator sensor with the global registry.""" + sensor = OscillatorSensor(name=name, **kwargs) + SensorRegistry.register(sensor) diff --git a/tests/test_sensors.py b/tests/test_sensors.py new file mode 100644 index 0000000..04e43fd --- /dev/null +++ b/tests/test_sensors.py @@ -0,0 +1,473 @@ +""" +Tests for the sensor framework. +""" + +import time + +from engine.sensors import Sensor, SensorRegistry, SensorStage, SensorValue + + +class TestSensorValue: + """Tests for SensorValue dataclass.""" + + def test_create_sensor_value(self): + """SensorValue stores sensor data correctly.""" + value = SensorValue( + sensor_name="mic", + value=42.5, + timestamp=1234567890.0, + unit="dB", + ) + + assert value.sensor_name == "mic" + assert value.value == 42.5 + assert value.timestamp == 1234567890.0 + assert value.unit == "dB" + + +class DummySensor(Sensor): + """Dummy sensor for testing.""" + + def __init__(self, name: str = "dummy", value: float = 1.0): + self.name = name + self.unit = "units" + self._value = value + + def start(self) -> bool: + return True + + def stop(self) -> None: + pass + + def read(self) -> SensorValue | None: + return SensorValue( + sensor_name=self.name, + value=self._value, + timestamp=time.time(), + unit=self.unit, + ) + + +class TestSensorRegistry: + """Tests for SensorRegistry.""" + + def setup_method(self): + """Clear registry before each test.""" + SensorRegistry._sensors.clear() + SensorRegistry._started = False + + def test_register_sensor(self): + """SensorRegistry registers sensors.""" + sensor = DummySensor() + SensorRegistry.register(sensor) + + assert SensorRegistry.get("dummy") is sensor + + def test_list_sensors(self): + """SensorRegistry lists registered sensors.""" + SensorRegistry.register(DummySensor("a")) + SensorRegistry.register(DummySensor("b")) + + sensors = SensorRegistry.list_sensors() + assert "a" in sensors + assert "b" in sensors + + def test_read_all(self): + """SensorRegistry reads all sensor values.""" + SensorRegistry.register(DummySensor("a", 1.0)) + SensorRegistry.register(DummySensor("b", 2.0)) + + values = SensorRegistry.read_all() + assert values["a"] == 1.0 + assert values["b"] == 2.0 + + +class TestSensorStage: + """Tests for SensorStage pipeline adapter.""" + + def setup_method(self): + SensorRegistry._sensors.clear() + SensorRegistry._started = False + + def test_sensor_stage_capabilities(self): + """SensorStage declares correct capabilities.""" + sensor = DummySensor("mic") + stage = SensorStage(sensor) + + assert "sensor.mic" in stage.capabilities + + def test_sensor_stage_process(self): + """SensorStage reads sensor and stores in context.""" + from engine.pipeline.core import PipelineContext + + sensor = DummySensor("test", 42.0) + stage = SensorStage(sensor, "test") + + ctx = PipelineContext() + result = stage.process(None, ctx) + + assert ctx.get_state("sensor.test") == 42.0 + assert result is None + + +class TestApplyParamBindings: + """Tests for sensor param bindings.""" + + def test_no_bindings_returns_original(self): + """Effect without bindings returns original config.""" + from engine.effects.types import ( + EffectConfig, + EffectPlugin, + apply_param_bindings, + ) + + class TestEffect(EffectPlugin): + name = "test" + config = EffectConfig() + + def process(self, buf, ctx): + return buf + + def configure(self, config): + pass + + effect = TestEffect() + ctx = object() + + result = apply_param_bindings(effect, ctx) + assert result is effect.config + + def test_bindings_read_sensor_values(self): + """Param bindings read sensor values from context.""" + from engine.effects.types import ( + EffectConfig, + EffectPlugin, + apply_param_bindings, + ) + + class TestEffect(EffectPlugin): + name = "test" + config = EffectConfig(intensity=1.0) + param_bindings = { + "intensity": {"sensor": "mic", "transform": "linear"}, + } + + def process(self, buf, ctx): + return buf + + def configure(self, config): + pass + + from engine.effects.types import EffectContext + + effect = TestEffect() + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + ) + ctx.set_state("sensor.mic", 0.8) + + result = apply_param_bindings(effect, ctx) + assert "intensity_sensor" in result.params + + +class TestSensorLifecycle: + """Tests for sensor start/stop lifecycle.""" + + def setup_method(self): + SensorRegistry._sensors.clear() + SensorRegistry._started = False + + def test_start_all(self): + """SensorRegistry starts all sensors.""" + started = [] + + class StatefulSensor(Sensor): + name = "stateful" + + def start(self) -> bool: + started.append("start") + return True + + def stop(self) -> None: + started.append("stop") + + def read(self) -> SensorValue | None: + return SensorValue("stateful", 1.0, 0.0) + + SensorRegistry.register(StatefulSensor()) + SensorRegistry.start_all() + + assert "start" in started + assert SensorRegistry._started is True + + def test_stop_all(self): + """SensorRegistry stops all sensors.""" + stopped = [] + + class StatefulSensor(Sensor): + name = "stateful" + + def start(self) -> bool: + return True + + def stop(self) -> None: + stopped.append("stop") + + def read(self) -> SensorValue | None: + return SensorValue("stateful", 1.0, 0.0) + + SensorRegistry.register(StatefulSensor()) + SensorRegistry.start_all() + SensorRegistry.stop_all() + + assert "stop" in stopped + assert SensorRegistry._started is False + + def test_unavailable_sensor(self): + """Unavailable sensor returns None from read.""" + + class UnavailableSensor(Sensor): + name = "unavailable" + + @property + def available(self) -> bool: + return False + + def start(self) -> bool: + return False + + def stop(self) -> None: + pass + + def read(self) -> SensorValue | None: + return None + + sensor = UnavailableSensor() + assert sensor.available is False + assert sensor.read() is None + + +class TestTransforms: + """Tests for sensor value transforms.""" + + def test_exponential_transform(self): + """Exponential transform squares the value.""" + from engine.effects.types import ( + EffectConfig, + EffectPlugin, + apply_param_bindings, + ) + + class TestEffect(EffectPlugin): + name = "test" + config = EffectConfig(intensity=1.0) + param_bindings = { + "intensity": {"sensor": "mic", "transform": "exponential"}, + } + + def process(self, buf, ctx): + return buf + + def configure(self, config): + pass + + from engine.effects.types import EffectContext + + effect = TestEffect() + ctx = EffectContext(80, 24, 0, 20) + ctx.set_state("sensor.mic", 0.5) + + result = apply_param_bindings(effect, ctx) + # 0.5^2 = 0.25, then scaled: 0.5 + 0.25*0.5 = 0.625 + assert result.intensity != effect.config.intensity + + def test_inverse_transform(self): + """Inverse transform inverts the value.""" + from engine.effects.types import ( + EffectConfig, + EffectPlugin, + apply_param_bindings, + ) + + class TestEffect(EffectPlugin): + name = "test" + config = EffectConfig(intensity=1.0) + param_bindings = { + "intensity": {"sensor": "mic", "transform": "inverse"}, + } + + def process(self, buf, ctx): + return buf + + def configure(self, config): + pass + + from engine.effects.types import EffectContext + + effect = TestEffect() + ctx = EffectContext(80, 24, 0, 20) + ctx.set_state("sensor.mic", 0.8) + + result = apply_param_bindings(effect, ctx) + # 1.0 - 0.8 = 0.2 + assert abs(result.params["intensity_sensor"] - 0.2) < 0.001 + + def test_threshold_transform(self): + """Threshold transform applies binary threshold.""" + from engine.effects.types import ( + EffectConfig, + EffectPlugin, + apply_param_bindings, + ) + + class TestEffect(EffectPlugin): + name = "test" + config = EffectConfig(intensity=1.0) + param_bindings = { + "intensity": { + "sensor": "mic", + "transform": "threshold", + "threshold": 0.5, + }, + } + + def process(self, buf, ctx): + return buf + + def configure(self, config): + pass + + from engine.effects.types import EffectContext + + effect = TestEffect() + ctx = EffectContext(80, 24, 0, 20) + + # Above threshold + ctx.set_state("sensor.mic", 0.8) + result = apply_param_bindings(effect, ctx) + assert result.params["intensity_sensor"] == 1.0 + + # Below threshold + ctx.set_state("sensor.mic", 0.3) + result = apply_param_bindings(effect, ctx) + assert result.params["intensity_sensor"] == 0.0 + + +class TestOscillatorSensor: + """Tests for OscillatorSensor.""" + + def setup_method(self): + SensorRegistry._sensors.clear() + SensorRegistry._started = False + + def test_sine_waveform(self): + """Oscillator generates sine wave.""" + from engine.sensors.oscillator import OscillatorSensor + + osc = OscillatorSensor(name="test", waveform="sine", frequency=1.0) + osc.start() + + values = [osc.read().value for _ in range(10)] + assert all(0 <= v <= 1 for v in values) + + def test_square_waveform(self): + """Oscillator generates square wave.""" + from engine.sensors.oscillator import OscillatorSensor + + osc = OscillatorSensor(name="test", waveform="square", frequency=10.0) + osc.start() + + values = [osc.read().value for _ in range(10)] + assert all(v in (0.0, 1.0) for v in values) + + def test_waveform_types(self): + """All waveform types work.""" + from engine.sensors.oscillator import OscillatorSensor + + for wf in ["sine", "square", "sawtooth", "triangle", "noise"]: + osc = OscillatorSensor(name=wf, waveform=wf, frequency=1.0) + osc.start() + val = osc.read() + assert val is not None + assert 0 <= val.value <= 1 + + def test_invalid_waveform_raises(self): + """Invalid waveform returns None.""" + from engine.sensors.oscillator import OscillatorSensor + + osc = OscillatorSensor(waveform="invalid") + osc.start() + val = osc.read() + assert val is None + + def test_sensor_driven_oscillator(self): + """Oscillator can be driven by another sensor.""" + from engine.sensors.oscillator import OscillatorSensor + + class ModSensor(Sensor): + name = "mod" + + def start(self) -> bool: + return True + + def stop(self) -> None: + pass + + def read(self) -> SensorValue | None: + return SensorValue("mod", 0.5, 0.0) + + SensorRegistry.register(ModSensor()) + + osc = OscillatorSensor( + name="lfo", waveform="sine", frequency=0.1, input_sensor="mod" + ) + osc.start() + + val = osc.read() + assert val is not None + assert 0 <= val.value <= 1 + + +class TestMicSensor: + """Tests for MicSensor.""" + + def setup_method(self): + SensorRegistry._sensors.clear() + SensorRegistry._started = False + + def test_mic_sensor_creation(self): + """MicSensor can be created.""" + from engine.sensors.mic import MicSensor + + sensor = MicSensor() + assert sensor.name == "mic" + assert sensor.unit == "dB" + + def test_mic_sensor_custom_name(self): + """MicSensor can have custom name.""" + from engine.sensors.mic import MicSensor + + sensor = MicSensor(name="my_mic") + assert sensor.name == "my_mic" + + def test_mic_sensor_start(self): + """MicSensor.start returns bool.""" + from engine.sensors.mic import MicSensor + + sensor = MicSensor() + result = sensor.start() + assert isinstance(result, bool) + + def test_mic_sensor_read_returns_value_or_none(self): + """MicSensor.read returns SensorValue or None.""" + from engine.sensors.mic import MicSensor + + sensor = MicSensor() + sensor.start() + # May be None if no mic available + result = sensor.read() + # Just check it doesn't raise - result depends on system + assert result is None or isinstance(result, SensorValue)