forked from genewildish/Mainline
- Fixes issue #45: Add state property to EffectContext for motionblur/afterimage effects - Fixes issue #44: Reset camera bounce direction state in reset() method - Fixes issue #43: Implement pipeline hot-rebuild with state preservation - Adds radial camera mode for polar coordinate scanning - Adds afterimage and motionblur effects - Adds acceptance tests for camera and pipeline rebuild Closes #43, #44, #45
462 lines
15 KiB
Python
462 lines
15 KiB
Python
"""
|
|
Camera system for viewport scrolling.
|
|
|
|
Provides abstraction for camera motion in different modes:
|
|
- Vertical: traditional upward scroll
|
|
- Horizontal: left/right movement
|
|
- Omni: combination of both
|
|
- Floating: sinusoidal/bobbing motion
|
|
|
|
The camera defines a visible viewport into a larger Canvas.
|
|
"""
|
|
|
|
import math
|
|
from collections.abc import Callable
|
|
from dataclasses import dataclass, field
|
|
from enum import Enum, auto
|
|
|
|
|
|
class CameraMode(Enum):
|
|
FEED = auto() # Single item view (static or rapid cycling)
|
|
SCROLL = auto() # Smooth vertical scrolling (movie credits style)
|
|
HORIZONTAL = auto()
|
|
OMNI = auto()
|
|
FLOATING = auto()
|
|
BOUNCE = auto()
|
|
RADIAL = auto() # Polar coordinates (r, theta) for radial scanning
|
|
|
|
|
|
@dataclass
|
|
class CameraViewport:
|
|
"""Represents the visible viewport."""
|
|
|
|
x: int
|
|
y: int
|
|
width: int
|
|
height: int
|
|
|
|
|
|
@dataclass
|
|
class Camera:
|
|
"""Camera for viewport scrolling.
|
|
|
|
The camera defines a visible viewport into a Canvas.
|
|
It can be smaller than the canvas to allow scrolling,
|
|
and supports zoom to scale the view.
|
|
|
|
Attributes:
|
|
x: Current horizontal offset (positive = scroll left)
|
|
y: Current vertical offset (positive = scroll up)
|
|
mode: Current camera mode
|
|
speed: Base scroll speed
|
|
zoom: Zoom factor (1.0 = 100%, 2.0 = 200% zoom out)
|
|
canvas_width: Width of the canvas being viewed
|
|
canvas_height: Height of the canvas being viewed
|
|
custom_update: Optional custom update function
|
|
"""
|
|
|
|
x: int = 0
|
|
y: int = 0
|
|
mode: CameraMode = CameraMode.FEED
|
|
speed: float = 1.0
|
|
zoom: float = 1.0
|
|
canvas_width: int = 200 # Larger than viewport for scrolling
|
|
canvas_height: int = 200
|
|
custom_update: Callable[["Camera", float], None] | None = None
|
|
_x_float: float = field(default=0.0, repr=False)
|
|
_y_float: float = field(default=0.0, repr=False)
|
|
_time: float = field(default=0.0, repr=False)
|
|
|
|
@property
|
|
def w(self) -> int:
|
|
"""Shorthand for viewport_width."""
|
|
return self.viewport_width
|
|
|
|
@property
|
|
def h(self) -> int:
|
|
"""Shorthand for viewport_height."""
|
|
return self.viewport_height
|
|
|
|
@property
|
|
def viewport_width(self) -> int:
|
|
"""Get the visible viewport width.
|
|
|
|
This is the canvas width divided by zoom.
|
|
"""
|
|
return max(1, int(self.canvas_width / self.zoom))
|
|
|
|
@property
|
|
def viewport_height(self) -> int:
|
|
"""Get the visible viewport height.
|
|
|
|
This is the canvas height divided by zoom.
|
|
"""
|
|
return max(1, int(self.canvas_height / self.zoom))
|
|
|
|
def get_viewport(self, viewport_height: int | None = None) -> CameraViewport:
|
|
"""Get the current viewport bounds.
|
|
|
|
Args:
|
|
viewport_height: Optional viewport height to use instead of camera's viewport_height
|
|
|
|
Returns:
|
|
CameraViewport with position and size (clamped to canvas bounds)
|
|
"""
|
|
vw = self.viewport_width
|
|
vh = viewport_height if viewport_height is not None else self.viewport_height
|
|
|
|
clamped_x = max(0, min(self.x, self.canvas_width - vw))
|
|
clamped_y = max(0, min(self.y, self.canvas_height - vh))
|
|
|
|
return CameraViewport(
|
|
x=clamped_x,
|
|
y=clamped_y,
|
|
width=vw,
|
|
height=vh,
|
|
)
|
|
|
|
return CameraViewport(
|
|
x=clamped_x,
|
|
y=clamped_y,
|
|
width=vw,
|
|
height=vh,
|
|
)
|
|
|
|
def set_zoom(self, zoom: float) -> None:
|
|
"""Set the zoom factor.
|
|
|
|
Args:
|
|
zoom: Zoom factor (1.0 = 100%, 2.0 = zoomed out 2x, 0.5 = zoomed in 2x)
|
|
"""
|
|
self.zoom = max(0.1, min(10.0, zoom))
|
|
|
|
def update(self, dt: float) -> None:
|
|
"""Update camera position based on mode.
|
|
|
|
Args:
|
|
dt: Delta time in seconds
|
|
"""
|
|
self._time += dt
|
|
|
|
if self.custom_update:
|
|
self.custom_update(self, dt)
|
|
return
|
|
|
|
if self.mode == CameraMode.FEED:
|
|
self._update_feed(dt)
|
|
elif self.mode == CameraMode.SCROLL:
|
|
self._update_scroll(dt)
|
|
elif self.mode == CameraMode.HORIZONTAL:
|
|
self._update_horizontal(dt)
|
|
elif self.mode == CameraMode.OMNI:
|
|
self._update_omni(dt)
|
|
elif self.mode == CameraMode.FLOATING:
|
|
self._update_floating(dt)
|
|
elif self.mode == CameraMode.BOUNCE:
|
|
self._update_bounce(dt)
|
|
elif self.mode == CameraMode.RADIAL:
|
|
self._update_radial(dt)
|
|
|
|
# Bounce mode handles its own bounds checking
|
|
if self.mode != CameraMode.BOUNCE:
|
|
self._clamp_to_bounds()
|
|
|
|
def _clamp_to_bounds(self) -> None:
|
|
"""Clamp camera position to stay within canvas bounds.
|
|
|
|
Only clamps if the viewport is smaller than the canvas.
|
|
If viewport equals canvas (no scrolling needed), allows any position
|
|
for backwards compatibility with original behavior.
|
|
"""
|
|
vw = self.viewport_width
|
|
vh = self.viewport_height
|
|
|
|
# Only clamp if there's room to scroll
|
|
if vw < self.canvas_width:
|
|
self.x = max(0, min(self.x, self.canvas_width - vw))
|
|
if vh < self.canvas_height:
|
|
self.y = max(0, min(self.y, self.canvas_height - vh))
|
|
|
|
def _update_feed(self, dt: float) -> None:
|
|
"""Feed mode: rapid scrolling (1 row per frame at speed=1.0)."""
|
|
self.y += int(self.speed * dt * 60)
|
|
|
|
def _update_scroll(self, dt: float) -> None:
|
|
"""Scroll mode: smooth vertical scrolling with float accumulation."""
|
|
self._y_float += self.speed * dt * 60
|
|
self.y = int(self._y_float)
|
|
|
|
def _update_horizontal(self, dt: float) -> None:
|
|
self.x += int(self.speed * dt * 60)
|
|
|
|
def _update_omni(self, dt: float) -> None:
|
|
speed = self.speed * dt * 60
|
|
self.y += int(speed)
|
|
self.x += int(speed * 0.5)
|
|
|
|
def _update_floating(self, dt: float) -> None:
|
|
base = self.speed * 30
|
|
self.y = int(math.sin(self._time * 2) * base)
|
|
self.x = int(math.cos(self._time * 1.5) * base * 0.5)
|
|
|
|
def _update_bounce(self, dt: float) -> None:
|
|
"""Bouncing DVD-style camera that bounces off canvas edges."""
|
|
vw = self.viewport_width
|
|
vh = self.viewport_height
|
|
|
|
# Initialize direction if not set
|
|
if not hasattr(self, "_bounce_dx"):
|
|
self._bounce_dx = 1
|
|
self._bounce_dy = 1
|
|
|
|
# Calculate max positions
|
|
max_x = max(0, self.canvas_width - vw)
|
|
max_y = max(0, self.canvas_height - vh)
|
|
|
|
# Move
|
|
move_speed = self.speed * dt * 60
|
|
|
|
# Bounce off edges - reverse direction when hitting bounds
|
|
self.x += int(move_speed * self._bounce_dx)
|
|
self.y += int(move_speed * self._bounce_dy)
|
|
|
|
# Bounce horizontally
|
|
if self.x <= 0:
|
|
self.x = 0
|
|
self._bounce_dx = 1
|
|
elif self.x >= max_x:
|
|
self.x = max_x
|
|
self._bounce_dx = -1
|
|
|
|
# Bounce vertically
|
|
if self.y <= 0:
|
|
self.y = 0
|
|
self._bounce_dy = 1
|
|
elif self.y >= max_y:
|
|
self.y = max_y
|
|
self._bounce_dy = -1
|
|
|
|
def _update_radial(self, dt: float) -> None:
|
|
"""Radial camera mode: polar coordinate scrolling (r, theta).
|
|
|
|
The camera rotates around the center of the canvas while optionally
|
|
moving outward/inward along rays. This enables:
|
|
- Radar sweep animations
|
|
- Pendulum view oscillation
|
|
- Spiral scanning motion
|
|
|
|
Uses polar coordinates internally:
|
|
- _r_float: radial distance from center (accumulates smoothly)
|
|
- _theta_float: angle in radians (accumulates smoothly)
|
|
- Updates x, y based on conversion from polar to Cartesian
|
|
"""
|
|
# Initialize radial state if needed
|
|
if not hasattr(self, "_r_float"):
|
|
self._r_float = 0.0
|
|
self._theta_float = 0.0
|
|
|
|
# Update angular position (rotation around center)
|
|
# Speed controls rotation rate
|
|
theta_speed = self.speed * dt * 1.0 # radians per second
|
|
self._theta_float += theta_speed
|
|
|
|
# Update radial position (inward/outward from center)
|
|
# Can be modulated by external sensor
|
|
if hasattr(self, "_radial_input"):
|
|
r_input = self._radial_input
|
|
else:
|
|
# Default: slow outward drift
|
|
r_input = 0.0
|
|
|
|
r_speed = self.speed * dt * 20.0 # pixels per second
|
|
self._r_float += r_input + r_speed * 0.01
|
|
|
|
# Clamp radial position to canvas bounds
|
|
max_r = min(self.canvas_width, self.canvas_height) / 2
|
|
self._r_float = max(0.0, min(self._r_float, max_r))
|
|
|
|
# Convert polar to Cartesian, centered at canvas center
|
|
center_x = self.canvas_width / 2
|
|
center_y = self.canvas_height / 2
|
|
|
|
self.x = int(center_x + self._r_float * math.cos(self._theta_float))
|
|
self.y = int(center_y + self._r_float * math.sin(self._theta_float))
|
|
|
|
# Clamp to canvas bounds
|
|
self._clamp_to_bounds()
|
|
|
|
def set_radial_input(self, value: float) -> None:
|
|
"""Set radial input for sensor-driven radius modulation.
|
|
|
|
Args:
|
|
value: Sensor value (0-1) that modulates radial distance
|
|
"""
|
|
self._radial_input = value * 10.0 # Scale to reasonable pixel range
|
|
|
|
def set_radial_angle(self, angle: float) -> None:
|
|
"""Set radial angle directly (for OSC integration).
|
|
|
|
Args:
|
|
angle: Angle in radians (0 to 2π)
|
|
"""
|
|
self._theta_float = angle
|
|
|
|
def reset(self) -> None:
|
|
"""Reset camera position and state."""
|
|
self.x = 0
|
|
self.y = 0
|
|
self._time = 0.0
|
|
self.zoom = 1.0
|
|
# Reset bounce direction state
|
|
if hasattr(self, "_bounce_dx"):
|
|
self._bounce_dx = 1
|
|
self._bounce_dy = 1
|
|
# Reset radial state
|
|
if hasattr(self, "_r_float"):
|
|
self._r_float = 0.0
|
|
self._theta_float = 0.0
|
|
|
|
def set_canvas_size(self, width: int, height: int) -> None:
|
|
"""Set the canvas size and clamp position if needed.
|
|
|
|
Args:
|
|
width: New canvas width
|
|
height: New canvas height
|
|
"""
|
|
self.canvas_width = width
|
|
self.canvas_height = height
|
|
self._clamp_to_bounds()
|
|
|
|
def apply(
|
|
self, buffer: list[str], viewport_width: int, viewport_height: int | None = None
|
|
) -> list[str]:
|
|
"""Apply camera viewport to a text buffer.
|
|
|
|
Slices the buffer based on camera position (x, y) and viewport dimensions.
|
|
Handles ANSI escape codes correctly for colored/styled text.
|
|
|
|
Args:
|
|
buffer: List of strings representing lines of text
|
|
viewport_width: Width of the visible viewport in characters
|
|
viewport_height: Height of the visible viewport (overrides camera's viewport_height if provided)
|
|
|
|
Returns:
|
|
Sliced buffer containing only the visible lines and columns
|
|
"""
|
|
from engine.effects.legacy import vis_offset, vis_trunc
|
|
|
|
if not buffer:
|
|
return buffer
|
|
|
|
# Get current viewport bounds (clamped to canvas size)
|
|
viewport = self.get_viewport(viewport_height)
|
|
|
|
# Use provided viewport_height if given, otherwise use camera's viewport
|
|
vh = viewport_height if viewport_height is not None else viewport.height
|
|
|
|
# Vertical slice: extract lines that fit in viewport height
|
|
start_y = viewport.y
|
|
end_y = min(viewport.y + vh, len(buffer))
|
|
|
|
if start_y >= len(buffer):
|
|
# Scrolled past end of buffer, return empty viewport
|
|
return [""] * vh
|
|
|
|
vertical_slice = buffer[start_y:end_y]
|
|
|
|
# Horizontal slice: apply horizontal offset and truncate to width
|
|
horizontal_slice = []
|
|
for line in vertical_slice:
|
|
# Apply horizontal offset (skip first x characters, handling ANSI)
|
|
offset_line = vis_offset(line, viewport.x)
|
|
# Truncate to viewport width (handling ANSI)
|
|
truncated_line = vis_trunc(offset_line, viewport_width)
|
|
|
|
# Pad line to full viewport width to prevent ghosting when panning
|
|
import re
|
|
|
|
visible_len = len(re.sub(r"\x1b\[[0-9;]*m", "", truncated_line))
|
|
if visible_len < viewport_width:
|
|
truncated_line += " " * (viewport_width - visible_len)
|
|
|
|
horizontal_slice.append(truncated_line)
|
|
|
|
# Pad with empty lines if needed to fill viewport height
|
|
while len(horizontal_slice) < vh:
|
|
horizontal_slice.append("")
|
|
|
|
return horizontal_slice
|
|
|
|
@classmethod
|
|
def feed(cls, speed: float = 1.0) -> "Camera":
|
|
"""Create a feed camera (rapid single-item scrolling, 1 row/frame at speed=1.0)."""
|
|
return cls(mode=CameraMode.FEED, speed=speed, canvas_height=200)
|
|
|
|
@classmethod
|
|
def scroll(cls, speed: float = 0.5) -> "Camera":
|
|
"""Create a smooth scrolling camera (movie credits style).
|
|
|
|
Uses float accumulation for sub-integer speeds.
|
|
Sets canvas_width=0 so it matches viewport_width for proper text wrapping.
|
|
"""
|
|
return cls(
|
|
mode=CameraMode.SCROLL, speed=speed, canvas_width=0, canvas_height=200
|
|
)
|
|
|
|
@classmethod
|
|
def vertical(cls, speed: float = 1.0) -> "Camera":
|
|
"""Deprecated: Use feed() or scroll() instead."""
|
|
return cls(mode=CameraMode.FEED, speed=speed, canvas_height=200)
|
|
|
|
@classmethod
|
|
def horizontal(cls, speed: float = 1.0) -> "Camera":
|
|
"""Create a horizontal scrolling camera."""
|
|
return cls(mode=CameraMode.HORIZONTAL, speed=speed, canvas_width=200)
|
|
|
|
@classmethod
|
|
def omni(cls, speed: float = 1.0) -> "Camera":
|
|
"""Create an omnidirectional scrolling camera."""
|
|
return cls(
|
|
mode=CameraMode.OMNI, speed=speed, canvas_width=200, canvas_height=200
|
|
)
|
|
|
|
@classmethod
|
|
def floating(cls, speed: float = 1.0) -> "Camera":
|
|
"""Create a floating/bobbing camera."""
|
|
return cls(
|
|
mode=CameraMode.FLOATING, speed=speed, canvas_width=200, canvas_height=200
|
|
)
|
|
|
|
@classmethod
|
|
def bounce(cls, speed: float = 1.0) -> "Camera":
|
|
"""Create a bouncing DVD-style camera that bounces off canvas edges."""
|
|
return cls(
|
|
mode=CameraMode.BOUNCE, speed=speed, canvas_width=200, canvas_height=200
|
|
)
|
|
|
|
@classmethod
|
|
def radial(cls, speed: float = 1.0) -> "Camera":
|
|
"""Create a radial camera (polar coordinate scanning).
|
|
|
|
The camera rotates around the center of the canvas with smooth angular motion.
|
|
Enables radar sweep, pendulum view, and spiral scanning animations.
|
|
|
|
Args:
|
|
speed: Rotation speed (higher = faster rotation)
|
|
|
|
Returns:
|
|
Camera configured for radial polar coordinate scanning
|
|
"""
|
|
cam = cls(
|
|
mode=CameraMode.RADIAL, speed=speed, canvas_width=200, canvas_height=200
|
|
)
|
|
# Initialize radial state
|
|
cam._r_float = 0.0
|
|
cam._theta_float = 0.0
|
|
return cam
|
|
|
|
@classmethod
|
|
def custom(cls, update_fn: Callable[["Camera", float], None]) -> "Camera":
|
|
"""Create a camera with custom update function."""
|
|
return cls(custom_update=update_fn)
|