From 6646ed78b33ea42e953422401356a973549e1335 Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sat, 21 Mar 2026 21:19:30 -0700 Subject: [PATCH] Add REPL effect detection and input handling to pipeline runner - Detect REPL effect in pipeline and enable interactive mode - Enable raw terminal mode for REPL input capture - Add keyboard input loop for REPL commands (return, up/down arrows, backspace) - Process commands and handle pipeline mutations from REPL - Fix lint issues in graph and REPL modules (type annotations, imports) --- engine/app/pipeline_runner.py | 46 +++++++++++++++++++++++++++++ engine/display/backends/terminal.py | 2 +- engine/effects/plugins/repl.py | 22 +++++--------- engine/pipeline/graph.py | 25 ++++++++-------- engine/pipeline/graph_adapter.py | 27 ++++++++--------- engine/pipeline/graph_toml.py | 7 +++-- engine/pipeline/hybrid_config.py | 12 ++++---- 7 files changed, 89 insertions(+), 52 deletions(-) diff --git a/engine/app/pipeline_runner.py b/engine/app/pipeline_runner.py index 573c8b6..cf512ed 100644 --- a/engine/app/pipeline_runner.py +++ b/engine/app/pipeline_runner.py @@ -433,6 +433,16 @@ def run_pipeline_mode(preset_name: str = "demo", graph_config: str | None = None ui_panel = None render_ui_panel_in_terminal = False + # Check for REPL effect in pipeline + repl_effect = None + for stage in pipeline.stages.values(): + if isinstance(stage, EffectPluginStage) and stage._effect.name == "repl": + repl_effect = stage._effect + print( + " \033[38;5;46mREPL effect detected - Interactive mode enabled\033[0m" + ) + break + if need_ui_controller: from engine.display import render_ui_panel @@ -448,6 +458,10 @@ def run_pipeline_mode(preset_name: str = "demo", graph_config: str | None = None if hasattr(display, "set_raw_mode"): display.set_raw_mode(True) + # Enable raw mode for REPL if present and not already enabled + elif repl_effect and hasattr(display, "set_raw_mode"): + display.set_raw_mode(True) + # Register effect plugin stages from pipeline for UI control for stage in pipeline.stages.values(): if isinstance(stage, EffectPluginStage): @@ -966,6 +980,38 @@ def run_pipeline_mode(preset_name: str = "demo", graph_config: str | None = None else: display.show(result.data, border=show_border) + # --- REPL Input Handling --- + if repl_effect and hasattr(display, "get_input_keys"): + # Get keyboard input (non-blocking) + keys = display.get_input_keys(timeout=0.0) + + for key in keys: + if key == "return": + # Get command string before processing + cmd_str = repl_effect.state.current_command + if cmd_str: + repl_effect.process_command(cmd_str, ctx) + # Check for pending pipeline mutations + pending = repl_effect.get_pending_command() + if pending: + _handle_pipeline_mutation(pipeline, pending) + # Broadcast state update if WebSocket is active + if web_control_active and isinstance( + display, WebSocketDisplay + ): + state = display._get_state_snapshot() + if state: + display.broadcast_state(state) + elif key == "up": + repl_effect.navigate_history(-1) + elif key == "down": + repl_effect.navigate_history(1) + elif key == "backspace": + repl_effect.backspace() + elif len(key) == 1: + repl_effect.append_to_command(key) + # --- End REPL Input Handling --- + if hasattr(display, "is_quit_requested") and display.is_quit_requested(): if hasattr(display, "clear_quit_request"): display.clear_quit_request() diff --git a/engine/display/backends/terminal.py b/engine/display/backends/terminal.py index 4bd575e..b8cb152 100644 --- a/engine/display/backends/terminal.py +++ b/engine/display/backends/terminal.py @@ -247,7 +247,7 @@ class TerminalDisplay: keys.append("escape") elif char.isprintable(): keys.append(char) - except (OSError, IOError): + except OSError: pass return keys diff --git a/engine/effects/plugins/repl.py b/engine/effects/plugins/repl.py index 99dad38..f187746 100644 --- a/engine/effects/plugins/repl.py +++ b/engine/effects/plugins/repl.py @@ -30,7 +30,6 @@ Keyboard: """ from dataclasses import dataclass, field -from typing import Any, Optional from engine.effects.types import ( EffectConfig, @@ -69,7 +68,7 @@ class ReplEffect(EffectPlugin): def __init__(self): super().__init__() self.state = REPLState() - self._last_metrics: Optional[dict] = None + self._last_metrics: dict | None = None def process_partial( self, buf: list[str], ctx: EffectContext, partial: PartialUpdate @@ -82,8 +81,6 @@ class ReplEffect(EffectPlugin): def process(self, buf: list[str], ctx: EffectContext) -> list[str]: """Render buffer with REPL overlay.""" - result = list(buf) - # Get display dimensions from context height = ctx.terminal_height if hasattr(ctx, "terminal_height") else len(buf) width = ctx.terminal_width if hasattr(ctx, "terminal_width") else 80 @@ -94,7 +91,6 @@ class ReplEffect(EffectPlugin): # Reserve space for REPL at bottom # HUD uses top 3 lines if enabled - hud_lines = 3 if show_hud else 0 content_height = max(1, height - repl_height) # Build output @@ -220,9 +216,7 @@ class ReplEffect(EffectPlugin): return {"fps": 0.0, "frame_time": 0.0} - def process_command( - self, command: str, ctx: Optional[EffectContext] = None - ) -> None: + def process_command(self, command: str, ctx: EffectContext | None = None) -> None: """Process a REPL command.""" cmd = command.strip() if not cmd: @@ -283,7 +277,7 @@ class ReplEffect(EffectPlugin): self.state.output_buffer.append(" clear - Clear output buffer") self.state.output_buffer.append(" quit - Show exit message") - def _cmd_status(self, ctx: Optional[EffectContext]): + def _cmd_status(self, ctx: EffectContext | None): """Show pipeline status.""" if ctx: metrics = self._get_metrics(ctx) @@ -299,7 +293,7 @@ class ReplEffect(EffectPlugin): f"History: {len(self.state.command_history)} commands" ) - def _cmd_effects(self, ctx: Optional[EffectContext]): + def _cmd_effects(self, ctx: EffectContext | None): """List all effects.""" if ctx: # Try to get effect list from context @@ -313,7 +307,7 @@ class ReplEffect(EffectPlugin): else: self.state.output_buffer.append("No context available") - def _cmd_effect(self, args: list[str], ctx: Optional[EffectContext]): + def _cmd_effect(self, args: list[str], ctx: EffectContext | None): """Toggle effect on/off.""" if len(args) < 2: self.state.output_buffer.append("Usage: effect ") @@ -336,7 +330,7 @@ class ReplEffect(EffectPlugin): "stage": effect_name, } - def _cmd_param(self, args: list[str], ctx: Optional[EffectContext]): + def _cmd_param(self, args: list[str], ctx: EffectContext | None): """Set effect parameter.""" if len(args) < 3: self.state.output_buffer.append("Usage: param ") @@ -362,7 +356,7 @@ class ReplEffect(EffectPlugin): "delta": param_value, # Note: This sets absolute value, need adjustment } - def _cmd_pipeline(self, ctx: Optional[EffectContext]): + def _cmd_pipeline(self, ctx: EffectContext | None): """Show current pipeline order.""" if ctx: pipeline_order = ctx.get_state("pipeline_order") @@ -375,7 +369,7 @@ class ReplEffect(EffectPlugin): else: self.state.output_buffer.append("No context available") - def get_pending_command(self) -> Optional[dict]: + def get_pending_command(self) -> dict | None: """Get and clear pending command for external handling.""" cmd = getattr(self, "_pending_command", None) if cmd: diff --git a/engine/pipeline/graph.py b/engine/pipeline/graph.py index 507ebf2..1361cdf 100644 --- a/engine/pipeline/graph.py +++ b/engine/pipeline/graph.py @@ -22,8 +22,8 @@ Usage: """ from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional, Union from enum import Enum +from typing import Any class NodeType(Enum): @@ -45,7 +45,7 @@ class Node: name: str type: NodeType - config: Dict[str, Any] = field(default_factory=dict) + config: dict[str, Any] = field(default_factory=dict) enabled: bool = True optional: bool = False @@ -59,17 +59,17 @@ class Connection: source: str target: str - data_type: Optional[str] = None # Optional data type constraint + data_type: str | None = None # Optional data type constraint @dataclass class Graph: """Pipeline graph representation.""" - nodes: Dict[str, Node] = field(default_factory=dict) - connections: List[Connection] = field(default_factory=list) + nodes: dict[str, Node] = field(default_factory=dict) + connections: list[Connection] = field(default_factory=list) - def node(self, name: str, node_type: Union[NodeType, str], **config) -> "Graph": + def node(self, name: str, node_type: NodeType | str, **config) -> "Graph": """Add a node to the graph.""" if isinstance(node_type, str): # Try to parse as NodeType @@ -82,7 +82,7 @@ class Graph: return self def connect( - self, source: str, target: str, data_type: Optional[str] = None + self, source: str, target: str, data_type: str | None = None ) -> "Graph": """Add a connection between nodes.""" if source not in self.nodes: @@ -99,7 +99,7 @@ class Graph: self.connect(names[i], names[i + 1]) return self - def from_dict(self, data: Dict[str, Any]) -> "Graph": + def from_dict(self, data: dict[str, Any]) -> "Graph": """Load graph from dictionary (TOML-compatible).""" # Parse nodes nodes_data = data.get("nodes", {}) @@ -127,7 +127,7 @@ class Graph: return self - def to_dict(self) -> Dict[str, Any]: + def to_dict(self) -> dict[str, Any]: """Convert graph to dictionary.""" return { "nodes": { @@ -140,7 +140,7 @@ class Graph: ], } - def validate(self) -> List[str]: + def validate(self) -> list[str]: """Validate graph structure and return list of errors.""" errors = [] @@ -166,9 +166,8 @@ class Graph: temp.add(node_name) for conn in self.connections: - if conn.source == node_name: - if has_cycle(conn.target): - return True + if conn.source == node_name and has_cycle(conn.target): + return True temp.remove(node_name) visited.add(node_name) return False diff --git a/engine/pipeline/graph_adapter.py b/engine/pipeline/graph_adapter.py index cb614ab..5bb21ad 100644 --- a/engine/pipeline/graph_adapter.py +++ b/engine/pipeline/graph_adapter.py @@ -4,12 +4,12 @@ This module bridges the new graph-based abstraction with the existing Stage-based pipeline system for backward compatibility. """ -from typing import Dict, Any, List, Optional +from typing import Any, Optional -from engine.pipeline.graph import Graph, NodeType -from engine.pipeline.controller import Pipeline, PipelineConfig -from engine.pipeline.core import PipelineContext -from engine.pipeline.params import PipelineParams +from engine.camera import Camera +from engine.data_sources.sources import EmptyDataSource, HeadlinesDataSource +from engine.display import DisplayRegistry +from engine.effects import get_registry from engine.pipeline.adapters import ( CameraStage, DataSourceStage, @@ -18,15 +18,12 @@ from engine.pipeline.adapters import ( FontStage, MessageOverlayStage, PositionStage, - ViewportFilterStage, - create_stage_from_display, - create_stage_from_effect, ) from engine.pipeline.adapters.positioning import PositioningMode -from engine.display import DisplayRegistry -from engine.effects import get_registry -from engine.data_sources.sources import EmptyDataSource, HeadlinesDataSource -from engine.camera import Camera +from engine.pipeline.controller import Pipeline, PipelineConfig +from engine.pipeline.core import PipelineContext +from engine.pipeline.graph import Graph, NodeType +from engine.pipeline.params import PipelineParams class GraphAdapter: @@ -34,8 +31,8 @@ class GraphAdapter: def __init__(self, graph: Graph): self.graph = graph - self.pipeline: Optional[Pipeline] = None - self.context: Optional[PipelineContext] = None + self.pipeline: Pipeline | None = None + self.context: PipelineContext | None = None def build_pipeline( self, viewport_width: int = 80, viewport_height: int = 24 @@ -154,7 +151,7 @@ def graph_to_pipeline( def dict_to_pipeline( - data: Dict[str, Any], viewport_width: int = 80, viewport_height: int = 24 + data: dict[str, Any], viewport_width: int = 80, viewport_height: int = 24 ) -> Pipeline: """Convert a dictionary to a Pipeline.""" graph = Graph().from_dict(data) diff --git a/engine/pipeline/graph_toml.py b/engine/pipeline/graph_toml.py index aa88726..a036b1d 100644 --- a/engine/pipeline/graph_toml.py +++ b/engine/pipeline/graph_toml.py @@ -1,8 +1,9 @@ """TOML-based graph configuration loader.""" -import tomllib from pathlib import Path -from typing import Any, Dict +from typing import Any + +import tomllib from engine.pipeline.graph import Graph, NodeType from engine.pipeline.graph_adapter import graph_to_pipeline @@ -23,7 +24,7 @@ def load_graph_from_toml(toml_path: str | Path) -> Graph: return graph_from_dict(data) -def graph_from_dict(data: Dict[str, Any]) -> Graph: +def graph_from_dict(data: dict[str, Any]) -> Graph: """Create a graph from a dictionary (TOML-compatible structure). Args: diff --git a/engine/pipeline/hybrid_config.py b/engine/pipeline/hybrid_config.py index e325558..da36cbf 100644 --- a/engine/pipeline/hybrid_config.py +++ b/engine/pipeline/hybrid_config.py @@ -18,8 +18,8 @@ providing the same flexibility. """ from dataclasses import dataclass, field -from typing import Any, Dict, List, Optional from pathlib import Path +from typing import Any from engine.pipeline.graph import Graph, NodeType from engine.pipeline.graph_adapter import graph_to_pipeline @@ -32,7 +32,7 @@ class EffectConfig: name: str intensity: float = 1.0 enabled: bool = True - params: Dict[str, Any] = field(default_factory=dict) + params: dict[str, Any] = field(default_factory=dict) @dataclass @@ -70,9 +70,9 @@ class PipelineConfig: """ source: str = "headlines" - camera: Optional[CameraConfig] = None - effects: List[EffectConfig] = field(default_factory=list) - display: Optional[DisplayConfig] = None + camera: CameraConfig | None = None + effects: list[EffectConfig] = field(default_factory=list) + display: DisplayConfig | None = None viewport_width: int = 80 viewport_height: int = 24 @@ -208,7 +208,7 @@ def load_hybrid_config(toml_path: str | Path) -> PipelineConfig: return parse_hybrid_config(data) -def parse_hybrid_config(data: Dict[str, Any]) -> PipelineConfig: +def parse_hybrid_config(data: dict[str, Any]) -> PipelineConfig: """Parse hybrid configuration from dictionary. Expected format: