Compare commits

...

2 Commits

Author SHA1 Message Date
f27f3475c8 feat(graph): Add adapter to convert graphs to pipelines
Bridge the new graph abstraction with existing Stage-based pipeline
system for backward compatibility.

- Add GraphAdapter class to map nodes to Stage implementations
- Handle effect intensity configuration (sets global effect state)
- Map camera modes to Camera factory methods (feed, scroll, horizontal, etc.)
- Auto-inject required dependencies (render, camera_update) via pipeline capabilities
- Support for all major node types: source, camera, effect, position, display

The adapter ensures that graphs seamlessly integrate with the existing
pipeline architecture while providing a cleaner abstraction layer.
2026-03-21 19:26:32 -07:00
c790027ede feat(graph): Add core graph abstraction for pipeline configuration
Introduce Node, Connection, and Graph classes for defining pipelines
as graphs instead of verbose XYZStage naming convention.

- Add NodeType enum (SOURCE, CAMERA, EFFECT, DISPLAY, etc.)
- Add Node, Connection, and Graph dataclasses with type hints
- Add validation for cycles and disconnected nodes using DFS
- Add factory methods: node(), connect(), chain() for easy graph building
- Support for both imperative and declarative graph construction

This provides the foundation for the graph-based DSL that replaces
the verbose XYZStage naming convention with intuitive node-and-connection syntax.
2026-03-21 19:26:27 -07:00
2 changed files with 367 additions and 0 deletions

206
engine/pipeline/graph.py Normal file
View File

