1056 lines
36 KiB
Python
1056 lines
36 KiB
Python
"""
|
|
Pipeline controller - DAG-based pipeline execution.
|
|
|
|
The Pipeline class orchestrates stages in dependency order, handling
|
|
the complete render cycle from source to display.
|
|
"""
|
|
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from typing import Any
|
|
|
|
from engine.pipeline.core import PipelineContext, Stage, StageError, StageResult
|
|
from engine.pipeline.params import PipelineParams
|
|
from engine.pipeline.registry import StageRegistry
|
|
|
|
|
|
@dataclass
class PipelineConfig:
    """Configuration for a pipeline instance.

    Values are component names (registry keys), not instances; the
    concrete stages are created elsewhere (see create_pipeline_from_params
    and create_default_pipeline).
    """

    source: str = "headlines"  # data source name
    display: str = "terminal"  # display backend name
    camera: str = "vertical"  # camera mode name
    effects: list[str] = field(default_factory=list)  # ordered effect names
    enable_metrics: bool = True  # collect per-stage/per-frame timing metrics
|
|
|
|
|
|
@dataclass
class StageMetrics:
    """Metrics for a single stage execution within one frame."""

    name: str  # stage name (overlays are prefixed "[overlay]")
    duration_ms: float  # wall-clock processing time in milliseconds
    chars_in: int = 0  # len(str(...)) of the frame's input data (rough size proxy)
    chars_out: int = 0  # len(str(...)) of the data after this stage ran
|
|
|
|
|
|
@dataclass
class FrameMetrics:
    """Metrics for a single frame through the pipeline."""

    frame_number: int  # monotonically increasing frame counter
    total_ms: float  # total pipeline wall-clock time for the frame
    stages: list[StageMetrics] = field(default_factory=list)  # per-stage timings
|
|
|
|
|
|
class Pipeline:
|
|
"""Main pipeline orchestrator.
|
|
|
|
Manages the execution of all stages in dependency order,
|
|
handling initialization, processing, and cleanup.
|
|
|
|
Supports dynamic mutation during runtime via the mutation API.
|
|
"""
|
|
|
|
    def __init__(
        self,
        config: PipelineConfig | None = None,
        context: PipelineContext | None = None,
    ):
        """Create an empty pipeline.

        Args:
            config: Pipeline configuration; defaults to a fresh PipelineConfig.
            context: Shared context passed to every stage; defaults to a
                fresh PipelineContext.
        """
        self.config = config or PipelineConfig()
        self.context = context or PipelineContext()
        self._stages: dict[str, Stage] = {}  # name -> Stage registry
        self._execution_order: list[str] = []  # topologically sorted stage names
        self._initialized = False  # set True by build()
        self._capability_map: dict[str, list[str]] = {}  # capability -> provider names

        # Bounded ring buffer of recent frame metrics (see execute()).
        self._metrics_enabled = self.config.enable_metrics
        self._frame_metrics: list[FrameMetrics] = []
        self._max_metrics_frames = 60

        # Minimum capabilities required for pipeline to function
        # NOTE: Research later - allow presets to override these defaults
        self._minimum_capabilities: set[str] = {
            "source",
            "render.output",
            "display.output",
            "camera.state",  # Always required for viewport filtering
        }
        self._current_frame_number = 0
|
|
|
|
def add_stage(self, name: str, stage: Stage, initialize: bool = True) -> "Pipeline":
|
|
"""Add a stage to the pipeline.
|
|
|
|
Args:
|
|
name: Unique name for the stage
|
|
stage: Stage instance to add
|
|
initialize: If True, initialize the stage immediately
|
|
|
|
Returns:
|
|
Self for method chaining
|
|
"""
|
|
self._stages[name] = stage
|
|
if self._initialized and initialize:
|
|
stage.init(self.context)
|
|
return self
|
|
|
|
def remove_stage(self, name: str, cleanup: bool = True) -> Stage | None:
|
|
"""Remove a stage from the pipeline.
|
|
|
|
Args:
|
|
name: Name of the stage to remove
|
|
cleanup: If True, call cleanup() on the removed stage
|
|
|
|
Returns:
|
|
The removed stage, or None if not found
|
|
"""
|
|
stage = self._stages.pop(name, None)
|
|
if stage and cleanup:
|
|
try:
|
|
stage.cleanup()
|
|
except Exception:
|
|
pass
|
|
|
|
# Rebuild execution order and capability map if stage was removed
|
|
if stage and self._initialized:
|
|
self._rebuild()
|
|
|
|
return stage
|
|
|
|
    def remove_stage_safe(self, name: str, cleanup: bool = True) -> Stage | None:
        """Remove a stage and rebuild execution order safely.

        Pure alias for remove_stage(), which already rebuilds the
        execution order when the pipeline is built; kept for API clarity.

        Args:
            name: Name of the stage to remove
            cleanup: If True, call cleanup() on the removed stage

        Returns:
            The removed stage, or None if not found
        """
        return self.remove_stage(name, cleanup)
