""" Camera acceptance tests using NullDisplay frame recording and ReplayDisplay. Tests all camera modes by: 1. Creating deterministic source data (numbered lines) 2. Running pipeline with small viewport (40x15) 3. Recording frames with NullDisplay 4. Asserting expected viewport content for each mode Usage: pytest tests/test_camera_acceptance.py -v pytest tests/test_camera_acceptance.py --show-frames -v The --show-frames flag displays recorded frames for visual verification. """ import math import sys from pathlib import Path import pytest sys.path.insert(0, str(Path(__file__).parent.parent)) from engine.camera import Camera, CameraMode from engine.display import DisplayRegistry from engine.effects import get_registry from engine.pipeline import Pipeline, PipelineConfig, PipelineContext from engine.pipeline.adapters import ( CameraClockStage, CameraStage, FontStage, ViewportFilterStage, create_stage_from_display, create_stage_from_effect, ) from engine.pipeline.params import PipelineParams def get_camera_position(pipeline, camera): """Helper to get camera position directly from the camera object. The pipeline context's camera_y/camera_x values may be transformed by ViewportFilterStage (filtered relative position). This helper gets the true camera position from the camera object itself. Args: pipeline: The pipeline instance camera: The camera object Returns: tuple (x, y) of the camera's absolute position """ return (camera.x, camera.y) # Register custom CLI option for showing frames def pytest_addoption(parser): parser.addoption( "--show-frames", action="store_true", default=False, help="Display recorded frames for visual verification", ) @pytest.fixture def show_frames(request): """Get the --show-frames flag value.""" try: return request.config.getoption("--show-frames") except ValueError: # Option not registered, default to False return False @pytest.fixture def viewport_dims(): """Small viewport dimensions for testing.""" return (40, 15) @pytest.fixture def items(): """Create deterministic test data - numbered lines for easy verification.""" # Create 100 numbered lines: LINE 000, LINE 001, etc. return [{"text": f"LINE {i:03d} - This is line number {i}"} for i in range(100)] @pytest.fixture def null_display(viewport_dims): """Create a NullDisplay for testing.""" display = DisplayRegistry.create("null") display.init(viewport_dims[0], viewport_dims[1]) return display def create_pipeline_with_camera( camera, items, null_display, viewport_dims, effects=None ): """Helper to create a pipeline with a specific camera.""" effects = effects or [] width, height = viewport_dims params = PipelineParams() params.viewport_width = width params.viewport_height = height config = PipelineConfig( source="fixture", display="null", camera="scroll", effects=effects, ) pipeline = Pipeline(config=config, context=PipelineContext()) from engine.data_sources.sources import ListDataSource from engine.pipeline.adapters import DataSourceStage list_source = ListDataSource(items, name="fixture") pipeline.add_stage("source", DataSourceStage(list_source, name="fixture")) # Add camera update stage to ensure camera_y is available for viewport filter pipeline.add_stage("camera_update", CameraClockStage(camera, name="camera-clock")) # Note: camera should come after font/viewport_filter, before effects pipeline.add_stage("viewport_filter", ViewportFilterStage(name="viewport-filter")) pipeline.add_stage("font", FontStage(name="font")) pipeline.add_stage( "camera", CameraStage( camera, name="radial" if camera.mode == CameraMode.RADIAL else "vertical" ), ) if effects: effect_registry = get_registry() for effect_name in effects: effect = effect_registry.get(effect_name) if effect: pipeline.add_stage( f"effect_{effect_name}", create_stage_from_effect(effect, effect_name), ) pipeline.add_stage("display", create_stage_from_display(null_display, "null")) pipeline.build() if not pipeline.initialize(): return None ctx = pipeline.context ctx.params = params ctx.set("display", null_display) ctx.set("items", items) ctx.set("pipeline", pipeline) ctx.set("pipeline_order", pipeline.execution_order) return pipeline class DisplayHelper: """Helper to display frames for visual verification.""" @staticmethod def show_frame(buffer, title, viewport_dims, marker_line=None): """Display a single frame with visual markers.""" width, height = viewport_dims print(f"\n{'=' * (width + 20)}") print(f" {title}") print(f"{'=' * (width + 20)}") for i, line in enumerate(buffer[:height]): # Add marker if this line should be highlighted marker = ">>>" if marker_line == i else " " print(f"{marker} [{i:2}] {line[:width]}") print(f"{'=' * (width + 20)}\n") class TestFeedCamera: """Test FEED mode: rapid single-item scrolling (1 row/frame at speed=1.0).""" def test_feed_camera_scrolls_down( self, items, null_display, viewport_dims, show_frames ): """FEED camera should move content down (y increases) at 1 row/frame.""" camera = Camera.feed(speed=1.0) camera.set_canvas_size(200, 100) pipeline = create_pipeline_with_camera( camera, items, null_display, viewport_dims ) assert pipeline is not None, "Pipeline creation failed" null_display.start_recording() # Run for 10 frames with small delay between frames # to ensure camera has time to move (dt calculation relies on time.perf_counter()) import time for frame in range(10): pipeline.context.set("frame_number", frame) result = pipeline.execute(items) assert result.success, f"Frame {frame} execution failed" if frame < 9: # No need to sleep after last frame time.sleep(0.02) # Wait 20ms so dt~0.02, camera moves ~1.2 rows null_display.stop_recording() frames = null_display.get_frames() if show_frames: DisplayHelper.show_frame(frames[0], "FEED Camera - Frame 0", viewport_dims) DisplayHelper.show_frame(frames[5], "FEED Camera - Frame 5", viewport_dims) DisplayHelper.show_frame(frames[9], "FEED Camera - Frame 9", viewport_dims) # FEED mode: each frame y increases by speed*dt*60 # At dt=1.0, speed=1.0: y increases by 60 per frame # But clamp to canvas bounds (200) # Frame 0: y=0, should show LINE 000 # Frame 1: y=60, should show LINE 060 # Verify frame 0 contains ASCII art content (rendered from LINE 000) # The text is converted to block characters, so check for non-empty frames assert len(frames[0]) > 0, "Frame 0 should not be empty" assert frames[0][0].strip() != "", "Frame 0 should have visible content" # Verify camera position changed between frames # Feed mode moves 1 row per frame at speed=1.0 with dt~0.02 # After 5 frames, camera should have moved down assert camera.y > 0, f"Camera should have moved down, y={camera.y}" # Verify different frames show different content (camera is scrolling) # Check that frame 0 and frame 5 are different frame_0_str = "\n".join(frames[0]) frame_5_str = "\n".join(frames[5]) assert frame_0_str != frame_5_str, ( "Frame 0 and Frame 5 should show different content" ) class TestScrollCamera: """Test SCROLL mode: smooth vertical scrolling with float accumulation.""" def test_scroll_camera_smooth_movement( self, items, null_display, viewport_dims, show_frames ): """SCROLL camera should move content smoothly with sub-integer precision.""" camera = Camera.scroll(speed=0.5) camera.set_canvas_size(0, 200) # Match viewport width for text wrapping pipeline = create_pipeline_with_camera( camera, items, null_display, viewport_dims ) assert pipeline is not None, "Pipeline creation failed" null_display.start_recording() # Run for 20 frames for frame in range(20): pipeline.context.set("frame_number", frame) result = pipeline.execute(items) assert result.success, f"Frame {frame} execution failed" null_display.stop_recording() frames = null_display.get_frames() if show_frames: DisplayHelper.show_frame( frames[0], "SCROLL Camera - Frame 0", viewport_dims ) DisplayHelper.show_frame( frames[10], "SCROLL Camera - Frame 10", viewport_dims ) # SCROLL mode uses float accumulation for smooth scrolling # At speed=0.5, dt=1.0: y increases by 0.5 * 60 = 30 pixels per frame # Verify camera_y is increasing (which causes the scroll) camera_y_values = [] for frame in range(5): # Get camera.y directly (not filtered context value) pipeline.context.set("frame_number", frame) pipeline.execute(items) camera_y_values.append(camera.y) print(f"\nSCROLL test - camera_y positions: {camera_y_values}") # Verify camera_y is non-zero (camera is moving) assert camera_y_values[-1] > 0, ( "Camera should have scrolled down (camera_y > 0)" ) # Verify camera_y is increasing for i in range(len(camera_y_values) - 1): assert camera_y_values[i + 1] >= camera_y_values[i], ( f"Camera_y should be non-decreasing: {camera_y_values}" ) class TestHorizontalCamera: """Test HORIZONTAL mode: left/right scrolling.""" def test_horizontal_camera_scrolls_right( self, items, null_display, viewport_dims, show_frames ): """HORIZONTAL camera should move content right (x increases).""" camera = Camera.horizontal(speed=1.0) camera.set_canvas_size(200, 200) pipeline = create_pipeline_with_camera( camera, items, null_display, viewport_dims ) assert pipeline is not None, "Pipeline creation failed" null_display.start_recording() for frame in range(10): pipeline.context.set("frame_number", frame) result = pipeline.execute(items) assert result.success, f"Frame {frame} execution failed" null_display.stop_recording() frames = null_display.get_frames() if show_frames: DisplayHelper.show_frame( frames[0], "HORIZONTAL Camera - Frame 0", viewport_dims ) DisplayHelper.show_frame( frames[5], "HORIZONTAL Camera - Frame 5", viewport_dims ) # HORIZONTAL mode: x increases by speed*dt*60 # At dt=1.0, speed=1.0: x increases by 60 per frame # Frame 0: x=0 # Frame 5: x=300 (clamped to canvas_width-viewport_width) # Verify frame 0 contains content (ASCII art of LINE 000) assert len(frames[0]) > 0, "Frame 0 should not be empty" assert frames[0][0].strip() != "", "Frame 0 should have visible content" # Verify camera x is increasing print("\nHORIZONTAL test - camera positions:") for i in range(10): print(f" Frame {i}: x={camera.x}, y={camera.y}") camera.update(1.0) # Verify camera moved assert camera.x > 0, f"Camera should have moved right, x={camera.x}" class TestOmniCamera: """Test OMNI mode: diagonal scrolling (x and y increase together).""" def test_omni_camera_diagonal_movement( self, items, null_display, viewport_dims, show_frames ): """OMNI camera should move content diagonally (both x and y increase).""" camera = Camera.omni(speed=1.0) camera.set_canvas_size(200, 200) pipeline = create_pipeline_with_camera( camera, items, null_display, viewport_dims ) assert pipeline is not None, "Pipeline creation failed" null_display.start_recording() for frame in range(10): pipeline.context.set("frame_number", frame) result = pipeline.execute(items) assert result.success, f"Frame {frame} execution failed" null_display.stop_recording() frames = null_display.get_frames() if show_frames: DisplayHelper.show_frame(frames[0], "OMNI Camera - Frame 0", viewport_dims) DisplayHelper.show_frame(frames[5], "OMNI Camera - Frame 5", viewport_dims) # OMNI mode: y increases by speed*dt*60, x increases by speed*dt*60*0.5 # At dt=1.0, speed=1.0: y += 60, x += 30 # Verify frame 0 contains content (ASCII art) assert len(frames[0]) > 0, "Frame 0 should not be empty" assert frames[0][0].strip() != "", "Frame 0 should have visible content" print("\nOMNI test - camera positions:") camera.reset() for frame in range(5): print(f" Frame {frame}: x={camera.x}, y={camera.y}") camera.update(1.0) # Verify camera moved assert camera.y > 0, f"Camera should have moved down, y={camera.y}" class TestFloatingCamera: """Test FLOATING mode: sinusoidal bobbing motion.""" def test_floating_camera_bobbing( self, items, null_display, viewport_dims, show_frames ): """FLOATING camera should move content in a sinusoidal pattern.""" camera = Camera.floating(speed=1.0) camera.set_canvas_size(200, 200) pipeline = create_pipeline_with_camera( camera, items, null_display, viewport_dims ) assert pipeline is not None, "Pipeline creation failed" null_display.start_recording() for frame in range(32): pipeline.context.set("frame_number", frame) result = pipeline.execute(items) assert result.success, f"Frame {frame} execution failed" null_display.stop_recording() frames = null_display.get_frames() if show_frames: DisplayHelper.show_frame( frames[0], "FLOATING Camera - Frame 0", viewport_dims ) DisplayHelper.show_frame( frames[8], "FLOATING Camera - Frame 8 (quarter cycle)", viewport_dims ) DisplayHelper.show_frame( frames[16], "FLOATING Camera - Frame 16 (half cycle)", viewport_dims ) # FLOATING mode: y = sin(time*2) * speed * 30 # Period: 2π / 2 = π ≈ 3.14 seconds (or ~3.14 frames at dt=1.0) # Full cycle ~32 frames print("\nFLOATING test - sinusoidal motion:") camera.reset() for frame in range(16): print(f" Frame {frame}: y={camera.y}, x={camera.x}") camera.update(1.0) # Verify y oscillates around 0 camera.reset() camera.update(1.0) # Frame 1 y1 = camera.y camera.update(1.0) # Frame 2 y2 = camera.y camera.update(1.0) # Frame 3 y3 = camera.y # After a few frames, y should oscillate (not monotonic) assert y1 != y2 or y2 != y3, "FLOATING camera should oscillate" class TestBounceCamera: """Test BOUNCE mode: bouncing DVD-style motion.""" def test_bounce_camera_reverses_at_edges( self, items, null_display, viewport_dims, show_frames ): """BOUNCE camera should reverse direction when hitting canvas edges.""" camera = Camera.bounce(speed=5.0) # Faster for quicker test # Set zoom > 1.0 so viewport is smaller than canvas, allowing movement camera.set_zoom(2.0) # Zoom out 2x, viewport is half the canvas size camera.set_canvas_size(400, 400) pipeline = create_pipeline_with_camera( camera, items, null_display, viewport_dims ) assert pipeline is not None, "Pipeline creation failed" null_display.start_recording() for frame in range(50): pipeline.context.set("frame_number", frame) result = pipeline.execute(items) assert result.success, f"Frame {frame} execution failed" null_display.stop_recording() frames = null_display.get_frames() if show_frames: DisplayHelper.show_frame( frames[0], "BOUNCE Camera - Frame 0", viewport_dims ) DisplayHelper.show_frame( frames[25], "BOUNCE Camera - Frame 25", viewport_dims ) # BOUNCE mode: moves until it hits edge, then reverses # Verify the camera moves and changes direction print("\nBOUNCE test - bouncing motion:") camera.reset() camera.set_zoom(2.0) # Reset also resets zoom, so set it again for frame in range(20): print(f" Frame {frame}: x={camera.x}, y={camera.y}") camera.update(1.0) # Check that camera hits bounds and reverses camera.reset() camera.set_zoom(2.0) # Reset also resets zoom, so set it again for _ in range(51): # Odd number ensures ending at opposite corner camera.update(1.0) # Camera should have hit an edge and reversed direction # With 400x400 canvas, viewport 200x200 (zoom=2), max_x = 200, max_y = 200 # Starting at (0,0), after 51 updates it should be at (200, 200) max_x = max(0, camera.canvas_width - camera.viewport_width) print(f"BOUNCE camera final position: x={camera.x}, y={camera.y}") assert camera.x == max_x, ( f"Camera should be at max_x ({max_x}), got x={camera.x}" ) # Check bounds are respected vw = camera.viewport_width vh = camera.viewport_height assert camera.x >= 0 and camera.x <= camera.canvas_width - vw assert camera.y >= 0 and camera.y <= camera.canvas_height - vh class TestRadialCamera: """Test RADIAL mode: polar coordinate scanning (rotation around center).""" def test_radial_camera_rotates_around_center( self, items, null_display, viewport_dims, show_frames ): """RADIAL camera should rotate around the center of the canvas.""" camera = Camera.radial(speed=0.5) camera.set_canvas_size(200, 200) pipeline = create_pipeline_with_camera( camera, items, null_display, viewport_dims ) assert pipeline is not None, "Pipeline creation failed" null_display.start_recording() for frame in range(32): # 32 frames = 2π at ~0.2 rad/frame pipeline.context.set("frame_number", frame) result = pipeline.execute(items) assert result.success, f"Frame {frame} execution failed" null_display.stop_recording() frames = null_display.get_frames() if show_frames: DisplayHelper.show_frame( frames[0], "RADIAL Camera - Frame 0", viewport_dims ) DisplayHelper.show_frame( frames[8], "RADIAL Camera - Frame 8 (quarter turn)", viewport_dims ) DisplayHelper.show_frame( frames[16], "RADIAL Camera - Frame 16 (half turn)", viewport_dims ) DisplayHelper.show_frame( frames[24], "RADIAL Camera - Frame 24 (3/4 turn)", viewport_dims ) # RADIAL mode: rotates around center with smooth angular motion # At speed=0.5: theta increases by ~0.2 rad/frame (0.5 * dt * 1.0) print("\nRADIAL test - rotational motion:") camera.reset() for frame in range(32): theta_deg = (camera._theta_float * 180 / math.pi) % 360 print( f" Frame {frame}: theta={theta_deg:.1f}°, x={camera.x}, y={camera.y}" ) camera.update(1.0) # Verify rotation occurs (angle should change) camera.reset() theta_start = camera._theta_float camera.update(1.0) # Frame 1 theta_mid = camera._theta_float camera.update(1.0) # Frame 2 theta_end = camera._theta_float assert theta_mid > theta_start, "Theta should increase (rotation)" assert theta_end > theta_mid, "Theta should continue increasing" def test_radial_camera_with_sensor_integration( self, items, null_display, viewport_dims, show_frames ): """RADIAL camera can be driven by external sensor (OSC integration test).""" from engine.sensors.oscillator import ( OscillatorSensor, register_oscillator_sensor, ) # Create an oscillator sensor for testing register_oscillator_sensor(name="test_osc", waveform="sine", frequency=0.5) osc = OscillatorSensor(name="test_osc", waveform="sine", frequency=0.5) camera = Camera.radial(speed=0.3) camera.set_canvas_size(200, 200) pipeline = create_pipeline_with_camera( camera, items, null_display, viewport_dims ) assert pipeline is not None, "Pipeline creation failed" null_display.start_recording() # Run frames while modulating camera with oscillator for frame in range(32): # Read oscillator value and set as radial input osc_value = osc.read() if osc_value: camera.set_radial_input(osc_value.value) pipeline.context.set("frame_number", frame) result = pipeline.execute(items) assert result.success, f"Frame {frame} execution failed" null_display.stop_recording() frames = null_display.get_frames() if show_frames: DisplayHelper.show_frame( frames[0], "RADIAL+OSC Camera - Frame 0", viewport_dims ) DisplayHelper.show_frame( frames[8], "RADIAL+OSC Camera - Frame 8", viewport_dims ) DisplayHelper.show_frame( frames[16], "RADIAL+OSC Camera - Frame 16", viewport_dims ) print("\nRADIAL+OSC test - sensor-driven rotation:") osc.start() camera.reset() for frame in range(16): osc_value = osc.read() if osc_value: camera.set_radial_input(osc_value.value) camera.update(1.0) theta_deg = (camera._theta_float * 180 / math.pi) % 360 print( f" Frame {frame}: osc={osc_value.value if osc_value else 0:.3f}, theta={theta_deg:.1f}°" ) # Verify camera position changes when driven by sensor camera.reset() x_start = camera.x camera.update(1.0) x_mid = camera.x assert x_start != x_mid, "Camera should move when driven by oscillator" osc.stop() def test_radial_camera_with_direct_angle_setting( self, items, null_display, viewport_dims, show_frames ): """RADIAL camera can have angle set directly for OSC integration.""" camera = Camera.radial(speed=0.0) # No auto-rotation camera.set_canvas_size(200, 200) camera._r_float = 80.0 # Set initial radius to see movement pipeline = create_pipeline_with_camera( camera, items, null_display, viewport_dims ) assert pipeline is not None, "Pipeline creation failed" null_display.start_recording() # Set angle directly to sweep through full rotation for frame in range(32): angle = (frame / 32) * 2 * math.pi # 0 to 2π over 32 frames camera.set_radial_angle(angle) camera.update(1.0) # Must update to convert polar to Cartesian pipeline.context.set("frame_number", frame) result = pipeline.execute(items) assert result.success, f"Frame {frame} execution failed" null_display.stop_recording() frames = null_display.get_frames() if show_frames: DisplayHelper.show_frame( frames[0], "RADIAL Direct Angle - Frame 0", viewport_dims ) DisplayHelper.show_frame( frames[8], "RADIAL Direct Angle - Frame 8", viewport_dims ) DisplayHelper.show_frame( frames[16], "RADIAL Direct Angle - Frame 16", viewport_dims ) print("\nRADIAL Direct Angle test - sweeping rotation:") for frame in range(32): angle = (frame / 32) * 2 * math.pi camera.set_radial_angle(angle) camera.update(1.0) # Update converts angle to x,y position theta_deg = angle * 180 / math.pi print( f" Frame {frame}: set_angle={theta_deg:.1f}°, actual_x={camera.x}, actual_y={camera.y}" ) # Verify camera position changes as angle sweeps camera.reset() camera._r_float = 80.0 # Set radius for testing camera.set_radial_angle(0) camera.update(1.0) x0 = camera.x camera.set_radial_angle(math.pi / 2) camera.update(1.0) x90 = camera.x assert x0 != x90, ( f"Camera position should change with angle (x0={x0}, x90={x90})" ) class TestCameraModeEnum: """Test CameraMode enum integrity.""" def test_all_modes_exist(self): """Verify all camera modes are defined.""" modes = [m.name for m in CameraMode] expected = [ "FEED", "SCROLL", "HORIZONTAL", "OMNI", "FLOATING", "BOUNCE", "RADIAL", ] for mode in expected: assert mode in modes, f"CameraMode.{mode} should exist" def test_radial_mode_exists(self): """Verify RADIAL mode is properly defined.""" assert CameraMode.RADIAL is not None assert isinstance(CameraMode.RADIAL, CameraMode) assert CameraMode.RADIAL.name == "RADIAL" class TestCameraFactoryMethods: """Test camera factory methods create proper camera instances.""" def test_radial_factory(self): """RADIAL factory should create a camera with correct mode.""" camera = Camera.radial(speed=2.0) assert camera.mode == CameraMode.RADIAL assert camera.speed == 2.0 assert hasattr(camera, "_r_float") assert hasattr(camera, "_theta_float") def test_radial_factory_initializes_state(self): """RADIAL factory should initialize radial state.""" camera = Camera.radial() assert camera._r_float == 0.0 assert camera._theta_float == 0.0 class TestCameraStateSaveRestore: """Test camera state can be saved and restored (for hot-rebuild).""" def test_radial_camera_state_save(self): """RADIAL camera should save polar coordinate state.""" camera = Camera.radial() camera._theta_float = math.pi / 4 camera._r_float = 50.0 # Save state via CameraStage adapter from engine.pipeline.adapters.camera import CameraStage stage = CameraStage(camera) state = stage.save_state() assert "_theta_float" in state assert "_r_float" in state assert state["_theta_float"] == math.pi / 4 assert state["_r_float"] == 50.0 def test_radial_camera_state_restore(self): """RADIAL camera should restore polar coordinate state.""" camera1 = Camera.radial() camera1._theta_float = math.pi / 3 camera1._r_float = 75.0 from engine.pipeline.adapters.camera import CameraStage stage1 = CameraStage(camera1) state = stage1.save_state() # Create new camera and restore camera2 = Camera.radial() stage2 = CameraStage(camera2) stage2.restore_state(state) assert abs(camera2._theta_float - math.pi / 3) < 0.001 assert abs(camera2._r_float - 75.0) < 0.001 class TestCameraViewportApplication: """Test camera.apply() properly slices buffers.""" def test_radial_camera_viewport_slicing(self): """RADIAL camera should properly slice buffer based on position.""" camera = Camera.radial(speed=0.5) camera.set_canvas_size(200, 200) # Update to move camera camera.update(1.0) # Create test buffer with 200 lines buffer = [f"LINE {i:03d}" for i in range(200)] # Apply camera viewport (15 lines high) result = camera.apply(buffer, viewport_width=40, viewport_height=15) # Result should be exactly 15 lines assert len(result) == 15 # Each line should be 40 characters (padded or truncated) for line in result: assert len(line) <= 40