@@ -0,0 +1,206 @@
"""Graph-based pipeline configuration and orchestration.
This module provides a graph abstraction for defining pipelines as nodes
and connections, replacing the verbose XYZStage naming convention.
Usage:
# Declarative (TOML-like)
graph = Graph.from_dict({
"nodes": {
"source": "headlines",
"camera": {"type": "camera", "mode": "scroll"},
"display": {"type": "terminal", "positioning": "mixed"}
},
"connections": ["source -> camera -> display"]
})
# Imperative
graph = Graph()
graph.node("source", "headlines")
graph.node("camera", type="camera", mode="scroll")
graph.connect("source", "camera", "display")
"""
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Union
from enum import Enum
class NodeType(Enum):
"""Types of pipeline nodes."""
SOURCE = "source"
RENDER = "render"
CAMERA = "camera"
EFFECT = "effect"
OVERLAY = "overlay"
POSITION = "position"
DISPLAY = "display"
CUSTOM = "custom"
@dataclass
class Node:
"""A node in the pipeline graph."""
name: str
type: NodeType
config: Dict[str, Any] = field(default_factory=dict)
enabled: bool = True
optional: bool = False
def __repr__(self) -> str:
return f"Node({self.name}, type={self.type.value})"
@dataclass
class Connection:
"""A connection between two nodes."""
source: str
target: str
data_type: Optional[str] = None # Optional data type constraint
@dataclass
class Graph:
"""Pipeline graph representation."""
nodes: Dict[str, Node] = field(default_factory=dict)
connections: List[Connection] = field(default_factory=list)
def node(self, name: str, node_type: Union[NodeType, str], **config) -> "Graph":
"""Add a node to the graph."""
if isinstance(node_type, str):
# Try to parse as NodeType
try:
node_type = NodeType(node_type)
except ValueError:
node_type = NodeType.CUSTOM
self.nodes[name] = Node(name=name, type=node_type, config=config)
return self
def connect(
self, source: str, target: str, data_type: Optional[str] = None
) -> "Graph":
"""Add a connection between nodes."""
if source not in self.nodes:
raise ValueError(f"Source node '{source}' not found")
if target not in self.nodes:
raise ValueError(f"Target node '{target}' not found")
self.connections.append(Connection(source, target, data_type))
return self
def chain(self, *names: str) -> "Graph":
"""Connect nodes in a chain."""
for i in range(len(names) - 1):
self.connect(names[i], names[i + 1])
return self
def from_dict(self, data: Dict[str, Any]) -> "Graph":
"""Load graph from dictionary (TOML-compatible)."""
# Parse nodes
nodes_data = data.get("nodes", {})
for name, node_info in nodes_data.items():
if isinstance(node_info, str):
# Simple format: "source": "headlines"
self.node(name, NodeType.SOURCE, source=node_info)
elif isinstance(node_info, dict):
# Full format: {"type": "camera", "mode": "scroll"}
node_type = node_info.get("type", "custom")
config = {k: v for k, v in node_info.items() if k != "type"}
self.node(name, node_type, **config)
# Parse connections
connections_data = data.get("connections", [])
for conn in connections_data:
if isinstance(conn, str):
# Parse "source -> target" format
parts = conn.split("->")
if len(parts) == 2:
self.connect(parts[0].strip(), parts[1].strip())
elif isinstance(conn, dict):
# Parse dict format: {"source": "a", "target": "b"}
self.connect(conn["source"], conn["target"])
return self
def to_dict(self) -> Dict[str, Any]:
"""Convert graph to dictionary."""
return {
"nodes": {
name: {"type": node.type.value, **node.config}
for name, node in self.nodes.items()
},
"connections": [
{"source": conn.source, "target": conn.target}
for conn in self.connections
],
}
def validate(self) -> List[str]:
"""Validate graph structure and return list of errors."""
errors = []
# Check for disconnected nodes
connected_nodes = set()
for conn in self.connections:
connected_nodes.add(conn.source)
connected_nodes.add(conn.target)
for node_name in self.nodes:
if node_name not in connected_nodes:
errors.append(f"Node '{node_name}' is not connected")
# Check for cycles (simplified)
visited = set()
temp = set()
def has_cycle(node_name: str) -> bool:
if node_name in temp:
return True
if node_name in visited:
return False
temp.add(node_name)
for conn in self.connections:
if conn.source == node_name:
if has_cycle(conn.target):
return True
temp.remove(node_name)
visited.add(node_name)
return False
for node_name in self.nodes:
if has_cycle(node_name):
errors.append(f"Cycle detected involving node '{node_name}'")
break
return errors
def __repr__(self) -> str:
nodes_str = ", ".join(str(n) for n in self.nodes.values())
return f"Graph(nodes=[{nodes_str}])"
# Factory functions for common node types
def source(name: str, source_type: str, **config) -> Node:
"""Create a source node."""
return Node(name, NodeType.SOURCE, {"source": source_type, **config})
def camera(name: str, mode: str = "scroll", **config) -> Node:
"""Create a camera node."""
return Node(name, NodeType.CAMERA, {"mode": mode, **config})
def display(name: str, backend: str = "terminal", **config) -> Node:
"""Create a display node."""
return Node(name, NodeType.DISPLAY, {"backend": backend, **config})
def effect(name: str, effect_name: str, **config) -> Node:
"""Create an effect node."""
return Node(name, NodeType.EFFECT, {"effect": effect_name, **config})

View File