|
|
|
|
def cleanup_stage(self, name: str) -> None:
|
|
"""Clean up a specific stage without removing it.
|
|
|
|
This is useful for stages that need to release resources
|
|
(like display connections) without being removed from the pipeline.
|
|
|
|
Args:
|
|
name: Name of the stage to clean up
|
|
"""
|
|
stage = self._stages.get(name)
|
|
if stage:
|
|
try:
|
|
stage.cleanup()
|
|
except Exception:
|
|
pass
|
|
|
|
def can_hot_swap(self, name: str) -> bool:
|
|
"""Check if a stage can be safely hot-swapped.
|
|
|
|
A stage can be hot-swapped if:
|
|
1. It exists in the pipeline
|
|
2. It's not required for basic pipeline function
|
|
3. It doesn't have strict dependencies that can't be re-resolved
|
|
|
|
Args:
|
|
name: Name of the stage to check
|
|
|
|
Returns:
|
|
True if the stage can be hot-swapped, False otherwise
|
|
"""
|
|
# Check if stage exists
|
|
if name not in self._stages:
|
|
return False
|
|
|
|
# Check if stage is a minimum capability provider
|
|
stage = self._stages[name]
|
|
stage_caps = stage.capabilities if hasattr(stage, "capabilities") else set()
|
|
minimum_caps = self._minimum_capabilities
|
|
|
|
# If stage provides a minimum capability, it's more critical
|
|
# but still potentially swappable if another stage provides the same capability
|
|
for cap in stage_caps:
|
|
if cap in minimum_caps:
|
|
# Check if another stage provides this capability
|
|
providers = self._capability_map.get(cap, [])
|
|
# This stage is the sole provider - might be critical
|
|
# but still allow hot-swap if pipeline is not initialized
|
|
if len(providers) <= 1 and self._initialized:
|
|
return False
|
|
|
|
return True
|
|
|
|
def replace_stage(
|
|
self, name: str, new_stage: Stage, preserve_state: bool = True
|
|
) -> Stage | None:
|
|
"""Replace a stage in the pipeline with a new one.
|
|
|
|
Args:
|
|
name: Name of the stage to replace
|
|
new_stage: New stage instance
|
|
preserve_state: If True, copy relevant state from old stage
|
|
|
|
Returns:
|
|
The old stage, or None if not found
|
|
"""
|
|
old_stage = self._stages.get(name)
|
|
if not old_stage:
|
|
return None
|
|
|
|
if preserve_state:
|
|
self._copy_stage_state(old_stage, new_stage)
|
|
|
|
old_stage.cleanup()
|
|
self._stages[name] = new_stage
|
|
new_stage.init(self.context)
|
|
|
|
if self._initialized:
|
|
self._rebuild()
|
|
|
|
return old_stage
|
|
|
|
def swap_stages(self, name1: str, name2: str) -> bool:
|
|
"""Swap two stages in the pipeline.
|
|
|
|
Args:
|
|
name1: First stage name
|
|
name2: Second stage name
|
|
|
|
Returns:
|
|
True if successful, False if either stage not found
|
|
"""
|
|
stage1 = self._stages.get(name1)
|
|
stage2 = self._stages.get(name2)
|
|
|
|
if not stage1 or not stage2:
|
|
return False
|
|
|
|
self._stages[name1] = stage2
|
|
self._stages[name2] = stage1
|
|
|
|
if self._initialized:
|
|
self._rebuild()
|
|
|
|
return True
|
|
|
|
def move_stage(
|
|
self, name: str, after: str | None = None, before: str | None = None
|
|
) -> bool:
|
|
"""Move a stage's position in execution order.
|
|
|
|
Args:
|
|
name: Stage to move
|
|
after: Place this stage after this stage name
|
|
before: Place this stage before this stage name
|
|
|
|
Returns:
|
|
True if successful, False if stage not found
|
|
"""
|
|
if name not in self._stages:
|
|
return False
|
|
|
|
if not self._initialized:
|
|
return False
|
|
|
|
current_order = list(self._execution_order)
|
|
if name not in current_order:
|
|
return False
|
|
|
|
current_order.remove(name)
|
|
|
|
if after and after in current_order:
|
|
idx = current_order.index(after) + 1
|
|
current_order.insert(idx, name)
|
|
elif before and before in current_order:
|
|
idx = current_order.index(before)
|
|
current_order.insert(idx, name)
|
|
else:
|
|
current_order.append(name)
|
|
|
|
self._execution_order = current_order
|
|
return True
|
|
|
|
def _copy_stage_state(self, old_stage: Stage, new_stage: Stage) -> None:
|
|
"""Copy relevant state from old stage to new stage during replacement.
|
|
|
|
Args:
|
|
old_stage: The old stage being replaced
|
|
new_stage: The new stage
|
|
"""
|
|
if hasattr(old_stage, "_enabled"):
|
|
new_stage._enabled = old_stage._enabled
|
|
|
|
# Preserve camera state
|
|
if hasattr(old_stage, "save_state") and hasattr(new_stage, "restore_state"):
|
|
try:
|
|
state = old_stage.save_state()
|
|
new_stage.restore_state(state)
|
|
except Exception:
|
|
# If state preservation fails, continue without it
|
|
pass
|
|
|
|
    def _rebuild(self) -> None:
        """Rebuild execution order after mutation or auto-injection.

        Temporarily flips _initialized off while recomputing the
        capability map and topological order, then restores it.

        We intentionally DO NOT validate dependencies here: mutation
        operations (remove/swap/move) may leave the pipeline temporarily
        invalid (e.g., removing a stage that others depend on). Validation
        is performed explicitly in build() or can be checked manually via
        validate_minimum_capabilities().
        """
        was_initialized = self._initialized
        self._initialized = False

        self._capability_map = self._build_capability_map()
        self._execution_order = self._resolve_dependencies()

        # Restore initialized state
        self._initialized = was_initialized
