Files
Mainline/engine/app.py
David Gwilliam b37b2ccc73 refactor: move effects_plugins to engine/effects/plugins
- Move effects_plugins/ to engine/effects/plugins/
- Update imports in engine/app.py
- Update imports in all test files
- Follows capability-based deps architecture

Closes #27
2026-03-18 03:58:48 -07:00

283 lines
9.2 KiB
Python

"""
Application orchestrator — pipeline mode entry point.
"""
import sys
import time
import engine.effects.plugins as effects_plugins
from engine import config
from engine.display import DisplayRegistry
from engine.effects import PerformanceMonitor, get_registry, set_monitor
from engine.fetch import fetch_all, fetch_poetry, load_cache
from engine.pipeline import (
Pipeline,
PipelineConfig,
get_preset,
list_presets,
)
from engine.pipeline.adapters import (
SourceItemsToBufferStage,
create_stage_from_display,
create_stage_from_effect,
)
def main():
    """Pick a preset from configuration and hand it to the pipeline runner.

    If ``config.PIPELINE_DIAGRAM`` is set, the pipeline diagram is printed
    (or an error message when the generator cannot be imported) and nothing
    else happens.

    Otherwise the preset name is taken from ``config.PRESET``; failing that,
    from ``config.PIPELINE_PRESET`` when ``config.PIPELINE_MODE`` is set;
    failing that, it is ``"demo"``. A name that ``list_presets()`` does not
    contain is reported and the process exits with status 1.
    """
    if config.PIPELINE_DIAGRAM:
        try:
            from engine.pipeline import generate_pipeline_diagram
        except ImportError:
            print("Error: pipeline diagram not available")
        else:
            print(generate_pipeline_diagram())
        return

    # Precedence: explicit preset > pipeline-mode preset > built-in default.
    chosen = config.PRESET or (
        config.PIPELINE_PRESET if config.PIPELINE_MODE else "demo"
    )

    known_presets = list_presets()
    if chosen in known_presets:
        run_pipeline_mode(chosen)
        return

    print(f"Error: Unknown preset '{chosen}'")
    print(f"Available presets: {', '.join(known_presets)}")
    sys.exit(1)
def run_pipeline_mode(preset_name: str = "demo"):
    """Build the pipeline described by a preset and run it until stopped.

    The function discovers effect plugins, installs a performance monitor,
    loads the preset, obtains content items, creates the display, assembles
    the pipeline stages (source, render/font, camera, effects, display) and
    then executes the pipeline once per frame until the display requests a
    quit or Ctrl+C is pressed.

    Args:
        preset_name: Name of the preset to run. Defaults to ``"demo"``.

    Raises:
        SystemExit: With status 1 when the preset is unknown, no content is
            available, the display cannot be created, or the pipeline fails
            to initialize.
    """
    print(" \033[1;38;5;46mPIPELINE MODE\033[0m")
    print(" \033[38;5;245mUsing unified pipeline architecture\033[0m")

    effects_plugins.discover_plugins()
    monitor = PerformanceMonitor()
    set_monitor(monitor)

    preset = get_preset(preset_name)
    if not preset:
        print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m")
        sys.exit(1)
    print(f" \033[38;5;245mPreset: {preset.name} - {preset.description}\033[0m")

    # Start from the default viewport; the frame loop replaces these values
    # with the display's real dimensions when it can report them.
    params = preset.to_params()
    params.viewport_width = _DEFAULT_VIEWPORT_WIDTH
    params.viewport_height = _DEFAULT_VIEWPORT_HEIGHT

    pipeline = Pipeline(
        config=PipelineConfig(
            source=preset.source,
            display=preset.display,
            camera=preset.camera,
            effects=preset.effects,
        )
    )

    print(" \033[38;5;245mFetching content...\033[0m")
    items = _load_items(preset.source)

    # CLI --display flag takes priority over the preset's display.
    display_name = _resolve_display_name(preset.display, sys.argv)
    display = _create_display(display_name)
    display.init(0, 0)

    effect_registry = get_registry()

    introspection_source = _add_source_stages(pipeline, preset.source, items)
    _add_camera_stage(pipeline, preset)

    # Effect names that the registry does not know are skipped.
    for effect_name in preset.effects:
        effect = effect_registry.get(effect_name)
        if effect:
            pipeline.add_stage(
                f"effect_{effect_name}", create_stage_from_effect(effect, effect_name)
            )

    pipeline.add_stage("display", create_stage_from_display(display, display_name))
    pipeline.build()

    # For pipeline-inspect, set the pipeline after build to avoid a circular
    # dependency (the source was created with pipeline=None).
    if introspection_source is not None:
        introspection_source.set_pipeline(pipeline)

    if not pipeline.initialize():
        print(" \033[38;5;196mFailed to initialize pipeline\033[0m")
        sys.exit(1)

    print(" \033[38;5;82mStarting pipeline...\033[0m")
    print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n")

    ctx = pipeline.context
    ctx.params = params
    ctx.set("display", display)
    ctx.set("items", items)
    ctx.set("pipeline", pipeline)
    ctx.set("pipeline_order", pipeline.execution_order)
    ctx.set("camera_y", 0)

    _run_frames(pipeline, display, params, items)


