forked from genewildish/Mainline
- Add engine/pipeline/ module with Stage ABC, PipelineContext, PipelineParams - Stage provides unified interface for sources, effects, displays, cameras - Pipeline class handles DAG-based execution with dependency resolution - PipelinePreset for pre-configured pipelines (demo, poetry, pipeline, etc.) - Add PipelineParams as params layer for animation-driven config - Add StageRegistry for unified stage registration - Add sources_v2.py with DataSource.is_dynamic property - Add animation.py with Preset and AnimationController - Skip ntfy integration tests by default (require -m integration) - Skip e2e tests by default (require -m e2e) - Update pipeline.py with comprehensive introspection methods
222 lines
6.6 KiB
Python
222 lines
6.6 KiB
Python
"""
|
|
Pipeline core - Unified Stage abstraction and PipelineContext.
|
|
|
|
This module provides the foundation for a clean, dependency-managed pipeline:
|
|
- Stage: Base class for all pipeline components (sources, effects, displays, cameras)
|
|
- PipelineContext: Dependency injection context for runtime data exchange
|
|
- Capability system: Explicit capability declarations with duck-typing support
|
|
"""
|
|
|
|
from abc import ABC, abstractmethod
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass, field
|
|
from typing import TYPE_CHECKING, Any
|
|
|
|
if TYPE_CHECKING:
|
|
from engine.pipeline.params import PipelineParams
|
|
|
|
|
|
@dataclass
class StageConfig:
    """Settings record describing one pipeline stage.

    Attributes:
        name: Identifier of the stage.
        category: Kind of stage this configuration belongs to.
        enabled: Whether the stage is switched on. Defaults to True.
        optional: Optional flag of the stage. Defaults to False.
        params: Free-form parameter mapping. Defaults to a new empty dict
            per instance.
    """

    # Required fields come first so they can be passed positionally.
    name: str
    category: str

    # Flags with defaults.
    enabled: bool = True
    optional: bool = False

    # default_factory gives every instance its own dict instead of a shared one.
    params: dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