|
|
|
|
    def get_stage(self, name: str) -> Stage | None:
        """Get a stage by name, or None when no stage is registered under it."""
        return self._stages.get(name)
|
|
|
|
def enable_stage(self, name: str) -> bool:
|
|
"""Enable a stage in the pipeline.
|
|
|
|
Args:
|
|
name: Stage name to enable
|
|
|
|
Returns:
|
|
True if successful, False if stage not found
|
|
"""
|
|
stage = self._stages.get(name)
|
|
if stage:
|
|
stage.set_enabled(True)
|
|
return True
|
|
return False
|
|
|
|
def disable_stage(self, name: str) -> bool:
|
|
"""Disable a stage in the pipeline.
|
|
|
|
Args:
|
|
name: Stage name to disable
|
|
|
|
Returns:
|
|
True if successful, False if stage not found
|
|
"""
|
|
stage = self._stages.get(name)
|
|
if stage:
|
|
stage.set_enabled(False)
|
|
return True
|
|
return False
|
|
|
|
def get_stage_info(self, name: str) -> dict | None:
|
|
"""Get detailed information about a stage.
|
|
|
|
Args:
|
|
name: Stage name
|
|
|
|
Returns:
|
|
Dictionary with stage information, or None if not found
|
|
"""
|
|
stage = self._stages.get(name)
|
|
if not stage:
|
|
return None
|
|
|
|
return {
|
|
"name": name,
|
|
"category": stage.category,
|
|
"stage_type": stage.stage_type,
|
|
"enabled": stage.is_enabled(),
|
|
"optional": stage.optional,
|
|
"capabilities": list(stage.capabilities),
|
|
"dependencies": list(stage.dependencies),
|
|
"inlet_types": [dt.name for dt in stage.inlet_types],
|
|
"outlet_types": [dt.name for dt in stage.outlet_types],
|
|
"render_order": stage.render_order,
|
|
"is_overlay": stage.is_overlay,
|
|
}
|
|
|
|
def get_pipeline_info(self) -> dict:
|
|
"""Get comprehensive information about the pipeline.
|
|
|
|
Returns:
|
|
Dictionary with pipeline state
|
|
"""
|
|
return {
|
|
"stages": {name: self.get_stage_info(name) for name in self._stages},
|
|
"execution_order": self._execution_order.copy(),
|
|
"initialized": self._initialized,
|
|
"stage_count": len(self._stages),
|
|
}
|
|
|
|
    @property
    def minimum_capabilities(self) -> set[str]:
        """Get minimum capabilities required for pipeline to function."""
        return self._minimum_capabilities

    @minimum_capabilities.setter
    def minimum_capabilities(self, value: set[str]):
        """Set minimum required capabilities.

        Replaces (does not merge with) the current required set.

        NOTE: Research later - allow presets to override these defaults
        """
        self._minimum_capabilities = value
