forked from genewildish/Mainline
- Add _cmd_available() method to list all registered effect types - Discover plugins and query registry to get complete list - Add 'available' to help text and command processing - Update help description for 'effects' command to clarify it shows current pipeline
545 lines
19 KiB
Python
545 lines
19 KiB
Python
"""REPL Effect Plugin
|
|
|
|
A HUD-style command-line interface for interactive pipeline control.
|
|
|
|
This effect provides a Read-Eval-Print Loop (REPL) that allows users to:
|
|
- View pipeline status and metrics
|
|
- Toggle effects on/off
|
|
- Adjust effect parameters in real-time
|
|
- Inspect pipeline configuration
|
|
- Execute commands for pipeline manipulation
|
|
|
|
Usage:
|
|
Add 'repl' to the effects list in your configuration.
|
|
|
|
Commands:
|
|
help - Show available commands
|
|
status - Show pipeline status
|
|
effects - List effects in the current pipeline
available - List all registered effect types
|
|
effect <name> <on|off> - Toggle an effect
|
|
param <effect> <param> <value> - Set effect parameter
|
|
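pipeline - Show current pipeline order
add_stage <name> <type> - Add a new stage
remove_stage <name> - Remove a stage
swap_stages <name1> <name2> - Swap two stages
move_stage <name> [after <stage>] [before <stage>] - Move a stage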
|
|
clear - Clear output buffer
|
|
quit - Show exit message (the REPL itself is left with Ctrl+C)
|
|
|
|
Keyboard:
|
|
Enter - Execute command
|
|
Up/Down - Navigate command history
|
|
Tab - Auto-complete (if implemented)
|
|
Ctrl+C - Clear current input
|
|
"""
|
|
|
|
from dataclasses import dataclass, field
|
|
|
|
from engine.effects.types import (
|
|
EffectConfig,
|
|
EffectContext,
|
|
EffectPlugin,
|
|
PartialUpdate,
|
|
)
|
|
|
|
|
|
@dataclass
class REPLState:
    """Mutable session data backing the REPL overlay.

    Bundles the submitted-command history, the line currently being typed,
    the history cursor, the scrollback text, and the size limits for the
    two lists.
    """

    # Commands submitted so far, oldest first.
    command_history: list[str] = field(default_factory=list)
    # Text currently typed at the prompt.
    current_command: str = ""
    # Cursor into command_history; -1 until a command has been processed.
    history_index: int = -1
    # Scrollback lines shown in the REPL area, oldest first.
    output_buffer: list[str] = field(default_factory=list)
    # Maximum number of entries kept in command_history.
    max_history: int = 50
    # Intended maximum number of entries in output_buffer.
    # NOTE(review): presumably enforced by whoever appends to the buffer - confirm.
    max_output_lines: int = 20
|
|
|
|
|
|
class ReplEffect(EffectPlugin):
    """REPL effect with HUD-style overlay for interactive pipeline control.

    Renders a command prompt and scrollback at the bottom of each frame and,
    optionally, a three-line HUD over the top of the content. Commands that
    change the pipeline are not applied here; they are stored as action
    dicts for the host to collect via ``get_pending_command()``.
    """

    # Effect identifier; the module docstring says to add 'repl' to the
    # effects list in the configuration.
    name = "repl"
    # Default configuration. process() reads "display_height" and "show_hud"
    # from params, falling back to the same values when a key is absent.
    config = EffectConfig(
        enabled=True,
        intensity=1.0,
        params={
            "display_height": 8,  # Height of REPL area in lines
            "show_hud": True,  # Show HUD header lines
        },
    )
    # Flag accompanying process_partial().
    # NOTE(review): presumably read by the pipeline to pick that entry point - confirm.
    supports_partial_updates = True
|
|
|
|
def __init__(self):
    """Set up a fresh REPL session.

    Creates empty REPL state, no cached metrics, and no command queued for
    the host application.
    """
    super().__init__()
    self.state = REPLState()
    # Most recent truthy metrics snapshot seen by _get_metrics().
    self._last_metrics: dict | None = None
    # Action dict written by the _cmd_* handlers and drained by
    # get_pending_command(). Declared here so the attribute always exists
    # instead of first appearing when a command handler runs.
    self._pending_command: dict | None = None
|
|
|
|
def process_partial(
    self, buf: list[str], ctx: EffectContext, partial: PartialUpdate
) -> list[str]:
    """Handle a partial update by rendering the complete overlay.

    The REPL has to stay visible on every frame, so the overlay is rebuilt
    in full whether or not ``partial`` describes a full-buffer update. The
    previous full-buffer check was a dead branch: both paths made the same
    call.

    Args:
        buf: Incoming frame, one string per row.
        ctx: Effect context forwarded to ``process``.
        partial: Partial-update descriptor; accepted for interface
            compatibility and not consulted.

    Returns:
        The rows produced by ``process(buf, ctx)``.
    """
    return self.process(buf, ctx)
|
|
|
|
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
    """Compose one frame: content rows, optional HUD, separator, REPL area.

    Args:
        buf: Incoming frame, one string per row.
        ctx: Effect context. ``terminal_height`` and ``terminal_width`` are
            used when the attributes exist; otherwise the frame is
            ``len(buf)`` rows by 80 columns.

    Returns:
        A new list of rows, padded with blank rows or cut so that it
        matches the frame height.
    """
    rows = getattr(ctx, "terminal_height", len(buf))
    cols = getattr(ctx, "terminal_width", 80)

    params = self.config.params
    repl_rows = params.get("display_height", 8)
    hud_enabled = params.get("show_hud", True)

    # Everything above the REPL area is content; keep at least one row of it.
    content_rows = max(1, rows - repl_rows)
    blank = " " * cols

    # Existing rows are cut to the width; missing rows become blank rows.
    frame = [row[:cols] for row in buf[:content_rows]]
    frame.extend([blank] * (content_rows - len(frame)))

    if hud_enabled:
        # The HUD is drawn over the first content rows; HUD lines that do
        # not fit inside the content area are dropped.
        for slot, hud_line in zip(range(len(frame)), self._render_hud(cols, ctx)):
            frame[slot] = hud_line[:cols]

    frame.append("─" * cols)
    # One row of the REPL budget is spent on the separator above.
    frame.extend(self._render_repl(cols, repl_rows - 1))

    shortfall = rows - len(frame)
    if shortfall > 0:
        frame.extend([blank] * shortfall)
    return frame[:rows]
|
|
|
|
def _render_hud(self, width: int, ctx: EffectContext) -> list[str]:
|
|
"""Render HUD-style header with metrics."""
|
|
lines = []
|
|
|
|
# Get metrics
|
|
metrics = self._get_metrics(ctx)
|
|
fps = metrics.get("fps", 0.0)
|
|
frame_time = metrics.get("frame_time", 0.0)
|
|
|
|
# Line 1: Title + FPS + Frame time
|
|
fps_str = f"FPS: {fps:.1f}" if fps > 0 else "FPS: --"
|
|
time_str = f"{frame_time:.1f}ms" if frame_time > 0 else "--ms"
|
|
line1 = (
|
|
f"\033[38;5;46mMAINLINE REPL\033[0m "
|
|
f"\033[38;5;245m|\033[0m \033[38;5;39m{fps_str}\033[0m "
|
|
f"\033[38;5;245m|\033[0m \033[38;5;208m{time_str}\033[0m"
|
|
)
|
|
lines.append(line1[:width])
|
|
|
|
# Line 2: Command count + History index
|
|
cmd_count = len(self.state.command_history)
|
|
hist_idx = (
|
|
f"[{self.state.history_index + 1}/{cmd_count}]" if cmd_count > 0 else ""
|
|
)
|
|
line2 = (
|
|
f"\033[38;5;45mCOMMANDS:\033[0m "
|
|
f"\033[1;38;5;227m{cmd_count}\033[0m "
|
|
f"\033[38;5;245m|\033[0m \033[38;5;219m{hist_idx}\033[0m"
|
|
)
|
|
lines.append(line2[:width])
|
|
|
|
# Line 3: Output buffer count
|
|
out_count = len(self.state.output_buffer)
|
|
line3 = f"\033[38;5;44mOUTPUT:\033[0m \033[1;38;5;227m{out_count}\033[0m lines"
|
|
lines.append(line3[:width])
|
|
|
|
return lines
|
|
|
|
def _render_repl(self, width: int, height: int) -> list[str]:
|
|
"""Render REPL interface."""
|
|
lines = []
|
|
|
|
# Calculate how many output lines to show
|
|
# Reserve 1 line for input prompt
|
|
output_height = height - 1
|
|
output_start = max(0, len(self.state.output_buffer) - output_height)
|
|
|
|
# Render output buffer
|
|
for i in range(output_height):
|
|
idx = output_start + i
|
|
if idx < len(self.state.output_buffer):
|
|
line = self.state.output_buffer[idx][:width]
|
|
lines.append(line)
|
|
else:
|
|
lines.append(" " * width)
|
|
|
|
# Render input prompt
|
|
prompt = "> "
|
|
input_line = f"{prompt}{self.state.current_command}"
|
|
# Add cursor indicator
|
|
cursor = "█" if len(self.state.current_command) % 2 == 0 else " "
|
|
input_line += cursor
|
|
lines.append(input_line[:width])
|
|
|
|
return lines
|
|
|
|
def _get_metrics(self, ctx: EffectContext) -> dict:
|
|
"""Get pipeline metrics from context."""
|
|
metrics = ctx.get_state("metrics")
|
|
if metrics:
|
|
self._last_metrics = metrics
|
|
|
|
if self._last_metrics:
|
|
# Extract FPS and frame time
|
|
fps = 0.0
|
|
frame_time = 0.0
|
|
|
|
if "pipeline" in self._last_metrics:
|
|
avg_ms = self._last_metrics["pipeline"].get("avg_ms", 0.0)
|
|
frame_count = self._last_metrics.get("frame_count", 0)
|
|
if frame_count > 0 and avg_ms > 0:
|
|
fps = 1000.0 / avg_ms
|
|
frame_time = avg_ms
|
|
|
|
return {"fps": fps, "frame_time": frame_time}
|
|
|
|
return {"fps": 0.0, "frame_time": 0.0}
|
|
|
|
def process_command(self, command: str, ctx: EffectContext | None = None) -> None:
    """Record and execute one REPL command line.

    Blank input is ignored. Otherwise the stripped command is appended to
    the history (dropping the oldest entry beyond ``max_history``), the
    history cursor is parked one past the newest entry, the input line is
    cleared, and the command is echoed to the output buffer before being
    dispatched on its first word (case-insensitive).

    Any exception raised by a handler is reported in the output buffer as
    ``Error: ...`` instead of propagating. After the command has run, the
    output buffer is trimmed to its newest ``max_output_lines`` entries so
    it cannot grow without bound over a long session.

    Args:
        command: Raw text typed by the user.
        ctx: Effect context passed on to the handlers that accept one.
    """
    cmd = command.strip()
    if not cmd:
        return

    state = self.state

    history = state.command_history
    history.append(cmd)
    if len(history) > state.max_history:
        history.pop(0)

    # One past the newest entry means "not browsing the history".
    state.history_index = len(history)
    state.current_command = ""

    output = state.output_buffer
    output.append(f"> {cmd}")

    # cmd is non-empty after strip(), so split() yields at least one token.
    cmd_name, *cmd_args = cmd.split()
    cmd_name = cmd_name.lower()

    def _show_exit_hint() -> None:
        output.append("Use Ctrl+C to exit")

    # Lambdas defer the attribute lookup until the command is actually run.
    handlers = {
        "help": lambda: self._cmd_help(),
        "status": lambda: self._cmd_status(ctx),
        "effects": lambda: self._cmd_effects(ctx),
        "effect": lambda: self._cmd_effect(cmd_args, ctx),
        "param": lambda: self._cmd_param(cmd_args, ctx),
        "pipeline": lambda: self._cmd_pipeline(ctx),
        "available": lambda: self._cmd_available(ctx),
        "add_stage": lambda: self._cmd_add_stage(cmd_args),
        "remove_stage": lambda: self._cmd_remove_stage(cmd_args),
        "swap_stages": lambda: self._cmd_swap_stages(cmd_args),
        "move_stage": lambda: self._cmd_move_stage(cmd_args),
        "clear": output.clear,
        "quit": _show_exit_hint,
        "exit": _show_exit_hint,
    }

    handler = handlers.get(cmd_name)
    try:
        if handler is None:
            output.append(f"Unknown command: {cmd_name}")
            output.append("Type 'help' for available commands")
        else:
            handler()
    except Exception as e:
        # Top-level REPL boundary: a failing command must not break rendering.
        output.append(f"Error: {e}")

    # Keep only the newest lines; in-place so existing references stay valid.
    output = state.output_buffer
    limit = state.max_output_lines
    if limit > 0 and len(output) > limit:
        del output[:-limit]
|
|
|
|
def _cmd_help(self):
|
|
"""Show help message."""
|
|
self.state.output_buffer.append("Available commands:")
|
|
self.state.output_buffer.append(" help - Show this help")
|
|
self.state.output_buffer.append(" status - Show pipeline status")
|
|
self.state.output_buffer.append(" effects - List effects in current pipeline")
|
|
self.state.output_buffer.append(" available - List all available effect types")
|
|
self.state.output_buffer.append(" effect <name> <on|off> - Toggle effect")
|
|
self.state.output_buffer.append(
|
|
" param <effect> <param> <value> - Set parameter"
|
|
)
|
|
self.state.output_buffer.append(" pipeline - Show current pipeline order")
|
|
self.state.output_buffer.append(" add_stage <name> <type> - Add new stage")
|
|
self.state.output_buffer.append(" remove_stage <name> - Remove stage")
|
|
self.state.output_buffer.append(" swap_stages <name1> <name2> - Swap stages")
|
|
self.state.output_buffer.append(
|
|
" move_stage <name> [after <stage>] [before <stage>] - Move stage"
|
|
)
|
|
self.state.output_buffer.append(" clear - Clear output buffer")
|
|
self.state.output_buffer.append(" quit - Show exit message")
|
|
|
|
def _cmd_status(self, ctx: EffectContext | None):
|
|
"""Show pipeline status."""
|
|
if ctx:
|
|
metrics = self._get_metrics(ctx)
|
|
self.state.output_buffer.append(f"FPS: {metrics['fps']:.1f}")
|
|
self.state.output_buffer.append(
|
|
f"Frame time: {metrics['frame_time']:.1f}ms"
|
|
)
|
|
|
|
self.state.output_buffer.append(
|
|
f"Output lines: {len(self.state.output_buffer)}"
|
|
)
|
|
self.state.output_buffer.append(
|
|
f"History: {len(self.state.command_history)} commands"
|
|
)
|
|
|
|
def _cmd_effects(self, ctx: EffectContext | None):
|
|
"""List all effects."""
|
|
if ctx:
|
|
# Try to get effect list from context
|
|
effects = ctx.get_state("pipeline_order")
|
|
if effects:
|
|
self.state.output_buffer.append("Pipeline effects:")
|
|
for i, name in enumerate(effects):
|
|
self.state.output_buffer.append(f" {i + 1}. {name}")
|
|
else:
|
|
self.state.output_buffer.append("No pipeline information available")
|
|
else:
|
|
self.state.output_buffer.append("No context available")
|
|
|
|
def _cmd_available(self, ctx: EffectContext | None):
|
|
"""List all available effect types."""
|
|
try:
|
|
from engine.effects import get_registry
|
|
from engine.effects.plugins import discover_plugins
|
|
|
|
# Discover plugins if not already done
|
|
discover_plugins()
|
|
registry = get_registry()
|
|
all_effects = registry.list_all()
|
|
|
|
if all_effects:
|
|
self.state.output_buffer.append("Available effect types:")
|
|
for name in sorted(all_effects.keys()):
|
|
self.state.output_buffer.append(f" - {name}")
|
|
else:
|
|
self.state.output_buffer.append("No effects registered")
|
|
except Exception as e:
|
|
self.state.output_buffer.append(f"Error listing effects: {e}")
|
|
|
|
def _cmd_effect(self, args: list[str], ctx: EffectContext | None):
|
|
"""Toggle effect on/off."""
|
|
if len(args) < 2:
|
|
self.state.output_buffer.append("Usage: effect <name> <on|off>")
|
|
return
|
|
|
|
effect_name = args[0]
|
|
state = args[1].lower()
|
|
|
|
if state not in ("on", "off"):
|
|
self.state.output_buffer.append("State must be 'on' or 'off'")
|
|
return
|
|
|
|
# Emit event to toggle effect
|
|
enabled = state == "on"
|
|
self.state.output_buffer.append(f"Effect '{effect_name}' set to {state}")
|
|
|
|
# Store command for external handling
|
|
self._pending_command = {
|
|
"action": "enable_stage" if enabled else "disable_stage",
|
|
"stage": effect_name,
|
|
}
|
|
|
|
def _cmd_param(self, args: list[str], ctx: EffectContext | None):
|
|
"""Set effect parameter."""
|
|
if len(args) < 3:
|
|
self.state.output_buffer.append("Usage: param <effect> <param> <value>")
|
|
return
|
|
|
|
effect_name = args[0]
|
|
param_name = args[1]
|
|
try:
|
|
param_value = float(args[2])
|
|
except ValueError:
|
|
self.state.output_buffer.append("Value must be a number")
|
|
return
|
|
|
|
self.state.output_buffer.append(
|
|
f"Setting {effect_name}.{param_name} = {param_value}"
|
|
)
|
|
|
|
# Store command for external handling
|
|
self._pending_command = {
|
|
"action": "adjust_param",
|
|
"stage": effect_name,
|
|
"param": param_name,
|
|
"delta": param_value, # Note: This sets absolute value, need adjustment
|
|
}
|
|
|
|
def _cmd_pipeline(self, ctx: EffectContext | None):
|
|
"""Show current pipeline order."""
|
|
if ctx:
|
|
pipeline_order = ctx.get_state("pipeline_order")
|
|
if pipeline_order:
|
|
self.state.output_buffer.append(
|
|
"Pipeline: " + " → ".join(pipeline_order)
|
|
)
|
|
else:
|
|
self.state.output_buffer.append("Pipeline information not available")
|
|
else:
|
|
self.state.output_buffer.append("No context available")
|
|
|
|
def _cmd_add_stage(self, args: list[str]):
|
|
"""Add a new stage to the pipeline."""
|
|
if len(args) < 2:
|
|
self.state.output_buffer.append("Usage: add_stage <name> <type>")
|
|
return
|
|
|
|
stage_name = args[0]
|
|
stage_type = args[1]
|
|
self.state.output_buffer.append(
|
|
f"Adding stage '{stage_name}' of type '{stage_type}'"
|
|
)
|
|
|
|
# Store command for external handling
|
|
self._pending_command = {
|
|
"action": "add_stage",
|
|
"stage": stage_name,
|
|
"stage_type": stage_type,
|
|
}
|
|
|
|
def _cmd_remove_stage(self, args: list[str]):
|
|
"""Remove a stage from the pipeline."""
|
|
if len(args) < 1:
|
|
self.state.output_buffer.append("Usage: remove_stage <name>")
|
|
return
|
|
|
|
stage_name = args[0]
|
|
self.state.output_buffer.append(f"Removing stage '{stage_name}'")
|
|
|
|
# Store command for external handling
|
|
self._pending_command = {
|
|
"action": "remove_stage",
|
|
"stage": stage_name,
|
|
}
|
|
|
|
def _cmd_swap_stages(self, args: list[str]):
|
|
"""Swap two stages in the pipeline."""
|
|
if len(args) < 2:
|
|
self.state.output_buffer.append("Usage: swap_stages <name1> <name2>")
|
|
return
|
|
|
|
stage1 = args[0]
|
|
stage2 = args[1]
|
|
self.state.output_buffer.append(f"Swapping stages '{stage1}' and '{stage2}'")
|
|
|
|
# Store command for external handling
|
|
self._pending_command = {
|
|
"action": "swap_stages",
|
|
"stage1": stage1,
|
|
"stage2": stage2,
|
|
}
|
|
|
|
def _cmd_move_stage(self, args: list[str]):
|
|
"""Move a stage in the pipeline."""
|
|
if len(args) < 1:
|
|
self.state.output_buffer.append(
|
|
"Usage: move_stage <name> [after <stage>] [before <stage>]"
|
|
)
|
|
return
|
|
|
|
stage_name = args[0]
|
|
after = None
|
|
before = None
|
|
|
|
# Parse optional after/before arguments
|
|
i = 1
|
|
while i < len(args):
|
|
if args[i] == "after" and i + 1 < len(args):
|
|
after = args[i + 1]
|
|
i += 2
|
|
elif args[i] == "before" and i + 1 < len(args):
|
|
before = args[i + 1]
|
|
i += 2
|
|
else:
|
|
i += 1
|
|
|
|
if after:
|
|
self.state.output_buffer.append(
|
|
f"Moving stage '{stage_name}' after '{after}'"
|
|
)
|
|
elif before:
|
|
self.state.output_buffer.append(
|
|
f"Moving stage '{stage_name}' before '{before}'"
|
|
)
|
|
else:
|
|
self.state.output_buffer.append(
|
|
"Usage: move_stage <name> [after <stage>] [before <stage>]"
|
|
)
|
|
return
|
|
|
|
# Store command for external handling
|
|
self._pending_command = {
|
|
"action": "move_stage",
|
|
"stage": stage_name,
|
|
"after": after,
|
|
"before": before,
|
|
}
|
|
|
|
def get_pending_command(self) -> dict | None:
    """Hand over the queued pipeline action, if any, and reset the slot.

    Returns:
        The action dict most recently stored by a command handler, or the
        current falsy value (``None`` when nothing was ever queued). A
        truthy action is returned once; the slot is then set to ``None``.
    """
    pending = getattr(self, "_pending_command", None)
    if not pending:
        return pending
    self._pending_command = None
    return pending
|
|
|
|
def navigate_history(self, direction: int) -> None:
    """Step through the command history and load the selected entry.

    Args:
        direction: Positive moves towards newer entries (Down); anything
            else moves towards older entries (Up).

    Moving down stops one past the newest entry, where the input line is
    empty; moving up stops at the oldest entry. Does nothing when the
    history is empty.
    """
    state = self.state
    history = state.command_history
    if not history:
        return

    if direction > 0:
        target = min(len(history), state.history_index + 1)
    else:
        target = max(0, state.history_index - 1)
    state.history_index = target

    # An index one past the end stands for a fresh, empty input line.
    state.current_command = history[target] if target < len(history) else ""
|
|
|
|
def append_to_command(self, char: str) -> None:
    """Add one typed character to the end of the input line.

    Strings that are not exactly one character long are ignored.
    """
    if len(char) != 1:
        return
    self.state.current_command = self.state.current_command + char
|
|
|
|
def backspace(self) -> None:
    """Delete the final character of the input line; no-op when it is empty."""
    typed = self.state.current_command
    self.state.current_command = typed[:-1]
|
|
|
|
def clear_command(self) -> None:
    """Discard whatever has been typed on the input line."""
    state = self.state
    state.current_command = ""
|
|
|
|
def configure(self, config: EffectConfig) -> None:
    """Replace this effect's configuration with ``config``.

    The object is stored as-is (not copied) on the instance.
    """
    self.config = config
|