# Viewport size used until the display reports its own dimensions.
_DEFAULT_VIEWPORT_WIDTH = 80
_DEFAULT_VIEWPORT_HEIGHT = 24

# Pause between frames, in seconds.
_FRAME_INTERVAL = 1 / 60

# Preset camera mode -> name of the Camera factory method that builds it.
_CAMERA_FACTORY_NAMES = {
    "feed": "feed",
    "scroll": "scroll",
    "vertical": "scroll",  # Backwards compat
    "horizontal": "horizontal",
    "omni": "omni",
    "floating": "floating",
    "bounce": "bounce",
}


def _load_items(source: str):
    """Return the content items for *source*, exiting if none can be loaded.

    The special sources ``"pipeline-inspect"`` and ``"empty"`` need no
    fetching and yield an empty list. Any other source uses the cache when
    it has content and otherwise fetches (poetry for ``"poetry"``, everything
    for the rest).
    """
    if source == "pipeline-inspect":
        print(" \033[38;5;245mUsing pipeline introspection source\033[0m")
        return []
    if source == "empty":
        print(" \033[38;5;245mUsing empty source (no content)\033[0m")
        return []

    items = load_cache()
    if not items:
        if source == "poetry":
            items, _, _ = fetch_poetry()
        else:
            items, _, _ = fetch_all()

    if not items:
        print(" \033[38;5;196mNo content available\033[0m")
        sys.exit(1)
    print(f" \033[38;5;82mLoaded {len(items)} items\033[0m")
    return items


def _resolve_display_name(default: str, argv) -> str:
    """Return the value following ``--display`` in *argv*, else *default*."""
    try:
        flag_index = argv.index("--display")
    except ValueError:
        return default
    # A trailing "--display" with no value falls back to the default.
    if flag_index + 1 < len(argv):
        return argv[flag_index + 1]
    return default


def _create_display(display_name: str):
    """Create the display named *display_name*, exiting with status 1 on failure.

    When the registry cannot create the name directly and it starts with
    ``"multi"``, the text after the first colon is treated as a
    comma-separated list of display names, e.g. ``"multi:terminal,pygame"``.
    """
    display = DisplayRegistry.create(display_name)
    if display:
        return display

    if not display_name.startswith("multi"):
        print(f" \033[38;5;196mFailed to create display: {display_name}\033[0m")
        sys.exit(1)

    # "multi:terminal,pygame" -> ["terminal", "pygame"]. Splitting on the
    # colon (rather than slicing a fixed number of characters) avoids
    # mangling names that merely begin with "multi"; blank entries are dropped.
    _, _, spec = display_name.partition(":")
    parts = [name.strip() for name in spec.split(",") if name.strip()]
    display = DisplayRegistry.create_multi(parts) if parts else None
    if not display:
        print(f" \033[38;5;196mFailed to create multi display: {parts}\033[0m")
        sys.exit(1)
    return display