|
|
|
|
def validate_minimum_capabilities(self) -> tuple[bool, list[str]]:
|
|
"""Validate that all minimum capabilities are provided.
|
|
|
|
Returns:
|
|
Tuple of (is_valid, missing_capabilities)
|
|
"""
|
|
missing = []
|
|
for cap in self._minimum_capabilities:
|
|
if not self._find_stage_with_capability(cap):
|
|
missing.append(cap)
|
|
return len(missing) == 0, missing
|
|
|
|
    def ensure_minimum_capabilities(self) -> list[str]:
        """Automatically inject MVP stages if minimum capabilities are missing.

        Auto-injection is always on, but defaults are trivial to override.
        Injection order matters: camera clock before render (accepts
        SOURCE_ITEMS), camera viewport after render (accepts TEXT_BUFFER).

        Returns:
            List of stage names that were injected
        """
        # Local imports keep heavy/circular adapter dependencies out of
        # module import time.
        from engine.camera import Camera
        from engine.data_sources.sources import EmptyDataSource
        from engine.pipeline.adapters import (
            CameraClockStage,
            CameraStage,
            DataSourceStage,
            DisplayStage,
            SourceItemsToBufferStage,
        )
        from engine.display import DisplayRegistry

        injected = []

        # Source: fall back to an empty 80x24 source so the pipeline runs.
        if (
            not self._find_stage_with_capability("source")
            and "source" not in self._stages
        ):
            empty_source = EmptyDataSource(width=80, height=24)
            self.add_stage("source", DataSourceStage(empty_source, name="empty"))
            injected.append("source")

        # Check for camera.state capability (must be BEFORE render to accept SOURCE_ITEMS)
        camera = None
        if not self._find_stage_with_capability("camera.state"):
            # Inject static camera (speed 0.0 -> no movement)
            camera = Camera.scroll(speed=0.0)
            camera.set_canvas_size(200, 200)
            if "camera_update" not in self._stages:
                self.add_stage(
                    "camera_update", CameraClockStage(camera, name="camera-clock")
                )
                injected.append("camera_update")

        # Check for render capability
        if (
            not self._find_stage_with_capability("render.output")
            and "render" not in self._stages
        ):
            self.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
            injected.append("render")

        # Check for camera stage (must be AFTER render to accept TEXT_BUFFER)
        # Only injected when the static camera above was created.
        if camera and "camera" not in self._stages:
            self.add_stage("camera", CameraStage(camera, name="static"))
            injected.append("camera")

        # Check for display capability
        if (
            not self._find_stage_with_capability("display.output")
            and "display" not in self._stages
        ):
            display = DisplayRegistry.create("terminal")
            if display:
                self.add_stage("display", DisplayStage(display, name="terminal"))
                injected.append("display")

        # Rebuild pipeline if stages were injected
        if injected:
            self._rebuild()

        return injected
|
|
|
|
    def build(self, auto_inject: bool = True) -> "Pipeline":
        """Build execution order based on dependencies.

        Computes the capability map and topological execution order,
        optionally auto-injects MVP stages for missing minimum
        capabilities, validates dependencies and inlet/outlet types, and
        marks the pipeline initialized.

        Args:
            auto_inject: If True, automatically inject MVP stages for missing capabilities

        Returns:
            Self, marked as initialized.

        Raises:
            StageError: If minimum capabilities are still missing after
                auto-injection, or dependency/type validation fails.
        """
        self._capability_map = self._build_capability_map()
        self._execution_order = self._resolve_dependencies()

        # Validate minimum capabilities and auto-inject if needed
        if auto_inject:
            is_valid, missing = self.validate_minimum_capabilities()
            if not is_valid:
                injected = self.ensure_minimum_capabilities()
                if injected:
                    print(
                        f" \033[38;5;226mAuto-injected stages for missing capabilities: {injected}\033[0m"
                    )
                    # Rebuild after auto-injection
                    self._capability_map = self._build_capability_map()
                    self._execution_order = self._resolve_dependencies()

            # Re-validate after injection attempt (whether anything was
            # injected or not): injection may not have run, or may have
            # run but failed to provide every missing capability.
            is_valid, missing = self.validate_minimum_capabilities()
            if not is_valid:
                raise StageError(
                    "build",
                    f"Auto-injection failed to provide minimum capabilities: {missing}",
                )

        self._validate_dependencies()
        self._validate_types()
        self._initialized = True
        return self
