forked from genewildish/Mainline
Compare commits
6 Commits
60ae4f7dfb
...
abe49ba7d7
| Author | SHA1 | Date | |
|---|---|---|---|
| abe49ba7d7 | |||
| 6d2c5ba304 | |||
| a95b24a246 | |||
| cdcdb7b172 | |||
| 21fb210c6e | |||
| 36afbacb6b |
9
TODO.md
Normal file
9
TODO.md
Normal file
@@ -0,0 +1,9 @@
|
||||
# Tasks
|
||||
|
||||
- [ ] Add entropy/chaos score metadata to effects for auto-categorization and intensity control
|
||||
- [ ] Finish ModernGL display backend: integrate window system, implement glyph caching, add event handling, and support border modes.
|
||||
- [x] Integrate UIPanel with pipeline: register stages, link parameter schemas, handle events, implement hot-reload.
|
||||
- [x] Move cached fixture headlines to engine/fixtures/headlines.json and update default source to use fixture.
|
||||
- [x] Add interactive UI panel for pipeline configuration (right-side panel) with stage toggles and param sliders.
|
||||
- [x] Enumerate all effect plugin parameters automatically for UI control (intensity, decay, etc.)
|
||||
- [ ] Implement pipeline hot-rebuild when stage toggles or params change, preserving camera and display state.
|
||||
@@ -1,145 +0,0 @@
|
||||
# README Update Design — 2026-03-15
|
||||
|
||||
## Goal
|
||||
|
||||
Restructure and expand `README.md` to:
|
||||
1. Align with the current codebase (Python 3.10+, uv/mise/pytest/ruff toolchain, 6 new fonts)
|
||||
2. Add extensibility-focused content (`Extending` section)
|
||||
3. Add developer workflow coverage (`Development` section)
|
||||
4. Improve navigability via top-level grouping (Approach C)
|
||||
|
||||
---
|
||||
|
||||
## Proposed Structure
|
||||
|
||||
```
|
||||
# MAINLINE
|
||||
> tagline + description
|
||||
|
||||
## Using
|
||||
### Run
|
||||
### Config
|
||||
### Feeds
|
||||
### Fonts
|
||||
### ntfy.sh
|
||||
|
||||
## Internals
|
||||
### How it works
|
||||
### Architecture
|
||||
|
||||
## Extending
|
||||
### NtfyPoller
|
||||
### MicMonitor
|
||||
### Render pipeline
|
||||
|
||||
## Development
|
||||
### Setup
|
||||
### Tasks
|
||||
### Testing
|
||||
### Linting
|
||||
|
||||
## Roadmap
|
||||
|
||||
---
|
||||
*footer*
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
## Section-by-section design
|
||||
|
||||
### Using
|
||||
|
||||
All existing content preserved verbatim. Two changes:
|
||||
- **Run**: add `uv run mainline.py` as an alternative invocation; expand bootstrap note to mention `uv sync` / `uv sync --all-extras`
|
||||
- **ntfy.sh**: remove `NtfyPoller` reuse code example (moves to Extending); keep push instructions and topic config
|
||||
|
||||
Subsections moved into Using (currently standalone):
|
||||
- `Feeds` — it's configuration, not a concept
|
||||
- `ntfy.sh` (usage half)
|
||||
|
||||
### Internals
|
||||
|
||||
All existing content preserved verbatim. One change:
|
||||
- **Architecture**: append `tests/` directory listing to the module tree
|
||||
|
||||
### Extending
|
||||
|
||||
Entirely new section. Three subsections:
|
||||
|
||||
**NtfyPoller**
|
||||
- Minimal working import + usage example
|
||||
- Note: stdlib only dependencies
|
||||
|
||||
```python
|
||||
from engine.ntfy import NtfyPoller
|
||||
|
||||
poller = NtfyPoller("https://ntfy.sh/my_topic/json?since=20s&poll=1")
|
||||
poller.start()
|
||||
|
||||
# in your render loop:
|
||||
msg = poller.get_active_message() # → (title, body, timestamp) or None
|
||||
if msg:
|
||||
title, body, ts = msg
|
||||
render_my_message(title, body) # visualizer-specific
|
||||
```
|
||||
|
||||
**MicMonitor**
|
||||
- Minimal working import + usage example
|
||||
- Note: sounddevice/numpy optional, degrades gracefully
|
||||
|
||||
```python
|
||||
from engine.mic import MicMonitor
|
||||
|
||||
mic = MicMonitor(threshold_db=50)
|
||||
if mic.start(): # returns False if sounddevice unavailable
|
||||
excess = mic.excess # dB above threshold, clamped to 0
|
||||
db = mic.db # raw RMS dB level
|
||||
```
|
||||
|
||||
**Render pipeline**
|
||||
- Brief prose about `engine.render` as importable pipeline
|
||||
- Minimal sketch of serve.py / ESP32 usage pattern
|
||||
- Reference to `Mainline Renderer + ntfy Message Queue for ESP32.md`
|
||||
|
||||
### Development
|
||||
|
||||
Entirely new section. Four subsections:
|
||||
|
||||
**Setup**
|
||||
- Hard requirements: Python 3.10+, uv
|
||||
- `uv sync` / `uv sync --all-extras` / `uv sync --group dev`
|
||||
|
||||
**Tasks** (via mise)
|
||||
- `mise run test`, `test-cov`, `lint`, `lint-fix`, `format`, `run`, `run-poetry`, `run-firehose`
|
||||
|
||||
**Testing**
|
||||
- Tests in `tests/` covering config, filter, mic, ntfy, sources, terminal
|
||||
- `uv run pytest` and `uv run pytest --cov=engine --cov-report=term-missing`
|
||||
|
||||
**Linting**
|
||||
- `uv run ruff check` and `uv run ruff format`
|
||||
- Note: pre-commit hooks run lint via `hk`
|
||||
|
||||
### Roadmap
|
||||
|
||||
Existing `## Ideas / Future` content preserved verbatim. Only change: rename heading to `## Roadmap`.
|
||||
|
||||
### Footer
|
||||
|
||||
Update `Python 3.9+` → `Python 3.10+`.
|
||||
|
||||
---
|
||||
|
||||
## Files changed
|
||||
|
||||
- `README.md` — restructured and expanded as above
|
||||
- No other files
|
||||
|
||||
---
|
||||
|
||||
## What is not changing
|
||||
|
||||
- All existing prose, examples, and config table values — preserved verbatim where retained
|
||||
- The Ideas/Future content — kept intact under the new Roadmap heading
|
||||
- The cyberpunk voice and terse style of the existing README
|
||||
757
engine/app.py
757
engine/app.py
@@ -4,10 +4,11 @@ Application orchestrator — pipeline mode entry point.
|
||||
|
||||
import sys
|
||||
import time
|
||||
from typing import Any
|
||||
|
||||
import engine.effects.plugins as effects_plugins
|
||||
from engine import config
|
||||
from engine.display import DisplayRegistry
|
||||
from engine.display import BorderMode, DisplayRegistry
|
||||
from engine.effects import PerformanceMonitor, get_registry, set_monitor
|
||||
from engine.fetch import fetch_all, fetch_poetry, load_cache
|
||||
from engine.pipeline import (
|
||||
@@ -17,14 +18,18 @@ from engine.pipeline import (
|
||||
list_presets,
|
||||
)
|
||||
from engine.pipeline.adapters import (
|
||||
EffectPluginStage,
|
||||
SourceItemsToBufferStage,
|
||||
create_stage_from_display,
|
||||
create_stage_from_effect,
|
||||
)
|
||||
from engine.pipeline.core import PipelineContext
|
||||
from engine.pipeline.params import PipelineParams
|
||||
from engine.pipeline.ui import UIConfig, UIPanel
|
||||
|
||||
|
||||
def main():
|
||||
"""Main entry point - all modes now use presets."""
|
||||
"""Main entry point - all modes now use presets or CLI construction."""
|
||||
if config.PIPELINE_DIAGRAM:
|
||||
try:
|
||||
from engine.pipeline import generate_pipeline_diagram
|
||||
@@ -34,6 +39,12 @@ def main():
|
||||
print(generate_pipeline_diagram())
|
||||
return
|
||||
|
||||
# Check for direct pipeline construction flags
|
||||
if "--pipeline-source" in sys.argv:
|
||||
# Construct pipeline directly from CLI args
|
||||
run_pipeline_mode_direct()
|
||||
return
|
||||
|
||||
preset_name = None
|
||||
|
||||
if config.PRESET:
|
||||
@@ -92,6 +103,12 @@ def run_pipeline_mode(preset_name: str = "demo"):
|
||||
elif preset.source == "empty":
|
||||
items = []
|
||||
print(" \033[38;5;245mUsing empty source (no content)\033[0m")
|
||||
elif preset.source == "fixture":
|
||||
items = load_cache()
|
||||
if not items:
|
||||
print(" \033[38;5;196mNo fixture cache available\033[0m")
|
||||
sys.exit(1)
|
||||
print(f" \033[38;5;82mLoaded {len(items)} items from fixture\033[0m")
|
||||
else:
|
||||
cached = load_cache()
|
||||
if cached:
|
||||
@@ -223,6 +240,347 @@ def run_pipeline_mode(preset_name: str = "demo"):
|
||||
print(" \033[38;5;196mFailed to initialize pipeline\033[0m")
|
||||
sys.exit(1)
|
||||
|
||||
# Initialize UI panel if border mode requires it
|
||||
ui_panel = None
|
||||
if isinstance(params.border, BorderMode) and params.border == BorderMode.UI:
|
||||
from engine.display import render_ui_panel
|
||||
|
||||
ui_panel = UIPanel(UIConfig(panel_width=24, start_with_preset_picker=True))
|
||||
# Enable raw mode for terminal input if supported
|
||||
if hasattr(display, "set_raw_mode"):
|
||||
display.set_raw_mode(True)
|
||||
# Register effect plugin stages from pipeline for UI control
|
||||
for stage in pipeline.stages.values():
|
||||
if isinstance(stage, EffectPluginStage):
|
||||
effect = stage._effect
|
||||
enabled = effect.config.enabled if hasattr(effect, "config") else True
|
||||
stage_control = ui_panel.register_stage(stage, enabled=enabled)
|
||||
# Store reference to effect for easier access
|
||||
stage_control.effect = effect # type: ignore[attr-defined]
|
||||
# Select first stage by default
|
||||
if ui_panel.stages:
|
||||
first_stage = next(iter(ui_panel.stages))
|
||||
ui_panel.select_stage(first_stage)
|
||||
# Populate param schema from EffectConfig if it's a dataclass
|
||||
ctrl = ui_panel.stages[first_stage]
|
||||
if hasattr(ctrl, "effect"):
|
||||
effect = ctrl.effect
|
||||
if hasattr(effect, "config"):
|
||||
config = effect.config
|
||||
# Try to get fields via dataclasses if available
|
||||
try:
|
||||
import dataclasses
|
||||
|
||||
if dataclasses.is_dataclass(config):
|
||||
for field_name, field_obj in dataclasses.fields(config):
|
||||
if field_name == "enabled":
|
||||
continue
|
||||
value = getattr(config, field_name, None)
|
||||
if value is not None:
|
||||
ctrl.params[field_name] = value
|
||||
ctrl.param_schema[field_name] = {
|
||||
"type": type(value).__name__,
|
||||
"min": 0
|
||||
if isinstance(value, (int, float))
|
||||
else None,
|
||||
"max": 1 if isinstance(value, float) else None,
|
||||
"step": 0.1 if isinstance(value, float) else 1,
|
||||
}
|
||||
except Exception:
|
||||
pass # No dataclass fields, skip param UI
|
||||
|
||||
# Set up callback for stage toggles
|
||||
def on_stage_toggled(stage_name: str, enabled: bool):
|
||||
"""Update the actual stage's enabled state when UI toggles."""
|
||||
stage = pipeline.get_stage(stage_name)
|
||||
if stage:
|
||||
# Set stage enabled flag for pipeline execution
|
||||
stage._enabled = enabled
|
||||
# Also update effect config if it's an EffectPluginStage
|
||||
if isinstance(stage, EffectPluginStage):
|
||||
stage._effect.config.enabled = enabled
|
||||
|
||||
ui_panel.set_event_callback("stage_toggled", on_stage_toggled)
|
||||
|
||||
# Set up callback for parameter changes
|
||||
def on_param_changed(stage_name: str, param_name: str, value: Any):
|
||||
"""Update the effect config when UI adjusts a parameter."""
|
||||
stage = pipeline.get_stage(stage_name)
|
||||
if stage and isinstance(stage, EffectPluginStage):
|
||||
effect = stage._effect
|
||||
if hasattr(effect, "config"):
|
||||
setattr(effect.config, param_name, value)
|
||||
# Mark effect as needing reconfiguration if it has a configure method
|
||||
if hasattr(effect, "configure"):
|
||||
try:
|
||||
effect.configure(effect.config)
|
||||
except Exception:
|
||||
pass # Ignore reconfiguration errors
|
||||
|
||||
ui_panel.set_event_callback("param_changed", on_param_changed)
|
||||
|
||||
# Set up preset list and handle preset changes
|
||||
from engine.pipeline import list_presets
|
||||
|
||||
ui_panel.set_presets(list_presets(), preset_name)
|
||||
|
||||
def on_preset_changed(preset_name: str):
|
||||
"""Handle preset change from UI - rebuild pipeline."""
|
||||
nonlocal \
|
||||
pipeline, \
|
||||
display, \
|
||||
items, \
|
||||
params, \
|
||||
ui_panel, \
|
||||
current_width, \
|
||||
current_height
|
||||
|
||||
print(f" \033[38;5;245mSwitching to preset: {preset_name}\033[0m")
|
||||
|
||||
try:
|
||||
# Clean up old pipeline
|
||||
pipeline.cleanup()
|
||||
|
||||
# Get new preset
|
||||
new_preset = get_preset(preset_name)
|
||||
if not new_preset:
|
||||
print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m")
|
||||
return
|
||||
|
||||
# Update params for new preset
|
||||
params = new_preset.to_params()
|
||||
params.viewport_width = current_width
|
||||
params.viewport_height = current_height
|
||||
|
||||
# Reconstruct pipeline configuration
|
||||
new_config = PipelineConfig(
|
||||
source=new_preset.source,
|
||||
display=new_preset.display,
|
||||
camera=new_preset.camera,
|
||||
effects=new_preset.effects,
|
||||
)
|
||||
|
||||
# Create new pipeline instance
|
||||
pipeline = Pipeline(config=new_config, context=PipelineContext())
|
||||
|
||||
# Re-add stages (similar to initial construction)
|
||||
# Source stage
|
||||
if new_preset.source == "pipeline-inspect":
|
||||
from engine.data_sources.pipeline_introspection import (
|
||||
PipelineIntrospectionSource,
|
||||
)
|
||||
from engine.pipeline.adapters import DataSourceStage
|
||||
|
||||
introspection_source = PipelineIntrospectionSource(
|
||||
pipeline=None,
|
||||
viewport_width=current_width,
|
||||
viewport_height=current_height,
|
||||
)
|
||||
pipeline.add_stage(
|
||||
"source",
|
||||
DataSourceStage(introspection_source, name="pipeline-inspect"),
|
||||
)
|
||||
elif new_preset.source == "empty":
|
||||
from engine.data_sources.sources import EmptyDataSource
|
||||
from engine.pipeline.adapters import DataSourceStage
|
||||
|
||||
empty_source = EmptyDataSource(
|
||||
width=current_width, height=current_height
|
||||
)
|
||||
pipeline.add_stage(
|
||||
"source", DataSourceStage(empty_source, name="empty")
|
||||
)
|
||||
elif new_preset.source == "fixture":
|
||||
items = load_cache()
|
||||
if not items:
|
||||
print(" \033[38;5;196mNo fixture cache available\033[0m")
|
||||
return
|
||||
from engine.data_sources.sources import ListDataSource
|
||||
from engine.pipeline.adapters import DataSourceStage
|
||||
|
||||
list_source = ListDataSource(items, name="fixture")
|
||||
pipeline.add_stage(
|
||||
"source", DataSourceStage(list_source, name="fixture")
|
||||
)
|
||||
else:
|
||||
# Fetch or use cached items
|
||||
cached = load_cache()
|
||||
if cached:
|
||||
items = cached
|
||||
elif new_preset.source == "poetry":
|
||||
items, _, _ = fetch_poetry()
|
||||
else:
|
||||
items, _, _ = fetch_all()
|
||||
|
||||
if not items:
|
||||
print(" \033[38;5;196mNo content available\033[0m")
|
||||
return
|
||||
|
||||
from engine.data_sources.sources import ListDataSource
|
||||
from engine.pipeline.adapters import DataSourceStage
|
||||
|
||||
list_source = ListDataSource(items, name=new_preset.source)
|
||||
pipeline.add_stage(
|
||||
"source", DataSourceStage(list_source, name=new_preset.source)
|
||||
)
|
||||
|
||||
# Add viewport filter and font for headline/poetry sources
|
||||
if new_preset.source in ["headlines", "poetry", "fixture"]:
|
||||
from engine.pipeline.adapters import FontStage, ViewportFilterStage
|
||||
|
||||
pipeline.add_stage(
|
||||
"viewport_filter", ViewportFilterStage(name="viewport-filter")
|
||||
)
|
||||
pipeline.add_stage("font", FontStage(name="font"))
|
||||
|
||||
# Add camera if specified
|
||||
if new_preset.camera:
|
||||
from engine.camera import Camera
|
||||
from engine.pipeline.adapters import CameraStage
|
||||
|
||||
speed = getattr(new_preset, "camera_speed", 1.0)
|
||||
camera = None
|
||||
cam_type = new_preset.camera
|
||||
if cam_type == "feed":
|
||||
camera = Camera.feed(speed=speed)
|
||||
elif cam_type == "scroll" or cam_type == "vertical":
|
||||
camera = Camera.scroll(speed=speed)
|
||||
elif cam_type == "horizontal":
|
||||
camera = Camera.horizontal(speed=speed)
|
||||
elif cam_type == "omni":
|
||||
camera = Camera.omni(speed=speed)
|
||||
elif cam_type == "floating":
|
||||
camera = Camera.floating(speed=speed)
|
||||
elif cam_type == "bounce":
|
||||
camera = Camera.bounce(speed=speed)
|
||||
|
||||
if camera:
|
||||
pipeline.add_stage("camera", CameraStage(camera, name=cam_type))
|
||||
|
||||
# Add effects
|
||||
effect_registry = get_registry()
|
||||
for effect_name in new_preset.effects:
|
||||
effect = effect_registry.get(effect_name)
|
||||
if effect:
|
||||
pipeline.add_stage(
|
||||
f"effect_{effect_name}",
|
||||
create_stage_from_effect(effect, effect_name),
|
||||
)
|
||||
|
||||
# Add display (respect CLI override)
|
||||
display_name = new_preset.display
|
||||
if "--display" in sys.argv:
|
||||
idx = sys.argv.index("--display")
|
||||
if idx + 1 < len(sys.argv):
|
||||
display_name = sys.argv[idx + 1]
|
||||
|
||||
new_display = DisplayRegistry.create(display_name)
|
||||
if not new_display and not display_name.startswith("multi"):
|
||||
print(
|
||||
f" \033[38;5;196mFailed to create display: {display_name}\033[0m"
|
||||
)
|
||||
return
|
||||
|
||||
if not new_display and display_name.startswith("multi"):
|
||||
parts = display_name[6:].split(",")
|
||||
new_display = DisplayRegistry.create_multi(parts)
|
||||
if not new_display:
|
||||
print(
|
||||
f" \033[38;5;196mFailed to create multi display: {parts}\033[0m"
|
||||
)
|
||||
return
|
||||
|
||||
if not new_display:
|
||||
print(
|
||||
f" \033[38;5;196mFailed to create display: {display_name}\033[0m"
|
||||
)
|
||||
return
|
||||
|
||||
new_display.init(0, 0)
|
||||
|
||||
pipeline.add_stage(
|
||||
"display", create_stage_from_display(new_display, display_name)
|
||||
)
|
||||
|
||||
pipeline.build()
|
||||
|
||||
# Set pipeline for introspection source if needed
|
||||
if (
|
||||
new_preset.source == "pipeline-inspect"
|
||||
and introspection_source is not None
|
||||
):
|
||||
introspection_source.set_pipeline(pipeline)
|
||||
|
||||
if not pipeline.initialize():
|
||||
print(" \033[38;5;196mFailed to initialize pipeline\033[0m")
|
||||
return
|
||||
|
||||
# Replace global references with new pipeline and display
|
||||
display = new_display
|
||||
|
||||
# Reinitialize UI panel with new effect stages
|
||||
if (
|
||||
isinstance(params.border, BorderMode)
|
||||
and params.border == BorderMode.UI
|
||||
):
|
||||
ui_panel = UIPanel(
|
||||
UIConfig(panel_width=24, start_with_preset_picker=True)
|
||||
)
|
||||
for stage in pipeline.stages.values():
|
||||
if isinstance(stage, EffectPluginStage):
|
||||
effect = stage._effect
|
||||
enabled = (
|
||||
effect.config.enabled
|
||||
if hasattr(effect, "config")
|
||||
else True
|
||||
)
|
||||
stage_control = ui_panel.register_stage(
|
||||
stage, enabled=enabled
|
||||
)
|
||||
stage_control.effect = effect # type: ignore[attr-defined]
|
||||
|
||||
if ui_panel.stages:
|
||||
first_stage = next(iter(ui_panel.stages))
|
||||
ui_panel.select_stage(first_stage)
|
||||
ctrl = ui_panel.stages[first_stage]
|
||||
if hasattr(ctrl, "effect"):
|
||||
effect = ctrl.effect
|
||||
if hasattr(effect, "config"):
|
||||
config = effect.config
|
||||
try:
|
||||
import dataclasses
|
||||
|
||||
if dataclasses.is_dataclass(config):
|
||||
for field_name, field_obj in dataclasses.fields(
|
||||
config
|
||||
):
|
||||
if field_name == "enabled":
|
||||
continue
|
||||
value = getattr(config, field_name, None)
|
||||
if value is not None:
|
||||
ctrl.params[field_name] = value
|
||||
ctrl.param_schema[field_name] = {
|
||||
"type": type(value).__name__,
|
||||
"min": 0
|
||||
if isinstance(value, (int, float))
|
||||
else None,
|
||||
"max": 1
|
||||
if isinstance(value, float)
|
||||
else None,
|
||||
"step": 0.1
|
||||
if isinstance(value, float)
|
||||
else 1,
|
||||
}
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
print(f" \033[38;5;82mPreset switched to {preset_name}\033[0m")
|
||||
|
||||
except Exception as e:
|
||||
print(f" \033[38;5;196mError switching preset: {e}\033[0m")
|
||||
|
||||
ui_panel.set_event_callback("preset_changed", on_preset_changed)
|
||||
|
||||
print(" \033[38;5;82mStarting pipeline...\033[0m")
|
||||
print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n")
|
||||
|
||||
@@ -250,7 +608,34 @@ def run_pipeline_mode(preset_name: str = "demo"):
|
||||
|
||||
result = pipeline.execute(items)
|
||||
if result.success:
|
||||
display.show(result.data, border=params.border)
|
||||
# Handle UI panel compositing if enabled
|
||||
if ui_panel is not None:
|
||||
from engine.display import render_ui_panel
|
||||
|
||||
buf = render_ui_panel(
|
||||
result.data,
|
||||
current_width,
|
||||
current_height,
|
||||
ui_panel,
|
||||
fps=params.fps if hasattr(params, "fps") else 60.0,
|
||||
frame_time=0.0,
|
||||
)
|
||||
# Render with border=OFF since we already added borders
|
||||
display.show(buf, border=False)
|
||||
# Handle pygame events for UI
|
||||
if display_name == "pygame":
|
||||
import pygame
|
||||
|
||||
for event in pygame.event.get():
|
||||
if event.type == pygame.KEYDOWN:
|
||||
ui_panel.process_key_event(event.key, event.mod)
|
||||
# If space toggled stage, we could rebuild here (TODO)
|
||||
else:
|
||||
# Normal border handling
|
||||
show_border = (
|
||||
params.border if isinstance(params.border, bool) else False
|
||||
)
|
||||
display.show(result.data, border=show_border)
|
||||
|
||||
if hasattr(display, "is_quit_requested") and display.is_quit_requested():
|
||||
if hasattr(display, "clear_quit_request"):
|
||||
@@ -278,5 +663,371 @@ def run_pipeline_mode(preset_name: str = "demo"):
|
||||
print("\n \033[38;5;245mPipeline stopped\033[0m")
|
||||
|
||||
|
||||
def run_pipeline_mode_direct():
|
||||
"""Construct and run a pipeline directly from CLI arguments.
|
||||
|
||||
Usage:
|
||||
python -m engine.app --pipeline-source headlines --pipeline-effects noise,fade --display null
|
||||
python -m engine.app --pipeline-source fixture --pipeline-effects glitch --pipeline-ui --display null
|
||||
|
||||
Flags:
|
||||
--pipeline-source <source>: Headlines, fixture, poetry, empty, pipeline-inspect
|
||||
--pipeline-effects <effects>: Comma-separated list (noise, fade, glitch, firehose, hud, tint, border, crop)
|
||||
--pipeline-camera <type>: scroll, feed, horizontal, omni, floating, bounce
|
||||
--pipeline-display <display>: terminal, pygame, websocket, null, multi:term,pygame
|
||||
--pipeline-ui: Enable UI panel (BorderMode.UI)
|
||||
--pipeline-border <mode>: off, simple, ui
|
||||
"""
|
||||
import sys
|
||||
|
||||
from engine.camera import Camera
|
||||
from engine.data_sources.pipeline_introspection import PipelineIntrospectionSource
|
||||
from engine.data_sources.sources import EmptyDataSource, ListDataSource
|
||||
from engine.display import BorderMode, DisplayRegistry
|
||||
from engine.effects import get_registry
|
||||
from engine.fetch import fetch_all, fetch_poetry, load_cache
|
||||
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext
|
||||
from engine.pipeline.adapters import (
|
||||
CameraStage,
|
||||
DataSourceStage,
|
||||
EffectPluginStage,
|
||||
create_stage_from_display,
|
||||
create_stage_from_effect,
|
||||
)
|
||||
from engine.pipeline.ui import UIConfig, UIPanel
|
||||
|
||||
# Parse CLI arguments
|
||||
source_name = None
|
||||
effect_names = []
|
||||
camera_type = None # Will use MVP default (static)
|
||||
display_name = None # Will use MVP default (terminal)
|
||||
ui_enabled = False
|
||||
border_mode = BorderMode.OFF
|
||||
source_items = None
|
||||
allow_unsafe = False
|
||||
|
||||
i = 1
|
||||
argv = sys.argv
|
||||
while i < len(argv):
|
||||
arg = argv[i]
|
||||
if arg == "--pipeline-source" and i + 1 < len(argv):
|
||||
source_name = argv[i + 1]
|
||||
i += 2
|
||||
elif arg == "--pipeline-effects" and i + 1 < len(argv):
|
||||
effect_names = [e.strip() for e in argv[i + 1].split(",") if e.strip()]
|
||||
i += 2
|
||||
elif arg == "--pipeline-camera" and i + 1 < len(argv):
|
||||
camera_type = argv[i + 1]
|
||||
i += 2
|
||||
elif arg == "--pipeline-display" and i + 1 < len(argv):
|
||||
display_name = argv[i + 1]
|
||||
i += 2
|
||||
elif arg == "--pipeline-ui":
|
||||
ui_enabled = True
|
||||
i += 1
|
||||
elif arg == "--pipeline-border" and i + 1 < len(argv):
|
||||
mode = argv[i + 1]
|
||||
if mode == "simple":
|
||||
border_mode = True
|
||||
elif mode == "ui":
|
||||
border_mode = BorderMode.UI
|
||||
else:
|
||||
border_mode = False
|
||||
i += 2
|
||||
elif arg == "--allow-unsafe":
|
||||
allow_unsafe = True
|
||||
i += 1
|
||||
else:
|
||||
i += 1
|
||||
|
||||
if not source_name:
|
||||
print("Error: --pipeline-source is required")
|
||||
print(
|
||||
"Usage: python -m engine.app --pipeline-source <source> [--pipeline-effects <effects>] ..."
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
print(" \033[38;5;245mDirect pipeline construction\033[0m")
|
||||
print(f" Source: {source_name}")
|
||||
print(f" Effects: {effect_names}")
|
||||
print(f" Camera: {camera_type}")
|
||||
print(f" Display: {display_name}")
|
||||
print(f" UI Enabled: {ui_enabled}")
|
||||
|
||||
# Import validation
|
||||
from engine.pipeline.validation import validate_pipeline_config
|
||||
|
||||
# Create initial config and params
|
||||
params = PipelineParams()
|
||||
params.source = source_name
|
||||
params.camera_mode = camera_type if camera_type is not None else ""
|
||||
params.effect_order = effect_names
|
||||
params.border = border_mode
|
||||
|
||||
# Create minimal config for validation
|
||||
config = PipelineConfig(
|
||||
source=source_name,
|
||||
display=display_name or "", # Will be filled by validation
|
||||
camera=camera_type if camera_type is not None else "",
|
||||
effects=effect_names,
|
||||
)
|
||||
|
||||
# Run MVP validation
|
||||
result = validate_pipeline_config(config, params, allow_unsafe=allow_unsafe)
|
||||
|
||||
if result.warnings and not allow_unsafe:
|
||||
print(" \033[38;5;226mWarning: MVP validation found issues:\033[0m")
|
||||
for warning in result.warnings:
|
||||
print(f" - {warning}")
|
||||
|
||||
if result.changes:
|
||||
print(" \033[38;5;226mApplied MVP defaults:\033[0m")
|
||||
for change in result.changes:
|
||||
print(f" {change}")
|
||||
|
||||
if not result.valid:
|
||||
print(
|
||||
" \033[38;5;196mPipeline configuration invalid and could not be fixed\033[0m"
|
||||
)
|
||||
sys.exit(1)
|
||||
|
||||
# Show MVP summary
|
||||
print(" \033[38;5;245mMVP Configuration:\033[0m")
|
||||
print(f" Source: {result.config.source}")
|
||||
print(f" Display: {result.config.display}")
|
||||
print(f" Camera: {result.config.camera or 'static (none)'}")
|
||||
print(f" Effects: {result.config.effects if result.config.effects else 'none'}")
|
||||
print(f" Border: {result.params.border}")
|
||||
|
||||
# Load source items
|
||||
if source_name == "headlines":
|
||||
cached = load_cache()
|
||||
if cached:
|
||||
source_items = cached
|
||||
else:
|
||||
source_items, _, _ = fetch_all()
|
||||
elif source_name == "fixture":
|
||||
source_items = load_cache()
|
||||
if not source_items:
|
||||
print(" \033[38;5;196mNo fixture cache available\033[0m")
|
||||
sys.exit(1)
|
||||
elif source_name == "poetry":
|
||||
source_items, _, _ = fetch_poetry()
|
||||
elif source_name == "empty" or source_name == "pipeline-inspect":
|
||||
source_items = []
|
||||
else:
|
||||
print(f" \033[38;5;196mUnknown source: {source_name}\033[0m")
|
||||
sys.exit(1)
|
||||
|
||||
if source_items is not None:
|
||||
print(f" \033[38;5;82mLoaded {len(source_items)} items\033[0m")
|
||||
|
||||
# Set border mode
|
||||
if ui_enabled:
|
||||
border_mode = BorderMode.UI
|
||||
|
||||
# Build pipeline using validated config and params
|
||||
params = result.params
|
||||
params.viewport_width = 80
|
||||
params.viewport_height = 24
|
||||
|
||||
ctx = PipelineContext()
|
||||
ctx.params = params
|
||||
|
||||
# Create display using validated display name
|
||||
display_name = result.config.display or "terminal" # Default to terminal if empty
|
||||
display = DisplayRegistry.create(display_name)
|
||||
if not display:
|
||||
print(f" \033[38;5;196mFailed to create display: {display_name}\033[0m")
|
||||
sys.exit(1)
|
||||
display.init(0, 0)
|
||||
|
||||
# Create pipeline using validated config
|
||||
pipeline = Pipeline(config=result.config, context=ctx)
|
||||
|
||||
# Add stages
|
||||
# Source stage
|
||||
if source_name == "pipeline-inspect":
|
||||
introspection_source = PipelineIntrospectionSource(
|
||||
pipeline=None,
|
||||
viewport_width=params.viewport_width,
|
||||
viewport_height=params.viewport_height,
|
||||
)
|
||||
pipeline.add_stage(
|
||||
"source", DataSourceStage(introspection_source, name="pipeline-inspect")
|
||||
)
|
||||
elif source_name == "empty":
|
||||
empty_source = EmptyDataSource(
|
||||
width=params.viewport_width, height=params.viewport_height
|
||||
)
|
||||
pipeline.add_stage("source", DataSourceStage(empty_source, name="empty"))
|
||||
else:
|
||||
list_source = ListDataSource(source_items, name=source_name)
|
||||
pipeline.add_stage("source", DataSourceStage(list_source, name=source_name))
|
||||
|
||||
# Add viewport filter and font for headline sources
|
||||
if source_name in ["headlines", "poetry", "fixture"]:
|
||||
from engine.pipeline.adapters import FontStage, ViewportFilterStage
|
||||
|
||||
pipeline.add_stage(
|
||||
"viewport_filter", ViewportFilterStage(name="viewport-filter")
|
||||
)
|
||||
pipeline.add_stage("font", FontStage(name="font"))
|
||||
|
||||
# Add camera
|
||||
speed = getattr(params, "camera_speed", 1.0)
|
||||
camera = None
|
||||
if camera_type == "feed":
|
||||
camera = Camera.feed(speed=speed)
|
||||
elif camera_type == "scroll":
|
||||
camera = Camera.scroll(speed=speed)
|
||||
elif camera_type == "horizontal":
|
||||
camera = Camera.horizontal(speed=speed)
|
||||
elif camera_type == "omni":
|
||||
camera = Camera.omni(speed=speed)
|
||||
elif camera_type == "floating":
|
||||
camera = Camera.floating(speed=speed)
|
||||
elif camera_type == "bounce":
|
||||
camera = Camera.bounce(speed=speed)
|
||||
|
||||
if camera:
|
||||
pipeline.add_stage("camera", CameraStage(camera, name=camera_type))
|
||||
|
||||
# Add effects
|
||||
effect_registry = get_registry()
|
||||
for effect_name in effect_names:
|
||||
effect = effect_registry.get(effect_name)
|
||||
if effect:
|
||||
pipeline.add_stage(
|
||||
f"effect_{effect_name}", create_stage_from_effect(effect, effect_name)
|
||||
)
|
||||
|
||||
# Add display
|
||||
pipeline.add_stage("display", create_stage_from_display(display, display_name))
|
||||
|
||||
pipeline.build()
|
||||
|
||||
if not pipeline.initialize():
|
||||
print(" \033[38;5;196mFailed to initialize pipeline\033[0m")
|
||||
sys.exit(1)
|
||||
|
||||
# Create UI panel if border mode is UI
|
||||
ui_panel = None
|
||||
if params.border == BorderMode.UI:
|
||||
ui_panel = UIPanel(UIConfig(panel_width=24, start_with_preset_picker=True))
|
||||
# Enable raw mode for terminal input if supported
|
||||
if hasattr(display, "set_raw_mode"):
|
||||
display.set_raw_mode(True)
|
||||
for stage in pipeline.stages.values():
|
||||
if isinstance(stage, EffectPluginStage):
|
||||
effect = stage._effect
|
||||
enabled = effect.config.enabled if hasattr(effect, "config") else True
|
||||
stage_control = ui_panel.register_stage(stage, enabled=enabled)
|
||||
stage_control.effect = effect # type: ignore[attr-defined]
|
||||
|
||||
if ui_panel.stages:
|
||||
first_stage = next(iter(ui_panel.stages))
|
||||
ui_panel.select_stage(first_stage)
|
||||
ctrl = ui_panel.stages[first_stage]
|
||||
if hasattr(ctrl, "effect"):
|
||||
effect = ctrl.effect
|
||||
if hasattr(effect, "config"):
|
||||
config = effect.config
|
||||
try:
|
||||
import dataclasses
|
||||
|
||||
if dataclasses.is_dataclass(config):
|
||||
for field_name, field_obj in dataclasses.fields(config):
|
||||
if field_name == "enabled":
|
||||
continue
|
||||
value = getattr(config, field_name, None)
|
||||
if value is not None:
|
||||
ctrl.params[field_name] = value
|
||||
ctrl.param_schema[field_name] = {
|
||||
"type": type(value).__name__,
|
||||
"min": 0
|
||||
if isinstance(value, (int, float))
|
||||
else None,
|
||||
"max": 1 if isinstance(value, float) else None,
|
||||
"step": 0.1 if isinstance(value, float) else 1,
|
||||
}
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Run pipeline loop
|
||||
from engine.display import render_ui_panel
|
||||
|
||||
ctx.set("display", display)
|
||||
ctx.set("items", source_items)
|
||||
ctx.set("pipeline", pipeline)
|
||||
ctx.set("pipeline_order", pipeline.execution_order)
|
||||
|
||||
current_width = params.viewport_width
|
||||
current_height = params.viewport_height
|
||||
|
||||
print(" \033[38;5;82mStarting pipeline...\033[0m")
|
||||
print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n")
|
||||
|
||||
try:
|
||||
frame = 0
|
||||
while True:
|
||||
params.frame_number = frame
|
||||
ctx.params = params
|
||||
|
||||
result = pipeline.execute(source_items)
|
||||
if not result.success:
|
||||
print(" \033[38;5;196mPipeline execution failed\033[0m")
|
||||
break
|
||||
|
||||
# Render with UI panel
|
||||
if ui_panel is not None:
|
||||
buf = render_ui_panel(
|
||||
result.data, current_width, current_height, ui_panel
|
||||
)
|
||||
display.show(buf, border=False)
|
||||
else:
|
||||
display.show(result.data, border=border_mode)
|
||||
|
||||
# Handle keyboard events if UI is enabled
|
||||
if ui_panel is not None:
|
||||
# Try pygame first
|
||||
if hasattr(display, "_pygame"):
|
||||
try:
|
||||
import pygame
|
||||
|
||||
for event in pygame.event.get():
|
||||
if event.type == pygame.KEYDOWN:
|
||||
ui_panel.process_key_event(event.key, event.mod)
|
||||
except (ImportError, Exception):
|
||||
pass
|
||||
# Try terminal input
|
||||
elif hasattr(display, "get_input_keys"):
|
||||
try:
|
||||
keys = display.get_input_keys()
|
||||
for key in keys:
|
||||
ui_panel.process_key_event(key, 0)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Check for quit request
|
||||
if hasattr(display, "is_quit_requested") and display.is_quit_requested():
|
||||
if hasattr(display, "clear_quit_request"):
|
||||
display.clear_quit_request()
|
||||
raise KeyboardInterrupt()
|
||||
|
||||
time.sleep(1 / 60)
|
||||
frame += 1
|
||||
|
||||
except KeyboardInterrupt:
|
||||
pipeline.cleanup()
|
||||
display.cleanup()
|
||||
print("\n \033[38;5;245mPipeline stopped\033[0m")
|
||||
return
|
||||
|
||||
pipeline.cleanup()
|
||||
display.cleanup()
|
||||
print("\n \033[38;5;245mPipeline stopped\033[0m")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
|
||||
@@ -5,102 +5,58 @@ Allows swapping output backends via the Display protocol.
|
||||
Supports auto-discovery of display backends.
|
||||
"""
|
||||
|
||||
from enum import Enum, auto
|
||||
from typing import Protocol
|
||||
|
||||
from engine.display.backends.kitty import KittyDisplay
|
||||
# Optional backend - requires moderngl package
|
||||
try:
|
||||
from engine.display.backends.moderngl import ModernGLDisplay
|
||||
|
||||
_MODERNGL_AVAILABLE = True
|
||||
except ImportError:
|
||||
ModernGLDisplay = None
|
||||
_MODERNGL_AVAILABLE = False
|
||||
|
||||
from engine.display.backends.multi import MultiDisplay
|
||||
from engine.display.backends.null import NullDisplay
|
||||
from engine.display.backends.pygame import PygameDisplay
|
||||
from engine.display.backends.sixel import SixelDisplay
|
||||
from engine.display.backends.terminal import TerminalDisplay
|
||||
from engine.display.backends.websocket import WebSocketDisplay
|
||||
|
||||
|
||||
class BorderMode(Enum):
|
||||
"""Border rendering modes for displays."""
|
||||
|
||||
OFF = auto() # No border
|
||||
SIMPLE = auto() # Traditional border with FPS/frame time
|
||||
UI = auto() # Right-side UI panel with interactive controls
|
||||
|
||||
|
||||
class Display(Protocol):
|
||||
"""Protocol for display backends.
|
||||
|
||||
All display backends must implement:
|
||||
- width, height: Terminal dimensions
|
||||
- init(width, height, reuse=False): Initialize the display
|
||||
- show(buffer): Render buffer to display
|
||||
- clear(): Clear the display
|
||||
- cleanup(): Shutdown the display
|
||||
Required attributes:
|
||||
- width: int
|
||||
- height: int
|
||||
|
||||
Optional methods for keyboard input:
|
||||
- is_quit_requested(): Returns True if user pressed Ctrl+C/Q or Escape
|
||||
- clear_quit_request(): Clears the quit request flag
|
||||
Required methods (duck typing - actual signatures may vary):
|
||||
- init(width, height, reuse=False)
|
||||
- show(buffer, border=False)
|
||||
- clear()
|
||||
- cleanup()
|
||||
- get_dimensions() -> (width, height)
|
||||
|
||||
The reuse flag allows attaching to an existing display instance
|
||||
rather than creating a new window/connection.
|
||||
Optional attributes (for UI mode):
|
||||
- ui_panel: UIPanel instance (set by app when border=UI)
|
||||
|
||||
Keyboard input support by backend:
|
||||
- terminal: No native input (relies on signal handler for Ctrl+C)
|
||||
- pygame: Supports Ctrl+C, Ctrl+Q, Escape for graceful shutdown
|
||||
- websocket: No native input (relies on signal handler for Ctrl+C)
|
||||
- sixel: No native input (relies on signal handler for Ctrl+C)
|
||||
- null: No native input
|
||||
- kitty: Supports Ctrl+C, Ctrl+Q, Escape (via pygame-like handling)
|
||||
Optional methods:
|
||||
- is_quit_requested() -> bool
|
||||
- clear_quit_request() -> None
|
||||
"""
|
||||
|
||||
width: int
|
||||
height: int
|
||||
|
||||
def init(self, width: int, height: int, reuse: bool = False) -> None:
|
||||
"""Initialize display with dimensions.
|
||||
|
||||
Args:
|
||||
width: Terminal width in characters
|
||||
height: Terminal height in rows
|
||||
reuse: If True, attach to existing display instead of creating new
|
||||
"""
|
||||
...
|
||||
|
||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
||||
"""Show buffer on display.
|
||||
|
||||
Args:
|
||||
buffer: Buffer to display
|
||||
border: If True, render border around buffer (default False)
|
||||
"""
|
||||
...
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Clear display."""
|
||||
...
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Shutdown display."""
|
||||
...
|
||||
|
||||
def get_dimensions(self) -> tuple[int, int]:
|
||||
"""Get current terminal dimensions.
|
||||
|
||||
Returns:
|
||||
(width, height) in character cells
|
||||
|
||||
This method is called after show() to check if the display
|
||||
was resized. The main loop should compare this to the current
|
||||
viewport dimensions and update accordingly.
|
||||
"""
|
||||
...
|
||||
|
||||
def is_quit_requested(self) -> bool:
|
||||
"""Check if user requested quit (Ctrl+C, Ctrl+Q, or Escape).
|
||||
|
||||
Returns:
|
||||
True if quit was requested, False otherwise
|
||||
|
||||
Optional method - only implemented by backends that support keyboard input.
|
||||
"""
|
||||
...
|
||||
|
||||
def clear_quit_request(self) -> None:
|
||||
"""Clear the quit request flag.
|
||||
|
||||
Optional method - only implemented by backends that support keyboard input.
|
||||
"""
|
||||
...
|
||||
|
||||
|
||||
class DisplayRegistry:
|
||||
"""Registry for display backends with auto-discovery."""
|
||||
@@ -110,22 +66,18 @@ class DisplayRegistry:
|
||||
|
||||
@classmethod
|
||||
def register(cls, name: str, backend_class: type[Display]) -> None:
|
||||
"""Register a display backend."""
|
||||
cls._backends[name.lower()] = backend_class
|
||||
|
||||
@classmethod
|
||||
def get(cls, name: str) -> type[Display] | None:
|
||||
"""Get a display backend class by name."""
|
||||
return cls._backends.get(name.lower())
|
||||
|
||||
@classmethod
|
||||
def list_backends(cls) -> list[str]:
|
||||
"""List all available display backend names."""
|
||||
return list(cls._backends.keys())
|
||||
|
||||
@classmethod
|
||||
def create(cls, name: str, **kwargs) -> Display | None:
|
||||
"""Create a display instance by name."""
|
||||
cls.initialize()
|
||||
backend_class = cls.get(name)
|
||||
if backend_class:
|
||||
@@ -134,31 +86,18 @@ class DisplayRegistry:
|
||||
|
||||
@classmethod
|
||||
def initialize(cls) -> None:
|
||||
"""Initialize and register all built-in backends."""
|
||||
if cls._initialized:
|
||||
return
|
||||
|
||||
cls.register("terminal", TerminalDisplay)
|
||||
cls.register("null", NullDisplay)
|
||||
cls.register("websocket", WebSocketDisplay)
|
||||
cls.register("sixel", SixelDisplay)
|
||||
cls.register("kitty", KittyDisplay)
|
||||
cls.register("pygame", PygameDisplay)
|
||||
|
||||
if _MODERNGL_AVAILABLE:
|
||||
cls.register("moderngl", ModernGLDisplay) # type: ignore[arg-type]
|
||||
cls._initialized = True
|
||||
|
||||
@classmethod
|
||||
def create_multi(cls, names: list[str]) -> "Display | None":
|
||||
"""Create a MultiDisplay from a list of backend names.
|
||||
|
||||
Args:
|
||||
names: List of display backend names (e.g., ["terminal", "pygame"])
|
||||
|
||||
Returns:
|
||||
MultiDisplay instance or None if any backend fails
|
||||
"""
|
||||
from engine.display.backends.multi import MultiDisplay
|
||||
|
||||
def create_multi(cls, names: list[str]) -> MultiDisplay | None:
|
||||
displays = []
|
||||
for name in names:
|
||||
backend = cls.create(name)
|
||||
@@ -166,10 +105,8 @@ class DisplayRegistry:
|
||||
displays.append(backend)
|
||||
else:
|
||||
return None
|
||||
|
||||
if not displays:
|
||||
return None
|
||||
|
||||
return MultiDisplay(displays)
|
||||
|
||||
|
||||
@@ -190,44 +127,28 @@ def _strip_ansi(s: str) -> str:
|
||||
return re.sub(r"\x1b\[[0-9;]*[a-zA-Z]", "", s)
|
||||
|
||||
|
||||
def render_border(
|
||||
def _render_simple_border(
|
||||
buf: list[str], width: int, height: int, fps: float = 0.0, frame_time: float = 0.0
|
||||
) -> list[str]:
|
||||
"""Render a border around the buffer.
|
||||
|
||||
Args:
|
||||
buf: Input buffer (list of strings)
|
||||
width: Display width in characters
|
||||
height: Display height in rows
|
||||
fps: Current FPS to display in top border (optional)
|
||||
frame_time: Frame time in ms to display in bottom border (optional)
|
||||
|
||||
Returns:
|
||||
Buffer with border applied
|
||||
"""
|
||||
"""Render a traditional border around the buffer."""
|
||||
if not buf or width < 3 or height < 3:
|
||||
return buf
|
||||
|
||||
inner_w = width - 2
|
||||
inner_h = height - 2
|
||||
|
||||
# Crop buffer to fit inside border
|
||||
cropped = []
|
||||
for i in range(min(inner_h, len(buf))):
|
||||
line = buf[i]
|
||||
# Calculate visible width (excluding ANSI codes)
|
||||
visible_len = len(_strip_ansi(line))
|
||||
if visible_len > inner_w:
|
||||
# Truncate carefully - this is approximate for ANSI text
|
||||
cropped.append(line[:inner_w])
|
||||
else:
|
||||
cropped.append(line + " " * (inner_w - visible_len))
|
||||
|
||||
# Pad with empty lines if needed
|
||||
while len(cropped) < inner_h:
|
||||
cropped.append(" " * inner_w)
|
||||
|
||||
# Build borders
|
||||
if fps > 0:
|
||||
fps_str = f" FPS:{fps:.0f}"
|
||||
if len(fps_str) < inner_w:
|
||||
@@ -248,10 +169,8 @@ def render_border(
|
||||
else:
|
||||
bottom_border = "└" + "─" * inner_w + "┘"
|
||||
|
||||
# Build result with left/right borders
|
||||
result = [top_border]
|
||||
for line in cropped:
|
||||
# Ensure exactly inner_w characters before adding right border
|
||||
if len(line) < inner_w:
|
||||
line = line + " " * (inner_w - len(line))
|
||||
elif len(line) > inner_w:
|
||||
@@ -262,14 +181,107 @@ def render_border(
|
||||
return result
|
||||
|
||||
|
||||
def render_ui_panel(
|
||||
buf: list[str],
|
||||
width: int,
|
||||
height: int,
|
||||
ui_panel,
|
||||
fps: float = 0.0,
|
||||
frame_time: float = 0.0,
|
||||
) -> list[str]:
|
||||
"""Render buffer with a right-side UI panel."""
|
||||
from engine.pipeline.ui import UIPanel
|
||||
|
||||
if not isinstance(ui_panel, UIPanel):
|
||||
return _render_simple_border(buf, width, height, fps, frame_time)
|
||||
|
||||
panel_width = min(ui_panel.config.panel_width, width - 4)
|
||||
main_width = width - panel_width - 1
|
||||
|
||||
panel_lines = ui_panel.render(panel_width, height)
|
||||
|
||||
main_buf = buf[: height - 2]
|
||||
main_result = _render_simple_border(
|
||||
main_buf, main_width + 2, height, fps, frame_time
|
||||
)
|
||||
|
||||
combined = []
|
||||
for i in range(height):
|
||||
if i < len(main_result):
|
||||
main_line = main_result[i]
|
||||
if len(main_line) >= 2:
|
||||
main_content = (
|
||||
main_line[1:-1] if main_line[-1] in "│┌┐└┘" else main_line[1:]
|
||||
)
|
||||
main_content = main_content.ljust(main_width)[:main_width]
|
||||
else:
|
||||
main_content = " " * main_width
|
||||
else:
|
||||
main_content = " " * main_width
|
||||
|
||||
panel_idx = i
|
||||
panel_line = (
|
||||
panel_lines[panel_idx][:panel_width].ljust(panel_width)
|
||||
if panel_idx < len(panel_lines)
|
||||
else " " * panel_width
|
||||
)
|
||||
|
||||
separator = "│" if 0 < i < height - 1 else "┼" if i == 0 else "┴"
|
||||
combined.append(main_content + separator + panel_line)
|
||||
|
||||
return combined
|
||||
|
||||
|
||||
def render_border(
|
||||
buf: list[str],
|
||||
width: int,
|
||||
height: int,
|
||||
fps: float = 0.0,
|
||||
frame_time: float = 0.0,
|
||||
border_mode: BorderMode | bool = BorderMode.SIMPLE,
|
||||
) -> list[str]:
|
||||
"""Render a border or UI panel around the buffer.
|
||||
|
||||
Args:
|
||||
buf: Input buffer
|
||||
width: Display width
|
||||
height: Display height
|
||||
fps: FPS for top border
|
||||
frame_time: Frame time for bottom border
|
||||
border_mode: Border rendering mode
|
||||
|
||||
Returns:
|
||||
Buffer with border/panel applied
|
||||
"""
|
||||
# Normalize border_mode to BorderMode enum
|
||||
if isinstance(border_mode, bool):
|
||||
border_mode = BorderMode.SIMPLE if border_mode else BorderMode.OFF
|
||||
|
||||
if border_mode == BorderMode.UI:
|
||||
# UI panel requires a UIPanel instance (injected separately)
|
||||
# For now, this will be called by displays that have a ui_panel attribute
|
||||
# This function signature doesn't include ui_panel, so we'll handle it in render_ui_panel
|
||||
# Fall back to simple border if no panel available
|
||||
return _render_simple_border(buf, width, height, fps, frame_time)
|
||||
elif border_mode == BorderMode.SIMPLE:
|
||||
return _render_simple_border(buf, width, height, fps, frame_time)
|
||||
else:
|
||||
return buf
|
||||
|
||||
|
||||
__all__ = [
|
||||
"Display",
|
||||
"DisplayRegistry",
|
||||
"get_monitor",
|
||||
"render_border",
|
||||
"render_ui_panel",
|
||||
"BorderMode",
|
||||
"TerminalDisplay",
|
||||
"NullDisplay",
|
||||
"WebSocketDisplay",
|
||||
"SixelDisplay",
|
||||
"MultiDisplay",
|
||||
"PygameDisplay",
|
||||
]
|
||||
|
||||
if _MODERNGL_AVAILABLE:
|
||||
__all__.append("ModernGLDisplay")
|
||||
|
||||
@@ -1,180 +0,0 @@
|
||||
"""
|
||||
Kitty graphics display backend - renders using kitty's native graphics protocol.
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
from engine.display.renderer import get_default_font_path, parse_ansi
|
||||
|
||||
|
||||
def _encode_kitty_graphic(image_data: bytes, width: int, height: int) -> bytes:
|
||||
"""Encode image data using kitty's graphics protocol."""
|
||||
import base64
|
||||
|
||||
encoded = base64.b64encode(image_data).decode("ascii")
|
||||
|
||||
chunks = []
|
||||
for i in range(0, len(encoded), 4096):
|
||||
chunk = encoded[i : i + 4096]
|
||||
if i == 0:
|
||||
chunks.append(f"\x1b_Gf=100,t=d,s={width},v={height},c=1,r=1;{chunk}\x1b\\")
|
||||
else:
|
||||
chunks.append(f"\x1b_Gm={height};{chunk}\x1b\\")
|
||||
|
||||
return "".join(chunks).encode("utf-8")
|
||||
|
||||
|
||||
class KittyDisplay:
|
||||
"""Kitty graphics display backend using kitty's native protocol."""
|
||||
|
||||
width: int = 80
|
||||
height: int = 24
|
||||
|
||||
def __init__(self, cell_width: int = 9, cell_height: int = 16):
|
||||
self.width = 80
|
||||
self.height = 24
|
||||
self.cell_width = cell_width
|
||||
self.cell_height = cell_height
|
||||
self._initialized = False
|
||||
self._font_path = None
|
||||
|
||||
def init(self, width: int, height: int, reuse: bool = False) -> None:
|
||||
"""Initialize display with dimensions.
|
||||
|
||||
Args:
|
||||
width: Terminal width in characters
|
||||
height: Terminal height in rows
|
||||
reuse: Ignored for KittyDisplay (protocol doesn't support reuse)
|
||||
"""
|
||||
self.width = width
|
||||
self.height = height
|
||||
self._initialized = True
|
||||
|
||||
def _get_font_path(self) -> str | None:
|
||||
"""Get font path from env or detect common locations."""
|
||||
import os
|
||||
|
||||
if self._font_path:
|
||||
return self._font_path
|
||||
|
||||
env_font = os.environ.get("MAINLINE_KITTY_FONT")
|
||||
if env_font and os.path.exists(env_font):
|
||||
self._font_path = env_font
|
||||
return env_font
|
||||
|
||||
font_path = get_default_font_path()
|
||||
if font_path:
|
||||
self._font_path = font_path
|
||||
|
||||
return self._font_path
|
||||
|
||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
||||
import sys
|
||||
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# Get metrics for border display
|
||||
fps = 0.0
|
||||
frame_time = 0.0
|
||||
from engine.display import get_monitor
|
||||
|
||||
monitor = get_monitor()
|
||||
if monitor:
|
||||
stats = monitor.get_stats()
|
||||
avg_ms = stats.get("pipeline", {}).get("avg_ms", 0) if stats else 0
|
||||
frame_count = stats.get("frame_count", 0) if stats else 0
|
||||
if avg_ms and frame_count > 0:
|
||||
fps = 1000.0 / avg_ms
|
||||
frame_time = avg_ms
|
||||
|
||||
# Apply border if requested
|
||||
if border:
|
||||
from engine.display import render_border
|
||||
|
||||
buffer = render_border(buffer, self.width, self.height, fps, frame_time)
|
||||
|
||||
img_width = self.width * self.cell_width
|
||||
img_height = self.height * self.cell_height
|
||||
|
||||
try:
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
except ImportError:
|
||||
return
|
||||
|
||||
img = Image.new("RGBA", (img_width, img_height), (0, 0, 0, 255))
|
||||
draw = ImageDraw.Draw(img)
|
||||
|
||||
font_path = self._get_font_path()
|
||||
font = None
|
||||
if font_path:
|
||||
try:
|
||||
font = ImageFont.truetype(font_path, self.cell_height - 2)
|
||||
except Exception:
|
||||
font = None
|
||||
|
||||
if font is None:
|
||||
try:
|
||||
font = ImageFont.load_default()
|
||||
except Exception:
|
||||
font = None
|
||||
|
||||
for row_idx, line in enumerate(buffer[: self.height]):
|
||||
if row_idx >= self.height:
|
||||
break
|
||||
|
||||
tokens = parse_ansi(line)
|
||||
x_pos = 0
|
||||
y_pos = row_idx * self.cell_height
|
||||
|
||||
for text, fg, bg, bold in tokens:
|
||||
if not text:
|
||||
continue
|
||||
|
||||
if bg != (0, 0, 0):
|
||||
bbox = draw.textbbox((x_pos, y_pos), text, font=font)
|
||||
draw.rectangle(bbox, fill=(*bg, 255))
|
||||
|
||||
if bold and font:
|
||||
draw.text((x_pos - 1, y_pos - 1), text, fill=(*fg, 255), font=font)
|
||||
|
||||
draw.text((x_pos, y_pos), text, fill=(*fg, 255), font=font)
|
||||
|
||||
if font:
|
||||
x_pos += draw.textlength(text, font=font)
|
||||
|
||||
from io import BytesIO
|
||||
|
||||
output = BytesIO()
|
||||
img.save(output, format="PNG")
|
||||
png_data = output.getvalue()
|
||||
|
||||
graphic = _encode_kitty_graphic(png_data, img_width, img_height)
|
||||
|
||||
sys.stdout.buffer.write(graphic)
|
||||
sys.stdout.flush()
|
||||
|
||||
elapsed_ms = (time.perf_counter() - t0) * 1000
|
||||
|
||||
from engine.display import get_monitor
|
||||
|
||||
monitor = get_monitor()
|
||||
if monitor:
|
||||
chars_in = sum(len(line) for line in buffer)
|
||||
monitor.record_effect("kitty_display", elapsed_ms, chars_in, chars_in)
|
||||
|
||||
def clear(self) -> None:
|
||||
import sys
|
||||
|
||||
sys.stdout.buffer.write(b"\x1b_Ga=d\x1b\\")
|
||||
sys.stdout.flush()
|
||||
|
||||
def cleanup(self) -> None:
|
||||
self.clear()
|
||||
|
||||
def get_dimensions(self) -> tuple[int, int]:
|
||||
"""Get current dimensions.
|
||||
|
||||
Returns:
|
||||
(width, height) in character cells
|
||||
"""
|
||||
return (self.width, self.height)
|
||||
@@ -33,6 +33,8 @@ class NullDisplay:
|
||||
self._last_buffer = None
|
||||
|
||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
||||
import sys
|
||||
|
||||
from engine.display import get_monitor, render_border
|
||||
|
||||
# Get FPS for border (if available)
|
||||
@@ -52,6 +54,26 @@ class NullDisplay:
|
||||
buffer = render_border(buffer, self.width, self.height, fps, frame_time)
|
||||
|
||||
self._last_buffer = buffer
|
||||
|
||||
# For debugging: print first few frames to stdout
|
||||
if hasattr(self, "_frame_count"):
|
||||
self._frame_count += 1
|
||||
else:
|
||||
self._frame_count = 0
|
||||
|
||||
# Only print first 5 frames or every 10th frame
|
||||
if self._frame_count <= 5 or self._frame_count % 10 == 0:
|
||||
sys.stdout.write("\n" + "=" * 80 + "\n")
|
||||
sys.stdout.write(
|
||||
f"Frame {self._frame_count} (buffer height: {len(buffer)})\n"
|
||||
)
|
||||
sys.stdout.write("=" * 80 + "\n")
|
||||
for i, line in enumerate(buffer[:30]): # Show first 30 lines
|
||||
sys.stdout.write(f"{i:2}: {line}\n")
|
||||
if len(buffer) > 30:
|
||||
sys.stdout.write(f"... ({len(buffer) - 30} more lines)\n")
|
||||
sys.stdout.flush()
|
||||
|
||||
if monitor:
|
||||
t0 = time.perf_counter()
|
||||
chars_in = sum(len(line) for line in buffer)
|
||||
|
||||
@@ -136,6 +136,21 @@ class PygameDisplay:
|
||||
else:
|
||||
self._font = pygame.font.SysFont("monospace", self.cell_height - 2)
|
||||
|
||||
# Check if font supports box-drawing characters; if not, try to find one
|
||||
self._use_fallback_border = False
|
||||
if self._font:
|
||||
try:
|
||||
# Test rendering some key box-drawing characters
|
||||
test_chars = ["┌", "─", "┐", "│", "└", "┘"]
|
||||
for ch in test_chars:
|
||||
surf = self._font.render(ch, True, (255, 255, 255))
|
||||
# If surface is empty (width=0 or all black), font lacks glyph
|
||||
if surf.get_width() == 0:
|
||||
raise ValueError("Missing glyph")
|
||||
except Exception:
|
||||
# Font doesn't support box-drawing, will use line drawing fallback
|
||||
self._use_fallback_border = True
|
||||
|
||||
self._initialized = True
|
||||
|
||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
||||
@@ -184,14 +199,26 @@ class PygameDisplay:
|
||||
fps = 1000.0 / avg_ms
|
||||
frame_time = avg_ms
|
||||
|
||||
# Apply border if requested
|
||||
if border:
|
||||
from engine.display import render_border
|
||||
|
||||
buffer = render_border(buffer, self.width, self.height, fps, frame_time)
|
||||
|
||||
self._screen.fill((0, 0, 0))
|
||||
|
||||
# If border requested but font lacks box-drawing glyphs, use graphical fallback
|
||||
if border and self._use_fallback_border:
|
||||
self._draw_fallback_border(fps, frame_time)
|
||||
# Adjust content area to fit inside border
|
||||
content_offset_x = self.cell_width
|
||||
content_offset_y = self.cell_height
|
||||
self.window_width - 2 * self.cell_width
|
||||
self.window_height - 2 * self.cell_height
|
||||
else:
|
||||
# Normal rendering (with or without text border)
|
||||
content_offset_x = 0
|
||||
content_offset_y = 0
|
||||
|
||||
if border:
|
||||
from engine.display import render_border
|
||||
|
||||
buffer = render_border(buffer, self.width, self.height, fps, frame_time)
|
||||
|
||||
blit_list = []
|
||||
|
||||
for row_idx, line in enumerate(buffer[: self.height]):
|
||||
@@ -199,7 +226,7 @@ class PygameDisplay:
|
||||
break
|
||||
|
||||
tokens = parse_ansi(line)
|
||||
x_pos = 0
|
||||
x_pos = content_offset_x
|
||||
|
||||
for text, fg, bg, _bold in tokens:
|
||||
if not text:
|
||||
@@ -219,10 +246,17 @@ class PygameDisplay:
|
||||
self._glyph_cache[cache_key] = self._font.render(text, True, fg)
|
||||
|
||||
surface = self._glyph_cache[cache_key]
|
||||
blit_list.append((surface, (x_pos, row_idx * self.cell_height)))
|
||||
blit_list.append(
|
||||
(surface, (x_pos, content_offset_y + row_idx * self.cell_height))
|
||||
)
|
||||
x_pos += self._font.size(text)[0]
|
||||
|
||||
self._screen.blits(blit_list)
|
||||
|
||||
# Draw fallback border using graphics if needed
|
||||
if border and self._use_fallback_border:
|
||||
self._draw_fallback_border(fps, frame_time)
|
||||
|
||||
self._pygame.display.flip()
|
||||
|
||||
elapsed_ms = (time.perf_counter() - t0) * 1000
|
||||
@@ -231,6 +265,56 @@ class PygameDisplay:
|
||||
chars_in = sum(len(line) for line in buffer)
|
||||
monitor.record_effect("pygame_display", elapsed_ms, chars_in, chars_in)
|
||||
|
||||
def _draw_fallback_border(self, fps: float, frame_time: float) -> None:
|
||||
"""Draw border using pygame graphics primitives instead of text."""
|
||||
if not self._screen or not self._pygame:
|
||||
return
|
||||
|
||||
# Colors
|
||||
border_color = (0, 255, 0) # Green (like terminal border)
|
||||
text_color = (255, 255, 255)
|
||||
|
||||
# Calculate dimensions
|
||||
x1 = 0
|
||||
y1 = 0
|
||||
x2 = self.window_width - 1
|
||||
y2 = self.window_height - 1
|
||||
|
||||
# Draw outer rectangle
|
||||
self._pygame.draw.rect(
|
||||
self._screen, border_color, (x1, y1, x2 - x1 + 1, y2 - y1 + 1), 1
|
||||
)
|
||||
|
||||
# Draw top border with FPS
|
||||
if fps > 0:
|
||||
fps_text = f" FPS:{fps:.0f}"
|
||||
else:
|
||||
fps_text = ""
|
||||
# We need to render this text with a fallback font that has basic ASCII
|
||||
# Use system font which should have these characters
|
||||
try:
|
||||
font = self._font # May not have box chars but should have alphanumeric
|
||||
text_surf = font.render(fps_text, True, text_color, (0, 0, 0))
|
||||
text_rect = text_surf.get_rect()
|
||||
# Position on top border, right-aligned
|
||||
text_x = x2 - text_rect.width - 5
|
||||
text_y = y1 + 2
|
||||
self._screen.blit(text_surf, (text_x, text_y))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Draw bottom border with frame time
|
||||
if frame_time > 0:
|
||||
ft_text = f" {frame_time:.1f}ms"
|
||||
try:
|
||||
ft_surf = self._font.render(ft_text, True, text_color, (0, 0, 0))
|
||||
ft_rect = ft_surf.get_rect()
|
||||
ft_x = x2 - ft_rect.width - 5
|
||||
ft_y = y2 - ft_rect.height - 2
|
||||
self._screen.blit(ft_surf, (ft_x, ft_y))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def clear(self) -> None:
|
||||
if self._screen and self._pygame:
|
||||
self._screen.fill((0, 0, 0))
|
||||
|
||||
@@ -1,228 +0,0 @@
|
||||
"""
|
||||
Sixel graphics display backend - renders to sixel graphics in terminal.
|
||||
"""
|
||||
|
||||
import time
|
||||
|
||||
from engine.display.renderer import get_default_font_path, parse_ansi
|
||||
|
||||
|
||||
def _encode_sixel(image) -> str:
|
||||
"""Encode a PIL Image to sixel format (pure Python)."""
|
||||
img = image.convert("RGBA")
|
||||
width, height = img.size
|
||||
pixels = img.load()
|
||||
|
||||
palette = []
|
||||
pixel_palette_idx = {}
|
||||
|
||||
def get_color_idx(r, g, b, a):
|
||||
if a < 128:
|
||||
return -1
|
||||
key = (r // 32, g // 32, b // 32)
|
||||
if key not in pixel_palette_idx:
|
||||
idx = len(palette)
|
||||
if idx < 256:
|
||||
palette.append((r, g, b))
|
||||
pixel_palette_idx[key] = idx
|
||||
return pixel_palette_idx.get(key, 0)
|
||||
|
||||
for y in range(height):
|
||||
for x in range(width):
|
||||
r, g, b, a = pixels[x, y]
|
||||
get_color_idx(r, g, b, a)
|
||||
|
||||
if not palette:
|
||||
return ""
|
||||
|
||||
if len(palette) == 1:
|
||||
palette = [palette[0], (0, 0, 0)]
|
||||
|
||||
sixel_data = []
|
||||
sixel_data.append(
|
||||
f'"{"".join(f"#{i};2;{r};{g};{b}" for i, (r, g, b) in enumerate(palette))}'
|
||||
)
|
||||
|
||||
for x in range(width):
|
||||
col_data = []
|
||||
for y in range(0, height, 6):
|
||||
bits = 0
|
||||
color_idx = -1
|
||||
for dy in range(6):
|
||||
if y + dy < height:
|
||||
r, g, b, a = pixels[x, y + dy]
|
||||
if a >= 128:
|
||||
bits |= 1 << dy
|
||||
idx = get_color_idx(r, g, b, a)
|
||||
if color_idx == -1:
|
||||
color_idx = idx
|
||||
elif color_idx != idx:
|
||||
color_idx = -2
|
||||
|
||||
if color_idx >= 0:
|
||||
col_data.append(
|
||||
chr(63 + color_idx) + chr(63 + bits)
|
||||
if bits
|
||||
else chr(63 + color_idx) + "?"
|
||||
)
|
||||
elif color_idx == -2:
|
||||
pass
|
||||
|
||||
if col_data:
|
||||
sixel_data.append("".join(col_data) + "$")
|
||||
else:
|
||||
sixel_data.append("-" if x < width - 1 else "$")
|
||||
|
||||
sixel_data.append("\x1b\\")
|
||||
|
||||
return "\x1bPq" + "".join(sixel_data)
|
||||
|
||||
|
||||
class SixelDisplay:
|
||||
"""Sixel graphics display backend - renders to sixel graphics in terminal."""
|
||||
|
||||
width: int = 80
|
||||
height: int = 24
|
||||
|
||||
def __init__(self, cell_width: int = 9, cell_height: int = 16):
|
||||
self.width = 80
|
||||
self.height = 24
|
||||
self.cell_width = cell_width
|
||||
self.cell_height = cell_height
|
||||
self._initialized = False
|
||||
self._font_path = None
|
||||
|
||||
def _get_font_path(self) -> str | None:
|
||||
"""Get font path from env or detect common locations."""
|
||||
import os
|
||||
|
||||
if self._font_path:
|
||||
return self._font_path
|
||||
|
||||
env_font = os.environ.get("MAINLINE_SIXEL_FONT")
|
||||
if env_font and os.path.exists(env_font):
|
||||
self._font_path = env_font
|
||||
return env_font
|
||||
|
||||
font_path = get_default_font_path()
|
||||
if font_path:
|
||||
self._font_path = font_path
|
||||
|
||||
return self._font_path
|
||||
|
||||
def init(self, width: int, height: int, reuse: bool = False) -> None:
|
||||
"""Initialize display with dimensions.
|
||||
|
||||
Args:
|
||||
width: Terminal width in characters
|
||||
height: Terminal height in rows
|
||||
reuse: Ignored for SixelDisplay
|
||||
"""
|
||||
self.width = width
|
||||
self.height = height
|
||||
self._initialized = True
|
||||
|
||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
||||
import sys
|
||||
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# Get metrics for border display
|
||||
fps = 0.0
|
||||
frame_time = 0.0
|
||||
from engine.display import get_monitor
|
||||
|
||||
monitor = get_monitor()
|
||||
if monitor:
|
||||
stats = monitor.get_stats()
|
||||
avg_ms = stats.get("pipeline", {}).get("avg_ms", 0) if stats else 0
|
||||
frame_count = stats.get("frame_count", 0) if stats else 0
|
||||
if avg_ms and frame_count > 0:
|
||||
fps = 1000.0 / avg_ms
|
||||
frame_time = avg_ms
|
||||
|
||||
# Apply border if requested
|
||||
if border:
|
||||
from engine.display import render_border
|
||||
|
||||
buffer = render_border(buffer, self.width, self.height, fps, frame_time)
|
||||
|
||||
img_width = self.width * self.cell_width
|
||||
img_height = self.height * self.cell_height
|
||||
|
||||
try:
|
||||
from PIL import Image, ImageDraw, ImageFont
|
||||
except ImportError:
|
||||
return
|
||||
|
||||
img = Image.new("RGBA", (img_width, img_height), (0, 0, 0, 255))
|
||||
draw = ImageDraw.Draw(img)
|
||||
|
||||
font_path = self._get_font_path()
|
||||
font = None
|
||||
if font_path:
|
||||
try:
|
||||
font = ImageFont.truetype(font_path, self.cell_height - 2)
|
||||
except Exception:
|
||||
font = None
|
||||
|
||||
if font is None:
|
||||
try:
|
||||
font = ImageFont.load_default()
|
||||
except Exception:
|
||||
font = None
|
||||
|
||||
for row_idx, line in enumerate(buffer[: self.height]):
|
||||
if row_idx >= self.height:
|
||||
break
|
||||
|
||||
tokens = parse_ansi(line)
|
||||
x_pos = 0
|
||||
y_pos = row_idx * self.cell_height
|
||||
|
||||
for text, fg, bg, bold in tokens:
|
||||
if not text:
|
||||
continue
|
||||
|
||||
if bg != (0, 0, 0):
|
||||
bbox = draw.textbbox((x_pos, y_pos), text, font=font)
|
||||
draw.rectangle(bbox, fill=(*bg, 255))
|
||||
|
||||
if bold and font:
|
||||
draw.text((x_pos - 1, y_pos - 1), text, fill=(*fg, 255), font=font)
|
||||
|
||||
draw.text((x_pos, y_pos), text, fill=(*fg, 255), font=font)
|
||||
|
||||
if font:
|
||||
x_pos += draw.textlength(text, font=font)
|
||||
|
||||
sixel = _encode_sixel(img)
|
||||
|
||||
sys.stdout.buffer.write(sixel.encode("utf-8"))
|
||||
sys.stdout.flush()
|
||||
|
||||
elapsed_ms = (time.perf_counter() - t0) * 1000
|
||||
|
||||
from engine.display import get_monitor
|
||||
|
||||
monitor = get_monitor()
|
||||
if monitor:
|
||||
chars_in = sum(len(line) for line in buffer)
|
||||
monitor.record_effect("sixel_display", elapsed_ms, chars_in, chars_in)
|
||||
|
||||
def clear(self) -> None:
|
||||
import sys
|
||||
|
||||
sys.stdout.buffer.write(b"\x1b[2J\x1b[H")
|
||||
sys.stdout.flush()
|
||||
|
||||
def cleanup(self) -> None:
|
||||
pass
|
||||
|
||||
def get_dimensions(self) -> tuple[int, int]:
|
||||
"""Get current dimensions.
|
||||
|
||||
Returns:
|
||||
(width, height) in character cells
|
||||
"""
|
||||
return (self.width, self.height)
|
||||
@@ -1,5 +1,14 @@
|
||||
"""
|
||||
WebSocket display backend - broadcasts frame buffer to connected web clients.
|
||||
|
||||
TODO: Transform to a true streaming backend with:
|
||||
- Proper WebSocket message streaming (currently sends full buffer each frame)
|
||||
- Connection pooling and backpressure handling
|
||||
- Binary protocol for efficiency (instead of JSON)
|
||||
- Client management with proper async handling
|
||||
- Mark for deprecation if replaced by a new streaming implementation
|
||||
|
||||
Current implementation: Simple broadcast of text frames to all connected clients.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
|
||||
@@ -5,7 +5,7 @@ from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
|
||||
|
||||
class FadeEffect(EffectPlugin):
|
||||
name = "fade"
|
||||
config = EffectConfig(enabled=True, intensity=1.0)
|
||||
config = EffectConfig(enabled=True, intensity=1.0, entropy=0.1)
|
||||
|
||||
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||
if not ctx.ticker_height:
|
||||
|
||||
@@ -9,7 +9,7 @@ from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST
|
||||
|
||||
class FirehoseEffect(EffectPlugin):
|
||||
name = "firehose"
|
||||
config = EffectConfig(enabled=True, intensity=1.0)
|
||||
config = EffectConfig(enabled=True, intensity=1.0, entropy=0.9)
|
||||
|
||||
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||
firehose_h = config.FIREHOSE_H if config.FIREHOSE else 0
|
||||
|
||||
@@ -6,7 +6,7 @@ from engine.terminal import DIM, G_LO, RST
|
||||
|
||||
class GlitchEffect(EffectPlugin):
|
||||
name = "glitch"
|
||||
config = EffectConfig(enabled=True, intensity=1.0)
|
||||
config = EffectConfig(enabled=True, intensity=1.0, entropy=0.8)
|
||||
|
||||
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||
if not buf:
|
||||
|
||||
@@ -7,7 +7,7 @@ from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST
|
||||
|
||||
class NoiseEffect(EffectPlugin):
|
||||
name = "noise"
|
||||
config = EffectConfig(enabled=True, intensity=0.15)
|
||||
config = EffectConfig(enabled=True, intensity=0.15, entropy=0.4)
|
||||
|
||||
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||
if not ctx.ticker_height:
|
||||
|
||||
@@ -44,6 +44,11 @@ class PartialUpdate:
|
||||
|
||||
@dataclass
|
||||
class EffectContext:
|
||||
"""Context passed to effect plugins during processing.
|
||||
|
||||
Contains terminal dimensions, camera state, frame info, and real-time sensor values.
|
||||
"""
|
||||
|
||||
terminal_width: int
|
||||
terminal_height: int
|
||||
scroll_cam: int
|
||||
@@ -56,6 +61,26 @@ class EffectContext:
|
||||
items: list = field(default_factory=list)
|
||||
_state: dict[str, Any] = field(default_factory=dict, repr=False)
|
||||
|
||||
def compute_entropy(self, effect_name: str, data: Any) -> float:
|
||||
"""Compute entropy score for an effect based on its output.
|
||||
|
||||
Args:
|
||||
effect_name: Name of the effect
|
||||
data: Processed buffer or effect-specific data
|
||||
|
||||
Returns:
|
||||
Entropy score 0.0-1.0 representing visual chaos
|
||||
"""
|
||||
# Default implementation: use effect name as seed for deterministic randomness
|
||||
# Better implementations can analyze actual buffer content
|
||||
import hashlib
|
||||
|
||||
data_str = str(data)[:100] if data else ""
|
||||
hash_val = hashlib.md5(f"{effect_name}:{data_str}".encode()).hexdigest()
|
||||
# Convert hash to float 0.0-1.0
|
||||
entropy = int(hash_val[:8], 16) / 0xFFFFFFFF
|
||||
return min(max(entropy, 0.0), 1.0)
|
||||
|
||||
def get_sensor_value(self, sensor_name: str) -> float | None:
|
||||
"""Get a sensor value from context state.
|
||||
|
||||
@@ -80,6 +105,7 @@ class EffectContext:
|
||||
class EffectConfig:
|
||||
enabled: bool = True
|
||||
intensity: float = 1.0
|
||||
entropy: float = 0.0 # Visual chaos metric (0.0 = calm, 1.0 = chaotic)
|
||||
params: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
|
||||
@@ -117,11 +117,12 @@ def fetch_poetry():
|
||||
|
||||
|
||||
# ─── CACHE ────────────────────────────────────────────────
|
||||
_CACHE_DIR = pathlib.Path(__file__).resolve().parent.parent
|
||||
# Cache moved to engine/fixtures/headlines.json
|
||||
_CACHE_DIR = pathlib.Path(__file__).resolve().parent / "fixtures"
|
||||
|
||||
|
||||
def _cache_path():
|
||||
return _CACHE_DIR / f".mainline_cache_{config.MODE}.json"
|
||||
return _CACHE_DIR / "headlines.json"
|
||||
|
||||
|
||||
def load_cache():
|
||||
|
||||
19
engine/fixtures/headlines.json
Normal file
19
engine/fixtures/headlines.json
Normal file
@@ -0,0 +1,19 @@
|
||||
{
|
||||
"items": [
|
||||
["Breaking: AI systems achieve breakthrough in natural language understanding", "TechDaily", "14:32"],
|
||||
["Scientists discover new exoplanet in habitable zone", "ScienceNews", "13:15"],
|
||||
["Global markets rally as inflation shows signs of cooling", "FinanceWire", "12:48"],
|
||||
["New study reveals benefits of Mediterranean diet for cognitive health", "HealthJournal", "11:22"],
|
||||
["Tech giants announce collaboration on AI safety standards", "TechDaily", "10:55"],
|
||||
["Archaeologists uncover 3000-year-old city in desert", "HistoryNow", "09:30"],
|
||||
["Renewable energy capacity surpasses fossil fuels for first time", "GreenWorld", "08:15"],
|
||||
["Space agency prepares for next Mars mission launch window", "SpaceNews", "07:42"],
|
||||
["New film breaks box office records on opening weekend", "EntertainmentHub", "06:18"],
|
||||
["Local community raises funds for new library project", "CommunityPost", "05:30"],
|
||||
["Quantum computing breakthrough could revolutionize cryptography", "TechWeekly", "15:20"],
|
||||
["New species of deep-sea creature discovered in Pacific trench", "NatureToday", "14:05"],
|
||||
["Electric vehicle sales surpass traditional cars in Europe", "AutoNews", "12:33"],
|
||||
["Renowned artist unveils interactive AI-generated exhibition", "ArtsMonthly", "11:10"],
|
||||
["Climate summit reaches historic agreement on emissions", "WorldNews", "09:55"]
|
||||
]
|
||||
}
|
||||
@@ -50,8 +50,7 @@ from engine.pipeline.presets import (
|
||||
FIREHOSE_PRESET,
|
||||
PIPELINE_VIZ_PRESET,
|
||||
POETRY_PRESET,
|
||||
PRESETS,
|
||||
SIXEL_PRESET,
|
||||
UI_PRESET,
|
||||
WEBSOCKET_PRESET,
|
||||
PipelinePreset,
|
||||
create_preset_from_params,
|
||||
@@ -92,8 +91,8 @@ __all__ = [
|
||||
"POETRY_PRESET",
|
||||
"PIPELINE_VIZ_PRESET",
|
||||
"WEBSOCKET_PRESET",
|
||||
"SIXEL_PRESET",
|
||||
"FIREHOSE_PRESET",
|
||||
"UI_PRESET",
|
||||
"get_preset",
|
||||
"list_presets",
|
||||
"create_preset_from_params",
|
||||
|
||||
@@ -8,6 +8,11 @@ modify these params, which the pipeline then applies to its stages.
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
try:
|
||||
from engine.display import BorderMode
|
||||
except ImportError:
|
||||
BorderMode = object # Fallback for type checking
|
||||
|
||||
|
||||
@dataclass
|
||||
class PipelineParams:
|
||||
@@ -23,7 +28,7 @@ class PipelineParams:
|
||||
|
||||
# Display config
|
||||
display: str = "terminal"
|
||||
border: bool = False
|
||||
border: bool | BorderMode = False
|
||||
|
||||
# Camera config
|
||||
camera_mode: str = "vertical"
|
||||
|
||||
@@ -13,6 +13,7 @@ Loading order:
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from engine.display import BorderMode
|
||||
from engine.pipeline.params import PipelineParams
|
||||
|
||||
|
||||
@@ -26,7 +27,6 @@ def _load_toml_presets() -> dict[str, Any]:
|
||||
return {}
|
||||
|
||||
|
||||
# Pre-load TOML presets
|
||||
_YAML_PRESETS = _load_toml_presets()
|
||||
|
||||
|
||||
@@ -47,14 +47,24 @@ class PipelinePreset:
|
||||
display: str = "terminal"
|
||||
camera: str = "scroll"
|
||||
effects: list[str] = field(default_factory=list)
|
||||
border: bool = False
|
||||
border: bool | BorderMode = (
|
||||
False # Border mode: False=off, True=simple, BorderMode.UI for panel
|
||||
)
|
||||
|
||||
def to_params(self) -> PipelineParams:
|
||||
"""Convert to PipelineParams."""
|
||||
from engine.display import BorderMode
|
||||
|
||||
params = PipelineParams()
|
||||
params.source = self.source
|
||||
params.display = self.display
|
||||
params.border = self.border
|
||||
params.border = (
|
||||
self.border
|
||||
if isinstance(self.border, bool)
|
||||
else BorderMode.UI
|
||||
if self.border == BorderMode.UI
|
||||
else False
|
||||
)
|
||||
params.camera_mode = self.camera
|
||||
params.effect_order = self.effects.copy()
|
||||
return params
|
||||
@@ -83,6 +93,16 @@ DEMO_PRESET = PipelinePreset(
|
||||
effects=["noise", "fade", "glitch", "firehose"],
|
||||
)
|
||||
|
||||
UI_PRESET = PipelinePreset(
|
||||
name="ui",
|
||||
description="Interactive UI mode with right-side control panel",
|
||||
source="fixture",
|
||||
display="pygame",
|
||||
camera="scroll",
|
||||
effects=["noise", "fade", "glitch"],
|
||||
border=BorderMode.UI,
|
||||
)
|
||||
|
||||
POETRY_PRESET = PipelinePreset(
|
||||
name="poetry",
|
||||
description="Poetry feed with subtle effects",
|
||||
@@ -110,15 +130,6 @@ WEBSOCKET_PRESET = PipelinePreset(
|
||||
effects=["noise", "fade", "glitch"],
|
||||
)
|
||||
|
||||
SIXEL_PRESET = PipelinePreset(
|
||||
name="sixel",
|
||||
description="Sixel graphics display mode",
|
||||
source="headlines",
|
||||
display="sixel",
|
||||
camera="scroll",
|
||||
effects=["noise", "fade", "glitch"],
|
||||
)
|
||||
|
||||
FIREHOSE_PRESET = PipelinePreset(
|
||||
name="firehose",
|
||||
description="High-speed firehose mode",
|
||||
@@ -128,6 +139,16 @@ FIREHOSE_PRESET = PipelinePreset(
|
||||
effects=["noise", "fade", "glitch", "firehose"],
|
||||
)
|
||||
|
||||
FIXTURE_PRESET = PipelinePreset(
|
||||
name="fixture",
|
||||
description="Use cached headline fixtures",
|
||||
source="fixture",
|
||||
display="pygame",
|
||||
camera="scroll",
|
||||
effects=["noise", "fade"],
|
||||
border=False,
|
||||
)
|
||||
|
||||
|
||||
# Build presets from YAML data
|
||||
def _build_presets() -> dict[str, PipelinePreset]:
|
||||
@@ -145,8 +166,9 @@ def _build_presets() -> dict[str, PipelinePreset]:
|
||||
"poetry": POETRY_PRESET,
|
||||
"pipeline": PIPELINE_VIZ_PRESET,
|
||||
"websocket": WEBSOCKET_PRESET,
|
||||
"sixel": SIXEL_PRESET,
|
||||
"firehose": FIREHOSE_PRESET,
|
||||
"ui": UI_PRESET,
|
||||
"fixture": FIXTURE_PRESET,
|
||||
}
|
||||
|
||||
for name, preset in builtins.items():
|
||||
|
||||
@@ -118,6 +118,14 @@ def discover_stages() -> None:
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# Register buffer stages (framebuffer, etc.)
|
||||
try:
|
||||
from engine.pipeline.stages.framebuffer import FrameBufferStage
|
||||
|
||||
StageRegistry.register("effect", FrameBufferStage)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# Register display stages
|
||||
_register_display_stages()
|
||||
|
||||
|
||||
158
engine/pipeline/stages/framebuffer.py
Normal file
158
engine/pipeline/stages/framebuffer.py
Normal file
@@ -0,0 +1,158 @@
|
||||
"""
|
||||
Frame buffer stage - stores previous frames for temporal effects.
|
||||
|
||||
Provides:
|
||||
- frame_history: list of previous buffers (most recent first)
|
||||
- intensity_history: list of corresponding intensity maps
|
||||
- current_intensity: intensity map for current frame
|
||||
|
||||
Capability: "framebuffer.history"
|
||||
"""
|
||||
|
||||
import threading
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
from engine.display import _strip_ansi
|
||||
from engine.pipeline.core import DataType, PipelineContext, Stage
|
||||
|
||||
|
||||
@dataclass
|
||||
class FrameBufferConfig:
|
||||
"""Configuration for FrameBufferStage."""
|
||||
|
||||
history_depth: int = 2 # Number of previous frames to keep
|
||||
|
||||
|
||||
class FrameBufferStage(Stage):
|
||||
"""Stores frame history and computes intensity maps."""
|
||||
|
||||
name = "framebuffer"
|
||||
category = "effect" # It's an effect that enriches context with frame history
|
||||
|
||||
def __init__(self, config: FrameBufferConfig | None = None, history_depth: int = 2):
|
||||
self.config = config or FrameBufferConfig(history_depth=history_depth)
|
||||
self._lock = threading.Lock()
|
||||
|
||||
@property
|
||||
def capabilities(self) -> set[str]:
|
||||
return {"framebuffer.history"}
|
||||
|
||||
@property
|
||||
def dependencies(self) -> set[str]:
|
||||
# Depends on rendered output (since we want to capture final buffer)
|
||||
return {"render.output"}
|
||||
|
||||
@property
|
||||
def inlet_types(self) -> set:
|
||||
return {DataType.TEXT_BUFFER}
|
||||
|
||||
@property
|
||||
def outlet_types(self) -> set:
|
||||
return {DataType.TEXT_BUFFER} # Pass through unchanged
|
||||
|
||||
def init(self, ctx: PipelineContext) -> bool:
|
||||
"""Initialize framebuffer state in context."""
|
||||
ctx.set("frame_history", [])
|
||||
ctx.set("intensity_history", [])
|
||||
return True
|
||||
|
||||
def process(self, data: Any, ctx: PipelineContext) -> Any:
|
||||
"""Store frame in history and compute intensity.
|
||||
|
||||
Args:
|
||||
data: Current text buffer (list[str])
|
||||
ctx: Pipeline context
|
||||
|
||||
Returns:
|
||||
Same buffer (pass-through)
|
||||
"""
|
||||
if not isinstance(data, list):
|
||||
return data
|
||||
|
||||
# Compute intensity map for current buffer (per-row, length = buffer rows)
|
||||
intensity_map = self._compute_buffer_intensity(data, len(data))
|
||||
|
||||
# Store in context
|
||||
ctx.set("current_intensity", intensity_map)
|
||||
|
||||
with self._lock:
|
||||
# Get existing histories
|
||||
history = ctx.get("frame_history", [])
|
||||
intensity_hist = ctx.get("intensity_history", [])
|
||||
|
||||
# Prepend current frame to history
|
||||
history.insert(0, data.copy())
|
||||
intensity_hist.insert(0, intensity_map)
|
||||
|
||||
# Trim to configured depth
|
||||
max_depth = self.config.history_depth
|
||||
ctx.set("frame_history", history[:max_depth])
|
||||
ctx.set("intensity_history", intensity_hist[:max_depth])
|
||||
|
||||
return data
|
||||
|
||||
def _compute_buffer_intensity(
|
||||
self, buf: list[str], max_rows: int = 24
|
||||
) -> list[float]:
|
||||
"""Compute average intensity per row in buffer.
|
||||
|
||||
Uses ANSI color if available; falls back to character density.
|
||||
|
||||
Args:
|
||||
buf: Text buffer (list of strings)
|
||||
max_rows: Maximum number of rows to process
|
||||
|
||||
Returns:
|
||||
List of intensity values (0.0-1.0) per row
|
||||
"""
|
||||
intensities = []
|
||||
# Limit to viewport height
|
||||
lines = buf[:max_rows]
|
||||
|
||||
for line in lines:
|
||||
# Strip ANSI codes for length calc
|
||||
|
||||
plain = _strip_ansi(line)
|
||||
if not plain:
|
||||
intensities.append(0.0)
|
||||
continue
|
||||
|
||||
# Simple heuristic: ratio of non-space characters
|
||||
# More sophisticated version could parse ANSI RGB brightness
|
||||
filled = sum(1 for c in plain if c not in (" ", "\t"))
|
||||
total = len(plain)
|
||||
intensity = filled / total if total > 0 else 0.0
|
||||
intensities.append(max(0.0, min(1.0, intensity)))
|
||||
|
||||
# Pad to max_rows if needed
|
||||
while len(intensities) < max_rows:
|
||||
intensities.append(0.0)
|
||||
|
||||
return intensities
|
||||
|
||||
def get_frame(
|
||||
self, index: int = 0, ctx: PipelineContext | None = None
|
||||
) -> list[str] | None:
|
||||
"""Get frame from history by index (0 = current, 1 = previous, etc)."""
|
||||
if ctx is None:
|
||||
return None
|
||||
history = ctx.get("frame_history", [])
|
||||
if 0 <= index < len(history):
|
||||
return history[index]
|
||||
return None
|
||||
|
||||
def get_intensity(
|
||||
self, index: int = 0, ctx: PipelineContext | None = None
|
||||
) -> list[float] | None:
|
||||
"""Get intensity map from history by index."""
|
||||
if ctx is None:
|
||||
return None
|
||||
intensity_hist = ctx.get("intensity_history", [])
|
||||
if 0 <= index < len(intensity_hist):
|
||||
return intensity_hist[index]
|
||||
return None
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Cleanup resources."""
|
||||
pass
|
||||
549
engine/pipeline/ui.py
Normal file
549
engine/pipeline/ui.py
Normal file
@@ -0,0 +1,549 @@
|
||||
"""
|
||||
Pipeline UI panel - Interactive controls for pipeline configuration.
|
||||
|
||||
Provides:
|
||||
- Stage list with enable/disable toggles
|
||||
- Parameter sliders for selected effect
|
||||
- Keyboard/mouse interaction
|
||||
|
||||
This module implements the right-side UI panel that appears in border="ui" mode.
|
||||
"""
|
||||
|
||||
from collections.abc import Callable
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass
|
||||
class UIConfig:
|
||||
"""Configuration for the UI panel."""
|
||||
|
||||
panel_width: int = 24 # Characters wide
|
||||
stage_list_height: int = 12 # Number of stages to show at once
|
||||
param_height: int = 8 # Space for parameter controls
|
||||
scroll_offset: int = 0 # Scroll position in stage list
|
||||
start_with_preset_picker: bool = False # Show preset picker immediately
|
||||
|
||||
|
||||
@dataclass
|
||||
class StageControl:
|
||||
"""Represents a stage in the UI panel with its toggle state."""
|
||||
|
||||
name: str
|
||||
stage_name: str # Actual pipeline stage name
|
||||
category: str
|
||||
enabled: bool = True
|
||||
selected: bool = False
|
||||
params: dict[str, Any] = field(default_factory=dict) # Current param values
|
||||
param_schema: dict[str, dict] = field(default_factory=dict) # Param metadata
|
||||
|
||||
def toggle(self) -> None:
|
||||
"""Toggle enabled state."""
|
||||
self.enabled = not self.enabled
|
||||
|
||||
def get_param(self, name: str) -> Any:
|
||||
"""Get current parameter value."""
|
||||
return self.params.get(name)
|
||||
|
||||
def set_param(self, name: str, value: Any) -> None:
|
||||
"""Set parameter value."""
|
||||
self.params[name] = value
|
||||
|
||||
|
||||
class UIPanel:
|
||||
"""Interactive UI panel for pipeline configuration.
|
||||
|
||||
Manages:
|
||||
- Stage list with enable/disable checkboxes
|
||||
- Parameter sliders for selected stage
|
||||
- Keyboard/mouse event handling
|
||||
- Scroll state for long stage lists
|
||||
|
||||
The panel is rendered as a right border (panel_width characters wide)
|
||||
alongside the main viewport.
|
||||
"""
|
||||
|
||||
def __init__(self, config: UIConfig | None = None):
|
||||
self.config = config or UIConfig()
|
||||
self.stages: dict[str, StageControl] = {} # stage_name -> StageControl
|
||||
self.scroll_offset = 0
|
||||
self.selected_stage: str | None = None
|
||||
self._focused_param: str | None = None # For slider adjustment
|
||||
self._callbacks: dict[str, Callable] = {} # Event callbacks
|
||||
self._presets: list[str] = [] # Available preset names
|
||||
self._current_preset: str = "" # Current preset name
|
||||
self._show_preset_picker: bool = (
|
||||
config.start_with_preset_picker if config else False
|
||||
) # Picker overlay visible
|
||||
self._show_panel: bool = True # UI panel visibility
|
||||
self._preset_scroll_offset: int = 0 # Scroll in preset list
|
||||
|
||||
def register_stage(self, stage: Any, enabled: bool = True) -> StageControl:
|
||||
"""Register a stage for UI control.
|
||||
|
||||
Args:
|
||||
stage: Stage instance (must have .name, .category attributes)
|
||||
enabled: Initial enabled state
|
||||
|
||||
Returns:
|
||||
The created StageControl instance
|
||||
"""
|
||||
control = StageControl(
|
||||
name=stage.name,
|
||||
stage_name=stage.name,
|
||||
category=stage.category,
|
||||
enabled=enabled,
|
||||
)
|
||||
self.stages[stage.name] = control
|
||||
return control
|
||||
|
||||
def unregister_stage(self, stage_name: str) -> None:
|
||||
"""Remove a stage from UI control."""
|
||||
if stage_name in self.stages:
|
||||
del self.stages[stage_name]
|
||||
|
||||
def get_enabled_stages(self) -> list[str]:
|
||||
"""Get list of stage names that are currently enabled."""
|
||||
return [name for name, ctrl in self.stages.items() if ctrl.enabled]
|
||||
|
||||
def select_stage(self, stage_name: str | None = None) -> None:
|
||||
"""Select a stage (for parameter editing)."""
|
||||
if stage_name in self.stages:
|
||||
self.selected_stage = stage_name
|
||||
self.stages[stage_name].selected = True
|
||||
# Deselect others
|
||||
for name, ctrl in self.stages.items():
|
||||
if name != stage_name:
|
||||
ctrl.selected = False
|
||||
# Auto-focus first parameter when stage selected
|
||||
if self.stages[stage_name].params:
|
||||
self._focused_param = next(iter(self.stages[stage_name].params.keys()))
|
||||
else:
|
||||
self._focused_param = None
|
||||
|
||||
def toggle_stage(self, stage_name: str) -> bool:
|
||||
"""Toggle a stage's enabled state.
|
||||
|
||||
Returns:
|
||||
New enabled state
|
||||
"""
|
||||
if stage_name in self.stages:
|
||||
ctrl = self.stages[stage_name]
|
||||
ctrl.enabled = not ctrl.enabled
|
||||
return ctrl.enabled
|
||||
return False
|
||||
|
||||
def adjust_selected_param(self, delta: float) -> None:
|
||||
"""Adjust the currently focused parameter of selected stage.
|
||||
|
||||
Args:
|
||||
delta: Amount to add (positive or negative)
|
||||
"""
|
||||
if self.selected_stage and self._focused_param:
|
||||
ctrl = self.stages[self.selected_stage]
|
||||
if self._focused_param in ctrl.params:
|
||||
current = ctrl.params[self._focused_param]
|
||||
# Determine step size from schema
|
||||
schema = ctrl.param_schema.get(self._focused_param, {})
|
||||
step = schema.get("step", 0.1 if isinstance(current, float) else 1)
|
||||
new_val = current + delta * step
|
||||
# Clamp to min/max if specified
|
||||
if "min" in schema:
|
||||
new_val = max(schema["min"], new_val)
|
||||
if "max" in schema:
|
||||
new_val = min(schema["max"], new_val)
|
||||
# Only emit if value actually changed
|
||||
if new_val != current:
|
||||
ctrl.params[self._focused_param] = new_val
|
||||
self._emit_event(
|
||||
"param_changed",
|
||||
stage_name=self.selected_stage,
|
||||
param_name=self._focused_param,
|
||||
value=new_val,
|
||||
)
|
||||
|
||||
def scroll_stages(self, delta: int) -> None:
|
||||
"""Scroll the stage list."""
|
||||
max_offset = max(0, len(self.stages) - self.config.stage_list_height)
|
||||
self.scroll_offset = max(0, min(max_offset, self.scroll_offset + delta))
|
||||
|
||||
def render(self, width: int, height: int) -> list[str]:
|
||||
"""Render the UI panel.
|
||||
|
||||
Args:
|
||||
width: Total display width (panel uses last `panel_width` cols)
|
||||
height: Total display height
|
||||
|
||||
Returns:
|
||||
List of strings, each of length `panel_width`, to overlay on right side
|
||||
"""
|
||||
panel_width = min(
|
||||
self.config.panel_width, width - 4
|
||||
) # Reserve at least 2 for main
|
||||
lines = []
|
||||
|
||||
# If panel is hidden, render empty space
|
||||
if not self._show_panel:
|
||||
return [" " * panel_width for _ in range(height)]
|
||||
|
||||
# If preset picker is active, render that overlay instead of normal panel
|
||||
if self._show_preset_picker:
|
||||
picker_lines = self._render_preset_picker(panel_width)
|
||||
# Pad to full panel height if needed
|
||||
while len(picker_lines) < height:
|
||||
picker_lines.append(" " * panel_width)
|
||||
return [
|
||||
line.ljust(panel_width)[:panel_width] for line in picker_lines[:height]
|
||||
]
|
||||
|
||||
# Header
|
||||
title_line = "┌" + "─" * (panel_width - 2) + "┐"
|
||||
lines.append(title_line)
|
||||
|
||||
# Stage list section (occupies most of the panel)
|
||||
list_height = self.config.stage_list_height
|
||||
stage_names = list(self.stages.keys())
|
||||
for i in range(list_height):
|
||||
idx = i + self.scroll_offset
|
||||
if idx < len(stage_names):
|
||||
stage_name = stage_names[idx]
|
||||
ctrl = self.stages[stage_name]
|
||||
status = "✓" if ctrl.enabled else "✗"
|
||||
sel = ">" if ctrl.selected else " "
|
||||
# Truncate to fit panel (leave room for ">✓ " prefix and padding)
|
||||
max_name_len = panel_width - 5
|
||||
display_name = ctrl.name[:max_name_len]
|
||||
line = f"│{sel}{status} {display_name:<{max_name_len}}"
|
||||
lines.append(line[:panel_width])
|
||||
else:
|
||||
lines.append("│" + " " * (panel_width - 2) + "│")
|
||||
|
||||
# Separator
|
||||
lines.append("├" + "─" * (panel_width - 2) + "┤")
|
||||
|
||||
# Parameter section (if stage selected)
|
||||
if self.selected_stage and self.selected_stage in self.stages:
|
||||
ctrl = self.stages[self.selected_stage]
|
||||
if ctrl.params:
|
||||
# Render each parameter as "name: [=====] value" with focus indicator
|
||||
for param_name, param_value in ctrl.params.items():
|
||||
schema = ctrl.param_schema.get(param_name, {})
|
||||
is_focused = param_name == self._focused_param
|
||||
# Format value based on type
|
||||
if isinstance(param_value, float):
|
||||
val_str = f"{param_value:.2f}"
|
||||
elif isinstance(param_value, int):
|
||||
val_str = f"{param_value}"
|
||||
elif isinstance(param_value, bool):
|
||||
val_str = str(param_value)
|
||||
else:
|
||||
val_str = str(param_value)
|
||||
|
||||
# Build parameter line
|
||||
if (
|
||||
isinstance(param_value, (int, float))
|
||||
and "min" in schema
|
||||
and "max" in schema
|
||||
):
|
||||
# Render as slider
|
||||
min_val = schema["min"]
|
||||
max_val = schema["max"]
|
||||
# Normalize to 0-1 for bar length
|
||||
if max_val != min_val:
|
||||
ratio = (param_value - min_val) / (max_val - min_val)
|
||||
else:
|
||||
ratio = 0
|
||||
bar_width = (
|
||||
panel_width - len(param_name) - len(val_str) - 10
|
||||
) # approx space for "[] : ="
|
||||
if bar_width < 1:
|
||||
bar_width = 1
|
||||
filled = int(round(ratio * bar_width))
|
||||
bar = "[" + "=" * filled + " " * (bar_width - filled) + "]"
|
||||
param_line = f"│ {param_name}: {bar} {val_str}"
|
||||
else:
|
||||
# Simple name=value
|
||||
param_line = f"│ {param_name}={val_str}"
|
||||
|
||||
# Highlight focused parameter
|
||||
if is_focused:
|
||||
# Invert colors conceptually - for now use > prefix
|
||||
param_line = "│> " + param_line[2:]
|
||||
|
||||
# Truncate to fit panel width
|
||||
if len(param_line) > panel_width - 1:
|
||||
param_line = param_line[: panel_width - 1]
|
||||
lines.append(param_line + "│")
|
||||
else:
|
||||
lines.append("│ (no params)".ljust(panel_width - 1) + "│")
|
||||
else:
|
||||
lines.append("│ (select a stage)".ljust(panel_width - 1) + "│")
|
||||
|
||||
# Info line before footer
|
||||
info_parts = []
|
||||
if self._current_preset:
|
||||
info_parts.append(f"Preset: {self._current_preset}")
|
||||
if self._presets:
|
||||
info_parts.append("[P] presets")
|
||||
info_str = " | ".join(info_parts) if info_parts else ""
|
||||
if info_str:
|
||||
padded = info_str.ljust(panel_width - 2)
|
||||
lines.append("│" + padded + "│")
|
||||
|
||||
# Footer with instructions
|
||||
footer_line = self._render_footer(panel_width)
|
||||
lines.append(footer_line)
|
||||
|
||||
# Ensure all lines are exactly panel_width
|
||||
return [line.ljust(panel_width)[:panel_width] for line in lines]
|
||||
|
||||
def _render_footer(self, width: int) -> str:
|
||||
"""Render footer with key hints."""
|
||||
if width >= 40:
|
||||
# Show preset name and key hints
|
||||
preset_info = (
|
||||
f"Preset: {self._current_preset}" if self._current_preset else ""
|
||||
)
|
||||
hints = " [S]elect [Space]UI [Tab]Params [Arrows/HJKL]Adjust "
|
||||
if self._presets:
|
||||
hints += "[P]Preset "
|
||||
combined = f"{preset_info}{hints}"
|
||||
if len(combined) > width - 4:
|
||||
combined = combined[: width - 4]
|
||||
footer = "└" + "─" * (width - 2) + "┘"
|
||||
return footer # Just the line, we'll add info above in render
|
||||
else:
|
||||
return "└" + "─" * (width - 2) + "┘"
|
||||
|
||||
def process_key_event(self, key: str | int, modifiers: int = 0) -> bool:
|
||||
"""Process a keyboard event.
|
||||
|
||||
Args:
|
||||
key: Key symbol (e.g., ' ', 's', pygame.K_UP, etc.)
|
||||
modifiers: Modifier bits (Shift, Ctrl, Alt)
|
||||
|
||||
Returns:
|
||||
True if event was handled, False if not
|
||||
"""
|
||||
# Normalize to string for simplicity
|
||||
key_str = self._normalize_key(key, modifiers)
|
||||
|
||||
# Space: toggle UI panel visibility (only when preset picker not active)
|
||||
if key_str == " " and not self._show_preset_picker:
|
||||
self._show_panel = not getattr(self, "_show_panel", True)
|
||||
return True
|
||||
|
||||
# Space: toggle UI panel visibility (only when preset picker not active)
|
||||
if key_str == " " and not self._show_preset_picker:
|
||||
self._show_panel = not getattr(self, "_show_panel", True)
|
||||
return True
|
||||
|
||||
# S: select stage (cycle)
|
||||
if key_str == "s" and modifiers == 0:
|
||||
stages = list(self.stages.keys())
|
||||
if not stages:
|
||||
return False
|
||||
if self.selected_stage:
|
||||
current_idx = stages.index(self.selected_stage)
|
||||
next_idx = (current_idx + 1) % len(stages)
|
||||
else:
|
||||
next_idx = 0
|
||||
self.select_stage(stages[next_idx])
|
||||
return True
|
||||
|
||||
# P: toggle preset picker (only when panel is visible)
|
||||
if key_str == "p" and self._show_panel:
|
||||
self._show_preset_picker = not self._show_preset_picker
|
||||
if self._show_preset_picker:
|
||||
self._preset_scroll_offset = 0
|
||||
return True
|
||||
|
||||
# HJKL or Arrow Keys: scroll stage list, preset list, or adjust param
|
||||
# vi-style: K=up, J=down (J is actually next line in vi, but we use for down)
|
||||
# We'll use J for down, K for up, H for left, L for right
|
||||
elif key_str in ("up", "down", "kp8", "kp2", "j", "k"):
|
||||
# If preset picker is open, scroll preset list
|
||||
if self._show_preset_picker:
|
||||
delta = -1 if key_str in ("up", "kp8", "k") else 1
|
||||
self._preset_scroll_offset = max(0, self._preset_scroll_offset + delta)
|
||||
# Ensure scroll doesn't go past end
|
||||
max_offset = max(0, len(self._presets) - 1)
|
||||
self._preset_scroll_offset = min(max_offset, self._preset_scroll_offset)
|
||||
return True
|
||||
# If param is focused, adjust param value
|
||||
elif self.selected_stage and self._focused_param:
|
||||
delta = -1.0 if key_str in ("up", "kp8", "k") else 1.0
|
||||
self.adjust_selected_param(delta)
|
||||
return True
|
||||
# Otherwise scroll stages
|
||||
else:
|
||||
delta = -1 if key_str in ("up", "kp8", "k") else 1
|
||||
self.scroll_stages(delta)
|
||||
return True
|
||||
|
||||
# Left/Right or H/L: adjust param (if param selected)
|
||||
elif key_str in ("left", "right", "kp4", "kp6", "h", "l"):
|
||||
if self.selected_stage:
|
||||
delta = -0.1 if key_str in ("left", "kp4", "h") else 0.1
|
||||
self.adjust_selected_param(delta)
|
||||
return True
|
||||
|
||||
# Tab: cycle through parameters
|
||||
if key_str == "tab" and self.selected_stage:
|
||||
ctrl = self.stages[self.selected_stage]
|
||||
param_names = list(ctrl.params.keys())
|
||||
if param_names:
|
||||
if self._focused_param in param_names:
|
||||
current_idx = param_names.index(self._focused_param)
|
||||
next_idx = (current_idx + 1) % len(param_names)
|
||||
else:
|
||||
next_idx = 0
|
||||
self._focused_param = param_names[next_idx]
|
||||
return True
|
||||
|
||||
# Preset picker navigation
|
||||
if self._show_preset_picker:
|
||||
# Enter: select currently highlighted preset
|
||||
if key_str == "return":
|
||||
if self._presets:
|
||||
idx = self._preset_scroll_offset
|
||||
if idx < len(self._presets):
|
||||
self._current_preset = self._presets[idx]
|
||||
self._emit_event(
|
||||
"preset_changed", preset_name=self._current_preset
|
||||
)
|
||||
self._show_preset_picker = False
|
||||
return True
|
||||
# Escape: close picker without changing
|
||||
elif key_str == "escape":
|
||||
self._show_preset_picker = False
|
||||
return True
|
||||
|
||||
# Escape: deselect stage (only when picker not active)
|
||||
elif key_str == "escape" and self.selected_stage:
|
||||
self.selected_stage = None
|
||||
for ctrl in self.stages.values():
|
||||
ctrl.selected = False
|
||||
self._focused_param = None
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
def _normalize_key(self, key: str | int, modifiers: int) -> str:
|
||||
"""Normalize key to a string identifier."""
|
||||
# Handle pygame keysyms if imported
|
||||
try:
|
||||
import pygame
|
||||
|
||||
if isinstance(key, int):
|
||||
# Map pygame constants to strings
|
||||
key_map = {
|
||||
pygame.K_UP: "up",
|
||||
pygame.K_DOWN: "down",
|
||||
pygame.K_LEFT: "left",
|
||||
pygame.K_RIGHT: "right",
|
||||
pygame.K_SPACE: " ",
|
||||
pygame.K_ESCAPE: "escape",
|
||||
pygame.K_s: "s",
|
||||
pygame.K_w: "w",
|
||||
# HJKL navigation (vi-style)
|
||||
pygame.K_h: "h",
|
||||
pygame.K_j: "j",
|
||||
pygame.K_k: "k",
|
||||
pygame.K_l: "l",
|
||||
}
|
||||
# Check for keypad keys with KP prefix
|
||||
if hasattr(pygame, "K_KP8") and key == pygame.K_KP8:
|
||||
return "kp8"
|
||||
if hasattr(pygame, "K_KP2") and key == pygame.K_KP2:
|
||||
return "kp2"
|
||||
if hasattr(pygame, "K_KP4") and key == pygame.K_KP4:
|
||||
return "kp4"
|
||||
if hasattr(pygame, "K_KP6") and key == pygame.K_KP6:
|
||||
return "kp6"
|
||||
return key_map.get(key, f"pygame_{key}")
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# Already a string?
|
||||
if isinstance(key, str):
|
||||
return key.lower()
|
||||
|
||||
return str(key)
|
||||
|
||||
def set_event_callback(self, event_type: str, callback: Callable) -> None:
|
||||
"""Register a callback for UI events.
|
||||
|
||||
Args:
|
||||
event_type: Event type ("stage_toggled", "param_changed", "stage_selected", "preset_changed")
|
||||
callback: Function to call when event occurs
|
||||
"""
|
||||
self._callbacks[event_type] = callback
|
||||
|
||||
def _emit_event(self, event_type: str, **data) -> None:
|
||||
"""Emit an event to registered callbacks."""
|
||||
callback = self._callbacks.get(event_type)
|
||||
if callback:
|
||||
try:
|
||||
callback(**data)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
def set_presets(self, presets: list[str], current: str) -> None:
|
||||
"""Set available presets and current selection.
|
||||
|
||||
Args:
|
||||
presets: List of preset names
|
||||
current: Currently active preset name
|
||||
"""
|
||||
self._presets = presets
|
||||
self._current_preset = current
|
||||
|
||||
def cycle_preset(self, direction: int = 1) -> str:
|
||||
"""Cycle to next/previous preset.
|
||||
|
||||
Args:
|
||||
direction: 1 for next, -1 for previous
|
||||
|
||||
Returns:
|
||||
New preset name
|
||||
"""
|
||||
if not self._presets:
|
||||
return self._current_preset
|
||||
try:
|
||||
current_idx = self._presets.index(self._current_preset)
|
||||
except ValueError:
|
||||
current_idx = 0
|
||||
next_idx = (current_idx + direction) % len(self._presets)
|
||||
self._current_preset = self._presets[next_idx]
|
||||
self._emit_event("preset_changed", preset_name=self._current_preset)
|
||||
return self._current_preset
|
||||
|
||||
def _render_preset_picker(self, panel_width: int) -> list[str]:
|
||||
"""Render a full-screen preset picker overlay."""
|
||||
lines = []
|
||||
picker_height = min(len(self._presets) + 2, self.config.stage_list_height)
|
||||
# Create a centered box
|
||||
title = " Select Preset "
|
||||
box_width = min(40, panel_width - 2)
|
||||
lines.append("┌" + "─" * (box_width - 2) + "┐")
|
||||
lines.append("│" + title.center(box_width - 2) + "│")
|
||||
lines.append("├" + "─" * (box_width - 2) + "┤")
|
||||
# List presets with selection
|
||||
visible_start = self._preset_scroll_offset
|
||||
visible_end = visible_start + picker_height - 2
|
||||
for i in range(visible_start, min(visible_end, len(self._presets))):
|
||||
preset_name = self._presets[i]
|
||||
is_current = preset_name == self._current_preset
|
||||
prefix = "▶ " if is_current else " "
|
||||
line = f"│ {prefix}{preset_name}"
|
||||
if len(line) < box_width - 1:
|
||||
line = line.ljust(box_width - 1)
|
||||
lines.append(line[: box_width - 1] + "│")
|
||||
# Footer with help
|
||||
help_text = "[P] close [↑↓] navigate [Enter] select"
|
||||
footer = "├" + "─" * (box_width - 2) + "┤"
|
||||
lines.append(footer)
|
||||
lines.append("│" + help_text.center(box_width - 2) + "│")
|
||||
lines.append("└" + "─" * (box_width - 2) + "┘")
|
||||
return lines
|
||||
219
engine/pipeline/validation.py
Normal file
219
engine/pipeline/validation.py
Normal file
@@ -0,0 +1,219 @@
|
||||
"""
|
||||
Pipeline validation and MVP (Minimum Viable Pipeline) injection.
|
||||
|
||||
Provides validation functions to ensure pipelines meet minimum requirements
|
||||
and can auto-inject sensible defaults when fields are missing or invalid.
|
||||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
from engine.display import BorderMode, DisplayRegistry
|
||||
from engine.effects import get_registry
|
||||
from engine.pipeline.params import PipelineParams
|
||||
|
||||
# Known valid values
|
||||
VALID_SOURCES = ["headlines", "poetry", "fixture", "empty", "pipeline-inspect"]
|
||||
VALID_CAMERAS = [
|
||||
"feed",
|
||||
"scroll",
|
||||
"vertical",
|
||||
"horizontal",
|
||||
"omni",
|
||||
"floating",
|
||||
"bounce",
|
||||
"none",
|
||||
"",
|
||||
]
|
||||
VALID_DISPLAYS = None # Will be populated at runtime from DisplayRegistry
|
||||
|
||||
|
||||
@dataclass
|
||||
class ValidationResult:
|
||||
"""Result of validation with changes and warnings."""
|
||||
|
||||
valid: bool
|
||||
warnings: list[str]
|
||||
changes: list[str]
|
||||
config: Any # PipelineConfig (forward ref)
|
||||
params: PipelineParams
|
||||
|
||||
|
||||
# MVP defaults
|
||||
MVP_DEFAULTS = {
|
||||
"source": "fixture",
|
||||
"display": "terminal",
|
||||
"camera": "", # Empty = no camera stage (static viewport)
|
||||
"effects": [],
|
||||
"border": False,
|
||||
}
|
||||
|
||||
|
||||
def validate_pipeline_config(
|
||||
config: Any, params: PipelineParams, allow_unsafe: bool = False
|
||||
) -> ValidationResult:
|
||||
"""Validate pipeline configuration against MVP requirements.
|
||||
|
||||
Args:
|
||||
config: PipelineConfig object (has source, display, camera, effects fields)
|
||||
params: PipelineParams object (has border field)
|
||||
allow_unsafe: If True, don't inject defaults or enforce MVP
|
||||
|
||||
Returns:
|
||||
ValidationResult with validity, warnings, changes, and validated config/params
|
||||
"""
|
||||
warnings = []
|
||||
changes = []
|
||||
|
||||
if allow_unsafe:
|
||||
# Still do basic validation but don't inject defaults
|
||||
# Always return valid=True when allow_unsafe is set
|
||||
warnings.extend(_validate_source(config.source))
|
||||
warnings.extend(_validate_display(config.display))
|
||||
warnings.extend(_validate_camera(config.camera))
|
||||
warnings.extend(_validate_effects(config.effects))
|
||||
warnings.extend(_validate_border(params.border))
|
||||
return ValidationResult(
|
||||
valid=True, # Always valid with allow_unsafe
|
||||
warnings=warnings,
|
||||
changes=[],
|
||||
config=config,
|
||||
params=params,
|
||||
)
|
||||
|
||||
# MVP injection mode
|
||||
# Source
|
||||
source_issues = _validate_source(config.source)
|
||||
if source_issues:
|
||||
warnings.extend(source_issues)
|
||||
config.source = MVP_DEFAULTS["source"]
|
||||
changes.append(f"source → {MVP_DEFAULTS['source']}")
|
||||
|
||||
# Display
|
||||
display_issues = _validate_display(config.display)
|
||||
if display_issues:
|
||||
warnings.extend(display_issues)
|
||||
config.display = MVP_DEFAULTS["display"]
|
||||
changes.append(f"display → {MVP_DEFAULTS['display']}")
|
||||
|
||||
# Camera
|
||||
camera_issues = _validate_camera(config.camera)
|
||||
if camera_issues:
|
||||
warnings.extend(camera_issues)
|
||||
config.camera = MVP_DEFAULTS["camera"]
|
||||
changes.append("camera → static (no camera stage)")
|
||||
|
||||
# Effects
|
||||
effect_issues = _validate_effects(config.effects)
|
||||
if effect_issues:
|
||||
warnings.extend(effect_issues)
|
||||
# Only change if all effects are invalid
|
||||
if len(config.effects) == 0 or all(
|
||||
e not in _get_valid_effects() for e in config.effects
|
||||
):
|
||||
config.effects = MVP_DEFAULTS["effects"]
|
||||
changes.append("effects → [] (none)")
|
||||
else:
|
||||
# Remove invalid effects, keep valid ones
|
||||
valid_effects = [e for e in config.effects if e in _get_valid_effects()]
|
||||
if valid_effects != config.effects:
|
||||
config.effects = valid_effects
|
||||
changes.append(f"effects → {valid_effects}")
|
||||
|
||||
# Border (in params)
|
||||
border_issues = _validate_border(params.border)
|
||||
if border_issues:
|
||||
warnings.extend(border_issues)
|
||||
params.border = MVP_DEFAULTS["border"]
|
||||
changes.append(f"border → {MVP_DEFAULTS['border']}")
|
||||
|
||||
valid = len(warnings) == 0
|
||||
if changes:
|
||||
# If we made changes, pipeline should be valid now
|
||||
valid = True
|
||||
|
||||
return ValidationResult(
|
||||
valid=valid,
|
||||
warnings=warnings,
|
||||
changes=changes,
|
||||
config=config,
|
||||
params=params,
|
||||
)
|
||||
|
||||
|
||||
def _validate_source(source: str) -> list[str]:
|
||||
"""Validate source field."""
|
||||
if not source:
|
||||
return ["source is empty"]
|
||||
if source not in VALID_SOURCES:
|
||||
return [f"unknown source '{source}', valid sources: {VALID_SOURCES}"]
|
||||
return []
|
||||
|
||||
|
||||
def _validate_display(display: str) -> list[str]:
|
||||
"""Validate display field."""
|
||||
if not display:
|
||||
return ["display is empty"]
|
||||
# Check if display is available (lazy load registry)
|
||||
try:
|
||||
available = DisplayRegistry.list_backends()
|
||||
if display not in available:
|
||||
return [f"display '{display}' not available, available: {available}"]
|
||||
except Exception as e:
|
||||
return [f"error checking display availability: {e}"]
|
||||
return []
|
||||
|
||||
|
||||
def _validate_camera(camera: str | None) -> list[str]:
|
||||
"""Validate camera field."""
|
||||
if camera is None:
|
||||
return ["camera is None"]
|
||||
# Empty string is valid (static, no camera stage)
|
||||
if camera == "":
|
||||
return []
|
||||
if camera not in VALID_CAMERAS:
|
||||
return [f"unknown camera '{camera}', valid cameras: {VALID_CAMERAS}"]
|
||||
return []
|
||||
|
||||
|
||||
def _get_valid_effects() -> set[str]:
|
||||
"""Get set of valid effect names."""
|
||||
registry = get_registry()
|
||||
return set(registry.list_all().keys())
|
||||
|
||||
|
||||
def _validate_effects(effects: list[str]) -> list[str]:
|
||||
"""Validate effects list."""
|
||||
if effects is None:
|
||||
return ["effects is None"]
|
||||
valid_effects = _get_valid_effects()
|
||||
issues = []
|
||||
for effect in effects:
|
||||
if effect not in valid_effects:
|
||||
issues.append(
|
||||
f"unknown effect '{effect}', valid effects: {sorted(valid_effects)}"
|
||||
)
|
||||
return issues
|
||||
|
||||
|
||||
def _validate_border(border: bool | BorderMode) -> list[str]:
|
||||
"""Validate border field."""
|
||||
if isinstance(border, bool):
|
||||
return []
|
||||
if isinstance(border, BorderMode):
|
||||
return []
|
||||
return [f"invalid border value, must be bool or BorderMode, got {type(border)}"]
|
||||
|
||||
|
||||
def get_mvp_summary(config: Any, params: PipelineParams) -> str:
|
||||
"""Get a human-readable summary of the MVP pipeline configuration."""
|
||||
camera_text = "none" if not config.camera else config.camera
|
||||
effects_text = "none" if not config.effects else ", ".join(config.effects)
|
||||
return (
|
||||
f"MVP Pipeline Configuration:\n"
|
||||
f" Source: {config.source}\n"
|
||||
f" Display: {config.display}\n"
|
||||
f" Camera: {camera_text} (static if empty)\n"
|
||||
f" Effects: {effects_text}\n"
|
||||
f" Border: {params.border}"
|
||||
)
|
||||
56
test_ui_simple.py
Normal file
56
test_ui_simple.py
Normal file
@@ -0,0 +1,56 @@
|
||||
"""
|
||||
Simple test for UIPanel integration.
|
||||
"""
|
||||
|
||||
from engine.pipeline.ui import UIPanel, UIConfig, StageControl
|
||||
|
||||
# Create panel
|
||||
panel = UIPanel(UIConfig(panel_width=24))
|
||||
|
||||
# Add some mock stages
|
||||
panel.register_stage(
|
||||
type(
|
||||
"Stage", (), {"name": "noise", "category": "effect", "is_enabled": lambda: True}
|
||||
),
|
||||
enabled=True,
|
||||
)
|
||||
panel.register_stage(
|
||||
type(
|
||||
"Stage", (), {"name": "fade", "category": "effect", "is_enabled": lambda: True}
|
||||
),
|
||||
enabled=False,
|
||||
)
|
||||
panel.register_stage(
|
||||
type(
|
||||
"Stage",
|
||||
(),
|
||||
{"name": "glitch", "category": "effect", "is_enabled": lambda: True},
|
||||
),
|
||||
enabled=True,
|
||||
)
|
||||
panel.register_stage(
|
||||
type(
|
||||
"Stage",
|
||||
(),
|
||||
{"name": "font", "category": "transform", "is_enabled": lambda: True},
|
||||
),
|
||||
enabled=True,
|
||||
)
|
||||
|
||||
# Select first stage
|
||||
panel.select_stage("noise")
|
||||
|
||||
# Render at 80x24
|
||||
lines = panel.render(80, 24)
|
||||
print("\n".join(lines))
|
||||
|
||||
print("\nStage list:")
|
||||
for name, ctrl in panel.stages.items():
|
||||
print(f" {name}: enabled={ctrl.enabled}, selected={ctrl.selected}")
|
||||
|
||||
print("\nToggle 'fade' and re-render:")
|
||||
panel.toggle_stage("fade")
|
||||
lines = panel.render(80, 24)
|
||||
print("\n".join(lines))
|
||||
|
||||
print("\nEnabled stages:", panel.get_enabled_stages())
|
||||
@@ -77,11 +77,13 @@ class TestDisplayRegistry:
|
||||
DisplayRegistry.initialize()
|
||||
assert DisplayRegistry.get("terminal") == TerminalDisplay
|
||||
assert DisplayRegistry.get("null") == NullDisplay
|
||||
from engine.display.backends.sixel import SixelDisplay
|
||||
from engine.display.backends.pygame import PygameDisplay
|
||||
from engine.display.backends.websocket import WebSocketDisplay
|
||||
|
||||
assert DisplayRegistry.get("websocket") == WebSocketDisplay
|
||||
assert DisplayRegistry.get("sixel") == SixelDisplay
|
||||
assert DisplayRegistry.get("pygame") == PygameDisplay
|
||||
# Removed backends (sixel, kitty) should not be present
|
||||
assert DisplayRegistry.get("sixel") is None
|
||||
|
||||
def test_initialize_idempotent(self):
|
||||
"""initialize can be called multiple times safely."""
|
||||
|
||||
236
tests/test_framebuffer_stage.py
Normal file
236
tests/test_framebuffer_stage.py
Normal file
@@ -0,0 +1,236 @@
|
||||
"""
|
||||
Tests for FrameBufferStage.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from engine.pipeline.core import DataType, PipelineContext
|
||||
from engine.pipeline.params import PipelineParams
|
||||
from engine.pipeline.stages.framebuffer import FrameBufferConfig, FrameBufferStage
|
||||
|
||||
|
||||
def make_ctx(width: int = 80, height: int = 24) -> PipelineContext:
|
||||
"""Create a PipelineContext for testing."""
|
||||
ctx = PipelineContext()
|
||||
params = PipelineParams()
|
||||
params.viewport_width = width
|
||||
params.viewport_height = height
|
||||
ctx.params = params
|
||||
return ctx
|
||||
|
||||
|
||||
class TestFrameBufferStage:
|
||||
"""Tests for FrameBufferStage."""
|
||||
|
||||
def test_init(self):
|
||||
"""FrameBufferStage initializes with default config."""
|
||||
stage = FrameBufferStage()
|
||||
assert stage.name == "framebuffer"
|
||||
assert stage.category == "effect"
|
||||
assert stage.config.history_depth == 2
|
||||
|
||||
def test_capabilities(self):
|
||||
"""Stage provides framebuffer.history capability."""
|
||||
stage = FrameBufferStage()
|
||||
assert "framebuffer.history" in stage.capabilities
|
||||
|
||||
def test_dependencies(self):
|
||||
"""Stage depends on render.output."""
|
||||
stage = FrameBufferStage()
|
||||
assert "render.output" in stage.dependencies
|
||||
|
||||
def test_inlet_outlet_types(self):
|
||||
"""Stage accepts and produces TEXT_BUFFER."""
|
||||
stage = FrameBufferStage()
|
||||
assert DataType.TEXT_BUFFER in stage.inlet_types
|
||||
assert DataType.TEXT_BUFFER in stage.outlet_types
|
||||
|
||||
def test_init_context(self):
|
||||
"""init initializes context state."""
|
||||
stage = FrameBufferStage()
|
||||
ctx = make_ctx()
|
||||
|
||||
result = stage.init(ctx)
|
||||
|
||||
assert result is True
|
||||
assert ctx.get("frame_history") == []
|
||||
assert ctx.get("intensity_history") == []
|
||||
|
||||
def test_process_stores_buffer_in_history(self):
|
||||
"""process stores buffer in history."""
|
||||
stage = FrameBufferStage()
|
||||
ctx = make_ctx()
|
||||
stage.init(ctx)
|
||||
|
||||
buffer = ["line1", "line2", "line3"]
|
||||
result = stage.process(buffer, ctx)
|
||||
|
||||
assert result == buffer # Pass-through
|
||||
history = ctx.get("frame_history")
|
||||
assert len(history) == 1
|
||||
assert history[0] == buffer
|
||||
|
||||
def test_process_computes_intensity(self):
|
||||
"""process computes intensity map."""
|
||||
stage = FrameBufferStage()
|
||||
ctx = make_ctx()
|
||||
stage.init(ctx)
|
||||
|
||||
buffer = ["hello world", "test line", ""]
|
||||
stage.process(buffer, ctx)
|
||||
|
||||
intensity = ctx.get("current_intensity")
|
||||
assert intensity is not None
|
||||
assert len(intensity) == 3 # Three rows
|
||||
# Non-empty lines should have intensity > 0
|
||||
assert intensity[0] > 0
|
||||
assert intensity[1] > 0
|
||||
# Empty line should have intensity 0
|
||||
assert intensity[2] == 0.0
|
||||
|
||||
def test_process_keeps_multiple_frames(self):
|
||||
"""process keeps configured depth of frames."""
|
||||
config = FrameBufferConfig(history_depth=3)
|
||||
stage = FrameBufferStage(config)
|
||||
ctx = make_ctx()
|
||||
stage.init(ctx)
|
||||
|
||||
# Process several frames
|
||||
for i in range(5):
|
||||
buffer = [f"frame {i}"]
|
||||
stage.process(buffer, ctx)
|
||||
|
||||
history = ctx.get("frame_history")
|
||||
assert len(history) == 3 # Only last 3 kept
|
||||
# Should be in reverse chronological order (most recent first)
|
||||
assert history[0] == ["frame 4"]
|
||||
assert history[1] == ["frame 3"]
|
||||
assert history[2] == ["frame 2"]
|
||||
|
||||
def test_process_keeps_intensity_sync(self):
|
||||
"""process keeps intensity history in sync with frame history."""
|
||||
config = FrameBufferConfig(history_depth=3)
|
||||
stage = FrameBufferStage(config)
|
||||
ctx = make_ctx()
|
||||
stage.init(ctx)
|
||||
|
||||
buffers = [
|
||||
["a"],
|
||||
["bb"],
|
||||
["ccc"],
|
||||
]
|
||||
for buf in buffers:
|
||||
stage.process(buf, ctx)
|
||||
|
||||
frame_hist = ctx.get("frame_history")
|
||||
intensity_hist = ctx.get("intensity_history")
|
||||
assert len(frame_hist) == len(intensity_hist) == 3
|
||||
|
||||
# Each frame's intensity should match
|
||||
for i, frame in enumerate(frame_hist):
|
||||
computed_intensity = stage._compute_buffer_intensity(frame, len(frame))
|
||||
assert intensity_hist[i] == pytest.approx(computed_intensity)
|
||||
|
||||
def test_get_frame(self):
|
||||
"""get_frame retrieves frames from history by index."""
|
||||
config = FrameBufferConfig(history_depth=3)
|
||||
stage = FrameBufferStage(config)
|
||||
ctx = make_ctx()
|
||||
stage.init(ctx)
|
||||
|
||||
buffers = [["f1"], ["f2"], ["f3"]]
|
||||
for buf in buffers:
|
||||
stage.process(buf, ctx)
|
||||
|
||||
assert stage.get_frame(0, ctx) == ["f3"] # Most recent
|
||||
assert stage.get_frame(1, ctx) == ["f2"]
|
||||
assert stage.get_frame(2, ctx) == ["f1"]
|
||||
assert stage.get_frame(3, ctx) is None # Out of range
|
||||
|
||||
def test_get_intensity(self):
|
||||
"""get_intensity retrieves intensity maps by index."""
|
||||
stage = FrameBufferStage()
|
||||
ctx = make_ctx()
|
||||
stage.init(ctx)
|
||||
|
||||
buffers = [["line"], ["longer line"]]
|
||||
for buf in buffers:
|
||||
stage.process(buf, ctx)
|
||||
|
||||
intensity0 = stage.get_intensity(0, ctx)
|
||||
intensity1 = stage.get_intensity(1, ctx)
|
||||
assert intensity0 is not None
|
||||
assert intensity1 is not None
|
||||
# Longer line should have higher intensity (more non-space chars)
|
||||
assert sum(intensity1) > sum(intensity0)
|
||||
|
||||
def test_compute_buffer_intensity_simple(self):
|
||||
"""_compute_buffer_intensity computes simple density."""
|
||||
stage = FrameBufferStage()
|
||||
|
||||
buf = ["abc", " ", "de"]
|
||||
intensities = stage._compute_buffer_intensity(buf, max_rows=3)
|
||||
|
||||
assert len(intensities) == 3
|
||||
# "abc" -> 3/3 = 1.0
|
||||
assert pytest.approx(intensities[0]) == 1.0
|
||||
# " " -> 0/2 = 0.0
|
||||
assert pytest.approx(intensities[1]) == 0.0
|
||||
# "de" -> 2/2 = 1.0
|
||||
assert pytest.approx(intensities[2]) == 1.0
|
||||
|
||||
def test_compute_buffer_intensity_with_ansi(self):
|
||||
"""_compute_buffer_intensity strips ANSI codes."""
|
||||
stage = FrameBufferStage()
|
||||
|
||||
# Line with ANSI color codes
|
||||
buf = ["\033[31mred\033[0m", "normal"]
|
||||
intensities = stage._compute_buffer_intensity(buf, max_rows=2)
|
||||
|
||||
assert len(intensities) == 2
|
||||
# Should treat "red" as 3 non-space chars
|
||||
assert pytest.approx(intensities[0]) == 1.0 # "red" = 3/3
|
||||
assert pytest.approx(intensities[1]) == 1.0 # "normal" = 6/6
|
||||
|
||||
def test_compute_buffer_intensity_padding(self):
|
||||
"""_compute_buffer_intensity pads to max_rows."""
|
||||
stage = FrameBufferStage()
|
||||
|
||||
buf = ["short"]
|
||||
intensities = stage._compute_buffer_intensity(buf, max_rows=5)
|
||||
|
||||
assert len(intensities) == 5
|
||||
assert intensities[0] > 0
|
||||
assert all(i == 0.0 for i in intensities[1:])
|
||||
|
||||
def test_thread_safety(self):
|
||||
"""process is thread-safe."""
|
||||
from threading import Thread
|
||||
|
||||
stage = FrameBufferStage()
|
||||
ctx = make_ctx()
|
||||
stage.init(ctx)
|
||||
|
||||
results = []
|
||||
|
||||
def worker(idx):
|
||||
buffer = [f"thread {idx}"]
|
||||
stage.process(buffer, ctx)
|
||||
results.append(len(ctx.get("frame_history", [])))
|
||||
|
||||
threads = [Thread(target=worker, args=(i,)) for i in range(10)]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
# All threads should see consistent state
|
||||
assert len(ctx.get("frame_history")) <= 2 # Depth limit
|
||||
# All worker threads should have completed without errors
|
||||
assert len(results) == 10
|
||||
|
||||
def test_cleanup(self):
|
||||
"""cleanup does nothing but can be called."""
|
||||
stage = FrameBufferStage()
|
||||
# Should not raise
|
||||
stage.cleanup()
|
||||
@@ -45,7 +45,8 @@ class TestStageRegistry:
|
||||
assert "pygame" in displays
|
||||
assert "websocket" in displays
|
||||
assert "null" in displays
|
||||
assert "sixel" in displays
|
||||
# sixel and kitty removed; should not be present
|
||||
assert "sixel" not in displays
|
||||
|
||||
def test_create_source_stage(self):
|
||||
"""StageRegistry.create creates source stages."""
|
||||
@@ -546,7 +547,7 @@ class TestPipelinePresets:
|
||||
FIREHOSE_PRESET,
|
||||
PIPELINE_VIZ_PRESET,
|
||||
POETRY_PRESET,
|
||||
SIXEL_PRESET,
|
||||
UI_PRESET,
|
||||
WEBSOCKET_PRESET,
|
||||
)
|
||||
|
||||
@@ -554,8 +555,8 @@ class TestPipelinePresets:
|
||||
assert POETRY_PRESET.name == "poetry"
|
||||
assert FIREHOSE_PRESET.name == "firehose"
|
||||
assert PIPELINE_VIZ_PRESET.name == "pipeline"
|
||||
assert SIXEL_PRESET.name == "sixel"
|
||||
assert WEBSOCKET_PRESET.name == "websocket"
|
||||
assert UI_PRESET.name == "ui"
|
||||
|
||||
def test_preset_to_params(self):
|
||||
"""Presets convert to PipelineParams correctly."""
|
||||
|
||||
@@ -1,128 +0,0 @@
|
||||
"""
|
||||
Tests for engine.display.backends.sixel module.
|
||||
"""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
|
||||
class TestSixelDisplay:
|
||||
"""Tests for SixelDisplay class."""
|
||||
|
||||
def test_init_stores_dimensions(self):
|
||||
"""init stores dimensions."""
|
||||
from engine.display.backends.sixel import SixelDisplay
|
||||
|
||||
display = SixelDisplay()
|
||||
display.init(80, 24)
|
||||
assert display.width == 80
|
||||
assert display.height == 24
|
||||
|
||||
def test_init_custom_cell_size(self):
|
||||
"""init accepts custom cell size."""
|
||||
from engine.display.backends.sixel import SixelDisplay
|
||||
|
||||
display = SixelDisplay(cell_width=12, cell_height=18)
|
||||
assert display.cell_width == 12
|
||||
assert display.cell_height == 18
|
||||
|
||||
def test_show_handles_empty_buffer(self):
|
||||
"""show handles empty buffer gracefully."""
|
||||
from engine.display.backends.sixel import SixelDisplay
|
||||
|
||||
display = SixelDisplay()
|
||||
display.init(80, 24)
|
||||
|
||||
with patch("engine.display.backends.sixel._encode_sixel") as mock_encode:
|
||||
mock_encode.return_value = ""
|
||||
display.show([])
|
||||
|
||||
def test_show_handles_pil_import_error(self):
|
||||
"""show gracefully handles missing PIL."""
|
||||
from engine.display.backends.sixel import SixelDisplay
|
||||
|
||||
display = SixelDisplay()
|
||||
display.init(80, 24)
|
||||
|
||||
with patch.dict("sys.modules", {"PIL": None}):
|
||||
display.show(["test line"])
|
||||
|
||||
def test_clear_sends_escape_sequence(self):
|
||||
"""clear sends clear screen escape sequence."""
|
||||
from engine.display.backends.sixel import SixelDisplay
|
||||
|
||||
display = SixelDisplay()
|
||||
|
||||
with patch("sys.stdout") as mock_stdout:
|
||||
display.clear()
|
||||
mock_stdout.buffer.write.assert_called()
|
||||
|
||||
def test_cleanup_does_nothing(self):
|
||||
"""cleanup does nothing."""
|
||||
from engine.display.backends.sixel import SixelDisplay
|
||||
|
||||
display = SixelDisplay()
|
||||
display.cleanup()
|
||||
|
||||
|
||||
class TestSixelAnsiParsing:
|
||||
"""Tests for ANSI parsing in SixelDisplay."""
|
||||
|
||||
def test_parse_empty_string(self):
|
||||
"""handles empty string."""
|
||||
from engine.display.renderer import parse_ansi
|
||||
|
||||
result = parse_ansi("")
|
||||
assert len(result) > 0
|
||||
|
||||
def test_parse_plain_text(self):
|
||||
"""parses plain text without ANSI codes."""
|
||||
from engine.display.renderer import parse_ansi
|
||||
|
||||
result = parse_ansi("hello world")
|
||||
assert len(result) == 1
|
||||
text, fg, bg, bold = result[0]
|
||||
assert text == "hello world"
|
||||
|
||||
def test_parse_with_color_codes(self):
|
||||
"""parses ANSI color codes."""
|
||||
from engine.display.renderer import parse_ansi
|
||||
|
||||
result = parse_ansi("\033[31mred\033[0m")
|
||||
assert len(result) == 1
|
||||
assert result[0][0] == "red"
|
||||
assert result[0][1] == (205, 49, 49)
|
||||
|
||||
def test_parse_with_bold(self):
|
||||
"""parses bold codes."""
|
||||
from engine.display.renderer import parse_ansi
|
||||
|
||||
result = parse_ansi("\033[1mbold\033[0m")
|
||||
assert len(result) == 1
|
||||
assert result[0][0] == "bold"
|
||||
assert result[0][3] is True
|
||||
|
||||
def test_parse_256_color(self):
|
||||
"""parses 256 color codes."""
|
||||
from engine.display.renderer import parse_ansi
|
||||
|
||||
result = parse_ansi("\033[38;5;196mred\033[0m")
|
||||
assert len(result) == 1
|
||||
assert result[0][0] == "red"
|
||||
|
||||
|
||||
class TestSixelEncoding:
|
||||
"""Tests for Sixel encoding."""
|
||||
|
||||
def test_encode_empty_image(self):
|
||||
"""handles empty image."""
|
||||
from engine.display.backends.sixel import _encode_sixel
|
||||
|
||||
with patch("PIL.Image.Image") as mock_image:
|
||||
mock_img_instance = MagicMock()
|
||||
mock_img_instance.convert.return_value = mock_img_instance
|
||||
mock_img_instance.size = (0, 0)
|
||||
mock_img_instance.load.return_value = {}
|
||||
mock_image.return_value = mock_img_instance
|
||||
|
||||
result = _encode_sixel(mock_img_instance)
|
||||
assert result == ""
|
||||
184
tests/test_ui_panel.py
Normal file
184
tests/test_ui_panel.py
Normal file
@@ -0,0 +1,184 @@
|
||||
"""
|
||||
Tests for UIPanel.
|
||||
"""
|
||||
|
||||
from engine.pipeline.ui import StageControl, UIConfig, UIPanel
|
||||
|
||||
|
||||
class MockStage:
|
||||
"""Mock stage for testing."""
|
||||
|
||||
def __init__(self, name, category="effect"):
|
||||
self.name = name
|
||||
self.category = category
|
||||
self._enabled = True
|
||||
|
||||
def is_enabled(self):
|
||||
return self._enabled
|
||||
|
||||
|
||||
class TestUIPanel:
|
||||
"""Tests for UIPanel."""
|
||||
|
||||
def test_init(self):
|
||||
"""UIPanel initializes with default config."""
|
||||
panel = UIPanel()
|
||||
assert panel.config.panel_width == 24
|
||||
assert panel.config.stage_list_height == 12
|
||||
assert panel.scroll_offset == 0
|
||||
assert panel.selected_stage is None
|
||||
|
||||
def test_register_stage(self):
|
||||
"""register_stage adds a stage control."""
|
||||
panel = UIPanel()
|
||||
stage = MockStage("noise")
|
||||
panel.register_stage(stage, enabled=True)
|
||||
assert "noise" in panel.stages
|
||||
ctrl = panel.stages["noise"]
|
||||
assert ctrl.name == "noise"
|
||||
assert ctrl.enabled is True
|
||||
assert ctrl.selected is False
|
||||
|
||||
def test_select_stage(self):
|
||||
"""select_stage sets selection."""
|
||||
panel = UIPanel()
|
||||
stage1 = MockStage("noise")
|
||||
stage2 = MockStage("fade")
|
||||
panel.register_stage(stage1)
|
||||
panel.register_stage(stage2)
|
||||
panel.select_stage("fade")
|
||||
assert panel.selected_stage == "fade"
|
||||
assert panel.stages["fade"].selected is True
|
||||
assert panel.stages["noise"].selected is False
|
||||
|
||||
def test_toggle_stage(self):
|
||||
"""toggle_stage flips enabled state."""
|
||||
panel = UIPanel()
|
||||
stage = MockStage("glitch")
|
||||
panel.register_stage(stage, enabled=True)
|
||||
result = panel.toggle_stage("glitch")
|
||||
assert result is False
|
||||
assert panel.stages["glitch"].enabled is False
|
||||
result = panel.toggle_stage("glitch")
|
||||
assert result is True
|
||||
|
||||
def test_get_enabled_stages(self):
|
||||
"""get_enabled_stages returns only enabled stage names."""
|
||||
panel = UIPanel()
|
||||
panel.register_stage(MockStage("noise"), enabled=True)
|
||||
panel.register_stage(MockStage("fade"), enabled=False)
|
||||
panel.register_stage(MockStage("glitch"), enabled=True)
|
||||
enabled = panel.get_enabled_stages()
|
||||
assert set(enabled) == {"noise", "glitch"}
|
||||
|
||||
def test_scroll_stages(self):
|
||||
"""scroll_stages moves the view."""
|
||||
panel = UIPanel(UIConfig(stage_list_height=3))
|
||||
for i in range(10):
|
||||
panel.register_stage(MockStage(f"stage{i}"))
|
||||
assert panel.scroll_offset == 0
|
||||
panel.scroll_stages(1)
|
||||
assert panel.scroll_offset == 1
|
||||
panel.scroll_stages(-1)
|
||||
assert panel.scroll_offset == 0
|
||||
# Clamp at max
|
||||
panel.scroll_stages(100)
|
||||
assert panel.scroll_offset == 7 # 10 - 3 = 7
|
||||
|
||||
def test_render_produces_lines(self):
|
||||
"""render produces list of strings of correct width."""
|
||||
panel = UIPanel(UIConfig(panel_width=20))
|
||||
panel.register_stage(MockStage("noise"), enabled=True)
|
||||
panel.register_stage(MockStage("fade"), enabled=False)
|
||||
panel.select_stage("noise")
|
||||
lines = panel.render(80, 24)
|
||||
# All lines should be exactly panel_width chars (20)
|
||||
for line in lines:
|
||||
assert len(line) == 20
|
||||
# Should have header, stage rows, separator, params area, footer
|
||||
assert len(lines) >= 5
|
||||
|
||||
def test_process_key_event_space_toggles_stage(self):
|
||||
"""process_key_event with space toggles UI panel visibility."""
|
||||
panel = UIPanel()
|
||||
stage = MockStage("glitch")
|
||||
panel.register_stage(stage, enabled=True)
|
||||
panel.select_stage("glitch")
|
||||
# Space should now toggle UI panel visibility, not stage
|
||||
assert panel._show_panel is True
|
||||
handled = panel.process_key_event(" ")
|
||||
assert handled is True
|
||||
assert panel._show_panel is False
|
||||
# Pressing space again should show panel
|
||||
handled = panel.process_key_event(" ")
|
||||
assert panel._show_panel is True
|
||||
|
||||
def test_process_key_event_space_does_not_toggle_in_picker(self):
|
||||
"""Space should not toggle UI panel when preset picker is active."""
|
||||
panel = UIPanel()
|
||||
panel._show_panel = True
|
||||
panel._show_preset_picker = True
|
||||
handled = panel.process_key_event(" ")
|
||||
assert handled is False # Not handled when picker active
|
||||
assert panel._show_panel is True # Unchanged
|
||||
|
||||
def test_process_key_event_s_selects_next(self):
|
||||
"""process_key_event with s cycles selection."""
|
||||
panel = UIPanel()
|
||||
panel.register_stage(MockStage("noise"))
|
||||
panel.register_stage(MockStage("fade"))
|
||||
panel.register_stage(MockStage("glitch"))
|
||||
panel.select_stage("noise")
|
||||
handled = panel.process_key_event("s")
|
||||
assert handled is True
|
||||
assert panel.selected_stage == "fade"
|
||||
|
||||
def test_process_key_event_hjkl_navigation(self):
|
||||
"""process_key_event with HJKL keys."""
|
||||
panel = UIPanel()
|
||||
stage = MockStage("noise")
|
||||
panel.register_stage(stage)
|
||||
panel.select_stage("noise")
|
||||
|
||||
# J or Down should scroll or adjust param
|
||||
assert panel.scroll_stages(1) is None # Just test it doesn't error
|
||||
# H or Left should adjust param (when param selected)
|
||||
panel.selected_stage = "noise"
|
||||
panel._focused_param = "intensity"
|
||||
panel.stages["noise"].params["intensity"] = 0.5
|
||||
|
||||
# Left/H should decrease
|
||||
handled = panel.process_key_event("h")
|
||||
assert handled is True
|
||||
# L or Right should increase
|
||||
handled = panel.process_key_event("l")
|
||||
assert handled is True
|
||||
|
||||
# K should scroll up
|
||||
panel.selected_stage = None
|
||||
handled = panel.process_key_event("k")
|
||||
assert handled is True
|
||||
|
||||
def test_set_event_callback(self):
|
||||
"""set_event_callback registers callback."""
|
||||
panel = UIPanel()
|
||||
called = []
|
||||
|
||||
def callback(stage_name, enabled):
|
||||
called.append((stage_name, enabled))
|
||||
|
||||
panel.set_event_callback("stage_toggled", callback)
|
||||
panel.toggle_stage("test") # No stage, won't trigger
|
||||
# Simulate toggle through event
|
||||
panel._emit_event("stage_toggled", stage_name="noise", enabled=False)
|
||||
assert called == [("noise", False)]
|
||||
|
||||
def test_register_stage_returns_control(self):
|
||||
"""register_stage should return the StageControl instance."""
|
||||
panel = UIPanel()
|
||||
stage = MockStage("noise_effect")
|
||||
control = panel.register_stage(stage, enabled=True)
|
||||
assert control is not None
|
||||
assert isinstance(control, StageControl)
|
||||
assert control.name == "noise_effect"
|
||||
assert control.enabled is True
|
||||
Reference in New Issue
Block a user