""" Oscillator sensor - Modular synth-style oscillator as a pipeline sensor. Provides various waveforms that can be: 1. Self-driving (phase accumulates over time) 2. Sensor-driven (phase modulated by external sensor) Built-in waveforms: - sine: Pure sine wave - square: Square wave (0 to 1) - sawtooth: Rising sawtooth (0 to 1, wraps) - triangle: Triangle wave (0 to 1 to 0) - noise: Random values (0 to 1) Example usage: osc = OscillatorSensor(waveform="sine", frequency=0.5) # Or driven by mic sensor: osc = OscillatorSensor(waveform="sine", frequency=1.0, input_sensor="mic") """ import math import random import time from enum import Enum from engine.sensors import Sensor, SensorRegistry, SensorValue class Waveform(Enum): """Built-in oscillator waveforms.""" SINE = "sine" SQUARE = "square" SAWTOOTH = "sawtooth" TRIANGLE = "triangle" NOISE = "noise" class OscillatorSensor(Sensor): """Oscillator sensor that generates periodic or random values. Can run in two modes: - Self-driving: phase accumulates based on frequency - Sensor-driven: phase modulated by external sensor value """ WAVEFORMS = { "sine": lambda p: (math.sin(2 * math.pi * p) + 1) / 2, "square": lambda p: 1.0 if (p % 1.0) < 0.5 else 0.0, "sawtooth": lambda p: p % 1.0, "triangle": lambda p: 2 * abs(2 * (p % 1.0) - 1) - 1, "noise": lambda _: random.random(), } def __init__( self, name: str = "osc", waveform: str = "sine", frequency: float = 1.0, input_sensor: str | None = None, input_scale: float = 1.0, ): """Initialize oscillator sensor. Args: name: Sensor name waveform: Waveform type (sine, square, sawtooth, triangle, noise) frequency: Frequency in Hz (self-driving mode) input_sensor: Optional sensor name to drive phase input_scale: Scale factor for input sensor """ self.name = name self.unit = "" self._waveform = waveform self._frequency = frequency self._input_sensor = input_sensor self._input_scale = input_scale self._phase = 0.0 self._start_time = time.time() @property def available(self) -> bool: return True @property def waveform(self) -> str: return self._waveform @waveform.setter def waveform(self, value: str) -> None: if value not in self.WAVEFORMS: raise ValueError(f"Unknown waveform: {value}") self._waveform = value @property def frequency(self) -> float: return self._frequency @frequency.setter def frequency(self, value: float) -> None: self._frequency = max(0.0, value) def start(self) -> bool: self._phase = 0.0 self._start_time = time.time() return True def stop(self) -> None: pass def _get_input_value(self) -> float: """Get value from input sensor if configured.""" if self._input_sensor: from engine.sensors import SensorRegistry sensor = SensorRegistry.get(self._input_sensor) if sensor: reading = sensor.read() if reading: return reading.value * self._input_scale return 0.0 def read(self) -> SensorValue | None: current_time = time.time() elapsed = current_time - self._start_time if self._input_sensor: input_val = self._get_input_value() phase_increment = (self._frequency * elapsed) + input_val else: phase_increment = self._frequency * elapsed self._phase += phase_increment waveform_fn = self.WAVEFORMS.get(self._waveform) if waveform_fn is None: return None value = waveform_fn(self._phase) value = max(0.0, min(1.0, value)) return SensorValue( sensor_name=self.name, value=value, timestamp=current_time, unit=self.unit, ) def set_waveform(self, waveform: str) -> None: """Change waveform at runtime.""" self.waveform = waveform def set_frequency(self, frequency: float) -> None: """Change frequency at runtime.""" self.frequency = frequency def register_oscillator_sensor(name: str = "osc", **kwargs) -> None: """Register an oscillator sensor with the global registry.""" sensor = OscillatorSensor(name=name, **kwargs) SensorRegistry.register(sensor)