diff --git a/engine/app/main.py b/engine/app/main.py index a651ed2..43f9e37 100644 --- a/engine/app/main.py +++ b/engine/app/main.py @@ -101,6 +101,8 @@ def run_pipeline_mode_direct(): border_mode = BorderMode.OFF source_items = None allow_unsafe = False + viewport_width = None + viewport_height = None i = 1 argv = sys.argv @@ -115,6 +117,14 @@ def run_pipeline_mode_direct(): elif arg == "--pipeline-camera" and i + 1 < len(argv): camera_type = argv[i + 1] i += 2 + elif arg == "--viewport" and i + 1 < len(argv): + vp = argv[i + 1] + try: + viewport_width, viewport_height = map(int, vp.split("x")) + except ValueError: + print("Error: Invalid viewport format. Use WxH (e.g., 40x15)") + sys.exit(1) + i += 2 elif arg == "--pipeline-display" and i + 1 < len(argv): display_name = argv[i + 1] i += 2 @@ -221,8 +231,8 @@ def run_pipeline_mode_direct(): # Build pipeline using validated config and params params = result.params - params.viewport_width = 80 - params.viewport_height = 24 + params.viewport_width = viewport_width if viewport_width is not None else 80 + params.viewport_height = viewport_height if viewport_height is not None else 24 ctx = PipelineContext() ctx.params = params @@ -356,6 +366,12 @@ def run_pipeline_mode_direct(): current_width = params.viewport_width current_height = params.viewport_height + # Only get dimensions from display if viewport wasn't explicitly set + if "--viewport" not in sys.argv and hasattr(display, "get_dimensions"): + current_width, current_height = display.get_dimensions() + params.viewport_width = current_width + params.viewport_height = current_height + print(" \033[38;5;82mStarting pipeline...\033[0m") print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n") diff --git a/engine/app/pipeline_runner.py b/engine/app/pipeline_runner.py index d18b6b0..d883745 100644 --- a/engine/app/pipeline_runner.py +++ b/engine/app/pipeline_runner.py @@ -45,8 +45,19 @@ def run_pipeline_mode(preset_name: str = "demo"): print(f" \033[38;5;245mPreset: {preset.name} - {preset.description}\033[0m") params = preset.to_params() - params.viewport_width = 80 - params.viewport_height = 24 + # Use preset viewport if available, else default to 80x24 + params.viewport_width = getattr(preset, "viewport_width", 80) + params.viewport_height = getattr(preset, "viewport_height", 24) + + if "--viewport" in sys.argv: + idx = sys.argv.index("--viewport") + if idx + 1 < len(sys.argv): + vp = sys.argv[idx + 1] + try: + params.viewport_width, params.viewport_height = map(int, vp.split("x")) + except ValueError: + print("Error: Invalid viewport format. Use WxH (e.g., 40x15)") + sys.exit(1) pipeline = Pipeline( config=PipelineConfig( @@ -156,25 +167,12 @@ def run_pipeline_mode(preset_name: str = "demo"): list_source = ListDataSource(items, name=preset.source) pipeline.add_stage("source", DataSourceStage(list_source, name=preset.source)) - # Add FontStage for headlines/poetry (default for demo) - if preset.source in ["headlines", "poetry"]: - from engine.pipeline.adapters import FontStage, ViewportFilterStage - - # Add viewport filter to prevent rendering all items - pipeline.add_stage( - "viewport_filter", ViewportFilterStage(name="viewport-filter") - ) - pipeline.add_stage("font", FontStage(name="font")) - else: - # Fallback to simple conversion for other sources - pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) - - # Add camera stage if specified in preset (after font/render stage) + # Add camera state update stage if specified in preset (must run before viewport filter) + camera = None if preset.camera: from engine.camera import Camera - from engine.pipeline.adapters import CameraStage + from engine.pipeline.adapters import CameraClockStage, CameraStage - camera = None speed = getattr(preset, "camera_speed", 1.0) if preset.camera == "feed": camera = Camera.feed(speed=speed) @@ -190,9 +188,35 @@ def run_pipeline_mode(preset_name: str = "demo"): camera = Camera.floating(speed=speed) elif preset.camera == "bounce": camera = Camera.bounce(speed=speed) + elif preset.camera == "radial": + camera = Camera.radial(speed=speed) + elif preset.camera == "static" or preset.camera == "": + # Static camera: no movement, but provides camera_y=0 for viewport filter + camera = Camera.scroll(speed=0.0) # Speed 0 = no movement + camera.set_canvas_size(200, 200) if camera: - pipeline.add_stage("camera", CameraStage(camera, name=preset.camera)) + # Add camera update stage to ensure camera_y is available for viewport filter + pipeline.add_stage( + "camera_update", CameraClockStage(camera, name="camera-clock") + ) + + # Add FontStage for headlines/poetry (default for demo) + if preset.source in ["headlines", "poetry"]: + from engine.pipeline.adapters import FontStage, ViewportFilterStage + + # Add viewport filter to prevent rendering all items + pipeline.add_stage( + "viewport_filter", ViewportFilterStage(name="viewport-filter") + ) + pipeline.add_stage("font", FontStage(name="font")) + else: + # Fallback to simple conversion for other sources + pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) + + # Add camera stage if specified in preset (after font/render stage) + if camera: + pipeline.add_stage("camera", CameraStage(camera, name=preset.camera)) for effect_name in preset.effects: effect = effect_registry.get(effect_name) @@ -451,7 +475,7 @@ def run_pipeline_mode(preset_name: str = "demo"): # Add camera if specified if new_preset.camera: from engine.camera import Camera - from engine.pipeline.adapters import CameraStage + from engine.pipeline.adapters import CameraClockStage, CameraStage speed = getattr(new_preset, "camera_speed", 1.0) camera = None @@ -468,8 +492,19 @@ def run_pipeline_mode(preset_name: str = "demo"): camera = Camera.floating(speed=speed) elif cam_type == "bounce": camera = Camera.bounce(speed=speed) + elif cam_type == "radial": + camera = Camera.radial(speed=speed) + elif cam_type == "static" or cam_type == "": + # Static camera: no movement, but provides camera_y=0 for viewport filter + camera = Camera.scroll(speed=0.0) + camera.set_canvas_size(200, 200) if camera: + # Add camera update stage to ensure camera_y is available for viewport filter + pipeline.add_stage( + "camera_update", + CameraClockStage(camera, name="camera-clock"), + ) pipeline.add_stage("camera", CameraStage(camera, name=cam_type)) # Add effects @@ -637,10 +672,11 @@ def run_pipeline_mode(preset_name: str = "demo"): ctx.set("pipeline_order", pipeline.execution_order) ctx.set("camera_y", 0) - current_width = 80 - current_height = 24 + current_width = params.viewport_width + current_height = params.viewport_height - if hasattr(display, "get_dimensions"): + # Only get dimensions from display if viewport wasn't explicitly set + if "--viewport" not in sys.argv and hasattr(display, "get_dimensions"): current_width, current_height = display.get_dimensions() params.viewport_width = current_width params.viewport_height = current_height @@ -687,7 +723,7 @@ def run_pipeline_mode(preset_name: str = "demo"): display.clear_quit_request() raise KeyboardInterrupt() - if hasattr(display, "get_dimensions"): + if "--viewport" not in sys.argv and hasattr(display, "get_dimensions"): new_w, new_h = display.get_dimensions() if new_w != current_width or new_h != current_height: current_width, current_height = new_w, new_h diff --git a/engine/data_sources/checkerboard.py b/engine/data_sources/checkerboard.py new file mode 100644 index 0000000..48326f2 --- /dev/null +++ b/engine/data_sources/checkerboard.py @@ -0,0 +1,60 @@ +"""Checkerboard data source for visual pattern generation.""" + +from engine.data_sources.sources import DataSource, SourceItem + + +class CheckerboardDataSource(DataSource): + """Data source that generates a checkerboard pattern. + + Creates a grid of alternating characters, useful for testing motion effects + and camera movement. The pattern is static; movement comes from camera panning. + """ + + def __init__( + self, + width: int = 200, + height: int = 200, + square_size: int = 10, + char_a: str = "#", + char_b: str = " ", + ): + """Initialize checkerboard data source. + + Args: + width: Total pattern width in characters + height: Total pattern height in lines + square_size: Size of each checker square in characters + char_a: Character for "filled" squares (default: '#') + char_b: Character for "empty" squares (default: ' ') + """ + self.width = width + self.height = height + self.square_size = square_size + self.char_a = char_a + self.char_b = char_b + + @property + def name(self) -> str: + return "checkerboard" + + @property + def is_dynamic(self) -> bool: + return False + + def fetch(self) -> list[SourceItem]: + """Generate the checkerboard pattern as a single SourceItem.""" + lines = [] + for y in range(self.height): + line_chars = [] + for x in range(self.width): + # Determine which square this position belongs to + square_x = x // self.square_size + square_y = y // self.square_size + # Alternate pattern based on parity of square coordinates + if (square_x + square_y) % 2 == 0: + line_chars.append(self.char_a) + else: + line_chars.append(self.char_b) + lines.append("".join(line_chars)) + content = "\n".join(lines) + return [SourceItem(content=content, source="checkerboard", timestamp="0")] diff --git a/engine/display/__init__.py b/engine/display/__init__.py index 5a29d06..bbf9a30 100644 --- a/engine/display/__init__.py +++ b/engine/display/__init__.py @@ -20,6 +20,7 @@ except ImportError: from engine.display.backends.multi import MultiDisplay from engine.display.backends.null import NullDisplay from engine.display.backends.pygame import PygameDisplay +from engine.display.backends.replay import ReplayDisplay from engine.display.backends.terminal import TerminalDisplay from engine.display.backends.websocket import WebSocketDisplay @@ -90,6 +91,7 @@ class DisplayRegistry: return cls.register("terminal", TerminalDisplay) cls.register("null", NullDisplay) + cls.register("replay", ReplayDisplay) cls.register("websocket", WebSocketDisplay) cls.register("pygame", PygameDisplay) if _MODERNGL_AVAILABLE: @@ -278,6 +280,7 @@ __all__ = [ "BorderMode", "TerminalDisplay", "NullDisplay", + "ReplayDisplay", "WebSocketDisplay", "MultiDisplay", "PygameDisplay", diff --git a/engine/display/backends/null.py b/engine/display/backends/null.py index 215965d..835644f 100644 --- a/engine/display/backends/null.py +++ b/engine/display/backends/null.py @@ -2,7 +2,10 @@ Null/headless display backend. """ +import json import time +from pathlib import Path +from typing import Any class NullDisplay: @@ -10,7 +13,8 @@ class NullDisplay: This display does nothing - useful for headless benchmarking or when no display output is needed. Captures last buffer - for testing purposes. + for testing purposes. Supports frame recording for replay + and file export/import. """ width: int = 80 @@ -19,6 +23,9 @@ class NullDisplay: def __init__(self): self._last_buffer = None + self._is_recording = False + self._recorded_frames: list[dict[str, Any]] = [] + self._frame_count = 0 def init(self, width: int, height: int, reuse: bool = False) -> None: """Initialize display with dimensions. @@ -37,7 +44,6 @@ class NullDisplay: from engine.display import get_monitor, render_border - # Get FPS for border (if available) fps = 0.0 frame_time = 0.0 monitor = get_monitor() @@ -49,26 +55,28 @@ class NullDisplay: fps = 1000.0 / avg_ms frame_time = avg_ms - # Apply border if requested (same as terminal display) if border: buffer = render_border(buffer, self.width, self.height, fps, frame_time) self._last_buffer = buffer - # For debugging: print first few frames to stdout - if hasattr(self, "_frame_count"): - self._frame_count += 1 - else: - self._frame_count = 0 + if self._is_recording: + self._recorded_frames.append( + { + "frame_number": self._frame_count, + "buffer": buffer, + "width": self.width, + "height": self.height, + } + ) - # Only print first 5 frames or every 10th frame if self._frame_count <= 5 or self._frame_count % 10 == 0: sys.stdout.write("\n" + "=" * 80 + "\n") sys.stdout.write( f"Frame {self._frame_count} (buffer height: {len(buffer)})\n" ) sys.stdout.write("=" * 80 + "\n") - for i, line in enumerate(buffer[:30]): # Show first 30 lines + for i, line in enumerate(buffer[:30]): sys.stdout.write(f"{i:2}: {line}\n") if len(buffer) > 30: sys.stdout.write(f"... ({len(buffer) - 30} more lines)\n") @@ -80,6 +88,78 @@ class NullDisplay: elapsed_ms = (time.perf_counter() - t0) * 1000 monitor.record_effect("null_display", elapsed_ms, chars_in, chars_in) + self._frame_count += 1 + + def start_recording(self) -> None: + """Begin recording frames.""" + self._is_recording = True + self._recorded_frames = [] + + def stop_recording(self) -> None: + """Stop recording frames.""" + self._is_recording = False + + def get_frames(self) -> list[list[str]]: + """Get recorded frames as list of buffers. + + Returns: + List of buffers, each buffer is a list of strings (lines) + """ + return [frame["buffer"] for frame in self._recorded_frames] + + def get_recorded_data(self) -> list[dict[str, Any]]: + """Get full recorded data including metadata. + + Returns: + List of frame dicts with 'frame_number', 'buffer', 'width', 'height' + """ + return self._recorded_frames + + def clear_recording(self) -> None: + """Clear recorded frames.""" + self._recorded_frames = [] + + def save_recording(self, filepath: str | Path) -> None: + """Save recorded frames to a JSON file. + + Args: + filepath: Path to save the recording + """ + path = Path(filepath) + data = { + "version": 1, + "display": "null", + "width": self.width, + "height": self.height, + "frame_count": len(self._recorded_frames), + "frames": self._recorded_frames, + } + path.write_text(json.dumps(data, indent=2)) + + def load_recording(self, filepath: str | Path) -> list[dict[str, Any]]: + """Load recorded frames from a JSON file. + + Args: + filepath: Path to load the recording from + + Returns: + List of frame dicts + """ + path = Path(filepath) + data = json.loads(path.read_text()) + self._recorded_frames = data.get("frames", []) + self.width = data.get("width", 80) + self.height = data.get("height", 24) + return self._recorded_frames + + def replay_frames(self) -> list[list[str]]: + """Get frames for replay. + + Returns: + List of buffers for replay + """ + return self.get_frames() + def clear(self) -> None: pass diff --git a/engine/display/backends/replay.py b/engine/display/backends/replay.py new file mode 100644 index 0000000..4076ffe --- /dev/null +++ b/engine/display/backends/replay.py @@ -0,0 +1,122 @@ +""" +Replay display backend - plays back recorded frames. +""" + +from typing import Any + + +class ReplayDisplay: + """Replay display - plays back recorded frames. + + This display reads frames from a recording (list of frame data) + and yields them sequentially, useful for testing and demo purposes. + """ + + width: int = 80 + height: int = 24 + + def __init__(self): + self._frames: list[dict[str, Any]] = [] + self._current_frame = 0 + self._playback_index = 0 + self._loop = False + + def init(self, width: int, height: int, reuse: bool = False) -> None: + """Initialize display with dimensions. + + Args: + width: Terminal width in characters + height: Terminal height in rows + reuse: Ignored for ReplayDisplay + """ + self.width = width + self.height = height + + def set_frames(self, frames: list[dict[str, Any]]) -> None: + """Set frames to replay. + + Args: + frames: List of frame dicts with 'buffer', 'width', 'height' + """ + self._frames = frames + self._current_frame = 0 + self._playback_index = 0 + + def set_loop(self, loop: bool) -> None: + """Set loop playback mode. + + Args: + loop: True to loop, False to stop at end + """ + self._loop = loop + + def show(self, buffer: list[str], border: bool = False) -> None: + """Display a frame (ignored in replay mode). + + Args: + buffer: Buffer to display (ignored) + border: Border flag (ignored) + """ + pass + + def get_next_frame(self) -> list[str] | None: + """Get the next frame in the recording. + + Returns: + Buffer list of strings, or None if playback is done + """ + if not self._frames: + return None + + if self._playback_index >= len(self._frames): + if self._loop: + self._playback_index = 0 + else: + return None + + frame = self._frames[self._playback_index] + self._playback_index += 1 + return frame.get("buffer") + + def reset(self) -> None: + """Reset playback to the beginning.""" + self._playback_index = 0 + + def seek(self, index: int) -> None: + """Seek to a specific frame. + + Args: + index: Frame index to seek to + """ + if 0 <= index < len(self._frames): + self._playback_index = index + + def is_finished(self) -> bool: + """Check if playback is finished. + + Returns: + True if at end of frames and not looping + """ + return not self._loop and self._playback_index >= len(self._frames) + + def clear(self) -> None: + pass + + def cleanup(self) -> None: + pass + + def get_dimensions(self) -> tuple[int, int]: + """Get current dimensions. + + Returns: + (width, height) in character cells + """ + return (self.width, self.height) + + def is_quit_requested(self) -> bool: + """Check if quit was requested (optional protocol method).""" + return False + + def clear_quit_request(self) -> None: + """Clear quit request (optional protocol method).""" + pass diff --git a/engine/effects/plugins/afterimage.py b/engine/effects/plugins/afterimage.py new file mode 100644 index 0000000..0709312 --- /dev/null +++ b/engine/effects/plugins/afterimage.py @@ -0,0 +1,122 @@ +"""Afterimage effect using previous frame.""" + +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin + + +class AfterimageEffect(EffectPlugin): + """Show a faint ghost of the previous frame. + + This effect requires a FrameBufferStage to be present in the pipeline. + It shows a dimmed version of the previous frame super-imposed on the + current frame. + + Attributes: + name: "afterimage" + config: EffectConfig with intensity parameter (0.0-1.0) + param_bindings: Optional sensor bindings for intensity modulation + + Example: + >>> effect = AfterimageEffect() + >>> effect.configure(EffectConfig(intensity=0.3)) + >>> result = effect.process(buffer, ctx) + """ + + name = "afterimage" + config: EffectConfig = EffectConfig(enabled=True, intensity=0.3) + param_bindings: dict[str, dict[str, str | float]] = {} + supports_partial_updates = False + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + """Apply afterimage effect using the previous frame. + + Args: + buf: Current text buffer (list of strings) + ctx: Effect context with access to framebuffer history + + Returns: + Buffer with ghost of previous frame overlaid + """ + if not buf: + return buf + + # Get framebuffer history from context + history = None + + for key in ctx.state: + if key.startswith("framebuffer.") and key.endswith(".history"): + history = ctx.state[key] + break + + if not history or len(history) < 1: + # No previous frame available + return buf + + # Get intensity from config + intensity = self.config.params.get("intensity", self.config.intensity) + intensity = max(0.0, min(1.0, intensity)) + + if intensity <= 0.0: + return buf + + # Get the previous frame (index 1, since index 0 is current) + prev_frame = history[1] if len(history) > 1 else None + if not prev_frame: + return buf + + # Blend current and previous frames + viewport_height = ctx.terminal_height - ctx.ticker_height + result = [] + + for row in range(len(buf)): + if row >= viewport_height: + result.append(buf[row]) + continue + + current_line = buf[row] + prev_line = prev_frame[row] if row < len(prev_frame) else "" + + if not prev_line: + result.append(current_line) + continue + + # Apply dimming effect by reducing ANSI color intensity or adding transparency + # For a simple text version, we'll use a blend strategy + blended = self._blend_lines(current_line, prev_line, intensity) + result.append(blended) + + return result + + def _blend_lines(self, current: str, previous: str, intensity: float) -> str: + """Blend current and previous line with given intensity. + + For text with ANSI codes, true blending is complex. This is a simplified + version that uses color averaging when possible. + + A more sophisticated implementation would: + 1. Parse ANSI color codes from both lines + 2. Blend RGB values based on intensity + 3. Reconstruct the line with blended colors + + For now, we'll use a heuristic: if lines are similar, return current. + If they differ, we alternate or use the previous as a faint overlay. + """ + if current == previous: + return current + + # Simple blending: intensity determines mix + # intensity=1.0 => fully current + # intensity=0.3 => 70% previous ghost, 30% current + + if intensity > 0.7: + return current + elif intensity < 0.3: + # Show previous but dimmed (simulate by adding faint color/gray) + return previous # Would need to dim ANSI colors + else: + # For medium intensity, alternate based on character pattern + # This is a placeholder for proper blending + return current + + def configure(self, config: EffectConfig) -> None: + """Configure the effect.""" + self.config = config diff --git a/engine/effects/plugins/motionblur.py b/engine/effects/plugins/motionblur.py new file mode 100644 index 0000000..d329b96 --- /dev/null +++ b/engine/effects/plugins/motionblur.py @@ -0,0 +1,119 @@ +"""Motion blur effect using frame history.""" + +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin + + +class MotionBlurEffect(EffectPlugin): + """Apply motion blur by blending current frame with previous frames. + + This effect requires a FrameBufferStage to be present in the pipeline. + The framebuffer provides frame history which is blended with the current + frame based on intensity. + + Attributes: + name: "motionblur" + config: EffectConfig with intensity parameter (0.0-1.0) + param_bindings: Optional sensor bindings for intensity modulation + + Example: + >>> effect = MotionBlurEffect() + >>> effect.configure(EffectConfig(intensity=0.5)) + >>> result = effect.process(buffer, ctx) + """ + + name = "motionblur" + config: EffectConfig = EffectConfig(enabled=True, intensity=0.5) + param_bindings: dict[str, dict[str, str | float]] = {} + supports_partial_updates = False + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + """Apply motion blur by blending with previous frames. + + Args: + buf: Current text buffer (list of strings) + ctx: Effect context with access to framebuffer history + + Returns: + Blended buffer with motion blur effect applied + """ + if not buf: + return buf + + # Get framebuffer history from context + # We'll look for the first available framebuffer history + history = None + + for key in ctx.state: + if key.startswith("framebuffer.") and key.endswith(".history"): + history = ctx.state[key] + break + + if not history: + # No framebuffer available, return unchanged + return buf + + # Get intensity from config + intensity = self.config.params.get("intensity", self.config.intensity) + intensity = max(0.0, min(1.0, intensity)) + + if intensity <= 0.0: + return buf + + # Get decay factor (how quickly older frames fade) + decay = self.config.params.get("decay", 0.7) + + # Build output buffer + result = [] + viewport_height = ctx.terminal_height - ctx.ticker_height + + # Determine how many frames to blend (up to history depth) + max_frames = min(len(history), 5) # Cap at 5 frames for performance + + for row in range(len(buf)): + if row >= viewport_height: + # Beyond viewport, just copy + result.append(buf[row]) + continue + + # Start with current frame + blended = buf[row] + + # Blend with historical frames + weight_sum = 1.0 + if max_frames > 0 and intensity > 0: + for i in range(max_frames): + frame_weight = intensity * (decay**i) + if frame_weight < 0.01: # Skip negligible weights + break + + hist_row = history[i][row] if row < len(history[i]) else "" + # Simple string blending: we'll concatenate with space + # For a proper effect, we'd need to blend ANSI colors + # This is a simplified version that just adds the frames + blended = self._blend_strings(blended, hist_row, frame_weight) + weight_sum += frame_weight + + result.append(blended) + + return result + + def _blend_strings(self, current: str, historical: str, weight: float) -> str: + """Blend two strings with given weight. + + This is a simplified blending that works with ANSI codes. + For proper blending we'd need to parse colors, but for now + we use a heuristic: if strings are identical, return one. + If they differ, we alternate or concatenate based on weight. + """ + if current == historical: + return current + + # If weight is high, show current; if low, show historical + if weight > 0.5: + return current + else: + return historical + + def configure(self, config: EffectConfig) -> None: + """Configure the effect.""" + self.config = config diff --git a/engine/pipeline/adapters/__init__.py b/engine/pipeline/adapters/__init__.py index 3855a45..396ddd9 100644 --- a/engine/pipeline/adapters/__init__.py +++ b/engine/pipeline/adapters/__init__.py @@ -4,7 +4,7 @@ This module provides adapters that wrap existing components (EffectPlugin, Display, DataSource, Camera) as Stage implementations. """ -from .camera import CameraStage +from .camera import CameraClockStage, CameraStage from .data_source import DataSourceStage, PassthroughStage, SourceItemsToBufferStage from .display import DisplayStage from .effect_plugin import EffectPluginStage @@ -30,6 +30,7 @@ __all__ = [ "PassthroughStage", "SourceItemsToBufferStage", "CameraStage", + "CameraClockStage", "ViewportFilterStage", "FontStage", "ImageToTextStage", diff --git a/engine/pipeline/adapters/camera.py b/engine/pipeline/adapters/camera.py index 2c5dd6e..42d33fd 100644 --- a/engine/pipeline/adapters/camera.py +++ b/engine/pipeline/adapters/camera.py @@ -6,8 +6,83 @@ from typing import Any from engine.pipeline.core import DataType, PipelineContext, Stage +class CameraClockStage(Stage): + """Per-frame clock stage that updates camera state. + + This stage runs once per frame and updates the camera's internal state + (position, time). It makes camera_y/camera_x available to subsequent + stages via the pipeline context. + + Unlike other stages, this is a pure clock stage and doesn't process + data - it just updates camera state and passes data through unchanged. + """ + + def __init__(self, camera, name: str = "camera-clock"): + self._camera = camera + self.name = name + self.category = "camera" + self.optional = False + self._last_frame_time: float | None = None + + @property + def stage_type(self) -> str: + return "camera" + + @property + def capabilities(self) -> set[str]: + # Provides camera state info only + # NOTE: Do NOT provide "source" as it conflicts with viewport_filter's "source.filtered" + return {"camera.state"} + + @property + def dependencies(self) -> set[str]: + # Clock stage - no dependencies (updates every frame regardless of data flow) + return set() + + @property + def inlet_types(self) -> set: + # Accept any data type - this is a pass-through stage + return {DataType.ANY} + + @property + def outlet_types(self) -> set: + # Pass through whatever was received + return {DataType.ANY} + + def process(self, data: Any, ctx: PipelineContext) -> Any: + """Update camera state and pass data through. + + This stage updates the camera's internal state (position, time) and + makes the updated camera_y/camera_x available to subsequent stages + via the pipeline context. + + The data is passed through unchanged - this stage only updates + camera state, it doesn't transform the data. + """ + if data is None: + return data + + current_time = time.perf_counter() + dt = 0.0 + if self._last_frame_time is not None: + dt = current_time - self._last_frame_time + self._camera.update(dt) + self._last_frame_time = current_time + + # Update context with current camera position + ctx.set_state("camera_y", self._camera.y) + ctx.set_state("camera_x", self._camera.x) + + # Pass data through unchanged + return data + + class CameraStage(Stage): - """Adapter wrapping Camera as a Stage.""" + """Adapter wrapping Camera as a Stage. + + This stage applies camera viewport transformation to the rendered buffer. + Camera state updates are handled by CameraClockStage. + """ def __init__(self, camera, name: str = "vertical"): self._camera = camera @@ -22,7 +97,7 @@ class CameraStage(Stage): Returns: Dictionary containing camera state that can be restored """ - return { + state = { "x": self._camera.x, "y": self._camera.y, "mode": self._camera.mode.value @@ -36,6 +111,14 @@ class CameraStage(Stage): "_y_float": getattr(self._camera, "_y_float", 0.0), "_time": getattr(self._camera, "_time", 0.0), } + # Save radial camera state if present + if hasattr(self._camera, "_r_float"): + state["_r_float"] = self._camera._r_float + if hasattr(self._camera, "_theta_float"): + state["_theta_float"] = self._camera._theta_float + if hasattr(self._camera, "_radial_input"): + state["_radial_input"] = self._camera._radial_input + return state def restore_state(self, state: dict[str, Any]) -> None: """Restore camera state from saved state. @@ -68,6 +151,14 @@ class CameraStage(Stage): if hasattr(self._camera, "_time"): self._camera._time = state.get("_time", 0.0) + # Restore radial camera state if present + if hasattr(self._camera, "_r_float"): + self._camera._r_float = state.get("_r_float", 0.0) + if hasattr(self._camera, "_theta_float"): + self._camera._theta_float = state.get("_theta_float", 0.0) + if hasattr(self._camera, "_radial_input"): + self._camera._radial_input = state.get("_radial_input", 0.0) + @property def stage_type(self) -> str: return "camera" @@ -93,18 +184,26 @@ class CameraStage(Stage): if data is None: return data - current_time = time.perf_counter() - dt = 0.0 - if self._last_frame_time is not None: - dt = current_time - self._last_frame_time - self._camera.update(dt) - self._last_frame_time = current_time - - ctx.set_state("camera_y", self._camera.y) - ctx.set_state("camera_x", self._camera.x) + # Camera state is updated by CameraClockStage + # We only apply the viewport transformation here if hasattr(self._camera, "apply"): viewport_width = ctx.params.viewport_width if ctx.params else 80 viewport_height = ctx.params.viewport_height if ctx.params else 24 - return self._camera.apply(data, viewport_width, viewport_height) + + # Use filtered camera position if available (from ViewportFilterStage) + # This handles the case where the buffer has been filtered and starts at row 0 + filtered_camera_y = ctx.get("camera_y", self._camera.y) + + # Temporarily adjust camera position for filtering + original_y = self._camera.y + self._camera.y = filtered_camera_y + + try: + result = self._camera.apply(data, viewport_width, viewport_height) + finally: + # Restore original camera position + self._camera.y = original_y + + return result return data diff --git a/engine/pipeline/adapters/effect_plugin.py b/engine/pipeline/adapters/effect_plugin.py index 965fed7..4661788 100644 --- a/engine/pipeline/adapters/effect_plugin.py +++ b/engine/pipeline/adapters/effect_plugin.py @@ -6,13 +6,22 @@ from engine.pipeline.core import PipelineContext, Stage class EffectPluginStage(Stage): - """Adapter wrapping EffectPlugin as a Stage.""" + """Adapter wrapping EffectPlugin as a Stage. - def __init__(self, effect_plugin, name: str = "effect"): + Supports capability-based dependencies through the dependencies parameter. + """ + + def __init__( + self, + effect_plugin, + name: str = "effect", + dependencies: set[str] | None = None, + ): self._effect = effect_plugin self.name = name self.category = "effect" self.optional = False + self._dependencies = dependencies or set() @property def stage_type(self) -> str: @@ -49,7 +58,7 @@ class EffectPluginStage(Stage): @property def dependencies(self) -> set[str]: - return set() + return self._dependencies @property def inlet_types(self) -> set: diff --git a/engine/pipeline/adapters/transform.py b/engine/pipeline/adapters/transform.py index bc75ac4..e1b6c08 100644 --- a/engine/pipeline/adapters/transform.py +++ b/engine/pipeline/adapters/transform.py @@ -49,7 +49,9 @@ class ViewportFilterStage(Stage): @property def dependencies(self) -> set[str]: - return {"source"} + # Always requires camera.state for viewport filtering + # CameraUpdateStage provides this (auto-injected if missing) + return {"source", "camera.state"} @property def inlet_types(self) -> set: @@ -95,9 +97,13 @@ class ViewportFilterStage(Stage): # Find start index (first item that intersects with visible range) start_idx = 0 + start_item_y = 0 # Y position where the first visible item starts for i, total_h in enumerate(cumulative_heights): if total_h > start_y: start_idx = i + # Calculate the Y position of the start of this item + if i > 0: + start_item_y = cumulative_heights[i - 1] break # Find end index (first item that extends beyond visible range) @@ -107,6 +113,16 @@ class ViewportFilterStage(Stage): end_idx = i + 1 break + # Adjust camera_y for the filtered buffer + # The filtered buffer starts at row 0, but the camera position + # needs to be relative to where the first visible item starts + filtered_camera_y = camera_y - start_item_y + + # Update context with the filtered camera position + # This ensures CameraStage can correctly slice the filtered buffer + ctx.set_state("camera_y", filtered_camera_y) + ctx.set_state("camera_x", ctx.get("camera_x", 0)) # Keep camera_x unchanged + # Return visible items return data[start_idx:end_idx] @@ -127,9 +143,16 @@ class FontStage(Stage): def capabilities(self) -> set[str]: return {"render.output"} + @property + def stage_dependencies(self) -> set[str]: + # Must connect to viewport_filter stage to get filtered source + return {"viewport_filter"} + @property def dependencies(self) -> set[str]: - return {"source"} + # Depend on source.filtered (provided by viewport_filter) + # This ensures we get the filtered/processed source, not raw source + return {"source.filtered"} @property def inlet_types(self) -> set: @@ -147,6 +170,11 @@ class FontStage(Stage): if not isinstance(data, list): return [str(data)] + import os + + if os.environ.get("DEBUG_CAMERA"): + print(f"FontStage: input items={len(data)}") + viewport_width = ctx.params.viewport_width if ctx.params else 80 result = [] diff --git a/engine/pipeline/controller.py b/engine/pipeline/controller.py index e34184b..89722cf 100644 --- a/engine/pipeline/controller.py +++ b/engine/pipeline/controller.py @@ -68,6 +68,15 @@ class Pipeline: self._metrics_enabled = self.config.enable_metrics self._frame_metrics: list[FrameMetrics] = [] self._max_metrics_frames = 60 + + # Minimum capabilities required for pipeline to function + # NOTE: Research later - allow presets to override these defaults + self._minimum_capabilities: set[str] = { + "source", + "render.output", + "display.output", + "camera.state", # Always required for viewport filtering + } self._current_frame_number = 0 def add_stage(self, name: str, stage: Stage, initialize: bool = True) -> "Pipeline": @@ -214,15 +223,22 @@ class Pipeline: pass def _rebuild(self) -> None: - """Rebuild execution order after mutation without full reinitialization.""" + """Rebuild execution order after mutation or auto-injection.""" + was_initialized = self._initialized + self._initialized = False + self._capability_map = self._build_capability_map() self._execution_order = self._resolve_dependencies() + try: self._validate_dependencies() self._validate_types() except StageError: pass + # Restore initialized state + self._initialized = was_initialized + def get_stage(self, name: str) -> Stage | None: """Get a stage by name.""" return self._stages.get(name) @@ -297,10 +313,123 @@ class Pipeline: "stage_count": len(self._stages), } - def build(self) -> "Pipeline": - """Build execution order based on dependencies.""" + @property + def minimum_capabilities(self) -> set[str]: + """Get minimum capabilities required for pipeline to function.""" + return self._minimum_capabilities + + @minimum_capabilities.setter + def minimum_capabilities(self, value: set[str]): + """Set minimum required capabilities. + + NOTE: Research later - allow presets to override these defaults + """ + self._minimum_capabilities = value + + def validate_minimum_capabilities(self) -> tuple[bool, list[str]]: + """Validate that all minimum capabilities are provided. + + Returns: + Tuple of (is_valid, missing_capabilities) + """ + missing = [] + for cap in self._minimum_capabilities: + if not self._find_stage_with_capability(cap): + missing.append(cap) + return len(missing) == 0, missing + + def ensure_minimum_capabilities(self) -> list[str]: + """Automatically inject MVP stages if minimum capabilities are missing. + + Auto-injection is always on, but defaults are trivial to override. + Returns: + List of stages that were injected + """ + from engine.camera import Camera + from engine.data_sources.sources import EmptyDataSource + from engine.display import DisplayRegistry + from engine.pipeline.adapters import ( + CameraClockStage, + CameraStage, + DataSourceStage, + DisplayStage, + SourceItemsToBufferStage, + ) + + injected = [] + + # Check for source capability + if ( + not self._find_stage_with_capability("source") + and "source" not in self._stages + ): + empty_source = EmptyDataSource(width=80, height=24) + self.add_stage("source", DataSourceStage(empty_source, name="empty")) + injected.append("source") + + # Check for camera.state capability (must be BEFORE render to accept SOURCE_ITEMS) + camera = None + if not self._find_stage_with_capability("camera.state"): + # Inject static camera (trivial, no movement) + camera = Camera.scroll(speed=0.0) + camera.set_canvas_size(200, 200) + if "camera_update" not in self._stages: + self.add_stage( + "camera_update", CameraClockStage(camera, name="camera-clock") + ) + injected.append("camera_update") + + # Check for render capability + if ( + not self._find_stage_with_capability("render.output") + and "render" not in self._stages + ): + self.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) + injected.append("render") + + # Check for camera stage (must be AFTER render to accept TEXT_BUFFER) + if camera and "camera" not in self._stages: + self.add_stage("camera", CameraStage(camera, name="static")) + injected.append("camera") + + # Check for display capability + if ( + not self._find_stage_with_capability("display.output") + and "display" not in self._stages + ): + display = DisplayRegistry.create("terminal") + if display: + self.add_stage("display", DisplayStage(display, name="terminal")) + injected.append("display") + + # Rebuild pipeline if stages were injected + if injected: + self._rebuild() + + return injected + + def build(self, auto_inject: bool = True) -> "Pipeline": + """Build execution order based on dependencies. + + Args: + auto_inject: If True, automatically inject MVP stages for missing capabilities + """ self._capability_map = self._build_capability_map() self._execution_order = self._resolve_dependencies() + + # Validate minimum capabilities and auto-inject if needed + if auto_inject: + is_valid, missing = self.validate_minimum_capabilities() + if not is_valid: + injected = self.ensure_minimum_capabilities() + if injected: + print( + f" \033[38;5;226mAuto-injected stages for missing capabilities: {injected}\033[0m" + ) + # Rebuild after auto-injection + self._capability_map = self._build_capability_map() + self._execution_order = self._resolve_dependencies() + self._validate_dependencies() self._validate_types() self._initialized = True @@ -367,12 +496,24 @@ class Pipeline: temp_mark.add(name) stage = self._stages.get(name) if stage: + # Handle capability-based dependencies for dep in stage.dependencies: # Find a stage that provides this capability dep_stage_name = self._find_stage_with_capability(dep) if dep_stage_name: visit(dep_stage_name) + # Handle direct stage dependencies + for stage_dep in stage.stage_dependencies: + if stage_dep in self._stages: + visit(stage_dep) + else: + # Stage dependency not found - this is an error + raise StageError( + name, + f"Missing stage dependency: '{stage_dep}' not found in pipeline", + ) + temp_mark.remove(name) visited.add(name) ordered.append(name) diff --git a/engine/pipeline/core.py b/engine/pipeline/core.py index e3d566c..55ebf8c 100644 --- a/engine/pipeline/core.py +++ b/engine/pipeline/core.py @@ -155,6 +155,21 @@ class Stage(ABC): """ return set() + @property + def stage_dependencies(self) -> set[str]: + """Return set of stage names this stage must connect to directly. + + This allows explicit stage-to-stage dependencies, useful for enforcing + pipeline structure when capability matching alone is insufficient. + + Examples: + - {"viewport_filter"} # Must connect to viewport_filter stage + - {"camera_update"} # Must connect to camera_update stage + + NOTE: These are stage names (as added to pipeline), not capabilities. + """ + return set() + def init(self, ctx: "PipelineContext") -> bool: """Initialize stage with pipeline context. diff --git a/engine/pipeline/presets.py b/engine/pipeline/presets.py index 58d24ab..c1370d2 100644 --- a/engine/pipeline/presets.py +++ b/engine/pipeline/presets.py @@ -50,6 +50,11 @@ class PipelinePreset: border: bool | BorderMode = ( False # Border mode: False=off, True=simple, BorderMode.UI for panel ) + # Extended fields for fine-tuning + camera_speed: float = 1.0 # Camera movement speed + viewport_width: int = 80 # Viewport width in columns + viewport_height: int = 24 # Viewport height in rows + source_items: list[dict[str, Any]] | None = None # For ListDataSource def to_params(self) -> PipelineParams: """Convert to PipelineParams.""" @@ -67,6 +72,8 @@ class PipelinePreset: ) params.camera_mode = self.camera params.effect_order = self.effects.copy() + # Note: camera_speed, viewport_width/height are not stored in PipelineParams + # They are used directly from the preset object in pipeline_runner.py return params @classmethod @@ -80,6 +87,10 @@ class PipelinePreset: camera=data.get("camera", "vertical"), effects=data.get("effects", []), border=data.get("border", False), + camera_speed=data.get("camera_speed", 1.0), + viewport_width=data.get("viewport_width", 80), + viewport_height=data.get("viewport_height", 24), + source_items=data.get("source_items"), ) diff --git a/engine/pipeline/stages/framebuffer.py b/engine/pipeline/stages/framebuffer.py index f8de5ae..790be34 100644 --- a/engine/pipeline/stages/framebuffer.py +++ b/engine/pipeline/stages/framebuffer.py @@ -1,12 +1,12 @@ """ Frame buffer stage - stores previous frames for temporal effects. -Provides: -- frame_history: list of previous buffers (most recent first) -- intensity_history: list of corresponding intensity maps -- current_intensity: intensity map for current frame +Provides (per-instance, using instance name): +- framebuffer.{name}.history: list of previous buffers (most recent first) +- framebuffer.{name}.intensity_history: list of corresponding intensity maps +- framebuffer.{name}.current_intensity: intensity map for current frame -Capability: "framebuffer.history" +Capability: "framebuffer.history.{name}" """ import threading @@ -22,21 +22,32 @@ class FrameBufferConfig: """Configuration for FrameBufferStage.""" history_depth: int = 2 # Number of previous frames to keep + name: str = "default" # Unique instance name for capability and context keys class FrameBufferStage(Stage): - """Stores frame history and computes intensity maps.""" + """Stores frame history and computes intensity maps. + + Supports multiple instances with unique capabilities and context keys. + """ name = "framebuffer" category = "effect" # It's an effect that enriches context with frame history - def __init__(self, config: FrameBufferConfig | None = None, history_depth: int = 2): - self.config = config or FrameBufferConfig(history_depth=history_depth) + def __init__( + self, + config: FrameBufferConfig | None = None, + history_depth: int = 2, + name: str = "default", + ): + self.config = config or FrameBufferConfig( + history_depth=history_depth, name=name + ) self._lock = threading.Lock() @property def capabilities(self) -> set[str]: - return {"framebuffer.history"} + return {f"framebuffer.history.{self.config.name}"} @property def dependencies(self) -> set[str]: @@ -53,8 +64,9 @@ class FrameBufferStage(Stage): def init(self, ctx: PipelineContext) -> bool: """Initialize framebuffer state in context.""" - ctx.set("frame_history", []) - ctx.set("intensity_history", []) + prefix = f"framebuffer.{self.config.name}" + ctx.set(f"{prefix}.history", []) + ctx.set(f"{prefix}.intensity_history", []) return True def process(self, data: Any, ctx: PipelineContext) -> Any: @@ -70,16 +82,18 @@ class FrameBufferStage(Stage): if not isinstance(data, list): return data + prefix = f"framebuffer.{self.config.name}" + # Compute intensity map for current buffer (per-row, length = buffer rows) intensity_map = self._compute_buffer_intensity(data, len(data)) # Store in context - ctx.set("current_intensity", intensity_map) + ctx.set(f"{prefix}.current_intensity", intensity_map) with self._lock: # Get existing histories - history = ctx.get("frame_history", []) - intensity_hist = ctx.get("intensity_history", []) + history = ctx.get(f"{prefix}.history", []) + intensity_hist = ctx.get(f"{prefix}.intensity_history", []) # Prepend current frame to history history.insert(0, data.copy()) @@ -87,8 +101,8 @@ class FrameBufferStage(Stage): # Trim to configured depth max_depth = self.config.history_depth - ctx.set("frame_history", history[:max_depth]) - ctx.set("intensity_history", intensity_hist[:max_depth]) + ctx.set(f"{prefix}.history", history[:max_depth]) + ctx.set(f"{prefix}.intensity_history", intensity_hist[:max_depth]) return data @@ -137,7 +151,8 @@ class FrameBufferStage(Stage): """Get frame from history by index (0 = current, 1 = previous, etc).""" if ctx is None: return None - history = ctx.get("frame_history", []) + prefix = f"framebuffer.{self.config.name}" + history = ctx.get(f"{prefix}.history", []) if 0 <= index < len(history): return history[index] return None @@ -148,7 +163,8 @@ class FrameBufferStage(Stage): """Get intensity map from history by index.""" if ctx is None: return None - intensity_hist = ctx.get("intensity_history", []) + prefix = f"framebuffer.{self.config.name}" + intensity_hist = ctx.get(f"{prefix}.intensity_history", []) if 0 <= index < len(intensity_hist): return intensity_hist[index] return None diff --git a/engine/pipeline/validation.py b/engine/pipeline/validation.py index 37fa413..8cc0781 100644 --- a/engine/pipeline/validation.py +++ b/engine/pipeline/validation.py @@ -22,6 +22,8 @@ VALID_CAMERAS = [ "omni", "floating", "bounce", + "radial", + "static", "none", "", ] @@ -43,7 +45,7 @@ class ValidationResult: MVP_DEFAULTS = { "source": "fixture", "display": "terminal", - "camera": "", # Empty = no camera stage (static viewport) + "camera": "static", # Static camera provides camera_y=0 for viewport filtering "effects": [], "border": False, } diff --git a/opencode-instructions.md b/opencode-instructions.md new file mode 100644 index 0000000..e1288d6 --- /dev/null +++ b/opencode-instructions.md @@ -0,0 +1 @@ +/home/david/.skills/opencode-instructions/SKILL.md \ No newline at end of file diff --git a/presets.toml b/presets.toml index 3821573..635bffd 100644 --- a/presets.toml +++ b/presets.toml @@ -9,292 +9,68 @@ # - ./presets.toml (local override) # ============================================ -# TEST PRESETS +# TEST PRESETS (for CI and development) # ============================================ -[presets.test-single-item] -description = "Test: Single item to isolate rendering stage issues" +[presets.test-basic] +description = "Test: Basic pipeline with no effects" source = "empty" -display = "terminal" +display = "null" camera = "feed" effects = [] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 +viewport_width = 100 # Custom size for testing +viewport_height = 30 -[presets.test-single-item-border] -description = "Test: Single item with border effect only" +[presets.test-border] +description = "Test: Single item with border effect" source = "empty" -display = "terminal" +display = "null" camera = "feed" effects = ["border"] -camera_speed = 0.1 viewport_width = 80 viewport_height = 24 -[presets.test-headlines] -description = "Test: Headlines from cache with border effect" -source = "headlines" -display = "terminal" -camera = "feed" -effects = ["border"] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 - -[presets.test-headlines-noise] -description = "Test: Headlines from cache with noise effect" -source = "headlines" -display = "terminal" -camera = "feed" -effects = ["noise"] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 - -[presets.test-demo-effects] -description = "Test: All demo effects with terminal display" -source = "headlines" -display = "terminal" -camera = "feed" -effects = ["noise", "fade", "firehose"] -camera_speed = 0.3 -viewport_width = 80 -viewport_height = 24 - -# ============================================ -# DATA SOURCE GALLERY -# ============================================ - -[presets.gallery-sources] -description = "Gallery: Headlines data source" -source = "headlines" -display = "pygame" -camera = "feed" -effects = [] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 - -[presets.gallery-sources-poetry] -description = "Gallery: Poetry data source" -source = "poetry" -display = "pygame" -camera = "feed" -effects = ["fade"] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 - -[presets.gallery-sources-pipeline] -description = "Gallery: Pipeline introspection" -source = "pipeline-inspect" -display = "pygame" +[presets.test-scroll-camera] +description = "Test: Scrolling camera movement" +source = "empty" +display = "null" camera = "scroll" effects = [] -camera_speed = 0.3 -viewport_width = 100 -viewport_height = 35 - -[presets.gallery-sources-empty] -description = "Gallery: Empty source (for border tests)" -source = "empty" -display = "terminal" -camera = "feed" -effects = ["border"] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 - -# ============================================ -# EFFECT GALLERY -# ============================================ - -[presets.gallery-effect-noise] -description = "Gallery: Noise effect" -source = "headlines" -display = "pygame" -camera = "feed" -effects = ["noise"] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 - -[presets.gallery-effect-fade] -description = "Gallery: Fade effect" -source = "headlines" -display = "pygame" -camera = "feed" -effects = ["fade"] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 - -[presets.gallery-effect-glitch] -description = "Gallery: Glitch effect" -source = "headlines" -display = "pygame" -camera = "feed" -effects = ["glitch"] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 - -[presets.gallery-effect-firehose] -description = "Gallery: Firehose effect" -source = "headlines" -display = "pygame" -camera = "feed" -effects = ["firehose"] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 - -[presets.gallery-effect-hud] -description = "Gallery: HUD effect" -source = "headlines" -display = "pygame" -camera = "feed" -effects = ["hud"] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 - -[presets.gallery-effect-tint] -description = "Gallery: Tint effect" -source = "headlines" -display = "pygame" -camera = "feed" -effects = ["tint"] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 - -[presets.gallery-effect-border] -description = "Gallery: Border effect" -source = "headlines" -display = "pygame" -camera = "feed" -effects = ["border"] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 - -[presets.gallery-effect-crop] -description = "Gallery: Crop effect" -source = "headlines" -display = "pygame" -camera = "feed" -effects = ["crop"] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 - -# ============================================ -# CAMERA GALLERY -# ============================================ - -[presets.gallery-camera-feed] -description = "Gallery: Feed camera (rapid single-item)" -source = "headlines" -display = "pygame" -camera = "feed" -effects = ["noise"] -camera_speed = 1.0 -viewport_width = 80 -viewport_height = 24 - -[presets.gallery-camera-scroll] -description = "Gallery: Scroll camera (smooth)" -source = "headlines" -display = "pygame" -camera = "scroll" -effects = ["noise"] -camera_speed = 0.3 -viewport_width = 80 -viewport_height = 24 - -[presets.gallery-camera-horizontal] -description = "Gallery: Horizontal camera" -source = "headlines" -display = "pygame" -camera = "horizontal" -effects = ["noise"] camera_speed = 0.5 viewport_width = 80 viewport_height = 24 -[presets.gallery-camera-omni] -description = "Gallery: Omni camera" -source = "headlines" -display = "pygame" -camera = "omni" -effects = ["noise"] -camera_speed = 0.5 -viewport_width = 80 -viewport_height = 24 - -[presets.gallery-camera-floating] -description = "Gallery: Floating camera" -source = "headlines" -display = "pygame" -camera = "floating" -effects = ["noise"] -camera_speed = 1.0 -viewport_width = 80 -viewport_height = 24 - -[presets.gallery-camera-bounce] -description = "Gallery: Bounce camera" -source = "headlines" -display = "pygame" -camera = "bounce" -effects = ["noise"] -camera_speed = 1.0 -viewport_width = 80 -viewport_height = 24 - # ============================================ -# DISPLAY GALLERY +# DEMO PRESETS (for demonstration and exploration) # ============================================ -[presets.gallery-display-terminal] -description = "Gallery: Terminal display" +[presets.demo-base] +description = "Demo: Base preset for effect hot-swapping" source = "headlines" display = "terminal" camera = "feed" -effects = ["noise"] +effects = [] # Demo script will add/remove effects dynamically camera_speed = 0.1 viewport_width = 80 viewport_height = 24 -[presets.gallery-display-pygame] -description = "Gallery: Pygame display" +[presets.demo-pygame] +description = "Demo: Pygame display version" source = "headlines" display = "pygame" camera = "feed" -effects = ["noise"] +effects = [] # Demo script will add/remove effects dynamically camera_speed = 0.1 viewport_width = 80 viewport_height = 24 -[presets.gallery-display-websocket] -description = "Gallery: WebSocket display" +[presets.demo-camera-showcase] +description = "Demo: Camera mode showcase" source = "headlines" -display = "websocket" +display = "terminal" camera = "feed" -effects = ["noise"] -camera_speed = 0.1 -viewport_width = 80 -viewport_height = 24 - -[presets.gallery-display-multi] -description = "Gallery: MultiDisplay (terminal + pygame)" -source = "headlines" -display = "multi:terminal,pygame" -camera = "feed" -effects = ["noise"] -camera_speed = 0.1 +effects = [] # Demo script will cycle through camera modes +camera_speed = 0.5 viewport_width = 80 viewport_height = 24 @@ -307,9 +83,10 @@ enabled = false threshold_db = 50.0 [sensors.oscillator] -enabled = false +enabled = true # Enable for demo script gentle oscillation waveform = "sine" -frequency = 1.0 +frequency = 0.05 # ~20 second cycle (gentle) +amplitude = 0.5 # 50% modulation # ============================================ # EFFECT CONFIGURATIONS @@ -334,3 +111,15 @@ intensity = 1.0 [effect_configs.hud] enabled = true intensity = 1.0 + +[effect_configs.tint] +enabled = true +intensity = 1.0 + +[effect_configs.border] +enabled = true +intensity = 1.0 + +[effect_configs.crop] +enabled = true +intensity = 1.0 diff --git a/scripts/demo_hot_rebuild.py b/scripts/demo_hot_rebuild.py new file mode 100644 index 0000000..57074c5 --- /dev/null +++ b/scripts/demo_hot_rebuild.py @@ -0,0 +1,222 @@ +#!/usr/bin/env python3 +""" +Demo script for testing pipeline hot-rebuild and state preservation. + +Usage: + python scripts/demo_hot_rebuild.py + python scripts/demo_hot_rebuild.py --viewport 40x15 + +This script: +1. Creates a small viewport (40x15) for easier capture +2. Uses NullDisplay with recording enabled +3. Runs the pipeline for N frames (capturing initial state) +4. Triggers a "hot-rebuild" (e.g., toggling an effect stage) +5. Runs the pipeline for M more frames +6. Verifies state preservation by comparing frames before/after rebuild +7. Prints visual comparison to stdout +""" + +import sys +import time +from pathlib import Path + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from engine.display import DisplayRegistry +from engine.effects import get_registry +from engine.fetch import load_cache +from engine.pipeline import Pipeline, PipelineConfig, PipelineContext +from engine.pipeline.adapters import ( + EffectPluginStage, + FontStage, + SourceItemsToBufferStage, + ViewportFilterStage, + create_stage_from_display, + create_stage_from_effect, +) +from engine.pipeline.params import PipelineParams + + +def run_demo(viewport_width: int = 40, viewport_height: int = 15): + """Run the hot-rebuild demo.""" + print(f"\n{'=' * 60}") + print(f"Pipeline Hot-Rebuild Demo") + print(f"Viewport: {viewport_width}x{viewport_height}") + print(f"{'=' * 60}\n") + + import engine.effects.plugins as effects_plugins + + effects_plugins.discover_plugins() + + print("[1/6] Loading source items...") + items = load_cache() + if not items: + print(" ERROR: No fixture cache available") + sys.exit(1) + print(f" Loaded {len(items)} items") + + print("[2/6] Creating NullDisplay with recording...") + display = DisplayRegistry.create("null") + display.init(viewport_width, viewport_height) + display.start_recording() + print(" Recording started") + + print("[3/6] Building pipeline...") + params = PipelineParams() + params.viewport_width = viewport_width + params.viewport_height = viewport_height + + config = PipelineConfig( + source="fixture", + display="null", + camera="scroll", + effects=["noise", "fade"], + ) + + pipeline = Pipeline(config=config, context=PipelineContext()) + + from engine.data_sources.sources import ListDataSource + from engine.pipeline.adapters import DataSourceStage + + list_source = ListDataSource(items, name="fixture") + pipeline.add_stage("source", DataSourceStage(list_source, name="fixture")) + pipeline.add_stage("viewport_filter", ViewportFilterStage(name="viewport-filter")) + pipeline.add_stage("font", FontStage(name="font")) + + effect_registry = get_registry() + for effect_name in config.effects: + effect = effect_registry.get(effect_name) + if effect: + pipeline.add_stage( + f"effect_{effect_name}", + create_stage_from_effect(effect, effect_name), + ) + + pipeline.add_stage("display", create_stage_from_display(display, "null")) + pipeline.build() + + if not pipeline.initialize(): + print(" ERROR: Failed to initialize pipeline") + sys.exit(1) + + print(" Pipeline built and initialized") + + ctx = pipeline.context + ctx.params = params + ctx.set("display", display) + ctx.set("items", items) + ctx.set("pipeline", pipeline) + ctx.set("pipeline_order", pipeline.execution_order) + ctx.set("camera_y", 0) + + print("[4/6] Running pipeline for 10 frames (before rebuild)...") + frames_before = [] + for frame in range(10): + params.frame_number = frame + ctx.params = params + result = pipeline.execute(items) + if result.success: + frames_before.append(display._last_buffer) + print(f" Captured {len(frames_before)} frames") + + print("[5/6] Triggering hot-rebuild (toggling 'fade' effect)...") + fade_stage = pipeline.get_stage("effect_fade") + if fade_stage and isinstance(fade_stage, EffectPluginStage): + new_enabled = not fade_stage.is_enabled() + fade_stage.set_enabled(new_enabled) + fade_stage._effect.config.enabled = new_enabled + print(f" Fade effect enabled: {new_enabled}") + else: + print(" WARNING: Could not find fade effect stage") + + print("[6/6] Running pipeline for 10 more frames (after rebuild)...") + frames_after = [] + for frame in range(10, 20): + params.frame_number = frame + ctx.params = params + result = pipeline.execute(items) + if result.success: + frames_after.append(display._last_buffer) + print(f" Captured {len(frames_after)} frames") + + display.stop_recording() + + print("\n" + "=" * 60) + print("RESULTS") + print("=" * 60) + + print("\n[State Preservation Check]") + if frames_before and frames_after: + last_before = frames_before[-1] + first_after = frames_after[0] + + if last_before == first_after: + print(" PASS: Buffer state preserved across rebuild") + else: + print(" INFO: Buffer changed after rebuild (expected - effect toggled)") + + print("\n[Frame Continuity Check]") + recorded_frames = display.get_frames() + print(f" Total recorded frames: {len(recorded_frames)}") + print(f" Frames before rebuild: {len(frames_before)}") + print(f" Frames after rebuild: {len(frames_after)}") + + if len(recorded_frames) == 20: + print(" PASS: All frames recorded") + else: + print(" WARNING: Frame count mismatch") + + print("\n[Visual Comparison - First frame before vs after rebuild]") + print("\n--- Before rebuild (frame 9) ---") + for i, line in enumerate(frames_before[0][:viewport_height]): + print(f"{i:2}: {line}") + + print("\n--- After rebuild (frame 10) ---") + for i, line in enumerate(frames_after[0][:viewport_height]): + print(f"{i:2}: {line}") + + print("\n[Recording Save/Load Test]") + test_file = Path("/tmp/test_recording.json") + display.save_recording(test_file) + print(f" Saved recording to: {test_file}") + + display2 = DisplayRegistry.create("null") + display2.init(viewport_width, viewport_height) + display2.load_recording(test_file) + loaded_frames = display2.get_frames() + print(f" Loaded {len(loaded_frames)} frames from file") + + if len(loaded_frames) == len(recorded_frames): + print(" PASS: Recording save/load works correctly") + else: + print(" WARNING: Frame count mismatch after load") + + test_file.unlink(missing_ok=True) + + pipeline.cleanup() + display.cleanup() + + print("\n" + "=" * 60) + print("Demo complete!") + print("=" * 60 + "\n") + + +def main(): + viewport_width = 40 + viewport_height = 15 + + if "--viewport" in sys.argv: + idx = sys.argv.index("--viewport") + if idx + 1 < len(sys.argv): + vp = sys.argv[idx + 1] + try: + viewport_width, viewport_height = map(int, vp.split("x")) + except ValueError: + print("Error: Invalid viewport format. Use WxH (e.g., 40x15)") + sys.exit(1) + + run_demo(viewport_width, viewport_height) + + +if __name__ == "__main__": + main() diff --git a/scripts/pipeline_demo.py b/scripts/pipeline_demo.py new file mode 100644 index 0000000..b1a1d56 --- /dev/null +++ b/scripts/pipeline_demo.py @@ -0,0 +1,509 @@ +#!/usr/bin/env python3 +""" +Pipeline Demo Orchestrator + +Demonstrates all effects and camera modes with gentle oscillation. +Runs a comprehensive test of the Mainline pipeline system with proper +frame rate control and extended duration for visibility. +""" + +import argparse +import math +import signal +import sys +import time +from typing import Any + +from engine.camera import Camera +from engine.data_sources.checkerboard import CheckerboardDataSource +from engine.data_sources.sources import SourceItem +from engine.display import DisplayRegistry, NullDisplay +from engine.effects.plugins import discover_plugins +from engine.effects import get_registry +from engine.effects.types import EffectConfig +from engine.frame import FrameTimer +from engine.pipeline import Pipeline, PipelineConfig, PipelineContext +from engine.pipeline.adapters import ( + CameraClockStage, + CameraStage, + DataSourceStage, + DisplayStage, + EffectPluginStage, + SourceItemsToBufferStage, +) +from engine.pipeline.stages.framebuffer import FrameBufferStage + + +class GentleOscillator: + """Produces smooth, gentle sinusoidal values.""" + + def __init__( + self, speed: float = 60.0, amplitude: float = 1.0, offset: float = 0.0 + ): + self.speed = speed # Period length in frames + self.amplitude = amplitude # Amplitude + self.offset = offset # Base offset + + def value(self, frame: int) -> float: + """Get oscillated value for given frame.""" + return self.offset + self.amplitude * 0.5 * (1 + math.sin(frame / self.speed)) + + +class PipelineDemoOrchestrator: + """Orchestrates comprehensive pipeline demonstrations.""" + + def __init__( + self, + use_terminal: bool = True, + target_fps: float = 30.0, + effect_duration: float = 8.0, + mode_duration: float = 3.0, + enable_fps_switch: bool = False, + loop: bool = False, + verbose: bool = False, + ): + self.use_terminal = use_terminal + self.target_fps = target_fps + self.effect_duration = effect_duration + self.mode_duration = mode_duration + self.enable_fps_switch = enable_fps_switch + self.loop = loop + self.verbose = verbose + self.frame_count = 0 + self.pipeline = None + self.context = None + self.framebuffer = None + self.camera = None + self.timer = None + + def log(self, message: str, verbose: bool = False): + """Print with timestamp if verbose or always-important.""" + if self.verbose or not verbose: + print(f"[{time.strftime('%H:%M:%S')}] {message}") + + def build_base_pipeline( + self, camera_type: str = "scroll", camera_speed: float = 0.5 + ): + """Build a base pipeline with all required components.""" + self.log(f"Building base pipeline: camera={camera_type}, speed={camera_speed}") + + # Camera + camera = Camera.scroll(speed=camera_speed) + camera.set_canvas_size(200, 200) + + # Context + ctx = PipelineContext() + + # Pipeline config + config = PipelineConfig( + source="empty", + display="terminal" if self.use_terminal else "null", + camera=camera_type, + effects=[], + enable_metrics=True, + ) + pipeline = Pipeline(config=config, context=ctx) + + # Use a large checkerboard pattern for visible motion effects + source = CheckerboardDataSource(width=200, height=200, square_size=10) + pipeline.add_stage("source", DataSourceStage(source, name="checkerboard")) + + # Add camera clock (must run every frame) + pipeline.add_stage( + "camera_update", CameraClockStage(camera, name="camera-clock") + ) + + # Add render + pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) + + # Add camera stage + pipeline.add_stage("camera", CameraStage(camera, name="camera")) + + # Add framebuffer (optional for effects that use it) + self.framebuffer = FrameBufferStage(name="default", history_depth=5) + pipeline.add_stage("framebuffer", self.framebuffer) + + # Add display + display_backend = "terminal" if self.use_terminal else "null" + display = DisplayRegistry.create(display_backend) + if display: + pipeline.add_stage("display", DisplayStage(display, name=display_backend)) + + # Build and initialize + pipeline.build(auto_inject=False) + pipeline.initialize() + + self.pipeline = pipeline + self.context = ctx + self.camera = camera + + self.log("Base pipeline built successfully") + return pipeline + + def test_effects_oscillation(self): + """Test each effect with gentle intensity oscillation.""" + self.log("\n=== EFFECTS OSCILLATION TEST ===") + self.log( + f"Duration: {self.effect_duration}s per effect at {self.target_fps} FPS" + ) + + discover_plugins() # Ensure all plugins are registered + registry = get_registry() + all_effects = registry.list_all() + effect_names = [ + name + for name in all_effects.keys() + if name not in ("motionblur", "afterimage") + ] + + # Calculate frames based on duration and FPS + frames_per_effect = int(self.effect_duration * self.target_fps) + oscillator = GentleOscillator(speed=90, amplitude=0.7, offset=0.3) + + total_effects = len(effect_names) + 2 # +2 for motionblur and afterimage + estimated_total = total_effects * self.effect_duration + + self.log(f"Testing {len(effect_names)} regular effects + 2 framebuffer effects") + self.log(f"Estimated time: {estimated_total:.0f}s") + + for idx, effect_name in enumerate(sorted(effect_names), 1): + try: + self.log(f"[{idx}/{len(effect_names)}] Testing effect: {effect_name}") + + effect = registry.get(effect_name) + if not effect: + self.log(f" Skipped: plugin not found") + continue + + stage = EffectPluginStage(effect, name=effect_name) + self.pipeline.add_stage(f"effect_{effect_name}", stage) + self.pipeline.build(auto_inject=False) + + self._run_frames( + frames_per_effect, oscillator=oscillator, effect=effect + ) + + self.pipeline.remove_stage(f"effect_{effect_name}") + self.pipeline.build(auto_inject=False) + + self.log(f" ✓ {effect_name} completed successfully") + + except Exception as e: + self.log(f" ✗ {effect_name} failed: {e}") + + # Test motionblur and afterimage separately with framebuffer + for effect_name in ["motionblur", "afterimage"]: + try: + self.log( + f"[{len(effect_names) + 1}/{total_effects}] Testing effect: {effect_name} (with framebuffer)" + ) + + effect = registry.get(effect_name) + if not effect: + self.log(f" Skipped: plugin not found") + continue + + stage = EffectPluginStage( + effect, + name=effect_name, + dependencies={"framebuffer.history.default"}, + ) + self.pipeline.add_stage(f"effect_{effect_name}", stage) + self.pipeline.build(auto_inject=False) + + self._run_frames( + frames_per_effect, oscillator=oscillator, effect=effect + ) + + self.pipeline.remove_stage(f"effect_{effect_name}") + self.pipeline.build(auto_inject=False) + self.log(f" ✓ {effect_name} completed successfully") + + except Exception as e: + self.log(f" ✗ {effect_name} failed: {e}") + + def _run_frames(self, num_frames: int, oscillator=None, effect=None): + """Run a specified number of frames with proper timing.""" + for frame in range(num_frames): + self.frame_count += 1 + self.context.set("frame_number", frame) + + if oscillator and effect: + intensity = oscillator.value(frame) + effect.configure(EffectConfig(intensity=intensity)) + + dt = self.timer.sleep_until_next_frame() + self.camera.update(dt) + self.pipeline.execute([]) + + def test_framebuffer(self): + """Test framebuffer functionality.""" + self.log("\n=== FRAMEBUFFER TEST ===") + + try: + # Run frames using FrameTimer for consistent pacing + self._run_frames(10) + + # Check framebuffer history + history = self.context.get("framebuffer.default.history") + assert history is not None, "No framebuffer history found" + assert len(history) > 0, "Framebuffer history is empty" + + self.log(f"History frames: {len(history)}") + self.log(f"Configured depth: {self.framebuffer.config.history_depth}") + + # Check intensity computation + intensity = self.context.get("framebuffer.default.current_intensity") + assert intensity is not None, "No intensity map found" + self.log(f"Intensity map length: {len(intensity)}") + + # Check that frames are being stored correctly + recent_frame = self.framebuffer.get_frame(0, self.context) + assert recent_frame is not None, "Cannot retrieve recent frame" + self.log(f"Recent frame rows: {len(recent_frame)}") + + self.log("✓ Framebuffer test passed") + + except Exception as e: + self.log(f"✗ Framebuffer test failed: {e}") + raise + + def test_camera_modes(self): + """Test each camera mode.""" + self.log("\n=== CAMERA MODES TEST ===") + self.log(f"Duration: {self.mode_duration}s per mode at {self.target_fps} FPS") + + camera_modes = [ + ("feed", 0.1), + ("scroll", 0.5), + ("horizontal", 0.3), + ("omni", 0.3), + ("floating", 0.5), + ("bounce", 0.5), + ("radial", 0.3), + ] + + frames_per_mode = int(self.mode_duration * self.target_fps) + self.log(f"Testing {len(camera_modes)} camera modes") + self.log(f"Estimated time: {len(camera_modes) * self.mode_duration:.0f}s") + + for idx, (camera_type, speed) in enumerate(camera_modes, 1): + try: + self.log(f"[{idx}/{len(camera_modes)}] Testing camera: {camera_type}") + + # Rebuild camera + self.camera.reset() + cam_class = getattr(Camera, camera_type, Camera.scroll) + new_camera = cam_class(speed=speed) + new_camera.set_canvas_size(200, 200) + + # Update camera stages + clock_stage = CameraClockStage(new_camera, name="camera-clock") + self.pipeline.replace_stage("camera_update", clock_stage) + + camera_stage = CameraStage(new_camera, name="camera") + self.pipeline.replace_stage("camera", camera_stage) + + self.camera = new_camera + + # Run frames with proper timing + self._run_frames(frames_per_mode) + + # Verify camera moved (check final position) + x, y = self.camera.x, self.camera.y + self.log(f" Final position: ({x:.1f}, {y:.1f})") + + if camera_type == "feed": + assert x == 0 and y == 0, "Feed camera should not move" + elif camera_type in ("scroll", "horizontal"): + assert abs(x) > 0 or abs(y) > 0, "Camera should have moved" + else: + self.log(f" Position check skipped (mode={camera_type})") + + self.log(f" ✓ {camera_type} completed successfully") + + except Exception as e: + self.log(f" ✗ {camera_type} failed: {e}") + + def test_fps_switch_demo(self): + """Demonstrate the effect of different frame rates on animation smoothness.""" + if not self.enable_fps_switch: + return + + self.log("\n=== FPS SWITCH DEMONSTRATION ===") + + fps_sequence = [ + (30.0, 5.0), # 30 FPS for 5 seconds + (60.0, 5.0), # 60 FPS for 5 seconds + (30.0, 5.0), # Back to 30 FPS for 5 seconds + (20.0, 3.0), # 20 FPS for 3 seconds + (60.0, 3.0), # 60 FPS for 3 seconds + ] + + original_fps = self.target_fps + + for fps, duration in fps_sequence: + self.log(f"\n--- Switching to {fps} FPS for {duration}s ---") + self.target_fps = fps + self.timer.target_frame_dt = 1.0 / fps + + # Update display FPS if supported + display = ( + self.pipeline.get_stage("display").stage + if self.pipeline.get_stage("display") + else None + ) + if display and hasattr(display, "target_fps"): + display.target_fps = fps + display._frame_period = 1.0 / fps if fps > 0 else 0 + + frames = int(duration * fps) + camera_type = "radial" # Use radial for smooth rotation that's visible at different FPS + speed = 0.3 + + # Rebuild camera if needed + self.camera.reset() + new_camera = Camera.radial(speed=speed) + new_camera.set_canvas_size(200, 200) + clock_stage = CameraClockStage(new_camera, name="camera-clock") + self.pipeline.replace_stage("camera_update", clock_stage) + camera_stage = CameraStage(new_camera, name="camera") + self.pipeline.replace_stage("camera", camera_stage) + self.camera = new_camera + + for frame in range(frames): + self.context.set("frame_number", frame) + dt = self.timer.sleep_until_next_frame() + self.camera.update(dt) + result = self.pipeline.execute([]) + + self.log(f" Completed {frames} frames at {fps} FPS") + + # Restore original FPS + self.target_fps = original_fps + self.timer.target_frame_dt = 1.0 / original_fps + self.log("✓ FPS switch demo completed") + + def run(self): + """Run the complete demo.""" + start_time = time.time() + self.log("Starting Pipeline Demo Orchestrator") + self.log("=" * 50) + + # Initialize frame timer + self.timer = FrameTimer(target_frame_dt=1.0 / self.target_fps) + + # Build pipeline + self.build_base_pipeline() + + try: + # Test framebuffer first (needed for motion blur effects) + self.test_framebuffer() + + # Test effects + self.test_effects_oscillation() + + # Test camera modes + self.test_camera_modes() + + # Optional FPS switch demonstration + if self.enable_fps_switch: + self.test_fps_switch_demo() + else: + self.log("\n=== FPS SWITCH DEMO ===") + self.log("Skipped (enable with --switch-fps)") + + elapsed = time.time() - start_time + self.log("\n" + "=" * 50) + self.log("Demo completed successfully!") + self.log(f"Total frames processed: {self.frame_count}") + self.log(f"Total elapsed time: {elapsed:.1f}s") + self.log(f"Average FPS: {self.frame_count / elapsed:.1f}") + + finally: + # Always cleanup properly + self._cleanup() + + def _cleanup(self): + """Clean up pipeline resources.""" + self.log("Cleaning up...", verbose=True) + if self.pipeline: + try: + self.pipeline.cleanup() + if self.verbose: + self.log("Pipeline cleaned up successfully", verbose=True) + except Exception as e: + self.log(f"Error during pipeline cleanup: {e}", verbose=True) + + # If not looping, clear references + if not self.loop: + self.pipeline = None + self.context = None + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Pipeline Demo Orchestrator - comprehensive demo of Mainline pipeline" + ) + parser.add_argument( + "--null", + action="store_true", + help="Use null display (no visual output)", + ) + parser.add_argument( + "--fps", + type=float, + default=30.0, + help="Target frame rate (default: 30)", + ) + parser.add_argument( + "--effect-duration", + type=float, + default=8.0, + help="Duration per effect in seconds (default: 8)", + ) + parser.add_argument( + "--mode-duration", + type=float, + default=3.0, + help="Duration per camera mode in seconds (default: 3)", + ) + parser.add_argument( + "--switch-fps", + action="store_true", + help="Include FPS switching demonstration", + ) + parser.add_argument( + "--loop", + action="store_true", + help="Run demo in an infinite loop", + ) + parser.add_argument( + "--verbose", + action="store_true", + help="Enable verbose output", + ) + + args = parser.parse_args() + + orchestrator = PipelineDemoOrchestrator( + use_terminal=not args.null, + target_fps=args.fps, + effect_duration=args.effect_duration, + mode_duration=args.mode_duration, + enable_fps_switch=args.switch_fps, + loop=args.loop, + verbose=args.verbose, + ) + + try: + orchestrator.run() + except KeyboardInterrupt: + print("\nInterrupted by user") + sys.exit(0) + except Exception as e: + print(f"\nDemo failed: {e}") + import traceback + + traceback.print_exc() + sys.exit(1) diff --git a/tests/test_app.py b/tests/test_app.py index ded29bb..942bc8f 100644 --- a/tests/test_app.py +++ b/tests/test_app.py @@ -27,13 +27,13 @@ class TestMain: """main() uses PRESET from config if set.""" with ( patch("engine.config.PIPELINE_DIAGRAM", False), - patch("engine.config.PRESET", "gallery-sources"), + patch("engine.config.PRESET", "demo"), patch("engine.config.PIPELINE_MODE", False), patch("engine.app.main.run_pipeline_mode") as mock_run, ): sys.argv = ["mainline.py"] main() - mock_run.assert_called_once_with("gallery-sources") + mock_run.assert_called_once_with("demo") def test_main_exits_on_unknown_preset(self): """main() exits with error for unknown preset.""" @@ -122,7 +122,7 @@ class TestRunPipelineMode: mock_create.return_value = mock_display try: - run_pipeline_mode("gallery-display-terminal") + run_pipeline_mode("demo-base") except (KeyboardInterrupt, SystemExit): pass diff --git a/tests/test_camera_acceptance.py b/tests/test_camera_acceptance.py new file mode 100644 index 0000000..1faa519 --- /dev/null +++ b/tests/test_camera_acceptance.py @@ -0,0 +1,826 @@ +""" +Camera acceptance tests using NullDisplay frame recording and ReplayDisplay. + +Tests all camera modes by: +1. Creating deterministic source data (numbered lines) +2. Running pipeline with small viewport (40x15) +3. Recording frames with NullDisplay +4. Asserting expected viewport content for each mode + +Usage: + pytest tests/test_camera_acceptance.py -v + pytest tests/test_camera_acceptance.py --show-frames -v + +The --show-frames flag displays recorded frames for visual verification. +""" + +import math +import sys +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from engine.camera import Camera, CameraMode +from engine.display import DisplayRegistry +from engine.effects import get_registry +from engine.pipeline import Pipeline, PipelineConfig, PipelineContext +from engine.pipeline.adapters import ( + CameraClockStage, + CameraStage, + FontStage, + ViewportFilterStage, + create_stage_from_display, + create_stage_from_effect, +) +from engine.pipeline.params import PipelineParams + + +def get_camera_position(pipeline, camera): + """Helper to get camera position directly from the camera object. + + The pipeline context's camera_y/camera_x values may be transformed by + ViewportFilterStage (filtered relative position). This helper gets the + true camera position from the camera object itself. + + Args: + pipeline: The pipeline instance + camera: The camera object + + Returns: + tuple (x, y) of the camera's absolute position + """ + return (camera.x, camera.y) + + +# Register custom CLI option for showing frames +def pytest_addoption(parser): + parser.addoption( + "--show-frames", + action="store_true", + default=False, + help="Display recorded frames for visual verification", + ) + + +@pytest.fixture +def show_frames(request): + """Get the --show-frames flag value.""" + try: + return request.config.getoption("--show-frames") + except ValueError: + # Option not registered, default to False + return False + + +@pytest.fixture +def viewport_dims(): + """Small viewport dimensions for testing.""" + return (40, 15) + + +@pytest.fixture +def items(): + """Create deterministic test data - numbered lines for easy verification.""" + # Create 100 numbered lines: LINE 000, LINE 001, etc. + return [{"text": f"LINE {i:03d} - This is line number {i}"} for i in range(100)] + + +@pytest.fixture +def null_display(viewport_dims): + """Create a NullDisplay for testing.""" + display = DisplayRegistry.create("null") + display.init(viewport_dims[0], viewport_dims[1]) + return display + + +def create_pipeline_with_camera( + camera, items, null_display, viewport_dims, effects=None +): + """Helper to create a pipeline with a specific camera.""" + effects = effects or [] + width, height = viewport_dims + + params = PipelineParams() + params.viewport_width = width + params.viewport_height = height + + config = PipelineConfig( + source="fixture", + display="null", + camera="scroll", + effects=effects, + ) + + pipeline = Pipeline(config=config, context=PipelineContext()) + + from engine.data_sources.sources import ListDataSource + from engine.pipeline.adapters import DataSourceStage + + list_source = ListDataSource(items, name="fixture") + pipeline.add_stage("source", DataSourceStage(list_source, name="fixture")) + + # Add camera update stage to ensure camera_y is available for viewport filter + pipeline.add_stage("camera_update", CameraClockStage(camera, name="camera-clock")) + + # Note: camera should come after font/viewport_filter, before effects + pipeline.add_stage("viewport_filter", ViewportFilterStage(name="viewport-filter")) + pipeline.add_stage("font", FontStage(name="font")) + pipeline.add_stage( + "camera", + CameraStage( + camera, name="radial" if camera.mode == CameraMode.RADIAL else "vertical" + ), + ) + + if effects: + effect_registry = get_registry() + for effect_name in effects: + effect = effect_registry.get(effect_name) + if effect: + pipeline.add_stage( + f"effect_{effect_name}", + create_stage_from_effect(effect, effect_name), + ) + + pipeline.add_stage("display", create_stage_from_display(null_display, "null")) + pipeline.build() + + if not pipeline.initialize(): + return None + + ctx = pipeline.context + ctx.params = params + ctx.set("display", null_display) + ctx.set("items", items) + ctx.set("pipeline", pipeline) + ctx.set("pipeline_order", pipeline.execution_order) + + return pipeline + + +class DisplayHelper: + """Helper to display frames for visual verification.""" + + @staticmethod + def show_frame(buffer, title, viewport_dims, marker_line=None): + """Display a single frame with visual markers.""" + width, height = viewport_dims + print(f"\n{'=' * (width + 20)}") + print(f" {title}") + print(f"{'=' * (width + 20)}") + + for i, line in enumerate(buffer[:height]): + # Add marker if this line should be highlighted + marker = ">>>" if marker_line == i else " " + print(f"{marker} [{i:2}] {line[:width]}") + + print(f"{'=' * (width + 20)}\n") + + +class TestFeedCamera: + """Test FEED mode: rapid single-item scrolling (1 row/frame at speed=1.0).""" + + def test_feed_camera_scrolls_down( + self, items, null_display, viewport_dims, show_frames + ): + """FEED camera should move content down (y increases) at 1 row/frame.""" + camera = Camera.feed(speed=1.0) + camera.set_canvas_size(200, 100) + + pipeline = create_pipeline_with_camera( + camera, items, null_display, viewport_dims + ) + assert pipeline is not None, "Pipeline creation failed" + + null_display.start_recording() + + # Run for 10 frames with small delay between frames + # to ensure camera has time to move (dt calculation relies on time.perf_counter()) + import time + + for frame in range(10): + pipeline.context.set("frame_number", frame) + result = pipeline.execute(items) + assert result.success, f"Frame {frame} execution failed" + if frame < 9: # No need to sleep after last frame + time.sleep(0.02) # Wait 20ms so dt~0.02, camera moves ~1.2 rows + + null_display.stop_recording() + frames = null_display.get_frames() + + if show_frames: + DisplayHelper.show_frame(frames[0], "FEED Camera - Frame 0", viewport_dims) + DisplayHelper.show_frame(frames[5], "FEED Camera - Frame 5", viewport_dims) + DisplayHelper.show_frame(frames[9], "FEED Camera - Frame 9", viewport_dims) + + # FEED mode: each frame y increases by speed*dt*60 + # At dt=1.0, speed=1.0: y increases by 60 per frame + # But clamp to canvas bounds (200) + # Frame 0: y=0, should show LINE 000 + # Frame 1: y=60, should show LINE 060 + + # Verify frame 0 contains ASCII art content (rendered from LINE 000) + # The text is converted to block characters, so check for non-empty frames + assert len(frames[0]) > 0, "Frame 0 should not be empty" + assert frames[0][0].strip() != "", "Frame 0 should have visible content" + + # Verify camera position changed between frames + # Feed mode moves 1 row per frame at speed=1.0 with dt~0.02 + # After 5 frames, camera should have moved down + assert camera.y > 0, f"Camera should have moved down, y={camera.y}" + + # Verify different frames show different content (camera is scrolling) + # Check that frame 0 and frame 5 are different + frame_0_str = "\n".join(frames[0]) + frame_5_str = "\n".join(frames[5]) + assert frame_0_str != frame_5_str, ( + "Frame 0 and Frame 5 should show different content" + ) + + +class TestScrollCamera: + """Test SCROLL mode: smooth vertical scrolling with float accumulation.""" + + def test_scroll_camera_smooth_movement( + self, items, null_display, viewport_dims, show_frames + ): + """SCROLL camera should move content smoothly with sub-integer precision.""" + camera = Camera.scroll(speed=0.5) + camera.set_canvas_size(0, 200) # Match viewport width for text wrapping + + pipeline = create_pipeline_with_camera( + camera, items, null_display, viewport_dims + ) + assert pipeline is not None, "Pipeline creation failed" + + null_display.start_recording() + + # Run for 20 frames + for frame in range(20): + pipeline.context.set("frame_number", frame) + result = pipeline.execute(items) + assert result.success, f"Frame {frame} execution failed" + + null_display.stop_recording() + frames = null_display.get_frames() + + if show_frames: + DisplayHelper.show_frame( + frames[0], "SCROLL Camera - Frame 0", viewport_dims + ) + DisplayHelper.show_frame( + frames[10], "SCROLL Camera - Frame 10", viewport_dims + ) + + # SCROLL mode uses float accumulation for smooth scrolling + # At speed=0.5, dt=1.0: y increases by 0.5 * 60 = 30 pixels per frame + # Verify camera_y is increasing (which causes the scroll) + camera_y_values = [] + for frame in range(5): + # Get camera.y directly (not filtered context value) + pipeline.context.set("frame_number", frame) + pipeline.execute(items) + camera_y_values.append(camera.y) + + print(f"\nSCROLL test - camera_y positions: {camera_y_values}") + + # Verify camera_y is non-zero (camera is moving) + assert camera_y_values[-1] > 0, ( + "Camera should have scrolled down (camera_y > 0)" + ) + + # Verify camera_y is increasing + for i in range(len(camera_y_values) - 1): + assert camera_y_values[i + 1] >= camera_y_values[i], ( + f"Camera_y should be non-decreasing: {camera_y_values}" + ) + + +class TestHorizontalCamera: + """Test HORIZONTAL mode: left/right scrolling.""" + + def test_horizontal_camera_scrolls_right( + self, items, null_display, viewport_dims, show_frames + ): + """HORIZONTAL camera should move content right (x increases).""" + camera = Camera.horizontal(speed=1.0) + camera.set_canvas_size(200, 200) + + pipeline = create_pipeline_with_camera( + camera, items, null_display, viewport_dims + ) + assert pipeline is not None, "Pipeline creation failed" + + null_display.start_recording() + + for frame in range(10): + pipeline.context.set("frame_number", frame) + result = pipeline.execute(items) + assert result.success, f"Frame {frame} execution failed" + + null_display.stop_recording() + frames = null_display.get_frames() + + if show_frames: + DisplayHelper.show_frame( + frames[0], "HORIZONTAL Camera - Frame 0", viewport_dims + ) + DisplayHelper.show_frame( + frames[5], "HORIZONTAL Camera - Frame 5", viewport_dims + ) + + # HORIZONTAL mode: x increases by speed*dt*60 + # At dt=1.0, speed=1.0: x increases by 60 per frame + # Frame 0: x=0 + # Frame 5: x=300 (clamped to canvas_width-viewport_width) + + # Verify frame 0 contains content (ASCII art of LINE 000) + assert len(frames[0]) > 0, "Frame 0 should not be empty" + assert frames[0][0].strip() != "", "Frame 0 should have visible content" + + # Verify camera x is increasing + print("\nHORIZONTAL test - camera positions:") + for i in range(10): + print(f" Frame {i}: x={camera.x}, y={camera.y}") + camera.update(1.0) + + # Verify camera moved + assert camera.x > 0, f"Camera should have moved right, x={camera.x}" + + +class TestOmniCamera: + """Test OMNI mode: diagonal scrolling (x and y increase together).""" + + def test_omni_camera_diagonal_movement( + self, items, null_display, viewport_dims, show_frames + ): + """OMNI camera should move content diagonally (both x and y increase).""" + camera = Camera.omni(speed=1.0) + camera.set_canvas_size(200, 200) + + pipeline = create_pipeline_with_camera( + camera, items, null_display, viewport_dims + ) + assert pipeline is not None, "Pipeline creation failed" + + null_display.start_recording() + + for frame in range(10): + pipeline.context.set("frame_number", frame) + result = pipeline.execute(items) + assert result.success, f"Frame {frame} execution failed" + + null_display.stop_recording() + frames = null_display.get_frames() + + if show_frames: + DisplayHelper.show_frame(frames[0], "OMNI Camera - Frame 0", viewport_dims) + DisplayHelper.show_frame(frames[5], "OMNI Camera - Frame 5", viewport_dims) + + # OMNI mode: y increases by speed*dt*60, x increases by speed*dt*60*0.5 + # At dt=1.0, speed=1.0: y += 60, x += 30 + + # Verify frame 0 contains content (ASCII art) + assert len(frames[0]) > 0, "Frame 0 should not be empty" + assert frames[0][0].strip() != "", "Frame 0 should have visible content" + + print("\nOMNI test - camera positions:") + camera.reset() + for frame in range(5): + print(f" Frame {frame}: x={camera.x}, y={camera.y}") + camera.update(1.0) + + # Verify camera moved + assert camera.y > 0, f"Camera should have moved down, y={camera.y}" + + +class TestFloatingCamera: + """Test FLOATING mode: sinusoidal bobbing motion.""" + + def test_floating_camera_bobbing( + self, items, null_display, viewport_dims, show_frames + ): + """FLOATING camera should move content in a sinusoidal pattern.""" + camera = Camera.floating(speed=1.0) + camera.set_canvas_size(200, 200) + + pipeline = create_pipeline_with_camera( + camera, items, null_display, viewport_dims + ) + assert pipeline is not None, "Pipeline creation failed" + + null_display.start_recording() + + for frame in range(32): + pipeline.context.set("frame_number", frame) + result = pipeline.execute(items) + assert result.success, f"Frame {frame} execution failed" + + null_display.stop_recording() + frames = null_display.get_frames() + + if show_frames: + DisplayHelper.show_frame( + frames[0], "FLOATING Camera - Frame 0", viewport_dims + ) + DisplayHelper.show_frame( + frames[8], "FLOATING Camera - Frame 8 (quarter cycle)", viewport_dims + ) + DisplayHelper.show_frame( + frames[16], "FLOATING Camera - Frame 16 (half cycle)", viewport_dims + ) + + # FLOATING mode: y = sin(time*2) * speed * 30 + # Period: 2π / 2 = π ≈ 3.14 seconds (or ~3.14 frames at dt=1.0) + # Full cycle ~32 frames + + print("\nFLOATING test - sinusoidal motion:") + camera.reset() + for frame in range(16): + print(f" Frame {frame}: y={camera.y}, x={camera.x}") + camera.update(1.0) + + # Verify y oscillates around 0 + camera.reset() + camera.update(1.0) # Frame 1 + y1 = camera.y + camera.update(1.0) # Frame 2 + y2 = camera.y + camera.update(1.0) # Frame 3 + y3 = camera.y + + # After a few frames, y should oscillate (not monotonic) + assert y1 != y2 or y2 != y3, "FLOATING camera should oscillate" + + +class TestBounceCamera: + """Test BOUNCE mode: bouncing DVD-style motion.""" + + def test_bounce_camera_reverses_at_edges( + self, items, null_display, viewport_dims, show_frames + ): + """BOUNCE camera should reverse direction when hitting canvas edges.""" + camera = Camera.bounce(speed=5.0) # Faster for quicker test + # Set zoom > 1.0 so viewport is smaller than canvas, allowing movement + camera.set_zoom(2.0) # Zoom out 2x, viewport is half the canvas size + camera.set_canvas_size(400, 400) + + pipeline = create_pipeline_with_camera( + camera, items, null_display, viewport_dims + ) + assert pipeline is not None, "Pipeline creation failed" + + null_display.start_recording() + + for frame in range(50): + pipeline.context.set("frame_number", frame) + result = pipeline.execute(items) + assert result.success, f"Frame {frame} execution failed" + + null_display.stop_recording() + frames = null_display.get_frames() + + if show_frames: + DisplayHelper.show_frame( + frames[0], "BOUNCE Camera - Frame 0", viewport_dims + ) + DisplayHelper.show_frame( + frames[25], "BOUNCE Camera - Frame 25", viewport_dims + ) + + # BOUNCE mode: moves until it hits edge, then reverses + # Verify the camera moves and changes direction + + print("\nBOUNCE test - bouncing motion:") + camera.reset() + camera.set_zoom(2.0) # Reset also resets zoom, so set it again + for frame in range(20): + print(f" Frame {frame}: x={camera.x}, y={camera.y}") + camera.update(1.0) + + # Check that camera hits bounds and reverses + camera.reset() + camera.set_zoom(2.0) # Reset also resets zoom, so set it again + for _ in range(51): # Odd number ensures ending at opposite corner + camera.update(1.0) + + # Camera should have hit an edge and reversed direction + # With 400x400 canvas, viewport 200x200 (zoom=2), max_x = 200, max_y = 200 + # Starting at (0,0), after 51 updates it should be at (200, 200) + max_x = max(0, camera.canvas_width - camera.viewport_width) + print(f"BOUNCE camera final position: x={camera.x}, y={camera.y}") + assert camera.x == max_x, ( + f"Camera should be at max_x ({max_x}), got x={camera.x}" + ) + + # Check bounds are respected + vw = camera.viewport_width + vh = camera.viewport_height + assert camera.x >= 0 and camera.x <= camera.canvas_width - vw + assert camera.y >= 0 and camera.y <= camera.canvas_height - vh + + +class TestRadialCamera: + """Test RADIAL mode: polar coordinate scanning (rotation around center).""" + + def test_radial_camera_rotates_around_center( + self, items, null_display, viewport_dims, show_frames + ): + """RADIAL camera should rotate around the center of the canvas.""" + camera = Camera.radial(speed=0.5) + camera.set_canvas_size(200, 200) + + pipeline = create_pipeline_with_camera( + camera, items, null_display, viewport_dims + ) + assert pipeline is not None, "Pipeline creation failed" + + null_display.start_recording() + + for frame in range(32): # 32 frames = 2π at ~0.2 rad/frame + pipeline.context.set("frame_number", frame) + result = pipeline.execute(items) + assert result.success, f"Frame {frame} execution failed" + + null_display.stop_recording() + frames = null_display.get_frames() + + if show_frames: + DisplayHelper.show_frame( + frames[0], "RADIAL Camera - Frame 0", viewport_dims + ) + DisplayHelper.show_frame( + frames[8], "RADIAL Camera - Frame 8 (quarter turn)", viewport_dims + ) + DisplayHelper.show_frame( + frames[16], "RADIAL Camera - Frame 16 (half turn)", viewport_dims + ) + DisplayHelper.show_frame( + frames[24], "RADIAL Camera - Frame 24 (3/4 turn)", viewport_dims + ) + + # RADIAL mode: rotates around center with smooth angular motion + # At speed=0.5: theta increases by ~0.2 rad/frame (0.5 * dt * 1.0) + + print("\nRADIAL test - rotational motion:") + camera.reset() + for frame in range(32): + theta_deg = (camera._theta_float * 180 / math.pi) % 360 + print( + f" Frame {frame}: theta={theta_deg:.1f}°, x={camera.x}, y={camera.y}" + ) + camera.update(1.0) + + # Verify rotation occurs (angle should change) + camera.reset() + theta_start = camera._theta_float + camera.update(1.0) # Frame 1 + theta_mid = camera._theta_float + camera.update(1.0) # Frame 2 + theta_end = camera._theta_float + + assert theta_mid > theta_start, "Theta should increase (rotation)" + assert theta_end > theta_mid, "Theta should continue increasing" + + def test_radial_camera_with_sensor_integration( + self, items, null_display, viewport_dims, show_frames + ): + """RADIAL camera can be driven by external sensor (OSC integration test).""" + from engine.sensors.oscillator import ( + OscillatorSensor, + register_oscillator_sensor, + ) + + # Create an oscillator sensor for testing + register_oscillator_sensor(name="test_osc", waveform="sine", frequency=0.5) + osc = OscillatorSensor(name="test_osc", waveform="sine", frequency=0.5) + + camera = Camera.radial(speed=0.3) + camera.set_canvas_size(200, 200) + + pipeline = create_pipeline_with_camera( + camera, items, null_display, viewport_dims + ) + assert pipeline is not None, "Pipeline creation failed" + + null_display.start_recording() + + # Run frames while modulating camera with oscillator + for frame in range(32): + # Read oscillator value and set as radial input + osc_value = osc.read() + if osc_value: + camera.set_radial_input(osc_value.value) + + pipeline.context.set("frame_number", frame) + result = pipeline.execute(items) + assert result.success, f"Frame {frame} execution failed" + + null_display.stop_recording() + frames = null_display.get_frames() + + if show_frames: + DisplayHelper.show_frame( + frames[0], "RADIAL+OSC Camera - Frame 0", viewport_dims + ) + DisplayHelper.show_frame( + frames[8], "RADIAL+OSC Camera - Frame 8", viewport_dims + ) + DisplayHelper.show_frame( + frames[16], "RADIAL+OSC Camera - Frame 16", viewport_dims + ) + + print("\nRADIAL+OSC test - sensor-driven rotation:") + osc.start() + camera.reset() + for frame in range(16): + osc_value = osc.read() + if osc_value: + camera.set_radial_input(osc_value.value) + camera.update(1.0) + theta_deg = (camera._theta_float * 180 / math.pi) % 360 + print( + f" Frame {frame}: osc={osc_value.value if osc_value else 0:.3f}, theta={theta_deg:.1f}°" + ) + + # Verify camera position changes when driven by sensor + camera.reset() + x_start = camera.x + camera.update(1.0) + x_mid = camera.x + assert x_start != x_mid, "Camera should move when driven by oscillator" + + osc.stop() + + def test_radial_camera_with_direct_angle_setting( + self, items, null_display, viewport_dims, show_frames + ): + """RADIAL camera can have angle set directly for OSC integration.""" + camera = Camera.radial(speed=0.0) # No auto-rotation + camera.set_canvas_size(200, 200) + camera._r_float = 80.0 # Set initial radius to see movement + + pipeline = create_pipeline_with_camera( + camera, items, null_display, viewport_dims + ) + assert pipeline is not None, "Pipeline creation failed" + + null_display.start_recording() + + # Set angle directly to sweep through full rotation + for frame in range(32): + angle = (frame / 32) * 2 * math.pi # 0 to 2π over 32 frames + camera.set_radial_angle(angle) + camera.update(1.0) # Must update to convert polar to Cartesian + + pipeline.context.set("frame_number", frame) + result = pipeline.execute(items) + assert result.success, f"Frame {frame} execution failed" + + null_display.stop_recording() + frames = null_display.get_frames() + + if show_frames: + DisplayHelper.show_frame( + frames[0], "RADIAL Direct Angle - Frame 0", viewport_dims + ) + DisplayHelper.show_frame( + frames[8], "RADIAL Direct Angle - Frame 8", viewport_dims + ) + DisplayHelper.show_frame( + frames[16], "RADIAL Direct Angle - Frame 16", viewport_dims + ) + + print("\nRADIAL Direct Angle test - sweeping rotation:") + for frame in range(32): + angle = (frame / 32) * 2 * math.pi + camera.set_radial_angle(angle) + camera.update(1.0) # Update converts angle to x,y position + theta_deg = angle * 180 / math.pi + print( + f" Frame {frame}: set_angle={theta_deg:.1f}°, actual_x={camera.x}, actual_y={camera.y}" + ) + + # Verify camera position changes as angle sweeps + camera.reset() + camera._r_float = 80.0 # Set radius for testing + camera.set_radial_angle(0) + camera.update(1.0) + x0 = camera.x + camera.set_radial_angle(math.pi / 2) + camera.update(1.0) + x90 = camera.x + assert x0 != x90, ( + f"Camera position should change with angle (x0={x0}, x90={x90})" + ) + + +class TestCameraModeEnum: + """Test CameraMode enum integrity.""" + + def test_all_modes_exist(self): + """Verify all camera modes are defined.""" + modes = [m.name for m in CameraMode] + expected = [ + "FEED", + "SCROLL", + "HORIZONTAL", + "OMNI", + "FLOATING", + "BOUNCE", + "RADIAL", + ] + + for mode in expected: + assert mode in modes, f"CameraMode.{mode} should exist" + + def test_radial_mode_exists(self): + """Verify RADIAL mode is properly defined.""" + assert CameraMode.RADIAL is not None + assert isinstance(CameraMode.RADIAL, CameraMode) + assert CameraMode.RADIAL.name == "RADIAL" + + +class TestCameraFactoryMethods: + """Test camera factory methods create proper camera instances.""" + + def test_radial_factory(self): + """RADIAL factory should create a camera with correct mode.""" + camera = Camera.radial(speed=2.0) + assert camera.mode == CameraMode.RADIAL + assert camera.speed == 2.0 + assert hasattr(camera, "_r_float") + assert hasattr(camera, "_theta_float") + + def test_radial_factory_initializes_state(self): + """RADIAL factory should initialize radial state.""" + camera = Camera.radial() + assert camera._r_float == 0.0 + assert camera._theta_float == 0.0 + + +class TestCameraStateSaveRestore: + """Test camera state can be saved and restored (for hot-rebuild).""" + + def test_radial_camera_state_save(self): + """RADIAL camera should save polar coordinate state.""" + camera = Camera.radial() + camera._theta_float = math.pi / 4 + camera._r_float = 50.0 + + # Save state via CameraStage adapter + from engine.pipeline.adapters.camera import CameraStage + + stage = CameraStage(camera) + + state = stage.save_state() + assert "_theta_float" in state + assert "_r_float" in state + assert state["_theta_float"] == math.pi / 4 + assert state["_r_float"] == 50.0 + + def test_radial_camera_state_restore(self): + """RADIAL camera should restore polar coordinate state.""" + camera1 = Camera.radial() + camera1._theta_float = math.pi / 3 + camera1._r_float = 75.0 + + from engine.pipeline.adapters.camera import CameraStage + + stage1 = CameraStage(camera1) + state = stage1.save_state() + + # Create new camera and restore + camera2 = Camera.radial() + stage2 = CameraStage(camera2) + stage2.restore_state(state) + + assert abs(camera2._theta_float - math.pi / 3) < 0.001 + assert abs(camera2._r_float - 75.0) < 0.001 + + +class TestCameraViewportApplication: + """Test camera.apply() properly slices buffers.""" + + def test_radial_camera_viewport_slicing(self): + """RADIAL camera should properly slice buffer based on position.""" + camera = Camera.radial(speed=0.5) + camera.set_canvas_size(200, 200) + + # Update to move camera + camera.update(1.0) + + # Create test buffer with 200 lines + buffer = [f"LINE {i:03d}" for i in range(200)] + + # Apply camera viewport (15 lines high) + result = camera.apply(buffer, viewport_width=40, viewport_height=15) + + # Result should be exactly 15 lines + assert len(result) == 15 + + # Each line should be 40 characters (padded or truncated) + for line in result: + assert len(line) <= 40 diff --git a/tests/test_display.py b/tests/test_display.py index e6b1275..20215e4 100644 --- a/tests/test_display.py +++ b/tests/test_display.py @@ -120,12 +120,16 @@ class TestTerminalDisplay: def test_get_dimensions_returns_cached_value(self): """get_dimensions returns cached dimensions for stability.""" - display = TerminalDisplay() - display.init(80, 24) + import os + from unittest.mock import patch - # First call should set cache - d1 = display.get_dimensions() - assert d1 == (80, 24) + # Mock terminal size to ensure deterministic dimensions + term_size = os.terminal_size((80, 24)) + with patch("os.get_terminal_size", return_value=term_size): + display = TerminalDisplay() + display.init(80, 24) + d1 = display.get_dimensions() + assert d1 == (80, 24) def test_show_clears_screen_before_each_frame(self): """show clears previous frame to prevent visual wobble. diff --git a/tests/test_framebuffer_acceptance.py b/tests/test_framebuffer_acceptance.py new file mode 100644 index 0000000..8f42b6a --- /dev/null +++ b/tests/test_framebuffer_acceptance.py @@ -0,0 +1,195 @@ +"""Integration test: FrameBufferStage in the pipeline.""" + +import queue + +from engine.data_sources.sources import ListDataSource, SourceItem +from engine.effects.types import EffectConfig +from engine.pipeline import Pipeline, PipelineConfig +from engine.pipeline.adapters import ( + DataSourceStage, + DisplayStage, + SourceItemsToBufferStage, +) +from engine.pipeline.core import PipelineContext +from engine.pipeline.stages.framebuffer import FrameBufferStage + + +class QueueDisplay: + """Stub display that captures every frame into a queue.""" + + def __init__(self): + self.frames: queue.Queue[list[str]] = queue.Queue() + self.width = 80 + self.height = 24 + self._init_called = False + + def init(self, width: int, height: int, reuse: bool = False) -> None: + self.width = width + self.height = height + self._init_called = True + + def show(self, buffer: list[str], border: bool = False) -> None: + self.frames.put(list(buffer)) + + def clear(self) -> None: + pass + + def cleanup(self) -> None: + pass + + def get_dimensions(self) -> tuple[int, int]: + return (self.width, self.height) + + +def _build_pipeline( + items: list[SourceItem], + history_depth: int = 5, + width: int = 80, + height: int = 24, +) -> tuple[Pipeline, QueueDisplay, PipelineContext]: + """Build pipeline: source -> render -> framebuffer -> display.""" + display = QueueDisplay() + + ctx = PipelineContext() + ctx.set("items", items) + + pipeline = Pipeline( + config=PipelineConfig(enable_metrics=True), + context=ctx, + ) + + # Source + source = ListDataSource(items, name="test-source") + pipeline.add_stage("source", DataSourceStage(source, name="test-source")) + + # Render + pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) + + # Framebuffer + framebuffer = FrameBufferStage(name="default", history_depth=history_depth) + pipeline.add_stage("framebuffer", framebuffer) + + # Display + pipeline.add_stage("display", DisplayStage(display, name="queue")) + + pipeline.build() + pipeline.initialize() + + return pipeline, display, ctx + + +class TestFrameBufferAcceptance: + """Test FrameBufferStage in a full pipeline.""" + + def test_framebuffer_populates_history(self): + """After several frames, framebuffer should have history stored.""" + items = [ + SourceItem(content="Frame\nBuffer\nTest", source="test", timestamp="0") + ] + pipeline, display, ctx = _build_pipeline(items, history_depth=5) + + # Run 3 frames + for i in range(3): + result = pipeline.execute([]) + assert result.success, f"Pipeline failed at frame {i}: {result.error}" + + # Check framebuffer history in context + history = ctx.get("framebuffer.default.history") + assert history is not None, "Framebuffer history not found in context" + assert len(history) == 3, f"Expected 3 history frames, got {len(history)}" + + def test_framebuffer_respects_depth(self): + """Framebuffer should not exceed configured history depth.""" + items = [SourceItem(content="Depth\nTest", source="test", timestamp="0")] + pipeline, display, ctx = _build_pipeline(items, history_depth=3) + + # Run 5 frames + for i in range(5): + result = pipeline.execute([]) + assert result.success + + history = ctx.get("framebuffer.default.history") + assert history is not None + assert len(history) == 3, f"Expected depth 3, got {len(history)}" + + def test_framebuffer_current_intensity(self): + """Framebuffer should compute current intensity map.""" + items = [SourceItem(content="Intensity\nMap", source="test", timestamp="0")] + pipeline, display, ctx = _build_pipeline(items, history_depth=5) + + # Run at least one frame + result = pipeline.execute([]) + assert result.success + + intensity = ctx.get("framebuffer.default.current_intensity") + assert intensity is not None, "No intensity map in context" + # Intensity should be a list of one value per line? Actually it's a 2D array or list? + # Let's just check it's non-empty + assert len(intensity) > 0, "Intensity map is empty" + + def test_framebuffer_get_frame(self): + """Should be able to retrieve specific frames from history.""" + items = [SourceItem(content="Retrieve\nFrame", source="test", timestamp="0")] + pipeline, display, ctx = _build_pipeline(items, history_depth=5) + + # Run 2 frames + for i in range(2): + result = pipeline.execute([]) + assert result.success + + # Retrieve frame 0 (most recent) + recent = pipeline.get_stage("framebuffer").get_frame(0, ctx) + assert recent is not None, "Cannot retrieve recent frame" + assert len(recent) > 0, "Recent frame is empty" + + # Retrieve frame 1 (previous) + previous = pipeline.get_stage("framebuffer").get_frame(1, ctx) + assert previous is not None, "Cannot retrieve previous frame" + + def test_framebuffer_with_motionblur_effect(self): + """MotionBlurEffect should work when depending on framebuffer.""" + from engine.effects.plugins.motionblur import MotionBlurEffect + from engine.pipeline.adapters import EffectPluginStage + + items = [SourceItem(content="Motion\nBlur", source="test", timestamp="0")] + display = QueueDisplay() + ctx = PipelineContext() + ctx.set("items", items) + + pipeline = Pipeline( + config=PipelineConfig(enable_metrics=True), + context=ctx, + ) + + source = ListDataSource(items, name="test") + pipeline.add_stage("source", DataSourceStage(source, name="test")) + pipeline.add_stage("render", SourceItemsToBufferStage(name="render")) + + framebuffer = FrameBufferStage(name="default", history_depth=3) + pipeline.add_stage("framebuffer", framebuffer) + + motionblur = MotionBlurEffect() + motionblur.configure(EffectConfig(enabled=True, intensity=0.5)) + pipeline.add_stage( + "motionblur", + EffectPluginStage( + motionblur, + name="motionblur", + dependencies={"framebuffer.history.default"}, + ), + ) + + pipeline.add_stage("display", DisplayStage(display, name="queue")) + + pipeline.build() + pipeline.initialize() + + # Run a few frames + for i in range(5): + result = pipeline.execute([]) + assert result.success, f"Motion blur pipeline failed at frame {i}" + + # Check that history exists + history = ctx.get("framebuffer.default.history") + assert history is not None + assert len(history) > 0 diff --git a/tests/test_framebuffer_stage.py b/tests/test_framebuffer_stage.py index ef0ba17..be3c81d 100644 --- a/tests/test_framebuffer_stage.py +++ b/tests/test_framebuffer_stage.py @@ -30,9 +30,9 @@ class TestFrameBufferStage: assert stage.config.history_depth == 2 def test_capabilities(self): - """Stage provides framebuffer.history capability.""" + """Stage provides framebuffer.history.{name} capability.""" stage = FrameBufferStage() - assert "framebuffer.history" in stage.capabilities + assert "framebuffer.history.default" in stage.capabilities def test_dependencies(self): """Stage depends on render.output.""" @@ -46,15 +46,15 @@ class TestFrameBufferStage: assert DataType.TEXT_BUFFER in stage.outlet_types def test_init_context(self): - """init initializes context state.""" + """init initializes context state with prefixed keys.""" stage = FrameBufferStage() ctx = make_ctx() result = stage.init(ctx) assert result is True - assert ctx.get("frame_history") == [] - assert ctx.get("intensity_history") == [] + assert ctx.get("framebuffer.default.history") == [] + assert ctx.get("framebuffer.default.intensity_history") == [] def test_process_stores_buffer_in_history(self): """process stores buffer in history.""" @@ -66,7 +66,7 @@ class TestFrameBufferStage: result = stage.process(buffer, ctx) assert result == buffer # Pass-through - history = ctx.get("frame_history") + history = ctx.get("framebuffer.default.history") assert len(history) == 1 assert history[0] == buffer @@ -79,7 +79,7 @@ class TestFrameBufferStage: buffer = ["hello world", "test line", ""] stage.process(buffer, ctx) - intensity = ctx.get("current_intensity") + intensity = ctx.get("framebuffer.default.current_intensity") assert intensity is not None assert len(intensity) == 3 # Three rows # Non-empty lines should have intensity > 0 @@ -90,7 +90,7 @@ class TestFrameBufferStage: def test_process_keeps_multiple_frames(self): """process keeps configured depth of frames.""" - config = FrameBufferConfig(history_depth=3) + config = FrameBufferConfig(history_depth=3, name="test") stage = FrameBufferStage(config) ctx = make_ctx() stage.init(ctx) @@ -100,7 +100,7 @@ class TestFrameBufferStage: buffer = [f"frame {i}"] stage.process(buffer, ctx) - history = ctx.get("frame_history") + history = ctx.get("framebuffer.test.history") assert len(history) == 3 # Only last 3 kept # Should be in reverse chronological order (most recent first) assert history[0] == ["frame 4"] @@ -109,7 +109,7 @@ class TestFrameBufferStage: def test_process_keeps_intensity_sync(self): """process keeps intensity history in sync with frame history.""" - config = FrameBufferConfig(history_depth=3) + config = FrameBufferConfig(history_depth=3, name="sync") stage = FrameBufferStage(config) ctx = make_ctx() stage.init(ctx) @@ -122,8 +122,9 @@ class TestFrameBufferStage: for buf in buffers: stage.process(buf, ctx) - frame_hist = ctx.get("frame_history") - intensity_hist = ctx.get("intensity_history") + prefix = "framebuffer.sync" + frame_hist = ctx.get(f"{prefix}.history") + intensity_hist = ctx.get(f"{prefix}.intensity_history") assert len(frame_hist) == len(intensity_hist) == 3 # Each frame's intensity should match @@ -207,7 +208,7 @@ class TestFrameBufferStage: """process is thread-safe.""" from threading import Thread - stage = FrameBufferStage() + stage = FrameBufferStage(name="threadtest") ctx = make_ctx() stage.init(ctx) @@ -216,7 +217,7 @@ class TestFrameBufferStage: def worker(idx): buffer = [f"thread {idx}"] stage.process(buffer, ctx) - results.append(len(ctx.get("frame_history", []))) + results.append(len(ctx.get("framebuffer.threadtest.history", []))) threads = [Thread(target=worker, args=(i,)) for i in range(10)] for t in threads: @@ -225,7 +226,7 @@ class TestFrameBufferStage: t.join() # All threads should see consistent state - assert len(ctx.get("frame_history")) <= 2 # Depth limit + assert len(ctx.get("framebuffer.threadtest.history")) <= 2 # Depth limit # All worker threads should have completed without errors assert len(results) == 10 diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 22e86fa..ce90b42 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -129,7 +129,7 @@ class TestPipeline: pipeline.add_stage("source", mock_source) pipeline.add_stage("display", mock_display) - pipeline.build() + pipeline.build(auto_inject=False) assert pipeline._initialized is True assert "source" in pipeline.execution_order @@ -182,7 +182,7 @@ class TestPipeline: pipeline.add_stage("source", mock_source) pipeline.add_stage("effect", mock_effect) pipeline.add_stage("display", mock_display) - pipeline.build() + pipeline.build(auto_inject=False) result = pipeline.execute(None) @@ -218,7 +218,7 @@ class TestPipeline: pipeline.add_stage("source", mock_source) pipeline.add_stage("failing", mock_failing) - pipeline.build() + pipeline.build(auto_inject=False) result = pipeline.execute(None) @@ -254,7 +254,7 @@ class TestPipeline: pipeline.add_stage("source", mock_source) pipeline.add_stage("optional", mock_optional) - pipeline.build() + pipeline.build(auto_inject=False) result = pipeline.execute(None) @@ -302,7 +302,7 @@ class TestCapabilityBasedDependencies: pipeline = Pipeline() pipeline.add_stage("headlines", SourceStage()) pipeline.add_stage("render", RenderStage()) - pipeline.build() + pipeline.build(auto_inject=False) assert "headlines" in pipeline.execution_order assert "render" in pipeline.execution_order @@ -334,7 +334,7 @@ class TestCapabilityBasedDependencies: pipeline.add_stage("render", RenderStage()) try: - pipeline.build() + pipeline.build(auto_inject=False) raise AssertionError("Should have raised StageError") except StageError as e: assert "Missing capabilities" in e.message @@ -394,7 +394,7 @@ class TestCapabilityBasedDependencies: pipeline.add_stage("headlines", SourceA()) pipeline.add_stage("poetry", SourceB()) pipeline.add_stage("display", DisplayStage()) - pipeline.build() + pipeline.build(auto_inject=False) assert pipeline.execution_order[0] == "headlines" @@ -791,7 +791,7 @@ class TestFullPipeline: pipeline.add_stage("b", StageB()) try: - pipeline.build() + pipeline.build(auto_inject=False) raise AssertionError("Should detect circular dependency") except Exception: pass @@ -815,7 +815,7 @@ class TestPipelineMetrics: config = PipelineConfig(enable_metrics=True) pipeline = Pipeline(config=config) pipeline.add_stage("dummy", DummyStage()) - pipeline.build() + pipeline.build(auto_inject=False) pipeline.execute("test_data") @@ -838,7 +838,7 @@ class TestPipelineMetrics: config = PipelineConfig(enable_metrics=False) pipeline = Pipeline(config=config) pipeline.add_stage("dummy", DummyStage()) - pipeline.build() + pipeline.build(auto_inject=False) pipeline.execute("test_data") @@ -860,7 +860,7 @@ class TestPipelineMetrics: config = PipelineConfig(enable_metrics=True) pipeline = Pipeline(config=config) pipeline.add_stage("dummy", DummyStage()) - pipeline.build() + pipeline.build(auto_inject=False) pipeline.execute("test1") pipeline.execute("test2") @@ -964,7 +964,7 @@ class TestOverlayStages: pipeline.add_stage("overlay_a", OverlayStageA()) pipeline.add_stage("overlay_b", OverlayStageB()) pipeline.add_stage("regular", RegularStage()) - pipeline.build() + pipeline.build(auto_inject=False) overlays = pipeline.get_overlay_stages() assert len(overlays) == 2 @@ -1006,7 +1006,7 @@ class TestOverlayStages: pipeline = Pipeline() pipeline.add_stage("regular", RegularStage()) pipeline.add_stage("overlay", OverlayStage()) - pipeline.build() + pipeline.build(auto_inject=False) pipeline.execute("data") @@ -1070,7 +1070,7 @@ class TestOverlayStages: pipeline = Pipeline() pipeline.add_stage("test", TestStage()) - pipeline.build() + pipeline.build(auto_inject=False) assert pipeline.get_stage_type("test") == "overlay" @@ -1092,7 +1092,7 @@ class TestOverlayStages: pipeline = Pipeline() pipeline.add_stage("test", TestStage()) - pipeline.build() + pipeline.build(auto_inject=False) assert pipeline.get_render_order("test") == 42 @@ -1142,7 +1142,7 @@ class TestInletOutletTypeValidation: pipeline.add_stage("consumer", ConsumerStage()) with pytest.raises(StageError) as exc_info: - pipeline.build() + pipeline.build(auto_inject=False) assert "Type mismatch" in str(exc_info.value) assert "TEXT_BUFFER" in str(exc_info.value) @@ -1190,7 +1190,7 @@ class TestInletOutletTypeValidation: pipeline.add_stage("consumer", ConsumerStage()) # Should not raise - pipeline.build() + pipeline.build(auto_inject=False) def test_any_type_accepts_everything(self): """DataType.ANY accepts any upstream type.""" @@ -1234,7 +1234,7 @@ class TestInletOutletTypeValidation: pipeline.add_stage("consumer", ConsumerStage()) # Should not raise because consumer accepts ANY - pipeline.build() + pipeline.build(auto_inject=False) def test_multiple_compatible_types(self): """Stage can declare multiple inlet types.""" @@ -1278,7 +1278,7 @@ class TestInletOutletTypeValidation: pipeline.add_stage("consumer", ConsumerStage()) # Should not raise because consumer accepts SOURCE_ITEMS - pipeline.build() + pipeline.build(auto_inject=False) def test_display_must_accept_text_buffer(self): """Display stages must accept TEXT_BUFFER type.""" @@ -1302,7 +1302,7 @@ class TestInletOutletTypeValidation: pipeline.add_stage("display", BadDisplayStage()) with pytest.raises(StageError) as exc_info: - pipeline.build() + pipeline.build(auto_inject=False) assert "display" in str(exc_info.value).lower() @@ -1349,7 +1349,7 @@ class TestPipelineMutation: """add_stage() initializes stage when pipeline already initialized.""" pipeline = Pipeline() mock_stage = self._create_mock_stage("test") - pipeline.build() + pipeline.build(auto_inject=False) pipeline._initialized = True pipeline.add_stage("test", mock_stage, initialize=True) @@ -1478,7 +1478,7 @@ class TestPipelineMutation: pipeline.add_stage("a", stage_a, initialize=False) pipeline.add_stage("b", stage_b, initialize=False) pipeline.add_stage("c", stage_c, initialize=False) - pipeline.build() + pipeline.build(auto_inject=False) result = pipeline.move_stage("a", after="c") @@ -1497,7 +1497,7 @@ class TestPipelineMutation: pipeline.add_stage("a", stage_a, initialize=False) pipeline.add_stage("b", stage_b, initialize=False) pipeline.add_stage("c", stage_c, initialize=False) - pipeline.build() + pipeline.build(auto_inject=False) result = pipeline.move_stage("c", before="a") @@ -1512,7 +1512,7 @@ class TestPipelineMutation: stage = self._create_mock_stage("test") pipeline.add_stage("test", stage, initialize=False) - pipeline.build() + pipeline.build(auto_inject=False) result = pipeline.move_stage("nonexistent", after="test") @@ -1613,7 +1613,7 @@ class TestPipelineMutation: pipeline.add_stage("s1", stage1, initialize=False) pipeline.add_stage("s2", stage2, initialize=False) - pipeline.build() + pipeline.build(auto_inject=False) info = pipeline.get_pipeline_info() @@ -1640,7 +1640,7 @@ class TestPipelineMutation: pipeline.add_stage("source", source, initialize=False) pipeline.add_stage("effect", effect, initialize=False) pipeline.add_stage("display", display, initialize=False) - pipeline.build() + pipeline.build(auto_inject=False) assert pipeline.execution_order == ["source", "effect", "display"] @@ -1664,7 +1664,7 @@ class TestPipelineMutation: pipeline.add_stage("source", source, initialize=False) pipeline.add_stage("display", display, initialize=False) - pipeline.build() + pipeline.build(auto_inject=False) new_stage = self._create_mock_stage( "effect", "effect", capabilities={"effect"}, dependencies={"source"} @@ -1757,7 +1757,7 @@ class TestPipelineMutation: pipeline.add_stage("source", TestSource(), initialize=False) pipeline.add_stage("effect", TestEffect(), initialize=False) pipeline.add_stage("display", TestDisplay(), initialize=False) - pipeline.build() + pipeline.build(auto_inject=False) pipeline.initialize() result = pipeline.execute(None) diff --git a/tests/test_pipeline_e2e.py b/tests/test_pipeline_e2e.py index 6bbd897..39b2d30 100644 --- a/tests/test_pipeline_e2e.py +++ b/tests/test_pipeline_e2e.py @@ -21,6 +21,7 @@ from engine.pipeline.adapters import ( EffectPluginStage, FontStage, SourceItemsToBufferStage, + ViewportFilterStage, ) from engine.pipeline.core import PipelineContext from engine.pipeline.params import PipelineParams @@ -129,7 +130,28 @@ def _build_pipeline( # Render stage if use_font_stage: + # FontStage requires viewport_filter stage which requires camera state + from engine.camera import Camera + from engine.pipeline.adapters import CameraClockStage, CameraStage + + camera = Camera.scroll(speed=0.0) + camera.set_canvas_size(200, 200) + + # CameraClockStage updates camera state, must come before viewport_filter + pipeline.add_stage( + "camera_update", CameraClockStage(camera, name="camera-clock") + ) + + # ViewportFilterStage requires camera.state + pipeline.add_stage( + "viewport_filter", ViewportFilterStage(name="viewport-filter") + ) + + # FontStage converts items to buffer pipeline.add_stage("render", FontStage(name="font")) + + # CameraStage applies viewport transformation to rendered buffer + pipeline.add_stage("camera", CameraStage(camera, name="static")) else: pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) diff --git a/tests/test_pipeline_rebuild.py b/tests/test_pipeline_rebuild.py new file mode 100644 index 0000000..dd62590 --- /dev/null +++ b/tests/test_pipeline_rebuild.py @@ -0,0 +1,405 @@ +""" +Integration tests for pipeline hot-rebuild and state preservation. + +Tests: +1. Viewport size control via --viewport flag +2. NullDisplay recording and save/load functionality +3. Pipeline state preservation during hot-rebuild +""" + +import json +import sys +import tempfile +from pathlib import Path + +import pytest + +sys.path.insert(0, str(Path(__file__).parent.parent)) + +from engine.display import DisplayRegistry +from engine.display.backends.null import NullDisplay +from engine.display.backends.replay import ReplayDisplay +from engine.effects import get_registry +from engine.fetch import load_cache +from engine.pipeline import Pipeline, PipelineConfig, PipelineContext +from engine.pipeline.adapters import ( + EffectPluginStage, + FontStage, + ViewportFilterStage, + create_stage_from_display, + create_stage_from_effect, +) +from engine.pipeline.params import PipelineParams + + +@pytest.fixture +def viewport_dims(): + """Small viewport dimensions for testing.""" + return (40, 15) + + +@pytest.fixture +def items(): + """Load cached source items.""" + items = load_cache() + if not items: + pytest.skip("No fixture cache available") + return items + + +@pytest.fixture +def null_display(viewport_dims): + """Create a NullDisplay for testing.""" + display = DisplayRegistry.create("null") + display.init(viewport_dims[0], viewport_dims[1]) + return display + + +@pytest.fixture +def pipeline_with_null_display(items, null_display): + """Create a pipeline with NullDisplay for testing.""" + import engine.effects.plugins as effects_plugins + + effects_plugins.discover_plugins() + + width, height = null_display.width, null_display.height + params = PipelineParams() + params.viewport_width = width + params.viewport_height = height + + config = PipelineConfig( + source="fixture", + display="null", + camera="scroll", + effects=["noise", "fade"], + ) + + pipeline = Pipeline(config=config, context=PipelineContext()) + + from engine.camera import Camera + from engine.data_sources.sources import ListDataSource + from engine.pipeline.adapters import CameraClockStage, CameraStage, DataSourceStage + + list_source = ListDataSource(items, name="fixture") + pipeline.add_stage("source", DataSourceStage(list_source, name="fixture")) + + # Add camera stages (required by ViewportFilterStage) + camera = Camera.scroll(speed=0.3) + camera.set_canvas_size(200, 200) + pipeline.add_stage("camera_update", CameraClockStage(camera, name="camera-clock")) + pipeline.add_stage("camera", CameraStage(camera, name="scroll")) + + pipeline.add_stage("viewport_filter", ViewportFilterStage(name="viewport-filter")) + pipeline.add_stage("font", FontStage(name="font")) + + effect_registry = get_registry() + for effect_name in config.effects: + effect = effect_registry.get(effect_name) + if effect: + pipeline.add_stage( + f"effect_{effect_name}", + create_stage_from_effect(effect, effect_name), + ) + + pipeline.add_stage("display", create_stage_from_display(null_display, "null")) + pipeline.build() + + if not pipeline.initialize(): + pytest.fail("Failed to initialize pipeline") + + ctx = pipeline.context + ctx.params = params + ctx.set("display", null_display) + ctx.set("items", items) + ctx.set("pipeline", pipeline) + ctx.set("pipeline_order", pipeline.execution_order) + ctx.set("camera_y", 0) + + yield pipeline, params, null_display + + pipeline.cleanup() + null_display.cleanup() + + +class TestNullDisplayRecording: + """Tests for NullDisplay recording functionality.""" + + def test_null_display_initialization(self, viewport_dims): + """NullDisplay initializes with correct dimensions.""" + display = NullDisplay() + display.init(viewport_dims[0], viewport_dims[1]) + assert display.width == viewport_dims[0] + assert display.height == viewport_dims[1] + + def test_start_stop_recording(self, null_display): + """NullDisplay can start and stop recording.""" + assert not null_display._is_recording + + null_display.start_recording() + assert null_display._is_recording is True + + null_display.stop_recording() + assert null_display._is_recording is False + + def test_record_frames(self, null_display, pipeline_with_null_display): + """NullDisplay records frames when recording is enabled.""" + pipeline, params, display = pipeline_with_null_display + + display.start_recording() + assert len(display._recorded_frames) == 0 + + for frame in range(5): + params.frame_number = frame + pipeline.context.params = params + pipeline.execute([]) + + assert len(display._recorded_frames) == 5 + + def test_get_frames(self, null_display, pipeline_with_null_display): + """NullDisplay.get_frames() returns recorded buffers.""" + pipeline, params, display = pipeline_with_null_display + + display.start_recording() + + for frame in range(3): + params.frame_number = frame + pipeline.context.params = params + pipeline.execute([]) + + frames = display.get_frames() + assert len(frames) == 3 + assert all(isinstance(f, list) for f in frames) + + def test_clear_recording(self, null_display, pipeline_with_null_display): + """NullDisplay.clear_recording() clears recorded frames.""" + pipeline, params, display = pipeline_with_null_display + + display.start_recording() + for frame in range(3): + params.frame_number = frame + pipeline.context.params = params + pipeline.execute([]) + + assert len(display._recorded_frames) == 3 + + display.clear_recording() + assert len(display._recorded_frames) == 0 + + def test_save_load_recording(self, null_display, pipeline_with_null_display): + """NullDisplay can save and load recordings.""" + pipeline, params, display = pipeline_with_null_display + + display.start_recording() + for frame in range(3): + params.frame_number = frame + pipeline.context.params = params + pipeline.execute([]) + + with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f: + temp_path = f.name + + try: + display.save_recording(temp_path) + + with open(temp_path) as f: + data = json.load(f) + + assert data["version"] == 1 + assert data["display"] == "null" + assert data["frame_count"] == 3 + assert len(data["frames"]) == 3 + + display2 = NullDisplay() + display2.load_recording(temp_path) + assert len(display2._recorded_frames) == 3 + + finally: + Path(temp_path).unlink(missing_ok=True) + + +class TestReplayDisplay: + """Tests for ReplayDisplay functionality.""" + + def test_replay_display_initialization(self, viewport_dims): + """ReplayDisplay initializes correctly.""" + display = ReplayDisplay() + display.init(viewport_dims[0], viewport_dims[1]) + assert display.width == viewport_dims[0] + assert display.height == viewport_dims[1] + + def test_set_and_get_frames(self): + """ReplayDisplay can set and retrieve frames.""" + display = ReplayDisplay() + frames = [ + {"buffer": ["line1", "line2"], "width": 40, "height": 15}, + {"buffer": ["line3", "line4"], "width": 40, "height": 15}, + ] + display.set_frames(frames) + + frame = display.get_next_frame() + assert frame == ["line1", "line2"] + + frame = display.get_next_frame() + assert frame == ["line3", "line4"] + + frame = display.get_next_frame() + assert frame is None + + def test_replay_loop_mode(self): + """ReplayDisplay can loop playback.""" + display = ReplayDisplay() + display.set_loop(True) + frames = [ + {"buffer": ["frame1"], "width": 40, "height": 15}, + {"buffer": ["frame2"], "width": 40, "height": 15}, + ] + display.set_frames(frames) + + assert display.get_next_frame() == ["frame1"] + assert display.get_next_frame() == ["frame2"] + assert display.get_next_frame() == ["frame1"] + assert display.get_next_frame() == ["frame2"] + + def test_replay_seek_and_reset(self): + """ReplayDisplay supports seek and reset.""" + display = ReplayDisplay() + frames = [ + {"buffer": [f"frame{i}"], "width": 40, "height": 15} for i in range(5) + ] + display.set_frames(frames) + + display.seek(3) + assert display.get_next_frame() == ["frame3"] + + display.reset() + assert display.get_next_frame() == ["frame0"] + + +class TestPipelineHotRebuild: + """Tests for pipeline hot-rebuild and state preservation.""" + + def test_pipeline_runs_with_null_display(self, pipeline_with_null_display): + """Pipeline executes successfully with NullDisplay.""" + pipeline, params, display = pipeline_with_null_display + + for frame in range(5): + params.frame_number = frame + pipeline.context.params = params + result = pipeline.execute([]) + + assert result.success + assert display._last_buffer is not None + + def test_effect_toggle_during_execution(self, pipeline_with_null_display): + """Effects can be toggled during pipeline execution.""" + pipeline, params, display = pipeline_with_null_display + + params.frame_number = 0 + pipeline.context.params = params + pipeline.execute([]) + buffer1 = display._last_buffer + + fade_stage = pipeline.get_stage("effect_fade") + assert fade_stage is not None + assert isinstance(fade_stage, EffectPluginStage) + + fade_stage._enabled = False + fade_stage._effect.config.enabled = False + + params.frame_number = 1 + pipeline.context.params = params + pipeline.execute([]) + buffer2 = display._last_buffer + + assert buffer1 != buffer2 + + def test_state_preservation_across_rebuild(self, pipeline_with_null_display): + """Pipeline state is preserved across hot-rebuild events.""" + pipeline, params, display = pipeline_with_null_display + + for frame in range(5): + params.frame_number = frame + pipeline.context.params = params + pipeline.execute([]) + + camera_y_before = pipeline.context.get("camera_y") + + fade_stage = pipeline.get_stage("effect_fade") + if fade_stage and isinstance(fade_stage, EffectPluginStage): + fade_stage.set_enabled(not fade_stage.is_enabled()) + fade_stage._effect.config.enabled = fade_stage.is_enabled() + + params.frame_number = 5 + pipeline.context.params = params + pipeline.execute([]) + + pipeline.context.get("camera_y") + + assert camera_y_before is not None + + +class TestViewportControl: + """Tests for viewport size control.""" + + def test_viewport_dimensions_applied(self, items): + """Viewport dimensions are correctly applied to pipeline.""" + width, height = 40, 15 + + display = DisplayRegistry.create("null") + display.init(width, height) + + params = PipelineParams() + params.viewport_width = width + params.viewport_height = height + + config = PipelineConfig( + source="fixture", + display="null", + camera="scroll", + effects=[], + ) + + pipeline = Pipeline(config=config, context=PipelineContext()) + + from engine.camera import Camera + from engine.data_sources.sources import ListDataSource + from engine.pipeline.adapters import ( + CameraClockStage, + CameraStage, + DataSourceStage, + ) + + list_source = ListDataSource(items, name="fixture") + pipeline.add_stage("source", DataSourceStage(list_source, name="fixture")) + + # Add camera stages (required by ViewportFilterStage) + camera = Camera.scroll(speed=0.3) + camera.set_canvas_size(200, 200) + pipeline.add_stage( + "camera_update", CameraClockStage(camera, name="camera-clock") + ) + pipeline.add_stage("camera", CameraStage(camera, name="scroll")) + + pipeline.add_stage( + "viewport_filter", ViewportFilterStage(name="viewport-filter") + ) + pipeline.add_stage("font", FontStage(name="font")) + pipeline.add_stage("display", create_stage_from_display(display, "null")) + pipeline.build() + + assert pipeline.initialize() + + ctx = pipeline.context + ctx.params = params + ctx.set("display", display) + ctx.set("items", items) + ctx.set("pipeline", pipeline) + ctx.set("camera_y", 0) + + result = pipeline.execute(items) + + assert result.success + assert display._last_buffer is not None + + pipeline.cleanup() + display.cleanup() diff --git a/tests/test_tint_acceptance.py b/tests/test_tint_acceptance.py new file mode 100644 index 0000000..7dd70c8 --- /dev/null +++ b/tests/test_tint_acceptance.py @@ -0,0 +1,206 @@ +"""Integration test: TintEffect in the pipeline.""" + +import queue + +from engine.data_sources.sources import ListDataSource, SourceItem +from engine.effects.plugins.tint import TintEffect +from engine.effects.types import EffectConfig +from engine.pipeline import Pipeline, PipelineConfig +from engine.pipeline.adapters import ( + DataSourceStage, + DisplayStage, + EffectPluginStage, + SourceItemsToBufferStage, +) +from engine.pipeline.core import PipelineContext +from engine.pipeline.params import PipelineParams + + +class QueueDisplay: + """Stub display that captures every frame into a queue.""" + + def __init__(self): + self.frames: queue.Queue[list[str]] = queue.Queue() + self.width = 80 + self.height = 24 + self._init_called = False + + def init(self, width: int, height: int, reuse: bool = False) -> None: + self.width = width + self.height = height + self._init_called = True + + def show(self, buffer: list[str], border: bool = False) -> None: + self.frames.put(list(buffer)) + + def clear(self) -> None: + pass + + def cleanup(self) -> None: + pass + + def get_dimensions(self) -> tuple[int, int]: + return (self.width, self.height) + + +def _build_pipeline( + items: list[SourceItem], + tint_config: EffectConfig | None = None, + width: int = 80, + height: int = 24, +) -> tuple[Pipeline, QueueDisplay, PipelineContext]: + """Build pipeline: source -> render -> tint effect -> display.""" + display = QueueDisplay() + + ctx = PipelineContext() + params = PipelineParams() + params.viewport_width = width + params.viewport_height = height + params.frame_number = 0 + ctx.params = params + ctx.set("items", items) + + pipeline = Pipeline( + config=PipelineConfig(enable_metrics=True), + context=ctx, + ) + + # Source + source = ListDataSource(items, name="test-source") + pipeline.add_stage("source", DataSourceStage(source, name="test-source")) + + # Render (simple) + pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) + + # Tint effect + tint_effect = TintEffect() + if tint_config is not None: + tint_effect.configure(tint_config) + pipeline.add_stage("tint", EffectPluginStage(tint_effect, name="tint")) + + # Display + pipeline.add_stage("display", DisplayStage(display, name="queue")) + + pipeline.build() + pipeline.initialize() + + return pipeline, display, ctx + + +class TestTintAcceptance: + """Test TintEffect in a full pipeline.""" + + def test_tint_applies_default_color(self): + """Default tint should apply ANSI color codes to output.""" + items = [SourceItem(content="Hello World", source="test", timestamp="0")] + pipeline, display, ctx = _build_pipeline(items) + + result = pipeline.execute(items) + + assert result.success, f"Pipeline failed: {result.error}" + frame = display.frames.get(timeout=1) + + text = "\n".join(frame) + assert "\033[" in text, f"Expected ANSI codes in frame: {frame}" + assert "Hello World" in text + + def test_tint_applies_red_color(self): + """Configured red tint should produce red ANSI code (196-197).""" + items = [SourceItem(content="Red Text", source="test", timestamp="0")] + config = EffectConfig( + enabled=True, + intensity=1.0, + params={"r": 255, "g": 0, "b": 0, "a": 0.8}, + ) + pipeline, display, ctx = _build_pipeline(items, tint_config=config) + + result = pipeline.execute(items) + + assert result.success + frame = display.frames.get(timeout=1) + line = frame[0] + + # Should contain red ANSI code (196 or 197 in 256 color) + assert "\033[38;5;196m" in line or "\033[38;5;197m" in line, ( + f"Missing red tint: {line}" + ) + assert "Red Text" in line + + def test_tint_disabled_does_nothing(self): + """Disabled tint stage should pass through buffer unchanged.""" + items = [SourceItem(content="Plain Text", source="test", timestamp="0")] + pipeline, display, ctx = _build_pipeline(items) + + # Disable the tint stage + stage = pipeline.get_stage("tint") + stage.set_enabled(False) + + result = pipeline.execute(items) + + assert result.success + frame = display.frames.get(timeout=1) + text = "\n".join(frame) + + # Should contain Plain Text with NO ANSI color codes + assert "Plain Text" in text + assert "\033[" not in text, f"Unexpected ANSI codes in frame: {frame}" + + def test_tint_zero_transparency(self): + """Alpha=0 should pass through buffer unchanged (no tint).""" + items = [SourceItem(content="Transparent", source="test", timestamp="0")] + config = EffectConfig( + enabled=True, + intensity=1.0, + params={"r": 255, "g": 128, "b": 64, "a": 0.0}, + ) + pipeline, display, ctx = _build_pipeline(items, tint_config=config) + + result = pipeline.execute(items) + + assert result.success + frame = display.frames.get(timeout=1) + text = "\n".join(frame) + + assert "Transparent" in text + assert "\033[" not in text, f"Expected no ANSI codes with alpha=0: {frame}" + + def test_tint_with_multiples_lines(self): + """Tint should apply to all non-empty lines.""" + items = [ + SourceItem(content="Line1\nLine2\n\nLine4", source="test", timestamp="0") + ] + config = EffectConfig( + enabled=True, + intensity=1.0, + params={"r": 0, "g": 255, "b": 0, "a": 0.7}, + ) + pipeline, display, ctx = _build_pipeline(items, tint_config=config) + + result = pipeline.execute(items) + + assert result.success + frame = display.frames.get(timeout=1) + + # All non-empty lines should have green ANSI codes + green_codes = ["\033[38;5;", "m"] + for line in frame: + if line.strip(): + assert green_codes[0] in line and green_codes[1] in line, ( + f"Missing green tint: {line}" + ) + else: + assert line == "", f"Empty lines should be exactly empty: {line}" + + def test_tint_preserves_empty_lines(self): + """Empty lines should remain empty (no ANSI codes).""" + items = [SourceItem(content="A\n\nB", source="test", timestamp="0")] + pipeline, display, ctx = _build_pipeline(items) + + result = pipeline.execute(items) + + assert result.success + frame = display.frames.get(timeout=1) + + assert frame[0].strip() != "" + assert frame[1] == "" # Empty line unchanged + assert frame[2].strip() != ""