@@ -0,0 +1,161 @@
"""Adapter to convert Graph to Pipeline stages.
This module bridges the new graph-based abstraction with the existing
Stage-based pipeline system for backward compatibility.
"""
from typing import Dict, Any, List, Optional
from engine.pipeline.graph import Graph, NodeType
from engine.pipeline.controller import Pipeline, PipelineConfig
from engine.pipeline.core import PipelineContext
from engine.pipeline.params import PipelineParams
from engine.pipeline.adapters import (
CameraStage,
DataSourceStage,
DisplayStage,
EffectPluginStage,
FontStage,
MessageOverlayStage,
PositionStage,
ViewportFilterStage,
create_stage_from_display,
create_stage_from_effect,
)
from engine.pipeline.adapters.positioning import PositioningMode
from engine.display import DisplayRegistry
from engine.effects import get_registry
from engine.data_sources.sources import EmptyDataSource, HeadlinesDataSource
from engine.camera import Camera
class GraphAdapter:
"""Converts Graph to Pipeline with existing Stage classes."""
def __init__(self, graph: Graph):
self.graph = graph
self.pipeline: Optional[Pipeline] = None
self.context: Optional[PipelineContext] = None
def build_pipeline(
self, viewport_width: int = 80, viewport_height: int = 24
) -> Pipeline:
"""Build a Pipeline from the Graph."""
# Create pipeline context
self.context = PipelineContext()
self.context.terminal_width = viewport_width
self.context.terminal_height = viewport_height
# Create params
params = PipelineParams(
viewport_width=viewport_width,
viewport_height=viewport_height,
)
self.context.params = params
# Create pipeline config
config = PipelineConfig()
# Create pipeline
self.pipeline = Pipeline(config=config, context=self.context)
# Map graph nodes to pipeline stages
self._map_nodes_to_stages()
# Build pipeline
self.pipeline.build()
return self.pipeline
def _map_nodes_to_stages(self) -> None:
"""Map graph nodes to pipeline stages."""
for name, node in self.graph.nodes.items():
if not node.enabled:
continue
stage = self._create_stage_from_node(name, node)
if stage:
self.pipeline.add_stage(name, stage)
def _create_stage_from_node(self, name: str, node) -> Optional:
"""Create a pipeline stage from a graph node."""
stage = None
if node.type == NodeType.SOURCE:
source_type = node.config.get("source", "headlines")
if source_type == "headlines":
source = HeadlinesDataSource()
elif source_type == "empty":
source = EmptyDataSource(
width=self.context.terminal_width,
height=self.context.terminal_height,
)
else:
source = EmptyDataSource(
width=self.context.terminal_width,
height=self.context.terminal_height,
)
stage = DataSourceStage(source, name=name)
elif node.type == NodeType.CAMERA:
mode = node.config.get("mode", "scroll")
speed = node.config.get("speed", 1.0)
# Map mode string to Camera factory method
mode_lower = mode.lower()
if hasattr(Camera, mode_lower):
camera_factory = getattr(Camera, mode_lower)
camera = camera_factory(speed=speed)
else:
# Fallback to scroll mode
camera = Camera.scroll(speed=speed)
stage = CameraStage(camera, name=name)
elif node.type == NodeType.DISPLAY:
backend = node.config.get("backend", "terminal")
positioning = node.config.get("positioning", "mixed")
display = DisplayRegistry.create(backend)
if display:
stage = DisplayStage(display, name=name, positioning=positioning)
elif node.type == NodeType.EFFECT:
effect_name = node.config.get("effect", "")
intensity = node.config.get("intensity", 1.0)
effect = get_registry().get(effect_name)
if effect:
# Set effect intensity (modifies global effect state)
effect.config.intensity = intensity
# Effects typically depend on rendered output
dependencies = {"render.output"}
stage = EffectPluginStage(effect, name=name, dependencies=dependencies)
elif node.type == NodeType.RENDER:
stage = FontStage(name=name)
elif node.type == NodeType.OVERLAY:
stage = MessageOverlayStage(name=name)
elif node.type == NodeType.POSITION:
mode_str = node.config.get("mode", "mixed")
try:
mode = PositioningMode(mode_str)
except ValueError:
mode = PositioningMode.MIXED
stage = PositionStage(mode=mode, name=name)
return stage
def graph_to_pipeline(
graph: Graph, viewport_width: int = 80, viewport_height: int = 24
) -> Pipeline:
"""Convert a Graph to a Pipeline."""
adapter = GraphAdapter(graph)
return adapter.build_pipeline(viewport_width, viewport_height)
def dict_to_pipeline(
data: Dict[str, Any], viewport_width: int = 80, viewport_height: int = 24
) -> Pipeline:
"""Convert a dictionary to a Pipeline."""
graph = Graph().from_dict(data)
return graph_to_pipeline(graph, viewport_width, viewport_height)