feat: Complete pipeline hot-rebuild implementation with acceptance tests

- Implements pipeline hot-rebuild with state preservation (issue #43)
- Adds auto-injection of MVP stages for missing capabilities
- Adds radial camera mode for polar coordinate scanning
- Adds afterimage and motionblur effects using framebuffer history
- Adds comprehensive acceptance tests for camera modes and pipeline rebuild
- Updates presets.toml with new effect configurations

Related to: #35 (Pipeline Mutation API epic)
Closes: #43, #44, #45
This commit is contained in:
2026-03-19 03:34:06 -07:00
parent 0eb5f1d5ff
commit 238bac1bb2
30 changed files with 3438 additions and 378 deletions

View File

@@ -27,13 +27,13 @@ class TestMain:
"""main() uses PRESET from config if set."""
with (
patch("engine.config.PIPELINE_DIAGRAM", False),
patch("engine.config.PRESET", "gallery-sources"),
patch("engine.config.PRESET", "demo"),
patch("engine.config.PIPELINE_MODE", False),
patch("engine.app.main.run_pipeline_mode") as mock_run,
):
sys.argv = ["mainline.py"]
main()
mock_run.assert_called_once_with("gallery-sources")
mock_run.assert_called_once_with("demo")
def test_main_exits_on_unknown_preset(self):
"""main() exits with error for unknown preset."""
@@ -122,7 +122,7 @@ class TestRunPipelineMode:
mock_create.return_value = mock_display
try:
run_pipeline_mode("gallery-display-terminal")
run_pipeline_mode("demo-base")
except (KeyboardInterrupt, SystemExit):
pass

View File

@@ -0,0 +1,826 @@
"""
Camera acceptance tests using NullDisplay frame recording and ReplayDisplay.
Tests all camera modes by:
1. Creating deterministic source data (numbered lines)
2. Running pipeline with small viewport (40x15)
3. Recording frames with NullDisplay
4. Asserting expected viewport content for each mode
Usage:
pytest tests/test_camera_acceptance.py -v
pytest tests/test_camera_acceptance.py --show-frames -v
The --show-frames flag displays recorded frames for visual verification.
"""
import math
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.camera import Camera, CameraMode
from engine.display import DisplayRegistry
from engine.effects import get_registry
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext
from engine.pipeline.adapters import (
CameraClockStage,
CameraStage,
FontStage,
ViewportFilterStage,
create_stage_from_display,
create_stage_from_effect,
)
from engine.pipeline.params import PipelineParams
def get_camera_position(pipeline, camera):
"""Helper to get camera position directly from the camera object.
The pipeline context's camera_y/camera_x values may be transformed by
ViewportFilterStage (filtered relative position). This helper gets the
true camera position from the camera object itself.
Args:
pipeline: The pipeline instance
camera: The camera object
Returns:
tuple (x, y) of the camera's absolute position
"""
return (camera.x, camera.y)
# Register custom CLI option for showing frames
def pytest_addoption(parser):
parser.addoption(
"--show-frames",
action="store_true",
default=False,
help="Display recorded frames for visual verification",
)
@pytest.fixture
def show_frames(request):
"""Get the --show-frames flag value."""
try:
return request.config.getoption("--show-frames")
except ValueError:
# Option not registered, default to False
return False
@pytest.fixture
def viewport_dims():
"""Small viewport dimensions for testing."""
return (40, 15)
@pytest.fixture
def items():
"""Create deterministic test data - numbered lines for easy verification."""
# Create 100 numbered lines: LINE 000, LINE 001, etc.
return [{"text": f"LINE {i:03d} - This is line number {i}"} for i in range(100)]
@pytest.fixture
def null_display(viewport_dims):
"""Create a NullDisplay for testing."""
display = DisplayRegistry.create("null")
display.init(viewport_dims[0], viewport_dims[1])
return display
def create_pipeline_with_camera(
camera, items, null_display, viewport_dims, effects=None
):
"""Helper to create a pipeline with a specific camera."""
effects = effects or []
width, height = viewport_dims
params = PipelineParams()
params.viewport_width = width
params.viewport_height = height
config = PipelineConfig(
source="fixture",
display="null",
camera="scroll",
effects=effects,
)
pipeline = Pipeline(config=config, context=PipelineContext())
from engine.data_sources.sources import ListDataSource
from engine.pipeline.adapters import DataSourceStage
list_source = ListDataSource(items, name="fixture")
pipeline.add_stage("source", DataSourceStage(list_source, name="fixture"))
# Add camera update stage to ensure camera_y is available for viewport filter
pipeline.add_stage("camera_update", CameraClockStage(camera, name="camera-clock"))
# Note: camera should come after font/viewport_filter, before effects
pipeline.add_stage("viewport_filter", ViewportFilterStage(name="viewport-filter"))
pipeline.add_stage("font", FontStage(name="font"))
pipeline.add_stage(
"camera",
CameraStage(
camera, name="radial" if camera.mode == CameraMode.RADIAL else "vertical"
),
)
if effects:
effect_registry = get_registry()
for effect_name in effects:
effect = effect_registry.get(effect_name)
if effect:
pipeline.add_stage(
f"effect_{effect_name}",
create_stage_from_effect(effect, effect_name),
)
pipeline.add_stage("display", create_stage_from_display(null_display, "null"))
pipeline.build()
if not pipeline.initialize():
return None
ctx = pipeline.context
ctx.params = params
ctx.set("display", null_display)
ctx.set("items", items)
ctx.set("pipeline", pipeline)
ctx.set("pipeline_order", pipeline.execution_order)
return pipeline
class DisplayHelper:
"""Helper to display frames for visual verification."""
@staticmethod
def show_frame(buffer, title, viewport_dims, marker_line=None):
"""Display a single frame with visual markers."""
width, height = viewport_dims
print(f"\n{'=' * (width + 20)}")
print(f" {title}")
print(f"{'=' * (width + 20)}")
for i, line in enumerate(buffer[:height]):
# Add marker if this line should be highlighted
marker = ">>>" if marker_line == i else " "
print(f"{marker} [{i:2}] {line[:width]}")
print(f"{'=' * (width + 20)}\n")
class TestFeedCamera:
"""Test FEED mode: rapid single-item scrolling (1 row/frame at speed=1.0)."""
def test_feed_camera_scrolls_down(
self, items, null_display, viewport_dims, show_frames
):
"""FEED camera should move content down (y increases) at 1 row/frame."""
camera = Camera.feed(speed=1.0)
camera.set_canvas_size(200, 100)
pipeline = create_pipeline_with_camera(
camera, items, null_display, viewport_dims
)
assert pipeline is not None, "Pipeline creation failed"
null_display.start_recording()
# Run for 10 frames with small delay between frames
# to ensure camera has time to move (dt calculation relies on time.perf_counter())
import time
for frame in range(10):
pipeline.context.set("frame_number", frame)
result = pipeline.execute(items)
assert result.success, f"Frame {frame} execution failed"
if frame < 9: # No need to sleep after last frame
time.sleep(0.02) # Wait 20ms so dt~0.02, camera moves ~1.2 rows
null_display.stop_recording()
frames = null_display.get_frames()
if show_frames:
DisplayHelper.show_frame(frames[0], "FEED Camera - Frame 0", viewport_dims)
DisplayHelper.show_frame(frames[5], "FEED Camera - Frame 5", viewport_dims)
DisplayHelper.show_frame(frames[9], "FEED Camera - Frame 9", viewport_dims)
# FEED mode: each frame y increases by speed*dt*60
# At dt=1.0, speed=1.0: y increases by 60 per frame
# But clamp to canvas bounds (200)
# Frame 0: y=0, should show LINE 000
# Frame 1: y=60, should show LINE 060
# Verify frame 0 contains ASCII art content (rendered from LINE 000)
# The text is converted to block characters, so check for non-empty frames
assert len(frames[0]) > 0, "Frame 0 should not be empty"
assert frames[0][0].strip() != "", "Frame 0 should have visible content"
# Verify camera position changed between frames
# Feed mode moves 1 row per frame at speed=1.0 with dt~0.02
# After 5 frames, camera should have moved down
assert camera.y > 0, f"Camera should have moved down, y={camera.y}"
# Verify different frames show different content (camera is scrolling)
# Check that frame 0 and frame 5 are different
frame_0_str = "\n".join(frames[0])
frame_5_str = "\n".join(frames[5])
assert frame_0_str != frame_5_str, (
"Frame 0 and Frame 5 should show different content"
)
class TestScrollCamera:
"""Test SCROLL mode: smooth vertical scrolling with float accumulation."""
def test_scroll_camera_smooth_movement(
self, items, null_display, viewport_dims, show_frames
):
"""SCROLL camera should move content smoothly with sub-integer precision."""
camera = Camera.scroll(speed=0.5)
camera.set_canvas_size(0, 200) # Match viewport width for text wrapping
pipeline = create_pipeline_with_camera(
camera, items, null_display, viewport_dims
)
assert pipeline is not None, "Pipeline creation failed"
null_display.start_recording()
# Run for 20 frames
for frame in range(20):
pipeline.context.set("frame_number", frame)
result = pipeline.execute(items)
assert result.success, f"Frame {frame} execution failed"
null_display.stop_recording()
frames = null_display.get_frames()
if show_frames:
DisplayHelper.show_frame(
frames[0], "SCROLL Camera - Frame 0", viewport_dims
)
DisplayHelper.show_frame(
frames[10], "SCROLL Camera - Frame 10", viewport_dims
)
# SCROLL mode uses float accumulation for smooth scrolling
# At speed=0.5, dt=1.0: y increases by 0.5 * 60 = 30 pixels per frame
# Verify camera_y is increasing (which causes the scroll)
camera_y_values = []
for frame in range(5):
# Get camera.y directly (not filtered context value)
pipeline.context.set("frame_number", frame)
pipeline.execute(items)
camera_y_values.append(camera.y)
print(f"\nSCROLL test - camera_y positions: {camera_y_values}")
# Verify camera_y is non-zero (camera is moving)
assert camera_y_values[-1] > 0, (
"Camera should have scrolled down (camera_y > 0)"
)
# Verify camera_y is increasing
for i in range(len(camera_y_values) - 1):
assert camera_y_values[i + 1] >= camera_y_values[i], (
f"Camera_y should be non-decreasing: {camera_y_values}"
)
class TestHorizontalCamera:
"""Test HORIZONTAL mode: left/right scrolling."""
def test_horizontal_camera_scrolls_right(
self, items, null_display, viewport_dims, show_frames
):
"""HORIZONTAL camera should move content right (x increases)."""
camera = Camera.horizontal(speed=1.0)
camera.set_canvas_size(200, 200)
pipeline = create_pipeline_with_camera(
camera, items, null_display, viewport_dims
)
assert pipeline is not None, "Pipeline creation failed"
null_display.start_recording()
for frame in range(10):
pipeline.context.set("frame_number", frame)
result = pipeline.execute(items)
assert result.success, f"Frame {frame} execution failed"
null_display.stop_recording()
frames = null_display.get_frames()
if show_frames:
DisplayHelper.show_frame(
frames[0], "HORIZONTAL Camera - Frame 0", viewport_dims
)
DisplayHelper.show_frame(
frames[5], "HORIZONTAL Camera - Frame 5", viewport_dims
)
# HORIZONTAL mode: x increases by speed*dt*60
# At dt=1.0, speed=1.0: x increases by 60 per frame
# Frame 0: x=0
# Frame 5: x=300 (clamped to canvas_width-viewport_width)
# Verify frame 0 contains content (ASCII art of LINE 000)
assert len(frames[0]) > 0, "Frame 0 should not be empty"
assert frames[0][0].strip() != "", "Frame 0 should have visible content"
# Verify camera x is increasing
print("\nHORIZONTAL test - camera positions:")
for i in range(10):
print(f" Frame {i}: x={camera.x}, y={camera.y}")
camera.update(1.0)
# Verify camera moved
assert camera.x > 0, f"Camera should have moved right, x={camera.x}"
class TestOmniCamera:
"""Test OMNI mode: diagonal scrolling (x and y increase together)."""
def test_omni_camera_diagonal_movement(
self, items, null_display, viewport_dims, show_frames
):
"""OMNI camera should move content diagonally (both x and y increase)."""
camera = Camera.omni(speed=1.0)
camera.set_canvas_size(200, 200)
pipeline = create_pipeline_with_camera(
camera, items, null_display, viewport_dims
)
assert pipeline is not None, "Pipeline creation failed"
null_display.start_recording()
for frame in range(10):
pipeline.context.set("frame_number", frame)
result = pipeline.execute(items)
assert result.success, f"Frame {frame} execution failed"
null_display.stop_recording()
frames = null_display.get_frames()
if show_frames:
DisplayHelper.show_frame(frames[0], "OMNI Camera - Frame 0", viewport_dims)
DisplayHelper.show_frame(frames[5], "OMNI Camera - Frame 5", viewport_dims)
# OMNI mode: y increases by speed*dt*60, x increases by speed*dt*60*0.5
# At dt=1.0, speed=1.0: y += 60, x += 30
# Verify frame 0 contains content (ASCII art)
assert len(frames[0]) > 0, "Frame 0 should not be empty"
assert frames[0][0].strip() != "", "Frame 0 should have visible content"
print("\nOMNI test - camera positions:")
camera.reset()
for frame in range(5):
print(f" Frame {frame}: x={camera.x}, y={camera.y}")
camera.update(1.0)
# Verify camera moved
assert camera.y > 0, f"Camera should have moved down, y={camera.y}"
class TestFloatingCamera:
"""Test FLOATING mode: sinusoidal bobbing motion."""
def test_floating_camera_bobbing(
self, items, null_display, viewport_dims, show_frames
):
"""FLOATING camera should move content in a sinusoidal pattern."""
camera = Camera.floating(speed=1.0)
camera.set_canvas_size(200, 200)
pipeline = create_pipeline_with_camera(
camera, items, null_display, viewport_dims
)
assert pipeline is not None, "Pipeline creation failed"
null_display.start_recording()
for frame in range(32):
pipeline.context.set("frame_number", frame)
result = pipeline.execute(items)
assert result.success, f"Frame {frame} execution failed"
null_display.stop_recording()
frames = null_display.get_frames()
if show_frames:
DisplayHelper.show_frame(
frames[0], "FLOATING Camera - Frame 0", viewport_dims
)
DisplayHelper.show_frame(
frames[8], "FLOATING Camera - Frame 8 (quarter cycle)", viewport_dims
)
DisplayHelper.show_frame(
frames[16], "FLOATING Camera - Frame 16 (half cycle)", viewport_dims
)
# FLOATING mode: y = sin(time*2) * speed * 30
# Period: 2π / 2 = π ≈ 3.14 seconds (or ~3.14 frames at dt=1.0)
# Full cycle ~32 frames
print("\nFLOATING test - sinusoidal motion:")
camera.reset()
for frame in range(16):
print(f" Frame {frame}: y={camera.y}, x={camera.x}")
camera.update(1.0)
# Verify y oscillates around 0
camera.reset()
camera.update(1.0) # Frame 1
y1 = camera.y
camera.update(1.0) # Frame 2
y2 = camera.y
camera.update(1.0) # Frame 3
y3 = camera.y
# After a few frames, y should oscillate (not monotonic)
assert y1 != y2 or y2 != y3, "FLOATING camera should oscillate"
class TestBounceCamera:
"""Test BOUNCE mode: bouncing DVD-style motion."""
def test_bounce_camera_reverses_at_edges(
self, items, null_display, viewport_dims, show_frames
):
"""BOUNCE camera should reverse direction when hitting canvas edges."""
camera = Camera.bounce(speed=5.0) # Faster for quicker test
# Set zoom > 1.0 so viewport is smaller than canvas, allowing movement
camera.set_zoom(2.0) # Zoom out 2x, viewport is half the canvas size
camera.set_canvas_size(400, 400)
pipeline = create_pipeline_with_camera(
camera, items, null_display, viewport_dims
)
assert pipeline is not None, "Pipeline creation failed"
null_display.start_recording()
for frame in range(50):
pipeline.context.set("frame_number", frame)
result = pipeline.execute(items)
assert result.success, f"Frame {frame} execution failed"
null_display.stop_recording()
frames = null_display.get_frames()
if show_frames:
DisplayHelper.show_frame(
frames[0], "BOUNCE Camera - Frame 0", viewport_dims
)
DisplayHelper.show_frame(
frames[25], "BOUNCE Camera - Frame 25", viewport_dims
)
# BOUNCE mode: moves until it hits edge, then reverses
# Verify the camera moves and changes direction
print("\nBOUNCE test - bouncing motion:")
camera.reset()
camera.set_zoom(2.0) # Reset also resets zoom, so set it again
for frame in range(20):
print(f" Frame {frame}: x={camera.x}, y={camera.y}")
camera.update(1.0)
# Check that camera hits bounds and reverses
camera.reset()
camera.set_zoom(2.0) # Reset also resets zoom, so set it again
for _ in range(51): # Odd number ensures ending at opposite corner
camera.update(1.0)
# Camera should have hit an edge and reversed direction
# With 400x400 canvas, viewport 200x200 (zoom=2), max_x = 200, max_y = 200
# Starting at (0,0), after 51 updates it should be at (200, 200)
max_x = max(0, camera.canvas_width - camera.viewport_width)
print(f"BOUNCE camera final position: x={camera.x}, y={camera.y}")
assert camera.x == max_x, (
f"Camera should be at max_x ({max_x}), got x={camera.x}"
)
# Check bounds are respected
vw = camera.viewport_width
vh = camera.viewport_height
assert camera.x >= 0 and camera.x <= camera.canvas_width - vw
assert camera.y >= 0 and camera.y <= camera.canvas_height - vh
class TestRadialCamera:
"""Test RADIAL mode: polar coordinate scanning (rotation around center)."""
def test_radial_camera_rotates_around_center(
self, items, null_display, viewport_dims, show_frames
):
"""RADIAL camera should rotate around the center of the canvas."""
camera = Camera.radial(speed=0.5)
camera.set_canvas_size(200, 200)
pipeline = create_pipeline_with_camera(
camera, items, null_display, viewport_dims
)
assert pipeline is not None, "Pipeline creation failed"
null_display.start_recording()
for frame in range(32): # 32 frames = 2π at ~0.2 rad/frame
pipeline.context.set("frame_number", frame)
result = pipeline.execute(items)
assert result.success, f"Frame {frame} execution failed"
null_display.stop_recording()
frames = null_display.get_frames()
if show_frames:
DisplayHelper.show_frame(
frames[0], "RADIAL Camera - Frame 0", viewport_dims
)
DisplayHelper.show_frame(
frames[8], "RADIAL Camera - Frame 8 (quarter turn)", viewport_dims
)
DisplayHelper.show_frame(
frames[16], "RADIAL Camera - Frame 16 (half turn)", viewport_dims
)
DisplayHelper.show_frame(
frames[24], "RADIAL Camera - Frame 24 (3/4 turn)", viewport_dims
)
# RADIAL mode: rotates around center with smooth angular motion
# At speed=0.5: theta increases by ~0.2 rad/frame (0.5 * dt * 1.0)
print("\nRADIAL test - rotational motion:")
camera.reset()
for frame in range(32):
theta_deg = (camera._theta_float * 180 / math.pi) % 360
print(
f" Frame {frame}: theta={theta_deg:.1f}°, x={camera.x}, y={camera.y}"
)
camera.update(1.0)
# Verify rotation occurs (angle should change)
camera.reset()
theta_start = camera._theta_float
camera.update(1.0) # Frame 1
theta_mid = camera._theta_float
camera.update(1.0) # Frame 2
theta_end = camera._theta_float
assert theta_mid > theta_start, "Theta should increase (rotation)"
assert theta_end > theta_mid, "Theta should continue increasing"
def test_radial_camera_with_sensor_integration(
self, items, null_display, viewport_dims, show_frames
):
"""RADIAL camera can be driven by external sensor (OSC integration test)."""
from engine.sensors.oscillator import (
OscillatorSensor,
register_oscillator_sensor,
)
# Create an oscillator sensor for testing
register_oscillator_sensor(name="test_osc", waveform="sine", frequency=0.5)
osc = OscillatorSensor(name="test_osc", waveform="sine", frequency=0.5)
camera = Camera.radial(speed=0.3)
camera.set_canvas_size(200, 200)
pipeline = create_pipeline_with_camera(
camera, items, null_display, viewport_dims
)
assert pipeline is not None, "Pipeline creation failed"
null_display.start_recording()
# Run frames while modulating camera with oscillator
for frame in range(32):
# Read oscillator value and set as radial input
osc_value = osc.read()
if osc_value:
camera.set_radial_input(osc_value.value)
pipeline.context.set("frame_number", frame)
result = pipeline.execute(items)
assert result.success, f"Frame {frame} execution failed"
null_display.stop_recording()
frames = null_display.get_frames()
if show_frames:
DisplayHelper.show_frame(
frames[0], "RADIAL+OSC Camera - Frame 0", viewport_dims
)
DisplayHelper.show_frame(
frames[8], "RADIAL+OSC Camera - Frame 8", viewport_dims
)
DisplayHelper.show_frame(
frames[16], "RADIAL+OSC Camera - Frame 16", viewport_dims
)
print("\nRADIAL+OSC test - sensor-driven rotation:")
osc.start()
camera.reset()
for frame in range(16):
osc_value = osc.read()
if osc_value:
camera.set_radial_input(osc_value.value)
camera.update(1.0)
theta_deg = (camera._theta_float * 180 / math.pi) % 360
print(
f" Frame {frame}: osc={osc_value.value if osc_value else 0:.3f}, theta={theta_deg:.1f}°"
)
# Verify camera position changes when driven by sensor
camera.reset()
x_start = camera.x
camera.update(1.0)
x_mid = camera.x
assert x_start != x_mid, "Camera should move when driven by oscillator"
osc.stop()
def test_radial_camera_with_direct_angle_setting(
self, items, null_display, viewport_dims, show_frames
):
"""RADIAL camera can have angle set directly for OSC integration."""
camera = Camera.radial(speed=0.0) # No auto-rotation
camera.set_canvas_size(200, 200)
camera._r_float = 80.0 # Set initial radius to see movement
pipeline = create_pipeline_with_camera(
camera, items, null_display, viewport_dims
)
assert pipeline is not None, "Pipeline creation failed"
null_display.start_recording()
# Set angle directly to sweep through full rotation
for frame in range(32):
angle = (frame / 32) * 2 * math.pi # 0 to 2π over 32 frames
camera.set_radial_angle(angle)
camera.update(1.0) # Must update to convert polar to Cartesian
pipeline.context.set("frame_number", frame)
result = pipeline.execute(items)
assert result.success, f"Frame {frame} execution failed"
null_display.stop_recording()
frames = null_display.get_frames()
if show_frames:
DisplayHelper.show_frame(
frames[0], "RADIAL Direct Angle - Frame 0", viewport_dims
)
DisplayHelper.show_frame(
frames[8], "RADIAL Direct Angle - Frame 8", viewport_dims
)
DisplayHelper.show_frame(
frames[16], "RADIAL Direct Angle - Frame 16", viewport_dims
)
print("\nRADIAL Direct Angle test - sweeping rotation:")
for frame in range(32):
angle = (frame / 32) * 2 * math.pi
camera.set_radial_angle(angle)
camera.update(1.0) # Update converts angle to x,y position
theta_deg = angle * 180 / math.pi
print(
f" Frame {frame}: set_angle={theta_deg:.1f}°, actual_x={camera.x}, actual_y={camera.y}"
)
# Verify camera position changes as angle sweeps
camera.reset()
camera._r_float = 80.0 # Set radius for testing
camera.set_radial_angle(0)
camera.update(1.0)
x0 = camera.x
camera.set_radial_angle(math.pi / 2)
camera.update(1.0)
x90 = camera.x
assert x0 != x90, (
f"Camera position should change with angle (x0={x0}, x90={x90})"
)
class TestCameraModeEnum:
"""Test CameraMode enum integrity."""
def test_all_modes_exist(self):
"""Verify all camera modes are defined."""
modes = [m.name for m in CameraMode]
expected = [
"FEED",
"SCROLL",
"HORIZONTAL",
"OMNI",
"FLOATING",
"BOUNCE",
"RADIAL",
]
for mode in expected:
assert mode in modes, f"CameraMode.{mode} should exist"
def test_radial_mode_exists(self):
"""Verify RADIAL mode is properly defined."""
assert CameraMode.RADIAL is not None
assert isinstance(CameraMode.RADIAL, CameraMode)
assert CameraMode.RADIAL.name == "RADIAL"
class TestCameraFactoryMethods:
"""Test camera factory methods create proper camera instances."""
def test_radial_factory(self):
"""RADIAL factory should create a camera with correct mode."""
camera = Camera.radial(speed=2.0)
assert camera.mode == CameraMode.RADIAL
assert camera.speed == 2.0
assert hasattr(camera, "_r_float")
assert hasattr(camera, "_theta_float")
def test_radial_factory_initializes_state(self):
"""RADIAL factory should initialize radial state."""
camera = Camera.radial()
assert camera._r_float == 0.0
assert camera._theta_float == 0.0
class TestCameraStateSaveRestore:
"""Test camera state can be saved and restored (for hot-rebuild)."""
def test_radial_camera_state_save(self):
"""RADIAL camera should save polar coordinate state."""
camera = Camera.radial()
camera._theta_float = math.pi / 4
camera._r_float = 50.0
# Save state via CameraStage adapter
from engine.pipeline.adapters.camera import CameraStage
stage = CameraStage(camera)
state = stage.save_state()
assert "_theta_float" in state
assert "_r_float" in state
assert state["_theta_float"] == math.pi / 4
assert state["_r_float"] == 50.0
def test_radial_camera_state_restore(self):
"""RADIAL camera should restore polar coordinate state."""
camera1 = Camera.radial()
camera1._theta_float = math.pi / 3
camera1._r_float = 75.0
from engine.pipeline.adapters.camera import CameraStage
stage1 = CameraStage(camera1)
state = stage1.save_state()
# Create new camera and restore
camera2 = Camera.radial()
stage2 = CameraStage(camera2)
stage2.restore_state(state)
assert abs(camera2._theta_float - math.pi / 3) < 0.001
assert abs(camera2._r_float - 75.0) < 0.001
class TestCameraViewportApplication:
"""Test camera.apply() properly slices buffers."""
def test_radial_camera_viewport_slicing(self):
"""RADIAL camera should properly slice buffer based on position."""
camera = Camera.radial(speed=0.5)
camera.set_canvas_size(200, 200)
# Update to move camera
camera.update(1.0)
# Create test buffer with 200 lines
buffer = [f"LINE {i:03d}" for i in range(200)]
# Apply camera viewport (15 lines high)
result = camera.apply(buffer, viewport_width=40, viewport_height=15)
# Result should be exactly 15 lines
assert len(result) == 15
# Each line should be 40 characters (padded or truncated)
for line in result:
assert len(line) <= 40

View File

@@ -120,12 +120,16 @@ class TestTerminalDisplay:
def test_get_dimensions_returns_cached_value(self):
"""get_dimensions returns cached dimensions for stability."""
display = TerminalDisplay()
display.init(80, 24)
import os
from unittest.mock import patch
# First call should set cache
d1 = display.get_dimensions()
assert d1 == (80, 24)
# Mock terminal size to ensure deterministic dimensions
term_size = os.terminal_size((80, 24))
with patch("os.get_terminal_size", return_value=term_size):
display = TerminalDisplay()
display.init(80, 24)
d1 = display.get_dimensions()
assert d1 == (80, 24)
def test_show_clears_screen_before_each_frame(self):
"""show clears previous frame to prevent visual wobble.

View File

@@ -0,0 +1,195 @@
"""Integration test: FrameBufferStage in the pipeline."""
import queue
from engine.data_sources.sources import ListDataSource, SourceItem
from engine.effects.types import EffectConfig
from engine.pipeline import Pipeline, PipelineConfig
from engine.pipeline.adapters import (
DataSourceStage,
DisplayStage,
SourceItemsToBufferStage,
)
from engine.pipeline.core import PipelineContext
from engine.pipeline.stages.framebuffer import FrameBufferStage
class QueueDisplay:
"""Stub display that captures every frame into a queue."""
def __init__(self):
self.frames: queue.Queue[list[str]] = queue.Queue()
self.width = 80
self.height = 24
self._init_called = False
def init(self, width: int, height: int, reuse: bool = False) -> None:
self.width = width
self.height = height
self._init_called = True
def show(self, buffer: list[str], border: bool = False) -> None:
self.frames.put(list(buffer))
def clear(self) -> None:
pass
def cleanup(self) -> None:
pass
def get_dimensions(self) -> tuple[int, int]:
return (self.width, self.height)
def _build_pipeline(
items: list[SourceItem],
history_depth: int = 5,
width: int = 80,
height: int = 24,
) -> tuple[Pipeline, QueueDisplay, PipelineContext]:
"""Build pipeline: source -> render -> framebuffer -> display."""
display = QueueDisplay()
ctx = PipelineContext()
ctx.set("items", items)
pipeline = Pipeline(
config=PipelineConfig(enable_metrics=True),
context=ctx,
)
# Source
source = ListDataSource(items, name="test-source")
pipeline.add_stage("source", DataSourceStage(source, name="test-source"))
# Render
pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
# Framebuffer
framebuffer = FrameBufferStage(name="default", history_depth=history_depth)
pipeline.add_stage("framebuffer", framebuffer)
# Display
pipeline.add_stage("display", DisplayStage(display, name="queue"))
pipeline.build()
pipeline.initialize()
return pipeline, display, ctx
class TestFrameBufferAcceptance:
"""Test FrameBufferStage in a full pipeline."""
def test_framebuffer_populates_history(self):
"""After several frames, framebuffer should have history stored."""
items = [
SourceItem(content="Frame\nBuffer\nTest", source="test", timestamp="0")
]
pipeline, display, ctx = _build_pipeline(items, history_depth=5)
# Run 3 frames
for i in range(3):
result = pipeline.execute([])
assert result.success, f"Pipeline failed at frame {i}: {result.error}"
# Check framebuffer history in context
history = ctx.get("framebuffer.default.history")
assert history is not None, "Framebuffer history not found in context"
assert len(history) == 3, f"Expected 3 history frames, got {len(history)}"
def test_framebuffer_respects_depth(self):
"""Framebuffer should not exceed configured history depth."""
items = [SourceItem(content="Depth\nTest", source="test", timestamp="0")]
pipeline, display, ctx = _build_pipeline(items, history_depth=3)
# Run 5 frames
for i in range(5):
result = pipeline.execute([])
assert result.success
history = ctx.get("framebuffer.default.history")
assert history is not None
assert len(history) == 3, f"Expected depth 3, got {len(history)}"
def test_framebuffer_current_intensity(self):
"""Framebuffer should compute current intensity map."""
items = [SourceItem(content="Intensity\nMap", source="test", timestamp="0")]
pipeline, display, ctx = _build_pipeline(items, history_depth=5)
# Run at least one frame
result = pipeline.execute([])
assert result.success
intensity = ctx.get("framebuffer.default.current_intensity")
assert intensity is not None, "No intensity map in context"
# Intensity should be a list of one value per line? Actually it's a 2D array or list?
# Let's just check it's non-empty
assert len(intensity) > 0, "Intensity map is empty"
def test_framebuffer_get_frame(self):
"""Should be able to retrieve specific frames from history."""
items = [SourceItem(content="Retrieve\nFrame", source="test", timestamp="0")]
pipeline, display, ctx = _build_pipeline(items, history_depth=5)
# Run 2 frames
for i in range(2):
result = pipeline.execute([])
assert result.success
# Retrieve frame 0 (most recent)
recent = pipeline.get_stage("framebuffer").get_frame(0, ctx)
assert recent is not None, "Cannot retrieve recent frame"
assert len(recent) > 0, "Recent frame is empty"
# Retrieve frame 1 (previous)
previous = pipeline.get_stage("framebuffer").get_frame(1, ctx)
assert previous is not None, "Cannot retrieve previous frame"
def test_framebuffer_with_motionblur_effect(self):
"""MotionBlurEffect should work when depending on framebuffer."""
from engine.effects.plugins.motionblur import MotionBlurEffect
from engine.pipeline.adapters import EffectPluginStage
items = [SourceItem(content="Motion\nBlur", source="test", timestamp="0")]
display = QueueDisplay()
ctx = PipelineContext()
ctx.set("items", items)
pipeline = Pipeline(
config=PipelineConfig(enable_metrics=True),
context=ctx,
)
source = ListDataSource(items, name="test")
pipeline.add_stage("source", DataSourceStage(source, name="test"))
pipeline.add_stage("render", SourceItemsToBufferStage(name="render"))
framebuffer = FrameBufferStage(name="default", history_depth=3)
pipeline.add_stage("framebuffer", framebuffer)
motionblur = MotionBlurEffect()
motionblur.configure(EffectConfig(enabled=True, intensity=0.5))
pipeline.add_stage(
"motionblur",
EffectPluginStage(
motionblur,
name="motionblur",
dependencies={"framebuffer.history.default"},
),
)
pipeline.add_stage("display", DisplayStage(display, name="queue"))
pipeline.build()
pipeline.initialize()
# Run a few frames
for i in range(5):
result = pipeline.execute([])
assert result.success, f"Motion blur pipeline failed at frame {i}"
# Check that history exists
history = ctx.get("framebuffer.default.history")
assert history is not None
assert len(history) > 0

View File

@@ -30,9 +30,9 @@ class TestFrameBufferStage:
assert stage.config.history_depth == 2
def test_capabilities(self):
"""Stage provides framebuffer.history capability."""
"""Stage provides framebuffer.history.{name} capability."""
stage = FrameBufferStage()
assert "framebuffer.history" in stage.capabilities
assert "framebuffer.history.default" in stage.capabilities
def test_dependencies(self):
"""Stage depends on render.output."""
@@ -46,15 +46,15 @@ class TestFrameBufferStage:
assert DataType.TEXT_BUFFER in stage.outlet_types
def test_init_context(self):
"""init initializes context state."""
"""init initializes context state with prefixed keys."""
stage = FrameBufferStage()
ctx = make_ctx()
result = stage.init(ctx)
assert result is True
assert ctx.get("frame_history") == []
assert ctx.get("intensity_history") == []
assert ctx.get("framebuffer.default.history") == []
assert ctx.get("framebuffer.default.intensity_history") == []
def test_process_stores_buffer_in_history(self):
"""process stores buffer in history."""
@@ -66,7 +66,7 @@ class TestFrameBufferStage:
result = stage.process(buffer, ctx)
assert result == buffer # Pass-through
history = ctx.get("frame_history")
history = ctx.get("framebuffer.default.history")
assert len(history) == 1
assert history[0] == buffer
@@ -79,7 +79,7 @@ class TestFrameBufferStage:
buffer = ["hello world", "test line", ""]
stage.process(buffer, ctx)
intensity = ctx.get("current_intensity")
intensity = ctx.get("framebuffer.default.current_intensity")
assert intensity is not None
assert len(intensity) == 3 # Three rows
# Non-empty lines should have intensity > 0
@@ -90,7 +90,7 @@ class TestFrameBufferStage:
def test_process_keeps_multiple_frames(self):
"""process keeps configured depth of frames."""
config = FrameBufferConfig(history_depth=3)
config = FrameBufferConfig(history_depth=3, name="test")
stage = FrameBufferStage(config)
ctx = make_ctx()
stage.init(ctx)
@@ -100,7 +100,7 @@ class TestFrameBufferStage:
buffer = [f"frame {i}"]
stage.process(buffer, ctx)
history = ctx.get("frame_history")
history = ctx.get("framebuffer.test.history")
assert len(history) == 3 # Only last 3 kept
# Should be in reverse chronological order (most recent first)
assert history[0] == ["frame 4"]
@@ -109,7 +109,7 @@ class TestFrameBufferStage:
def test_process_keeps_intensity_sync(self):
"""process keeps intensity history in sync with frame history."""
config = FrameBufferConfig(history_depth=3)
config = FrameBufferConfig(history_depth=3, name="sync")
stage = FrameBufferStage(config)
ctx = make_ctx()
stage.init(ctx)
@@ -122,8 +122,9 @@ class TestFrameBufferStage:
for buf in buffers:
stage.process(buf, ctx)
frame_hist = ctx.get("frame_history")
intensity_hist = ctx.get("intensity_history")
prefix = "framebuffer.sync"
frame_hist = ctx.get(f"{prefix}.history")
intensity_hist = ctx.get(f"{prefix}.intensity_history")
assert len(frame_hist) == len(intensity_hist) == 3
# Each frame's intensity should match
@@ -207,7 +208,7 @@ class TestFrameBufferStage:
"""process is thread-safe."""
from threading import Thread
stage = FrameBufferStage()
stage = FrameBufferStage(name="threadtest")
ctx = make_ctx()
stage.init(ctx)
@@ -216,7 +217,7 @@ class TestFrameBufferStage:
def worker(idx):
buffer = [f"thread {idx}"]
stage.process(buffer, ctx)
results.append(len(ctx.get("frame_history", [])))
results.append(len(ctx.get("framebuffer.threadtest.history", [])))
threads = [Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads:
@@ -225,7 +226,7 @@ class TestFrameBufferStage:
t.join()
# All threads should see consistent state
assert len(ctx.get("frame_history")) <= 2 # Depth limit
assert len(ctx.get("framebuffer.threadtest.history")) <= 2 # Depth limit
# All worker threads should have completed without errors
assert len(results) == 10

View File

@@ -129,7 +129,7 @@ class TestPipeline:
pipeline.add_stage("source", mock_source)
pipeline.add_stage("display", mock_display)
pipeline.build()
pipeline.build(auto_inject=False)
assert pipeline._initialized is True
assert "source" in pipeline.execution_order
@@ -182,7 +182,7 @@ class TestPipeline:
pipeline.add_stage("source", mock_source)
pipeline.add_stage("effect", mock_effect)
pipeline.add_stage("display", mock_display)
pipeline.build()
pipeline.build(auto_inject=False)
result = pipeline.execute(None)
@@ -218,7 +218,7 @@ class TestPipeline:
pipeline.add_stage("source", mock_source)
pipeline.add_stage("failing", mock_failing)
pipeline.build()
pipeline.build(auto_inject=False)
result = pipeline.execute(None)
@@ -254,7 +254,7 @@ class TestPipeline:
pipeline.add_stage("source", mock_source)
pipeline.add_stage("optional", mock_optional)
pipeline.build()
pipeline.build(auto_inject=False)
result = pipeline.execute(None)
@@ -302,7 +302,7 @@ class TestCapabilityBasedDependencies:
pipeline = Pipeline()
pipeline.add_stage("headlines", SourceStage())
pipeline.add_stage("render", RenderStage())
pipeline.build()
pipeline.build(auto_inject=False)
assert "headlines" in pipeline.execution_order
assert "render" in pipeline.execution_order
@@ -334,7 +334,7 @@ class TestCapabilityBasedDependencies:
pipeline.add_stage("render", RenderStage())
try:
pipeline.build()
pipeline.build(auto_inject=False)
raise AssertionError("Should have raised StageError")
except StageError as e:
assert "Missing capabilities" in e.message
@@ -394,7 +394,7 @@ class TestCapabilityBasedDependencies:
pipeline.add_stage("headlines", SourceA())
pipeline.add_stage("poetry", SourceB())
pipeline.add_stage("display", DisplayStage())
pipeline.build()
pipeline.build(auto_inject=False)
assert pipeline.execution_order[0] == "headlines"
@@ -791,7 +791,7 @@ class TestFullPipeline:
pipeline.add_stage("b", StageB())
try:
pipeline.build()
pipeline.build(auto_inject=False)
raise AssertionError("Should detect circular dependency")
except Exception:
pass
@@ -815,7 +815,7 @@ class TestPipelineMetrics:
config = PipelineConfig(enable_metrics=True)
pipeline = Pipeline(config=config)
pipeline.add_stage("dummy", DummyStage())
pipeline.build()
pipeline.build(auto_inject=False)
pipeline.execute("test_data")
@@ -838,7 +838,7 @@ class TestPipelineMetrics:
config = PipelineConfig(enable_metrics=False)
pipeline = Pipeline(config=config)
pipeline.add_stage("dummy", DummyStage())
pipeline.build()
pipeline.build(auto_inject=False)
pipeline.execute("test_data")
@@ -860,7 +860,7 @@ class TestPipelineMetrics:
config = PipelineConfig(enable_metrics=True)
pipeline = Pipeline(config=config)
pipeline.add_stage("dummy", DummyStage())
pipeline.build()
pipeline.build(auto_inject=False)
pipeline.execute("test1")
pipeline.execute("test2")
@@ -964,7 +964,7 @@ class TestOverlayStages:
pipeline.add_stage("overlay_a", OverlayStageA())
pipeline.add_stage("overlay_b", OverlayStageB())
pipeline.add_stage("regular", RegularStage())
pipeline.build()
pipeline.build(auto_inject=False)
overlays = pipeline.get_overlay_stages()
assert len(overlays) == 2
@@ -1006,7 +1006,7 @@ class TestOverlayStages:
pipeline = Pipeline()
pipeline.add_stage("regular", RegularStage())
pipeline.add_stage("overlay", OverlayStage())
pipeline.build()
pipeline.build(auto_inject=False)
pipeline.execute("data")
@@ -1070,7 +1070,7 @@ class TestOverlayStages:
pipeline = Pipeline()
pipeline.add_stage("test", TestStage())
pipeline.build()
pipeline.build(auto_inject=False)
assert pipeline.get_stage_type("test") == "overlay"
@@ -1092,7 +1092,7 @@ class TestOverlayStages:
pipeline = Pipeline()
pipeline.add_stage("test", TestStage())
pipeline.build()
pipeline.build(auto_inject=False)
assert pipeline.get_render_order("test") == 42
@@ -1142,7 +1142,7 @@ class TestInletOutletTypeValidation:
pipeline.add_stage("consumer", ConsumerStage())
with pytest.raises(StageError) as exc_info:
pipeline.build()
pipeline.build(auto_inject=False)
assert "Type mismatch" in str(exc_info.value)
assert "TEXT_BUFFER" in str(exc_info.value)
@@ -1190,7 +1190,7 @@ class TestInletOutletTypeValidation:
pipeline.add_stage("consumer", ConsumerStage())
# Should not raise
pipeline.build()
pipeline.build(auto_inject=False)
def test_any_type_accepts_everything(self):
"""DataType.ANY accepts any upstream type."""
@@ -1234,7 +1234,7 @@ class TestInletOutletTypeValidation:
pipeline.add_stage("consumer", ConsumerStage())
# Should not raise because consumer accepts ANY
pipeline.build()
pipeline.build(auto_inject=False)
def test_multiple_compatible_types(self):
"""Stage can declare multiple inlet types."""
@@ -1278,7 +1278,7 @@ class TestInletOutletTypeValidation:
pipeline.add_stage("consumer", ConsumerStage())
# Should not raise because consumer accepts SOURCE_ITEMS
pipeline.build()
pipeline.build(auto_inject=False)
def test_display_must_accept_text_buffer(self):
"""Display stages must accept TEXT_BUFFER type."""
@@ -1302,7 +1302,7 @@ class TestInletOutletTypeValidation:
pipeline.add_stage("display", BadDisplayStage())
with pytest.raises(StageError) as exc_info:
pipeline.build()
pipeline.build(auto_inject=False)
assert "display" in str(exc_info.value).lower()
@@ -1349,7 +1349,7 @@ class TestPipelineMutation:
"""add_stage() initializes stage when pipeline already initialized."""
pipeline = Pipeline()
mock_stage = self._create_mock_stage("test")
pipeline.build()
pipeline.build(auto_inject=False)
pipeline._initialized = True
pipeline.add_stage("test", mock_stage, initialize=True)
@@ -1478,7 +1478,7 @@ class TestPipelineMutation:
pipeline.add_stage("a", stage_a, initialize=False)
pipeline.add_stage("b", stage_b, initialize=False)
pipeline.add_stage("c", stage_c, initialize=False)
pipeline.build()
pipeline.build(auto_inject=False)
result = pipeline.move_stage("a", after="c")
@@ -1497,7 +1497,7 @@ class TestPipelineMutation:
pipeline.add_stage("a", stage_a, initialize=False)
pipeline.add_stage("b", stage_b, initialize=False)
pipeline.add_stage("c", stage_c, initialize=False)
pipeline.build()
pipeline.build(auto_inject=False)
result = pipeline.move_stage("c", before="a")
@@ -1512,7 +1512,7 @@ class TestPipelineMutation:
stage = self._create_mock_stage("test")
pipeline.add_stage("test", stage, initialize=False)
pipeline.build()
pipeline.build(auto_inject=False)
result = pipeline.move_stage("nonexistent", after="test")
@@ -1613,7 +1613,7 @@ class TestPipelineMutation:
pipeline.add_stage("s1", stage1, initialize=False)
pipeline.add_stage("s2", stage2, initialize=False)
pipeline.build()
pipeline.build(auto_inject=False)
info = pipeline.get_pipeline_info()
@@ -1640,7 +1640,7 @@ class TestPipelineMutation:
pipeline.add_stage("source", source, initialize=False)
pipeline.add_stage("effect", effect, initialize=False)
pipeline.add_stage("display", display, initialize=False)
pipeline.build()
pipeline.build(auto_inject=False)
assert pipeline.execution_order == ["source", "effect", "display"]
@@ -1664,7 +1664,7 @@ class TestPipelineMutation:
pipeline.add_stage("source", source, initialize=False)
pipeline.add_stage("display", display, initialize=False)
pipeline.build()
pipeline.build(auto_inject=False)
new_stage = self._create_mock_stage(
"effect", "effect", capabilities={"effect"}, dependencies={"source"}
@@ -1757,7 +1757,7 @@ class TestPipelineMutation:
pipeline.add_stage("source", TestSource(), initialize=False)
pipeline.add_stage("effect", TestEffect(), initialize=False)
pipeline.add_stage("display", TestDisplay(), initialize=False)
pipeline.build()
pipeline.build(auto_inject=False)
pipeline.initialize()
result = pipeline.execute(None)

View File

@@ -21,6 +21,7 @@ from engine.pipeline.adapters import (
EffectPluginStage,
FontStage,
SourceItemsToBufferStage,
ViewportFilterStage,
)
from engine.pipeline.core import PipelineContext
from engine.pipeline.params import PipelineParams
@@ -129,7 +130,28 @@ def _build_pipeline(
# Render stage
if use_font_stage:
# FontStage requires viewport_filter stage which requires camera state
from engine.camera import Camera
from engine.pipeline.adapters import CameraClockStage, CameraStage
camera = Camera.scroll(speed=0.0)
camera.set_canvas_size(200, 200)
# CameraClockStage updates camera state, must come before viewport_filter
pipeline.add_stage(
"camera_update", CameraClockStage(camera, name="camera-clock")
)
# ViewportFilterStage requires camera.state
pipeline.add_stage(
"viewport_filter", ViewportFilterStage(name="viewport-filter")
)
# FontStage converts items to buffer
pipeline.add_stage("render", FontStage(name="font"))
# CameraStage applies viewport transformation to rendered buffer
pipeline.add_stage("camera", CameraStage(camera, name="static"))
else:
pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))

View File

@@ -0,0 +1,405 @@
"""
Integration tests for pipeline hot-rebuild and state preservation.
Tests:
1. Viewport size control via --viewport flag
2. NullDisplay recording and save/load functionality
3. Pipeline state preservation during hot-rebuild
"""
import json
import sys
import tempfile
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent))
from engine.display import DisplayRegistry
from engine.display.backends.null import NullDisplay
from engine.display.backends.replay import ReplayDisplay
from engine.effects import get_registry
from engine.fetch import load_cache
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext
from engine.pipeline.adapters import (
EffectPluginStage,
FontStage,
ViewportFilterStage,
create_stage_from_display,
create_stage_from_effect,
)
from engine.pipeline.params import PipelineParams
@pytest.fixture
def viewport_dims():
"""Small viewport dimensions for testing."""
return (40, 15)
@pytest.fixture
def items():
"""Load cached source items."""
items = load_cache()
if not items:
pytest.skip("No fixture cache available")
return items
@pytest.fixture
def null_display(viewport_dims):
"""Create a NullDisplay for testing."""
display = DisplayRegistry.create("null")
display.init(viewport_dims[0], viewport_dims[1])
return display
@pytest.fixture
def pipeline_with_null_display(items, null_display):
"""Create a pipeline with NullDisplay for testing."""
import engine.effects.plugins as effects_plugins
effects_plugins.discover_plugins()
width, height = null_display.width, null_display.height
params = PipelineParams()
params.viewport_width = width
params.viewport_height = height
config = PipelineConfig(
source="fixture",
display="null",
camera="scroll",
effects=["noise", "fade"],
)
pipeline = Pipeline(config=config, context=PipelineContext())
from engine.camera import Camera
from engine.data_sources.sources import ListDataSource
from engine.pipeline.adapters import CameraClockStage, CameraStage, DataSourceStage
list_source = ListDataSource(items, name="fixture")
pipeline.add_stage("source", DataSourceStage(list_source, name="fixture"))
# Add camera stages (required by ViewportFilterStage)
camera = Camera.scroll(speed=0.3)
camera.set_canvas_size(200, 200)
pipeline.add_stage("camera_update", CameraClockStage(camera, name="camera-clock"))
pipeline.add_stage("camera", CameraStage(camera, name="scroll"))
pipeline.add_stage("viewport_filter", ViewportFilterStage(name="viewport-filter"))
pipeline.add_stage("font", FontStage(name="font"))
effect_registry = get_registry()
for effect_name in config.effects:
effect = effect_registry.get(effect_name)
if effect:
pipeline.add_stage(
f"effect_{effect_name}",
create_stage_from_effect(effect, effect_name),
)
pipeline.add_stage("display", create_stage_from_display(null_display, "null"))
pipeline.build()
if not pipeline.initialize():
pytest.fail("Failed to initialize pipeline")
ctx = pipeline.context
ctx.params = params
ctx.set("display", null_display)
ctx.set("items", items)
ctx.set("pipeline", pipeline)
ctx.set("pipeline_order", pipeline.execution_order)
ctx.set("camera_y", 0)
yield pipeline, params, null_display
pipeline.cleanup()
null_display.cleanup()
class TestNullDisplayRecording:
"""Tests for NullDisplay recording functionality."""
def test_null_display_initialization(self, viewport_dims):
"""NullDisplay initializes with correct dimensions."""
display = NullDisplay()
display.init(viewport_dims[0], viewport_dims[1])
assert display.width == viewport_dims[0]
assert display.height == viewport_dims[1]
def test_start_stop_recording(self, null_display):
"""NullDisplay can start and stop recording."""
assert not null_display._is_recording
null_display.start_recording()
assert null_display._is_recording is True
null_display.stop_recording()
assert null_display._is_recording is False
def test_record_frames(self, null_display, pipeline_with_null_display):
"""NullDisplay records frames when recording is enabled."""
pipeline, params, display = pipeline_with_null_display
display.start_recording()
assert len(display._recorded_frames) == 0
for frame in range(5):
params.frame_number = frame
pipeline.context.params = params
pipeline.execute([])
assert len(display._recorded_frames) == 5
def test_get_frames(self, null_display, pipeline_with_null_display):
"""NullDisplay.get_frames() returns recorded buffers."""
pipeline, params, display = pipeline_with_null_display
display.start_recording()
for frame in range(3):
params.frame_number = frame
pipeline.context.params = params
pipeline.execute([])
frames = display.get_frames()
assert len(frames) == 3
assert all(isinstance(f, list) for f in frames)
def test_clear_recording(self, null_display, pipeline_with_null_display):
"""NullDisplay.clear_recording() clears recorded frames."""
pipeline, params, display = pipeline_with_null_display
display.start_recording()
for frame in range(3):
params.frame_number = frame
pipeline.context.params = params
pipeline.execute([])
assert len(display._recorded_frames) == 3
display.clear_recording()
assert len(display._recorded_frames) == 0
def test_save_load_recording(self, null_display, pipeline_with_null_display):
"""NullDisplay can save and load recordings."""
pipeline, params, display = pipeline_with_null_display
display.start_recording()
for frame in range(3):
params.frame_number = frame
pipeline.context.params = params
pipeline.execute([])
with tempfile.NamedTemporaryFile(mode="w", suffix=".json", delete=False) as f:
temp_path = f.name
try:
display.save_recording(temp_path)
with open(temp_path) as f:
data = json.load(f)
assert data["version"] == 1
assert data["display"] == "null"
assert data["frame_count"] == 3
assert len(data["frames"]) == 3
display2 = NullDisplay()
display2.load_recording(temp_path)
assert len(display2._recorded_frames) == 3
finally:
Path(temp_path).unlink(missing_ok=True)
class TestReplayDisplay:
"""Tests for ReplayDisplay functionality."""
def test_replay_display_initialization(self, viewport_dims):
"""ReplayDisplay initializes correctly."""
display = ReplayDisplay()
display.init(viewport_dims[0], viewport_dims[1])
assert display.width == viewport_dims[0]
assert display.height == viewport_dims[1]
def test_set_and_get_frames(self):
"""ReplayDisplay can set and retrieve frames."""
display = ReplayDisplay()
frames = [
{"buffer": ["line1", "line2"], "width": 40, "height": 15},
{"buffer": ["line3", "line4"], "width": 40, "height": 15},
]
display.set_frames(frames)
frame = display.get_next_frame()
assert frame == ["line1", "line2"]
frame = display.get_next_frame()
assert frame == ["line3", "line4"]
frame = display.get_next_frame()
assert frame is None
def test_replay_loop_mode(self):
"""ReplayDisplay can loop playback."""
display = ReplayDisplay()
display.set_loop(True)
frames = [
{"buffer": ["frame1"], "width": 40, "height": 15},
{"buffer": ["frame2"], "width": 40, "height": 15},
]
display.set_frames(frames)
assert display.get_next_frame() == ["frame1"]
assert display.get_next_frame() == ["frame2"]
assert display.get_next_frame() == ["frame1"]
assert display.get_next_frame() == ["frame2"]
def test_replay_seek_and_reset(self):
"""ReplayDisplay supports seek and reset."""
display = ReplayDisplay()
frames = [
{"buffer": [f"frame{i}"], "width": 40, "height": 15} for i in range(5)
]
display.set_frames(frames)
display.seek(3)
assert display.get_next_frame() == ["frame3"]
display.reset()
assert display.get_next_frame() == ["frame0"]
class TestPipelineHotRebuild:
"""Tests for pipeline hot-rebuild and state preservation."""
def test_pipeline_runs_with_null_display(self, pipeline_with_null_display):
"""Pipeline executes successfully with NullDisplay."""
pipeline, params, display = pipeline_with_null_display
for frame in range(5):
params.frame_number = frame
pipeline.context.params = params
result = pipeline.execute([])
assert result.success
assert display._last_buffer is not None
def test_effect_toggle_during_execution(self, pipeline_with_null_display):
"""Effects can be toggled during pipeline execution."""
pipeline, params, display = pipeline_with_null_display
params.frame_number = 0
pipeline.context.params = params
pipeline.execute([])
buffer1 = display._last_buffer
fade_stage = pipeline.get_stage("effect_fade")
assert fade_stage is not None
assert isinstance(fade_stage, EffectPluginStage)
fade_stage._enabled = False
fade_stage._effect.config.enabled = False
params.frame_number = 1
pipeline.context.params = params
pipeline.execute([])
buffer2 = display._last_buffer
assert buffer1 != buffer2
def test_state_preservation_across_rebuild(self, pipeline_with_null_display):
"""Pipeline state is preserved across hot-rebuild events."""
pipeline, params, display = pipeline_with_null_display
for frame in range(5):
params.frame_number = frame
pipeline.context.params = params
pipeline.execute([])
camera_y_before = pipeline.context.get("camera_y")
fade_stage = pipeline.get_stage("effect_fade")
if fade_stage and isinstance(fade_stage, EffectPluginStage):
fade_stage.set_enabled(not fade_stage.is_enabled())
fade_stage._effect.config.enabled = fade_stage.is_enabled()
params.frame_number = 5
pipeline.context.params = params
pipeline.execute([])
pipeline.context.get("camera_y")
assert camera_y_before is not None
class TestViewportControl:
"""Tests for viewport size control."""
def test_viewport_dimensions_applied(self, items):
"""Viewport dimensions are correctly applied to pipeline."""
width, height = 40, 15
display = DisplayRegistry.create("null")
display.init(width, height)
params = PipelineParams()
params.viewport_width = width
params.viewport_height = height
config = PipelineConfig(
source="fixture",
display="null",
camera="scroll",
effects=[],
)
pipeline = Pipeline(config=config, context=PipelineContext())
from engine.camera import Camera
from engine.data_sources.sources import ListDataSource
from engine.pipeline.adapters import (
CameraClockStage,
CameraStage,
DataSourceStage,
)
list_source = ListDataSource(items, name="fixture")
pipeline.add_stage("source", DataSourceStage(list_source, name="fixture"))
# Add camera stages (required by ViewportFilterStage)
camera = Camera.scroll(speed=0.3)
camera.set_canvas_size(200, 200)
pipeline.add_stage(
"camera_update", CameraClockStage(camera, name="camera-clock")
)
pipeline.add_stage("camera", CameraStage(camera, name="scroll"))
pipeline.add_stage(
"viewport_filter", ViewportFilterStage(name="viewport-filter")
)
pipeline.add_stage("font", FontStage(name="font"))
pipeline.add_stage("display", create_stage_from_display(display, "null"))
pipeline.build()
assert pipeline.initialize()
ctx = pipeline.context
ctx.params = params
ctx.set("display", display)
ctx.set("items", items)
ctx.set("pipeline", pipeline)
ctx.set("camera_y", 0)
result = pipeline.execute(items)
assert result.success
assert display._last_buffer is not None
pipeline.cleanup()
display.cleanup()

View File

@@ -0,0 +1,206 @@
"""Integration test: TintEffect in the pipeline."""
import queue
from engine.data_sources.sources import ListDataSource, SourceItem
from engine.effects.plugins.tint import TintEffect
from engine.effects.types import EffectConfig
from engine.pipeline import Pipeline, PipelineConfig
from engine.pipeline.adapters import (
DataSourceStage,
DisplayStage,
EffectPluginStage,
SourceItemsToBufferStage,
)
from engine.pipeline.core import PipelineContext
from engine.pipeline.params import PipelineParams
class QueueDisplay:
"""Stub display that captures every frame into a queue."""
def __init__(self):
self.frames: queue.Queue[list[str]] = queue.Queue()
self.width = 80
self.height = 24
self._init_called = False
def init(self, width: int, height: int, reuse: bool = False) -> None:
self.width = width
self.height = height
self._init_called = True
def show(self, buffer: list[str], border: bool = False) -> None:
self.frames.put(list(buffer))
def clear(self) -> None:
pass
def cleanup(self) -> None:
pass
def get_dimensions(self) -> tuple[int, int]:
return (self.width, self.height)
def _build_pipeline(
items: list[SourceItem],
tint_config: EffectConfig | None = None,
width: int = 80,
height: int = 24,
) -> tuple[Pipeline, QueueDisplay, PipelineContext]:
"""Build pipeline: source -> render -> tint effect -> display."""
display = QueueDisplay()
ctx = PipelineContext()
params = PipelineParams()
params.viewport_width = width
params.viewport_height = height
params.frame_number = 0
ctx.params = params
ctx.set("items", items)
pipeline = Pipeline(
config=PipelineConfig(enable_metrics=True),
context=ctx,
)
# Source
source = ListDataSource(items, name="test-source")
pipeline.add_stage("source", DataSourceStage(source, name="test-source"))
# Render (simple)
pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
# Tint effect
tint_effect = TintEffect()
if tint_config is not None:
tint_effect.configure(tint_config)
pipeline.add_stage("tint", EffectPluginStage(tint_effect, name="tint"))
# Display
pipeline.add_stage("display", DisplayStage(display, name="queue"))
pipeline.build()
pipeline.initialize()
return pipeline, display, ctx
class TestTintAcceptance:
"""Test TintEffect in a full pipeline."""
def test_tint_applies_default_color(self):
"""Default tint should apply ANSI color codes to output."""
items = [SourceItem(content="Hello World", source="test", timestamp="0")]
pipeline, display, ctx = _build_pipeline(items)
result = pipeline.execute(items)
assert result.success, f"Pipeline failed: {result.error}"
frame = display.frames.get(timeout=1)
text = "\n".join(frame)
assert "\033[" in text, f"Expected ANSI codes in frame: {frame}"
assert "Hello World" in text
def test_tint_applies_red_color(self):
"""Configured red tint should produce red ANSI code (196-197)."""
items = [SourceItem(content="Red Text", source="test", timestamp="0")]
config = EffectConfig(
enabled=True,
intensity=1.0,
params={"r": 255, "g": 0, "b": 0, "a": 0.8},
)
pipeline, display, ctx = _build_pipeline(items, tint_config=config)
result = pipeline.execute(items)
assert result.success
frame = display.frames.get(timeout=1)
line = frame[0]
# Should contain red ANSI code (196 or 197 in 256 color)
assert "\033[38;5;196m" in line or "\033[38;5;197m" in line, (
f"Missing red tint: {line}"
)
assert "Red Text" in line
def test_tint_disabled_does_nothing(self):
"""Disabled tint stage should pass through buffer unchanged."""
items = [SourceItem(content="Plain Text", source="test", timestamp="0")]
pipeline, display, ctx = _build_pipeline(items)
# Disable the tint stage
stage = pipeline.get_stage("tint")
stage.set_enabled(False)
result = pipeline.execute(items)
assert result.success
frame = display.frames.get(timeout=1)
text = "\n".join(frame)
# Should contain Plain Text with NO ANSI color codes
assert "Plain Text" in text
assert "\033[" not in text, f"Unexpected ANSI codes in frame: {frame}"
def test_tint_zero_transparency(self):
"""Alpha=0 should pass through buffer unchanged (no tint)."""
items = [SourceItem(content="Transparent", source="test", timestamp="0")]
config = EffectConfig(
enabled=True,
intensity=1.0,
params={"r": 255, "g": 128, "b": 64, "a": 0.0},
)
pipeline, display, ctx = _build_pipeline(items, tint_config=config)
result = pipeline.execute(items)
assert result.success
frame = display.frames.get(timeout=1)
text = "\n".join(frame)
assert "Transparent" in text
assert "\033[" not in text, f"Expected no ANSI codes with alpha=0: {frame}"
def test_tint_with_multiples_lines(self):
"""Tint should apply to all non-empty lines."""
items = [
SourceItem(content="Line1\nLine2\n\nLine4", source="test", timestamp="0")
]
config = EffectConfig(
enabled=True,
intensity=1.0,
params={"r": 0, "g": 255, "b": 0, "a": 0.7},
)
pipeline, display, ctx = _build_pipeline(items, tint_config=config)
result = pipeline.execute(items)
assert result.success
frame = display.frames.get(timeout=1)
# All non-empty lines should have green ANSI codes
green_codes = ["\033[38;5;", "m"]
for line in frame:
if line.strip():
assert green_codes[0] in line and green_codes[1] in line, (
f"Missing green tint: {line}"
)
else:
assert line == "", f"Empty lines should be exactly empty: {line}"
def test_tint_preserves_empty_lines(self):
"""Empty lines should remain empty (no ANSI codes)."""
items = [SourceItem(content="A\n\nB", source="test", timestamp="0")]
pipeline, display, ctx = _build_pipeline(items)
result = pipeline.execute(items)
assert result.success
frame = display.frames.get(timeout=1)
assert frame[0].strip() != ""
assert frame[1] == "" # Empty line unchanged
assert frame[2].strip() != ""