""" Pipeline runner - handles preset-based pipeline construction and execution. """ import sys import time from typing import Any from engine.display import BorderMode, DisplayRegistry from engine.effects import get_registry from engine.fetch import fetch_all, fetch_all_fast, fetch_poetry, load_cache, save_cache from engine.pipeline import Pipeline, PipelineConfig, PipelineContext, get_preset from engine.pipeline.adapters import ( EffectPluginStage, MessageOverlayStage, SourceItemsToBufferStage, create_stage_from_display, create_stage_from_effect, ) from engine.pipeline.ui import UIConfig, UIPanel try: from engine.display.backends.websocket import WebSocketDisplay except ImportError: WebSocketDisplay = None def _handle_pipeline_mutation(pipeline: Pipeline, command: dict) -> bool: """Handle pipeline mutation commands from WebSocket or other external control. Args: pipeline: The pipeline to mutate command: Command dictionary with 'action' and other parameters Returns: True if command was successfully handled, False otherwise """ action = command.get("action") if action == "add_stage": # For now, this just returns True to acknowledge the command # In a full implementation, we'd need to create the appropriate stage print(f" [Pipeline] add_stage command received: {command}") return True elif action == "remove_stage": stage_name = command.get("stage") if stage_name: result = pipeline.remove_stage(stage_name) print(f" [Pipeline] Removed stage '{stage_name}': {result is not None}") return result is not None elif action == "replace_stage": stage_name = command.get("stage") # For now, this just returns True to acknowledge the command print(f" [Pipeline] replace_stage command received: {command}") return True elif action == "swap_stages": stage1 = command.get("stage1") stage2 = command.get("stage2") if stage1 and stage2: result = pipeline.swap_stages(stage1, stage2) print(f" [Pipeline] Swapped stages '{stage1}' and '{stage2}': {result}") return result elif action == "move_stage": stage_name = command.get("stage") after = command.get("after") before = command.get("before") if stage_name: result = pipeline.move_stage(stage_name, after, before) print(f" [Pipeline] Moved stage '{stage_name}': {result}") return result elif action == "enable_stage": stage_name = command.get("stage") if stage_name: result = pipeline.enable_stage(stage_name) print(f" [Pipeline] Enabled stage '{stage_name}': {result}") return result elif action == "disable_stage": stage_name = command.get("stage") if stage_name: result = pipeline.disable_stage(stage_name) print(f" [Pipeline] Disabled stage '{stage_name}': {result}") return result elif action == "cleanup_stage": stage_name = command.get("stage") if stage_name: pipeline.cleanup_stage(stage_name) print(f" [Pipeline] Cleaned up stage '{stage_name}'") return True elif action == "can_hot_swap": stage_name = command.get("stage") if stage_name: can_swap = pipeline.can_hot_swap(stage_name) print(f" [Pipeline] Can hot-swap '{stage_name}': {can_swap}") return True return False def run_pipeline_mode(preset_name: str = "demo", graph_config: str | None = None): """Run using the new unified pipeline architecture. Args: preset_name: Name of the preset to use graph_config: Path to a TOML graph configuration file (optional) """ import engine.effects.plugins as effects_plugins from engine.effects import PerformanceMonitor, set_monitor print(" \033[1;38;5;46mPIPELINE MODE\033[0m") print(" \033[38;5;245mUsing unified pipeline architecture\033[0m") effects_plugins.discover_plugins() monitor = PerformanceMonitor() set_monitor(monitor) # Check if graph config is provided using_graph_config = graph_config is not None if using_graph_config: from engine.pipeline.graph_toml import load_pipeline_from_toml print(f" \033[38;5;245mLoading graph from: {graph_config}\033[0m") # Determine viewport size viewport_width = 80 viewport_height = 24 if "--viewport" in sys.argv: idx = sys.argv.index("--viewport") if idx + 1 < len(sys.argv): vp = sys.argv[idx + 1] try: viewport_width, viewport_height = map(int, vp.split("x")) except ValueError: print("Error: Invalid viewport format. Use WxH (e.g., 40x15)") sys.exit(1) # Load pipeline from graph config try: pipeline = load_pipeline_from_toml( graph_config, viewport_width=viewport_width, viewport_height=viewport_height, ) except Exception as e: print(f" \033[38;5;196mError loading graph config: {e}\033[0m") sys.exit(1) # Set params for display from engine.pipeline.params import PipelineParams params = PipelineParams( viewport_width=viewport_width, viewport_height=viewport_height ) # Set display name from graph or CLI display_name = "terminal" # Default for graph mode if "--display" in sys.argv: idx = sys.argv.index("--display") if idx + 1 < len(sys.argv): display_name = sys.argv[idx + 1] else: # Use preset-based pipeline preset = get_preset(preset_name) if not preset: print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m") sys.exit(1) print(f" \033[38;5;245mPreset: {preset.name} - {preset.description}\033[0m") params = preset.to_params() # Use preset viewport if available, else default to 80x24 params.viewport_width = getattr(preset, "viewport_width", 80) params.viewport_height = getattr(preset, "viewport_height", 24) if "--viewport" in sys.argv: idx = sys.argv.index("--viewport") if idx + 1 < len(sys.argv): vp = sys.argv[idx + 1] try: params.viewport_width, params.viewport_height = map(int, vp.split("x")) except ValueError: print("Error: Invalid viewport format. Use WxH (e.g., 40x15)") sys.exit(1) # Set positioning mode from command line or config if "--positioning" in sys.argv: idx = sys.argv.index("--positioning") if idx + 1 < len(sys.argv): params.positioning = sys.argv[idx + 1] else: from engine import config as app_config params.positioning = app_config.get_config().positioning pipeline = Pipeline(config=preset.to_config()) print(" \033[38;5;245mFetching content...\033[0m") # Handle special sources that don't need traditional fetching introspection_source = None if preset.source == "pipeline-inspect": items = [] print(" \033[38;5;245mUsing pipeline introspection source\033[0m") elif preset.source == "empty": items = [] print(" \033[38;5;245mUsing empty source (no content)\033[0m") elif preset.source == "fixture": items = load_cache() if not items: print(" \033[38;5;196mNo fixture cache available\033[0m") sys.exit(1) print(f" \033[38;5;82mLoaded {len(items)} items from fixture\033[0m") else: cached = load_cache() if cached: items = cached print(f" \033[38;5;82mLoaded {len(items)} items from cache\033[0m") elif preset.source == "poetry": items, _, _ = fetch_poetry() else: items = fetch_all_fast() if items: print( f" \033[38;5;82mFast start: {len(items)} items from first 5 sources\033[0m" ) import threading def background_fetch(): full_items, _, _ = fetch_all() save_cache(full_items) background_thread = threading.Thread(target=background_fetch, daemon=True) background_thread.start() if not items: print(" \033[38;5;196mNo content available\033[0m") sys.exit(1) print(f" \033[38;5;82mLoaded {len(items)} items\033[0m") # CLI --display flag takes priority # Check if --display was explicitly provided display_explicitly_specified = "--display" in sys.argv if not using_graph_config: # Preset mode: use preset display as default display_name = preset.display if display_explicitly_specified: idx = sys.argv.index("--display") if idx + 1 < len(sys.argv): display_name = sys.argv[idx + 1] else: # Warn user that display is falling back to preset default print( f" \033[38;5;226mWarning: No --display specified, using preset default: {display_name}\033[0m" ) print( " \033[38;5;245mTip: Use --display null for headless mode (useful for testing/capture)\033[0m" ) else: # Graph mode: display_name already set above if not display_explicitly_specified: print(f" \033[38;5;245mUsing default display: {display_name}\033[0m") display = DisplayRegistry.create(display_name) if not display and not display_name.startswith("multi"): print(f" \033[38;5;196mFailed to create display: {display_name}\033[0m") sys.exit(1) # Handle multi display (format: "multi:terminal,pygame") if not display and display_name.startswith("multi"): parts = display_name[6:].split( "," ) # "multi:terminal,pygame" -> ["terminal", "pygame"] display = DisplayRegistry.create_multi(parts) if not display: print(f" \033[38;5;196mFailed to create multi display: {parts}\033[0m") sys.exit(1) if not display: print(f" \033[38;5;196mFailed to create display: {display_name}\033[0m") sys.exit(1) display.init(0, 0) # Determine if we need UI controller for WebSocket or border=UI need_ui_controller = False web_control_active = False if WebSocketDisplay and isinstance(display, WebSocketDisplay): need_ui_controller = True web_control_active = True elif isinstance(params.border, BorderMode) and params.border == BorderMode.UI: need_ui_controller = True effect_registry = get_registry() # Only build stages from preset if not using graph config # (graph config already has all stages defined) if not using_graph_config: # Create source stage based on preset source type if preset.source == "pipeline-inspect": from engine.data_sources.pipeline_introspection import ( PipelineIntrospectionSource, ) from engine.pipeline.adapters import DataSourceStage introspection_source = PipelineIntrospectionSource( pipeline=None, # Will be set after pipeline.build() viewport_width=80, viewport_height=24, ) pipeline.add_stage( "source", DataSourceStage(introspection_source, name="pipeline-inspect") ) elif preset.source == "empty": from engine.data_sources.sources import EmptyDataSource from engine.pipeline.adapters import DataSourceStage empty_source = EmptyDataSource(width=80, height=24) pipeline.add_stage("source", DataSourceStage(empty_source, name="empty")) else: from engine.data_sources.sources import ListDataSource from engine.pipeline.adapters import DataSourceStage list_source = ListDataSource(items, name=preset.source) pipeline.add_stage( "source", DataSourceStage(list_source, name=preset.source) ) # Add camera state update stage if specified in preset (must run before viewport filter) camera = None if preset.camera: from engine.camera import Camera from engine.pipeline.adapters import CameraClockStage, CameraStage speed = getattr(preset, "camera_speed", 1.0) if preset.camera == "feed": camera = Camera.feed(speed=speed) elif preset.camera == "scroll": camera = Camera.scroll(speed=speed) elif preset.camera == "vertical": camera = Camera.scroll(speed=speed) # Backwards compat elif preset.camera == "horizontal": camera = Camera.horizontal(speed=speed) elif preset.camera == "omni": camera = Camera.omni(speed=speed) elif preset.camera == "floating": camera = Camera.floating(speed=speed) elif preset.camera == "bounce": camera = Camera.bounce(speed=speed) elif preset.camera == "radial": camera = Camera.radial(speed=speed) elif preset.camera == "static" or preset.camera == "": # Static camera: no movement, but provides camera_y=0 for viewport filter camera = Camera.scroll(speed=0.0) # Speed 0 = no movement camera.set_canvas_size(200, 200) if camera: # Add camera update stage to ensure camera_y is available for viewport filter pipeline.add_stage( "camera_update", CameraClockStage(camera, name="camera-clock") ) # Only build stages from preset if not using graph config if not using_graph_config: # Add FontStage for headlines/poetry (default for demo) if preset.source in ["headlines", "poetry"]: from engine.pipeline.adapters import FontStage, ViewportFilterStage # Add viewport filter to prevent rendering all items pipeline.add_stage( "viewport_filter", ViewportFilterStage(name="viewport-filter") ) pipeline.add_stage("font", FontStage(name="font")) else: # Fallback to simple conversion for other sources pipeline.add_stage( "render", SourceItemsToBufferStage(name="items-to-buffer") ) # Add camera stage if specified in preset (after font/render stage) if camera: pipeline.add_stage("camera", CameraStage(camera, name=preset.camera)) for effect_name in preset.effects: effect = effect_registry.get(effect_name) if effect: pipeline.add_stage( f"effect_{effect_name}", create_stage_from_effect(effect, effect_name), ) # Add message overlay stage if enabled if getattr(preset, "enable_message_overlay", False): from engine import config as engine_config from engine.pipeline.adapters import MessageOverlayConfig overlay_config = MessageOverlayConfig( enabled=True, display_secs=engine_config.MESSAGE_DISPLAY_SECS if hasattr(engine_config, "MESSAGE_DISPLAY_SECS") else 30, topic_url=engine_config.NTFY_TOPIC if hasattr(engine_config, "NTFY_TOPIC") else None, ) pipeline.add_stage( "message_overlay", MessageOverlayStage(config=overlay_config) ) pipeline.add_stage("display", create_stage_from_display(display, display_name)) pipeline.build() # For pipeline-inspect, set the pipeline after build to avoid circular dependency if introspection_source is not None: introspection_source.set_pipeline(pipeline) if not pipeline.initialize(): print(" \033[38;5;196mFailed to initialize pipeline\033[0m") sys.exit(1) # Initialize UI panel if needed (border mode or WebSocket control) ui_panel = None render_ui_panel_in_terminal = False # Check for REPL effect in pipeline repl_effect = None for stage in pipeline.stages.values(): if isinstance(stage, EffectPluginStage) and stage._effect.name == "repl": repl_effect = stage._effect print( " \033[38;5;46mREPL effect detected - Interactive mode enabled\033[0m" ) break if need_ui_controller: from engine.display import render_ui_panel ui_panel = UIPanel(UIConfig(panel_width=24, start_with_preset_picker=True)) # Determine if we should render UI panel in terminal # Only render if border mode is UI (not for WebSocket-only mode) render_ui_panel_in_terminal = ( isinstance(params.border, BorderMode) and params.border == BorderMode.UI ) # Enable raw mode for terminal input if supported if hasattr(display, "set_raw_mode"): display.set_raw_mode(True) # Enable raw mode for REPL if present and not already enabled elif repl_effect and hasattr(display, "set_raw_mode"): display.set_raw_mode(True) # Register effect plugin stages from pipeline for UI control for stage in pipeline.stages.values(): if isinstance(stage, EffectPluginStage): effect = stage._effect enabled = effect.config.enabled if hasattr(effect, "config") else True stage_control = ui_panel.register_stage(stage, enabled=enabled) # Store reference to effect for easier access stage_control.effect = effect # type: ignore[attr-defined] # Select first stage by default if ui_panel.stages: first_stage = next(iter(ui_panel.stages)) ui_panel.select_stage(first_stage) # Populate param schema from EffectConfig if it's a dataclass ctrl = ui_panel.stages[first_stage] if hasattr(ctrl, "effect"): effect = ctrl.effect if hasattr(effect, "config"): config = effect.config # Try to get fields via dataclasses if available try: import dataclasses if dataclasses.is_dataclass(config): for field_name, field_obj in dataclasses.fields(config): if field_name == "enabled": continue value = getattr(config, field_name, None) if value is not None: ctrl.params[field_name] = value ctrl.param_schema[field_name] = { "type": type(value).__name__, "min": 0 if isinstance(value, (int, float)) else None, "max": 1 if isinstance(value, float) else None, "step": 0.1 if isinstance(value, float) else 1, } except Exception: pass # No dataclass fields, skip param UI # Set up callback for stage toggles def on_stage_toggled(stage_name: str, enabled: bool): """Update the actual stage's enabled state when UI toggles.""" stage = pipeline.get_stage(stage_name) if stage: # Set stage enabled flag for pipeline execution stage._enabled = enabled # Also update effect config if it's an EffectPluginStage if isinstance(stage, EffectPluginStage): stage._effect.config.enabled = enabled # Broadcast state update if WebSocket is active if web_control_active and isinstance(display, WebSocketDisplay): state = display._get_state_snapshot() if state: display.broadcast_state(state) ui_panel.set_event_callback("stage_toggled", on_stage_toggled) # Set up callback for parameter changes def on_param_changed(stage_name: str, param_name: str, value: Any): """Update the effect config when UI adjusts a parameter.""" stage = pipeline.get_stage(stage_name) if stage and isinstance(stage, EffectPluginStage): effect = stage._effect if hasattr(effect, "config"): setattr(effect.config, param_name, value) # Mark effect as needing reconfiguration if it has a configure method if hasattr(effect, "configure"): try: effect.configure(effect.config) except Exception: pass # Ignore reconfiguration errors # Broadcast state update if WebSocket is active if web_control_active and isinstance(display, WebSocketDisplay): state = display._get_state_snapshot() if state: display.broadcast_state(state) ui_panel.set_event_callback("param_changed", on_param_changed) # Set up preset list and handle preset changes from engine.pipeline import list_presets ui_panel.set_presets(list_presets(), preset_name) # Connect WebSocket to UI panel for remote control if web_control_active and isinstance(display, WebSocketDisplay): display.set_controller(ui_panel) def handle_websocket_command(command: dict) -> None: """Handle commands from WebSocket clients.""" action = command.get("action") # Handle pipeline mutation commands directly if action in ( "add_stage", "remove_stage", "replace_stage", "swap_stages", "move_stage", "enable_stage", "disable_stage", "cleanup_stage", "can_hot_swap", ): result = _handle_pipeline_mutation(pipeline, command) if result: state = display._get_state_snapshot() if state: display.broadcast_state(state) return # Handle UI panel commands if ui_panel.execute_command(command): # Broadcast updated state after command execution state = display._get_state_snapshot() if state: display.broadcast_state(state) display.set_command_callback(handle_websocket_command) def on_preset_changed(preset_name: str): """Handle preset change from UI - rebuild pipeline.""" nonlocal \ pipeline, \ display, \ items, \ params, \ ui_panel, \ current_width, \ current_height, \ web_control_active, \ render_ui_panel_in_terminal print(f" \033[38;5;245mSwitching to preset: {preset_name}\033[0m") # Save current UI panel state before rebuild ui_state = ui_panel.save_state() if ui_panel else None try: # Clean up old pipeline pipeline.cleanup() # Get new preset new_preset = get_preset(preset_name) if not new_preset: print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m") return # Update params for new preset params = new_preset.to_params() params.viewport_width = current_width params.viewport_height = current_height # Reconstruct pipeline configuration new_config = PipelineConfig( source=new_preset.source, display=new_preset.display, camera=new_preset.camera, effects=new_preset.effects, ) # Create new pipeline instance pipeline = Pipeline(config=new_config, context=PipelineContext()) # Re-add stages (similar to initial construction) # Source stage if new_preset.source == "pipeline-inspect": from engine.data_sources.pipeline_introspection import ( PipelineIntrospectionSource, ) from engine.pipeline.adapters import DataSourceStage introspection_source = PipelineIntrospectionSource( pipeline=None, viewport_width=current_width, viewport_height=current_height, ) pipeline.add_stage( "source", DataSourceStage(introspection_source, name="pipeline-inspect"), ) elif new_preset.source == "empty": from engine.data_sources.sources import EmptyDataSource from engine.pipeline.adapters import DataSourceStage empty_source = EmptyDataSource( width=current_width, height=current_height ) pipeline.add_stage( "source", DataSourceStage(empty_source, name="empty") ) elif new_preset.source == "fixture": items = load_cache() if not items: print(" \033[38;5;196mNo fixture cache available\033[0m") return from engine.data_sources.sources import ListDataSource from engine.pipeline.adapters import DataSourceStage list_source = ListDataSource(items, name="fixture") pipeline.add_stage( "source", DataSourceStage(list_source, name="fixture") ) else: # Fetch or use cached items cached = load_cache() if cached: items = cached elif new_preset.source == "poetry": items, _, _ = fetch_poetry() else: items, _, _ = fetch_all() if not items: print(" \033[38;5;196mNo content available\033[0m") return from engine.data_sources.sources import ListDataSource from engine.pipeline.adapters import DataSourceStage list_source = ListDataSource(items, name=new_preset.source) pipeline.add_stage( "source", DataSourceStage(list_source, name=new_preset.source) ) # Add viewport filter and font for headline/poetry sources if new_preset.source in ["headlines", "poetry", "fixture"]: from engine.pipeline.adapters import FontStage, ViewportFilterStage pipeline.add_stage( "viewport_filter", ViewportFilterStage(name="viewport-filter") ) pipeline.add_stage("font", FontStage(name="font")) # Add camera if specified if new_preset.camera: from engine.camera import Camera from engine.pipeline.adapters import CameraClockStage, CameraStage speed = getattr(new_preset, "camera_speed", 1.0) camera = None cam_type = new_preset.camera if cam_type == "feed": camera = Camera.feed(speed=speed) elif cam_type == "scroll" or cam_type == "vertical": camera = Camera.scroll(speed=speed) elif cam_type == "horizontal": camera = Camera.horizontal(speed=speed) elif cam_type == "omni": camera = Camera.omni(speed=speed) elif cam_type == "floating": camera = Camera.floating(speed=speed) elif cam_type == "bounce": camera = Camera.bounce(speed=speed) elif cam_type == "radial": camera = Camera.radial(speed=speed) elif cam_type == "static" or cam_type == "": # Static camera: no movement, but provides camera_y=0 for viewport filter camera = Camera.scroll(speed=0.0) camera.set_canvas_size(200, 200) if camera: # Add camera update stage to ensure camera_y is available for viewport filter pipeline.add_stage( "camera_update", CameraClockStage(camera, name="camera-clock"), ) pipeline.add_stage("camera", CameraStage(camera, name=cam_type)) # Add effects effect_registry = get_registry() for effect_name in new_preset.effects: effect = effect_registry.get(effect_name) if effect: pipeline.add_stage( f"effect_{effect_name}", create_stage_from_effect(effect, effect_name), ) # Add message overlay stage if enabled if getattr(new_preset, "enable_message_overlay", False): from engine import config as engine_config from engine.pipeline.adapters import MessageOverlayConfig overlay_config = MessageOverlayConfig( enabled=True, display_secs=engine_config.MESSAGE_DISPLAY_SECS if hasattr(engine_config, "MESSAGE_DISPLAY_SECS") else 30, topic_url=engine_config.NTFY_TOPIC if hasattr(engine_config, "NTFY_TOPIC") else None, ) pipeline.add_stage( "message_overlay", MessageOverlayStage(config=overlay_config) ) # Add display (respect CLI override) display_name = new_preset.display if "--display" in sys.argv: idx = sys.argv.index("--display") if idx + 1 < len(sys.argv): display_name = sys.argv[idx + 1] new_display = DisplayRegistry.create(display_name) if not new_display and not display_name.startswith("multi"): print( f" \033[38;5;196mFailed to create display: {display_name}\033[0m" ) return if not new_display and display_name.startswith("multi"): parts = display_name[6:].split(",") new_display = DisplayRegistry.create_multi(parts) if not new_display: print( f" \033[38;5;196mFailed to create multi display: {parts}\033[0m" ) return if not new_display: print( f" \033[38;5;196mFailed to create display: {display_name}\033[0m" ) return new_display.init(0, 0) pipeline.add_stage( "display", create_stage_from_display(new_display, display_name) ) pipeline.build() # Set pipeline for introspection source if needed if ( new_preset.source == "pipeline-inspect" and introspection_source is not None ): introspection_source.set_pipeline(pipeline) if not pipeline.initialize(): print(" \033[38;5;196mFailed to initialize pipeline\033[0m") return # Replace global references with new pipeline and display display = new_display # Reinitialize UI panel with new effect stages # Update web_control_active for new display web_control_active = WebSocketDisplay is not None and isinstance( display, WebSocketDisplay ) # Update render_ui_panel_in_terminal render_ui_panel_in_terminal = ( isinstance(params.border, BorderMode) and params.border == BorderMode.UI ) if need_ui_controller: ui_panel = UIPanel( UIConfig(panel_width=24, start_with_preset_picker=True) ) for stage in pipeline.stages.values(): if isinstance(stage, EffectPluginStage): effect = stage._effect enabled = ( effect.config.enabled if hasattr(effect, "config") else True ) stage_control = ui_panel.register_stage( stage, enabled=enabled ) stage_control.effect = effect # type: ignore[attr-defined] # Restore UI panel state if it was saved if ui_state: ui_panel.restore_state(ui_state) if ui_panel.stages: first_stage = next(iter(ui_panel.stages)) ui_panel.select_stage(first_stage) ctrl = ui_panel.stages[first_stage] if hasattr(ctrl, "effect"): effect = ctrl.effect if hasattr(effect, "config"): config = effect.config try: import dataclasses if dataclasses.is_dataclass(config): for field_name, field_obj in dataclasses.fields( config ): if field_name == "enabled": continue value = getattr(config, field_name, None) if value is not None: ctrl.params[field_name] = value ctrl.param_schema[field_name] = { "type": type(value).__name__, "min": 0 if isinstance(value, (int, float)) else None, "max": 1 if isinstance(value, float) else None, "step": 0.1 if isinstance(value, float) else 1, } except Exception: pass # Reconnect WebSocket to UI panel if needed if web_control_active and isinstance(display, WebSocketDisplay): display.set_controller(ui_panel) def handle_websocket_command(command: dict) -> None: """Handle commands from WebSocket clients.""" if ui_panel.execute_command(command): # Broadcast updated state after command execution state = display._get_state_snapshot() if state: display.broadcast_state(state) display.set_command_callback(handle_websocket_command) # Broadcast initial state after preset change state = display._get_state_snapshot() if state: display.broadcast_state(state) print(f" \033[38;5;82mPreset switched to {preset_name}\033[0m") except Exception as e: print(f" \033[38;5;196mError switching preset: {e}\033[0m") ui_panel.set_event_callback("preset_changed", on_preset_changed) print(" \033[38;5;82mStarting pipeline...\033[0m") print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n") ctx = pipeline.context ctx.params = params ctx.set("display", display) # For graph mode, items might not be defined - use empty list if needed if not using_graph_config: ctx.set("items", items) else: # Graph-based pipelines typically use their own data sources # But we can set an empty list for compatibility ctx.set("items", []) ctx.set("pipeline", pipeline) ctx.set("pipeline_order", pipeline.execution_order) ctx.set("camera_y", 0) current_width = params.viewport_width current_height = params.viewport_height # Only get dimensions from display if viewport wasn't explicitly set if "--viewport" not in sys.argv and hasattr(display, "get_dimensions"): current_width, current_height = display.get_dimensions() params.viewport_width = current_width params.viewport_height = current_height try: frame = 0 while True: params.frame_number = frame ctx.params = params result = pipeline.execute(items) if result.success: # Handle UI panel compositing if enabled if ui_panel is not None and render_ui_panel_in_terminal: from engine.display import render_ui_panel buf = render_ui_panel( result.data, current_width, current_height, ui_panel, fps=params.fps if hasattr(params, "fps") else 60.0, frame_time=0.0, ) # Render with border=OFF since we already added borders display.show(buf, border=False) # Handle pygame events for UI if display_name == "pygame": import pygame for event in pygame.event.get(): if event.type == pygame.KEYDOWN: ui_panel.process_key_event(event.key, event.mod) # If space toggled stage, we could rebuild here (TODO) else: # Normal border handling show_border = ( params.border if isinstance(params.border, bool) else False ) # Pass positioning mode if display supports it positioning = getattr(params, "positioning", "mixed") if ( hasattr(display, "show") and "positioning" in display.show.__code__.co_varnames ): display.show( result.data, border=show_border, positioning=positioning ) else: display.show(result.data, border=show_border) # --- REPL Input Handling --- if repl_effect and hasattr(display, "get_input_keys"): # Get keyboard input (non-blocking) keys = display.get_input_keys(timeout=0.0) for key in keys: if key == "return": # Get command string before processing cmd_str = repl_effect.state.current_command if cmd_str: repl_effect.process_command(cmd_str, ctx) # Check for pending pipeline mutations pending = repl_effect.get_pending_command() if pending: _handle_pipeline_mutation(pipeline, pending) # Broadcast state update if WebSocket is active if web_control_active and isinstance( display, WebSocketDisplay ): state = display._get_state_snapshot() if state: display.broadcast_state(state) elif key == "up": repl_effect.navigate_history(-1) elif key == "down": repl_effect.navigate_history(1) elif key == "backspace": repl_effect.backspace() elif len(key) == 1: repl_effect.append_to_command(key) # --- End REPL Input Handling --- if hasattr(display, "is_quit_requested") and display.is_quit_requested(): if hasattr(display, "clear_quit_request"): display.clear_quit_request() raise KeyboardInterrupt() if "--viewport" not in sys.argv and hasattr(display, "get_dimensions"): new_w, new_h = display.get_dimensions() if new_w != current_width or new_h != current_height: current_width, current_height = new_w, new_h params.viewport_width = current_width params.viewport_height = current_height time.sleep(1 / 60) frame += 1 except KeyboardInterrupt: pipeline.cleanup() display.cleanup() print("\n \033[38;5;245mPipeline stopped\033[0m") return pipeline.cleanup() display.cleanup() print("\n \033[38;5;245mPipeline stopped\033[0m")