chore(pipeline): improve pipeline architecture
- Add capability-based dependency resolution with prefix matching - Add EffectPluginStage with sensor binding support - Add CameraStage adapter for camera integration - Add DisplayStage adapter for display integration - Add Pipeline metrics collection - Add deprecation notices to legacy modules - Update app.py with pipeline integration
This commit is contained in:
@@ -56,7 +56,7 @@ class RenderStage(Stage):
|
||||
|
||||
@property
|
||||
def dependencies(self) -> set[str]:
|
||||
return {"source.items"}
|
||||
return {"source"}
|
||||
|
||||
def init(self, ctx: PipelineContext) -> bool:
|
||||
random.shuffle(self._pool)
|
||||
@@ -142,7 +142,7 @@ class EffectPluginStage(Stage):
|
||||
"""Process data through the effect."""
|
||||
if data is None:
|
||||
return None
|
||||
from engine.effects import EffectContext
|
||||
from engine.effects.types import EffectContext, apply_param_bindings
|
||||
|
||||
w = ctx.params.viewport_width if ctx.params else 80
|
||||
h = ctx.params.viewport_height if ctx.params else 24
|
||||
@@ -160,6 +160,17 @@ class EffectPluginStage(Stage):
|
||||
has_message=False,
|
||||
items=ctx.get("items", []),
|
||||
)
|
||||
|
||||
# Copy sensor state from PipelineContext to EffectContext
|
||||
for key, value in ctx.state.items():
|
||||
if key.startswith("sensor."):
|
||||
effect_ctx.set_state(key, value)
|
||||
|
||||
# Apply sensor param bindings if effect has them
|
||||
if hasattr(self._effect, "param_bindings") and self._effect.param_bindings:
|
||||
bound_config = apply_param_bindings(self._effect, effect_ctx)
|
||||
self._effect.configure(bound_config)
|
||||
|
||||
return self._effect.process(data, effect_ctx)
|
||||
|
||||
|
||||
@@ -221,9 +232,21 @@ class DataSourceStage(Stage):
|
||||
|
||||
|
||||
class ItemsStage(Stage):
|
||||
"""Stage that holds pre-fetched items and provides them to the pipeline."""
|
||||
"""Stage that holds pre-fetched items and provides them to the pipeline.
|
||||
|
||||
.. deprecated::
|
||||
Use DataSourceStage with a proper DataSource instead.
|
||||
ItemsStage is a legacy bootstrap mechanism.
|
||||
"""
|
||||
|
||||
def __init__(self, items, name: str = "headlines"):
|
||||
import warnings
|
||||
|
||||
warnings.warn(
|
||||
"ItemsStage is deprecated. Use DataSourceStage with a DataSource instead.",
|
||||
DeprecationWarning,
|
||||
stacklevel=2,
|
||||
)
|
||||
self._items = items
|
||||
self.name = name
|
||||
self.category = "source"
|
||||
|
||||
@@ -83,12 +83,60 @@ class Pipeline:
|
||||
|
||||
def build(self) -> "Pipeline":
|
||||
"""Build execution order based on dependencies."""
|
||||
self._capability_map = self._build_capability_map()
|
||||
self._execution_order = self._resolve_dependencies()
|
||||
self._validate_dependencies()
|
||||
self._initialized = True
|
||||
return self
|
||||
|
||||
def _build_capability_map(self) -> dict[str, list[str]]:
|
||||
"""Build a map of capabilities to stage names.
|
||||
|
||||
Returns:
|
||||
Dict mapping capability -> list of stage names that provide it
|
||||
"""
|
||||
capability_map: dict[str, list[str]] = {}
|
||||
for name, stage in self._stages.items():
|
||||
for cap in stage.capabilities:
|
||||
if cap not in capability_map:
|
||||
capability_map[cap] = []
|
||||
capability_map[cap].append(name)
|
||||
return capability_map
|
||||
|
||||
def _find_stage_with_capability(self, capability: str) -> str | None:
|
||||
"""Find a stage that provides the given capability.
|
||||
|
||||
Supports wildcard matching:
|
||||
- "source" matches "source.headlines" (prefix match)
|
||||
- "source.*" matches "source.headlines"
|
||||
- "source.headlines" matches exactly
|
||||
|
||||
Args:
|
||||
capability: The capability to find
|
||||
|
||||
Returns:
|
||||
Stage name that provides the capability, or None if not found
|
||||
"""
|
||||
# Exact match
|
||||
if capability in self._capability_map:
|
||||
return self._capability_map[capability][0]
|
||||
|
||||
# Prefix match (e.g., "source" -> "source.headlines")
|
||||
for cap, stages in self._capability_map.items():
|
||||
if cap.startswith(capability + "."):
|
||||
return stages[0]
|
||||
|
||||
# Wildcard match (e.g., "source.*" -> "source.headlines")
|
||||
if ".*" in capability:
|
||||
prefix = capability[:-2] # Remove ".*"
|
||||
for cap in self._capability_map:
|
||||
if cap.startswith(prefix + "."):
|
||||
return self._capability_map[cap][0]
|
||||
|
||||
return None
|
||||
|
||||
def _resolve_dependencies(self) -> list[str]:
|
||||
"""Resolve stage execution order using topological sort."""
|
||||
"""Resolve stage execution order using topological sort with capability matching."""
|
||||
ordered = []
|
||||
visited = set()
|
||||
temp_mark = set()
|
||||
@@ -103,9 +151,10 @@ class Pipeline:
|
||||
stage = self._stages.get(name)
|
||||
if stage:
|
||||
for dep in stage.dependencies:
|
||||
dep_stage = self._stages.get(dep)
|
||||
if dep_stage:
|
||||
visit(dep)
|
||||
# Find a stage that provides this capability
|
||||
dep_stage_name = self._find_stage_with_capability(dep)
|
||||
if dep_stage_name:
|
||||
visit(dep_stage_name)
|
||||
|
||||
temp_mark.remove(name)
|
||||
visited.add(name)
|
||||
@@ -117,6 +166,25 @@ class Pipeline:
|
||||
|
||||
return ordered
|
||||
|
||||
def _validate_dependencies(self) -> None:
|
||||
"""Validate that all dependencies can be satisfied.
|
||||
|
||||
Raises StageError if any dependency cannot be resolved.
|
||||
"""
|
||||
missing: list[tuple[str, str]] = [] # (stage_name, capability)
|
||||
|
||||
for name, stage in self._stages.items():
|
||||
for dep in stage.dependencies:
|
||||
if not self._find_stage_with_capability(dep):
|
||||
missing.append((name, dep))
|
||||
|
||||
if missing:
|
||||
msgs = [f" - {stage} needs {cap}" for stage, cap in missing]
|
||||
raise StageError(
|
||||
"validation",
|
||||
"Missing capabilities:\n" + "\n".join(msgs),
|
||||
)
|
||||
|
||||
def initialize(self) -> bool:
|
||||
"""Initialize all stages in execution order."""
|
||||
for name in self._execution_order:
|
||||
|
||||
@@ -6,18 +6,25 @@ Provides a single registry for sources, effects, displays, and cameras.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from typing import TYPE_CHECKING, Any, TypeVar
|
||||
|
||||
from engine.pipeline.core import Stage
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from engine.pipeline.core import Stage
|
||||
|
||||
T = TypeVar("T")
|
||||
|
||||
|
||||
class StageRegistry:
|
||||
"""Unified registry for all pipeline stage types."""
|
||||
|
||||
_categories: dict[str, dict[str, type[Stage]]] = {}
|
||||
_categories: dict[str, dict[str, type[Any]]] = {}
|
||||
_discovered: bool = False
|
||||
_instances: dict[str, Stage] = {}
|
||||
|
||||
@classmethod
|
||||
def register(cls, category: str, stage_class: type[Stage]) -> None:
|
||||
def register(cls, category: str, stage_class: type[Any]) -> None:
|
||||
"""Register a stage class in a category.
|
||||
|
||||
Args:
|
||||
@@ -27,12 +34,11 @@ class StageRegistry:
|
||||
if category not in cls._categories:
|
||||
cls._categories[category] = {}
|
||||
|
||||
# Use class name as key
|
||||
key = getattr(stage_class, "__name__", stage_class.__class__.__name__)
|
||||
cls._categories[category][key] = stage_class
|
||||
|
||||
@classmethod
|
||||
def get(cls, category: str, name: str) -> type[Stage] | None:
|
||||
def get(cls, category: str, name: str) -> type[Any] | None:
|
||||
"""Get a stage class by category and name."""
|
||||
return cls._categories.get(category, {}).get(name)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user