1 Commits

Author SHA1 Message Date
2d28e92594 feature/capability-based-deps (#53)
Reviewed-on: #53
Co-authored-by: David Gwilliam <dhgwilliam@gmail.com>
Co-committed-by: David Gwilliam <dhgwilliam@gmail.com>
2026-03-31 01:55:21 +00:00
2 changed files with 0 additions and 367 deletions

View File

@@ -1,206 +0,0 @@
"""Graph-based pipeline configuration and orchestration.
This module provides a graph abstraction for defining pipelines as nodes
and connections, replacing the verbose XYZStage naming convention.
Usage:
# Declarative (TOML-like)
graph = Graph.from_dict({
"nodes": {
"source": "headlines",
"camera": {"type": "camera", "mode": "scroll"},
"display": {"type": "terminal", "positioning": "mixed"}
},
"connections": ["source -> camera -> display"]
})
# Imperative
graph = Graph()
graph.node("source", "headlines")
graph.node("camera", type="camera", mode="scroll")
graph.connect("source", "camera", "display")
"""
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
from enum import Enum
class NodeType(Enum):
"""Types of pipeline nodes."""
SOURCE = "source"
RENDER = "render"
CAMERA = "camera"
EFFECT = "effect"
OVERLAY = "overlay"
POSITION = "position"
DISPLAY = "display"
CUSTOM = "custom"
@dataclass
class Node:
"""A node in the pipeline graph."""
name: str
type: NodeType
config: Dict[str, Any] = field(default_factory=dict)
enabled: bool = True
optional: bool = False
def __repr__(self) -> str:
return f"Node({self.name}, type={self.type.value})"
@dataclass
class Connection:
"""A connection between two nodes."""
source: str
target: str
data_type: Optional[str] = None # Optional data type constraint
@dataclass
class Graph:
"""Pipeline graph representation."""
nodes: Dict[str, Node] = field(default_factory=dict)
connections: List[Connection] = field(default_factory=list)
def node(self, name: str, node_type: Union[NodeType, str], **config) -> "Graph":
"""Add a node to the graph."""
if isinstance(node_type, str):
# Try to parse as NodeType
try:
node_type = NodeType(node_type)
except ValueError:
node_type = NodeType.CUSTOM
self.nodes[name] = Node(name=name, type=node_type, config=config)
return self
def connect(
self, source: str, target: str, data_type: Optional[str] = None
) -> "Graph":
"""Add a connection between nodes."""
if source not in self.nodes:
raise ValueError(f"Source node '{source}' not found")
if target not in self.nodes:
raise ValueError(f"Target node '{target}' not found")
self.connections.append(Connection(source, target, data_type))
return self
def chain(self, *names: str) -> "Graph":
"""Connect nodes in a chain."""
for i in range(len(names) - 1):
self.connect(names[i], names[i + 1])
return self
def from_dict(self, data: Dict[str, Any]) -> "Graph":
"""Load graph from dictionary (TOML-compatible)."""
# Parse nodes
nodes_data = data.get("nodes", {})
for name, node_info in nodes_data.items():
if isinstance(node_info, str):
# Simple format: "source": "headlines"
self.node(name, NodeType.SOURCE, source=node_info)
elif isinstance(node_info, dict):
# Full format: {"type": "camera", "mode": "scroll"}
node_type = node_info.get("type", "custom")
config = {k: v for k, v in node_info.items() if k != "type"}
self.node(name, node_type, **config)
# Parse connections
connections_data = data.get("connections", [])
for conn in connections_data:
if isinstance(conn, str):
# Parse "source -> target" format
parts = conn.split("->")
if len(parts) == 2:
self.connect(parts[0].strip(), parts[1].strip())
elif isinstance(conn, dict):
# Parse dict format: {"source": "a", "target": "b"}
self.connect(conn["source"], conn["target"])
return self
def to_dict(self) -> Dict[str, Any]:
"""Convert graph to dictionary."""
return {
"nodes": {
name: {"type": node.type.value, **node.config}
for name, node in self.nodes.items()
},
"connections": [
{"source": conn.source, "target": conn.target}
for conn in self.connections
],
}
def validate(self) -> List[str]:
"""Validate graph structure and return list of errors."""
errors = []
# Check for disconnected nodes
connected_nodes = set()
for conn in self.connections:
connected_nodes.add(conn.source)
connected_nodes.add(conn.target)
for node_name in self.nodes:
if node_name not in connected_nodes:
errors.append(f"Node '{node_name}' is not connected")
# Check for cycles (simplified)
visited = set()
temp = set()
def has_cycle(node_name: str) -> bool:
if node_name in temp:
return True
if node_name in visited:
return False
temp.add(node_name)
for conn in self.connections:
if conn.source == node_name:
if has_cycle(conn.target):
return True
temp.remove(node_name)
visited.add(node_name)
return False
for node_name in self.nodes:
if has_cycle(node_name):
errors.append(f"Cycle detected involving node '{node_name}'")
break
return errors
def __repr__(self) -> str:
nodes_str = ", ".join(str(n) for n in self.nodes.values())
return f"Graph(nodes=[{nodes_str}])"
# Factory functions for common node types
def source(name: str, source_type: str, **config) -> Node:
"""Create a source node."""
return Node(name, NodeType.SOURCE, {"source": source_type, **config})
def camera(name: str, mode: str = "scroll", **config) -> Node:
"""Create a camera node."""
return Node(name, NodeType.CAMERA, {"mode": mode, **config})
def display(name: str, backend: str = "terminal", **config) -> Node:
"""Create a display node."""
return Node(name, NodeType.DISPLAY, {"backend": backend, **config})
def effect(name: str, effect_name: str, **config) -> Node:
"""Create an effect node."""
return Node(name, NodeType.EFFECT, {"effect": effect_name, **config})

View File

@@ -1,161 +0,0 @@
"""Adapter to convert Graph to Pipeline stages.
This module bridges the new graph-based abstraction with the existing
Stage-based pipeline system for backward compatibility.
"""
from typing import Dict, Any, List, Optional
from engine.pipeline.graph import Graph, NodeType
from engine.pipeline.controller import Pipeline, PipelineConfig
from engine.pipeline.core import PipelineContext
from engine.pipeline.params import PipelineParams
from engine.pipeline.adapters import (
CameraStage,
DataSourceStage,
DisplayStage,
EffectPluginStage,
FontStage,
MessageOverlayStage,
PositionStage,
ViewportFilterStage,
create_stage_from_display,
create_stage_from_effect,
)
from engine.pipeline.adapters.positioning import PositioningMode
from engine.display import DisplayRegistry
from engine.effects import get_registry
from engine.data_sources.sources import EmptyDataSource, HeadlinesDataSource
from engine.camera import Camera
class GraphAdapter:
"""Converts Graph to Pipeline with existing Stage classes."""
def __init__(self, graph: Graph):
self.graph = graph
self.pipeline: Optional[Pipeline] = None
self.context: Optional[PipelineContext] = None
def build_pipeline(
self, viewport_width: int = 80, viewport_height: int = 24
) -> Pipeline:
"""Build a Pipeline from the Graph."""
# Create pipeline context
self.context = PipelineContext()
self.context.terminal_width = viewport_width
self.context.terminal_height = viewport_height
# Create params
params = PipelineParams(
viewport_width=viewport_width,
viewport_height=viewport_height,
)
self.context.params = params
# Create pipeline config
config = PipelineConfig()
# Create pipeline
self.pipeline = Pipeline(config=config, context=self.context)
# Map graph nodes to pipeline stages
self._map_nodes_to_stages()
# Build pipeline
self.pipeline.build()
return self.pipeline
def _map_nodes_to_stages(self) -> None:
"""Map graph nodes to pipeline stages."""
for name, node in self.graph.nodes.items():
if not node.enabled:
continue
stage = self._create_stage_from_node(name, node)
if stage:
self.pipeline.add_stage(name, stage)
def _create_stage_from_node(self, name: str, node) -> Optional:
"""Create a pipeline stage from a graph node."""
stage = None
if node.type == NodeType.SOURCE:
source_type = node.config.get("source", "headlines")
if source_type == "headlines":
source = HeadlinesDataSource()
elif source_type == "empty":
source = EmptyDataSource(
width=self.context.terminal_width,
height=self.context.terminal_height,
)
else:
source = EmptyDataSource(
width=self.context.terminal_width,
height=self.context.terminal_height,
)
stage = DataSourceStage(source, name=name)
elif node.type == NodeType.CAMERA:
mode = node.config.get("mode", "scroll")
speed = node.config.get("speed", 1.0)
# Map mode string to Camera factory method
mode_lower = mode.lower()
if hasattr(Camera, mode_lower):
camera_factory = getattr(Camera, mode_lower)
camera = camera_factory(speed=speed)
else:
# Fallback to scroll mode
camera = Camera.scroll(speed=speed)
stage = CameraStage(camera, name=name)
elif node.type == NodeType.DISPLAY:
backend = node.config.get("backend", "terminal")
positioning = node.config.get("positioning", "mixed")
display = DisplayRegistry.create(backend)
if display:
stage = DisplayStage(display, name=name, positioning=positioning)
elif node.type == NodeType.EFFECT:
effect_name = node.config.get("effect", "")
intensity = node.config.get("intensity", 1.0)
effect = get_registry().get(effect_name)
if effect:
# Set effect intensity (modifies global effect state)
effect.config.intensity = intensity
# Effects typically depend on rendered output
dependencies = {"render.output"}
stage = EffectPluginStage(effect, name=name, dependencies=dependencies)
elif node.type == NodeType.RENDER:
stage = FontStage(name=name)
elif node.type == NodeType.OVERLAY:
stage = MessageOverlayStage(name=name)
elif node.type == NodeType.POSITION:
mode_str = node.config.get("mode", "mixed")
try:
mode = PositioningMode(mode_str)
except ValueError:
mode = PositioningMode.MIXED
stage = PositionStage(mode=mode, name=name)
return stage
def graph_to_pipeline(
graph: Graph, viewport_width: int = 80, viewport_height: int = 24
) -> Pipeline:
"""Convert a Graph to a Pipeline."""
adapter = GraphAdapter(graph)
return adapter.build_pipeline(viewport_width, viewport_height)
def dict_to_pipeline(
data: Dict[str, Any], viewport_width: int = 80, viewport_height: int = 24
) -> Pipeline:
"""Convert a dictionary to a Pipeline."""
graph = Graph().from_dict(data)
return graph_to_pipeline(graph, viewport_width, viewport_height)