|
|
|
|
def _build_capability_map(self) -> dict[str, list[str]]:
|
|
"""Build a map of capabilities to stage names.
|
|
|
|
Returns:
|
|
Dict mapping capability -> list of stage names that provide it
|
|
"""
|
|
capability_map: dict[str, list[str]] = {}
|
|
for name, stage in self._stages.items():
|
|
for cap in stage.capabilities:
|
|
if cap not in capability_map:
|
|
capability_map[cap] = []
|
|
capability_map[cap].append(name)
|
|
return capability_map
|
|
|
|
def _find_stage_with_capability(self, capability: str) -> str | None:
|
|
"""Find a stage that provides the given capability.
|
|
|
|
Supports wildcard matching:
|
|
- "source" matches "source.headlines" (prefix match)
|
|
- "source.*" matches "source.headlines"
|
|
- "source.headlines" matches exactly
|
|
|
|
Args:
|
|
capability: The capability to find
|
|
|
|
Returns:
|
|
Stage name that provides the capability, or None if not found
|
|
"""
|
|
# Exact match
|
|
if capability in self._capability_map:
|
|
return self._capability_map[capability][0]
|
|
|
|
# Prefix match (e.g., "source" -> "source.headlines")
|
|
for cap, stages in self._capability_map.items():
|
|
if cap.startswith(capability + "."):
|
|
return stages[0]
|
|
|
|
# Wildcard match (e.g., "source.*" -> "source.headlines")
|
|
if ".*" in capability:
|
|
prefix = capability[:-2] # Remove ".*"
|
|
for cap in self._capability_map:
|
|
if cap.startswith(prefix + "."):
|
|
return self._capability_map[cap][0]
|
|
|
|
return None
|
|
|
|
    def _resolve_dependencies(self) -> list[str]:
        """Resolve stage execution order using topological sort with capability matching.

        Depth-first topological sort: each stage is visited after the
        stages satisfying its capability dependencies and its direct
        stage dependencies. Cycle detection uses a temporary mark set.

        Returns:
            Stage names in a valid execution order.

        Raises:
            StageError: On a circular dependency, or when a direct stage
                dependency names a stage not present in the pipeline.
        """
        ordered = []
        visited = set()  # permanently processed stages
        temp_mark = set()  # stages on the current DFS path (cycle detection)

        def visit(name: str) -> None:
            if name in temp_mark:
                raise StageError(name, "Circular dependency detected")
            if name in visited:
                return

            temp_mark.add(name)
            stage = self._stages.get(name)
            if stage:
                # Handle capability-based dependencies. An unresolvable
                # capability is NOT an error here; _validate_dependencies
                # reports those explicitly during build().
                for dep in stage.dependencies:
                    # Find a stage that provides this capability
                    dep_stage_name = self._find_stage_with_capability(dep)
                    if dep_stage_name:
                        visit(dep_stage_name)

                # Handle direct stage dependencies
                for stage_dep in stage.stage_dependencies:
                    if stage_dep in self._stages:
                        visit(stage_dep)
                    else:
                        # Stage dependency not found - this is an error
                        raise StageError(
                            name,
                            f"Missing stage dependency: '{stage_dep}' not found in pipeline",
                        )

            temp_mark.remove(name)
            visited.add(name)
            ordered.append(name)

        for name in self._stages:
            if name not in visited:
                visit(name)

        return ordered
|
|
|
|
def _validate_dependencies(self) -> None:
|
|
"""Validate that all dependencies can be satisfied.
|
|
|
|
Raises StageError if any dependency cannot be resolved.
|
|
"""
|
|
missing: list[tuple[str, str]] = [] # (stage_name, capability)
|
|
|
|
for name, stage in self._stages.items():
|
|
for dep in stage.dependencies:
|
|
if not self._find_stage_with_capability(dep):
|
|
missing.append((name, dep))
|
|
|
|
if missing:
|
|
msgs = [f" - {stage} needs {cap}" for stage, cap in missing]
|
|
raise StageError(
|
|
"validation",
|
|
"Missing capabilities:\n" + "\n".join(msgs),
|
|
)
|
|
|
|
    def _validate_types(self) -> None:
        """Validate inlet/outlet types between connected stages.

        PureData-style type validation. Each stage declares its inlet_types
        (what it accepts) and outlet_types (what it produces). This method
        validates that each stage in the execution order can accept the
        previous stage's output, and that display stages accept
        TEXT_BUFFER. DataType.ANY on either side is always compatible.
        All mismatches are collected and reported in one error.

        Raises:
            StageError: If any type mismatch is detected.
        """
        # Local import avoids a circular import at module load time.
        from engine.pipeline.core import DataType

        errors: list[str] = []

        for i, name in enumerate(self._execution_order):
            stage = self._stages.get(name)
            if not stage:
                continue

            inlet_types = stage.inlet_types

            # Check against previous stage's outlet types
            if i > 0:
                prev_name = self._execution_order[i - 1]
                prev_stage = self._stages.get(prev_name)
                if prev_stage:
                    prev_outlets = prev_stage.outlet_types

                    # Compatible when either side declares ANY, or the
                    # sets share at least one concrete type.
                    compatible = (
                        DataType.ANY in inlet_types
                        or DataType.ANY in prev_outlets
                        or bool(prev_outlets & inlet_types)
                    )

                    if not compatible:
                        errors.append(
                            f" - {name} (inlet: {inlet_types}) "
                            f"← {prev_name} (outlet: {prev_outlets})"
                        )

            # Check display/sink stages (should accept TEXT_BUFFER)
            if (
                stage.category == "display"
                and DataType.TEXT_BUFFER not in inlet_types
                and DataType.ANY not in inlet_types
            ):
                errors.append(f" - {name} is display but doesn't accept TEXT_BUFFER")

        if errors:
            raise StageError(
                "type_validation",
                "Type mismatch in pipeline connections:\n" + "\n".join(errors),
            )
