Move internal imports in run_pipeline_mode() to module level to support proper mocking in integration tests. This enables more effective testing of the app's initialization and pipeline setup. Also simplifies the test suite to focus on key integration points. Changes: - Moved effects_plugins, DisplayRegistry, PerformanceMonitor, fetch functions, and pipeline adapters to module-level imports - Removed duplicate imports from run_pipeline_mode() - Simplified test_app.py to focus on core functionality All manual tests still pass (border-test preset works correctly).
244 lines
7.4 KiB
Python
244 lines
7.4 KiB
Python
"""
|
|
Application orchestrator — pipeline mode entry point.
|
|
"""
|
|
|
|
import sys
|
|
import time
|
|
|
|
import effects_plugins
|
|
from engine import config
|
|
from engine.display import DisplayRegistry
|
|
from engine.effects import PerformanceMonitor, get_registry, set_monitor
|
|
from engine.fetch import fetch_all, fetch_poetry, load_cache
|
|
from engine.pipeline import (
|
|
Pipeline,
|
|
PipelineConfig,
|
|
get_preset,
|
|
list_presets,
|
|
)
|
|
from engine.pipeline.adapters import (
|
|
RenderStage,
|
|
SourceItemsToBufferStage,
|
|
create_items_stage,
|
|
create_stage_from_display,
|
|
create_stage_from_effect,
|
|
)
|
|
|
|
|
|
def main():
|
|
"""Main entry point - all modes now use presets."""
|
|
if config.PIPELINE_DIAGRAM:
|
|
try:
|
|
from engine.pipeline import generate_pipeline_diagram
|
|
except ImportError:
|
|
print("Error: pipeline diagram not available")
|
|
return
|
|
print(generate_pipeline_diagram())
|
|
return
|
|
|
|
preset_name = None
|
|
|
|
if config.PRESET:
|
|
preset_name = config.PRESET
|
|
elif config.PIPELINE_MODE:
|
|
preset_name = config.PIPELINE_PRESET
|
|
else:
|
|
preset_name = "demo"
|
|
|
|
available = list_presets()
|
|
if preset_name not in available:
|
|
print(f"Error: Unknown preset '{preset_name}'")
|
|
print(f"Available presets: {', '.join(available)}")
|
|
sys.exit(1)
|
|
|
|
run_pipeline_mode(preset_name)
|
|
|
|
|
|
def run_pipeline_mode(preset_name: str = "demo"):
|
|
"""Run using the new unified pipeline architecture."""
|
|
print(" \033[1;38;5;46mPIPELINE MODE\033[0m")
|
|
print(" \033[38;5;245mUsing unified pipeline architecture\033[0m")
|
|
|
|
effects_plugins.discover_plugins()
|
|
|
|
monitor = PerformanceMonitor()
|
|
set_monitor(monitor)
|
|
|
|
preset = get_preset(preset_name)
|
|
if not preset:
|
|
print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m")
|
|
sys.exit(1)
|
|
|
|
print(f" \033[38;5;245mPreset: {preset.name} - {preset.description}\033[0m")
|
|
|
|
params = preset.to_params()
|
|
params.viewport_width = 80
|
|
params.viewport_height = 24
|
|
|
|
pipeline = Pipeline(
|
|
config=PipelineConfig(
|
|
source=preset.source,
|
|
display=preset.display,
|
|
camera=preset.camera,
|
|
effects=preset.effects,
|
|
)
|
|
)
|
|
|
|
print(" \033[38;5;245mFetching content...\033[0m")
|
|
|
|
# Handle special sources that don't need traditional fetching
|
|
introspection_source = None
|
|
if preset.source == "pipeline-inspect":
|
|
items = []
|
|
print(" \033[38;5;245mUsing pipeline introspection source\033[0m")
|
|
elif preset.source == "empty":
|
|
items = []
|
|
print(" \033[38;5;245mUsing empty source (no content)\033[0m")
|
|
else:
|
|
cached = load_cache()
|
|
if cached:
|
|
items = cached
|
|
elif preset.source == "poetry":
|
|
items, _, _ = fetch_poetry()
|
|
else:
|
|
items, _, _ = fetch_all()
|
|
|
|
if not items:
|
|
print(" \033[38;5;196mNo content available\033[0m")
|
|
sys.exit(1)
|
|
|
|
print(f" \033[38;5;82mLoaded {len(items)} items\033[0m")
|
|
|
|
# CLI --display flag takes priority over preset
|
|
# Check if --display was explicitly provided
|
|
display_name = preset.display
|
|
if "--display" in sys.argv:
|
|
idx = sys.argv.index("--display")
|
|
if idx + 1 < len(sys.argv):
|
|
display_name = sys.argv[idx + 1]
|
|
|
|
display = DisplayRegistry.create(display_name)
|
|
if not display:
|
|
print(f" \033[38;5;196mFailed to create display: {display_name}\033[0m")
|
|
sys.exit(1)
|
|
|
|
display.init(80, 24)
|
|
|
|
effect_registry = get_registry()
|
|
|
|
# Create source stage based on preset source type
|
|
if preset.source == "pipeline-inspect":
|
|
from engine.data_sources.pipeline_introspection import (
|
|
PipelineIntrospectionSource,
|
|
)
|
|
from engine.pipeline.adapters import DataSourceStage
|
|
|
|
introspection_source = PipelineIntrospectionSource(
|
|
pipeline=None, # Will be set after pipeline.build()
|
|
viewport_width=80,
|
|
viewport_height=24,
|
|
)
|
|
pipeline.add_stage(
|
|
"source", DataSourceStage(introspection_source, name="pipeline-inspect")
|
|
)
|
|
elif preset.source == "empty":
|
|
from engine.data_sources.sources import EmptyDataSource
|
|
from engine.pipeline.adapters import DataSourceStage
|
|
|
|
empty_source = EmptyDataSource(width=80, height=24)
|
|
pipeline.add_stage("source", DataSourceStage(empty_source, name="empty"))
|
|
else:
|
|
pipeline.add_stage("source", create_items_stage(items, preset.source))
|
|
|
|
# Add appropriate render stage
|
|
if preset.source in ("pipeline-inspect", "empty"):
|
|
pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
|
|
else:
|
|
pipeline.add_stage(
|
|
"render",
|
|
RenderStage(
|
|
items,
|
|
width=80,
|
|
height=24,
|
|
camera_speed=params.camera_speed,
|
|
camera_mode=preset.camera,
|
|
firehose_enabled=params.firehose_enabled,
|
|
),
|
|
)
|
|
|
|
for effect_name in preset.effects:
|
|
effect = effect_registry.get(effect_name)
|
|
if effect:
|
|
pipeline.add_stage(
|
|
f"effect_{effect_name}", create_stage_from_effect(effect, effect_name)
|
|
)
|
|
|
|
pipeline.add_stage("display", create_stage_from_display(display, display_name))
|
|
|
|
pipeline.build()
|
|
|
|
# For pipeline-inspect, set the pipeline after build to avoid circular dependency
|
|
if introspection_source is not None:
|
|
introspection_source.set_pipeline(pipeline)
|
|
|
|
if not pipeline.initialize():
|
|
print(" \033[38;5;196mFailed to initialize pipeline\033[0m")
|
|
sys.exit(1)
|
|
|
|
print(" \033[38;5;82mStarting pipeline...\033[0m")
|
|
print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n")
|
|
|
|
ctx = pipeline.context
|
|
ctx.params = params
|
|
ctx.set("display", display)
|
|
ctx.set("items", items)
|
|
ctx.set("pipeline", pipeline)
|
|
ctx.set("pipeline_order", pipeline.execution_order)
|
|
|
|
current_width = 80
|
|
current_height = 24
|
|
|
|
if hasattr(display, "get_dimensions"):
|
|
current_width, current_height = display.get_dimensions()
|
|
params.viewport_width = current_width
|
|
params.viewport_height = current_height
|
|
|
|
try:
|
|
frame = 0
|
|
while True:
|
|
params.frame_number = frame
|
|
ctx.params = params
|
|
|
|
result = pipeline.execute(items)
|
|
if result.success:
|
|
display.show(result.data, border=params.border)
|
|
|
|
if hasattr(display, "is_quit_requested") and display.is_quit_requested():
|
|
if hasattr(display, "clear_quit_request"):
|
|
display.clear_quit_request()
|
|
raise KeyboardInterrupt()
|
|
|
|
if hasattr(display, "get_dimensions"):
|
|
new_w, new_h = display.get_dimensions()
|
|
if new_w != current_width or new_h != current_height:
|
|
current_width, current_height = new_w, new_h
|
|
params.viewport_width = current_width
|
|
params.viewport_height = current_height
|
|
|
|
time.sleep(1 / 60)
|
|
frame += 1
|
|
|
|
except KeyboardInterrupt:
|
|
pipeline.cleanup()
|
|
display.cleanup()
|
|
print("\n \033[38;5;245mPipeline stopped\033[0m")
|
|
return
|
|
|
|
pipeline.cleanup()
|
|
display.cleanup()
|
|
print("\n \033[38;5;245mPipeline stopped\033[0m")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|