forked from genewildish/Mainline
Compare commits
2 Commits
feature/ca
...
f27f3475c8
| Author | SHA1 | Date | |
|---|---|---|---|
| f27f3475c8 | |||
| c790027ede |
206
engine/pipeline/graph.py
Normal file
206
engine/pipeline/graph.py
Normal file
@@ -0,0 +1,206 @@
|
||||
"""Graph-based pipeline configuration and orchestration.
|
||||
|
||||
This module provides a graph abstraction for defining pipelines as nodes
|
||||
and connections, replacing the verbose XYZStage naming convention.
|
||||
|
||||
Usage:
|
||||
# Declarative (TOML-like)
|
||||
graph = Graph.from_dict({
|
||||
"nodes": {
|
||||
"source": "headlines",
|
||||
"camera": {"type": "camera", "mode": "scroll"},
|
||||
"display": {"type": "terminal", "positioning": "mixed"}
|
||||
},
|
||||
"connections": ["source -> camera -> display"]
|
||||
})
|
||||
|
||||
# Imperative
|
||||
graph = Graph()
|
||||
graph.node("source", "headlines")
|
||||
graph.node("camera", type="camera", mode="scroll")
|
||||
graph.connect("source", "camera", "display")
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any, Dict, List, Optional, Union
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class NodeType(Enum):
|
||||
"""Types of pipeline nodes."""
|
||||
|
||||
SOURCE = "source"
|
||||
RENDER = "render"
|
||||
CAMERA = "camera"
|
||||
EFFECT = "effect"
|
||||
OVERLAY = "overlay"
|
||||
POSITION = "position"
|
||||
DISPLAY = "display"
|
||||
CUSTOM = "custom"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Node:
|
||||
"""A node in the pipeline graph."""
|
||||
|
||||
name: str
|
||||
type: NodeType
|
||||
config: Dict[str, Any] = field(default_factory=dict)
|
||||
enabled: bool = True
|
||||
optional: bool = False
|
||||
|
||||
def __repr__(self) -> str:
|
||||
return f"Node({self.name}, type={self.type.value})"
|
||||
|
||||
|
||||
@dataclass
|
||||
class Connection:
|
||||
"""A connection between two nodes."""
|
||||
|
||||
source: str
|
||||
target: str
|
||||
data_type: Optional[str] = None # Optional data type constraint
|
||||
|
||||
|
||||
@dataclass
|
||||
class Graph:
|
||||
"""Pipeline graph representation."""
|
||||
|
||||
nodes: Dict[str, Node] = field(default_factory=dict)
|
||||
connections: List[Connection] = field(default_factory=list)
|
||||
|
||||
def node(self, name: str, node_type: Union[NodeType, str], **config) -> "Graph":
|
||||
"""Add a node to the graph."""
|
||||
if isinstance(node_type, str):
|
||||
# Try to parse as NodeType
|
||||
try:
|
||||
node_type = NodeType(node_type)
|
||||
except ValueError:
|
||||
node_type = NodeType.CUSTOM
|
||||
|
||||
self.nodes[name] = Node(name=name, type=node_type, config=config)
|
||||
return self
|
||||
|
||||
def connect(
|
||||
self, source: str, target: str, data_type: Optional[str] = None
|
||||
) -> "Graph":
|
||||
"""Add a connection between nodes."""
|
||||
if source not in self.nodes:
|
||||
raise ValueError(f"Source node '{source}' not found")
|
||||
if target not in self.nodes:
|
||||
raise ValueError(f"Target node '{target}' not found")
|
||||
|
||||
self.connections.append(Connection(source, target, data_type))
|
||||
return self
|
||||
|
||||
def chain(self, *names: str) -> "Graph":
|
||||
"""Connect nodes in a chain."""
|
||||
for i in range(len(names) - 1):
|
||||
self.connect(names[i], names[i + 1])
|
||||
return self
|
||||
|
||||
def from_dict(self, data: Dict[str, Any]) -> "Graph":
|
||||
"""Load graph from dictionary (TOML-compatible)."""
|
||||
# Parse nodes
|
||||
nodes_data = data.get("nodes", {})
|
||||
for name, node_info in nodes_data.items():
|
||||
if isinstance(node_info, str):
|
||||
# Simple format: "source": "headlines"
|
||||
self.node(name, NodeType.SOURCE, source=node_info)
|
||||
elif isinstance(node_info, dict):
|
||||
# Full format: {"type": "camera", "mode": "scroll"}
|
||||
node_type = node_info.get("type", "custom")
|
||||
config = {k: v for k, v in node_info.items() if k != "type"}
|
||||
self.node(name, node_type, **config)
|
||||
|
||||
# Parse connections
|
||||
connections_data = data.get("connections", [])
|
||||
for conn in connections_data:
|
||||
if isinstance(conn, str):
|
||||
# Parse "source -> target" format
|
||||
parts = conn.split("->")
|
||||
if len(parts) == 2:
|
||||
self.connect(parts[0].strip(), parts[1].strip())
|
||||
elif isinstance(conn, dict):
|
||||
# Parse dict format: {"source": "a", "target": "b"}
|
||||
self.connect(conn["source"], conn["target"])
|
||||
|
||||
return self
|
||||
|
||||
def to_dict(self) -> Dict[str, Any]:
|
||||
"""Convert graph to dictionary."""
|
||||
return {
|
||||
"nodes": {
|
||||
name: {"type": node.type.value, **node.config}
|
||||
for name, node in self.nodes.items()
|
||||
},
|
||||
"connections": [
|
||||
{"source": conn.source, "target": conn.target}
|
||||
for conn in self.connections
|
||||
],
|
||||
}
|
||||
|
||||
def validate(self) -> List[str]:
|
||||
"""Validate graph structure and return list of errors."""
|
||||
errors = []
|
||||
|
||||
# Check for disconnected nodes
|
||||
connected_nodes = set()
|
||||
for conn in self.connections:
|
||||
connected_nodes.add(conn.source)
|
||||
connected_nodes.add(conn.target)
|
||||
|
||||
for node_name in self.nodes:
|
||||
if node_name not in connected_nodes:
|
||||
errors.append(f"Node '{node_name}' is not connected")
|
||||
|
||||
# Check for cycles (simplified)
|
||||
visited = set()
|
||||
temp = set()
|
||||
|
||||
def has_cycle(node_name: str) -> bool:
|
||||
if node_name in temp:
|
||||
return True
|
||||
if node_name in visited:
|
||||
return False
|
||||
|
||||
temp.add(node_name)
|
||||
for conn in self.connections:
|
||||
if conn.source == node_name:
|
||||
if has_cycle(conn.target):
|
||||
return True
|
||||
temp.remove(node_name)
|
||||
visited.add(node_name)
|
||||
return False
|
||||
|
||||
for node_name in self.nodes:
|
||||
if has_cycle(node_name):
|
||||
errors.append(f"Cycle detected involving node '{node_name}'")
|
||||
break
|
||||
|
||||
return errors
|
||||
|
||||
def __repr__(self) -> str:
|
||||
nodes_str = ", ".join(str(n) for n in self.nodes.values())
|
||||
return f"Graph(nodes=[{nodes_str}])"
|
||||
|
||||
|
||||
# Factory functions for common node types
|
||||
def source(name: str, source_type: str, **config) -> Node:
|
||||
"""Create a source node."""
|
||||
return Node(name, NodeType.SOURCE, {"source": source_type, **config})
|
||||
|
||||
|
||||
def camera(name: str, mode: str = "scroll", **config) -> Node:
|
||||
"""Create a camera node."""
|
||||
return Node(name, NodeType.CAMERA, {"mode": mode, **config})
|
||||
|
||||
|
||||
def display(name: str, backend: str = "terminal", **config) -> Node:
|
||||
"""Create a display node."""
|
||||
return Node(name, NodeType.DISPLAY, {"backend": backend, **config})
|
||||
|
||||
|
||||
def effect(name: str, effect_name: str, **config) -> Node:
|
||||
"""Create an effect node."""
|
||||
return Node(name, NodeType.EFFECT, {"effect": effect_name, **config})
|
||||
161
engine/pipeline/graph_adapter.py
Normal file
161
engine/pipeline/graph_adapter.py
Normal file
@@ -0,0 +1,161 @@
|
||||
"""Adapter to convert Graph to Pipeline stages.
|
||||
|
||||
This module bridges the new graph-based abstraction with the existing
|
||||
Stage-based pipeline system for backward compatibility.
|
||||
"""
|
||||
|
||||
from typing import Dict, Any, List, Optional
|
||||
|
||||
from engine.pipeline.graph import Graph, NodeType
|
||||
from engine.pipeline.controller import Pipeline, PipelineConfig
|
||||
from engine.pipeline.core import PipelineContext
|
||||
from engine.pipeline.params import PipelineParams
|
||||
from engine.pipeline.adapters import (
|
||||
CameraStage,
|
||||
DataSourceStage,
|
||||
DisplayStage,
|
||||
EffectPluginStage,
|
||||
FontStage,
|
||||
MessageOverlayStage,
|
||||
PositionStage,
|
||||
ViewportFilterStage,
|
||||
create_stage_from_display,
|
||||
create_stage_from_effect,
|
||||
)
|
||||
from engine.pipeline.adapters.positioning import PositioningMode
|
||||
from engine.display import DisplayRegistry
|
||||
from engine.effects import get_registry
|
||||
from engine.data_sources.sources import EmptyDataSource, HeadlinesDataSource
|
||||
from engine.camera import Camera
|
||||
|
||||
|
||||
class GraphAdapter:
|
||||
"""Converts Graph to Pipeline with existing Stage classes."""
|
||||
|
||||
def __init__(self, graph: Graph):
|
||||
self.graph = graph
|
||||
self.pipeline: Optional[Pipeline] = None
|
||||
self.context: Optional[PipelineContext] = None
|
||||
|
||||
def build_pipeline(
|
||||
self, viewport_width: int = 80, viewport_height: int = 24
|
||||
) -> Pipeline:
|
||||
"""Build a Pipeline from the Graph."""
|
||||
# Create pipeline context
|
||||
self.context = PipelineContext()
|
||||
self.context.terminal_width = viewport_width
|
||||
self.context.terminal_height = viewport_height
|
||||
|
||||
# Create params
|
||||
params = PipelineParams(
|
||||
viewport_width=viewport_width,
|
||||
viewport_height=viewport_height,
|
||||
)
|
||||
self.context.params = params
|
||||
|
||||
# Create pipeline config
|
||||
config = PipelineConfig()
|
||||
|
||||
# Create pipeline
|
||||
self.pipeline = Pipeline(config=config, context=self.context)
|
||||
|
||||
# Map graph nodes to pipeline stages
|
||||
self._map_nodes_to_stages()
|
||||
|
||||
# Build pipeline
|
||||
self.pipeline.build()
|
||||
|
||||
return self.pipeline
|
||||
|
||||
def _map_nodes_to_stages(self) -> None:
|
||||
"""Map graph nodes to pipeline stages."""
|
||||
for name, node in self.graph.nodes.items():
|
||||
if not node.enabled:
|
||||
continue
|
||||
|
||||
stage = self._create_stage_from_node(name, node)
|
||||
if stage:
|
||||
self.pipeline.add_stage(name, stage)
|
||||
|
||||
def _create_stage_from_node(self, name: str, node) -> Optional:
|
||||
"""Create a pipeline stage from a graph node."""
|
||||
stage = None
|
||||
|
||||
if node.type == NodeType.SOURCE:
|
||||
source_type = node.config.get("source", "headlines")
|
||||
if source_type == "headlines":
|
||||
source = HeadlinesDataSource()
|
||||
elif source_type == "empty":
|
||||
source = EmptyDataSource(
|
||||
width=self.context.terminal_width,
|
||||
height=self.context.terminal_height,
|
||||
)
|
||||
else:
|
||||
source = EmptyDataSource(
|
||||
width=self.context.terminal_width,
|
||||
height=self.context.terminal_height,
|
||||
)
|
||||
stage = DataSourceStage(source, name=name)
|
||||
|
||||
elif node.type == NodeType.CAMERA:
|
||||
mode = node.config.get("mode", "scroll")
|
||||
speed = node.config.get("speed", 1.0)
|
||||
# Map mode string to Camera factory method
|
||||
mode_lower = mode.lower()
|
||||
if hasattr(Camera, mode_lower):
|
||||
camera_factory = getattr(Camera, mode_lower)
|
||||
camera = camera_factory(speed=speed)
|
||||
else:
|
||||
# Fallback to scroll mode
|
||||
camera = Camera.scroll(speed=speed)
|
||||
stage = CameraStage(camera, name=name)
|
||||
|
||||
elif node.type == NodeType.DISPLAY:
|
||||
backend = node.config.get("backend", "terminal")
|
||||
positioning = node.config.get("positioning", "mixed")
|
||||
display = DisplayRegistry.create(backend)
|
||||
if display:
|
||||
stage = DisplayStage(display, name=name, positioning=positioning)
|
||||
|
||||
elif node.type == NodeType.EFFECT:
|
||||
effect_name = node.config.get("effect", "")
|
||||
intensity = node.config.get("intensity", 1.0)
|
||||
effect = get_registry().get(effect_name)
|
||||
if effect:
|
||||
# Set effect intensity (modifies global effect state)
|
||||
effect.config.intensity = intensity
|
||||
# Effects typically depend on rendered output
|
||||
dependencies = {"render.output"}
|
||||
stage = EffectPluginStage(effect, name=name, dependencies=dependencies)
|
||||
|
||||
elif node.type == NodeType.RENDER:
|
||||
stage = FontStage(name=name)
|
||||
|
||||
elif node.type == NodeType.OVERLAY:
|
||||
stage = MessageOverlayStage(name=name)
|
||||
|
||||
elif node.type == NodeType.POSITION:
|
||||
mode_str = node.config.get("mode", "mixed")
|
||||
try:
|
||||
mode = PositioningMode(mode_str)
|
||||
except ValueError:
|
||||
mode = PositioningMode.MIXED
|
||||
stage = PositionStage(mode=mode, name=name)
|
||||
|
||||
return stage
|
||||
|
||||
|
||||
def graph_to_pipeline(
|
||||
graph: Graph, viewport_width: int = 80, viewport_height: int = 24
|
||||
) -> Pipeline:
|
||||
"""Convert a Graph to a Pipeline."""
|
||||
adapter = GraphAdapter(graph)
|
||||
return adapter.build_pipeline(viewport_width, viewport_height)
|
||||
|
||||
|
||||
def dict_to_pipeline(
|
||||
data: Dict[str, Any], viewport_width: int = 80, viewport_height: int = 24
|
||||
) -> Pipeline:
|
||||
"""Convert a dictionary to a Pipeline."""
|
||||
graph = Graph().from_dict(data)
|
||||
return graph_to_pipeline(graph, viewport_width, viewport_height)
|
||||
Reference in New Issue
Block a user