Files
sideline/engine/sensors/mic.py
David Gwilliam 2e96b7cd83 feat(sensors): add sensor framework for pipeline integration
- Add Sensor base class with value emission
- Add SensorRegistry for discovery
- Add SensorStage adapter for pipeline
- Add MicSensor (self-contained, no external deps)
- Add OscillatorSensor for testing
- Add sensor param bindings to effects
2026-03-16 13:55:47 -07:00

148 lines
4.0 KiB
Python

"""
Mic sensor - audio input as a pipeline sensor.
Self-contained implementation that handles audio input directly,
with graceful degradation if sounddevice is unavailable.
"""
import atexit
import logging
import time
from collections.abc import Callable
from dataclasses import dataclass
from datetime import datetime
from typing import Any

try:
    import numpy as np
    import sounddevice as sd

    _HAS_AUDIO = True
except Exception:
    _HAS_AUDIO = False
    np = None  # type: ignore
    sd = None  # type: ignore

from engine.events import MicLevelEvent
from engine.sensors import Sensor, SensorRegistry, SensorValue
@dataclass
class AudioConfig:
    """Settings used when opening and interpreting the audio input stream.

    Attributes:
        threshold_db: Level (dB) above which the signal counts as "excess".
        sample_rate: Sample rate passed to the input stream, in Hz.
        block_size: Number of frames per block passed to the input stream.
    """

    # Field order and defaults are part of the constructor signature.
    threshold_db: float = 50.0
    sample_rate: float = 44_100.0
    block_size: int = 1024