def _add_source_stages(pipeline, source: str, items):
    """Add the source stage and its render stage(s) to *pipeline*.

    Returns:
        The pipeline introspection source when *source* is
        ``"pipeline-inspect"`` (the caller must attach the built pipeline to
        it), otherwise ``None``.
    """
    from engine.pipeline.adapters import DataSourceStage

    introspection_source = None
    if source == "pipeline-inspect":
        from engine.data_sources.pipeline_introspection import (
            PipelineIntrospectionSource,
        )

        introspection_source = PipelineIntrospectionSource(
            pipeline=None,  # Will be set after pipeline.build()
            viewport_width=_DEFAULT_VIEWPORT_WIDTH,
            viewport_height=_DEFAULT_VIEWPORT_HEIGHT,
        )
        pipeline.add_stage(
            "source", DataSourceStage(introspection_source, name="pipeline-inspect")
        )
    elif source == "empty":
        from engine.data_sources.sources import EmptyDataSource

        empty_source = EmptyDataSource(
            width=_DEFAULT_VIEWPORT_WIDTH, height=_DEFAULT_VIEWPORT_HEIGHT
        )
        pipeline.add_stage("source", DataSourceStage(empty_source, name="empty"))
    else:
        from engine.data_sources.sources import ListDataSource

        list_source = ListDataSource(items, name=source)
        pipeline.add_stage("source", DataSourceStage(list_source, name=source))

    # Add FontStage for headlines/poetry (default for demo)
    if source in ("headlines", "poetry"):
        from engine.pipeline.adapters import FontStage, ViewportFilterStage

        # Add viewport filter to prevent rendering all items
        pipeline.add_stage(
            "viewport_filter", ViewportFilterStage(name="viewport-filter")
        )
        pipeline.add_stage("font", FontStage(name="font"))
    else:
        # Fallback to simple conversion for other sources
        pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))

    return introspection_source


def _add_camera_stage(pipeline, preset):
    """Add a camera stage when the preset names a known camera mode.

    The speed comes from ``preset.camera_speed`` when present, else ``1.0``.
    Camera modes missing from the lookup table add no stage.
    """
    if not preset.camera:
        return

    from engine.camera import Camera
    from engine.pipeline.adapters import CameraStage

    factory_name = _CAMERA_FACTORY_NAMES.get(preset.camera)
    if factory_name is None:
        return

    speed = getattr(preset, "camera_speed", 1.0)
    # Resolve the factory by name so only the requested one is looked up.
    camera = getattr(Camera, factory_name)(speed=speed)
    if camera:
        pipeline.add_stage("camera", CameraStage(camera, name=preset.camera))


def _run_frames(pipeline, display, params, items):
    """Execute the pipeline once per frame until a quit request or Ctrl+C.

    Cleanup of the pipeline and the display runs in ``finally`` so it also
    happens when a frame raises an unexpected exception; that exception then
    propagates to the caller.
    """
    ctx = pipeline.context
    width = _DEFAULT_VIEWPORT_WIDTH
    height = _DEFAULT_VIEWPORT_HEIGHT
    reports_size = hasattr(display, "get_dimensions")

    try:
        if reports_size:
            width, height = display.get_dimensions()
            params.viewport_width = width
            params.viewport_height = height

        frame = 0
        while True:
            params.frame_number = frame
            ctx.params = params

            result = pipeline.execute(items)
            if result.success:
                display.show(result.data, border=params.border)

            if hasattr(display, "is_quit_requested") and display.is_quit_requested():
                if hasattr(display, "clear_quit_request"):
                    display.clear_quit_request()
                break

            # Follow display resizes so later frames use the new viewport.
            if reports_size:
                new_w, new_h = display.get_dimensions()
                if new_w != width or new_h != height:
                    width, height = new_w, new_h
                    params.viewport_width = width
                    params.viewport_height = height

            time.sleep(_FRAME_INTERVAL)
            frame += 1
    except KeyboardInterrupt:
        # Ctrl+C is the documented way to stop; fall through to cleanup.
        pass
    finally:
        try:
            pipeline.cleanup()
        finally:
            # Still release the display if pipeline cleanup fails.
            display.cleanup()
        print("\n \033[38;5;245mPipeline stopped\033[0m")
# Run the application only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()