|
|
|
|
def initialize(self) -> bool:
|
|
"""Initialize all stages in execution order."""
|
|
for name in self._execution_order:
|
|
stage = self._stages.get(name)
|
|
if stage and not stage.init(self.context) and not stage.optional:
|
|
return False
|
|
return True
|
|
|
|
    def execute(self, data: Any | None = None) -> StageResult:
        """Execute the pipeline with the given input data.

        Pipeline execution:
        1. Execute all non-overlay stages in dependency order
        2. Apply overlay stages on top (sorted by render_order)
        3. Run the display stage last, so overlay effects (e.g. HUD)
           appear in the final output

        Args:
            data: Initial input passed into the first stage (may be None).

        Returns:
            StageResult with success flag and final data; on a required
            stage failure, the error message and failing stage name.
        """
        import os
        import sys

        # Opt-in dataflow tracing to stderr via environment variable.
        debug = os.environ.get("MAINLINE_DEBUG_DATAFLOW") == "1"

        if debug:
            print(
                f"[PIPELINE.execute] Starting with data type: {type(data).__name__ if data else 'None'}",
                file=sys.stderr,
                flush=True,
            )

        # Lazily build on first execute; if still unbuilt, fail the frame.
        if not self._initialized:
            self.build()

        if not self._initialized:
            return StageResult(
                success=False,
                data=None,
                error="Pipeline not initialized",
            )

        current_data = data
        frame_start = time.perf_counter() if self._metrics_enabled else 0
        stage_timings: list[StageMetrics] = []

        # Separate overlay stages and display stage from regular stages
        overlay_stages: list[tuple[int, Stage]] = []
        display_stage: Stage | None = None
        regular_stages: list[str] = []

        for name in self._execution_order:
            stage = self._stages.get(name)
            if not stage or not stage.is_enabled():
                continue

            # Check if this is the display stage - execute last
            if stage.category == "display":
                display_stage = stage
                continue

            # Safely check is_overlay - handle MagicMock and other non-bool returns
            try:
                is_overlay = bool(getattr(stage, "is_overlay", False))
            except Exception:
                is_overlay = False

            if is_overlay:
                # Safely get render_order
                try:
                    render_order = int(getattr(stage, "render_order", 0))
                except Exception:
                    render_order = 0
                overlay_stages.append((render_order, stage))
            else:
                regular_stages.append(name)

        # Execute regular stages in dependency order (excluding display)
        for name in regular_stages:
            stage = self._stages.get(name)
            if not stage or not stage.is_enabled():
                continue

            stage_start = time.perf_counter() if self._metrics_enabled else 0

            try:
                if debug:
                    data_info = type(current_data).__name__
                    if isinstance(current_data, list):
                        data_info += f"[{len(current_data)}]"
                    print(
                        f"[STAGE.{name}] Starting with: {data_info}",
                        file=sys.stderr,
                        flush=True,
                    )

                current_data = stage.process(current_data, self.context)

                if debug:
                    data_info = type(current_data).__name__
                    if isinstance(current_data, list):
                        data_info += f"[{len(current_data)}]"
                    print(
                        f"[STAGE.{name}] Completed, output: {data_info}",
                        file=sys.stderr,
                        flush=True,
                    )
            except Exception as e:
                if debug:
                    print(f"[STAGE.{name}] ERROR: {e}", file=sys.stderr, flush=True)
                # Required stages abort the frame; optional stages fail soft
                # and the previous data flows to the next stage.
                if not stage.optional:
                    return StageResult(
                        success=False,
                        data=current_data,
                        error=str(e),
                        stage_name=name,
                    )
                continue

            if self._metrics_enabled:
                stage_duration = (time.perf_counter() - stage_start) * 1000
                # Rough size proxies: str() length of the frame's original
                # input and of the data after this stage.
                chars_in = len(str(data)) if data else 0
                chars_out = len(str(current_data)) if current_data else 0
                stage_timings.append(
                    StageMetrics(
                        name=name,
                        duration_ms=stage_duration,
                        chars_in=chars_in,
                        chars_out=chars_out,
                    )
                )

        # Apply overlay stages (sorted by render_order)
        overlay_stages.sort(key=lambda x: x[0])
        for render_order, stage in overlay_stages:
            stage_start = time.perf_counter() if self._metrics_enabled else 0
            stage_name = f"[overlay]{stage.name}"

            try:
                # Overlays receive current_data but don't pass their output to next stage
                # Instead, their output is composited on top
                overlay_output = stage.process(current_data, self.context)
                # For now, we just let the overlay output pass through
                # In a more sophisticated implementation, we'd composite it
                if overlay_output is not None:
                    current_data = overlay_output
            except Exception as e:
                if not stage.optional:
                    return StageResult(
                        success=False,
                        data=current_data,
                        error=str(e),
                        stage_name=stage_name,
                    )

            if self._metrics_enabled:
                stage_duration = (time.perf_counter() - stage_start) * 1000
                chars_in = len(str(data)) if data else 0
                chars_out = len(str(current_data)) if current_data else 0
                stage_timings.append(
                    StageMetrics(
                        name=stage_name,
                        duration_ms=stage_duration,
                        chars_in=chars_in,
                        chars_out=chars_out,
                    )
                )

        # Execute display stage LAST (after overlay stages)
        # This ensures overlay effects like HUD are visible in the final output
        if display_stage:
            stage_start = time.perf_counter() if self._metrics_enabled else 0

            try:
                current_data = display_stage.process(current_data, self.context)
            except Exception as e:
                if not display_stage.optional:
                    return StageResult(
                        success=False,
                        data=current_data,
                        error=str(e),
                        stage_name=display_stage.name,
                    )

            if self._metrics_enabled:
                stage_duration = (time.perf_counter() - stage_start) * 1000
                chars_in = len(str(data)) if data else 0
                chars_out = len(str(current_data)) if current_data else 0
                stage_timings.append(
                    StageMetrics(
                        name=display_stage.name,
                        duration_ms=stage_duration,
                        chars_in=chars_in,
                        chars_out=chars_out,
                    )
                )

        if self._metrics_enabled:
            total_duration = (time.perf_counter() - frame_start) * 1000
            self._frame_metrics.append(
                FrameMetrics(
                    frame_number=self._current_frame_number,
                    total_ms=total_duration,
                    stages=stage_timings,
                )
            )

            # Store metrics in context for other stages (like HUD)
            # This makes metrics a first-class pipeline citizen
            if self.context:
                self.context.state["metrics"] = self.get_metrics_summary()

            # Bound the metrics buffer to the most recent frames.
            if len(self._frame_metrics) > self._max_metrics_frames:
                self._frame_metrics.pop(0)
            self._current_frame_number += 1

        return StageResult(success=True, data=current_data)
