#!/usr/bin/env python3
"""
Pipeline Demo Orchestrator

Demonstrates all effects and camera modes with gentle oscillation.
Runs a comprehensive test of the Mainline pipeline system with proper
frame rate control and extended duration for visibility.
"""

import argparse
import math
import signal
import sys
import time
from typing import Any

from engine.camera import Camera
from engine.data_sources.checkerboard import CheckerboardDataSource
from engine.data_sources.sources import SourceItem
from engine.display import DisplayRegistry, NullDisplay
from engine.effects.plugins import discover_plugins
from engine.effects import get_registry
from engine.effects.types import EffectConfig
from engine.frame import FrameTimer
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext
from engine.pipeline.adapters import (
    CameraClockStage,
    CameraStage,
    DataSourceStage,
    DisplayStage,
    EffectPluginStage,
    SourceItemsToBufferStage,
)
from engine.pipeline.stages.framebuffer import FrameBufferStage

# Effects that are exercised separately because their stage is created with
# an explicit dependency on the framebuffer history.
FRAMEBUFFER_EFFECTS = ("motionblur", "afterimage")


def _require(condition: bool, message: str) -> None:
    """Raise ``AssertionError(message)`` when *condition* is false.

    Used instead of the ``assert`` statement so the demo's checks still run
    when Python is started with ``-O`` (which strips ``assert``).
    """
    if not condition:
        raise AssertionError(message)


class GentleOscillator:
    """Produces smooth, gentle sinusoidal values.

    ``value(frame)`` evaluates
    ``offset + amplitude * 0.5 * (1 + sin(frame / speed))``; for a
    non-negative amplitude the result stays within
    ``[offset, offset + amplitude]``.
    """

    def __init__(
        self, speed: float = 60.0, amplitude: float = 1.0, offset: float = 0.0
    ):
        # Frames per radian of phase: a full cycle takes 2*pi*speed frames.
        self.speed = speed
        self.amplitude = amplitude  # Peak-to-peak swing added on top of offset
        self.offset = offset  # Base (minimum) value

    def value(self, frame: int) -> float:
        """Get oscillated value for given frame."""
        return self.offset + self.amplitude * 0.5 * (1 + math.sin(frame / self.speed))


