forked from genewildish/Mainline
- Add Sensor base class with value emission - Add SensorRegistry for discovery - Add SensorStage adapter for pipeline - Add MicSensor (self-contained, no external deps) - Add OscillatorSensor for testing - Add sensor param bindings to effects
162 lines
4.5 KiB
Python
162 lines
4.5 KiB
Python
"""
|
|
Oscillator sensor - Modular synth-style oscillator as a pipeline sensor.
|
|
|
|
Provides various waveforms that can be:
|
|
1. Self-driving (phase accumulates over time)
|
|
2. Sensor-driven (phase modulated by external sensor)
|
|
|
|
Built-in waveforms:
|
|
- sine: Pure sine wave
|
|
- square: Square wave (0 to 1)
|
|
- sawtooth: Rising sawtooth (0 to 1, wraps)
|
|
- triangle: Triangle wave (0 to 1 to 0)
|
|
- noise: Random values (0 to 1)
|
|
|
|
Example usage:
|
|
osc = OscillatorSensor(waveform="sine", frequency=0.5)
|
|
# Or driven by mic sensor:
|
|
osc = OscillatorSensor(waveform="sine", frequency=1.0, input_sensor="mic")
|
|
"""
|
|
|
|
import math
|
|
import random
|
|
import time
|
|
from enum import Enum
|
|
|
|
from engine.sensors import Sensor, SensorRegistry, SensorValue
|
|
|
|
|
|
class Waveform(Enum):
    """Names of the waveforms the oscillator ships with.

    Each member's value is the lowercase string used to select that
    waveform by name.
    """

    # Periodic shapes.
    SINE = "sine"
    SQUARE = "square"
    SAWTOOTH = "sawtooth"
    TRIANGLE = "triangle"

    # Non-periodic: random output.
    NOISE = "noise"
|
|
|
|
|
|
class OscillatorSensor(Sensor):
    """Oscillator sensor that generates periodic or random values.

    Can run in two modes:

    - Self-driving: phase advances by ``frequency * dt`` on every read,
      where ``dt`` is the wall-clock time since the previous read.
    - Sensor-driven: as above, and additionally the (scaled) value of the
      named input sensor is added to the phase on every read.

    Phase is measured in cycles and kept wrapped to ``[0, 1)``; every
    built-in waveform has period 1, so wrapping does not change the output
    but keeps floating-point precision stable over long runs.
    """

    # Waveform name -> function mapping a phase (in cycles) to a value
    # in the range 0..1.
    WAVEFORMS = {
        "sine": lambda p: (math.sin(2 * math.pi * p) + 1) / 2,
        "square": lambda p: 1.0 if (p % 1.0) < 0.5 else 0.0,
        "sawtooth": lambda p: p % 1.0,
        # Rises 0 -> 1 over the first half cycle, falls 1 -> 0 over the second.
        "triangle": lambda p: 1.0 - abs(2 * (p % 1.0) - 1),
        "noise": lambda _: random.random(),
    }

    def __init__(
        self,
        name: str = "osc",
        waveform: str = "sine",
        frequency: float = 1.0,
        input_sensor: str | None = None,
        input_scale: float = 1.0,
    ):
        """Initialize oscillator sensor.

        Args:
            name: Sensor name
            waveform: Waveform type (sine, square, sawtooth, triangle, noise)
            frequency: Frequency in Hz (cycles per second of wall-clock time)
            input_sensor: Optional sensor name to drive phase
            input_scale: Scale factor for input sensor
        """
        self.name = name
        self.unit = ""
        self._waveform = waveform
        self._frequency = frequency
        self._input_sensor = input_sensor
        self._input_scale = input_scale
        self._phase = 0.0
        self._start_time = time.time()
        # Time of the previous read(); the phase is integrated over the
        # interval since this instant rather than since start.
        self._last_read_time = self._start_time

    @property
    def available(self) -> bool:
        """Always True: the oscillator needs no external hardware."""
        return True

    @property
    def waveform(self) -> str:
        """Name of the currently selected waveform."""
        return self._waveform

    @waveform.setter
    def waveform(self, value: str) -> None:
        """Select a waveform by name.

        Raises:
            ValueError: If ``value`` is not a key of ``WAVEFORMS``.
        """
        if value not in self.WAVEFORMS:
            raise ValueError(f"Unknown waveform: {value}")
        self._waveform = value

    @property
    def frequency(self) -> float:
        """Oscillator frequency in Hz."""
        return self._frequency

    @frequency.setter
    def frequency(self, value: float) -> None:
        """Set the frequency; negative values are clamped to 0."""
        self._frequency = max(0.0, value)

    def start(self) -> bool:
        """Reset the phase and the time reference. Always returns True."""
        self._phase = 0.0
        self._start_time = time.time()
        self._last_read_time = self._start_time
        return True

    def stop(self) -> None:
        """No-op: there is nothing to release."""
        pass

    def _get_input_value(self) -> float:
        """Get value from input sensor if configured.

        Returns:
            The input sensor's current value multiplied by ``input_scale``,
            or 0.0 when no input sensor is configured, the sensor is not
            found in the registry, or it produced no reading.
        """
        if not self._input_sensor:
            return 0.0

        sensor = SensorRegistry.get(self._input_sensor)
        if sensor is None:
            return 0.0

        reading = sensor.read()
        if reading is None:
            return 0.0

        return reading.value * self._input_scale

    def read(self) -> SensorValue | None:
        """Advance the oscillator and return its current output.

        Returns:
            A ``SensorValue`` whose ``value`` is clamped to 0..1, or None
            if the configured waveform name is not in ``WAVEFORMS``.
        """
        current_time = time.time()

        # Integrate over the time since the previous read. Using the time
        # since start here would add an ever-growing increment on each call
        # and make the effective frequency drift upward.
        # Guard against the wall clock stepping backwards.
        delta = max(0.0, current_time - self._last_read_time)
        self._last_read_time = current_time

        phase_increment = self._frequency * delta
        if self._input_sensor:
            phase_increment += self._get_input_value()

        # All waveforms have period 1, so wrapping preserves the output
        # while preventing precision loss from an unbounded phase.
        self._phase = (self._phase + phase_increment) % 1.0

        waveform_fn = self.WAVEFORMS.get(self._waveform)
        if waveform_fn is None:
            return None

        value = waveform_fn(self._phase)
        # Defensive clamp against floating-point overshoot.
        value = max(0.0, min(1.0, value))

        return SensorValue(
            sensor_name=self.name,
            value=value,
            timestamp=current_time,
            unit=self.unit,
        )

    def set_waveform(self, waveform: str) -> None:
        """Change waveform at runtime."""
        self.waveform = waveform

    def set_frequency(self, frequency: float) -> None:
        """Change frequency at runtime."""
        self.frequency = frequency
|
|
|
|
|
|
def register_oscillator_sensor(name: str = "osc", **kwargs) -> None:
    """Build an ``OscillatorSensor`` and add it to ``SensorRegistry``.

    Args:
        name: Name given to the new sensor.
        **kwargs: Forwarded unchanged to the ``OscillatorSensor`` constructor.
    """
    SensorRegistry.register(OscillatorSensor(name=name, **kwargs))
|