|
|
|
|
def cleanup(self) -> None:
|
|
"""Clean up all stages in reverse order."""
|
|
for name in reversed(self._execution_order):
|
|
stage = self._stages.get(name)
|
|
if stage:
|
|
try:
|
|
stage.cleanup()
|
|
except Exception:
|
|
pass
|
|
self._stages.clear()
|
|
self._initialized = False
|
|
|
|
@property
|
|
def stages(self) -> dict[str, Stage]:
|
|
"""Get all stages."""
|
|
return self._stages.copy()
|
|
|
|
@property
|
|
def execution_order(self) -> list[str]:
|
|
"""Get execution order."""
|
|
return self._execution_order.copy()
|
|
|
|
def get_stage_names(self) -> list[str]:
|
|
"""Get list of stage names."""
|
|
return list(self._stages.keys())
|
|
|
|
def get_overlay_stages(self) -> list[Stage]:
|
|
"""Get all overlay stages sorted by render_order."""
|
|
overlays = [stage for stage in self._stages.values() if stage.is_overlay]
|
|
overlays.sort(key=lambda s: s.render_order)
|
|
return overlays
|
|
|
|
def get_stage_type(self, name: str) -> str:
|
|
"""Get the stage_type for a stage."""
|
|
stage = self._stages.get(name)
|
|
return stage.stage_type if stage else ""
|
|
|
|
def get_render_order(self, name: str) -> int:
|
|
"""Get the render_order for a stage."""
|
|
stage = self._stages.get(name)
|
|
return stage.render_order if stage else 0
|
|
|
|
def get_metrics_summary(self) -> dict:
|
|
"""Get summary of collected metrics."""
|
|
if not self._frame_metrics:
|
|
return {"error": "No metrics collected"}
|
|
|
|
total_times = [f.total_ms for f in self._frame_metrics]
|
|
avg_total = sum(total_times) / len(total_times)
|
|
min_total = min(total_times)
|
|
max_total = max(total_times)
|
|
|
|
stage_stats: dict[str, dict] = {}
|
|
for frame in self._frame_metrics:
|
|
for stage in frame.stages:
|
|
if stage.name not in stage_stats:
|
|
stage_stats[stage.name] = {"times": [], "total_chars": 0}
|
|
stage_stats[stage.name]["times"].append(stage.duration_ms)
|
|
stage_stats[stage.name]["total_chars"] += stage.chars_out
|
|
|
|
for name, stats in stage_stats.items():
|
|
times = stats["times"]
|
|
stats["avg_ms"] = sum(times) / len(times)
|
|
stats["min_ms"] = min(times)
|
|
stats["max_ms"] = max(times)
|
|
del stats["times"]
|
|
|
|
return {
|
|
"frame_count": len(self._frame_metrics),
|
|
"pipeline": {
|
|
"avg_ms": avg_total,
|
|
"min_ms": min_total,
|
|
"max_ms": max_total,
|
|
},
|
|
"stages": stage_stats,
|
|
}
|
|
|
|
def reset_metrics(self) -> None:
|
|
"""Reset collected metrics."""
|
|
self._frame_metrics.clear()
|
|
self._current_frame_number = 0
|
|
|
|
def get_frame_times(self) -> list[float]:
|
|
"""Get historical frame times for sparklines/charts."""
|
|
return [f.total_ms for f in self._frame_metrics]
|
|
|
|
|
|
class PipelineRunner:
    """High-level pipeline runner with animation support.

    Wraps a Pipeline with a start/step/stop lifecycle and carries the
    frame counter in PipelineParams.
    """

    def __init__(
        self,
        pipeline: Pipeline,
        params: PipelineParams | None = None,
    ):
        """Create a runner for the given pipeline.

        Args:
            pipeline: The pipeline to drive.
            params: Animation parameters; defaults to fresh PipelineParams.
        """
        self.pipeline = pipeline
        self.params = params or PipelineParams()
        self._running = False  # True between start() and stop()

    def start(self) -> bool:
        """Start the pipeline.

        Returns:
            Result of Pipeline.initialize() (False when a required stage
            failed to initialize).
        """
        self._running = True
        return self.pipeline.initialize()

    def step(self, input_data: Any | None = None) -> Any:
        """Execute one pipeline step.

        Increments the frame counter, publishes params on the pipeline
        context, and runs a single execute() pass.

        Returns:
            The pipeline's output data, or None if execution failed.
        """
        self.params.frame_number += 1
        self.pipeline.context.params = self.params
        result = self.pipeline.execute(input_data)
        return result.data if result.success else None

    def stop(self) -> None:
        """Stop and clean up the pipeline."""
        self._running = False
        self.pipeline.cleanup()

    @property
    def is_running(self) -> bool:
        """Check if runner is active."""
        return self._running
