- Implements pipeline hot-rebuild with state preservation (issue #43) - Adds auto-injection of MVP stages for missing capabilities - Adds radial camera mode for polar coordinate scanning - Adds afterimage and motionblur effects using framebuffer history - Adds comprehensive acceptance tests for camera modes and pipeline rebuild - Updates presets.toml with new effect configurations Related to: #35 (Pipeline Mutation API epic) Closes: #43, #44, #45
827 lines
29 KiB
Python
827 lines
29 KiB
Python
"""
|
|
Camera acceptance tests using NullDisplay frame recording and ReplayDisplay.

Tests all camera modes by:
1. Creating deterministic source data (numbered lines)
2. Running pipeline with small viewport (40x15)
3. Recording frames with NullDisplay
4. Asserting expected viewport content for each mode

Usage:
    pytest tests/test_camera_acceptance.py -v
    pytest tests/test_camera_acceptance.py --show-frames -v

The --show-frames flag displays recorded frames for visual verification.
|
|
"""
|
|
|
|
import math
|
|
import sys
|
|
from pathlib import Path
|
|
|
|
import pytest
|
|
|
|
sys.path.insert(0, str(Path(__file__).parent.parent))
|
|
|
|
from engine.camera import Camera, CameraMode
|
|
from engine.display import DisplayRegistry
|
|
from engine.effects import get_registry
|
|
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext
|
|
from engine.pipeline.adapters import (
|
|
CameraClockStage,
|
|
CameraStage,
|
|
FontStage,
|
|
ViewportFilterStage,
|
|
create_stage_from_display,
|
|
create_stage_from_effect,
|
|
)
|
|
from engine.pipeline.params import PipelineParams
|
|
|
|
|
|
def get_camera_position(pipeline, camera):
    """Return the camera's own ``(x, y)`` coordinates.

    The values are read straight from the camera object instead of from the
    pipeline context, because the context's camera_x/camera_y entries may
    already have been rewritten by ViewportFilterStage into a filtered,
    relative position.

    Args:
        pipeline: The pipeline instance. Accepted for call-site symmetry;
            this helper does not read it.
        camera: Any object exposing ``x`` and ``y`` attributes.

    Returns:
        A 2-tuple ``(x, y)`` taken from ``camera``.
    """
    x_pos, y_pos = camera.x, camera.y
    return x_pos, y_pos
|
|
|
|
|
|
# Register custom CLI option for showing frames
|
|
def pytest_addoption(parser):
    """Register the ``--show-frames`` boolean command-line flag.

    NOTE(review): pytest documents this hook as being collected from plugins
    and conftest.py files; confirm it is actually honoured when defined in a
    test module (the ``show_frames`` fixture tolerates it being absent).

    Args:
        parser: Object exposing ``addoption``; pytest passes its option
            parser when it invokes this hook.
    """
    option_settings = {
        "action": "store_true",
        "default": False,
        "help": "Display recorded frames for visual verification",
    }
    parser.addoption("--show-frames", **option_settings)
|
|
|
|
|
|
@pytest.fixture
def show_frames(request):
    """Tell a test whether ``--show-frames`` was given on the command line.

    Args:
        request: pytest's fixture request; only ``request.config`` is used.

    Returns:
        The option value, or ``False`` when looking the option up raises
        ``ValueError`` (i.e. the option was never registered).
    """
    try:
        enabled = request.config.getoption("--show-frames")
    except ValueError:
        # Option not registered: behave as if the flag was not passed.
        enabled = False
    return enabled
|
|
|
|
|
|
@pytest.fixture
def viewport_dims():
    """Provide the small ``(width, height)`` viewport used throughout these tests."""
    width, height = 40, 15
    return (width, height)
|
|
|
|
|
|
@pytest.fixture
def items():
    """Build 100 deterministic source items that are easy to recognise.

    Returns:
        A list of dicts, each with a single ``"text"`` key whose value
        embeds the zero-padded index ("LINE 000 ...", "LINE 001 ...", ...).
    """
    numbered = []
    for index in range(100):
        numbered.append({"text": f"LINE {index:03d} - This is line number {index}"})
    return numbered
|
|
|
|
|
|
@pytest.fixture
def null_display(viewport_dims):
    """Create the "null" display from the registry, initialised to the test viewport.

    Args:
        viewport_dims: ``(width, height)`` as supplied by the fixture of
            the same name.

    Returns:
        The display object returned by ``DisplayRegistry.create("null")``
        after ``init(width, height)`` has been called on it.
    """
    recorder = DisplayRegistry.create("null")
    width = viewport_dims[0]
    height = viewport_dims[1]
    recorder.init(width, height)
    return recorder
|
|
|
|
|
|
def create_pipeline_with_camera(
    camera, items, null_display, viewport_dims, effects=None
):
    """Assemble, build and initialise a pipeline around a specific camera.

    Stages are added in this order: source, camera_update, viewport_filter,
    font, camera, one ``effect_<name>`` stage per resolvable effect, display.

    Args:
        camera: Camera instance shared by the clock stage and the camera stage.
        items: Source items; wrapped in a ListDataSource and also stored in
            the context under ``"items"``.
        null_display: Display object used for the display stage and stored
            in the context under ``"display"``.
        viewport_dims: ``(width, height)`` copied onto the PipelineParams.
        effects: Optional iterable of effect names. Names the effect
            registry does not resolve are skipped silently.

    Returns:
        The initialised pipeline, or ``None`` when ``pipeline.initialize()``
        reports failure.
    """
    # Kept function-local, as in the original helper.
    from engine.data_sources.sources import ListDataSource
    from engine.pipeline.adapters import DataSourceStage

    if not effects:
        effects = []
    width, height = viewport_dims

    params = PipelineParams()
    params.viewport_width = width
    params.viewport_height = height

    pipeline = Pipeline(
        config=PipelineConfig(
            source="fixture",
            display="null",
            camera="scroll",
            effects=effects,
        ),
        context=PipelineContext(),
    )

    fixture_source = ListDataSource(items, name="fixture")
    pipeline.add_stage("source", DataSourceStage(fixture_source, name="fixture"))

    # The clock stage goes first so camera_y is available to the viewport filter.
    pipeline.add_stage("camera_update", CameraClockStage(camera, name="camera-clock"))

    # The camera stage follows viewport_filter/font and precedes any effects.
    pipeline.add_stage("viewport_filter", ViewportFilterStage(name="viewport-filter"))
    pipeline.add_stage("font", FontStage(name="font"))
    if camera.mode == CameraMode.RADIAL:
        camera_stage_name = "radial"
    else:
        camera_stage_name = "vertical"
    pipeline.add_stage("camera", CameraStage(camera, name=camera_stage_name))

    # Only touch the effect registry when at least one effect was requested.
    registry = get_registry() if effects else None
    for effect_name in effects:
        effect = registry.get(effect_name)
        if not effect:
            continue
        pipeline.add_stage(
            f"effect_{effect_name}",
            create_stage_from_effect(effect, effect_name),
        )

    pipeline.add_stage("display", create_stage_from_display(null_display, "null"))
    pipeline.build()

    if pipeline.initialize():
        context = pipeline.context
        context.params = params
        context.set("display", null_display)
        context.set("items", items)
        context.set("pipeline", pipeline)
        context.set("pipeline_order", pipeline.execution_order)
        return pipeline

    return None
|
|
|
|
|
|
class DisplayHelper:
    """Helper to display frames for visual verification."""

    @staticmethod
    def show_frame(buffer, title, viewport_dims, marker_line=None):
        """Print a single frame to stdout between two ``=`` rules.

        Args:
            buffer: Sequence of strings, one per row. Only the first
                ``height`` rows are printed, each cut to ``width`` characters.
            title: Heading printed between the top rules.
            viewport_dims: ``(width, height)`` of the viewport.
            marker_line: Optional zero-based row index to flag with ``>>>``.
                ``None`` (the default) flags nothing.
        """
        width, height = viewport_dims
        rule = "=" * (width + 20)

        print(f"\n{rule}")
        print(f" {title}")
        print(rule)

        for row_index, row in enumerate(buffer[:height]):
            # Both markers are three characters wide so the "[nn]" column
            # lines up whether or not a row is highlighted.
            marker = ">>>" if marker_line == row_index else "   "
            print(f"{marker} [{row_index:2}] {row[:width]}")

        print(f"{rule}\n")
|
|
|
|
|
|
class TestFeedCamera:
    """Test FEED mode: rapid single-item scrolling (1 row/frame at speed=1.0)."""

    def test_feed_camera_scrolls_down(
        self, items, null_display, viewport_dims, show_frames
    ):
        """FEED camera should move content down (y increases) at 1 row/frame."""
        import time

        camera = Camera.feed(speed=1.0)
        camera.set_canvas_size(200, 100)

        pipeline = create_pipeline_with_camera(
            camera, items, null_display, viewport_dims
        )
        assert pipeline is not None, "Pipeline creation failed"

        null_display.start_recording()

        # Ten frames, pausing 20 ms between them so the wall-clock based dt
        # seen by the camera is non-zero. No pause is needed after the last.
        total_frames = 10
        for frame in range(total_frames):
            pipeline.context.set("frame_number", frame)
            result = pipeline.execute(items)
            assert result.success, f"Frame {frame} execution failed"
            if frame != total_frames - 1:
                time.sleep(0.02)

        null_display.stop_recording()
        frames = null_display.get_frames()

        if show_frames:
            for index in (0, 5, 9):
                DisplayHelper.show_frame(
                    frames[index], f"FEED Camera - Frame {index}", viewport_dims
                )

        # The text is rendered to block characters, so the content itself is
        # not matched; the first frame only has to be non-blank.
        first_frame = frames[0]
        assert len(first_frame) > 0, "Frame 0 should not be empty"
        assert first_frame[0].strip() != "", "Frame 0 should have visible content"

        # The camera must have advanced downwards over the run.
        assert camera.y > 0, f"Camera should have moved down, y={camera.y}"

        # Scrolling implies frame 0 and frame 5 cannot render identically.
        assert "\n".join(frames[0]) != "\n".join(frames[5]), (
            "Frame 0 and Frame 5 should show different content"
        )
|
|
|
|
|
|
class TestScrollCamera:
    """Test SCROLL mode: smooth vertical scrolling with float accumulation."""

    def test_scroll_camera_smooth_movement(
        self, items, null_display, viewport_dims, show_frames
    ):
        """SCROLL camera should move content smoothly with sub-integer precision."""
        camera = Camera.scroll(speed=0.5)
        camera.set_canvas_size(0, 200)

        pipeline = create_pipeline_with_camera(
            camera, items, null_display, viewport_dims
        )
        assert pipeline is not None, "Pipeline creation failed"

        # Record 20 frames through the full pipeline.
        null_display.start_recording()
        for frame in range(20):
            pipeline.context.set("frame_number", frame)
            result = pipeline.execute(items)
            assert result.success, f"Frame {frame} execution failed"
        null_display.stop_recording()
        frames = null_display.get_frames()

        if show_frames:
            for index in (0, 10):
                DisplayHelper.show_frame(
                    frames[index], f"SCROLL Camera - Frame {index}", viewport_dims
                )

        # Sample camera.y directly (not the filtered context value) over
        # five further pipeline executions.
        y_samples = []
        for frame in range(5):
            pipeline.context.set("frame_number", frame)
            pipeline.execute(items)
            y_samples.append(camera.y)

        print(f"\nSCROLL test - camera_y positions: {y_samples}")

        # The camera has to have left the origin ...
        assert y_samples[-1] > 0, (
            "Camera should have scrolled down (camera_y > 0)"
        )

        # ... and must never move back up between consecutive samples.
        for earlier, later in zip(y_samples, y_samples[1:]):
            assert later >= earlier, (
                f"Camera_y should be non-decreasing: {y_samples}"
            )
|
|
|
|
|
|
class TestHorizontalCamera:
    """Test HORIZONTAL mode: left/right scrolling."""

    def test_horizontal_camera_scrolls_right(
        self, items, null_display, viewport_dims, show_frames
    ):
        """HORIZONTAL camera should move content right (x increases)."""
        camera = Camera.horizontal(speed=1.0)
        camera.set_canvas_size(200, 200)

        pipeline = create_pipeline_with_camera(
            camera, items, null_display, viewport_dims
        )
        assert pipeline is not None, "Pipeline creation failed"

        # Record 10 frames through the full pipeline.
        null_display.start_recording()
        for frame in range(10):
            pipeline.context.set("frame_number", frame)
            result = pipeline.execute(items)
            assert result.success, f"Frame {frame} execution failed"
        null_display.stop_recording()
        frames = null_display.get_frames()

        if show_frames:
            for index in (0, 5):
                DisplayHelper.show_frame(
                    frames[index], f"HORIZONTAL Camera - Frame {index}", viewport_dims
                )

        # Rendered output is ASCII art, so only check that frame 0 is non-blank.
        opening_frame = frames[0]
        assert len(opening_frame) > 0, "Frame 0 should not be empty"
        assert opening_frame[0].strip() != "", "Frame 0 should have visible content"

        # Step the camera by hand with dt=1.0, logging the position before
        # each step; the final assertion relies on these manual updates too.
        print("\nHORIZONTAL test - camera positions:")
        for step in range(10):
            print(f" Frame {step}: x={camera.x}, y={camera.y}")
            camera.update(1.0)

        assert camera.x > 0, f"Camera should have moved right, x={camera.x}"
|
|
|
|
|
|
class TestOmniCamera:
    """Test OMNI mode: diagonal scrolling (x and y increase together)."""

    def test_omni_camera_diagonal_movement(
        self, items, null_display, viewport_dims, show_frames
    ):
        """OMNI camera should move content diagonally (both x and y increase)."""
        camera = Camera.omni(speed=1.0)
        camera.set_canvas_size(200, 200)

        pipeline = create_pipeline_with_camera(
            camera, items, null_display, viewport_dims
        )
        assert pipeline is not None, "Pipeline creation failed"

        # Record 10 frames through the full pipeline.
        null_display.start_recording()
        for frame in range(10):
            pipeline.context.set("frame_number", frame)
            result = pipeline.execute(items)
            assert result.success, f"Frame {frame} execution failed"
        null_display.stop_recording()
        frames = null_display.get_frames()

        if show_frames:
            for index in (0, 5):
                DisplayHelper.show_frame(
                    frames[index], f"OMNI Camera - Frame {index}", viewport_dims
                )

        # Rendered output is ASCII art, so only check that frame 0 is non-blank.
        opening_frame = frames[0]
        assert len(opening_frame) > 0, "Frame 0 should not be empty"
        assert opening_frame[0].strip() != "", "Frame 0 should have visible content"

        # Start again from a reset camera and step it by hand with dt=1.0,
        # logging the position before each step.
        print("\nOMNI test - camera positions:")
        camera.reset()
        for step in range(5):
            print(f" Frame {step}: x={camera.x}, y={camera.y}")
            camera.update(1.0)

        assert camera.y > 0, f"Camera should have moved down, y={camera.y}"
|
|
|
|
|
|
class TestFloatingCamera:
    """Test FLOATING mode: sinusoidal bobbing motion."""

    def test_floating_camera_bobbing(
        self, items, null_display, viewport_dims, show_frames
    ):
        """FLOATING camera should move content in a sinusoidal pattern."""
        camera = Camera.floating(speed=1.0)
        camera.set_canvas_size(200, 200)

        pipeline = create_pipeline_with_camera(
            camera, items, null_display, viewport_dims
        )
        assert pipeline is not None, "Pipeline creation failed"

        # Record 32 frames through the full pipeline.
        null_display.start_recording()
        for frame in range(32):
            pipeline.context.set("frame_number", frame)
            result = pipeline.execute(items)
            assert result.success, f"Frame {frame} execution failed"
        null_display.stop_recording()
        frames = null_display.get_frames()

        if show_frames:
            captions = (
                (0, "FLOATING Camera - Frame 0"),
                (8, "FLOATING Camera - Frame 8 (quarter cycle)"),
                (16, "FLOATING Camera - Frame 16 (half cycle)"),
            )
            for index, caption in captions:
                DisplayHelper.show_frame(frames[index], caption, viewport_dims)

        # Log 16 hand-driven steps (dt=1.0) from a reset camera.
        print("\nFLOATING test - sinusoidal motion:")
        camera.reset()
        for step in range(16):
            print(f" Frame {step}: y={camera.y}, x={camera.x}")
            camera.update(1.0)

        # From a fresh reset, take y after each of three updates.
        camera.reset()
        y_after_update = []
        for _ in range(3):
            camera.update(1.0)
            y_after_update.append(camera.y)
        y1, y2, y3 = y_after_update

        # The three samples must not all be the same value.
        assert not (y1 == y2 and y2 == y3), "FLOATING camera should oscillate"
|
|
|
|
|
|
class TestBounceCamera:
    """Test BOUNCE mode: bouncing DVD-style motion."""

    def test_bounce_camera_reverses_at_edges(
        self, items, null_display, viewport_dims, show_frames
    ):
        """BOUNCE camera should reverse direction when hitting canvas edges."""
        # A higher speed keeps the test short; zoom 2.0 is set so the
        # viewport is smaller than the canvas and the camera has room to move.
        camera = Camera.bounce(speed=5.0)
        camera.set_zoom(2.0)
        camera.set_canvas_size(400, 400)

        pipeline = create_pipeline_with_camera(
            camera, items, null_display, viewport_dims
        )
        assert pipeline is not None, "Pipeline creation failed"

        # Record 50 frames through the full pipeline.
        null_display.start_recording()
        for frame in range(50):
            pipeline.context.set("frame_number", frame)
            result = pipeline.execute(items)
            assert result.success, f"Frame {frame} execution failed"
        null_display.stop_recording()
        frames = null_display.get_frames()

        if show_frames:
            for index in (0, 25):
                DisplayHelper.show_frame(
                    frames[index], f"BOUNCE Camera - Frame {index}", viewport_dims
                )

        # Log 20 hand-driven steps. reset() also resets the zoom, so it is
        # re-applied after every reset below.
        print("\nBOUNCE test - bouncing motion:")
        camera.reset()
        camera.set_zoom(2.0)
        for step in range(20):
            print(f" Frame {step}: x={camera.x}, y={camera.y}")
            camera.update(1.0)

        # Drive 51 updates from a fresh reset; the camera is then expected
        # to sit on the far horizontal bound.
        camera.reset()
        camera.set_zoom(2.0)
        for _ in range(51):
            camera.update(1.0)

        max_x = max(0, camera.canvas_width - camera.viewport_width)
        print(f"BOUNCE camera final position: x={camera.x}, y={camera.y}")
        assert camera.x == max_x, (
            f"Camera should be at max_x ({max_x}), got x={camera.x}"
        )

        # The position must stay inside the canvas on both axes.
        vw = camera.viewport_width
        vh = camera.viewport_height
        assert 0 <= camera.x <= camera.canvas_width - vw
        assert 0 <= camera.y <= camera.canvas_height - vh
|
|
|
|
|
|
class TestRadialCamera:
    """Test RADIAL mode: polar coordinate scanning (rotation around center)."""

    def test_radial_camera_rotates_around_center(
        self, items, null_display, viewport_dims, show_frames
    ):
        """RADIAL camera should rotate around the center of the canvas."""
        camera = Camera.radial(speed=0.5)
        camera.set_canvas_size(200, 200)

        pipeline = create_pipeline_with_camera(
            camera, items, null_display, viewport_dims
        )
        assert pipeline is not None, "Pipeline creation failed"

        # Record 32 frames through the full pipeline.
        null_display.start_recording()
        for frame in range(32):
            pipeline.context.set("frame_number", frame)
            result = pipeline.execute(items)
            assert result.success, f"Frame {frame} execution failed"
        null_display.stop_recording()
        frames = null_display.get_frames()

        if show_frames:
            captions = (
                (0, "RADIAL Camera - Frame 0"),
                (8, "RADIAL Camera - Frame 8 (quarter turn)"),
                (16, "RADIAL Camera - Frame 16 (half turn)"),
                (24, "RADIAL Camera - Frame 24 (3/4 turn)"),
            )
            for index, caption in captions:
                DisplayHelper.show_frame(frames[index], caption, viewport_dims)

        # Log 32 hand-driven steps (dt=1.0) from a reset camera, showing the
        # internal angle in degrees alongside the Cartesian position.
        print("\nRADIAL test - rotational motion:")
        camera.reset()
        for frame in range(32):
            theta_deg = (camera._theta_float * 180 / math.pi) % 360
            print(
                f" Frame {frame}: theta={theta_deg:.1f}°, x={camera.x}, y={camera.y}"
            )
            camera.update(1.0)

        # From a fresh reset the angle must grow on each of two updates.
        camera.reset()
        theta_start = camera._theta_float
        camera.update(1.0)
        theta_mid = camera._theta_float
        camera.update(1.0)
        theta_end = camera._theta_float

        assert theta_mid > theta_start, "Theta should increase (rotation)"
        assert theta_end > theta_mid, "Theta should continue increasing"

    def test_radial_camera_with_sensor_integration(
        self, items, null_display, viewport_dims, show_frames
    ):
        """RADIAL camera can be driven by external sensor (OSC integration test)."""
        from engine.sensors.oscillator import (
            OscillatorSensor,
            register_oscillator_sensor,
        )

        # Register and create a sine oscillator to act as the external input.
        register_oscillator_sensor(name="test_osc", waveform="sine", frequency=0.5)
        osc = OscillatorSensor(name="test_osc", waveform="sine", frequency=0.5)

        camera = Camera.radial(speed=0.3)
        camera.set_canvas_size(200, 200)

        pipeline = create_pipeline_with_camera(
            camera, items, null_display, viewport_dims
        )
        assert pipeline is not None, "Pipeline creation failed"

        null_display.start_recording()

        # Feed whatever the oscillator reports into the camera before each
        # frame; a falsy reading leaves the radial input untouched.
        # NOTE(review): the oscillator is only started further down, after
        # this loop. Confirm whether read() is expected to yield values here.
        for frame in range(32):
            osc_value = osc.read()
            if osc_value:
                camera.set_radial_input(osc_value.value)

            pipeline.context.set("frame_number", frame)
            result = pipeline.execute(items)
            assert result.success, f"Frame {frame} execution failed"

        null_display.stop_recording()
        frames = null_display.get_frames()

        if show_frames:
            for index in (0, 8, 16):
                DisplayHelper.show_frame(
                    frames[index], f"RADIAL+OSC Camera - Frame {index}", viewport_dims
                )

        print("\nRADIAL+OSC test - sensor-driven rotation:")
        osc.start()
        try:
            camera.reset()
            for frame in range(16):
                osc_value = osc.read()
                if osc_value:
                    camera.set_radial_input(osc_value.value)
                camera.update(1.0)
                theta_deg = (camera._theta_float * 180 / math.pi) % 360
                print(
                    f" Frame {frame}: osc={osc_value.value if osc_value else 0:.3f}, theta={theta_deg:.1f}°"
                )

            # A single update from a fresh reset must change x.
            camera.reset()
            x_start = camera.x
            camera.update(1.0)
            x_mid = camera.x
            assert x_start != x_mid, "Camera should move when driven by oscillator"
        finally:
            # Stop the oscillator even when an assertion above fails, so a
            # failing run does not leave it started.
            osc.stop()

    def test_radial_camera_with_direct_angle_setting(
        self, items, null_display, viewport_dims, show_frames
    ):
        """RADIAL camera can have angle set directly for OSC integration."""
        # speed=0.0 disables auto-rotation; a non-zero radius is set so a
        # change of angle shows up as a change of position.
        camera = Camera.radial(speed=0.0)
        camera.set_canvas_size(200, 200)
        camera._r_float = 80.0

        pipeline = create_pipeline_with_camera(
            camera, items, null_display, viewport_dims
        )
        assert pipeline is not None, "Pipeline creation failed"

        null_display.start_recording()

        # Sweep the angle from 0 towards 2*pi in 32 equal steps. update() is
        # called after each set so the polar state is converted to x/y.
        for frame in range(32):
            angle = (frame / 32) * 2 * math.pi
            camera.set_radial_angle(angle)
            camera.update(1.0)

            pipeline.context.set("frame_number", frame)
            result = pipeline.execute(items)
            assert result.success, f"Frame {frame} execution failed"

        null_display.stop_recording()
        frames = null_display.get_frames()

        if show_frames:
            for index in (0, 8, 16):
                DisplayHelper.show_frame(
                    frames[index], f"RADIAL Direct Angle - Frame {index}", viewport_dims
                )

        # Repeat the sweep without the pipeline, logging requested angle
        # and resulting position.
        print("\nRADIAL Direct Angle test - sweeping rotation:")
        for frame in range(32):
            angle = (frame / 32) * 2 * math.pi
            camera.set_radial_angle(angle)
            camera.update(1.0)
            theta_deg = angle * 180 / math.pi
            print(
                f" Frame {frame}: set_angle={theta_deg:.1f}°, actual_x={camera.x}, actual_y={camera.y}"
            )

        # x at angle 0 and at angle pi/2 must differ for the same radius.
        camera.reset()
        camera._r_float = 80.0
        camera.set_radial_angle(0)
        camera.update(1.0)
        x0 = camera.x
        camera.set_radial_angle(math.pi / 2)
        camera.update(1.0)
        x90 = camera.x
        assert x0 != x90, (
            f"Camera position should change with angle (x0={x0}, x90={x90})"
        )
|
|
|
|
|
|
class TestCameraModeEnum:
    """Test CameraMode enum integrity."""

    def test_all_modes_exist(self):
        """Verify all camera modes are defined."""
        defined = [member.name for member in CameraMode]
        required = (
            "FEED",
            "SCROLL",
            "HORIZONTAL",
            "OMNI",
            "FLOATING",
            "BOUNCE",
            "RADIAL",
        )
        for mode in required:
            assert mode in defined, f"CameraMode.{mode} should exist"

    def test_radial_mode_exists(self):
        """Verify RADIAL mode is properly defined."""
        radial = CameraMode.RADIAL
        assert radial is not None
        assert isinstance(radial, CameraMode)
        assert radial.name == "RADIAL"
|
|
|
|
|
|
class TestCameraFactoryMethods:
    """Test camera factory methods create proper camera instances."""

    def test_radial_factory(self):
        """RADIAL factory should create a camera with correct mode."""
        radial_camera = Camera.radial(speed=2.0)
        assert radial_camera.mode == CameraMode.RADIAL
        assert radial_camera.speed == 2.0
        # Both polar-state attributes must be present on the new camera.
        for attribute in ("_r_float", "_theta_float"):
            assert hasattr(radial_camera, attribute)

    def test_radial_factory_initializes_state(self):
        """RADIAL factory should initialize radial state."""
        radial_camera = Camera.radial()
        # A default-constructed radial camera starts at radius 0, angle 0.
        assert radial_camera._r_float == 0.0
        assert radial_camera._theta_float == 0.0
|
|
|
|
|
|
class TestCameraStateSaveRestore:
    """Test camera state can be saved and restored (for hot-rebuild)."""

    def test_radial_camera_state_save(self):
        """RADIAL camera should save polar coordinate state."""
        from engine.pipeline.adapters.camera import CameraStage

        expected_theta = math.pi / 4
        expected_radius = 50.0

        camera = Camera.radial()
        camera._theta_float = expected_theta
        camera._r_float = expected_radius

        # The snapshot is taken through the CameraStage adapter.
        snapshot = CameraStage(camera).save_state()

        assert "_theta_float" in snapshot
        assert "_r_float" in snapshot
        assert snapshot["_theta_float"] == expected_theta
        assert snapshot["_r_float"] == expected_radius

    def test_radial_camera_state_restore(self):
        """RADIAL camera should restore polar coordinate state."""
        from engine.pipeline.adapters.camera import CameraStage

        source_camera = Camera.radial()
        source_camera._theta_float = math.pi / 3
        source_camera._r_float = 75.0
        snapshot = CameraStage(source_camera).save_state()

        # Apply the snapshot to a brand-new camera via its own stage.
        target_camera = Camera.radial()
        target_stage = CameraStage(target_camera)
        target_stage.restore_state(snapshot)

        assert abs(target_camera._theta_float - math.pi / 3) < 0.001
        assert abs(target_camera._r_float - 75.0) < 0.001
|
|
|
|
|
|
class TestCameraViewportApplication:
    """Test camera.apply() properly slices buffers."""

    def test_radial_camera_viewport_slicing(self):
        """RADIAL camera should properly slice buffer based on position."""
        camera = Camera.radial(speed=0.5)
        camera.set_canvas_size(200, 200)

        # One update so the camera is no longer in its initial state.
        camera.update(1.0)

        # 200 short, numbered lines as the buffer to slice.
        source_lines = []
        for index in range(200):
            source_lines.append(f"LINE {index:03d}")

        visible = camera.apply(source_lines, viewport_width=40, viewport_height=15)

        # Exactly viewport_height rows come back ...
        assert len(visible) == 15

        # ... and no row is wider than viewport_width.
        assert all(len(row) <= 40 for row in visible)
|