forked from genewildish/Mainline
The old engine/pipeline/core.py file was removed as part of the Sideline/Mainline split. All imports that referenced engine.pipeline.core have been updated to use engine.pipeline which re-exports from sideline.pipeline.core. This ensures consistency and avoids duplicate DataType enum instances.
204 lines
5.0 KiB
Python
204 lines
5.0 KiB
Python
"""
|
|
Sensor framework - PureData-style real-time input system.
|
|
|
|
Sensors are data sources that emit values over time, similar to how
|
|
PureData objects emit signals. Effects can bind to sensors to modulate
|
|
their parameters dynamically.
|
|
|
|
Architecture:
|
|
- Sensor: Base class for all sensors (mic, camera, ntfy, OSC, etc.)
|
|
- SensorRegistry: Global registry for sensor discovery
|
|
- SensorStage: Pipeline stage wrapper for sensors
|
|
- Effect param_bindings: Declarative sensor-to-param routing
|
|
|
|
Example:
|
|
class GlitchEffect(EffectPlugin):
|
|
param_bindings = {
|
|
"intensity": {"sensor": "mic", "transform": "linear"},
|
|
}
|
|
|
|
This binds the mic sensor to the glitch intensity parameter.
|
|
"""
|
|
|
|
from abc import ABC, abstractmethod
|
|
from dataclasses import dataclass
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
if TYPE_CHECKING:
|
|
from engine.pipeline import PipelineContext
|
|
|
|
|
|
@dataclass
|
|
class SensorValue:
|
|
"""A sensor reading with metadata."""
|
|
|
|
sensor_name: str
|
|
value: float
|
|
timestamp: float
|
|
unit: str = ""
|
|
|
|
|
|
class Sensor(ABC):
|
|
"""Abstract base class for sensors.
|
|
|
|
Sensors are real-time data sources that emit values. They can be:
|
|
- Physical: mic, camera, joystick, MIDI, OSC
|
|
- Virtual: ntfy, timer, random, noise
|
|
|
|
Each sensor has a name and emits SensorValue objects.
|
|
"""
|
|
|
|
name: str
|
|
unit: str = ""
|
|
|
|
@property
|
|
def available(self) -> bool:
|
|
"""Whether the sensor is currently available."""
|
|
return True
|
|
|
|
@abstractmethod
|
|
def read(self) -> SensorValue | None:
|
|
"""Read current sensor value.
|
|
|
|
Returns:
|
|
SensorValue if available, None if sensor is not ready.
|
|
"""
|
|
...
|
|
|
|
@abstractmethod
|
|
def start(self) -> bool:
|
|
"""Start the sensor.
|
|
|
|
Returns:
|
|
True if started successfully.
|
|
"""
|
|
...
|
|
|
|
@abstractmethod
|
|
def stop(self) -> None:
|
|
"""Stop the sensor and release resources."""
|
|
...
|
|
|
|
|
|
class SensorRegistry:
|
|
"""Global registry for sensors.
|
|
|
|
Provides:
|
|
- Registration of sensor instances
|
|
- Lookup by name
|
|
- Global start/stop
|
|
"""
|
|
|
|
_sensors: dict[str, Sensor] = {}
|
|
_started: bool = False
|
|
|
|
@classmethod
|
|
def register(cls, sensor: Sensor) -> None:
|
|
"""Register a sensor instance."""
|
|
cls._sensors[sensor.name] = sensor
|
|
|
|
@classmethod
|
|
def get(cls, name: str) -> Sensor | None:
|
|
"""Get a sensor by name."""
|
|
return cls._sensors.get(name)
|
|
|
|
@classmethod
|
|
def list_sensors(cls) -> list[str]:
|
|
"""List all registered sensor names."""
|
|
return list(cls._sensors.keys())
|
|
|
|
@classmethod
|
|
def start_all(cls) -> bool:
|
|
"""Start all sensors.
|
|
|
|
Returns:
|
|
True if all sensors started successfully.
|
|
"""
|
|
if cls._started:
|
|
return True
|
|
|
|
all_started = True
|
|
for sensor in cls._sensors.values():
|
|
if sensor.available and not sensor.start():
|
|
all_started = False
|
|
|
|
cls._started = all_started
|
|
return all_started
|
|
|
|
@classmethod
|
|
def stop_all(cls) -> None:
|
|
"""Stop all sensors."""
|
|
for sensor in cls._sensors.values():
|
|
sensor.stop()
|
|
cls._started = False
|
|
|
|
@classmethod
|
|
def read_all(cls) -> dict[str, float]:
|
|
"""Read all sensor values.
|
|
|
|
Returns:
|
|
Dict mapping sensor name to current value.
|
|
"""
|
|
result = {}
|
|
for name, sensor in cls._sensors.items():
|
|
value = sensor.read()
|
|
if value:
|
|
result[name] = value.value
|
|
return result
|
|
|
|
|
|
class SensorStage:
|
|
"""Pipeline stage wrapper for sensors.
|
|
|
|
Provides sensor data to the pipeline context.
|
|
Sensors don't transform data - they inject sensor values into context.
|
|
"""
|
|
|
|
def __init__(self, sensor: Sensor, name: str | None = None):
|
|
self._sensor = sensor
|
|
self.name = name or sensor.name
|
|
self.category = "sensor"
|
|
self.optional = True
|
|
|
|
@property
|
|
def stage_type(self) -> str:
|
|
return "sensor"
|
|
|
|
@property
|
|
def inlet_types(self) -> set:
|
|
from engine.pipeline import DataType
|
|
|
|
return {DataType.ANY}
|
|
|
|
@property
|
|
def outlet_types(self) -> set:
|
|
from engine.pipeline import DataType
|
|
|
|
return {DataType.ANY}
|
|
|
|
@property
|
|
def capabilities(self) -> set[str]:
|
|
return {f"sensor.{self.name}"}
|
|
|
|
@property
|
|
def dependencies(self) -> set[str]:
|
|
return set()
|
|
|
|
def init(self, ctx: "PipelineContext") -> bool:
|
|
return self._sensor.start()
|
|
|
|
def process(self, data: Any, ctx: "PipelineContext") -> Any:
|
|
value = self._sensor.read()
|
|
if value:
|
|
ctx.set_state(f"sensor.{self.name}", value.value)
|
|
ctx.set_state(f"sensor.{self.name}.full", value)
|
|
return data
|
|
|
|
def cleanup(self) -> None:
|
|
self._sensor.stop()
|
|
|
|
|
|
def create_sensor_stage(sensor: Sensor, name: str | None = None) -> SensorStage:
|
|
"""Create a pipeline stage from a sensor."""
|
|
return SensorStage(sensor, name)
|