class MicSensor(Sensor):
    """Microphone level sensor for pipeline integration.

    Opens a mono ``sounddevice`` input stream and converts every audio block
    to a level in dB (``20 * log10(rms)``, or ``-99.0`` for silence). The
    level is exposed in two ways:

    * push: a ``MicLevelEvent`` is delivered to every subscriber per block;
    * pull: ``read()`` returns the excess above the configured threshold.

    The sensor degrades gracefully: if numpy/sounddevice could not be
    imported, ``start()`` returns ``False`` and ``read()`` returns ``None``.

    NOTE(review): sounddevice delivers float32 samples in [-1.0, 1.0] by
    default, for which ``20 * log10(rms)`` is <= 0. With the default
    threshold of 50.0 the excess would then always be 0.0 -- confirm the
    intended dB scale/threshold against the callers.
    """

    def __init__(self, threshold_db: float = 50.0, name: str = "mic"):
        """Create an idle sensor; no audio device is opened until ``start()``.

        Args:
            threshold_db: Level (dB) above which the signal counts as excess.
            name: Sensor name reported in ``SensorValue.sensor_name``.
        """
        self.name = name
        self.unit = "dB"
        self._config = AudioConfig(threshold_db=threshold_db)
        self._db: float = -99.0
        self._stream: Any = None
        self._subscribers: list[Callable[[MicLevelEvent], None]] = []
        # Ensures stop() is registered with atexit at most once per instance,
        # no matter how many start()/stop() cycles happen.
        self._atexit_registered: bool = False

    @property
    def available(self) -> bool:
        """Return True when the audio libraries loaded and a stream is open."""
        return _HAS_AUDIO and self._stream is not None

    def start(self) -> bool:
        """Open and start the microphone stream.

        Returns:
            True if the stream is running (including when it was already
            running), False if audio support is missing or the device could
            not be opened/started.
        """
        if not _HAS_AUDIO:
            return False
        if self._stream is not None:
            # Already running: opening a second stream would orphan the
            # first one and deliver every block twice.
            return True

        stream = None
        try:
            stream = sd.InputStream(
                samplerate=self._config.sample_rate,
                blocksize=self._config.block_size,
                channels=1,
                callback=self._audio_callback,
            )
            stream.start()
        except Exception:
            # Best-effort: a missing/busy device must not crash the caller.
            # DEBUG level keeps the default (unconfigured) logger silent.
            logging.getLogger(__name__).debug(
                "Failed to start microphone stream", exc_info=True
            )
            if stream is not None:
                # The stream was created but never started; release it so
                # `available` does not report a dead stream.
                try:
                    stream.close()
                except Exception:
                    logging.getLogger(__name__).debug(
                        "Failed to close unstarted microphone stream",
                        exc_info=True,
                    )
            return False

        # Publish the stream only once it is actually running.
        self._stream = stream
        if not self._atexit_registered:
            atexit.register(self.stop)
            self._atexit_registered = True
        return True

    def stop(self) -> None:
        """Stop and close the microphone stream; safe to call repeatedly."""
        stream, self._stream = self._stream, None
        if stream is None:
            return
        # stop() and close() are attempted independently so that a failing
        # stop() cannot prevent the device from being released.
        for action in (stream.stop, stream.close):
            try:
                action()
            except Exception:
                logging.getLogger(__name__).debug(
                    "Error while shutting down microphone stream",
                    exc_info=True,
                )

    def _excess_above_threshold(self, db: float) -> float:
        """Return how far ``db`` exceeds the threshold, clamped at 0.0."""
        return max(0.0, db - self._config.threshold_db)

    def _audio_callback(
        self, indata: "np.ndarray", frames: int, time_info: Any, status: Any
    ) -> None:
        """Convert one audio block to a dB level and notify subscribers.

        Invoked by the input stream for every captured block. The annotation
        for ``indata`` is a string on purpose: ``np`` is ``None`` when numpy
        is not installed, and an evaluated ``np.ndarray`` annotation would
        make the whole module fail to import.

        Args:
            indata: Block of captured samples.
            frames: Number of frames in the block (unused).
            time_info: Stream timing information (unused).
            status: Stream status flags (unused).
        """
        if not _HAS_AUDIO or np is None:
            return
        rms = float(np.sqrt(np.mean(indata**2)))
        # rms is 0 for pure silence and NaN for an empty block; both fall
        # through to the -99.0 floor instead of producing -inf/NaN levels.
        db = float(20 * np.log10(rms)) if rms > 0 else -99.0
        self._db = db
        event = MicLevelEvent(
            db_level=db,
            excess_above_threshold=self._excess_above_threshold(db),
            timestamp=datetime.now(),
        )
        self._emit(event)

    def _emit(self, event: MicLevelEvent) -> None:
        """Deliver ``event`` to every subscriber, isolating their failures."""
        # Iterate over a snapshot: a callback may (un)subscribe while the
        # event is being delivered, which would otherwise skip entries.
        for callback in tuple(self._subscribers):
            try:
                callback(event)
            except Exception:
                # One faulty subscriber must not break the others or the
                # audio callback itself.
                logging.getLogger(__name__).debug(
                    "Mic level subscriber raised", exc_info=True
                )

    def subscribe(self, callback: Callable[[MicLevelEvent], None]) -> None:
        """Register ``callback`` for mic level events (duplicates ignored)."""
        if callback not in self._subscribers:
            self._subscribers.append(callback)

    def unsubscribe(self, callback: Callable[[MicLevelEvent], None]) -> None:
        """Remove ``callback``; a callback that is not registered is ignored."""
        if callback in self._subscribers:
            self._subscribers.remove(callback)

    def read(self) -> SensorValue | None:
        """Return the latest excess-above-threshold level.

        Returns:
            A ``SensorValue`` whose ``value`` is the most recent level minus
            the threshold (never below 0.0), or ``None`` when the sensor is
            not available.
        """
        if not self.available:
            return None
        return SensorValue(
            sensor_name=self.name,
            value=self._excess_above_threshold(self._db),
            timestamp=time.time(),
            unit=self.unit,
        )
def register_mic_sensor() -> None:
    """Create a default-configured MicSensor and add it to SensorRegistry."""
    SensorRegistry.register(MicSensor())
# Import-time side effect: importing this module registers a default
# MicSensor with SensorRegistry. Constructing the sensor does not call
# start(), so no audio stream is opened here.
register_mic_sensor()