class PipelineDemoOrchestrator:
    """Orchestrates comprehensive pipeline demonstrations.

    ``run()`` builds a checkerboard -> camera -> framebuffer -> display
    pipeline and then runs the framebuffer check, the per-effect
    oscillation test, the camera-mode test and (optionally) the FPS
    switching demo.
    """

    # Canvas / checkerboard dimensions shared by every camera and the source.
    CANVAS_WIDTH = 200
    CANVAS_HEIGHT = 200

    def __init__(
        self,
        use_terminal: bool = True,
        target_fps: float = 30.0,
        effect_duration: float = 8.0,
        mode_duration: float = 3.0,
        enable_fps_switch: bool = False,
        loop: bool = False,
        verbose: bool = False,
    ):
        """Store the demo configuration; nothing is built until ``run()``.

        Args:
            use_terminal: Use the "terminal" display backend, else "null".
            target_fps: Frame rate used for pacing and frame-count maths.
            effect_duration: Seconds each effect is run for.
            mode_duration: Seconds each camera mode is run for.
            enable_fps_switch: Whether ``run()`` includes the FPS demo.
            loop: When true, ``_cleanup`` keeps the pipeline/context
                references after cleaning up.
            verbose: Also print messages logged with ``verbose=True``.
        """
        self.use_terminal = use_terminal
        self.target_fps = target_fps
        self.effect_duration = effect_duration
        self.mode_duration = mode_duration
        self.enable_fps_switch = enable_fps_switch
        self.loop = loop
        self.verbose = verbose
        self.frame_count = 0
        self.pipeline = None
        self.context = None
        self.framebuffer = None
        self.camera = None
        self.timer = None

    def log(self, message: str, verbose: bool = False):
        """Print *message* with a timestamp.

        Messages flagged ``verbose=True`` are only printed when the
        orchestrator itself is verbose. Leading newlines in *message* are
        emitted before the timestamp so section headers get a blank
        separator line instead of a timestamp-only line.
        """
        if verbose and not self.verbose:
            return
        body = message.lstrip("\n")
        leading = message[: len(message) - len(body)]
        print(f"{leading}[{time.strftime('%H:%M:%S')}] {body}")

    def _make_camera(self, camera_type: str, speed: float):
        """Create a camera via the ``Camera.<camera_type>`` factory.

        Falls back to ``Camera.scroll`` when ``Camera`` has no attribute
        named *camera_type*, and sizes the canvas to the demo dimensions.
        """
        factory = getattr(Camera, camera_type, Camera.scroll)
        camera = factory(speed=speed)
        camera.set_canvas_size(self.CANVAS_WIDTH, self.CANVAS_HEIGHT)
        return camera

    def _switch_camera(self, camera_type: str, speed: float):
        """Replace the pipeline's camera stages with a freshly built camera."""
        self.camera.reset()
        new_camera = self._make_camera(camera_type, speed)
        self.pipeline.replace_stage(
            "camera_update", CameraClockStage(new_camera, name="camera-clock")
        )
        self.pipeline.replace_stage("camera", CameraStage(new_camera, name="camera"))
        self.camera = new_camera

    def build_base_pipeline(
        self, camera_type: str = "scroll", camera_speed: float = 0.5
    ):
        """Build a base pipeline with all required components.

        Args:
            camera_type: Name of the ``Camera`` factory to use (unknown
                names fall back to ``Camera.scroll``).
            camera_speed: Speed passed to the camera factory.

        Returns:
            The built and initialized ``Pipeline``; it is also stored on
            ``self`` together with its context, camera and framebuffer.
        """
        self.log(f"Building base pipeline: camera={camera_type}, speed={camera_speed}")

        # Camera (honours camera_type rather than always scrolling)
        camera = self._make_camera(camera_type, camera_speed)

        # Context
        ctx = PipelineContext()

        # Pipeline config
        display_backend = "terminal" if self.use_terminal else "null"
        config = PipelineConfig(
            source="empty",
            display=display_backend,
            camera=camera_type,
            effects=[],
            enable_metrics=True,
        )
        pipeline = Pipeline(config=config, context=ctx)

        # Use a large checkerboard pattern for visible motion effects
        source = CheckerboardDataSource(
            width=self.CANVAS_WIDTH, height=self.CANVAS_HEIGHT, square_size=10
        )
        pipeline.add_stage("source", DataSourceStage(source, name="checkerboard"))

        # Add camera clock (must run every frame)
        pipeline.add_stage(
            "camera_update", CameraClockStage(camera, name="camera-clock")
        )

        # Add render
        pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))

        # Add camera stage
        pipeline.add_stage("camera", CameraStage(camera, name="camera"))

        # Add framebuffer (optional for effects that use it)
        self.framebuffer = FrameBufferStage(name="default", history_depth=5)
        pipeline.add_stage("framebuffer", self.framebuffer)

        # Add display (skipped when the registry returns a falsy backend)
        display = DisplayRegistry.create(display_backend)
        if display:
            pipeline.add_stage("display", DisplayStage(display, name=display_backend))

        # Build and initialize
        pipeline.build(auto_inject=False)
        pipeline.initialize()

        self.pipeline = pipeline
        self.context = ctx
        self.camera = camera

        self.log("Base pipeline built successfully")
        return pipeline

    def _run_single_effect(
        self,
        registry,
        effect_name: str,
        label: str,
        num_frames: int,
        oscillator,
        dependencies=None,
    ):
        """Add one effect stage, run it for *num_frames*, then remove it.

        Failures are logged, not raised. The stage is removed (and the
        pipeline rebuilt) even when running the effect raised, so a broken
        effect cannot leak into subsequent tests.
        """
        stage_key = f"effect_{effect_name}"
        try:
            self.log(label)
            effect = registry.get(effect_name)
            if not effect:
                self.log(" Skipped: plugin not found")
                return

            if dependencies:
                stage = EffectPluginStage(
                    effect, name=effect_name, dependencies=dependencies
                )
            else:
                stage = EffectPluginStage(effect, name=effect_name)

            stage_added = False
            try:
                self.pipeline.add_stage(stage_key, stage)
                stage_added = True
                self.pipeline.build(auto_inject=False)
                self._run_frames(num_frames, oscillator=oscillator, effect=effect)
            finally:
                if stage_added:
                    self.pipeline.remove_stage(stage_key)
                    self.pipeline.build(auto_inject=False)

            self.log(f" ✓ {effect_name} completed successfully")
        except Exception as e:
            self.log(f" ✗ {effect_name} failed: {e}")

    def test_effects_oscillation(self):
        """Test each effect with gentle intensity oscillation.

        Every registered effect except the framebuffer ones is run first
        (in sorted order); ``motionblur`` and ``afterimage`` are then run
        with a ``framebuffer.history.default`` dependency.
        """
        self.log("\n=== EFFECTS OSCILLATION TEST ===")
        self.log(
            f"Duration: {self.effect_duration}s per effect at {self.target_fps} FPS"
        )

        discover_plugins()  # Ensure all plugins are registered
        registry = get_registry()
        all_effects = registry.list_all()
        effect_names = sorted(
            name for name in all_effects if name not in FRAMEBUFFER_EFFECTS
        )

        # Calculate frames based on duration and FPS
        frames_per_effect = int(self.effect_duration * self.target_fps)
        oscillator = GentleOscillator(speed=90, amplitude=0.7, offset=0.3)

        total_effects = len(effect_names) + len(FRAMEBUFFER_EFFECTS)
        estimated_total = total_effects * self.effect_duration

        self.log(
            f"Testing {len(effect_names)} regular effects + "
            f"{len(FRAMEBUFFER_EFFECTS)} framebuffer effects"
        )
        self.log(f"Estimated time: {estimated_total:.0f}s")

        for idx, effect_name in enumerate(effect_names, 1):
            self._run_single_effect(
                registry,
                effect_name,
                f"[{idx}/{total_effects}] Testing effect: {effect_name}",
                frames_per_effect,
                oscillator,
            )

        # Test motionblur and afterimage separately with framebuffer
        first_fb_index = len(effect_names) + 1
        for idx, effect_name in enumerate(FRAMEBUFFER_EFFECTS, first_fb_index):
            self._run_single_effect(
                registry,
                effect_name,
                f"[{idx}/{total_effects}] Testing effect: {effect_name} (with framebuffer)",
                frames_per_effect,
                oscillator,
                dependencies={"framebuffer.history.default"},
            )

    def _run_frames(self, num_frames: int, oscillator=None, effect=None):
        """Run a specified number of frames with proper timing.

        When both *oscillator* and *effect* are given, the effect is
        reconfigured every frame with the oscillator's intensity.
        Increments ``frame_count`` once per frame.
        """
        for frame in range(num_frames):
            self.frame_count += 1
            self.context.set("frame_number", frame)

            if oscillator and effect:
                intensity = oscillator.value(frame)
                effect.configure(EffectConfig(intensity=intensity))

            dt = self.timer.sleep_until_next_frame()
            self.camera.update(dt)
            self.pipeline.execute([])

    def test_framebuffer(self):
        """Test framebuffer functionality.

        Raises:
            AssertionError: If history, intensity map or the most recent
                frame is missing. Any exception is logged and re-raised.
        """
        self.log("\n=== FRAMEBUFFER TEST ===")

        try:
            # Run frames using FrameTimer for consistent pacing
            self._run_frames(10)

            # Check framebuffer history
            history = self.context.get("framebuffer.default.history")
            _require(history is not None, "No framebuffer history found")
            _require(len(history) > 0, "Framebuffer history is empty")

            self.log(f"History frames: {len(history)}")
            self.log(f"Configured depth: {self.framebuffer.config.history_depth}")

            # Check intensity computation
            intensity = self.context.get("framebuffer.default.current_intensity")
            _require(intensity is not None, "No intensity map found")
            self.log(f"Intensity map length: {len(intensity)}")

            # Check that frames are being stored correctly
            recent_frame = self.framebuffer.get_frame(0, self.context)
            _require(recent_frame is not None, "Cannot retrieve recent frame")
            self.log(f"Recent frame rows: {len(recent_frame)}")

            self.log("✓ Framebuffer test passed")
        except Exception as e:
            self.log(f"✗ Framebuffer test failed: {e}")
            raise

    def test_camera_modes(self):
        """Test each camera mode.

        Each mode gets a fresh camera, runs for ``mode_duration`` seconds
        worth of frames and has its final position logged. Failures are
        logged per mode and do not stop the remaining modes.
        """
        self.log("\n=== CAMERA MODES TEST ===")
        self.log(f"Duration: {self.mode_duration}s per mode at {self.target_fps} FPS")

        camera_modes = [
            ("feed", 0.1),
            ("scroll", 0.5),
            ("horizontal", 0.3),
            ("omni", 0.3),
            ("floating", 0.5),
            ("bounce", 0.5),
            ("radial", 0.3),
        ]

        frames_per_mode = int(self.mode_duration * self.target_fps)
        self.log(f"Testing {len(camera_modes)} camera modes")
        self.log(f"Estimated time: {len(camera_modes) * self.mode_duration:.0f}s")

        for idx, (camera_type, speed) in enumerate(camera_modes, 1):
            try:
                self.log(f"[{idx}/{len(camera_modes)}] Testing camera: {camera_type}")

                # Rebuild camera and update camera stages
                self._switch_camera(camera_type, speed)

                # Run frames with proper timing
                self._run_frames(frames_per_mode)

                # Verify camera moved (check final position)
                x, y = self.camera.x, self.camera.y
                self.log(f" Final position: ({x:.1f}, {y:.1f})")

                if camera_type == "feed":
                    _require(x == 0 and y == 0, "Feed camera should not move")
                elif camera_type in ("scroll", "horizontal"):
                    _require(abs(x) > 0 or abs(y) > 0, "Camera should have moved")
                else:
                    self.log(f" Position check skipped (mode={camera_type})")

                self.log(f" ✓ {camera_type} completed successfully")
            except Exception as e:
                self.log(f" ✗ {camera_type} failed: {e}")

    def _apply_fps(self, fps: float):
        """Point the orchestrator, frame timer and display at *fps*."""
        self.target_fps = fps
        self.timer.target_frame_dt = 1.0 / fps

        # Update display FPS if supported
        display_entry = self.pipeline.get_stage("display")
        display = display_entry.stage if display_entry else None
        if display and hasattr(display, "target_fps"):
            display.target_fps = fps
            display._frame_period = 1.0 / fps if fps > 0 else 0

    def test_fps_switch_demo(self):
        """Demonstrate the effect of different frame rates on animation smoothness.

        Does nothing unless ``enable_fps_switch`` is set. The original
        target FPS is restored afterwards, even if a step raises.
        """
        if not self.enable_fps_switch:
            return

        self.log("\n=== FPS SWITCH DEMONSTRATION ===")

        fps_sequence = [
            (30.0, 5.0),  # 30 FPS for 5 seconds
            (60.0, 5.0),  # 60 FPS for 5 seconds
            (30.0, 5.0),  # Back to 30 FPS for 5 seconds
            (20.0, 3.0),  # 20 FPS for 3 seconds
            (60.0, 3.0),  # 60 FPS for 3 seconds
        ]

        original_fps = self.target_fps

        try:
            for fps, duration in fps_sequence:
                self.log(f"\n--- Switching to {fps} FPS for {duration}s ---")
                self._apply_fps(fps)

                frames = int(duration * fps)

                # Use radial for smooth rotation that's visible at different FPS
                self._switch_camera("radial", 0.3)

                # Shared frame loop, so these frames count toward frame_count
                self._run_frames(frames)

                self.log(f" Completed {frames} frames at {fps} FPS")
        finally:
            # Restore original FPS
            self._apply_fps(original_fps)

        self.log("✓ FPS switch demo completed")

    def run(self):
        """Run the complete demo.

        Builds the pipeline, runs every test in order and prints a summary.
        ``_cleanup`` always runs, whether or not a test raised.
        """
        start_time = time.monotonic()
        # Reset so the summary stays correct when run() is called repeatedly.
        self.frame_count = 0
        self.log("Starting Pipeline Demo Orchestrator")
        self.log("=" * 50)

        # Initialize frame timer
        self.timer = FrameTimer(target_frame_dt=1.0 / self.target_fps)

        # Build pipeline
        self.build_base_pipeline()

        try:
            # Test framebuffer first (needed for motion blur effects)
            self.test_framebuffer()

            # Test effects
            self.test_effects_oscillation()

            # Test camera modes
            self.test_camera_modes()

            # Optional FPS switch demonstration
            if self.enable_fps_switch:
                self.test_fps_switch_demo()
            else:
                self.log("\n=== FPS SWITCH DEMO ===")
                self.log("Skipped (enable with --switch-fps)")

            elapsed = time.monotonic() - start_time
            average_fps = self.frame_count / elapsed if elapsed > 0 else 0.0
            self.log("\n" + "=" * 50)
            self.log("Demo completed successfully!")
            self.log(f"Total frames processed: {self.frame_count}")
            self.log(f"Total elapsed time: {elapsed:.1f}s")
            self.log(f"Average FPS: {average_fps:.1f}")
        finally:
            # Always cleanup properly
            self._cleanup()

    def _cleanup(self):
        """Clean up pipeline resources.

        Cleanup errors are logged (verbose only) rather than raised. The
        pipeline and context references are dropped unless ``loop`` is set.
        """
        self.log("Cleaning up...", verbose=True)
        if self.pipeline:
            try:
                self.pipeline.cleanup()
                self.log("Pipeline cleaned up successfully", verbose=True)
            except Exception as e:
                self.log(f"Error during pipeline cleanup: {e}", verbose=True)

        # If not looping, clear references
        if not self.loop:
            self.pipeline = None
            self.context = None


