feat(app): add direct CLI mode, validation framework, fixtures, and UI panel integration
- Add run_pipeline_mode_direct() for constructing pipelines from CLI flags - Add engine/pipeline/validation.py with validate_pipeline_config() and MVP rules - Add fixtures system: engine/fixtures/headlines.json for cached test data - Enhance fetch.py to use fixtures cache path - Support fixture source in run_pipeline_mode() - Add --pipeline-* CLI flags: source, effects, camera, display, UI, border - Integrate UIPanel: raw mode, preset picker, event callbacks, param adjustment - Add UI_PRESET support in app and hot-rebuild pipeline on preset change - Add test UIPanel rendering and interaction tests This provides a flexible pipeline construction interface with validation and interactive control. Fixes #29, #30, #31
This commit is contained in:
9
TODO.md
Normal file
9
TODO.md
Normal file
@@ -0,0 +1,9 @@
|
||||
# Tasks
|
||||
|
||||
- [ ] Add entropy/chaos score metadata to effects for auto-categorization and intensity control
|
||||
- [ ] Finish ModernGL display backend: integrate window system, implement glyph caching, add event handling, and support border modes.
|
||||
- [x] Integrate UIPanel with pipeline: register stages, link parameter schemas, handle events, implement hot-reload.
|
||||
- [x] Move cached fixture headlines to engine/fixtures/headlines.json and update default source to use fixture.
|
||||
- [x] Add interactive UI panel for pipeline configuration (right-side panel) with stage toggles and param sliders.
|
||||
- [x] Enumerate all effect plugin parameters automatically for UI control (intensity, decay, etc.)
|
||||
- [ ] Implement pipeline hot-rebuild when stage toggles or params change, preserving camera and display state.
|
||||
757
engine/app.py
757
engine/app.py
@@ -4,10 +4,11 @@ Application orchestrator — pipeline mode entry point.
|
||||
|
||||
import sys
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import engine.effects.plugins as effects_plugins
|
||||
from engine import config
|
||||
from engine.display import DisplayRegistry
|
||||
from engine.display import BorderMode, DisplayRegistry
|
||||
from engine.effects import PerformanceMonitor, get_registry, set_monitor
|
||||
from engine.fetch import fetch_all, fetch_poetry, load_cache
|
||||
from engine.pipeline import (
|
||||
@@ -17,14 +18,18 @@ from engine.pipeline import (
|
||||
list_presets,
|
||||
)
|
||||
from engine.pipeline.adapters import (
|
||||
EffectPluginStage,
|
||||
SourceItemsToBufferStage,
|
||||
create_stage_from_display,
|
||||
create_stage_from_effect,
|
||||
)
|
||||
from engine.pipeline.core import PipelineContext
|
||||
from engine.pipeline.params import PipelineParams
|
||||
from engine.pipeline.ui import UIConfig, UIPanel
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point - all modes now use presets."""
|
||||
"""Main entry point - all modes now use presets or CLI construction."""
|
||||
if config.PIPELINE_DIAGRAM:
|
||||
try:
|
||||
from engine.pipeline import generate_pipeline_diagram
|
||||
@@ -34,6 +39,12 @@ def main():
|
||||
print(generate_pipeline_diagram())
|
||||
return
|
||||
|
||||
# Check for direct pipeline construction flags
|
||||
if "--pipeline-source" in sys.argv:
|
||||
# Construct pipeline directly from CLI args
|
||||
run_pipeline_mode_direct()
|
||||
return
|
||||
|
||||
preset_name = None
|
||||
|
||||
if config.PRESET:
|
||||
@@ -92,6 +103,12 @@ def run_pipeline_mode(preset_name: str = "demo"):
|
||||
elif preset.source == "empty":
|
||||
items = []
|
||||
print(" \033[38;5;245mUsing empty source (no content)\033[0m")
|
||||
elif preset.source == "fixture":
|
||||
items = load_cache()
|
||||
if not items:
|
||||
print(" \033[38;5;196mNo fixture cache available\033[0m")
|
||||
sys.exit(1)
|
||||
print(f" \033[38;5;82mLoaded {len(items)} items from fixture\033[0m")
|
||||
else:
|
||||
cached = load_cache()
|
||||
if cached:
|
||||
@@ -223,6 +240,347 @@ def run_pipeline_mode(preset_name: str = "demo"):
|
||||
print(" \033[38;5;196mFailed to initialize pipeline\033[0m")
|
||||
sys.exit(1)
|
||||
|
||||
# Initialize UI panel if border mode requires it
|
||||
ui_panel = None
|
||||
if isinstance(params.border, BorderMode) and params.border == BorderMode.UI:
|
||||
from engine.display import render_ui_panel
|
||||
|
||||
ui_panel = UIPanel(UIConfig(panel_width=24, start_with_preset_picker=True))
|
||||
# Enable raw mode for terminal input if supported
|
||||
if hasattr(display, "set_raw_mode"):
|
||||
display.set_raw_mode(True)
|
||||
# Register effect plugin stages from pipeline for UI control
|
||||
for stage in pipeline.stages.values():
|
||||
if isinstance(stage, EffectPluginStage):
|
||||
effect = stage._effect
|
||||
enabled = effect.config.enabled if hasattr(effect, "config") else True
|
||||
stage_control = ui_panel.register_stage(stage, enabled=enabled)
|
||||
# Store reference to effect for easier access
|
||||
stage_control.effect = effect # type: ignore[attr-defined]
|
||||
# Select first stage by default
|
||||
if ui_panel.stages:
|
||||
first_stage = next(iter(ui_panel.stages))
|
||||
ui_panel.select_stage(first_stage)
|
||||
# Populate param schema from EffectConfig if it's a dataclass
|
||||
ctrl = ui_panel.stages[first_stage]
|
||||
if hasattr(ctrl, "effect"):
|
||||
effect = ctrl.effect
|
||||
if hasattr(effect, "config"):
|
||||
config = effect.config
|
||||
# Try to get fields via dataclasses if available
|
||||
try:
|
||||
import dataclasses
|
||||
|
||||
if dataclasses.is_dataclass(config):
|
||||
for field_name, field_obj in dataclasses.fields(config):
|
||||
if field_name == "enabled":
|
||||
continue
|
||||
value = getattr(config, field_name, None)
|
||||
if value is not None:
|
||||
ctrl.params[field_name] = value
|
||||
ctrl.param_schema[field_name] = {
|
||||
"type": type(value).__name__,
|
||||
"min": 0
|
||||
if isinstance(value, (int, float))
|
||||
else None,
|
||||
"max": 1 if isinstance(value, float) else None,
|
||||
"step": 0.1 if isinstance(value, float) else 1,
|
||||
}
|
||||
except Exception:
|
||||
pass # No dataclass fields, skip param UI
|
||||
|
||||
# Set up callback for stage toggles
|
||||
def on_stage_toggled(stage_name: str, enabled: bool):
|
||||
"""Update the actual stage's enabled state when UI toggles."""
|
||||
stage = pipeline.get_stage(stage_name)
|
||||
if stage:
|
||||
# Set stage enabled flag for pipeline execution
|
||||
stage._enabled = enabled
|
||||
# Also update effect config if it's an EffectPluginStage
|
||||
if isinstance(stage, EffectPluginStage):
|
||||
stage._effect.config.enabled = enabled
|
||||
|
||||
ui_panel.set_event_callback("stage_toggled", on_stage_toggled)
|
||||
|
||||
# Set up callback for parameter changes
|
||||
def on_param_changed(stage_name: str, param_name: str, value: Any):
|
||||
"""Update the effect config when UI adjusts a parameter."""
|
||||
stage = pipeline.get_stage(stage_name)
|
||||
if stage and isinstance(stage, EffectPluginStage):
|
||||
effect = stage._effect
|
||||
if hasattr(effect, "config"):
|
||||
setattr(effect.config, param_name, value)
|
||||
# Mark effect as needing reconfiguration if it has a configure method
|
||||
if hasattr(effect, "configure"):
|
||||
try:
|
||||
effect.configure(effect.config)
|
||||
except Exception:
|
||||
pass # Ignore reconfiguration errors
|
||||
|
||||
ui_panel.set_event_callback("param_changed", on_param_changed)
|
||||
|
||||
# Set up preset list and handle preset changes
|
||||
from engine.pipeline import list_presets
|
||||
|
||||
ui_panel.set_presets(list_presets(), preset_name)
|
||||
|
||||
def on_preset_changed(preset_name: str):
|
||||
"""Handle preset change from UI - rebuild pipeline."""
|
||||
nonlocal \
|
||||
pipeline, \
|
||||
display, \
|
||||
items, \
|
||||
params, \
|
||||
ui_panel, \
|
||||
current_width, \
|
||||
current_height
|
||||
|
||||
print(f" \033[38;5;245mSwitching to preset: {preset_name}\033[0m")
|
||||
|
||||
try:
|
||||
# Clean up old pipeline
|
||||
pipeline.cleanup()
|
||||
|
||||
# Get new preset
|
||||
new_preset = get_preset(preset_name)
|
||||
if not new_preset:
|
||||
print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m")
|
||||
return
|
||||
|
||||
# Update params for new preset
|
||||
params = new_preset.to_params()
|
||||
params.viewport_width = current_width
|
||||
params.viewport_height = current_height
|
||||
|
||||
# Reconstruct pipeline configuration
|
||||
new_config = PipelineConfig(
|
||||
source=new_preset.source,
|
||||
display=new_preset.display,
|
||||
camera=new_preset.camera,
|
||||
effects=new_preset.effects,
|
||||
)
|
||||
|
||||
# Create new pipeline instance
|
||||
pipeline = Pipeline(config=new_config, context=PipelineContext())
|
||||
|
||||
# Re-add stages (similar to initial construction)
|
||||
# Source stage
|
||||
if new_preset.source == "pipeline-inspect":
|
||||
from engine.data_sources.pipeline_introspection import (
|
||||
PipelineIntrospectionSource,
|
||||
)
|
||||
from engine.pipeline.adapters import DataSourceStage
|
||||
|
||||
introspection_source = PipelineIntrospectionSource(
|
||||
pipeline=None,
|
||||
viewport_width=current_width,
|
||||
viewport_height=current_height,
|
||||
)
|
||||
pipeline.add_stage(
|
||||
"source",
|
||||
DataSourceStage(introspection_source, name="pipeline-inspect"),
|
||||
)
|
||||
elif new_preset.source == "empty":
|
||||
from engine.data_sources.sources import EmptyDataSource
|
||||
from engine.pipeline.adapters import DataSourceStage
|
||||
|
||||
empty_source = EmptyDataSource(
|
||||
width=current_width, height=current_height
|
||||
)
|
||||
pipeline.add_stage(
|
||||
"source", DataSourceStage(empty_source, name="empty")
|
||||
)
|
||||
elif new_preset.source == "fixture":
|
||||
items = load_cache()
|
||||
if not items:
|
||||
print(" \033[38;5;196mNo fixture cache available\033[0m")
|
||||
return
|
||||
from engine.data_sources.sources import ListDataSource
|
||||
from engine.pipeline.adapters import DataSourceStage
|
||||
|
||||
list_source = ListDataSource(items, name="fixture")
|
||||
pipeline.add_stage(
|
||||
"source", DataSourceStage(list_source, name="fixture")
|
||||
)
|
||||
else:
|
||||
# Fetch or use cached items
|
||||
cached = load_cache()
|
||||
if cached:
|
||||
items = cached
|
||||
elif new_preset.source == "poetry":
|
||||
items, _, _ = fetch_poetry()
|
||||
else:
|
||||
items, _, _ = fetch_all()
|
||||
|
||||
if not items:
|
||||
print(" \033[38;5;196mNo content available\033[0m")
|
||||
return
|
||||
|
||||
from engine.data_sources.sources import ListDataSource
|
||||
from engine.pipeline.adapters import DataSourceStage
|
||||
|
||||
list_source = ListDataSource(items, name=new_preset.source)
|
||||
pipeline.add_stage(
|
||||
"source", DataSourceStage(list_source, name=new_preset.source)
|
||||
)
|
||||
|
||||
# Add viewport filter and font for headline/poetry sources
|
||||
if new_preset.source in ["headlines", "poetry", "fixture"]:
|
||||
from engine.pipeline.adapters import FontStage, ViewportFilterStage
|
||||
|
||||
pipeline.add_stage(
|
||||
"viewport_filter", ViewportFilterStage(name="viewport-filter")
|
||||
)
|
||||
pipeline.add_stage("font", FontStage(name="font"))
|
||||
|
||||
# Add camera if specified
|
||||
if new_preset.camera:
|
||||
from engine.camera import Camera
|
||||
from engine.pipeline.adapters import CameraStage
|
||||
|
||||
speed = getattr(new_preset, "camera_speed", 1.0)
|
||||
camera = None
|
||||
cam_type = new_preset.camera
|
||||
if cam_type == "feed":
|
||||
camera = Camera.feed(speed=speed)
|
||||
elif cam_type == "scroll" or cam_type == "vertical":
|
||||
camera = Camera.scroll(speed=speed)
|
||||
elif cam_type == "horizontal":
|
||||
camera = Camera.horizontal(speed=speed)
|
||||
elif cam_type == "omni":
|
||||
camera = Camera.omni(speed=speed)
|
||||
elif cam_type == "floating":
|
||||
camera = Camera.floating(speed=speed)
|
||||
elif cam_type == "bounce":
|
||||
camera = Camera.bounce(speed=speed)
|
||||
|
||||
if camera:
|
||||
pipeline.add_stage("camera", CameraStage(camera, name=cam_type))
|
||||
|
||||
# Add effects
|
||||
effect_registry = get_registry()
|
||||
for effect_name in new_preset.effects:
|
||||
effect = effect_registry.get(effect_name)
|
||||
if effect:
|
||||
pipeline.add_stage(
|
||||
f"effect_{effect_name}",
|
||||
create_stage_from_effect(effect, effect_name),
|
||||
)
|
||||
|
||||
# Add display (respect CLI override)
|
||||
display_name = new_preset.display
|
||||
if "--display" in sys.argv:
|
||||
idx = sys.argv.index("--display")
|
||||
if idx + 1 < len(sys.argv):
|
||||
display_name = sys.argv[idx + 1]
|
||||
|
||||
new_display = DisplayRegistry.create(display_name)
|
||||
if not new_display and not display_name.startswith("multi"):
|
||||
print(
|
||||
f" \033[38;5;196mFailed to create display: {display_name}\033[0m"
|
||||
)
|
||||
return
|
||||
|
||||
if not new_display and display_name.startswith("multi"):
|
||||
parts = display_name[6:].split(",")
|
||||
new_display = DisplayRegistry.create_multi(parts)
|
||||
if not new_display:
|
||||
print(
|
||||
f" \033[38;5;196mFailed to create multi display: {parts}\033[0m"
|
||||
)
|
||||
return
|
||||
|
||||
if not new_display:
|
||||
print(
|
||||
f" \033[38;5;196mFailed to create display: {display_name}\033[0m"
|
||||
)
|
||||
return
|
||||
|
||||
new_display.init(0, 0)
|
||||
|
||||
pipeline.add_stage(
|
||||
"display", create_stage_from_display(new_display, display_name)
|
||||
)
|
||||
|
||||
pipeline.build()
|
||||
|
||||
# Set pipeline for introspection source if needed
|
||||
if (
|
||||
new_preset.source == "pipeline-inspect"
|
||||
and introspection_source is not None
|
||||
):
|
||||
introspection_source.set_pipeline(pipeline)
|
||||
|
||||
if not pipeline.initialize():
|
||||
print(" \033[38;5;196mFailed to initialize pipeline\033[0m")
|
||||
return
|
||||
|
||||
# Replace global references with new pipeline and display
|
||||
display = new_display
|
||||
|
||||
# Reinitialize UI panel with new effect stages
|
||||
if (
|
||||
isinstance(params.border, BorderMode)
|
||||
and params.border == BorderMode.UI
|
||||
):
|
||||
ui_panel = UIPanel(
|
||||
UIConfig(panel_width=24, start_with_preset_picker=True)
|
||||
)
|
||||
for stage in pipeline.stages.values():
|
||||
if isinstance(stage, EffectPluginStage):
|
||||
effect = stage._effect
|
||||
enabled = (
|
||||
effect.config.enabled
|
||||
if hasattr(effect, "config")
|
||||
else True
|
||||
)
|
||||
stage_control = ui_panel.register_stage(
|
||||
stage, enabled=enabled
|
||||
)
|
||||
stage_control.effect = effect # type: ignore[attr-defined]
|
||||
|
||||
if ui_panel.stages:
|
||||
first_stage = next(iter(ui_panel.stages))
|
||||
ui_panel.select_stage(first_stage)
|
||||
ctrl = ui_panel.stages[first_stage]
|
||||
if hasattr(ctrl, "effect"):
|
||||
effect = ctrl.effect
|
||||
if hasattr(effect, "config"):
|
||||
config = effect.config
|
||||
try:
|
||||
import dataclasses
|
||||
|
||||
if dataclasses.is_dataclass(config):
|
||||
for field_name, field_obj in dataclasses.fields(
|
||||
config
|
||||
):
|
||||
if field_name == "enabled":
|
||||
continue
|
||||
value = getattr(config, field_name, None)
|
||||
if value is not None:
|
||||
ctrl.params[field_name] = value
|
||||
ctrl.param_schema[field_name] = {
|
||||
"type": type(value).__name__,
|
||||
"min": 0
|
||||
if isinstance(value, (int, float))
|
||||
else None,
|
||||
"max": 1
|
||||
if isinstance(value, float)
|
||||
else None,
|
||||
"step": 0.1
|
||||
if isinstance(value, float)
|
||||
else 1,
|
||||
}
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
print(f" \033[38;5;82mPreset switched to {preset_name}\033[0m")
|
||||
|
||||
except Exception as e:
|
||||
print(f" \033[38;5;196mError switching preset: {e}\033[0m")
|
||||
|
||||
ui_panel.set_event_callback("preset_changed", on_preset_changed)
|
||||
|
||||
print(" \033[38;5;82mStarting pipeline...\033[0m")
|
||||
print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n")
|
||||
|
||||
@@ -250,7 +608,34 @@ def run_pipeline_mode(preset_name: str = "demo"):
|
||||
|
||||
result = pipeline.execute(items)
|
||||
if result.success:
|
||||
display.show(result.data, border=params.border)
|
||||
# Handle UI panel compositing if enabled
|
||||
if ui_panel is not None:
|
||||
from engine.display import render_ui_panel
|
||||
|
||||
buf = render_ui_panel(
|
||||
result.data,
|
||||
current_width,
|
||||
current_height,
|
||||
ui_panel,
|
||||
fps=params.fps if hasattr(params, "fps") else 60.0,
|
||||
frame_time=0.0,
|
||||
)
|
||||
# Render with border=OFF since we already added borders
|
||||
display.show(buf, border=False)
|
||||
# Handle pygame events for UI
|
||||
if display_name == "pygame":
|
||||
import pygame
|
||||
|
||||
for event in pygame.event.get():
|
||||
if event.type == pygame.KEYDOWN:
|
||||
ui_panel.process_key_event(event.key, event.mod)
|
||||
# If space toggled stage, we could rebuild here (TODO)
|
||||
else:
|
||||
# Normal border handling
|
||||
show_border = (
|
||||
params.border if isinstance(params.border, bool) else False
|
||||
)
|
||||
display.show(result.data, border=show_border)
|
||||
|
||||
if hasattr(display, "is_quit_requested") and display.is_quit_requested():
|
||||
if hasattr(display, "clear_quit_request"):
|
||||
@@ -278,5 +663,371 @@ def run_pipeline_mode(preset_name: str = "demo"):
|
||||
print("\n \033[38;5;245mPipeline stopped\033[0m")
|
||||
|
||||
|
||||
def run_pipeline_mode_direct():
|
||||
"""Construct and run a pipeline directly from CLI arguments.
|
||||
|
||||
Usage:
|
||||
python -m engine.app --pipeline-source headlines --pipeline-effects noise,fade --display null
|
||||
python -m engine.app --pipeline-source fixture --pipeline-effects glitch --pipeline-ui --display null
|
||||
|
||||
Flags:
|
||||
--pipeline-source <source>: Headlines, fixture, poetry, empty, pipeline-inspect
|
||||
--pipeline-effects <effects>: Comma-separated list (noise, fade, glitch, firehose, hud, tint, border, crop)
|
||||
--pipeline-camera <type>: scroll, feed, horizontal, omni, floating, bounce
|
||||
--pipeline-display <display>: terminal, pygame, websocket, null, multi:term,pygame
|
||||
--pipeline-ui: Enable UI panel (BorderMode.UI)
|
||||
--pipeline-border <mode>: off, simple, ui
|
||||
"""
|
||||
import sys
|
||||
|
||||
from engine.camera import Camera
|
||||
from engine.data_sources.pipeline_introspection import PipelineIntrospectionSource
|
||||
from engine.data_sources.sources import EmptyDataSource, ListDataSource
|
||||
from engine.display import BorderMode, DisplayRegistry
|
||||
from engine.effects import get_registry
|
||||
from engine.fetch import fetch_all, fetch_poetry, load_cache
|
||||
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext
|
||||
from engine.pipeline.adapters import (
|
||||
CameraStage,
|
||||
DataSourceStage,
|
||||
EffectPluginStage,
|
||||
create_stage_from_display,
|
||||
create_stage_from_effect,
|
||||
)
|
||||
from engine.pipeline.ui import UIConfig, UIPanel
|
||||
|
||||
# Parse CLI arguments
|
||||
source_name = None
|
||||
effect_names = []
|
||||
camera_type = None # Will use MVP default (static)
|
||||
display_name = None # Will use MVP default (terminal)
|
||||
ui_enabled = False
|
||||
border_mode = BorderMode.OFF
|
||||
source_items = None
|
||||
allow_unsafe = False
|
||||
|
||||
i = 1
|
||||
argv = sys.argv
|
||||
while i < len(argv):
|
||||
arg = argv[i]
|
||||
if arg == "--pipeline-source" and i + 1 < len(argv):
|
||||
source_name = argv[i + 1]
|
||||
i += 2
|
||||
elif arg == "--pipeline-effects" and i + 1 < len(argv):
|
||||
effect_names = [e.strip() for e in argv[i + 1].split(",") if e.strip()]
|
||||
i += 2
|
||||
elif arg == "--pipeline-camera" and i + 1 < len(argv):
|
||||
camera_type = argv[i + 1]
|
||||
i += 2
|
||||
elif arg == "--pipeline-display" and i + 1 < len(argv):
|
||||
display_name = argv[i + 1]
|
||||
i += 2
|
||||
elif arg == "--pipeline-ui":
|
||||
ui_enabled = True
|
||||
i += 1
|
||||
elif arg == "--pipeline-border" and i + 1 < len(argv):
|
||||
mode = argv[i + 1]
|
||||
if mode == "simple":
|
||||
border_mode = True
|
||||
elif mode == "ui":
|
||||
border_mode = BorderMode.UI
|
||||
else:
|
||||
border_mode = False
|
||||
i += 2
|
||||
elif arg == "--allow-unsafe":
|
||||
allow_unsafe = True
|
||||
i += 1
|
||||
else:
|
||||
i += 1
|
||||
|
||||
if not source_name:
|
||||
print("Error: --pipeline-source is required")
|
||||
print(
|
||||
"Usage: python -m engine.app --pipeline-source <source> [--pipeline-effects <effects>] ..."
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
print(" \033[38;5;245mDirect pipeline construction\033[0m")
|
||||
print(f" Source: {source_name}")
|
||||
print(f" Effects: {effect_names}")
|
||||
print(f" Camera: {camera_type}")
|
||||
print(f" Display: {display_name}")
|
||||
print(f" UI Enabled: {ui_enabled}")
|
||||
|
||||
# Import validation
|
||||
from engine.pipeline.validation import validate_pipeline_config
|
||||
|
||||
# Create initial config and params
|
||||
params = PipelineParams()
|
||||
params.source = source_name
|
||||
params.camera_mode = camera_type if camera_type is not None else ""
|
||||
params.effect_order = effect_names
|
||||
params.border = border_mode
|
||||
|
||||
# Create minimal config for validation
|
||||
config = PipelineConfig(
|
||||
source=source_name,
|
||||
display=display_name or "", # Will be filled by validation
|
||||
camera=camera_type if camera_type is not None else "",
|
||||
effects=effect_names,
|
||||
)
|
||||
|
||||
# Run MVP validation
|
||||
result = validate_pipeline_config(config, params, allow_unsafe=allow_unsafe)
|
||||
|
||||
if result.warnings and not allow_unsafe:
|
||||
print(" \033[38;5;226mWarning: MVP validation found issues:\033[0m")
|
||||
for warning in result.warnings:
|
||||
print(f" - {warning}")
|
||||
|
||||
if result.changes:
|
||||
print(" \033[38;5;226mApplied MVP defaults:\033[0m")
|
||||
for change in result.changes:
|
||||
print(f" {change}")
|
||||
|
||||
if not result.valid:
|
||||
print(
|
||||
" \033[38;5;196mPipeline configuration invalid and could not be fixed\033[0m"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Show MVP summary
|
||||
print(" \033[38;5;245mMVP Configuration:\033[0m")
|
||||
print(f" Source: {result.config.source}")
|
||||
print(f" Display: {result.config.display}")
|
||||
print(f" Camera: {result.config.camera or 'static (none)'}")
|
||||
print(f" Effects: {result.config.effects if result.config.effects else 'none'}")
|
||||
print(f" Border: {result.params.border}")
|
||||
|
||||
# Load source items
|
||||
if source_name == "headlines":
|
||||
cached = load_cache()
|
||||
if cached:
|
||||
source_items = cached
|
||||
else:
|
||||
source_items, _, _ = fetch_all()
|
||||
elif source_name == "fixture":
|
||||
source_items = load_cache()
|
||||
if not source_items:
|
||||
print(" \033[38;5;196mNo fixture cache available\033[0m")
|
||||
sys.exit(1)
|
||||
elif source_name == "poetry":
|
||||
source_items, _, _ = fetch_poetry()
|
||||
elif source_name == "empty" or source_name == "pipeline-inspect":
|
||||
source_items = []
|
||||
else:
|
||||
print(f" \033[38;5;196mUnknown source: {source_name}\033[0m")
|
||||
sys.exit(1)
|
||||
|
||||
if source_items is not None:
|
||||
print(f" \033[38;5;82mLoaded {len(source_items)} items\033[0m")
|
||||
|
||||
# Set border mode
|
||||
if ui_enabled:
|
||||
border_mode = BorderMode.UI
|
||||
|
||||
# Build pipeline using validated config and params
|
||||
params = result.params
|
||||
params.viewport_width = 80
|
||||
params.viewport_height = 24
|
||||
|
||||
ctx = PipelineContext()
|
||||
ctx.params = params
|
||||
|
||||
# Create display using validated display name
|
||||
display_name = result.config.display or "terminal" # Default to terminal if empty
|
||||
display = DisplayRegistry.create(display_name)
|
||||
if not display:
|
||||
print(f" \033[38;5;196mFailed to create display: {display_name}\033[0m")
|
||||
sys.exit(1)
|
||||
display.init(0, 0)
|
||||
|
||||
# Create pipeline using validated config
|
||||
pipeline = Pipeline(config=result.config, context=ctx)
|
||||
|
||||
# Add stages
|
||||
# Source stage
|
||||
if source_name == "pipeline-inspect":
|
||||
introspection_source = PipelineIntrospectionSource(
|
||||
pipeline=None,
|
||||
viewport_width=params.viewport_width,
|
||||
viewport_height=params.viewport_height,
|
||||
)
|
||||
pipeline.add_stage(
|
||||
"source", DataSourceStage(introspection_source, name="pipeline-inspect")
|
||||
)
|
||||
elif source_name == "empty":
|
||||
empty_source = EmptyDataSource(
|
||||
width=params.viewport_width, height=params.viewport_height
|
||||
)
|
||||
pipeline.add_stage("source", DataSourceStage(empty_source, name="empty"))
|
||||
else:
|
||||
list_source = ListDataSource(source_items, name=source_name)
|
||||
pipeline.add_stage("source", DataSourceStage(list_source, name=source_name))
|
||||
|
||||
# Add viewport filter and font for headline sources
|
||||
if source_name in ["headlines", "poetry", "fixture"]:
|
||||
from engine.pipeline.adapters import FontStage, ViewportFilterStage
|
||||
|
||||
pipeline.add_stage(
|
||||
"viewport_filter", ViewportFilterStage(name="viewport-filter")
|
||||
)
|
||||
pipeline.add_stage("font", FontStage(name="font"))
|
||||
|
||||
# Add camera
|
||||
speed = getattr(params, "camera_speed", 1.0)
|
||||
camera = None
|
||||
if camera_type == "feed":
|
||||
camera = Camera.feed(speed=speed)
|
||||
elif camera_type == "scroll":
|
||||
camera = Camera.scroll(speed=speed)
|
||||
elif camera_type == "horizontal":
|
||||
camera = Camera.horizontal(speed=speed)
|
||||
elif camera_type == "omni":
|
||||
camera = Camera.omni(speed=speed)
|
||||
elif camera_type == "floating":
|
||||
camera = Camera.floating(speed=speed)
|
||||
elif camera_type == "bounce":
|
||||
camera = Camera.bounce(speed=speed)
|
||||
|
||||
if camera:
|
||||
pipeline.add_stage("camera", CameraStage(camera, name=camera_type))
|
||||
|
||||
# Add effects
|
||||
effect_registry = get_registry()
|
||||
for effect_name in effect_names:
|
||||
effect = effect_registry.get(effect_name)
|
||||
if effect:
|
||||
pipeline.add_stage(
|
||||
f"effect_{effect_name}", create_stage_from_effect(effect, effect_name)
|
||||
)
|
||||
|
||||
# Add display
|
||||
pipeline.add_stage("display", create_stage_from_display(display, display_name))
|
||||
|
||||
pipeline.build()
|
||||
|
||||
if not pipeline.initialize():
|
||||
print(" \033[38;5;196mFailed to initialize pipeline\033[0m")
|
||||
sys.exit(1)
|
||||
|
||||
# Create UI panel if border mode is UI
|
||||
ui_panel = None
|
||||
if params.border == BorderMode.UI:
|
||||
ui_panel = UIPanel(UIConfig(panel_width=24, start_with_preset_picker=True))
|
||||
# Enable raw mode for terminal input if supported
|
||||
if hasattr(display, "set_raw_mode"):
|
||||
display.set_raw_mode(True)
|
||||
for stage in pipeline.stages.values():
|
||||
if isinstance(stage, EffectPluginStage):
|
||||
effect = stage._effect
|
||||
enabled = effect.config.enabled if hasattr(effect, "config") else True
|
||||
stage_control = ui_panel.register_stage(stage, enabled=enabled)
|
||||
stage_control.effect = effect # type: ignore[attr-defined]
|
||||
|
||||
if ui_panel.stages:
|
||||
first_stage = next(iter(ui_panel.stages))
|
||||
ui_panel.select_stage(first_stage)
|
||||
ctrl = ui_panel.stages[first_stage]
|
||||
if hasattr(ctrl, "effect"):
|
||||
effect = ctrl.effect
|
||||
if hasattr(effect, "config"):
|
||||
config = effect.config
|
||||
try:
|
||||
import dataclasses
|
||||
|
||||
if dataclasses.is_dataclass(config):
|
||||
for field_name, field_obj in dataclasses.fields(config):
|
||||
if field_name == "enabled":
|
||||
continue
|
||||
value = getattr(config, field_name, None)
|
||||
if value is not None:
|
||||
ctrl.params[field_name] = value
|
||||
ctrl.param_schema[field_name] = {
|
||||
"type": type(value).__name__,
|
||||
"min": 0
|
||||
if isinstance(value, (int, float))
|
||||
else None,
|
||||
"max": 1 if isinstance(value, float) else None,
|
||||
"step": 0.1 if isinstance(value, float) else 1,
|
||||
}
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Run pipeline loop
|
||||
from engine.display import render_ui_panel
|
||||
|
||||
ctx.set("display", display)
|
||||
ctx.set("items", source_items)
|
||||
ctx.set("pipeline", pipeline)
|
||||
ctx.set("pipeline_order", pipeline.execution_order)
|
||||
|
||||
current_width = params.viewport_width
|
||||
current_height = params.viewport_height
|
||||
|
||||
print(" \033[38;5;82mStarting pipeline...\033[0m")
|
||||
print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n")
|
||||
|
||||
try:
|
||||
frame = 0
|
||||
while True:
|
||||
params.frame_number = frame
|
||||
ctx.params = params
|
||||
|
||||
result = pipeline.execute(source_items)
|
||||
if not result.success:
|
||||
print(" \033[38;5;196mPipeline execution failed\033[0m")
|
||||
break
|
||||
|
||||
# Render with UI panel
|
||||
if ui_panel is not None:
|
||||
buf = render_ui_panel(
|
||||
result.data, current_width, current_height, ui_panel
|
||||
)
|
||||
display.show(buf, border=False)
|
||||
else:
|
||||
display.show(result.data, border=border_mode)
|
||||
|
||||
# Handle keyboard events if UI is enabled
|
||||
if ui_panel is not None:
|
||||
# Try pygame first
|
||||
if hasattr(display, "_pygame"):
|
||||
try:
|
||||
import pygame
|
||||
|
||||
for event in pygame.event.get():
|
||||
if event.type == pygame.KEYDOWN:
|
||||
ui_panel.process_key_event(event.key, event.mod)
|
||||
except (ImportError, Exception):
|
||||
pass
|
||||
# Try terminal input
|
||||
elif hasattr(display, "get_input_keys"):
|
||||
try:
|
||||
keys = display.get_input_keys()
|
||||
for key in keys:
|
||||
ui_panel.process_key_event(key, 0)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Check for quit request
|
||||
if hasattr(display, "is_quit_requested") and display.is_quit_requested():
|
||||
if hasattr(display, "clear_quit_request"):
|
||||
display.clear_quit_request()
|
||||
raise KeyboardInterrupt()
|
||||
|
||||
time.sleep(1 / 60)
|
||||
frame += 1
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pipeline.cleanup()
|
||||
display.cleanup()
|
||||
print("\n \033[38;5;245mPipeline stopped\033[0m")
|
||||
return
|
||||
|
||||
pipeline.cleanup()
|
||||
display.cleanup()
|
||||
print("\n \033[38;5;245mPipeline stopped\033[0m")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -117,11 +117,12 @@ def fetch_poetry():
|
||||
|
||||
|
||||
# ─── CACHE ────────────────────────────────────────────────
|
||||
_CACHE_DIR = pathlib.Path(__file__).resolve().parent.parent
|
||||
# Cache moved to engine/fixtures/headlines.json
|
||||
_CACHE_DIR = pathlib.Path(__file__).resolve().parent / "fixtures"
|
||||
|
||||
|
||||
def _cache_path():
|
||||
return _CACHE_DIR / f".mainline_cache_{config.MODE}.json"
|
||||
return _CACHE_DIR / "headlines.json"
|
||||
|
||||
|
||||
def load_cache():
|
||||
|
||||
19
engine/fixtures/headlines.json
Normal file
19
engine/fixtures/headlines.json
Normal file
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"items": [
|
||||
["Breaking: AI systems achieve breakthrough in natural language understanding", "TechDaily", "14:32"],
|
||||
["Scientists discover new exoplanet in habitable zone", "ScienceNews", "13:15"],
|
||||
["Global markets rally as inflation shows signs of cooling", "FinanceWire", "12:48"],
|
||||
["New study reveals benefits of Mediterranean diet for cognitive health", "HealthJournal", "11:22"],
|
||||
["Tech giants announce collaboration on AI safety standards", "TechDaily", "10:55"],
|
||||
["Archaeologists uncover 3000-year-old city in desert", "HistoryNow", "09:30"],
|
||||
["Renewable energy capacity surpasses fossil fuels for first time", "GreenWorld", "08:15"],
|
||||
["Space agency prepares for next Mars mission launch window", "SpaceNews", "07:42"],
|
||||
["New film breaks box office records on opening weekend", "EntertainmentHub", "06:18"],
|
||||
["Local community raises funds for new library project", "CommunityPost", "05:30"],
|
||||
["Quantum computing breakthrough could revolutionize cryptography", "TechWeekly", "15:20"],
|
||||
["New species of deep-sea creature discovered in Pacific trench", "NatureToday", "14:05"],
|
||||
["Electric vehicle sales surpass traditional cars in Europe", "AutoNews", "12:33"],
|
||||
["Renowned artist unveils interactive AI-generated exhibition", "ArtsMonthly", "11:10"],
|
||||
["Climate summit reaches historic agreement on emissions", "WorldNews", "09:55"]
|
||||
]
|
||||
}
|
||||
549
engine/pipeline/ui.py
Normal file
549
engine/pipeline/ui.py
Normal file
@@ -0,0 +1,549 @@
|
||||
"""
|
||||
Pipeline UI panel - Interactive controls for pipeline configuration.
|
||||
|
||||
Provides:
|
||||
- Stage list with enable/disable toggles
|
||||
- Parameter sliders for selected effect
|
||||
- Keyboard/mouse interaction
|
||||
|
||||
This module implements the right-side UI panel that appears in border="ui" mode.
|
||||
"""
|
||||
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass
|
||||
class UIConfig:
|
||||
"""Configuration for the UI panel."""
|
||||
|
||||
panel_width: int = 24 # Characters wide
|
||||
stage_list_height: int = 12 # Number of stages to show at once
|
||||
param_height: int = 8 # Space for parameter controls
|
||||
scroll_offset: int = 0 # Scroll position in stage list
|
||||
start_with_preset_picker: bool = False # Show preset picker immediately
|
||||
|
||||
|
||||
@dataclass
|
||||
class StageControl:
|
||||
"""Represents a stage in the UI panel with its toggle state."""
|
||||
|
||||
name: str
|
||||
stage_name: str # Actual pipeline stage name
|
||||
category: str
|
||||
enabled: bool = True
|
||||
selected: bool = False
|
||||
params: dict[str, Any] = field(default_factory=dict) # Current param values
|
||||
param_schema: dict[str, dict] = field(default_factory=dict) # Param metadata
|
||||
|
||||
def toggle(self) -> None:
|
||||
"""Toggle enabled state."""
|
||||
self.enabled = not self.enabled
|
||||
|
||||
def get_param(self, name: str) -> Any:
|
||||
"""Get current parameter value."""
|
||||
return self.params.get(name)
|
||||
|
||||
def set_param(self, name: str, value: Any) -> None:
|
||||
"""Set parameter value."""
|
||||
self.params[name] = value
|
||||
|
||||
|
||||
class UIPanel:
|
||||
"""Interactive UI panel for pipeline configuration.
|
||||
|
||||
Manages:
|
||||
- Stage list with enable/disable checkboxes
|
||||
- Parameter sliders for selected stage
|
||||
- Keyboard/mouse event handling
|
||||
- Scroll state for long stage lists
|
||||
|
||||
The panel is rendered as a right border (panel_width characters wide)
|
||||
alongside the main viewport.
|
||||
"""
|
||||
|
||||
def __init__(self, config: UIConfig | None = None):
|
||||
self.config = config or UIConfig()
|
||||
self.stages: dict[str, StageControl] = {} # stage_name -> StageControl
|
||||
self.scroll_offset = 0
|
||||
self.selected_stage: str | None = None
|
||||
self._focused_param: str | None = None # For slider adjustment
|
||||
self._callbacks: dict[str, Callable] = {} # Event callbacks
|
||||
self._presets: list[str] = [] # Available preset names
|
||||
self._current_preset: str = "" # Current preset name
|
||||
self._show_preset_picker: bool = (
|
||||
config.start_with_preset_picker if config else False
|
||||
) # Picker overlay visible
|
||||
self._show_panel: bool = True # UI panel visibility
|
||||
self._preset_scroll_offset: int = 0 # Scroll in preset list
|
||||
|
||||
def register_stage(self, stage: Any, enabled: bool = True) -> StageControl:
|
||||
"""Register a stage for UI control.
|
||||
|
||||
Args:
|
||||
stage: Stage instance (must have .name, .category attributes)
|
||||
enabled: Initial enabled state
|
||||
|
||||
Returns:
|
||||
The created StageControl instance
|
||||
"""
|
||||
control = StageControl(
|
||||
name=stage.name,
|
||||
stage_name=stage.name,
|
||||
category=stage.category,
|
||||
enabled=enabled,
|
||||
)
|
||||
self.stages[stage.name] = control
|
||||
return control
|
||||
|
||||
def unregister_stage(self, stage_name: str) -> None:
|
||||
"""Remove a stage from UI control."""
|
||||
if stage_name in self.stages:
|
||||
del self.stages[stage_name]
|
||||
|
||||
def get_enabled_stages(self) -> list[str]:
|
||||
"""Get list of stage names that are currently enabled."""
|
||||
return [name for name, ctrl in self.stages.items() if ctrl.enabled]
|
||||
|
||||
def select_stage(self, stage_name: str | None = None) -> None:
|
||||
"""Select a stage (for parameter editing)."""
|
||||
if stage_name in self.stages:
|
||||
self.selected_stage = stage_name
|
||||
self.stages[stage_name].selected = True
|
||||
# Deselect others
|
||||
for name, ctrl in self.stages.items():
|
||||
if name != stage_name:
|
||||
ctrl.selected = False
|
||||
# Auto-focus first parameter when stage selected
|
||||
if self.stages[stage_name].params:
|
||||
self._focused_param = next(iter(self.stages[stage_name].params.keys()))
|
||||
else:
|
||||
self._focused_param = None
|
||||
|
||||
def toggle_stage(self, stage_name: str) -> bool:
|
||||
"""Toggle a stage's enabled state.
|
||||
|
||||
Returns:
|
||||
New enabled state
|
||||
"""
|
||||
if stage_name in self.stages:
|
||||
ctrl = self.stages[stage_name]
|
||||
ctrl.enabled = not ctrl.enabled
|
||||
return ctrl.enabled
|
||||
return False
|
||||
|
||||
def adjust_selected_param(self, delta: float) -> None:
|
||||
"""Adjust the currently focused parameter of selected stage.
|
||||
|
||||
Args:
|
||||
delta: Amount to add (positive or negative)
|
||||
"""
|
||||
if self.selected_stage and self._focused_param:
|
||||
ctrl = self.stages[self.selected_stage]
|
||||
if self._focused_param in ctrl.params:
|
||||
current = ctrl.params[self._focused_param]
|
||||
# Determine step size from schema
|
||||
schema = ctrl.param_schema.get(self._focused_param, {})
|
||||
step = schema.get("step", 0.1 if isinstance(current, float) else 1)
|
||||
new_val = current + delta * step
|
||||
# Clamp to min/max if specified
|
||||
if "min" in schema:
|
||||
new_val = max(schema["min"], new_val)
|
||||
if "max" in schema:
|
||||
new_val = min(schema["max"], new_val)
|
||||
# Only emit if value actually changed
|
||||
if new_val != current:
|
||||
ctrl.params[self._focused_param] = new_val
|
||||
self._emit_event(
|
||||
"param_changed",
|
||||
stage_name=self.selected_stage,
|
||||
param_name=self._focused_param,
|
||||
value=new_val,
|
||||
)
|
||||
|
||||
def scroll_stages(self, delta: int) -> None:
|
||||
"""Scroll the stage list."""
|
||||
max_offset = max(0, len(self.stages) - self.config.stage_list_height)
|
||||
self.scroll_offset = max(0, min(max_offset, self.scroll_offset + delta))
|
||||
|
||||
def render(self, width: int, height: int) -> list[str]:
|
||||
"""Render the UI panel.
|
||||
|
||||
Args:
|
||||
width: Total display width (panel uses last `panel_width` cols)
|
||||
height: Total display height
|
||||
|
||||
Returns:
|
||||
List of strings, each of length `panel_width`, to overlay on right side
|
||||
"""
|
||||
panel_width = min(
|
||||
self.config.panel_width, width - 4
|
||||
) # Reserve at least 2 for main
|
||||
lines = []
|
||||
|
||||
# If panel is hidden, render empty space
|
||||
if not self._show_panel:
|
||||
return [" " * panel_width for _ in range(height)]
|
||||
|
||||
# If preset picker is active, render that overlay instead of normal panel
|
||||
if self._show_preset_picker:
|
||||
picker_lines = self._render_preset_picker(panel_width)
|
||||
# Pad to full panel height if needed
|
||||
while len(picker_lines) < height:
|
||||
picker_lines.append(" " * panel_width)
|
||||
return [
|
||||
line.ljust(panel_width)[:panel_width] for line in picker_lines[:height]
|
||||
]
|
||||
|
||||
# Header
|
||||
title_line = "┌" + "─" * (panel_width - 2) + "┐"
|
||||
lines.append(title_line)
|
||||
|
||||
# Stage list section (occupies most of the panel)
|
||||
list_height = self.config.stage_list_height
|
||||
stage_names = list(self.stages.keys())
|
||||
for i in range(list_height):
|
||||
idx = i + self.scroll_offset
|
||||
if idx < len(stage_names):
|
||||
stage_name = stage_names[idx]
|
||||
ctrl = self.stages[stage_name]
|
||||
status = "✓" if ctrl.enabled else "✗"
|
||||
sel = ">" if ctrl.selected else " "
|
||||
# Truncate to fit panel (leave room for ">✓ " prefix and padding)
|
||||
max_name_len = panel_width - 5
|
||||
display_name = ctrl.name[:max_name_len]
|
||||
line = f"│{sel}{status} {display_name:<{max_name_len}}"
|
||||
lines.append(line[:panel_width])
|
||||
else:
|
||||
lines.append("│" + " " * (panel_width - 2) + "│")
|
||||
|
||||
# Separator
|
||||
lines.append("├" + "─" * (panel_width - 2) + "┤")
|
||||
|
||||
# Parameter section (if stage selected)
|
||||
if self.selected_stage and self.selected_stage in self.stages:
|
||||
ctrl = self.stages[self.selected_stage]
|
||||
if ctrl.params:
|
||||
# Render each parameter as "name: [=====] value" with focus indicator
|
||||
for param_name, param_value in ctrl.params.items():
|
||||
schema = ctrl.param_schema.get(param_name, {})
|
||||
is_focused = param_name == self._focused_param
|
||||
# Format value based on type
|
||||
if isinstance(param_value, float):
|
||||
val_str = f"{param_value:.2f}"
|
||||
elif isinstance(param_value, int):
|
||||
val_str = f"{param_value}"
|
||||
elif isinstance(param_value, bool):
|
||||
val_str = str(param_value)
|
||||
else:
|
||||
val_str = str(param_value)
|
||||
|
||||
# Build parameter line
|
||||
if (
|
||||
isinstance(param_value, (int, float))
|
||||
and "min" in schema
|
||||
and "max" in schema
|
||||
):
|
||||
# Render as slider
|
||||
min_val = schema["min"]
|
||||
max_val = schema["max"]
|
||||
# Normalize to 0-1 for bar length
|
||||
if max_val != min_val:
|
||||
ratio = (param_value - min_val) / (max_val - min_val)
|
||||
else:
|
||||
ratio = 0
|
||||
bar_width = (
|
||||
panel_width - len(param_name) - len(val_str) - 10
|
||||
) # approx space for "[] : ="
|
||||
if bar_width < 1:
|
||||
bar_width = 1
|
||||
filled = int(round(ratio * bar_width))
|
||||
bar = "[" + "=" * filled + " " * (bar_width - filled) + "]"
|
||||
param_line = f"│ {param_name}: {bar} {val_str}"
|
||||
else:
|
||||
# Simple name=value
|
||||
param_line = f"│ {param_name}={val_str}"
|
||||
|
||||
# Highlight focused parameter
|
||||
if is_focused:
|
||||
# Invert colors conceptually - for now use > prefix
|
||||
param_line = "│> " + param_line[2:]
|
||||
|
||||
# Truncate to fit panel width
|
||||
if len(param_line) > panel_width - 1:
|
||||
param_line = param_line[: panel_width - 1]
|
||||
lines.append(param_line + "│")
|
||||
else:
|
||||
lines.append("│ (no params)".ljust(panel_width - 1) + "│")
|
||||
else:
|
||||
lines.append("│ (select a stage)".ljust(panel_width - 1) + "│")
|
||||
|
||||
# Info line before footer
|
||||
info_parts = []
|
||||
if self._current_preset:
|
||||
info_parts.append(f"Preset: {self._current_preset}")
|
||||
if self._presets:
|
||||
info_parts.append("[P] presets")
|
||||
info_str = " | ".join(info_parts) if info_parts else ""
|
||||
if info_str:
|
||||
padded = info_str.ljust(panel_width - 2)
|
||||
lines.append("│" + padded + "│")
|
||||
|
||||
# Footer with instructions
|
||||
footer_line = self._render_footer(panel_width)
|
||||
lines.append(footer_line)
|
||||
|
||||
# Ensure all lines are exactly panel_width
|
||||
return [line.ljust(panel_width)[:panel_width] for line in lines]
|
||||
|
||||
def _render_footer(self, width: int) -> str:
|
||||
"""Render footer with key hints."""
|
||||
if width >= 40:
|
||||
# Show preset name and key hints
|
||||
preset_info = (
|
||||
f"Preset: {self._current_preset}" if self._current_preset else ""
|
||||
)
|
||||
hints = " [S]elect [Space]UI [Tab]Params [Arrows/HJKL]Adjust "
|
||||
if self._presets:
|
||||
hints += "[P]Preset "
|
||||
combined = f"{preset_info}{hints}"
|
||||
if len(combined) > width - 4:
|
||||
combined = combined[: width - 4]
|
||||
footer = "└" + "─" * (width - 2) + "┘"
|
||||
return footer # Just the line, we'll add info above in render
|
||||
else:
|
||||
return "└" + "─" * (width - 2) + "┘"
|
||||
|
||||
def process_key_event(self, key: str | int, modifiers: int = 0) -> bool:
|
||||
"""Process a keyboard event.
|
||||
|
||||
Args:
|
||||
key: Key symbol (e.g., ' ', 's', pygame.K_UP, etc.)
|
||||
modifiers: Modifier bits (Shift, Ctrl, Alt)
|
||||
|
||||
Returns:
|
||||
True if event was handled, False if not
|
||||
"""
|
||||
# Normalize to string for simplicity
|
||||
key_str = self._normalize_key(key, modifiers)
|
||||
|
||||
# Space: toggle UI panel visibility (only when preset picker not active)
|
||||
if key_str == " " and not self._show_preset_picker:
|
||||
self._show_panel = not getattr(self, "_show_panel", True)
|
||||
return True
|
||||
|
||||
# Space: toggle UI panel visibility (only when preset picker not active)
|
||||
if key_str == " " and not self._show_preset_picker:
|
||||
self._show_panel = not getattr(self, "_show_panel", True)
|
||||
return True
|
||||
|
||||
# S: select stage (cycle)
|
||||
if key_str == "s" and modifiers == 0:
|
||||
stages = list(self.stages.keys())
|
||||
if not stages:
|
||||
return False
|
||||
if self.selected_stage:
|
||||
current_idx = stages.index(self.selected_stage)
|
||||
next_idx = (current_idx + 1) % len(stages)
|
||||
else:
|
||||
next_idx = 0
|
||||
self.select_stage(stages[next_idx])
|
||||
return True
|
||||
|
||||
# P: toggle preset picker (only when panel is visible)
|
||||
if key_str == "p" and self._show_panel:
|
||||
self._show_preset_picker = not self._show_preset_picker
|
||||
if self._show_preset_picker:
|
||||
self._preset_scroll_offset = 0
|
||||
return True
|
||||
|
||||
# HJKL or Arrow Keys: scroll stage list, preset list, or adjust param
|
||||
# vi-style: K=up, J=down (J is actually next line in vi, but we use for down)
|
||||
# We'll use J for down, K for up, H for left, L for right
|
||||
elif key_str in ("up", "down", "kp8", "kp2", "j", "k"):
|
||||
# If preset picker is open, scroll preset list
|
||||
if self._show_preset_picker:
|
||||
delta = -1 if key_str in ("up", "kp8", "k") else 1
|
||||
self._preset_scroll_offset = max(0, self._preset_scroll_offset + delta)
|
||||
# Ensure scroll doesn't go past end
|
||||
max_offset = max(0, len(self._presets) - 1)
|
||||
self._preset_scroll_offset = min(max_offset, self._preset_scroll_offset)
|
||||
return True
|
||||
# If param is focused, adjust param value
|
||||
elif self.selected_stage and self._focused_param:
|
||||
delta = -1.0 if key_str in ("up", "kp8", "k") else 1.0
|
||||
self.adjust_selected_param(delta)
|
||||
return True
|
||||
# Otherwise scroll stages
|
||||
else:
|
||||
delta = -1 if key_str in ("up", "kp8", "k") else 1
|
||||
self.scroll_stages(delta)
|
||||
return True
|
||||
|
||||
# Left/Right or H/L: adjust param (if param selected)
|
||||
elif key_str in ("left", "right", "kp4", "kp6", "h", "l"):
|
||||
if self.selected_stage:
|
||||
delta = -0.1 if key_str in ("left", "kp4", "h") else 0.1
|
||||
self.adjust_selected_param(delta)
|
||||
return True
|
||||
|
||||
# Tab: cycle through parameters
|
||||
if key_str == "tab" and self.selected_stage:
|
||||
ctrl = self.stages[self.selected_stage]
|
||||
param_names = list(ctrl.params.keys())
|
||||
if param_names:
|
||||
if self._focused_param in param_names:
|
||||
current_idx = param_names.index(self._focused_param)
|
||||
next_idx = (current_idx + 1) % len(param_names)
|
||||
else:
|
||||
next_idx = 0
|
||||
self._focused_param = param_names[next_idx]
|
||||
return True
|
||||
|
||||
# Preset picker navigation
|
||||
if self._show_preset_picker:
|
||||
# Enter: select currently highlighted preset
|
||||
if key_str == "return":
|
||||
if self._presets:
|
||||
idx = self._preset_scroll_offset
|
||||
if idx < len(self._presets):
|
||||
self._current_preset = self._presets[idx]
|
||||
self._emit_event(
|
||||
"preset_changed", preset_name=self._current_preset
|
||||
)
|
||||
self._show_preset_picker = False
|
||||
return True
|
||||
# Escape: close picker without changing
|
||||
elif key_str == "escape":
|
||||
self._show_preset_picker = False
|
||||
return True
|
||||
|
||||
# Escape: deselect stage (only when picker not active)
|
||||
elif key_str == "escape" and self.selected_stage:
|
||||
self.selected_stage = None
|
||||
for ctrl in self.stages.values():
|
||||
ctrl.selected = False
|
||||
self._focused_param = None
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _normalize_key(self, key: str | int, modifiers: int) -> str:
|
||||
"""Normalize key to a string identifier."""
|
||||
# Handle pygame keysyms if imported
|
||||
try:
|
||||
import pygame
|
||||
|
||||
if isinstance(key, int):
|
||||
# Map pygame constants to strings
|
||||
key_map = {
|
||||
pygame.K_UP: "up",
|
||||
pygame.K_DOWN: "down",
|
||||
pygame.K_LEFT: "left",
|
||||
pygame.K_RIGHT: "right",
|
||||
pygame.K_SPACE: " ",
|
||||
pygame.K_ESCAPE: "escape",
|
||||
pygame.K_s: "s",
|
||||
pygame.K_w: "w",
|
||||
# HJKL navigation (vi-style)
|
||||
pygame.K_h: "h",
|
||||
pygame.K_j: "j",
|
||||
pygame.K_k: "k",
|
||||
pygame.K_l: "l",
|
||||
}
|
||||
# Check for keypad keys with KP prefix
|
||||
if hasattr(pygame, "K_KP8") and key == pygame.K_KP8:
|
||||
return "kp8"
|
||||
if hasattr(pygame, "K_KP2") and key == pygame.K_KP2:
|
||||
return "kp2"
|
||||
if hasattr(pygame, "K_KP4") and key == pygame.K_KP4:
|
||||
return "kp4"
|
||||
if hasattr(pygame, "K_KP6") and key == pygame.K_KP6:
|
||||
return "kp6"
|
||||
return key_map.get(key, f"pygame_{key}")
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# Already a string?
|
||||
if isinstance(key, str):
|
||||
return key.lower()
|
||||
|
||||
return str(key)
|
||||
|
||||
def set_event_callback(self, event_type: str, callback: Callable) -> None:
|
||||
"""Register a callback for UI events.
|
||||
|
||||
Args:
|
||||
event_type: Event type ("stage_toggled", "param_changed", "stage_selected", "preset_changed")
|
||||
callback: Function to call when event occurs
|
||||
"""
|
||||
self._callbacks[event_type] = callback
|
||||
|
||||
def _emit_event(self, event_type: str, **data) -> None:
|
||||
"""Emit an event to registered callbacks."""
|
||||
callback = self._callbacks.get(event_type)
|
||||
if callback:
|
||||
try:
|
||||
callback(**data)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def set_presets(self, presets: list[str], current: str) -> None:
|
||||
"""Set available presets and current selection.
|
||||
|
||||
Args:
|
||||
presets: List of preset names
|
||||
current: Currently active preset name
|
||||
"""
|
||||
self._presets = presets
|
||||
self._current_preset = current
|
||||
|
||||
def cycle_preset(self, direction: int = 1) -> str:
|
||||
"""Cycle to next/previous preset.
|
||||
|
||||
Args:
|
||||
direction: 1 for next, -1 for previous
|
||||
|
||||
Returns:
|
||||
New preset name
|
||||
"""
|
||||
if not self._presets:
|
||||
return self._current_preset
|
||||
try:
|
||||
current_idx = self._presets.index(self._current_preset)
|
||||
except ValueError:
|
||||
current_idx = 0
|
||||
next_idx = (current_idx + direction) % len(self._presets)
|
||||
self._current_preset = self._presets[next_idx]
|
||||
self._emit_event("preset_changed", preset_name=self._current_preset)
|
||||
return self._current_preset
|
||||
|
||||
def _render_preset_picker(self, panel_width: int) -> list[str]:
|
||||
"""Render a full-screen preset picker overlay."""
|
||||
lines = []
|
||||
picker_height = min(len(self._presets) + 2, self.config.stage_list_height)
|
||||
# Create a centered box
|
||||
title = " Select Preset "
|
||||
box_width = min(40, panel_width - 2)
|
||||
lines.append("┌" + "─" * (box_width - 2) + "┐")
|
||||
lines.append("│" + title.center(box_width - 2) + "│")
|
||||
lines.append("├" + "─" * (box_width - 2) + "┤")
|
||||
# List presets with selection
|
||||
visible_start = self._preset_scroll_offset
|
||||
visible_end = visible_start + picker_height - 2
|
||||
for i in range(visible_start, min(visible_end, len(self._presets))):
|
||||
preset_name = self._presets[i]
|
||||
is_current = preset_name == self._current_preset
|
||||
prefix = "▶ " if is_current else " "
|
||||
line = f"│ {prefix}{preset_name}"
|
||||
if len(line) < box_width - 1:
|
||||
line = line.ljust(box_width - 1)
|
||||
lines.append(line[: box_width - 1] + "│")
|
||||
# Footer with help
|
||||
help_text = "[P] close [↑↓] navigate [Enter] select"
|
||||
footer = "├" + "─" * (box_width - 2) + "┤"
|
||||
lines.append(footer)
|
||||
lines.append("│" + help_text.center(box_width - 2) + "│")
|
||||
lines.append("└" + "─" * (box_width - 2) + "┘")
|
||||
return lines
|
||||
219
engine/pipeline/validation.py
Normal file
219
engine/pipeline/validation.py
Normal file
@@ -0,0 +1,219 @@
|
||||
"""
|
||||
Pipeline validation and MVP (Minimum Viable Pipeline) injection.
|
||||
|
||||
Provides validation functions to ensure pipelines meet minimum requirements
|
||||
and can auto-inject sensible defaults when fields are missing or invalid.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
from engine.display import BorderMode, DisplayRegistry
|
||||
from engine.effects import get_registry
|
||||
from engine.pipeline.params import PipelineParams
|
||||
|
||||
# Known valid values
|
||||
VALID_SOURCES = ["headlines", "poetry", "fixture", "empty", "pipeline-inspect"]
|
||||
VALID_CAMERAS = [
|
||||
"feed",
|
||||
"scroll",
|
||||
"vertical",
|
||||
"horizontal",
|
||||
"omni",
|
||||
"floating",
|
||||
"bounce",
|
||||
"none",
|
||||
"",
|
||||
]
|
||||
VALID_DISPLAYS = None # Will be populated at runtime from DisplayRegistry
|
||||
|
||||
|
||||
@dataclass
|
||||
class ValidationResult:
|
||||
"""Result of validation with changes and warnings."""
|
||||
|
||||
valid: bool
|
||||
warnings: list[str]
|
||||
changes: list[str]
|
||||
config: Any # PipelineConfig (forward ref)
|
||||
params: PipelineParams
|
||||
|
||||
|
||||
# MVP defaults
|
||||
MVP_DEFAULTS = {
|
||||
"source": "fixture",
|
||||
"display": "terminal",
|
||||
"camera": "", # Empty = no camera stage (static viewport)
|
||||
"effects": [],
|
||||
"border": False,
|
||||
}
|
||||
|
||||
|
||||
def validate_pipeline_config(
|
||||
config: Any, params: PipelineParams, allow_unsafe: bool = False
|
||||
) -> ValidationResult:
|
||||
"""Validate pipeline configuration against MVP requirements.
|
||||
|
||||
Args:
|
||||
config: PipelineConfig object (has source, display, camera, effects fields)
|
||||
params: PipelineParams object (has border field)
|
||||
allow_unsafe: If True, don't inject defaults or enforce MVP
|
||||
|
||||
Returns:
|
||||
ValidationResult with validity, warnings, changes, and validated config/params
|
||||
"""
|
||||
warnings = []
|
||||
changes = []
|
||||
|
||||
if allow_unsafe:
|
||||
# Still do basic validation but don't inject defaults
|
||||
# Always return valid=True when allow_unsafe is set
|
||||
warnings.extend(_validate_source(config.source))
|
||||
warnings.extend(_validate_display(config.display))
|
||||
warnings.extend(_validate_camera(config.camera))
|
||||
warnings.extend(_validate_effects(config.effects))
|
||||
warnings.extend(_validate_border(params.border))
|
||||
return ValidationResult(
|
||||
valid=True, # Always valid with allow_unsafe
|
||||
warnings=warnings,
|
||||
changes=[],
|
||||
config=config,
|
||||
params=params,
|
||||
)
|
||||
|
||||
# MVP injection mode
|
||||
# Source
|
||||
source_issues = _validate_source(config.source)
|
||||
if source_issues:
|
||||
warnings.extend(source_issues)
|
||||
config.source = MVP_DEFAULTS["source"]
|
||||
changes.append(f"source → {MVP_DEFAULTS['source']}")
|
||||
|
||||
# Display
|
||||
display_issues = _validate_display(config.display)
|
||||
if display_issues:
|
||||
warnings.extend(display_issues)
|
||||
config.display = MVP_DEFAULTS["display"]
|
||||
changes.append(f"display → {MVP_DEFAULTS['display']}")
|
||||
|
||||
# Camera
|
||||
camera_issues = _validate_camera(config.camera)
|
||||
if camera_issues:
|
||||
warnings.extend(camera_issues)
|
||||
config.camera = MVP_DEFAULTS["camera"]
|
||||
changes.append("camera → static (no camera stage)")
|
||||
|
||||
# Effects
|
||||
effect_issues = _validate_effects(config.effects)
|
||||
if effect_issues:
|
||||
warnings.extend(effect_issues)
|
||||
# Only change if all effects are invalid
|
||||
if len(config.effects) == 0 or all(
|
||||
e not in _get_valid_effects() for e in config.effects
|
||||
):
|
||||
config.effects = MVP_DEFAULTS["effects"]
|
||||
changes.append("effects → [] (none)")
|
||||
else:
|
||||
# Remove invalid effects, keep valid ones
|
||||
valid_effects = [e for e in config.effects if e in _get_valid_effects()]
|
||||
if valid_effects != config.effects:
|
||||
config.effects = valid_effects
|
||||
changes.append(f"effects → {valid_effects}")
|
||||
|
||||
# Border (in params)
|
||||
border_issues = _validate_border(params.border)
|
||||
if border_issues:
|
||||
warnings.extend(border_issues)
|
||||
params.border = MVP_DEFAULTS["border"]
|
||||
changes.append(f"border → {MVP_DEFAULTS['border']}")
|
||||
|
||||
valid = len(warnings) == 0
|
||||
if changes:
|
||||
# If we made changes, pipeline should be valid now
|
||||
valid = True
|
||||
|
||||
return ValidationResult(
|
||||
valid=valid,
|
||||
warnings=warnings,
|
||||
changes=changes,
|
||||
config=config,
|
||||
params=params,
|
||||
)
|
||||
|
||||
|
||||
def _validate_source(source: str) -> list[str]:
|
||||
"""Validate source field."""
|
||||
if not source:
|
||||
return ["source is empty"]
|
||||
if source not in VALID_SOURCES:
|
||||
return [f"unknown source '{source}', valid sources: {VALID_SOURCES}"]
|
||||
return []
|
||||
|
||||
|
||||
def _validate_display(display: str) -> list[str]:
|
||||
"""Validate display field."""
|
||||
if not display:
|
||||
return ["display is empty"]
|
||||
# Check if display is available (lazy load registry)
|
||||
try:
|
||||
available = DisplayRegistry.list_backends()
|
||||
if display not in available:
|
||||
return [f"display '{display}' not available, available: {available}"]
|
||||
except Exception as e:
|
||||
return [f"error checking display availability: {e}"]
|
||||
return []
|
||||
|
||||
|
||||
def _validate_camera(camera: str | None) -> list[str]:
|
||||
"""Validate camera field."""
|
||||
if camera is None:
|
||||
return ["camera is None"]
|
||||
# Empty string is valid (static, no camera stage)
|
||||
if camera == "":
|
||||
return []
|
||||
if camera not in VALID_CAMERAS:
|
||||
return [f"unknown camera '{camera}', valid cameras: {VALID_CAMERAS}"]
|
||||
return []
|
||||
|
||||
|
||||
def _get_valid_effects() -> set[str]:
|
||||
"""Get set of valid effect names."""
|
||||
registry = get_registry()
|
||||
return set(registry.list_all().keys())
|
||||
|
||||
|
||||
def _validate_effects(effects: list[str]) -> list[str]:
|
||||
"""Validate effects list."""
|
||||
if effects is None:
|
||||
return ["effects is None"]
|
||||
valid_effects = _get_valid_effects()
|
||||
issues = []
|
||||
for effect in effects:
|
||||
if effect not in valid_effects:
|
||||
issues.append(
|
||||
f"unknown effect '{effect}', valid effects: {sorted(valid_effects)}"
|
||||
)
|
||||
return issues
|
||||
|
||||
|
||||
def _validate_border(border: bool | BorderMode) -> list[str]:
|
||||
"""Validate border field."""
|
||||
if isinstance(border, bool):
|
||||
return []
|
||||
if isinstance(border, BorderMode):
|
||||
return []
|
||||
return [f"invalid border value, must be bool or BorderMode, got {type(border)}"]
|
||||
|
||||
|
||||
def get_mvp_summary(config: Any, params: PipelineParams) -> str:
|
||||
"""Get a human-readable summary of the MVP pipeline configuration."""
|
||||
camera_text = "none" if not config.camera else config.camera
|
||||
effects_text = "none" if not config.effects else ", ".join(config.effects)
|
||||
return (
|
||||
f"MVP Pipeline Configuration:\n"
|
||||
f" Source: {config.source}\n"
|
||||
f" Display: {config.display}\n"
|
||||
f" Camera: {camera_text} (static if empty)\n"
|
||||
f" Effects: {effects_text}\n"
|
||||
f" Border: {params.border}"
|
||||
)
|
||||
56
test_ui_simple.py
Normal file
56
test_ui_simple.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""
|
||||
Simple test for UIPanel integration.
|
||||
"""
|
||||
|
||||
from engine.pipeline.ui import UIPanel, UIConfig, StageControl
|
||||
|
||||
# Create panel
|
||||
panel = UIPanel(UIConfig(panel_width=24))
|
||||
|
||||
# Add some mock stages
|
||||
panel.register_stage(
|
||||
type(
|
||||
"Stage", (), {"name": "noise", "category": "effect", "is_enabled": lambda: True}
|
||||
),
|
||||
enabled=True,
|
||||
)
|
||||
panel.register_stage(
|
||||
type(
|
||||
"Stage", (), {"name": "fade", "category": "effect", "is_enabled": lambda: True}
|
||||
),
|
||||
enabled=False,
|
||||
)
|
||||
panel.register_stage(
|
||||
type(
|
||||
"Stage",
|
||||
(),
|
||||
{"name": "glitch", "category": "effect", "is_enabled": lambda: True},
|
||||
),
|
||||
enabled=True,
|
||||
)
|
||||
panel.register_stage(
|
||||
type(
|
||||
"Stage",
|
||||
(),
|
||||
{"name": "font", "category": "transform", "is_enabled": lambda: True},
|
||||
),
|
||||
enabled=True,
|
||||
)
|
||||
|
||||
# Select first stage
|
||||
panel.select_stage("noise")
|
||||
|
||||
# Render at 80x24
|
||||
lines = panel.render(80, 24)
|
||||
print("\n".join(lines))
|
||||
|
||||
print("\nStage list:")
|
||||
for name, ctrl in panel.stages.items():
|
||||
print(f" {name}: enabled={ctrl.enabled}, selected={ctrl.selected}")
|
||||
|
||||
print("\nToggle 'fade' and re-render:")
|
||||
panel.toggle_stage("fade")
|
||||
lines = panel.render(80, 24)
|
||||
print("\n".join(lines))
|
||||
|
||||
print("\nEnabled stages:", panel.get_enabled_stages())
|
||||
184
tests/test_ui_panel.py
Normal file
184
tests/test_ui_panel.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""
|
||||
Tests for UIPanel.
|
||||
"""
|
||||
|
||||
from engine.pipeline.ui import StageControl, UIConfig, UIPanel
|
||||
|
||||
|
||||
class MockStage:
|
||||
"""Mock stage for testing."""
|
||||
|
||||
def __init__(self, name, category="effect"):
|
||||
self.name = name
|
||||
self.category = category
|
||||
self._enabled = True
|
||||
|
||||
def is_enabled(self):
|
||||
return self._enabled
|
||||
|
||||
|
||||
class TestUIPanel:
|
||||
"""Tests for UIPanel."""
|
||||
|
||||
def test_init(self):
|
||||
"""UIPanel initializes with default config."""
|
||||
panel = UIPanel()
|
||||
assert panel.config.panel_width == 24
|
||||
assert panel.config.stage_list_height == 12
|
||||
assert panel.scroll_offset == 0
|
||||
assert panel.selected_stage is None
|
||||
|
||||
def test_register_stage(self):
|
||||
"""register_stage adds a stage control."""
|
||||
panel = UIPanel()
|
||||
stage = MockStage("noise")
|
||||
panel.register_stage(stage, enabled=True)
|
||||
assert "noise" in panel.stages
|
||||
ctrl = panel.stages["noise"]
|
||||
assert ctrl.name == "noise"
|
||||
assert ctrl.enabled is True
|
||||
assert ctrl.selected is False
|
||||
|
||||
def test_select_stage(self):
|
||||
"""select_stage sets selection."""
|
||||
panel = UIPanel()
|
||||
stage1 = MockStage("noise")
|
||||
stage2 = MockStage("fade")
|
||||
panel.register_stage(stage1)
|
||||
panel.register_stage(stage2)
|
||||
panel.select_stage("fade")
|
||||
assert panel.selected_stage == "fade"
|
||||
assert panel.stages["fade"].selected is True
|
||||
assert panel.stages["noise"].selected is False
|
||||
|
||||
def test_toggle_stage(self):
|
||||
"""toggle_stage flips enabled state."""
|
||||
panel = UIPanel()
|
||||
stage = MockStage("glitch")
|
||||
panel.register_stage(stage, enabled=True)
|
||||
result = panel.toggle_stage("glitch")
|
||||
assert result is False
|
||||
assert panel.stages["glitch"].enabled is False
|
||||
result = panel.toggle_stage("glitch")
|
||||
assert result is True
|
||||
|
||||
def test_get_enabled_stages(self):
|
||||
"""get_enabled_stages returns only enabled stage names."""
|
||||
panel = UIPanel()
|
||||
panel.register_stage(MockStage("noise"), enabled=True)
|
||||
panel.register_stage(MockStage("fade"), enabled=False)
|
||||
panel.register_stage(MockStage("glitch"), enabled=True)
|
||||
enabled = panel.get_enabled_stages()
|
||||
assert set(enabled) == {"noise", "glitch"}
|
||||
|
||||
def test_scroll_stages(self):
|
||||
"""scroll_stages moves the view."""
|
||||
panel = UIPanel(UIConfig(stage_list_height=3))
|
||||
for i in range(10):
|
||||
panel.register_stage(MockStage(f"stage{i}"))
|
||||
assert panel.scroll_offset == 0
|
||||
panel.scroll_stages(1)
|
||||
assert panel.scroll_offset == 1
|
||||
panel.scroll_stages(-1)
|
||||
assert panel.scroll_offset == 0
|
||||
# Clamp at max
|
||||
panel.scroll_stages(100)
|
||||
assert panel.scroll_offset == 7 # 10 - 3 = 7
|
||||
|
||||
def test_render_produces_lines(self):
|
||||
"""render produces list of strings of correct width."""
|
||||
panel = UIPanel(UIConfig(panel_width=20))
|
||||
panel.register_stage(MockStage("noise"), enabled=True)
|
||||
panel.register_stage(MockStage("fade"), enabled=False)
|
||||
panel.select_stage("noise")
|
||||
lines = panel.render(80, 24)
|
||||
# All lines should be exactly panel_width chars (20)
|
||||
for line in lines:
|
||||
assert len(line) == 20
|
||||
# Should have header, stage rows, separator, params area, footer
|
||||
assert len(lines) >= 5
|
||||
|
||||
def test_process_key_event_space_toggles_stage(self):
|
||||
"""process_key_event with space toggles UI panel visibility."""
|
||||
panel = UIPanel()
|
||||
stage = MockStage("glitch")
|
||||
panel.register_stage(stage, enabled=True)
|
||||
panel.select_stage("glitch")
|
||||
# Space should now toggle UI panel visibility, not stage
|
||||
assert panel._show_panel is True
|
||||
handled = panel.process_key_event(" ")
|
||||
assert handled is True
|
||||
assert panel._show_panel is False
|
||||
# Pressing space again should show panel
|
||||
handled = panel.process_key_event(" ")
|
||||
assert panel._show_panel is True
|
||||
|
||||
def test_process_key_event_space_does_not_toggle_in_picker(self):
|
||||
"""Space should not toggle UI panel when preset picker is active."""
|
||||
panel = UIPanel()
|
||||
panel._show_panel = True
|
||||
panel._show_preset_picker = True
|
||||
handled = panel.process_key_event(" ")
|
||||
assert handled is False # Not handled when picker active
|
||||
assert panel._show_panel is True # Unchanged
|
||||
|
||||
def test_process_key_event_s_selects_next(self):
|
||||
"""process_key_event with s cycles selection."""
|
||||
panel = UIPanel()
|
||||
panel.register_stage(MockStage("noise"))
|
||||
panel.register_stage(MockStage("fade"))
|
||||
panel.register_stage(MockStage("glitch"))
|
||||
panel.select_stage("noise")
|
||||
handled = panel.process_key_event("s")
|
||||
assert handled is True
|
||||
assert panel.selected_stage == "fade"
|
||||
|
||||
def test_process_key_event_hjkl_navigation(self):
|
||||
"""process_key_event with HJKL keys."""
|
||||
panel = UIPanel()
|
||||
stage = MockStage("noise")
|
||||
panel.register_stage(stage)
|
||||
panel.select_stage("noise")
|
||||
|
||||
# J or Down should scroll or adjust param
|
||||
assert panel.scroll_stages(1) is None # Just test it doesn't error
|
||||
# H or Left should adjust param (when param selected)
|
||||
panel.selected_stage = "noise"
|
||||
panel._focused_param = "intensity"
|
||||
panel.stages["noise"].params["intensity"] = 0.5
|
||||
|
||||
# Left/H should decrease
|
||||
handled = panel.process_key_event("h")
|
||||
assert handled is True
|
||||
# L or Right should increase
|
||||
handled = panel.process_key_event("l")
|
||||
assert handled is True
|
||||
|
||||
# K should scroll up
|
||||
panel.selected_stage = None
|
||||
handled = panel.process_key_event("k")
|
||||
assert handled is True
|
||||
|
||||
def test_set_event_callback(self):
|
||||
"""set_event_callback registers callback."""
|
||||
panel = UIPanel()
|
||||
called = []
|
||||
|
||||
def callback(stage_name, enabled):
|
||||
called.append((stage_name, enabled))
|
||||
|
||||
panel.set_event_callback("stage_toggled", callback)
|
||||
panel.toggle_stage("test") # No stage, won't trigger
|
||||
# Simulate toggle through event
|
||||
panel._emit_event("stage_toggled", stage_name="noise", enabled=False)
|
||||
assert called == [("noise", False)]
|
||||
|
||||
def test_register_stage_returns_control(self):
|
||||
"""register_stage should return the StageControl instance."""
|
||||
panel = UIPanel()
|
||||
stage = MockStage("noise_effect")
|
||||
control = panel.register_stage(stage, enabled=True)
|
||||
assert control is not None
|
||||
assert isinstance(control, StageControl)
|
||||
assert control.name == "noise_effect"
|
||||
assert control.enabled is True
|
||||
Reference in New Issue
Block a user