|
|
|
|
|
|
def create_pipeline_from_params(params: PipelineParams) -> Pipeline:
    """Create a Pipeline whose config mirrors the given PipelineParams."""
    return Pipeline(
        config=PipelineConfig(
            source=params.source,
            display=params.display,
            camera=params.camera_mode,
            effects=params.effect_order,
        )
    )
|
|
|
|
|
|
def create_default_pipeline() -> Pipeline:
    """Create a default pipeline with all standard components.

    Wires headlines source -> items-to-buffer render -> terminal display
    (display only if the registry can create it), then builds the
    pipeline (which may auto-inject any remaining minimum capabilities).
    """
    # Local imports avoid circular dependencies at module load time.
    from engine.data_sources.sources import HeadlinesDataSource
    from engine.pipeline.adapters import (
        DataSourceStage,
        SourceItemsToBufferStage,
    )

    pipeline = Pipeline()

    # Add source stage (wrapped as Stage)
    source = HeadlinesDataSource()
    pipeline.add_stage("source", DataSourceStage(source, name="headlines"))

    # Add render stage to convert items to text buffer
    pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))

    # Add display stage (skipped silently if the registry returns None)
    display = StageRegistry.create("display", "terminal")
    if display:
        pipeline.add_stage("display", display)

    return pipeline.build()
|