""" Pipeline controller - DAG-based pipeline execution. The Pipeline class orchestrates stages in dependency order, handling the complete render cycle from source to display. """ from dataclasses import dataclass, field from typing import Any from engine.pipeline.core import PipelineContext, Stage, StageError, StageResult from engine.pipeline.params import PipelineParams from engine.pipeline.registry import StageRegistry @dataclass class PipelineConfig: """Configuration for a pipeline instance.""" source: str = "headlines" display: str = "terminal" camera: str = "vertical" effects: list[str] = field(default_factory=list) class Pipeline: """Main pipeline orchestrator. Manages the execution of all stages in dependency order, handling initialization, processing, and cleanup. """ def __init__( self, config: PipelineConfig | None = None, context: PipelineContext | None = None, ): self.config = config or PipelineConfig() self.context = context or PipelineContext() self._stages: dict[str, Stage] = {} self._execution_order: list[str] = [] self._initialized = False def add_stage(self, name: str, stage: Stage) -> "Pipeline": """Add a stage to the pipeline.""" self._stages[name] = stage return self def remove_stage(self, name: str) -> None: """Remove a stage from the pipeline.""" if name in self._stages: del self._stages[name] def get_stage(self, name: str) -> Stage | None: """Get a stage by name.""" return self._stages.get(name) def build(self) -> "Pipeline": """Build execution order based on dependencies.""" self._execution_order = self._resolve_dependencies() self._initialized = True return self def _resolve_dependencies(self) -> list[str]: """Resolve stage execution order using topological sort.""" ordered = [] visited = set() temp_mark = set() def visit(name: str) -> None: if name in temp_mark: raise StageError(name, "Circular dependency detected") if name in visited: return temp_mark.add(name) stage = self._stages.get(name) if stage: for dep in stage.dependencies: dep_stage = self._stages.get(dep) if dep_stage: visit(dep) temp_mark.remove(name) visited.add(name) ordered.append(name) for name in self._stages: if name not in visited: visit(name) return ordered def initialize(self) -> bool: """Initialize all stages in execution order.""" for name in self._execution_order: stage = self._stages.get(name) if stage and not stage.init(self.context) and not stage.optional: return False return True def execute(self, data: Any | None = None) -> StageResult: """Execute the pipeline with the given input data.""" if not self._initialized: self.build() if not self._initialized: return StageResult( success=False, data=None, error="Pipeline not initialized", ) current_data = data for name in self._execution_order: stage = self._stages.get(name) if not stage or not stage.is_enabled(): continue try: current_data = stage.process(current_data, self.context) except Exception as e: if not stage.optional: return StageResult( success=False, data=current_data, error=str(e), stage_name=name, ) # Skip optional stage on error continue return StageResult(success=True, data=current_data) def cleanup(self) -> None: """Clean up all stages in reverse order.""" for name in reversed(self._execution_order): stage = self._stages.get(name) if stage: try: stage.cleanup() except Exception: pass self._stages.clear() self._initialized = False @property def stages(self) -> dict[str, Stage]: """Get all stages.""" return self._stages.copy() @property def execution_order(self) -> list[str]: """Get execution order.""" return self._execution_order.copy() def get_stage_names(self) -> list[str]: """Get list of stage names.""" return list(self._stages.keys()) class PipelineRunner: """High-level pipeline runner with animation support.""" def __init__( self, pipeline: Pipeline, params: PipelineParams | None = None, ): self.pipeline = pipeline self.params = params or PipelineParams() self._running = False def start(self) -> bool: """Start the pipeline.""" self._running = True return self.pipeline.initialize() def step(self, input_data: Any | None = None) -> Any: """Execute one pipeline step.""" self.params.frame_number += 1 self.context.params = self.params result = self.pipeline.execute(input_data) return result.data if result.success else None def stop(self) -> None: """Stop and clean up the pipeline.""" self._running = False self.pipeline.cleanup() @property def is_running(self) -> bool: """Check if runner is active.""" return self._running def create_pipeline_from_params(params: PipelineParams) -> Pipeline: """Create a pipeline from PipelineParams.""" config = PipelineConfig( source=params.source, display=params.display, camera=params.camera_mode, effects=params.effect_order, ) return Pipeline(config=config) def create_default_pipeline() -> Pipeline: """Create a default pipeline with all standard components.""" pipeline = Pipeline() # Add source stage source = StageRegistry.create("source", "headlines") if source: pipeline.add_stage("source", source) # Add effect stages for effect_name in ["noise", "fade", "glitch", "firehose", "hud"]: effect = StageRegistry.create("effect", effect_name) if effect: pipeline.add_stage(f"effect_{effect_name}", effect) # Add display stage display = StageRegistry.create("display", "terminal") if display: pipeline.add_stage("display", display) return pipeline.build()