""" Pipeline introspection demo controller - 3-phase animation system. Phase 1: Toggle each effect on/off one at a time (3s each, 1s gap) Phase 2: LFO drives intensity default → max → min → default for each effect Phase 3: All effects with shared LFO driving full waveform This controller manages the animation and updates the pipeline accordingly. """ import time from dataclasses import dataclass from enum import Enum, auto from typing import Any from engine.effects import get_registry from engine.sensors.oscillator import OscillatorSensor class DemoPhase(Enum): """The three phases of the pipeline introspection demo.""" PHASE_1_TOGGLE = auto() # Toggle each effect on/off PHASE_2_LFO = auto() # LFO drives intensity up/down PHASE_3_SHARED_LFO = auto() # All effects with shared LFO @dataclass class PhaseState: """State for a single phase of the demo.""" phase: DemoPhase start_time: float current_effect_index: int = 0 effect_start_time: float = 0.0 lfo_phase: float = 0.0 # 0.0 to 1.0 @dataclass class DemoConfig: """Configuration for the demo animation.""" effect_cycle_duration: float = 3.0 # seconds per effect gap_duration: float = 1.0 # seconds between effects lfo_duration: float = ( 4.0 # seconds for full LFO cycle (default → max → min → default) ) phase_2_effect_duration: float = 4.0 # seconds per effect in phase 2 phase_3_lfo_duration: float = 6.0 # seconds for full waveform in phase 3 class PipelineIntrospectionDemo: """Controller for the 3-phase pipeline introspection demo. Manages effect toggling and LFO modulation across the pipeline. """ def __init__( self, pipeline: Any, effect_names: list[str] | None = None, config: DemoConfig | None = None, ): self._pipeline = pipeline self._config = config or DemoConfig() self._effect_names = effect_names or ["noise", "fade", "glitch", "firehose"] self._phase = DemoPhase.PHASE_1_TOGGLE self._phase_state = PhaseState( phase=DemoPhase.PHASE_1_TOGGLE, start_time=time.time(), ) self._shared_oscillator: OscillatorSensor | None = None self._frame = 0 # Register shared oscillator for phase 3 self._shared_oscillator = OscillatorSensor( name="demo-lfo", waveform="sine", frequency=1.0 / self._config.phase_3_lfo_duration, ) @property def phase(self) -> DemoPhase: return self._phase @property def phase_display(self) -> str: """Get a human-readable phase description.""" phase_num = { DemoPhase.PHASE_1_TOGGLE: 1, DemoPhase.PHASE_2_LFO: 2, DemoPhase.PHASE_3_SHARED_LFO: 3, } return f"Phase {phase_num[self._phase]}" @property def effect_names(self) -> list[str]: return self._effect_names @property def shared_oscillator(self) -> OscillatorSensor | None: return self._shared_oscillator def update(self) -> dict[str, Any]: """Update the demo state and return current parameters. Returns: dict with current effect settings for the pipeline """ self._frame += 1 current_time = time.time() elapsed = current_time - self._phase_state.start_time # Phase transition logic phase_duration = self._get_phase_duration() if elapsed >= phase_duration: self._advance_phase() # Update based on current phase if self._phase == DemoPhase.PHASE_1_TOGGLE: return self._update_phase_1(current_time) elif self._phase == DemoPhase.PHASE_2_LFO: return self._update_phase_2(current_time) else: return self._update_phase_3(current_time) def _get_phase_duration(self) -> float: """Get duration of current phase in seconds.""" if self._phase == DemoPhase.PHASE_1_TOGGLE: # Duration = (effect_time + gap) * num_effects + final_gap return ( self._config.effect_cycle_duration + self._config.gap_duration ) * len(self._effect_names) + self._config.gap_duration elif self._phase == DemoPhase.PHASE_2_LFO: return self._config.phase_2_effect_duration * len(self._effect_names) else: # Phase 3 runs indefinitely return float("inf") def _advance_phase(self) -> None: """Advance to the next phase.""" if self._phase == DemoPhase.PHASE_1_TOGGLE: self._phase = DemoPhase.PHASE_2_LFO elif self._phase == DemoPhase.PHASE_2_LFO: self._phase = DemoPhase.PHASE_3_SHARED_LFO # Start the shared oscillator if self._shared_oscillator: self._shared_oscillator.start() else: # Phase 3 loops indefinitely - reset for demo replay after long time self._phase = DemoPhase.PHASE_1_TOGGLE self._phase_state = PhaseState( phase=self._phase, start_time=time.time(), ) def _update_phase_1(self, current_time: float) -> dict[str, Any]: """Phase 1: Toggle each effect on/off one at a time.""" effect_time = current_time - self._phase_state.effect_start_time # Check if we should move to next effect cycle_time = self._config.effect_cycle_duration + self._config.gap_duration effect_index = int((current_time - self._phase_state.start_time) / cycle_time) # Clamp to valid range if effect_index >= len(self._effect_names): effect_index = len(self._effect_names) - 1 # Calculate current effect state in_gap = effect_time >= self._config.effect_cycle_duration # Build effect states effect_states: dict[str, dict[str, Any]] = {} for i, name in enumerate(self._effect_names): if i < effect_index: # Past effects - leave at default effect_states[name] = {"enabled": False, "intensity": 0.5} elif i == effect_index: # Current effect - toggle on/off if in_gap: effect_states[name] = {"enabled": False, "intensity": 0.5} else: effect_states[name] = {"enabled": True, "intensity": 1.0} else: # Future effects - off effect_states[name] = {"enabled": False, "intensity": 0.5} # Apply to effect registry self._apply_effect_states(effect_states) return { "phase": "PHASE_1_TOGGLE", "phase_display": self.phase_display, "current_effect": self._effect_names[effect_index] if effect_index < len(self._effect_names) else None, "effect_states": effect_states, "frame": self._frame, } def _update_phase_2(self, current_time: float) -> dict[str, Any]: """Phase 2: LFO drives intensity default → max → min → default.""" elapsed = current_time - self._phase_state.start_time effect_index = int(elapsed / self._config.phase_2_effect_duration) effect_index = min(effect_index, len(self._effect_names) - 1) # Calculate LFO position (0 → 1 → 0) effect_elapsed = elapsed % self._config.phase_2_effect_duration lfo_position = effect_elapsed / self._config.phase_2_effect_duration # LFO: 0 → 1 → 0 (triangle wave) if lfo_position < 0.5: lfo_value = lfo_position * 2 # 0 → 1 else: lfo_value = 2 - lfo_position * 2 # 1 → 0 # Map to intensity: 0.3 (default) → 1.0 (max) → 0.0 (min) → 0.3 (default) if lfo_position < 0.25: # 0.3 → 1.0 intensity = 0.3 + (lfo_position / 0.25) * 0.7 elif lfo_position < 0.75: # 1.0 → 0.0 intensity = 1.0 - ((lfo_position - 0.25) / 0.5) * 1.0 else: # 0.0 → 0.3 intensity = ((lfo_position - 0.75) / 0.25) * 0.3 # Build effect states effect_states: dict[str, dict[str, Any]] = {} for i, name in enumerate(self._effect_names): if i < effect_index: # Past effects - default effect_states[name] = {"enabled": True, "intensity": 0.5} elif i == effect_index: # Current effect - LFO modulated effect_states[name] = {"enabled": True, "intensity": intensity} else: # Future effects - off effect_states[name] = {"enabled": False, "intensity": 0.5} # Apply to effect registry self._apply_effect_states(effect_states) return { "phase": "PHASE_2_LFO", "phase_display": self.phase_display, "current_effect": self._effect_names[effect_index], "lfo_value": lfo_value, "intensity": intensity, "effect_states": effect_states, "frame": self._frame, } def _update_phase_3(self, current_time: float) -> dict[str, Any]: """Phase 3: All effects with shared LFO driving full waveform.""" # Read shared oscillator lfo_value = 0.5 # Default if self._shared_oscillator: sensor_val = self._shared_oscillator.read() if sensor_val: lfo_value = sensor_val.value # All effects enabled with shared LFO effect_states: dict[str, dict[str, Any]] = {} for name in self._effect_names: effect_states[name] = {"enabled": True, "intensity": lfo_value} # Apply to effect registry self._apply_effect_states(effect_states) return { "phase": "PHASE_3_SHARED_LFO", "phase_display": self.phase_display, "lfo_value": lfo_value, "effect_states": effect_states, "frame": self._frame, } def _apply_effect_states(self, effect_states: dict[str, dict[str, Any]]) -> None: """Apply effect states to the effect registry.""" try: registry = get_registry() for name, state in effect_states.items(): effect = registry.get(name) if effect: effect.config.enabled = state["enabled"] effect.config.intensity = state["intensity"] except Exception: pass # Silently fail if registry not available def cleanup(self) -> None: """Clean up resources.""" if self._shared_oscillator: self._shared_oscillator.stop() # Reset all effects to default self._apply_effect_states( {name: {"enabled": False, "intensity": 0.5} for name in self._effect_names} )