""" Pipeline controller - DAG-based pipeline execution. The Pipeline class orchestrates stages in dependency order, handling the complete render cycle from source to display. """ import time from dataclasses import dataclass, field from typing import Any from engine.pipeline.core import PipelineContext, Stage, StageError, StageResult from engine.pipeline.params import PipelineParams from engine.pipeline.registry import StageRegistry @dataclass class PipelineConfig: """Configuration for a pipeline instance.""" source: str = "headlines" display: str = "terminal" camera: str = "vertical" effects: list[str] = field(default_factory=list) enable_metrics: bool = True @dataclass class StageMetrics: """Metrics for a single stage execution.""" name: str duration_ms: float chars_in: int = 0 chars_out: int = 0 @dataclass class FrameMetrics: """Metrics for a single frame through the pipeline.""" frame_number: int total_ms: float stages: list[StageMetrics] = field(default_factory=list) class Pipeline: """Main pipeline orchestrator. Manages the execution of all stages in dependency order, handling initialization, processing, and cleanup. Supports dynamic mutation during runtime via the mutation API. """ def __init__( self, config: PipelineConfig | None = None, context: PipelineContext | None = None, ): self.config = config or PipelineConfig() self.context = context or PipelineContext() self._stages: dict[str, Stage] = {} self._execution_order: list[str] = [] self._initialized = False self._capability_map: dict[str, list[str]] = {} self._metrics_enabled = self.config.enable_metrics self._frame_metrics: list[FrameMetrics] = [] self._max_metrics_frames = 60 # Minimum capabilities required for pipeline to function # NOTE: Research later - allow presets to override these defaults self._minimum_capabilities: set[str] = { "source", "render.output", "display.output", "camera.state", # Always required for viewport filtering } self._current_frame_number = 0 def add_stage(self, name: str, stage: Stage, initialize: bool = True) -> "Pipeline": """Add a stage to the pipeline. Args: name: Unique name for the stage stage: Stage instance to add initialize: If True, initialize the stage immediately Returns: Self for method chaining """ self._stages[name] = stage if self._initialized and initialize: stage.init(self.context) return self def remove_stage(self, name: str, cleanup: bool = True) -> Stage | None: """Remove a stage from the pipeline. Args: name: Name of the stage to remove cleanup: If True, call cleanup() on the removed stage Returns: The removed stage, or None if not found """ stage = self._stages.pop(name, None) if stage and cleanup: try: stage.cleanup() except Exception: pass # Rebuild execution order and capability map if stage was removed if stage and self._initialized: self._rebuild() return stage def remove_stage_safe(self, name: str, cleanup: bool = True) -> Stage | None: """Remove a stage and rebuild execution order safely. This is an alias for remove_stage() that explicitly rebuilds the execution order after removal. Args: name: Name of the stage to remove cleanup: If True, call cleanup() on the removed stage Returns: The removed stage, or None if not found """ return self.remove_stage(name, cleanup) def cleanup_stage(self, name: str) -> None: """Clean up a specific stage without removing it. This is useful for stages that need to release resources (like display connections) without being removed from the pipeline. Args: name: Name of the stage to clean up """ stage = self._stages.get(name) if stage: try: stage.cleanup() except Exception: pass def can_hot_swap(self, name: str) -> bool: """Check if a stage can be safely hot-swapped. A stage can be hot-swapped if: 1. It exists in the pipeline 2. It's not required for basic pipeline function 3. It doesn't have strict dependencies that can't be re-resolved Args: name: Name of the stage to check Returns: True if the stage can be hot-swapped, False otherwise """ # Check if stage exists if name not in self._stages: return False # Check if stage is a minimum capability provider stage = self._stages[name] stage_caps = stage.capabilities if hasattr(stage, "capabilities") else set() minimum_caps = self._minimum_capabilities # If stage provides a minimum capability, it's more critical # but still potentially swappable if another stage provides the same capability for cap in stage_caps: if cap in minimum_caps: # Check if another stage provides this capability providers = self._capability_map.get(cap, []) # This stage is the sole provider - might be critical # but still allow hot-swap if pipeline is not initialized if len(providers) <= 1 and self._initialized: return False return True def replace_stage( self, name: str, new_stage: Stage, preserve_state: bool = True ) -> Stage | None: """Replace a stage in the pipeline with a new one. Args: name: Name of the stage to replace new_stage: New stage instance preserve_state: If True, copy relevant state from old stage Returns: The old stage, or None if not found """ old_stage = self._stages.get(name) if not old_stage: return None if preserve_state: self._copy_stage_state(old_stage, new_stage) old_stage.cleanup() self._stages[name] = new_stage new_stage.init(self.context) if self._initialized: self._rebuild() return old_stage def swap_stages(self, name1: str, name2: str) -> bool: """Swap two stages in the pipeline. Args: name1: First stage name name2: Second stage name Returns: True if successful, False if either stage not found """ stage1 = self._stages.get(name1) stage2 = self._stages.get(name2) if not stage1 or not stage2: return False self._stages[name1] = stage2 self._stages[name2] = stage1 if self._initialized: self._rebuild() return True def move_stage( self, name: str, after: str | None = None, before: str | None = None ) -> bool: """Move a stage's position in execution order. Args: name: Stage to move after: Place this stage after this stage name before: Place this stage before this stage name Returns: True if successful, False if stage not found """ if name not in self._stages: return False if not self._initialized: return False current_order = list(self._execution_order) if name not in current_order: return False current_order.remove(name) if after and after in current_order: idx = current_order.index(after) + 1 current_order.insert(idx, name) elif before and before in current_order: idx = current_order.index(before) current_order.insert(idx, name) else: current_order.append(name) self._execution_order = current_order return True def _copy_stage_state(self, old_stage: Stage, new_stage: Stage) -> None: """Copy relevant state from old stage to new stage during replacement. Args: old_stage: The old stage being replaced new_stage: The new stage """ if hasattr(old_stage, "_enabled"): new_stage._enabled = old_stage._enabled # Preserve camera state if hasattr(old_stage, "save_state") and hasattr(new_stage, "restore_state"): try: state = old_stage.save_state() new_stage.restore_state(state) except Exception: # If state preservation fails, continue without it pass def _rebuild(self) -> None: """Rebuild execution order after mutation or auto-injection.""" was_initialized = self._initialized self._initialized = False self._capability_map = self._build_capability_map() self._execution_order = self._resolve_dependencies() # Note: We intentionally DO NOT validate dependencies here. # Mutation operations (remove/swap/move) might leave the pipeline # temporarily invalid (e.g., removing a stage that others depend on). # Validation is performed explicitly in build() or can be checked # manually via validate_minimum_capabilities(). # try: # self._validate_dependencies() # self._validate_types() # except StageError: # pass # Restore initialized state self._initialized = was_initialized def get_stage(self, name: str) -> Stage | None: """Get a stage by name.""" return self._stages.get(name) def enable_stage(self, name: str) -> bool: """Enable a stage in the pipeline. Args: name: Stage name to enable Returns: True if successful, False if stage not found """ stage = self._stages.get(name) if stage: stage.set_enabled(True) return True return False def disable_stage(self, name: str) -> bool: """Disable a stage in the pipeline. Args: name: Stage name to disable Returns: True if successful, False if stage not found """ stage = self._stages.get(name) if stage: stage.set_enabled(False) return True return False def get_stage_info(self, name: str) -> dict | None: """Get detailed information about a stage. Args: name: Stage name Returns: Dictionary with stage information, or None if not found """ stage = self._stages.get(name) if not stage: return None return { "name": name, "category": stage.category, "stage_type": stage.stage_type, "enabled": stage.is_enabled(), "optional": stage.optional, "capabilities": list(stage.capabilities), "dependencies": list(stage.dependencies), "inlet_types": [dt.name for dt in stage.inlet_types], "outlet_types": [dt.name for dt in stage.outlet_types], "render_order": stage.render_order, "is_overlay": stage.is_overlay, } def get_pipeline_info(self) -> dict: """Get comprehensive information about the pipeline. Returns: Dictionary with pipeline state """ return { "stages": {name: self.get_stage_info(name) for name in self._stages}, "execution_order": self._execution_order.copy(), "initialized": self._initialized, "stage_count": len(self._stages), } @property def minimum_capabilities(self) -> set[str]: """Get minimum capabilities required for pipeline to function.""" return self._minimum_capabilities @minimum_capabilities.setter def minimum_capabilities(self, value: set[str]): """Set minimum required capabilities. NOTE: Research later - allow presets to override these defaults """ self._minimum_capabilities = value def validate_minimum_capabilities(self) -> tuple[bool, list[str]]: """Validate that all minimum capabilities are provided. Returns: Tuple of (is_valid, missing_capabilities) """ missing = [] for cap in self._minimum_capabilities: if not self._find_stage_with_capability(cap): missing.append(cap) return len(missing) == 0, missing def ensure_minimum_capabilities(self) -> list[str]: """Automatically inject MVP stages if minimum capabilities are missing. Auto-injection is always on, but defaults are trivial to override. Returns: List of stages that were injected """ from engine.camera import Camera from engine.data_sources.sources import EmptyDataSource from engine.display import DisplayRegistry from engine.pipeline.adapters import ( CameraClockStage, CameraStage, DataSourceStage, DisplayStage, SourceItemsToBufferStage, ) injected = [] # Check for source capability if ( not self._find_stage_with_capability("source") and "source" not in self._stages ): empty_source = EmptyDataSource(width=80, height=24) self.add_stage("source", DataSourceStage(empty_source, name="empty")) injected.append("source") # Check for camera.state capability (must be BEFORE render to accept SOURCE_ITEMS) camera = None if not self._find_stage_with_capability("camera.state"): # Inject static camera (trivial, no movement) camera = Camera.scroll(speed=0.0) camera.set_canvas_size(200, 200) if "camera_update" not in self._stages: self.add_stage( "camera_update", CameraClockStage(camera, name="camera-clock") ) injected.append("camera_update") # Check for render capability if ( not self._find_stage_with_capability("render.output") and "render" not in self._stages ): self.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) injected.append("render") # Check for camera stage (must be AFTER render to accept TEXT_BUFFER) if camera and "camera" not in self._stages: self.add_stage("camera", CameraStage(camera, name="static")) injected.append("camera") # Check for display capability if ( not self._find_stage_with_capability("display.output") and "display" not in self._stages ): display = DisplayRegistry.create("terminal") if display: self.add_stage("display", DisplayStage(display, name="terminal")) injected.append("display") # Rebuild pipeline if stages were injected if injected: self._rebuild() return injected def build(self, auto_inject: bool = True) -> "Pipeline": """Build execution order based on dependencies. Args: auto_inject: If True, automatically inject MVP stages for missing capabilities """ self._capability_map = self._build_capability_map() self._execution_order = self._resolve_dependencies() # Validate minimum capabilities and auto-inject if needed if auto_inject: is_valid, missing = self.validate_minimum_capabilities() if not is_valid: injected = self.ensure_minimum_capabilities() if injected: print( f" \033[38;5;226mAuto-injected stages for missing capabilities: {injected}\033[0m" ) # Rebuild after auto-injection self._capability_map = self._build_capability_map() self._execution_order = self._resolve_dependencies() # Re-validate after injection attempt (whether anything was injected or not) # If injection didn't run (injected empty), we still need to check if we're valid # If injection ran but failed to fix (injected empty), we need to check is_valid, missing = self.validate_minimum_capabilities() if not is_valid: raise StageError( "build", f"Auto-injection failed to provide minimum capabilities: {missing}", ) self._validate_dependencies() self._validate_types() self._initialized = True return self def _build_capability_map(self) -> dict[str, list[str]]: """Build a map of capabilities to stage names. Returns: Dict mapping capability -> list of stage names that provide it """ capability_map: dict[str, list[str]] = {} for name, stage in self._stages.items(): for cap in stage.capabilities: if cap not in capability_map: capability_map[cap] = [] capability_map[cap].append(name) return capability_map def _find_stage_with_capability(self, capability: str) -> str | None: """Find a stage that provides the given capability. Supports wildcard matching: - "source" matches "source.headlines" (prefix match) - "source.*" matches "source.headlines" - "source.headlines" matches exactly Args: capability: The capability to find Returns: Stage name that provides the capability, or None if not found """ # Exact match if capability in self._capability_map: return self._capability_map[capability][0] # Prefix match (e.g., "source" -> "source.headlines") for cap, stages in self._capability_map.items(): if cap.startswith(capability + "."): return stages[0] # Wildcard match (e.g., "source.*" -> "source.headlines") if ".*" in capability: prefix = capability[:-2] # Remove ".*" for cap in self._capability_map: if cap.startswith(prefix + "."): return self._capability_map[cap][0] return None def _resolve_dependencies(self) -> list[str]: """Resolve stage execution order using topological sort with capability matching.""" ordered = [] visited = set() temp_mark = set() def visit(name: str) -> None: if name in temp_mark: raise StageError(name, "Circular dependency detected") if name in visited: return temp_mark.add(name) stage = self._stages.get(name) if stage: # Handle capability-based dependencies for dep in stage.dependencies: # Find a stage that provides this capability dep_stage_name = self._find_stage_with_capability(dep) if dep_stage_name: visit(dep_stage_name) # Handle direct stage dependencies for stage_dep in stage.stage_dependencies: if stage_dep in self._stages: visit(stage_dep) else: # Stage dependency not found - this is an error raise StageError( name, f"Missing stage dependency: '{stage_dep}' not found in pipeline", ) temp_mark.remove(name) visited.add(name) ordered.append(name) for name in self._stages: if name not in visited: visit(name) return ordered def _validate_dependencies(self) -> None: """Validate that all dependencies can be satisfied. Raises StageError if any dependency cannot be resolved. """ missing: list[tuple[str, str]] = [] # (stage_name, capability) for name, stage in self._stages.items(): for dep in stage.dependencies: if not self._find_stage_with_capability(dep): missing.append((name, dep)) if missing: msgs = [f" - {stage} needs {cap}" for stage, cap in missing] raise StageError( "validation", "Missing capabilities:\n" + "\n".join(msgs), ) def _validate_types(self) -> None: """Validate inlet/outlet types between connected stages. PureData-style type validation. Each stage declares its inlet_types (what it accepts) and outlet_types (what it produces). This method validates that connected stages have compatible types. Raises StageError if type mismatch is detected. """ from engine.pipeline.core import DataType errors: list[str] = [] for i, name in enumerate(self._execution_order): stage = self._stages.get(name) if not stage: continue inlet_types = stage.inlet_types # Check against previous stage's outlet types if i > 0: prev_name = self._execution_order[i - 1] prev_stage = self._stages.get(prev_name) if prev_stage: prev_outlets = prev_stage.outlet_types # Check if any outlet type is accepted by this inlet compatible = ( DataType.ANY in inlet_types or DataType.ANY in prev_outlets or bool(prev_outlets & inlet_types) ) if not compatible: errors.append( f" - {name} (inlet: {inlet_types}) " f"← {prev_name} (outlet: {prev_outlets})" ) # Check display/sink stages (should accept TEXT_BUFFER) if ( stage.category == "display" and DataType.TEXT_BUFFER not in inlet_types and DataType.ANY not in inlet_types ): errors.append(f" - {name} is display but doesn't accept TEXT_BUFFER") if errors: raise StageError( "type_validation", "Type mismatch in pipeline connections:\n" + "\n".join(errors), ) def initialize(self) -> bool: """Initialize all stages in execution order.""" for name in self._execution_order: stage = self._stages.get(name) if stage and not stage.init(self.context) and not stage.optional: return False return True def execute(self, data: Any | None = None) -> StageResult: """Execute the pipeline with the given input data. Pipeline execution: 1. Execute all non-overlay stages in dependency order 2. Apply overlay stages on top (sorted by render_order) """ import os import sys debug = os.environ.get("MAINLINE_DEBUG_DATAFLOW") == "1" if debug: print( f"[PIPELINE.execute] Starting with data type: {type(data).__name__ if data else 'None'}", file=sys.stderr, flush=True, ) if not self._initialized: self.build() if not self._initialized: return StageResult( success=False, data=None, error="Pipeline not initialized", ) current_data = data frame_start = time.perf_counter() if self._metrics_enabled else 0 stage_timings: list[StageMetrics] = [] # Separate overlay stages and display stage from regular stages overlay_stages: list[tuple[int, Stage]] = [] display_stage: Stage | None = None regular_stages: list[str] = [] for name in self._execution_order: stage = self._stages.get(name) if not stage or not stage.is_enabled(): continue # Check if this is the display stage - execute last if stage.category == "display": display_stage = stage continue # Safely check is_overlay - handle MagicMock and other non-bool returns try: is_overlay = bool(getattr(stage, "is_overlay", False)) except Exception: is_overlay = False if is_overlay: # Safely get render_order try: render_order = int(getattr(stage, "render_order", 0)) except Exception: render_order = 0 overlay_stages.append((render_order, stage)) else: regular_stages.append(name) # Execute regular stages in dependency order (excluding display) for name in regular_stages: stage = self._stages.get(name) if not stage or not stage.is_enabled(): continue stage_start = time.perf_counter() if self._metrics_enabled else 0 try: if debug: data_info = type(current_data).__name__ if isinstance(current_data, list): data_info += f"[{len(current_data)}]" print( f"[STAGE.{name}] Starting with: {data_info}", file=sys.stderr, flush=True, ) current_data = stage.process(current_data, self.context) if debug: data_info = type(current_data).__name__ if isinstance(current_data, list): data_info += f"[{len(current_data)}]" print( f"[STAGE.{name}] Completed, output: {data_info}", file=sys.stderr, flush=True, ) except Exception as e: if debug: print(f"[STAGE.{name}] ERROR: {e}", file=sys.stderr, flush=True) if not stage.optional: return StageResult( success=False, data=current_data, error=str(e), stage_name=name, ) continue if self._metrics_enabled: stage_duration = (time.perf_counter() - stage_start) * 1000 chars_in = len(str(data)) if data else 0 chars_out = len(str(current_data)) if current_data else 0 stage_timings.append( StageMetrics( name=name, duration_ms=stage_duration, chars_in=chars_in, chars_out=chars_out, ) ) # Apply overlay stages (sorted by render_order) overlay_stages.sort(key=lambda x: x[0]) for render_order, stage in overlay_stages: stage_start = time.perf_counter() if self._metrics_enabled else 0 stage_name = f"[overlay]{stage.name}" try: # Overlays receive current_data but don't pass their output to next stage # Instead, their output is composited on top overlay_output = stage.process(current_data, self.context) # For now, we just let the overlay output pass through # In a more sophisticated implementation, we'd composite it if overlay_output is not None: current_data = overlay_output except Exception as e: if not stage.optional: return StageResult( success=False, data=current_data, error=str(e), stage_name=stage_name, ) if self._metrics_enabled: stage_duration = (time.perf_counter() - stage_start) * 1000 chars_in = len(str(data)) if data else 0 chars_out = len(str(current_data)) if current_data else 0 stage_timings.append( StageMetrics( name=stage_name, duration_ms=stage_duration, chars_in=chars_in, chars_out=chars_out, ) ) # Execute display stage LAST (after overlay stages) # This ensures overlay effects like HUD are visible in the final output if display_stage: stage_start = time.perf_counter() if self._metrics_enabled else 0 try: current_data = display_stage.process(current_data, self.context) except Exception as e: if not display_stage.optional: return StageResult( success=False, data=current_data, error=str(e), stage_name=display_stage.name, ) if self._metrics_enabled: stage_duration = (time.perf_counter() - stage_start) * 1000 chars_in = len(str(data)) if data else 0 chars_out = len(str(current_data)) if current_data else 0 stage_timings.append( StageMetrics( name=display_stage.name, duration_ms=stage_duration, chars_in=chars_in, chars_out=chars_out, ) ) if self._metrics_enabled: total_duration = (time.perf_counter() - frame_start) * 1000 self._frame_metrics.append( FrameMetrics( frame_number=self._current_frame_number, total_ms=total_duration, stages=stage_timings, ) ) # Store metrics in context for other stages (like HUD) # This makes metrics a first-class pipeline citizen if self.context: self.context.state["metrics"] = self.get_metrics_summary() if len(self._frame_metrics) > self._max_metrics_frames: self._frame_metrics.pop(0) self._current_frame_number += 1 return StageResult(success=True, data=current_data) def cleanup(self) -> None: """Clean up all stages in reverse order.""" for name in reversed(self._execution_order): stage = self._stages.get(name) if stage: try: stage.cleanup() except Exception: pass self._stages.clear() self._initialized = False @property def stages(self) -> dict[str, Stage]: """Get all stages.""" return self._stages.copy() @property def execution_order(self) -> list[str]: """Get execution order.""" return self._execution_order.copy() def get_stage_names(self) -> list[str]: """Get list of stage names.""" return list(self._stages.keys()) def get_overlay_stages(self) -> list[Stage]: """Get all overlay stages sorted by render_order.""" overlays = [stage for stage in self._stages.values() if stage.is_overlay] overlays.sort(key=lambda s: s.render_order) return overlays def get_stage_type(self, name: str) -> str: """Get the stage_type for a stage.""" stage = self._stages.get(name) return stage.stage_type if stage else "" def get_render_order(self, name: str) -> int: """Get the render_order for a stage.""" stage = self._stages.get(name) return stage.render_order if stage else 0 def get_metrics_summary(self) -> dict: """Get summary of collected metrics.""" if not self._frame_metrics: return {"error": "No metrics collected"} total_times = [f.total_ms for f in self._frame_metrics] avg_total = sum(total_times) / len(total_times) min_total = min(total_times) max_total = max(total_times) stage_stats: dict[str, dict] = {} for frame in self._frame_metrics: for stage in frame.stages: if stage.name not in stage_stats: stage_stats[stage.name] = {"times": [], "total_chars": 0} stage_stats[stage.name]["times"].append(stage.duration_ms) stage_stats[stage.name]["total_chars"] += stage.chars_out for name, stats in stage_stats.items(): times = stats["times"] stats["avg_ms"] = sum(times) / len(times) stats["min_ms"] = min(times) stats["max_ms"] = max(times) del stats["times"] return { "frame_count": len(self._frame_metrics), "pipeline": { "avg_ms": avg_total, "min_ms": min_total, "max_ms": max_total, }, "stages": stage_stats, } def reset_metrics(self) -> None: """Reset collected metrics.""" self._frame_metrics.clear() self._current_frame_number = 0 def get_frame_times(self) -> list[float]: """Get historical frame times for sparklines/charts.""" return [f.total_ms for f in self._frame_metrics] class PipelineRunner: """High-level pipeline runner with animation support.""" def __init__( self, pipeline: Pipeline, params: PipelineParams | None = None, ): self.pipeline = pipeline self.params = params or PipelineParams() self._running = False def start(self) -> bool: """Start the pipeline.""" self._running = True return self.pipeline.initialize() def step(self, input_data: Any | None = None) -> Any: """Execute one pipeline step.""" self.params.frame_number += 1 self.pipeline.context.params = self.params result = self.pipeline.execute(input_data) return result.data if result.success else None def stop(self) -> None: """Stop and clean up the pipeline.""" self._running = False self.pipeline.cleanup() @property def is_running(self) -> bool: """Check if runner is active.""" return self._running def create_pipeline_from_params(params: PipelineParams) -> Pipeline: """Create a pipeline from PipelineParams.""" config = PipelineConfig( source=params.source, display=params.display, camera=params.camera_mode, effects=params.effect_order, ) return Pipeline(config=config) def create_default_pipeline() -> Pipeline: """Create a default pipeline with all standard components.""" from engine.data_sources.sources import HeadlinesDataSource from engine.pipeline.adapters import ( DataSourceStage, SourceItemsToBufferStage, ) pipeline = Pipeline() # Add source stage (wrapped as Stage) source = HeadlinesDataSource() pipeline.add_stage("source", DataSourceStage(source, name="headlines")) # Add render stage to convert items to text buffer pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) # Add display stage display = StageRegistry.create("display", "terminal") if display: pipeline.add_stage("display", display) return pipeline.build()