"""REPL Effect Plugin.

A HUD-style command-line interface for interactive pipeline control.

This effect provides a Read-Eval-Print Loop (REPL) that allows users to:

- View pipeline status and metrics
- Toggle effects on/off
- Adjust effect parameters in real-time
- Inspect pipeline configuration
- Execute commands for pipeline manipulation

Usage:
    Add 'repl' to the effects list in your configuration.

Commands:
    help                          - Show available commands
    status                        - Show pipeline status
    effects                       - List effects in the current pipeline
    available                     - List available effect types and stages
    effect <name> <on|off>        - Toggle an effect
    param <effect> <param> <value> - Set effect parameter
    pipeline                      - Show current pipeline order
    add_stage <name> <type>       - Add a stage
    remove_stage <name>           - Remove a stage
    swap_stages <name1> <name2>   - Swap two stages
    move_stage <name> [after <stage>] [before <stage>] - Move a stage
    clear                         - Clear output buffer
    quit / exit                   - Print a hint on how to exit

Keyboard:
    This module does not read the keyboard itself. The host application is
    expected to translate key presses into calls to ``process_command``,
    ``navigate_history``, ``append_to_command``, ``backspace``,
    ``clear_command`` and ``scroll_output``. Intended bindings:

    Enter   - Execute command
    Up/Down - Navigate command history
    Tab     - Auto-complete (if implemented)
    Ctrl+C  - Clear current input
"""

import re
from dataclasses import dataclass, field

from engine.effects.types import (
    EffectConfig,
    EffectContext,
    EffectPlugin,
    PartialUpdate,
)

# One ANSI CSI escape sequence: ESC [ parameter-bytes intermediate-bytes final-byte.
_ANSI_CSI_RE = re.compile(r"\x1b\[[0-?]*[ -/]*[@-~]")
_ANSI_RESET = "\033[0m"
_ANSI_RESETS = frozenset({"\033[0m", "\033[m"})


def _truncate_ansi(text: str, width: int) -> str:
    """Truncate *text* to at most *width* visible characters.

    Escape sequences are copied through whole and do not count towards the
    width, so a cut can never land in the middle of one. If the cut happens
    while a colour/style is still active, a reset sequence is appended so the
    style does not leak into whatever is drawn next.

    Args:
        text: String that may contain ANSI CSI escape sequences.
        width: Maximum number of visible characters to keep.

    Returns:
        The truncated string; empty when *width* is not positive.
    """
    if width <= 0:
        return ""

    pieces: list[str] = []
    visible = 0
    pos = 0
    styled = False
    cut = False
    while pos < len(text):
        match = _ANSI_CSI_RE.match(text, pos)
        if match:
            seq = match.group()
            pieces.append(seq)
            if seq.endswith("m"):  # only SGR sequences change the style
                styled = seq not in _ANSI_RESETS
            pos = match.end()
            continue
        if visible >= width:
            cut = True
            break
        pieces.append(text[pos])
        visible += 1
        pos += 1

    if cut and styled:
        pieces.append(_ANSI_RESET)
    return "".join(pieces)


@dataclass
class REPLState:
    """State of the REPL interface."""

    command_history: list[str] = field(default_factory=list)
    current_command: str = ""
    history_index: int = -1
    output_buffer: list[str] = field(default_factory=list)
    scroll_offset: int = 0  # Manual scroll position (0 = bottom of buffer)
    max_history: int = 50
    max_output_lines: int = 50  # 50 lines excluding empty lines


class ReplEffect(EffectPlugin):
    """REPL effect with HUD-style overlay for interactive pipeline control."""

    name = "repl"
    config = EffectConfig(
        enabled=True,
        intensity=1.0,
        params={
            "display_height": 8,  # Height of REPL area in lines
            "show_hud": True,  # Show HUD header lines
        },
    )
    supports_partial_updates = True

    def __init__(self):
        super().__init__()
        self.state = REPLState()
        self._last_metrics: dict | None = None
        # Last pipeline-mutating command, waiting to be picked up through
        # get_pending_command().
        self._pending_command: dict | None = None

    # ------------------------------------------------------------------
    # Rendering
    # ------------------------------------------------------------------

    def process_partial(
        self, buf: list[str], ctx: EffectContext, partial: PartialUpdate
    ) -> list[str]:
        """Handle a partial update.

        The overlay has to stay visible on every frame, so the full render is
        performed regardless of what *partial* describes.
        """
        return self.process(buf, ctx)

    def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
        """Render the buffer with the REPL overlay.

        The frame is laid out as: content rows (with the HUD drawn over the
        first rows when enabled), one separator row, then the REPL area.

        Args:
            buf: Incoming frame, one string per row.
            ctx: Effect context; ``terminal_height`` / ``terminal_width`` are
                used when present, otherwise ``len(buf)`` and 80.

        Returns:
            A new list of rows, cut or padded to the terminal height.
        """
        height = getattr(ctx, "terminal_height", len(buf))
        width = getattr(ctx, "terminal_width", 80)

        params = self.config.params
        repl_height = params.get("display_height", 8)
        show_hud = params.get("show_hud", True)

        # The REPL takes the bottom rows; at least one content row is kept.
        content_height = max(1, height - repl_height)
        blank = " " * width

        # NOTE: content rows keep the original plain slice on purpose; only
        # the HUD rows produced here are truncated escape-aware.
        output = [
            buf[row][:width] if row < len(buf) else blank
            for row in range(content_height)
        ]

        if show_hud:
            # HUD rows arrive already truncated by _render_hud; slicing them
            # again by raw length would cut into their escape sequences.
            hud_rows = self._render_hud(width, ctx)
            for row, line in zip(range(len(output)), hud_rows):
                output[row] = line

        output.append("─" * width)
        output.extend(self._render_repl(width, repl_height - 1))

        missing = height - len(output)
        if missing > 0:
            output.extend([blank] * missing)
        return output[:height]

    def _render_hud(self, width: int, ctx: EffectContext) -> list[str]:
        """Render the three HUD header rows (metrics, history, output).

        Each row is limited to *width* visible characters without splitting
        an escape sequence.
        """
        state = self.state
        metrics = self._get_metrics(ctx)
        fps = metrics.get("fps", 0.0)
        frame_time = metrics.get("frame_time", 0.0)

        # Row 1: title, FPS, frame time, scroll percentage.
        fps_str = f"FPS: {fps:.1f}" if fps > 0 else "FPS: --"
        time_str = f"{frame_time:.1f}ms" if frame_time > 0 else "--ms"

        max_scroll = len(state.output_buffer) - 1
        scroll_pct = (
            int((state.scroll_offset / max_scroll) * 100) if max_scroll > 0 else 0
        )
        scroll_str = f"{scroll_pct}%"

        line1 = (
            f"\033[38;5;46mMAINLINE REPL\033[0m "
            f"\033[38;5;245m|\033[0m \033[38;5;39m{fps_str}\033[0m "
            f"\033[38;5;245m|\033[0m \033[38;5;208m{time_str}\033[0m "
            f"\033[38;5;245m|\033[0m \033[38;5;220m{scroll_str}\033[0m"
        )

        # Row 2: command count and history position.
        cmd_count = len(state.command_history)
        hist_idx = f"[{state.history_index + 1}/{cmd_count}]" if cmd_count > 0 else ""
        line2 = (
            f"\033[38;5;45mCOMMANDS:\033[0m "
            f"\033[1;38;5;227m{cmd_count}\033[0m "
            f"\033[38;5;245m|\033[0m \033[38;5;219m{hist_idx}\033[0m"
        )

        # Row 3: output line count and scroll position.
        out_count = len(state.output_buffer)
        scroll_pos = f"({state.scroll_offset}/{out_count})"
        line3 = (
            f"\033[38;5;44mOUTPUT:\033[0m "
            f"\033[1;38;5;227m{out_count}\033[0m lines "
            f"\033[38;5;245m{scroll_pos}\033[0m"
        )

        return [_truncate_ansi(line, width) for line in (line1, line2, line3)]

    def _render_repl(self, width: int, height: int) -> list[str]:
        """Render the REPL area: output window followed by the input prompt.

        Args:
            width: Row width in characters.
            height: Total rows available; one is reserved for the prompt.

        Returns:
            ``height - 1`` output rows (none if that is not positive) plus
            the prompt row.
        """
        history = self.state.output_buffer
        output_height = height - 1

        # scroll_offset == 0 shows the newest lines; larger values move the
        # window towards the start of the buffer.
        start = max(0, len(history) - output_height - self.state.scroll_offset)
        blank = " " * width
        lines = [
            history[idx][:width] if idx < len(history) else blank
            for idx in range(start, start + output_height)
        ]

        # The cursor glyph alternates with the parity of the command length.
        command = self.state.current_command
        cursor = "█" if len(command) % 2 == 0 else " "
        lines.append(f"> {command}{cursor}"[:width])
        return lines

    def scroll_output(self, delta: int) -> None:
        """Scroll the output buffer by *delta* lines.

        Args:
            delta: Positive to scroll up (back in time), negative to scroll
                down. The offset is clamped to ``[0, len(output_buffer) - 1]``.
        """
        buffer_len = len(self.state.output_buffer)
        if not buffer_len:
            return
        upper = max(0, buffer_len - 1)
        self.state.scroll_offset = max(0, min(upper, self.state.scroll_offset + delta))
        # The offset is reset to 0 in process_command when new output arrives.

    def _get_metrics(self, ctx: EffectContext) -> dict:
        """Return ``{"fps", "frame_time"}`` derived from the context metrics.

        The last non-empty metrics object is cached and reused when the
        context has none. Both values are 0.0 when nothing usable is known.
        """
        metrics = ctx.get_state("metrics")
        if metrics:
            self._last_metrics = metrics

        fps = 0.0
        frame_time = 0.0
        cached = self._last_metrics
        if cached and "pipeline" in cached:
            avg_ms = cached["pipeline"].get("avg_ms", 0.0)
            frame_count = cached.get("frame_count", 0)
            if frame_count > 0 and avg_ms > 0:
                fps = 1000.0 / avg_ms
                frame_time = avg_ms
        return {"fps": fps, "frame_time": frame_time}

    # ------------------------------------------------------------------
    # Command handling
    # ------------------------------------------------------------------

    def _write(self, *lines: str) -> None:
        """Append *lines* to the output buffer."""
        self.state.output_buffer.extend(lines)

    def _trim_output(self) -> None:
        """Drop the oldest output so non-empty lines stay within the limit.

        Enforces ``REPLState.max_output_lines``; empty lines are not counted,
        matching the comment on that field.
        """
        buffer = self.state.output_buffer
        limit = max(0, self.state.max_output_lines)
        excess = sum(1 for line in buffer if line) - limit
        if excess <= 0:
            return
        drop = 0
        while excess > 0:
            if buffer[drop]:
                excess -= 1
            drop += 1
        del buffer[:drop]  # in place: other holders of the list see the trim

    def process_command(self, command: str, ctx: EffectContext | None = None) -> None:
        """Process a REPL command.

        The command is recorded in the history, echoed to the output buffer
        and dispatched to its handler. Handler exceptions are reported in the
        output buffer instead of propagating.

        Args:
            command: Raw command line; blank input is ignored.
            ctx: Effect context used by the informational commands, if any.
        """
        cmd = command.strip()
        if not cmd:
            return

        state = self.state
        state.command_history.append(cmd)
        if len(state.command_history) > state.max_history:
            state.command_history.pop(0)
        # One past the last entry, i.e. "not browsing history".
        state.history_index = len(state.command_history)
        state.current_command = ""

        self._write(f"> {cmd}")
        state.scroll_offset = 0  # new output: jump back to the bottom

        cmd_name, *cmd_args = cmd.split()
        cmd_name = cmd_name.lower()

        def exit_hint() -> None:
            self._write("Use Ctrl+C to exit")

        handlers = {
            "help": self._cmd_help,
            "status": lambda: self._cmd_status(ctx),
            "effects": lambda: self._cmd_effects(ctx),
            "effect": lambda: self._cmd_effect(cmd_args, ctx),
            "param": lambda: self._cmd_param(cmd_args, ctx),
            "pipeline": lambda: self._cmd_pipeline(ctx),
            "available": lambda: self._cmd_available(ctx),
            "add_stage": lambda: self._cmd_add_stage(cmd_args),
            "remove_stage": lambda: self._cmd_remove_stage(cmd_args),
            "swap_stages": lambda: self._cmd_swap_stages(cmd_args),
            "move_stage": lambda: self._cmd_move_stage(cmd_args),
            "clear": state.output_buffer.clear,
            "quit": exit_hint,
            "exit": exit_hint,
        }

        handler = handlers.get(cmd_name)
        try:
            if handler is None:
                self._write(
                    f"Unknown command: {cmd_name}",
                    "Type 'help' for available commands",
                )
            else:
                handler()
        except Exception as e:  # REPL boundary: report, never crash the frame
            self._write(f"Error: {e}")

        self._trim_output()

    def _cmd_help(self):
        """Show help message."""
        self._write(
            "Available commands:",
            " help - Show this help",
            " status - Show pipeline status",
            " effects - List effects in current pipeline",
            " available - List all available effect types",
            " effect <name> <on|off> - Toggle effect",
            " param <effect> <param> <value> - Set parameter",
            " pipeline - Show current pipeline order",
            " add_stage <name> <type> - Add new stage",
            " remove_stage <name> - Remove stage",
            " swap_stages <name1> <name2> - Swap stages",
            " move_stage <name> [after <stage>] [before <stage>] - Move stage",
            " clear - Clear output buffer",
            " quit - Show exit message",
        )

    def _cmd_status(self, ctx: EffectContext | None):
        """Show pipeline status (metrics only when a context is given)."""
        if ctx:
            metrics = self._get_metrics(ctx)
            self._write(f"FPS: {metrics['fps']:.1f}")
            self._write(f"Frame time: {metrics['frame_time']:.1f}ms")
        # Written one at a time so the count reflects the lines added above.
        self._write(f"Output lines: {len(self.state.output_buffer)}")
        self._write(f"History: {len(self.state.command_history)} commands")

    def _cmd_effects(self, ctx: EffectContext | None):
        """List the effects found under the ``pipeline_order`` context state."""
        if not ctx:
            self._write("No context available")
            return
        effects = ctx.get_state("pipeline_order")
        if not effects:
            self._write("No pipeline information available")
            return
        self._write("Pipeline effects:")
        for i, name in enumerate(effects):
            self._write(f" {i + 1}. {name}")

    def _cmd_available(self, ctx: EffectContext | None):
        """List all available effect types and stage categories."""
        try:
            # Imported lazily, as in the original implementation.
            from engine.effects import get_registry
            from engine.effects.plugins import discover_plugins
            from engine.pipeline.registry import StageRegistry, discover_stages

            # Discover plugins and stages if not already done
            discover_plugins()
            discover_stages()

            all_effects = get_registry().list_all()
            if all_effects:
                self._write("Available effect types:")
                for name in sorted(all_effects.keys()):
                    self._write(f" - {name}")
            else:
                self._write("No effects registered")

            categories = StageRegistry.list_categories()
            if categories:
                self._write("", "Stage categories:")
                for category in sorted(categories):
                    stages = StageRegistry.list(category)
                    if stages:
                        self._write(f" {category}:")
                        for stage_name in sorted(stages):
                            self._write(f" - {stage_name}")
        except Exception as e:
            self._write(f"Error listing available types: {e}")

    def _cmd_effect(self, args: list[str], ctx: EffectContext | None):
        """Toggle an effect on/off by queueing an enable/disable command."""
        if len(args) < 2:
            self._write("Usage: effect <name> <on|off>")
            return

        effect_name = args[0]
        state = args[1].lower()
        if state not in ("on", "off"):
            self._write("State must be 'on' or 'off'")
            return

        self._write(f"Effect '{effect_name}' set to {state}")
        # Store command for external handling
        self._pending_command = {
            "action": "enable_stage" if state == "on" else "disable_stage",
            "stage": effect_name,
        }

    def _cmd_param(self, args: list[str], ctx: EffectContext | None):
        """Set an effect parameter by queueing an ``adjust_param`` command."""
        if len(args) < 3:
            self._write("Usage: param <effect> <param> <value>")
            return

        effect_name, param_name = args[0], args[1]
        try:
            param_value = float(args[2])
        except ValueError:
            self._write("Value must be a number")
            return

        self._write(f"Setting {effect_name}.{param_name} = {param_value}")
        # Store command for external handling
        self._pending_command = {
            "action": "adjust_param",
            "stage": effect_name,
            "param": param_name,
            "delta": param_value,  # Note: This sets absolute value, need adjustment
        }

    def _cmd_pipeline(self, ctx: EffectContext | None):
        """Show current pipeline order."""
        if not ctx:
            self._write("No context available")
            return
        pipeline_order = ctx.get_state("pipeline_order")
        if pipeline_order:
            self._write("Pipeline: " + " → ".join(pipeline_order))
        else:
            self._write("Pipeline information not available")

    def _cmd_add_stage(self, args: list[str]):
        """Queue an ``add_stage`` command for the given name and type."""
        if len(args) < 2:
            self._write("Usage: add_stage <name> <type>")
            return

        stage_name, stage_type = args[0], args[1]
        self._write(f"Adding stage '{stage_name}' of type '{stage_type}'")
        # Store command for external handling
        self._pending_command = {
            "action": "add_stage",
            "stage": stage_name,
            "stage_type": stage_type,
        }

    def _cmd_remove_stage(self, args: list[str]):
        """Queue a ``remove_stage`` command for the given stage."""
        if len(args) < 1:
            self._write("Usage: remove_stage <name>")
            return

        stage_name = args[0]
        self._write(f"Removing stage '{stage_name}'")
        # Store command for external handling
        self._pending_command = {
            "action": "remove_stage",
            "stage": stage_name,
        }

    def _cmd_swap_stages(self, args: list[str]):
        """Queue a ``swap_stages`` command for two stages."""
        if len(args) < 2:
            self._write("Usage: swap_stages <name1> <name2>")
            return

        stage1, stage2 = args[0], args[1]
        self._write(f"Swapping stages '{stage1}' and '{stage2}'")
        # Store command for external handling
        self._pending_command = {
            "action": "swap_stages",
            "stage1": stage1,
            "stage2": stage2,
        }

    def _cmd_move_stage(self, args: list[str]):
        """Queue a ``move_stage`` command.

        Accepts ``<name>`` followed by ``after <stage>`` and/or
        ``before <stage>`` pairs; unrecognised tokens are skipped. At least
        one of the two targets is required.
        """
        usage = "Usage: move_stage <name> [after <stage>] [before <stage>]"
        if len(args) < 1:
            self._write(usage)
            return

        stage_name = args[0]
        after = None
        before = None

        # Parse optional after/before arguments
        i = 1
        while i < len(args):
            has_value = i + 1 < len(args)
            if args[i] == "after" and has_value:
                after = args[i + 1]
                i += 2
            elif args[i] == "before" and has_value:
                before = args[i + 1]
                i += 2
            else:
                i += 1

        if after:
            self._write(f"Moving stage '{stage_name}' after '{after}'")
        elif before:
            self._write(f"Moving stage '{stage_name}' before '{before}'")
        else:
            self._write(usage)
            return

        # Store command for external handling
        self._pending_command = {
            "action": "move_stage",
            "stage": stage_name,
            "after": after,
            "before": before,
        }

    def get_pending_command(self) -> dict | None:
        """Get and clear pending command for external handling."""
        # getattr keeps this safe for instances created without __init__.
        cmd = getattr(self, "_pending_command", None)
        if cmd:
            self._pending_command = None
        return cmd

    # ------------------------------------------------------------------
    # Input editing
    # ------------------------------------------------------------------

    def navigate_history(self, direction: int) -> None:
        """Navigate command history.

        Args:
            direction: Positive moves down (towards newer entries), anything
                else moves up. Moving past the newest entry clears the
                current command.
        """
        history = self.state.command_history
        if not history:
            return

        if direction > 0:  # Down
            index = min(len(history), self.state.history_index + 1)
        else:  # Up
            index = max(0, self.state.history_index - 1)
        self.state.history_index = index
        self.state.current_command = history[index] if index < len(history) else ""

    def append_to_command(self, char: str) -> None:
        """Append *char* to the current command if it is a single character."""
        if len(char) == 1:
            self.state.current_command += char

    def backspace(self) -> None:
        """Remove last character from command."""
        self.state.current_command = self.state.current_command[:-1]

    def clear_command(self) -> None:
        """Clear current command."""
        self.state.current_command = ""

    def configure(self, config: EffectConfig) -> None:
        """Configure the effect."""
        self.config = config