""" Application orchestrator — pipeline mode entry point. """ import sys import time from typing import Any import engine.effects.plugins as effects_plugins from engine import config from engine.display import BorderMode, DisplayRegistry from engine.effects import PerformanceMonitor, get_registry, set_monitor from engine.fetch import fetch_all, fetch_poetry, load_cache from engine.pipeline import ( Pipeline, PipelineConfig, get_preset, list_presets, ) from engine.pipeline.adapters import ( EffectPluginStage, SourceItemsToBufferStage, create_stage_from_display, create_stage_from_effect, ) from engine.pipeline.core import PipelineContext from engine.pipeline.params import PipelineParams from engine.pipeline.ui import UIConfig, UIPanel def main(): """Main entry point - all modes now use presets or CLI construction.""" if config.PIPELINE_DIAGRAM: try: from engine.pipeline import generate_pipeline_diagram except ImportError: print("Error: pipeline diagram not available") return print(generate_pipeline_diagram()) return # Check for direct pipeline construction flags if "--pipeline-source" in sys.argv: # Construct pipeline directly from CLI args run_pipeline_mode_direct() return preset_name = None if config.PRESET: preset_name = config.PRESET elif config.PIPELINE_MODE: preset_name = config.PIPELINE_PRESET else: preset_name = "demo" available = list_presets() if preset_name not in available: print(f"Error: Unknown preset '{preset_name}'") print(f"Available presets: {', '.join(available)}") sys.exit(1) run_pipeline_mode(preset_name) def run_pipeline_mode(preset_name: str = "demo"): """Run using the new unified pipeline architecture.""" print(" \033[1;38;5;46mPIPELINE MODE\033[0m") print(" \033[38;5;245mUsing unified pipeline architecture\033[0m") effects_plugins.discover_plugins() monitor = PerformanceMonitor() set_monitor(monitor) preset = get_preset(preset_name) if not preset: print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m") sys.exit(1) print(f" \033[38;5;245mPreset: {preset.name} - {preset.description}\033[0m") params = preset.to_params() params.viewport_width = 80 params.viewport_height = 24 pipeline = Pipeline( config=PipelineConfig( source=preset.source, display=preset.display, camera=preset.camera, effects=preset.effects, ) ) print(" \033[38;5;245mFetching content...\033[0m") # Handle special sources that don't need traditional fetching introspection_source = None if preset.source == "pipeline-inspect": items = [] print(" \033[38;5;245mUsing pipeline introspection source\033[0m") elif preset.source == "empty": items = [] print(" \033[38;5;245mUsing empty source (no content)\033[0m") elif preset.source == "fixture": items = load_cache() if not items: print(" \033[38;5;196mNo fixture cache available\033[0m") sys.exit(1) print(f" \033[38;5;82mLoaded {len(items)} items from fixture\033[0m") else: cached = load_cache() if cached: items = cached elif preset.source == "poetry": items, _, _ = fetch_poetry() else: items, _, _ = fetch_all() if not items: print(" \033[38;5;196mNo content available\033[0m") sys.exit(1) print(f" \033[38;5;82mLoaded {len(items)} items\033[0m") # CLI --display flag takes priority over preset # Check if --display was explicitly provided display_name = preset.display if "--display" in sys.argv: idx = sys.argv.index("--display") if idx + 1 < len(sys.argv): display_name = sys.argv[idx + 1] display = DisplayRegistry.create(display_name) if not display and not display_name.startswith("multi"): print(f" \033[38;5;196mFailed to create display: {display_name}\033[0m") sys.exit(1) # Handle multi display (format: "multi:terminal,pygame") if not display and display_name.startswith("multi"): parts = display_name[6:].split( "," ) # "multi:terminal,pygame" -> ["terminal", "pygame"] display = DisplayRegistry.create_multi(parts) if not display: print(f" \033[38;5;196mFailed to create multi display: {parts}\033[0m") sys.exit(1) if not display: print(f" \033[38;5;196mFailed to create display: {display_name}\033[0m") sys.exit(1) display.init(0, 0) effect_registry = get_registry() # Create source stage based on preset source type if preset.source == "pipeline-inspect": from engine.data_sources.pipeline_introspection import ( PipelineIntrospectionSource, ) from engine.pipeline.adapters import DataSourceStage introspection_source = PipelineIntrospectionSource( pipeline=None, # Will be set after pipeline.build() viewport_width=80, viewport_height=24, ) pipeline.add_stage( "source", DataSourceStage(introspection_source, name="pipeline-inspect") ) elif preset.source == "empty": from engine.data_sources.sources import EmptyDataSource from engine.pipeline.adapters import DataSourceStage empty_source = EmptyDataSource(width=80, height=24) pipeline.add_stage("source", DataSourceStage(empty_source, name="empty")) else: from engine.data_sources.sources import ListDataSource from engine.pipeline.adapters import DataSourceStage list_source = ListDataSource(items, name=preset.source) pipeline.add_stage("source", DataSourceStage(list_source, name=preset.source)) # Add FontStage for headlines/poetry (default for demo) if preset.source in ["headlines", "poetry"]: from engine.pipeline.adapters import FontStage, ViewportFilterStage # Add viewport filter to prevent rendering all items pipeline.add_stage( "viewport_filter", ViewportFilterStage(name="viewport-filter") ) pipeline.add_stage("font", FontStage(name="font")) else: # Fallback to simple conversion for other sources pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) # Add camera stage if specified in preset if preset.camera: from engine.camera import Camera from engine.pipeline.adapters import CameraStage camera = None speed = getattr(preset, "camera_speed", 1.0) if preset.camera == "feed": camera = Camera.feed(speed=speed) elif preset.camera == "scroll": camera = Camera.scroll(speed=speed) elif preset.camera == "vertical": camera = Camera.scroll(speed=speed) # Backwards compat elif preset.camera == "horizontal": camera = Camera.horizontal(speed=speed) elif preset.camera == "omni": camera = Camera.omni(speed=speed) elif preset.camera == "floating": camera = Camera.floating(speed=speed) elif preset.camera == "bounce": camera = Camera.bounce(speed=speed) if camera: pipeline.add_stage("camera", CameraStage(camera, name=preset.camera)) for effect_name in preset.effects: effect = effect_registry.get(effect_name) if effect: pipeline.add_stage( f"effect_{effect_name}", create_stage_from_effect(effect, effect_name) ) pipeline.add_stage("display", create_stage_from_display(display, display_name)) pipeline.build() # For pipeline-inspect, set the pipeline after build to avoid circular dependency if introspection_source is not None: introspection_source.set_pipeline(pipeline) if not pipeline.initialize(): print(" \033[38;5;196mFailed to initialize pipeline\033[0m") sys.exit(1) # Initialize UI panel if border mode requires it ui_panel = None if isinstance(params.border, BorderMode) and params.border == BorderMode.UI: from engine.display import render_ui_panel ui_panel = UIPanel(UIConfig(panel_width=24, start_with_preset_picker=True)) # Enable raw mode for terminal input if supported if hasattr(display, "set_raw_mode"): display.set_raw_mode(True) # Register effect plugin stages from pipeline for UI control for stage in pipeline.stages.values(): if isinstance(stage, EffectPluginStage): effect = stage._effect enabled = effect.config.enabled if hasattr(effect, "config") else True stage_control = ui_panel.register_stage(stage, enabled=enabled) # Store reference to effect for easier access stage_control.effect = effect # type: ignore[attr-defined] # Select first stage by default if ui_panel.stages: first_stage = next(iter(ui_panel.stages)) ui_panel.select_stage(first_stage) # Populate param schema from EffectConfig if it's a dataclass ctrl = ui_panel.stages[first_stage] if hasattr(ctrl, "effect"): effect = ctrl.effect if hasattr(effect, "config"): config = effect.config # Try to get fields via dataclasses if available try: import dataclasses if dataclasses.is_dataclass(config): for field_name, field_obj in dataclasses.fields(config): if field_name == "enabled": continue value = getattr(config, field_name, None) if value is not None: ctrl.params[field_name] = value ctrl.param_schema[field_name] = { "type": type(value).__name__, "min": 0 if isinstance(value, (int, float)) else None, "max": 1 if isinstance(value, float) else None, "step": 0.1 if isinstance(value, float) else 1, } except Exception: pass # No dataclass fields, skip param UI # Set up callback for stage toggles def on_stage_toggled(stage_name: str, enabled: bool): """Update the actual stage's enabled state when UI toggles.""" stage = pipeline.get_stage(stage_name) if stage: # Set stage enabled flag for pipeline execution stage._enabled = enabled # Also update effect config if it's an EffectPluginStage if isinstance(stage, EffectPluginStage): stage._effect.config.enabled = enabled ui_panel.set_event_callback("stage_toggled", on_stage_toggled) # Set up callback for parameter changes def on_param_changed(stage_name: str, param_name: str, value: Any): """Update the effect config when UI adjusts a parameter.""" stage = pipeline.get_stage(stage_name) if stage and isinstance(stage, EffectPluginStage): effect = stage._effect if hasattr(effect, "config"): setattr(effect.config, param_name, value) # Mark effect as needing reconfiguration if it has a configure method if hasattr(effect, "configure"): try: effect.configure(effect.config) except Exception: pass # Ignore reconfiguration errors ui_panel.set_event_callback("param_changed", on_param_changed) # Set up preset list and handle preset changes from engine.pipeline import list_presets ui_panel.set_presets(list_presets(), preset_name) def on_preset_changed(preset_name: str): """Handle preset change from UI - rebuild pipeline.""" nonlocal \ pipeline, \ display, \ items, \ params, \ ui_panel, \ current_width, \ current_height print(f" \033[38;5;245mSwitching to preset: {preset_name}\033[0m") try: # Clean up old pipeline pipeline.cleanup() # Get new preset new_preset = get_preset(preset_name) if not new_preset: print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m") return # Update params for new preset params = new_preset.to_params() params.viewport_width = current_width params.viewport_height = current_height # Reconstruct pipeline configuration new_config = PipelineConfig( source=new_preset.source, display=new_preset.display, camera=new_preset.camera, effects=new_preset.effects, ) # Create new pipeline instance pipeline = Pipeline(config=new_config, context=PipelineContext()) # Re-add stages (similar to initial construction) # Source stage if new_preset.source == "pipeline-inspect": from engine.data_sources.pipeline_introspection import ( PipelineIntrospectionSource, ) from engine.pipeline.adapters import DataSourceStage introspection_source = PipelineIntrospectionSource( pipeline=None, viewport_width=current_width, viewport_height=current_height, ) pipeline.add_stage( "source", DataSourceStage(introspection_source, name="pipeline-inspect"), ) elif new_preset.source == "empty": from engine.data_sources.sources import EmptyDataSource from engine.pipeline.adapters import DataSourceStage empty_source = EmptyDataSource( width=current_width, height=current_height ) pipeline.add_stage( "source", DataSourceStage(empty_source, name="empty") ) elif new_preset.source == "fixture": items = load_cache() if not items: print(" \033[38;5;196mNo fixture cache available\033[0m") return from engine.data_sources.sources import ListDataSource from engine.pipeline.adapters import DataSourceStage list_source = ListDataSource(items, name="fixture") pipeline.add_stage( "source", DataSourceStage(list_source, name="fixture") ) else: # Fetch or use cached items cached = load_cache() if cached: items = cached elif new_preset.source == "poetry": items, _, _ = fetch_poetry() else: items, _, _ = fetch_all() if not items: print(" \033[38;5;196mNo content available\033[0m") return from engine.data_sources.sources import ListDataSource from engine.pipeline.adapters import DataSourceStage list_source = ListDataSource(items, name=new_preset.source) pipeline.add_stage( "source", DataSourceStage(list_source, name=new_preset.source) ) # Add viewport filter and font for headline/poetry sources if new_preset.source in ["headlines", "poetry", "fixture"]: from engine.pipeline.adapters import FontStage, ViewportFilterStage pipeline.add_stage( "viewport_filter", ViewportFilterStage(name="viewport-filter") ) pipeline.add_stage("font", FontStage(name="font")) # Add camera if specified if new_preset.camera: from engine.camera import Camera from engine.pipeline.adapters import CameraStage speed = getattr(new_preset, "camera_speed", 1.0) camera = None cam_type = new_preset.camera if cam_type == "feed": camera = Camera.feed(speed=speed) elif cam_type == "scroll" or cam_type == "vertical": camera = Camera.scroll(speed=speed) elif cam_type == "horizontal": camera = Camera.horizontal(speed=speed) elif cam_type == "omni": camera = Camera.omni(speed=speed) elif cam_type == "floating": camera = Camera.floating(speed=speed) elif cam_type == "bounce": camera = Camera.bounce(speed=speed) if camera: pipeline.add_stage("camera", CameraStage(camera, name=cam_type)) # Add effects effect_registry = get_registry() for effect_name in new_preset.effects: effect = effect_registry.get(effect_name) if effect: pipeline.add_stage( f"effect_{effect_name}", create_stage_from_effect(effect, effect_name), ) # Add display (respect CLI override) display_name = new_preset.display if "--display" in sys.argv: idx = sys.argv.index("--display") if idx + 1 < len(sys.argv): display_name = sys.argv[idx + 1] new_display = DisplayRegistry.create(display_name) if not new_display and not display_name.startswith("multi"): print( f" \033[38;5;196mFailed to create display: {display_name}\033[0m" ) return if not new_display and display_name.startswith("multi"): parts = display_name[6:].split(",") new_display = DisplayRegistry.create_multi(parts) if not new_display: print( f" \033[38;5;196mFailed to create multi display: {parts}\033[0m" ) return if not new_display: print( f" \033[38;5;196mFailed to create display: {display_name}\033[0m" ) return new_display.init(0, 0) pipeline.add_stage( "display", create_stage_from_display(new_display, display_name) ) pipeline.build() # Set pipeline for introspection source if needed if ( new_preset.source == "pipeline-inspect" and introspection_source is not None ): introspection_source.set_pipeline(pipeline) if not pipeline.initialize(): print(" \033[38;5;196mFailed to initialize pipeline\033[0m") return # Replace global references with new pipeline and display display = new_display # Reinitialize UI panel with new effect stages if ( isinstance(params.border, BorderMode) and params.border == BorderMode.UI ): ui_panel = UIPanel( UIConfig(panel_width=24, start_with_preset_picker=True) ) for stage in pipeline.stages.values(): if isinstance(stage, EffectPluginStage): effect = stage._effect enabled = ( effect.config.enabled if hasattr(effect, "config") else True ) stage_control = ui_panel.register_stage( stage, enabled=enabled ) stage_control.effect = effect # type: ignore[attr-defined] if ui_panel.stages: first_stage = next(iter(ui_panel.stages)) ui_panel.select_stage(first_stage) ctrl = ui_panel.stages[first_stage] if hasattr(ctrl, "effect"): effect = ctrl.effect if hasattr(effect, "config"): config = effect.config try: import dataclasses if dataclasses.is_dataclass(config): for field_name, field_obj in dataclasses.fields( config ): if field_name == "enabled": continue value = getattr(config, field_name, None) if value is not None: ctrl.params[field_name] = value ctrl.param_schema[field_name] = { "type": type(value).__name__, "min": 0 if isinstance(value, (int, float)) else None, "max": 1 if isinstance(value, float) else None, "step": 0.1 if isinstance(value, float) else 1, } except Exception: pass print(f" \033[38;5;82mPreset switched to {preset_name}\033[0m") except Exception as e: print(f" \033[38;5;196mError switching preset: {e}\033[0m") ui_panel.set_event_callback("preset_changed", on_preset_changed) print(" \033[38;5;82mStarting pipeline...\033[0m") print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n") ctx = pipeline.context ctx.params = params ctx.set("display", display) ctx.set("items", items) ctx.set("pipeline", pipeline) ctx.set("pipeline_order", pipeline.execution_order) ctx.set("camera_y", 0) current_width = 80 current_height = 24 if hasattr(display, "get_dimensions"): current_width, current_height = display.get_dimensions() params.viewport_width = current_width params.viewport_height = current_height try: frame = 0 while True: params.frame_number = frame ctx.params = params result = pipeline.execute(items) if result.success: # Handle UI panel compositing if enabled if ui_panel is not None: from engine.display import render_ui_panel buf = render_ui_panel( result.data, current_width, current_height, ui_panel, fps=params.fps if hasattr(params, "fps") else 60.0, frame_time=0.0, ) # Render with border=OFF since we already added borders display.show(buf, border=False) # Handle pygame events for UI if display_name == "pygame": import pygame for event in pygame.event.get(): if event.type == pygame.KEYDOWN: ui_panel.process_key_event(event.key, event.mod) # If space toggled stage, we could rebuild here (TODO) else: # Normal border handling show_border = ( params.border if isinstance(params.border, bool) else False ) display.show(result.data, border=show_border) if hasattr(display, "is_quit_requested") and display.is_quit_requested(): if hasattr(display, "clear_quit_request"): display.clear_quit_request() raise KeyboardInterrupt() if hasattr(display, "get_dimensions"): new_w, new_h = display.get_dimensions() if new_w != current_width or new_h != current_height: current_width, current_height = new_w, new_h params.viewport_width = current_width params.viewport_height = current_height time.sleep(1 / 60) frame += 1 except KeyboardInterrupt: pipeline.cleanup() display.cleanup() print("\n \033[38;5;245mPipeline stopped\033[0m") return pipeline.cleanup() display.cleanup() print("\n \033[38;5;245mPipeline stopped\033[0m") def run_pipeline_mode_direct(): """Construct and run a pipeline directly from CLI arguments. Usage: python -m engine.app --pipeline-source headlines --pipeline-effects noise,fade --display null python -m engine.app --pipeline-source fixture --pipeline-effects glitch --pipeline-ui --display null Flags: --pipeline-source : Headlines, fixture, poetry, empty, pipeline-inspect --pipeline-effects : Comma-separated list (noise, fade, glitch, firehose, hud, tint, border, crop) --pipeline-camera : scroll, feed, horizontal, omni, floating, bounce --pipeline-display : terminal, pygame, websocket, null, multi:term,pygame --pipeline-ui: Enable UI panel (BorderMode.UI) --pipeline-border : off, simple, ui """ import sys from engine.camera import Camera from engine.data_sources.pipeline_introspection import PipelineIntrospectionSource from engine.data_sources.sources import EmptyDataSource, ListDataSource from engine.display import BorderMode, DisplayRegistry from engine.effects import get_registry from engine.fetch import fetch_all, fetch_poetry, load_cache from engine.pipeline import Pipeline, PipelineConfig, PipelineContext from engine.pipeline.adapters import ( CameraStage, DataSourceStage, EffectPluginStage, create_stage_from_display, create_stage_from_effect, ) from engine.pipeline.ui import UIConfig, UIPanel # Parse CLI arguments source_name = None effect_names = [] camera_type = None # Will use MVP default (static) display_name = None # Will use MVP default (terminal) ui_enabled = False border_mode = BorderMode.OFF source_items = None allow_unsafe = False i = 1 argv = sys.argv while i < len(argv): arg = argv[i] if arg == "--pipeline-source" and i + 1 < len(argv): source_name = argv[i + 1] i += 2 elif arg == "--pipeline-effects" and i + 1 < len(argv): effect_names = [e.strip() for e in argv[i + 1].split(",") if e.strip()] i += 2 elif arg == "--pipeline-camera" and i + 1 < len(argv): camera_type = argv[i + 1] i += 2 elif arg == "--pipeline-display" and i + 1 < len(argv): display_name = argv[i + 1] i += 2 elif arg == "--pipeline-ui": ui_enabled = True i += 1 elif arg == "--pipeline-border" and i + 1 < len(argv): mode = argv[i + 1] if mode == "simple": border_mode = True elif mode == "ui": border_mode = BorderMode.UI else: border_mode = False i += 2 elif arg == "--allow-unsafe": allow_unsafe = True i += 1 else: i += 1 if not source_name: print("Error: --pipeline-source is required") print( "Usage: python -m engine.app --pipeline-source [--pipeline-effects ] ..." ) sys.exit(1) print(" \033[38;5;245mDirect pipeline construction\033[0m") print(f" Source: {source_name}") print(f" Effects: {effect_names}") print(f" Camera: {camera_type}") print(f" Display: {display_name}") print(f" UI Enabled: {ui_enabled}") # Import validation from engine.pipeline.validation import validate_pipeline_config # Create initial config and params params = PipelineParams() params.source = source_name params.camera_mode = camera_type if camera_type is not None else "" params.effect_order = effect_names params.border = border_mode # Create minimal config for validation config = PipelineConfig( source=source_name, display=display_name or "", # Will be filled by validation camera=camera_type if camera_type is not None else "", effects=effect_names, ) # Run MVP validation result = validate_pipeline_config(config, params, allow_unsafe=allow_unsafe) if result.warnings and not allow_unsafe: print(" \033[38;5;226mWarning: MVP validation found issues:\033[0m") for warning in result.warnings: print(f" - {warning}") if result.changes: print(" \033[38;5;226mApplied MVP defaults:\033[0m") for change in result.changes: print(f" {change}") if not result.valid: print( " \033[38;5;196mPipeline configuration invalid and could not be fixed\033[0m" ) sys.exit(1) # Show MVP summary print(" \033[38;5;245mMVP Configuration:\033[0m") print(f" Source: {result.config.source}") print(f" Display: {result.config.display}") print(f" Camera: {result.config.camera or 'static (none)'}") print(f" Effects: {result.config.effects if result.config.effects else 'none'}") print(f" Border: {result.params.border}") # Load source items if source_name == "headlines": cached = load_cache() if cached: source_items = cached else: source_items, _, _ = fetch_all() elif source_name == "fixture": source_items = load_cache() if not source_items: print(" \033[38;5;196mNo fixture cache available\033[0m") sys.exit(1) elif source_name == "poetry": source_items, _, _ = fetch_poetry() elif source_name == "empty" or source_name == "pipeline-inspect": source_items = [] else: print(f" \033[38;5;196mUnknown source: {source_name}\033[0m") sys.exit(1) if source_items is not None: print(f" \033[38;5;82mLoaded {len(source_items)} items\033[0m") # Set border mode if ui_enabled: border_mode = BorderMode.UI # Build pipeline using validated config and params params = result.params params.viewport_width = 80 params.viewport_height = 24 ctx = PipelineContext() ctx.params = params # Create display using validated display name display_name = result.config.display or "terminal" # Default to terminal if empty display = DisplayRegistry.create(display_name) if not display: print(f" \033[38;5;196mFailed to create display: {display_name}\033[0m") sys.exit(1) display.init(0, 0) # Create pipeline using validated config pipeline = Pipeline(config=result.config, context=ctx) # Add stages # Source stage if source_name == "pipeline-inspect": introspection_source = PipelineIntrospectionSource( pipeline=None, viewport_width=params.viewport_width, viewport_height=params.viewport_height, ) pipeline.add_stage( "source", DataSourceStage(introspection_source, name="pipeline-inspect") ) elif source_name == "empty": empty_source = EmptyDataSource( width=params.viewport_width, height=params.viewport_height ) pipeline.add_stage("source", DataSourceStage(empty_source, name="empty")) else: list_source = ListDataSource(source_items, name=source_name) pipeline.add_stage("source", DataSourceStage(list_source, name=source_name)) # Add viewport filter and font for headline sources if source_name in ["headlines", "poetry", "fixture"]: from engine.pipeline.adapters import FontStage, ViewportFilterStage pipeline.add_stage( "viewport_filter", ViewportFilterStage(name="viewport-filter") ) pipeline.add_stage("font", FontStage(name="font")) # Add camera speed = getattr(params, "camera_speed", 1.0) camera = None if camera_type == "feed": camera = Camera.feed(speed=speed) elif camera_type == "scroll": camera = Camera.scroll(speed=speed) elif camera_type == "horizontal": camera = Camera.horizontal(speed=speed) elif camera_type == "omni": camera = Camera.omni(speed=speed) elif camera_type == "floating": camera = Camera.floating(speed=speed) elif camera_type == "bounce": camera = Camera.bounce(speed=speed) if camera: pipeline.add_stage("camera", CameraStage(camera, name=camera_type)) # Add effects effect_registry = get_registry() for effect_name in effect_names: effect = effect_registry.get(effect_name) if effect: pipeline.add_stage( f"effect_{effect_name}", create_stage_from_effect(effect, effect_name) ) # Add display pipeline.add_stage("display", create_stage_from_display(display, display_name)) pipeline.build() if not pipeline.initialize(): print(" \033[38;5;196mFailed to initialize pipeline\033[0m") sys.exit(1) # Create UI panel if border mode is UI ui_panel = None if params.border == BorderMode.UI: ui_panel = UIPanel(UIConfig(panel_width=24, start_with_preset_picker=True)) # Enable raw mode for terminal input if supported if hasattr(display, "set_raw_mode"): display.set_raw_mode(True) for stage in pipeline.stages.values(): if isinstance(stage, EffectPluginStage): effect = stage._effect enabled = effect.config.enabled if hasattr(effect, "config") else True stage_control = ui_panel.register_stage(stage, enabled=enabled) stage_control.effect = effect # type: ignore[attr-defined] if ui_panel.stages: first_stage = next(iter(ui_panel.stages)) ui_panel.select_stage(first_stage) ctrl = ui_panel.stages[first_stage] if hasattr(ctrl, "effect"): effect = ctrl.effect if hasattr(effect, "config"): config = effect.config try: import dataclasses if dataclasses.is_dataclass(config): for field_name, field_obj in dataclasses.fields(config): if field_name == "enabled": continue value = getattr(config, field_name, None) if value is not None: ctrl.params[field_name] = value ctrl.param_schema[field_name] = { "type": type(value).__name__, "min": 0 if isinstance(value, (int, float)) else None, "max": 1 if isinstance(value, float) else None, "step": 0.1 if isinstance(value, float) else 1, } except Exception: pass # Run pipeline loop from engine.display import render_ui_panel ctx.set("display", display) ctx.set("items", source_items) ctx.set("pipeline", pipeline) ctx.set("pipeline_order", pipeline.execution_order) current_width = params.viewport_width current_height = params.viewport_height print(" \033[38;5;82mStarting pipeline...\033[0m") print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n") try: frame = 0 while True: params.frame_number = frame ctx.params = params result = pipeline.execute(source_items) if not result.success: print(" \033[38;5;196mPipeline execution failed\033[0m") break # Render with UI panel if ui_panel is not None: buf = render_ui_panel( result.data, current_width, current_height, ui_panel ) display.show(buf, border=False) else: display.show(result.data, border=border_mode) # Handle keyboard events if UI is enabled if ui_panel is not None: # Try pygame first if hasattr(display, "_pygame"): try: import pygame for event in pygame.event.get(): if event.type == pygame.KEYDOWN: ui_panel.process_key_event(event.key, event.mod) except (ImportError, Exception): pass # Try terminal input elif hasattr(display, "get_input_keys"): try: keys = display.get_input_keys() for key in keys: ui_panel.process_key_event(key, 0) except Exception: pass # Check for quit request if hasattr(display, "is_quit_requested") and display.is_quit_requested(): if hasattr(display, "clear_quit_request"): display.clear_quit_request() raise KeyboardInterrupt() time.sleep(1 / 60) frame += 1 except KeyboardInterrupt: pipeline.cleanup() display.cleanup() print("\n \033[38;5;245mPipeline stopped\033[0m") return pipeline.cleanup() display.cleanup() print("\n \033[38;5;245mPipeline stopped\033[0m") if __name__ == "__main__": main()