def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser for the demo script."""
    parser = argparse.ArgumentParser(
        description="Pipeline Demo Orchestrator - comprehensive demo of Mainline pipeline"
    )
    parser.add_argument(
        "--null",
        action="store_true",
        help="Use null display (no visual output)",
    )
    parser.add_argument(
        "--fps",
        type=float,
        default=30.0,
        help="Target frame rate (default: 30)",
    )
    parser.add_argument(
        "--effect-duration",
        type=float,
        default=8.0,
        help="Duration per effect in seconds (default: 8)",
    )
    parser.add_argument(
        "--mode-duration",
        type=float,
        default=3.0,
        help="Duration per camera mode in seconds (default: 3)",
    )
    parser.add_argument(
        "--switch-fps",
        action="store_true",
        help="Include FPS switching demonstration",
    )
    parser.add_argument(
        "--loop",
        action="store_true",
        help="Run demo in an infinite loop",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable verbose output",
    )
    return parser


def main() -> None:
    """Parse arguments and run the demo (repeatedly when ``--loop`` is given).

    Exits with status 0 on Ctrl-C and status 1 on any other failure.
    """
    parser = _build_arg_parser()
    args = parser.parse_args()

    # The frame period is 1 / fps, so a non-positive rate cannot be paced.
    if args.fps <= 0:
        parser.error("--fps must be greater than 0")

    orchestrator = PipelineDemoOrchestrator(
        use_terminal=not args.null,
        target_fps=args.fps,
        effect_duration=args.effect_duration,
        mode_duration=args.mode_duration,
        enable_fps_switch=args.switch_fps,
        loop=args.loop,
        verbose=args.verbose,
    )

    try:
        while True:
            orchestrator.run()
            if not args.loop:
                break
    except KeyboardInterrupt:
        print("\nInterrupted by user")
        sys.exit(0)
    except Exception as e:
        print(f"\nDemo failed: {e}")
        import traceback

        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()