feat(pipeline): integrate BorderMode and add UI preset
- params.py: border field now accepts bool | BorderMode - presets.py: add UI_PRESET with BorderMode.UI, remove SIXEL_PRESET - __init__.py: export UI_PRESET, drop SIXEL_PRESET - registry.py: auto-register FrameBufferStage on discovery - New FrameBufferStage for frame history and intensity maps - Tests: update test_pipeline for UI preset, add test_framebuffer_stage.py This sets the foundation for interactive UI panel and modern pipeline composition.
This commit is contained in:
@@ -50,8 +50,7 @@ from engine.pipeline.presets import (
|
||||
FIREHOSE_PRESET,
|
||||
PIPELINE_VIZ_PRESET,
|
||||
POETRY_PRESET,
|
||||
PRESETS,
|
||||
SIXEL_PRESET,
|
||||
UI_PRESET,
|
||||
WEBSOCKET_PRESET,
|
||||
PipelinePreset,
|
||||
create_preset_from_params,
|
||||
@@ -92,8 +91,8 @@ __all__ = [
|
||||
"POETRY_PRESET",
|
||||
"PIPELINE_VIZ_PRESET",
|
||||
"WEBSOCKET_PRESET",
|
||||
"SIXEL_PRESET",
|
||||
"FIREHOSE_PRESET",
|
||||
"UI_PRESET",
|
||||
"get_preset",
|
||||
"list_presets",
|
||||
"create_preset_from_params",
|
||||
|
||||
@@ -8,6 +8,11 @@ modify these params, which the pipeline then applies to its stages.
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
try:
|
||||
from engine.display import BorderMode
|
||||
except ImportError:
|
||||
BorderMode = object # Fallback for type checking
|
||||
|
||||
|
||||
@dataclass
|
||||
class PipelineParams:
|
||||
@@ -23,7 +28,7 @@ class PipelineParams:
|
||||
|
||||
# Display config
|
||||
display: str = "terminal"
|
||||
border: bool = False
|
||||
border: bool | BorderMode = False
|
||||
|
||||
# Camera config
|
||||
camera_mode: str = "vertical"
|
||||
|
||||
@@ -13,6 +13,7 @@ Loading order:
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
from engine.display import BorderMode
|
||||
from engine.pipeline.params import PipelineParams
|
||||
|
||||
|
||||
@@ -26,7 +27,6 @@ def _load_toml_presets() -> dict[str, Any]:
|
||||
return {}
|
||||
|
||||
|
||||
# Pre-load TOML presets
|
||||
_YAML_PRESETS = _load_toml_presets()
|
||||
|
||||
|
||||
@@ -47,14 +47,24 @@ class PipelinePreset:
|
||||
display: str = "terminal"
|
||||
camera: str = "scroll"
|
||||
effects: list[str] = field(default_factory=list)
|
||||
border: bool = False
|
||||
border: bool | BorderMode = (
|
||||
False # Border mode: False=off, True=simple, BorderMode.UI for panel
|
||||
)
|
||||
|
||||
def to_params(self) -> PipelineParams:
|
||||
"""Convert to PipelineParams."""
|
||||
from engine.display import BorderMode
|
||||
|
||||
params = PipelineParams()
|
||||
params.source = self.source
|
||||
params.display = self.display
|
||||
params.border = self.border
|
||||
params.border = (
|
||||
self.border
|
||||
if isinstance(self.border, bool)
|
||||
else BorderMode.UI
|
||||
if self.border == BorderMode.UI
|
||||
else False
|
||||
)
|
||||
params.camera_mode = self.camera
|
||||
params.effect_order = self.effects.copy()
|
||||
return params
|
||||
@@ -83,6 +93,16 @@ DEMO_PRESET = PipelinePreset(
|
||||
effects=["noise", "fade", "glitch", "firehose"],
|
||||
)
|
||||
|
||||
UI_PRESET = PipelinePreset(
|
||||
name="ui",
|
||||
description="Interactive UI mode with right-side control panel",
|
||||
source="fixture",
|
||||
display="pygame",
|
||||
camera="scroll",
|
||||
effects=["noise", "fade", "glitch"],
|
||||
border=BorderMode.UI,
|
||||
)
|
||||
|
||||
POETRY_PRESET = PipelinePreset(
|
||||
name="poetry",
|
||||
description="Poetry feed with subtle effects",
|
||||
@@ -110,15 +130,6 @@ WEBSOCKET_PRESET = PipelinePreset(
|
||||
effects=["noise", "fade", "glitch"],
|
||||
)
|
||||
|
||||
SIXEL_PRESET = PipelinePreset(
|
||||
name="sixel",
|
||||
description="Sixel graphics display mode",
|
||||
source="headlines",
|
||||
display="sixel",
|
||||
camera="scroll",
|
||||
effects=["noise", "fade", "glitch"],
|
||||
)
|
||||
|
||||
FIREHOSE_PRESET = PipelinePreset(
|
||||
name="firehose",
|
||||
description="High-speed firehose mode",
|
||||
@@ -128,6 +139,16 @@ FIREHOSE_PRESET = PipelinePreset(
|
||||
effects=["noise", "fade", "glitch", "firehose"],
|
||||
)
|
||||
|
||||
FIXTURE_PRESET = PipelinePreset(
|
||||
name="fixture",
|
||||
description="Use cached headline fixtures",
|
||||
source="fixture",
|
||||
display="pygame",
|
||||
camera="scroll",
|
||||
effects=["noise", "fade"],
|
||||
border=False,
|
||||
)
|
||||
|
||||
|
||||
# Build presets from YAML data
|
||||
def _build_presets() -> dict[str, PipelinePreset]:
|
||||
@@ -145,8 +166,9 @@ def _build_presets() -> dict[str, PipelinePreset]:
|
||||
"poetry": POETRY_PRESET,
|
||||
"pipeline": PIPELINE_VIZ_PRESET,
|
||||
"websocket": WEBSOCKET_PRESET,
|
||||
"sixel": SIXEL_PRESET,
|
||||
"firehose": FIREHOSE_PRESET,
|
||||
"ui": UI_PRESET,
|
||||
"fixture": FIXTURE_PRESET,
|
||||
}
|
||||
|
||||
for name, preset in builtins.items():
|
||||
|
||||
@@ -118,6 +118,14 @@ def discover_stages() -> None:
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# Register buffer stages (framebuffer, etc.)
|
||||
try:
|
||||
from engine.pipeline.stages.framebuffer import FrameBufferStage
|
||||
|
||||
StageRegistry.register("effect", FrameBufferStage)
|
||||
except ImportError:
|
||||
pass
|
||||
|
||||
# Register display stages
|
||||
_register_display_stages()
|
||||
|
||||
|
||||
158
engine/pipeline/stages/framebuffer.py
Normal file
158
engine/pipeline/stages/framebuffer.py
Normal file
@@ -0,0 +1,158 @@
|
||||
"""
|
||||
Frame buffer stage - stores previous frames for temporal effects.
|
||||
|
||||
Provides:
|
||||
- frame_history: list of previous buffers (most recent first)
|
||||
- intensity_history: list of corresponding intensity maps
|
||||
- current_intensity: intensity map for current frame
|
||||
|
||||
Capability: "framebuffer.history"
|
||||
"""
|
||||
|
||||
import threading
|
||||
from dataclasses import dataclass
|
||||
from typing import Any
|
||||
|
||||
from engine.display import _strip_ansi
|
||||
from engine.pipeline.core import DataType, PipelineContext, Stage
|
||||
|
||||
|
||||
@dataclass
|
||||
class FrameBufferConfig:
|
||||
"""Configuration for FrameBufferStage."""
|
||||
|
||||
history_depth: int = 2 # Number of previous frames to keep
|
||||
|
||||
|
||||
class FrameBufferStage(Stage):
|
||||
"""Stores frame history and computes intensity maps."""
|
||||
|
||||
name = "framebuffer"
|
||||
category = "effect" # It's an effect that enriches context with frame history
|
||||
|
||||
def __init__(self, config: FrameBufferConfig | None = None, history_depth: int = 2):
|
||||
self.config = config or FrameBufferConfig(history_depth=history_depth)
|
||||
self._lock = threading.Lock()
|
||||
|
||||
@property
|
||||
def capabilities(self) -> set[str]:
|
||||
return {"framebuffer.history"}
|
||||
|
||||
@property
|
||||
def dependencies(self) -> set[str]:
|
||||
# Depends on rendered output (since we want to capture final buffer)
|
||||
return {"render.output"}
|
||||
|
||||
@property
|
||||
def inlet_types(self) -> set:
|
||||
return {DataType.TEXT_BUFFER}
|
||||
|
||||
@property
|
||||
def outlet_types(self) -> set:
|
||||
return {DataType.TEXT_BUFFER} # Pass through unchanged
|
||||
|
||||
def init(self, ctx: PipelineContext) -> bool:
|
||||
"""Initialize framebuffer state in context."""
|
||||
ctx.set("frame_history", [])
|
||||
ctx.set("intensity_history", [])
|
||||
return True
|
||||
|
||||
def process(self, data: Any, ctx: PipelineContext) -> Any:
|
||||
"""Store frame in history and compute intensity.
|
||||
|
||||
Args:
|
||||
data: Current text buffer (list[str])
|
||||
ctx: Pipeline context
|
||||
|
||||
Returns:
|
||||
Same buffer (pass-through)
|
||||
"""
|
||||
if not isinstance(data, list):
|
||||
return data
|
||||
|
||||
# Compute intensity map for current buffer (per-row, length = buffer rows)
|
||||
intensity_map = self._compute_buffer_intensity(data, len(data))
|
||||
|
||||
# Store in context
|
||||
ctx.set("current_intensity", intensity_map)
|
||||
|
||||
with self._lock:
|
||||
# Get existing histories
|
||||
history = ctx.get("frame_history", [])
|
||||
intensity_hist = ctx.get("intensity_history", [])
|
||||
|
||||
# Prepend current frame to history
|
||||
history.insert(0, data.copy())
|
||||
intensity_hist.insert(0, intensity_map)
|
||||
|
||||
# Trim to configured depth
|
||||
max_depth = self.config.history_depth
|
||||
ctx.set("frame_history", history[:max_depth])
|
||||
ctx.set("intensity_history", intensity_hist[:max_depth])
|
||||
|
||||
return data
|
||||
|
||||
def _compute_buffer_intensity(
|
||||
self, buf: list[str], max_rows: int = 24
|
||||
) -> list[float]:
|
||||
"""Compute average intensity per row in buffer.
|
||||
|
||||
Uses ANSI color if available; falls back to character density.
|
||||
|
||||
Args:
|
||||
buf: Text buffer (list of strings)
|
||||
max_rows: Maximum number of rows to process
|
||||
|
||||
Returns:
|
||||
List of intensity values (0.0-1.0) per row
|
||||
"""
|
||||
intensities = []
|
||||
# Limit to viewport height
|
||||
lines = buf[:max_rows]
|
||||
|
||||
for line in lines:
|
||||
# Strip ANSI codes for length calc
|
||||
|
||||
plain = _strip_ansi(line)
|
||||
if not plain:
|
||||
intensities.append(0.0)
|
||||
continue
|
||||
|
||||
# Simple heuristic: ratio of non-space characters
|
||||
# More sophisticated version could parse ANSI RGB brightness
|
||||
filled = sum(1 for c in plain if c not in (" ", "\t"))
|
||||
total = len(plain)
|
||||
intensity = filled / total if total > 0 else 0.0
|
||||
intensities.append(max(0.0, min(1.0, intensity)))
|
||||
|
||||
# Pad to max_rows if needed
|
||||
while len(intensities) < max_rows:
|
||||
intensities.append(0.0)
|
||||
|
||||
return intensities
|
||||
|
||||
def get_frame(
|
||||
self, index: int = 0, ctx: PipelineContext | None = None
|
||||
) -> list[str] | None:
|
||||
"""Get frame from history by index (0 = current, 1 = previous, etc)."""
|
||||
if ctx is None:
|
||||
return None
|
||||
history = ctx.get("frame_history", [])
|
||||
if 0 <= index < len(history):
|
||||
return history[index]
|
||||
return None
|
||||
|
||||
def get_intensity(
|
||||
self, index: int = 0, ctx: PipelineContext | None = None
|
||||
) -> list[float] | None:
|
||||
"""Get intensity map from history by index."""
|
||||
if ctx is None:
|
||||
return None
|
||||
intensity_hist = ctx.get("intensity_history", [])
|
||||
if 0 <= index < len(intensity_hist):
|
||||
return intensity_hist[index]
|
||||
return None
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Cleanup resources."""
|
||||
pass
|
||||
236
tests/test_framebuffer_stage.py
Normal file
236
tests/test_framebuffer_stage.py
Normal file
@@ -0,0 +1,236 @@
|
||||
"""
|
||||
Tests for FrameBufferStage.
|
||||
"""
|
||||
|
||||
import pytest
|
||||
|
||||
from engine.pipeline.core import DataType, PipelineContext
|
||||
from engine.pipeline.params import PipelineParams
|
||||
from engine.pipeline.stages.framebuffer import FrameBufferConfig, FrameBufferStage
|
||||
|
||||
|
||||
def make_ctx(width: int = 80, height: int = 24) -> PipelineContext:
|
||||
"""Create a PipelineContext for testing."""
|
||||
ctx = PipelineContext()
|
||||
params = PipelineParams()
|
||||
params.viewport_width = width
|
||||
params.viewport_height = height
|
||||
ctx.params = params
|
||||
return ctx
|
||||
|
||||
|
||||
class TestFrameBufferStage:
|
||||
"""Tests for FrameBufferStage."""
|
||||
|
||||
def test_init(self):
|
||||
"""FrameBufferStage initializes with default config."""
|
||||
stage = FrameBufferStage()
|
||||
assert stage.name == "framebuffer"
|
||||
assert stage.category == "effect"
|
||||
assert stage.config.history_depth == 2
|
||||
|
||||
def test_capabilities(self):
|
||||
"""Stage provides framebuffer.history capability."""
|
||||
stage = FrameBufferStage()
|
||||
assert "framebuffer.history" in stage.capabilities
|
||||
|
||||
def test_dependencies(self):
|
||||
"""Stage depends on render.output."""
|
||||
stage = FrameBufferStage()
|
||||
assert "render.output" in stage.dependencies
|
||||
|
||||
def test_inlet_outlet_types(self):
|
||||
"""Stage accepts and produces TEXT_BUFFER."""
|
||||
stage = FrameBufferStage()
|
||||
assert DataType.TEXT_BUFFER in stage.inlet_types
|
||||
assert DataType.TEXT_BUFFER in stage.outlet_types
|
||||
|
||||
def test_init_context(self):
|
||||
"""init initializes context state."""
|
||||
stage = FrameBufferStage()
|
||||
ctx = make_ctx()
|
||||
|
||||
result = stage.init(ctx)
|
||||
|
||||
assert result is True
|
||||
assert ctx.get("frame_history") == []
|
||||
assert ctx.get("intensity_history") == []
|
||||
|
||||
def test_process_stores_buffer_in_history(self):
|
||||
"""process stores buffer in history."""
|
||||
stage = FrameBufferStage()
|
||||
ctx = make_ctx()
|
||||
stage.init(ctx)
|
||||
|
||||
buffer = ["line1", "line2", "line3"]
|
||||
result = stage.process(buffer, ctx)
|
||||
|
||||
assert result == buffer # Pass-through
|
||||
history = ctx.get("frame_history")
|
||||
assert len(history) == 1
|
||||
assert history[0] == buffer
|
||||
|
||||
def test_process_computes_intensity(self):
|
||||
"""process computes intensity map."""
|
||||
stage = FrameBufferStage()
|
||||
ctx = make_ctx()
|
||||
stage.init(ctx)
|
||||
|
||||
buffer = ["hello world", "test line", ""]
|
||||
stage.process(buffer, ctx)
|
||||
|
||||
intensity = ctx.get("current_intensity")
|
||||
assert intensity is not None
|
||||
assert len(intensity) == 3 # Three rows
|
||||
# Non-empty lines should have intensity > 0
|
||||
assert intensity[0] > 0
|
||||
assert intensity[1] > 0
|
||||
# Empty line should have intensity 0
|
||||
assert intensity[2] == 0.0
|
||||
|
||||
def test_process_keeps_multiple_frames(self):
|
||||
"""process keeps configured depth of frames."""
|
||||
config = FrameBufferConfig(history_depth=3)
|
||||
stage = FrameBufferStage(config)
|
||||
ctx = make_ctx()
|
||||
stage.init(ctx)
|
||||
|
||||
# Process several frames
|
||||
for i in range(5):
|
||||
buffer = [f"frame {i}"]
|
||||
stage.process(buffer, ctx)
|
||||
|
||||
history = ctx.get("frame_history")
|
||||
assert len(history) == 3 # Only last 3 kept
|
||||
# Should be in reverse chronological order (most recent first)
|
||||
assert history[0] == ["frame 4"]
|
||||
assert history[1] == ["frame 3"]
|
||||
assert history[2] == ["frame 2"]
|
||||
|
||||
def test_process_keeps_intensity_sync(self):
|
||||
"""process keeps intensity history in sync with frame history."""
|
||||
config = FrameBufferConfig(history_depth=3)
|
||||
stage = FrameBufferStage(config)
|
||||
ctx = make_ctx()
|
||||
stage.init(ctx)
|
||||
|
||||
buffers = [
|
||||
["a"],
|
||||
["bb"],
|
||||
["ccc"],
|
||||
]
|
||||
for buf in buffers:
|
||||
stage.process(buf, ctx)
|
||||
|
||||
frame_hist = ctx.get("frame_history")
|
||||
intensity_hist = ctx.get("intensity_history")
|
||||
assert len(frame_hist) == len(intensity_hist) == 3
|
||||
|
||||
# Each frame's intensity should match
|
||||
for i, frame in enumerate(frame_hist):
|
||||
computed_intensity = stage._compute_buffer_intensity(frame, len(frame))
|
||||
assert intensity_hist[i] == pytest.approx(computed_intensity)
|
||||
|
||||
def test_get_frame(self):
|
||||
"""get_frame retrieves frames from history by index."""
|
||||
config = FrameBufferConfig(history_depth=3)
|
||||
stage = FrameBufferStage(config)
|
||||
ctx = make_ctx()
|
||||
stage.init(ctx)
|
||||
|
||||
buffers = [["f1"], ["f2"], ["f3"]]
|
||||
for buf in buffers:
|
||||
stage.process(buf, ctx)
|
||||
|
||||
assert stage.get_frame(0, ctx) == ["f3"] # Most recent
|
||||
assert stage.get_frame(1, ctx) == ["f2"]
|
||||
assert stage.get_frame(2, ctx) == ["f1"]
|
||||
assert stage.get_frame(3, ctx) is None # Out of range
|
||||
|
||||
def test_get_intensity(self):
|
||||
"""get_intensity retrieves intensity maps by index."""
|
||||
stage = FrameBufferStage()
|
||||
ctx = make_ctx()
|
||||
stage.init(ctx)
|
||||
|
||||
buffers = [["line"], ["longer line"]]
|
||||
for buf in buffers:
|
||||
stage.process(buf, ctx)
|
||||
|
||||
intensity0 = stage.get_intensity(0, ctx)
|
||||
intensity1 = stage.get_intensity(1, ctx)
|
||||
assert intensity0 is not None
|
||||
assert intensity1 is not None
|
||||
# Longer line should have higher intensity (more non-space chars)
|
||||
assert sum(intensity1) > sum(intensity0)
|
||||
|
||||
def test_compute_buffer_intensity_simple(self):
|
||||
"""_compute_buffer_intensity computes simple density."""
|
||||
stage = FrameBufferStage()
|
||||
|
||||
buf = ["abc", " ", "de"]
|
||||
intensities = stage._compute_buffer_intensity(buf, max_rows=3)
|
||||
|
||||
assert len(intensities) == 3
|
||||
# "abc" -> 3/3 = 1.0
|
||||
assert pytest.approx(intensities[0]) == 1.0
|
||||
# " " -> 0/2 = 0.0
|
||||
assert pytest.approx(intensities[1]) == 0.0
|
||||
# "de" -> 2/2 = 1.0
|
||||
assert pytest.approx(intensities[2]) == 1.0
|
||||
|
||||
def test_compute_buffer_intensity_with_ansi(self):
|
||||
"""_compute_buffer_intensity strips ANSI codes."""
|
||||
stage = FrameBufferStage()
|
||||
|
||||
# Line with ANSI color codes
|
||||
buf = ["\033[31mred\033[0m", "normal"]
|
||||
intensities = stage._compute_buffer_intensity(buf, max_rows=2)
|
||||
|
||||
assert len(intensities) == 2
|
||||
# Should treat "red" as 3 non-space chars
|
||||
assert pytest.approx(intensities[0]) == 1.0 # "red" = 3/3
|
||||
assert pytest.approx(intensities[1]) == 1.0 # "normal" = 6/6
|
||||
|
||||
def test_compute_buffer_intensity_padding(self):
|
||||
"""_compute_buffer_intensity pads to max_rows."""
|
||||
stage = FrameBufferStage()
|
||||
|
||||
buf = ["short"]
|
||||
intensities = stage._compute_buffer_intensity(buf, max_rows=5)
|
||||
|
||||
assert len(intensities) == 5
|
||||
assert intensities[0] > 0
|
||||
assert all(i == 0.0 for i in intensities[1:])
|
||||
|
||||
def test_thread_safety(self):
|
||||
"""process is thread-safe."""
|
||||
from threading import Thread
|
||||
|
||||
stage = FrameBufferStage()
|
||||
ctx = make_ctx()
|
||||
stage.init(ctx)
|
||||
|
||||
results = []
|
||||
|
||||
def worker(idx):
|
||||
buffer = [f"thread {idx}"]
|
||||
stage.process(buffer, ctx)
|
||||
results.append(len(ctx.get("frame_history", [])))
|
||||
|
||||
threads = [Thread(target=worker, args=(i,)) for i in range(10)]
|
||||
for t in threads:
|
||||
t.start()
|
||||
for t in threads:
|
||||
t.join()
|
||||
|
||||
# All threads should see consistent state
|
||||
assert len(ctx.get("frame_history")) <= 2 # Depth limit
|
||||
# All worker threads should have completed without errors
|
||||
assert len(results) == 10
|
||||
|
||||
def test_cleanup(self):
|
||||
"""cleanup does nothing but can be called."""
|
||||
stage = FrameBufferStage()
|
||||
# Should not raise
|
||||
stage.cleanup()
|
||||
@@ -45,7 +45,8 @@ class TestStageRegistry:
|
||||
assert "pygame" in displays
|
||||
assert "websocket" in displays
|
||||
assert "null" in displays
|
||||
assert "sixel" in displays
|
||||
# sixel and kitty removed; should not be present
|
||||
assert "sixel" not in displays
|
||||
|
||||
def test_create_source_stage(self):
|
||||
"""StageRegistry.create creates source stages."""
|
||||
@@ -546,7 +547,7 @@ class TestPipelinePresets:
|
||||
FIREHOSE_PRESET,
|
||||
PIPELINE_VIZ_PRESET,
|
||||
POETRY_PRESET,
|
||||
SIXEL_PRESET,
|
||||
UI_PRESET,
|
||||
WEBSOCKET_PRESET,
|
||||
)
|
||||
|
||||
@@ -554,8 +555,8 @@ class TestPipelinePresets:
|
||||
assert POETRY_PRESET.name == "poetry"
|
||||
assert FIREHOSE_PRESET.name == "firehose"
|
||||
assert PIPELINE_VIZ_PRESET.name == "pipeline"
|
||||
assert SIXEL_PRESET.name == "sixel"
|
||||
assert WEBSOCKET_PRESET.name == "websocket"
|
||||
assert UI_PRESET.name == "ui"
|
||||
|
||||
def test_preset_to_params(self):
|
||||
"""Presets convert to PipelineParams correctly."""
|
||||
|
||||
Reference in New Issue
Block a user