feat(graph): Integrate graph system with pipeline runner

Add support for loading pipelines from TOML graph configs in the
pipeline runner, maintaining full backward compatibility with presets.

- Add graph_config parameter to run_pipeline_mode() function
- Support both preset mode and graph mode with conditional logic
- Graph mode: loads from TOML file, uses graph-defined stages
- Preset mode: maintains existing behavior with manual stage building
- Handle items/context appropriately for each mode (graph uses own data sources)
- CLI display flag works in both modes

Backward compatible: graph_config defaults to None, so existing calls
to run_pipeline_mode(preset_name) continue to work unchanged.
This commit is contained in:
2026-03-21 19:26:45 -07:00
parent 406a58d292
commit 1a7da400e3

View File

@@ -104,8 +104,13 @@ def _handle_pipeline_mutation(pipeline: Pipeline, command: dict) -> bool:
return False return False
def run_pipeline_mode(preset_name: str = "demo"): def run_pipeline_mode(preset_name: str = "demo", graph_config: str | None = None):
"""Run using the new unified pipeline architecture.""" """Run using the new unified pipeline architecture.
Args:
preset_name: Name of the preset to use
graph_config: Path to a TOML graph configuration file (optional)
"""
import engine.effects.plugins as effects_plugins import engine.effects.plugins as effects_plugins
from engine.effects import PerformanceMonitor, set_monitor from engine.effects import PerformanceMonitor, set_monitor
@@ -117,17 +122,64 @@ def run_pipeline_mode(preset_name: str = "demo"):
monitor = PerformanceMonitor() monitor = PerformanceMonitor()
set_monitor(monitor) set_monitor(monitor)
preset = get_preset(preset_name) # Check if graph config is provided
if not preset: using_graph_config = graph_config is not None
print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m")
sys.exit(1)
print(f" \033[38;5;245mPreset: {preset.name} - {preset.description}\033[0m") if using_graph_config:
from engine.pipeline.graph_toml import load_pipeline_from_toml
params = preset.to_params() print(f" \033[38;5;245mLoading graph from: {graph_config}\033[0m")
# Use preset viewport if available, else default to 80x24
params.viewport_width = getattr(preset, "viewport_width", 80) # Determine viewport size
params.viewport_height = getattr(preset, "viewport_height", 24) viewport_width = 80
viewport_height = 24
if "--viewport" in sys.argv:
idx = sys.argv.index("--viewport")
if idx + 1 < len(sys.argv):
vp = sys.argv[idx + 1]
try:
viewport_width, viewport_height = map(int, vp.split("x"))
except ValueError:
print("Error: Invalid viewport format. Use WxH (e.g., 40x15)")
sys.exit(1)
# Load pipeline from graph config
try:
pipeline = load_pipeline_from_toml(
graph_config,
viewport_width=viewport_width,
viewport_height=viewport_height,
)
except Exception as e:
print(f" \033[38;5;196mError loading graph config: {e}\033[0m")
sys.exit(1)
# Set params for display
from engine.pipeline.params import PipelineParams
params = PipelineParams(
viewport_width=viewport_width, viewport_height=viewport_height
)
# Set display name from graph or CLI
display_name = "terminal" # Default for graph mode
if "--display" in sys.argv:
idx = sys.argv.index("--display")
if idx + 1 < len(sys.argv):
display_name = sys.argv[idx + 1]
else:
# Use preset-based pipeline
preset = get_preset(preset_name)
if not preset:
print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m")
sys.exit(1)
print(f" \033[38;5;245mPreset: {preset.name} - {preset.description}\033[0m")
params = preset.to_params()
# Use preset viewport if available, else default to 80x24
params.viewport_width = getattr(preset, "viewport_width", 80)
params.viewport_height = getattr(preset, "viewport_height", 24)
if "--viewport" in sys.argv: if "--viewport" in sys.argv:
idx = sys.argv.index("--viewport") idx = sys.argv.index("--viewport")
@@ -196,22 +248,28 @@ def run_pipeline_mode(preset_name: str = "demo"):
print(f" \033[38;5;82mLoaded {len(items)} items\033[0m") print(f" \033[38;5;82mLoaded {len(items)} items\033[0m")
# CLI --display flag takes priority over preset # CLI --display flag takes priority
# Check if --display was explicitly provided # Check if --display was explicitly provided
display_name = preset.display
display_explicitly_specified = "--display" in sys.argv display_explicitly_specified = "--display" in sys.argv
if display_explicitly_specified: if not using_graph_config:
idx = sys.argv.index("--display") # Preset mode: use preset display as default
if idx + 1 < len(sys.argv): display_name = preset.display
display_name = sys.argv[idx + 1] if display_explicitly_specified:
idx = sys.argv.index("--display")
if idx + 1 < len(sys.argv):
display_name = sys.argv[idx + 1]
else:
# Warn user that display is falling back to preset default
print(
f" \033[38;5;226mWarning: No --display specified, using preset default: {display_name}\033[0m"
)
print(
" \033[38;5;245mTip: Use --display null for headless mode (useful for testing/capture)\033[0m"
)
else: else:
# Warn user that display is falling back to preset default # Graph mode: display_name already set above
print( if not display_explicitly_specified:
f" \033[38;5;226mWarning: No --display specified, using preset default: {display_name}\033[0m" print(f" \033[38;5;245mUsing default display: {display_name}\033[0m")
)
print(
" \033[38;5;245mTip: Use --display null for headless mode (useful for testing/capture)\033[0m"
)
display = DisplayRegistry.create(display_name) display = DisplayRegistry.create(display_name)
if not display and not display_name.startswith("multi"): if not display and not display_name.startswith("multi"):
@@ -245,113 +303,123 @@ def run_pipeline_mode(preset_name: str = "demo"):
effect_registry = get_registry() effect_registry = get_registry()
# Create source stage based on preset source type # Only build stages from preset if not using graph config
if preset.source == "pipeline-inspect": # (graph config already has all stages defined)
from engine.data_sources.pipeline_introspection import ( if not using_graph_config:
PipelineIntrospectionSource, # Create source stage based on preset source type
) if preset.source == "pipeline-inspect":
from engine.pipeline.adapters import DataSourceStage from engine.data_sources.pipeline_introspection import (
PipelineIntrospectionSource,
)
from engine.pipeline.adapters import DataSourceStage
introspection_source = PipelineIntrospectionSource( introspection_source = PipelineIntrospectionSource(
pipeline=None, # Will be set after pipeline.build() pipeline=None, # Will be set after pipeline.build()
viewport_width=80, viewport_width=80,
viewport_height=24, viewport_height=24,
) )
pipeline.add_stage( pipeline.add_stage(
"source", DataSourceStage(introspection_source, name="pipeline-inspect") "source", DataSourceStage(introspection_source, name="pipeline-inspect")
) )
elif preset.source == "empty": elif preset.source == "empty":
from engine.data_sources.sources import EmptyDataSource from engine.data_sources.sources import EmptyDataSource
from engine.pipeline.adapters import DataSourceStage from engine.pipeline.adapters import DataSourceStage
empty_source = EmptyDataSource(width=80, height=24) empty_source = EmptyDataSource(width=80, height=24)
pipeline.add_stage("source", DataSourceStage(empty_source, name="empty")) pipeline.add_stage("source", DataSourceStage(empty_source, name="empty"))
else: else:
from engine.data_sources.sources import ListDataSource from engine.data_sources.sources import ListDataSource
from engine.pipeline.adapters import DataSourceStage from engine.pipeline.adapters import DataSourceStage
list_source = ListDataSource(items, name=preset.source) list_source = ListDataSource(items, name=preset.source)
pipeline.add_stage("source", DataSourceStage(list_source, name=preset.source)) pipeline.add_stage(
"source", DataSourceStage(list_source, name=preset.source)
)
# Add camera state update stage if specified in preset (must run before viewport filter) # Add camera state update stage if specified in preset (must run before viewport filter)
camera = None camera = None
if preset.camera: if preset.camera:
from engine.camera import Camera from engine.camera import Camera
from engine.pipeline.adapters import CameraClockStage, CameraStage from engine.pipeline.adapters import CameraClockStage, CameraStage
speed = getattr(preset, "camera_speed", 1.0) speed = getattr(preset, "camera_speed", 1.0)
if preset.camera == "feed": if preset.camera == "feed":
camera = Camera.feed(speed=speed) camera = Camera.feed(speed=speed)
elif preset.camera == "scroll": elif preset.camera == "scroll":
camera = Camera.scroll(speed=speed) camera = Camera.scroll(speed=speed)
elif preset.camera == "vertical": elif preset.camera == "vertical":
camera = Camera.scroll(speed=speed) # Backwards compat camera = Camera.scroll(speed=speed) # Backwards compat
elif preset.camera == "horizontal": elif preset.camera == "horizontal":
camera = Camera.horizontal(speed=speed) camera = Camera.horizontal(speed=speed)
elif preset.camera == "omni": elif preset.camera == "omni":
camera = Camera.omni(speed=speed) camera = Camera.omni(speed=speed)
elif preset.camera == "floating": elif preset.camera == "floating":
camera = Camera.floating(speed=speed) camera = Camera.floating(speed=speed)
elif preset.camera == "bounce": elif preset.camera == "bounce":
camera = Camera.bounce(speed=speed) camera = Camera.bounce(speed=speed)
elif preset.camera == "radial": elif preset.camera == "radial":
camera = Camera.radial(speed=speed) camera = Camera.radial(speed=speed)
elif preset.camera == "static" or preset.camera == "": elif preset.camera == "static" or preset.camera == "":
# Static camera: no movement, but provides camera_y=0 for viewport filter # Static camera: no movement, but provides camera_y=0 for viewport filter
camera = Camera.scroll(speed=0.0) # Speed 0 = no movement camera = Camera.scroll(speed=0.0) # Speed 0 = no movement
camera.set_canvas_size(200, 200) camera.set_canvas_size(200, 200)
if camera:
# Add camera update stage to ensure camera_y is available for viewport filter
pipeline.add_stage(
"camera_update", CameraClockStage(camera, name="camera-clock")
)
# Only build stages from preset if not using graph config
if not using_graph_config:
# Add FontStage for headlines/poetry (default for demo)
if preset.source in ["headlines", "poetry"]:
from engine.pipeline.adapters import FontStage, ViewportFilterStage
# Add viewport filter to prevent rendering all items
pipeline.add_stage(
"viewport_filter", ViewportFilterStage(name="viewport-filter")
)
pipeline.add_stage("font", FontStage(name="font"))
else:
# Fallback to simple conversion for other sources
pipeline.add_stage(
"render", SourceItemsToBufferStage(name="items-to-buffer")
)
# Add camera stage if specified in preset (after font/render stage)
if camera: if camera:
# Add camera update stage to ensure camera_y is available for viewport filter pipeline.add_stage("camera", CameraStage(camera, name=preset.camera))
for effect_name in preset.effects:
effect = effect_registry.get(effect_name)
if effect:
pipeline.add_stage(
f"effect_{effect_name}",
create_stage_from_effect(effect, effect_name),
)
# Add message overlay stage if enabled
if getattr(preset, "enable_message_overlay", False):
from engine import config as engine_config
from engine.pipeline.adapters import MessageOverlayConfig
overlay_config = MessageOverlayConfig(
enabled=True,
display_secs=engine_config.MESSAGE_DISPLAY_SECS
if hasattr(engine_config, "MESSAGE_DISPLAY_SECS")
else 30,
topic_url=engine_config.NTFY_TOPIC
if hasattr(engine_config, "NTFY_TOPIC")
else None,
)
pipeline.add_stage( pipeline.add_stage(
"camera_update", CameraClockStage(camera, name="camera-clock") "message_overlay", MessageOverlayStage(config=overlay_config)
) )
# Add FontStage for headlines/poetry (default for demo) pipeline.add_stage("display", create_stage_from_display(display, display_name))
if preset.source in ["headlines", "poetry"]:
from engine.pipeline.adapters import FontStage, ViewportFilterStage
# Add viewport filter to prevent rendering all items pipeline.build()
pipeline.add_stage(
"viewport_filter", ViewportFilterStage(name="viewport-filter")
)
pipeline.add_stage("font", FontStage(name="font"))
else:
# Fallback to simple conversion for other sources
pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
# Add camera stage if specified in preset (after font/render stage)
if camera:
pipeline.add_stage("camera", CameraStage(camera, name=preset.camera))
for effect_name in preset.effects:
effect = effect_registry.get(effect_name)
if effect:
pipeline.add_stage(
f"effect_{effect_name}", create_stage_from_effect(effect, effect_name)
)
# Add message overlay stage if enabled
if getattr(preset, "enable_message_overlay", False):
from engine import config as engine_config
from engine.pipeline.adapters import MessageOverlayConfig
overlay_config = MessageOverlayConfig(
enabled=True,
display_secs=engine_config.MESSAGE_DISPLAY_SECS
if hasattr(engine_config, "MESSAGE_DISPLAY_SECS")
else 30,
topic_url=engine_config.NTFY_TOPIC
if hasattr(engine_config, "NTFY_TOPIC")
else None,
)
pipeline.add_stage(
"message_overlay", MessageOverlayStage(config=overlay_config)
)
pipeline.add_stage("display", create_stage_from_display(display, display_name))
pipeline.build()
# For pipeline-inspect, set the pipeline after build to avoid circular dependency # For pipeline-inspect, set the pipeline after build to avoid circular dependency
if introspection_source is not None: if introspection_source is not None:
@@ -831,7 +899,13 @@ def run_pipeline_mode(preset_name: str = "demo"):
ctx = pipeline.context ctx = pipeline.context
ctx.params = params ctx.params = params
ctx.set("display", display) ctx.set("display", display)
ctx.set("items", items) # For graph mode, items might not be defined - use empty list if needed
if not using_graph_config:
ctx.set("items", items)
else:
# Graph-based pipelines typically use their own data sources
# But we can set an empty list for compatibility
ctx.set("items", [])
ctx.set("pipeline", pipeline) ctx.set("pipeline", pipeline)
ctx.set("pipeline_order", pipeline.execution_order) ctx.set("pipeline_order", pipeline.execution_order)
ctx.set("camera_y", 0) ctx.set("camera_y", 0)