class Stage(ABC):
|
|
"""Abstract base class for all pipeline stages.
|
|
|
|
A Stage is a single component in the rendering pipeline. Stages can be:
|
|
- Sources: Data providers (headlines, poetry, pipeline viz)
|
|
- Effects: Post-processors (noise, fade, glitch, hud)
|
|
- Displays: Output backends (terminal, pygame, websocket)
|
|
- Cameras: Viewport controllers (vertical, horizontal, omni)
|
|
|
|
Stages declare:
|
|
- capabilities: What they provide to other stages
|
|
- dependencies: What they need from other stages
|
|
|
|
Duck-typing is supported: any class with the required methods can act as a Stage.
|
|
"""
|
|
|
|
name: str
|
|
category: str # "source", "effect", "display", "camera"
|
|
optional: bool = False # If True, pipeline continues even if stage fails
|
|
|
|
@property
|
|
def capabilities(self) -> set[str]:
|
|
"""Return set of capabilities this stage provides.
|
|
|
|
Examples:
|
|
- "source.headlines"
|
|
- "effect.noise"
|
|
- "display.output"
|
|
- "camera"
|
|
"""
|
|
return {f"{self.category}.{self.name}"}
|
|
|
|
@property
|
|
def dependencies(self) -> set[str]:
|
|
"""Return set of capability names this stage needs.
|
|
|
|
Examples:
|
|
- {"display.output"}
|
|
- {"source.headlines"}
|
|
- {"camera"}
|
|
"""
|
|
return set()
|
|
|
|
def init(self, ctx: "PipelineContext") -> bool:
|
|
"""Initialize stage with pipeline context.
|
|
|
|
Args:
|
|
ctx: PipelineContext for accessing services
|
|
|
|
Returns:
|
|
True if initialization succeeded, False otherwise
|
|
"""
|
|
return True
|
|
|
|
@abstractmethod
|
|
def process(self, data: Any, ctx: "PipelineContext") -> Any:
|
|
"""Process input data and return output.
|
|
|
|
Args:
|
|
data: Input data from previous stage (or initial data for first stage)
|
|
ctx: PipelineContext for accessing services and state
|
|
|
|
Returns:
|
|
Processed data for next stage
|
|
"""
|
|
...
|
|
|
|
def cleanup(self) -> None: # noqa: B027
|
|
"""Clean up resources when pipeline shuts down."""
|
|
pass
|
|
|
|
def get_config(self) -> StageConfig:
|
|
"""Return current configuration of this stage."""
|
|
return StageConfig(
|
|
name=self.name,
|
|
category=self.category,
|
|
optional=self.optional,
|
|
)
|
|
|
|
def set_enabled(self, enabled: bool) -> None:
|
|
"""Enable or disable this stage."""
|
|
self._enabled = enabled # type: ignore[attr-defined]
|
|
|
|
def is_enabled(self) -> bool:
|
|
"""Check if stage is enabled."""
|
|
return getattr(self, "_enabled", True)
|
|
|
|
|
|
@dataclass
|
|
class StageResult:
|
|
"""Result of stage processing, including success/failure info."""
|
|
|
|
success: bool
|
|
data: Any
|
|
error: str | None = None
|
|
stage_name: str = ""
|
|
|
|
|
|
class PipelineContext:
|
|
"""Dependency injection context passed through the pipeline.
|
|
|
|
Provides:
|
|
- services: Named services (display, config, event_bus, etc.)
|
|
- state: Runtime state shared between stages
|
|
- params: PipelineParams for animation-driven config
|
|
|
|
Services can be injected at construction time or lazily resolved.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
services: dict[str, Any] | None = None,
|
|
initial_state: dict[str, Any] | None = None,
|
|
):
|
|
self.services: dict[str, Any] = services or {}
|
|
self.state: dict[str, Any] = initial_state or {}
|
|
self._params: PipelineParams | None = None
|
|
|
|
# Lazy resolvers for common services
|
|
self._lazy_resolvers: dict[str, Callable[[], Any]] = {
|
|
"config": self._resolve_config,
|
|
"event_bus": self._resolve_event_bus,
|
|
}
|
|
|
|
def _resolve_config(self) -> Any:
|
|
from engine.config import get_config
|
|
|
|
return get_config()
|
|
|
|
def _resolve_event_bus(self) -> Any:
|
|
from engine.eventbus import get_event_bus
|
|
|
|
return get_event_bus()
|
|
|
|
def get(self, key: str, default: Any = None) -> Any:
|
|
"""Get a service or state value by key.
|
|
|
|
First checks services, then state, then lazy resolution.
|
|
"""
|
|
if key in self.services:
|
|
return self.services[key]
|
|
if key in self.state:
|
|
return self.state[key]
|
|
if key in self._lazy_resolvers:
|
|
try:
|
|
return self._lazy_resolvers[key]()
|
|
except Exception:
|
|
return default
|
|
return default
|
|
|
|
def set(self, key: str, value: Any) -> None:
|
|
"""Set a service or state value."""
|
|
self.services[key] = value
|
|
|
|
def set_state(self, key: str, value: Any) -> None:
|
|
"""Set a runtime state value."""
|
|
self.state[key] = value
|
|
|
|
def get_state(self, key: str, default: Any = None) -> Any:
|
|
"""Get a runtime state value."""
|
|
return self.state.get(key, default)
|
|
|
|
@property
|
|
def params(self) -> "PipelineParams | None":
|
|
"""Get current pipeline params (for animation)."""
|
|
return self._params
|
|
|
|
@params.setter
|
|
def params(self, value: "PipelineParams") -> None:
|
|
"""Set pipeline params (from animation controller)."""
|
|
self._params = value
|
|
|
|
def has_capability(self, capability: str) -> bool:
|
|
"""Check if a capability is available."""
|
|
return capability in self.services or capability in self._lazy_resolvers
|
|
|
|
|
|
class StageError(Exception):
    """Error signalling that a pipeline stage failed to process.

    Attributes:
        stage_name: Name of the stage that failed.
        message: Failure description as passed by the caller.
        is_optional: Flag passed by the caller; False by default.
    """

    def __init__(self, stage_name: str, message: str, is_optional: bool = False):
        """Store the failure details and build the exception text."""
        # The exception text combines the stage name and the message.
        description = f"Stage '{stage_name}' failed: {message}"
        super().__init__(description)

        # Keep the raw pieces available to handlers.
        self.stage_name, self.message, self.is_optional = (
            stage_name,
            message,
            is_optional,
        )
|
|
|
|
|
|
def create_stage_error(
    stage_name: str, error: Exception, is_optional: bool = False
) -> StageError:
    """Helper to create a StageError from an exception.

    Args:
        stage_name: Name of the stage that failed.
        error: The exception that caused the failure.
        is_optional: Forwarded unchanged to the StageError.

    Returns:
        A StageError whose message is ``str(error)``, or the exception's type
        name when that text is empty, and whose ``__cause__`` is ``error``.
    """
    # Some exceptions (e.g. ``TimeoutError()``) stringify to "", which would
    # leave the failure reason blank; fall back to the type name.
    message = str(error) or type(error).__name__
    stage_error = StageError(stage_name, message, is_optional)
    # Keep the original exception (type and traceback) attached so it is not
    # lost when the StageError is raised or logged.
    stage_error.__cause__ = error
    return stage_error
|