feat(core): add Camera abstraction for viewport scrolling

- Add Camera class with modes: vertical, horizontal, omni, floating
- Refactor scroll.py and demo to use Camera abstraction
- Add vis_offset for horizontal scrolling support
- Add camera_x to EffectContext for effects
- Add pygame window resize handling
- Add HUD effect plugin for demo mode
- Add --demo flag to run demo mode
- Add tests for Camera and vis_offset
This commit is contained in:
2026-03-16 01:46:21 -07:00
parent e1408dcf16
commit 9b139a40f7
10 changed files with 343 additions and 37 deletions

View File

@@ -352,10 +352,11 @@ def pick_effects_config():
def run_demo_mode(): def run_demo_mode():
"""Run demo mode - showcases effects with real content and pygame display.""" """Run demo mode - showcases effects and camera modes with real content."""
import random import random
from engine import config from engine import config
from engine.camera import Camera, CameraMode
from engine.display import DisplayRegistry from engine.display import DisplayRegistry
from engine.effects import ( from engine.effects import (
EffectContext, EffectContext,
@@ -409,7 +410,6 @@ def run_demo_mode():
pool = list(items) pool = list(items)
seen = set() seen = set()
active = [] active = []
scroll_cam = 0
ticker_next_y = 0 ticker_next_y = 0
noise_cache = {} noise_cache = {}
scroll_motion_accum = 0.0 scroll_motion_accum = 0.0
@@ -418,6 +418,8 @@ def run_demo_mode():
GAP = 3 GAP = 3
scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, h) scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, h)
camera = Camera.vertical(speed=1.0)
effects_to_demo = ["noise", "fade", "glitch", "firehose"] effects_to_demo = ["noise", "fade", "glitch", "firehose"]
effect_idx = 0 effect_idx = 0
effect_name = effects_to_demo[effect_idx] effect_name = effects_to_demo[effect_idx]
@@ -425,12 +427,22 @@ def run_demo_mode():
current_intensity = 0.0 current_intensity = 0.0
ramping_up = True ramping_up = True
print(" \033[38;5;82mStarting effect demo...\033[0m") camera_modes = [
(CameraMode.VERTICAL, "vertical"),
(CameraMode.HORIZONTAL, "horizontal"),
(CameraMode.OMNI, "omni"),
(CameraMode.FLOATING, "floating"),
]
camera_mode_idx = 0
camera_start_time = time.time()
print(" \033[38;5;82mStarting effect & camera demo...\033[0m")
print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n") print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n")
try: try:
while True: while True:
elapsed = time.time() - effect_start_time elapsed = time.time() - effect_start_time
camera_elapsed = time.time() - camera_start_time
duration = config.DEMO_EFFECT_DURATION duration = config.DEMO_EFFECT_DURATION
if elapsed >= duration: if elapsed >= duration:
@@ -441,6 +453,13 @@ def run_demo_mode():
current_intensity = 0.0 current_intensity = 0.0
ramping_up = True ramping_up = True
if camera_elapsed >= duration * 2:
camera_mode_idx = (camera_mode_idx + 1) % len(camera_modes)
mode, mode_name = camera_modes[camera_mode_idx]
camera = Camera(mode=mode, speed=1.0)
camera_start_time = time.time()
camera_elapsed = 0
progress = elapsed / duration progress = elapsed / duration
if ramping_up: if ramping_up:
current_intensity = progress current_intensity = progress
@@ -458,18 +477,21 @@ def run_demo_mode():
hud_effect = registry.get("hud") hud_effect = registry.get("hud")
if hud_effect: if hud_effect:
hud_effect.config.params["display_effect"] = effect_name mode_name = camera_modes[camera_mode_idx][1]
hud_effect.config.params["display_effect"] = (
f"{effect_name} / {mode_name}"
)
hud_effect.config.params["display_intensity"] = current_intensity hud_effect.config.params["display_intensity"] = current_intensity
scroll_motion_accum += config.FRAME_DT scroll_motion_accum += config.FRAME_DT
while scroll_motion_accum >= scroll_step_interval: while scroll_motion_accum >= scroll_step_interval:
scroll_motion_accum -= scroll_step_interval scroll_motion_accum -= scroll_step_interval
scroll_cam += 1 camera.update(config.FRAME_DT)
while ticker_next_y < camera.y + h + 10 and len(active) < 50:
from engine.effects import next_headline from engine.effects import next_headline
from engine.render import make_block from engine.render import make_block
while ticker_next_y < scroll_cam + h + 10:
t, src, ts = next_headline(pool, items, seen) t, src, ts = next_headline(pool, items, seen)
ticker_content, hc, midx = make_block(t, src, ts, w) ticker_content, hc, midx = make_block(t, src, ts, w)
active.append((ticker_content, hc, ticker_next_y, midx)) active.append((ticker_content, hc, ticker_next_y, midx))
@@ -478,10 +500,10 @@ def run_demo_mode():
active = [ active = [
(c, hc, by, mi) (c, hc, by, mi)
for c, hc, by, mi in active for c, hc, by, mi in active
if by + len(c) > scroll_cam if by + len(c) > camera.y
] ]
for k in list(noise_cache): for k in list(noise_cache):
if k < scroll_cam: if k < camera.y:
del noise_cache[k] del noise_cache[k]
grad_offset = (time.time() * config.GRAD_SPEED) % 1.0 grad_offset = (time.time() * config.GRAD_SPEED) % 1.0
@@ -489,7 +511,13 @@ def run_demo_mode():
from engine.layers import render_ticker_zone from engine.layers import render_ticker_zone
buf, noise_cache = render_ticker_zone( buf, noise_cache = render_ticker_zone(
active, scroll_cam, h, w, noise_cache, grad_offset active,
scroll_cam=camera.y,
camera_x=camera.x,
ticker_h=h,
w=w,
noise_cache=noise_cache,
grad_offset=grad_offset,
) )
from engine.layers import render_firehose from engine.layers import render_firehose
@@ -500,8 +528,9 @@ def run_demo_mode():
ctx = EffectContext( ctx = EffectContext(
terminal_width=w, terminal_width=w,
terminal_height=h, terminal_height=h,
scroll_cam=scroll_cam, scroll_cam=camera.y,
ticker_height=h, ticker_height=h,
camera_x=camera.x,
mic_excess=0.0, mic_excess=0.0,
grad_offset=grad_offset, grad_offset=grad_offset,
frame_number=frame_number, frame_number=frame_number,

109
engine/camera.py Normal file
View File

@@ -0,0 +1,109 @@
"""
Camera system for viewport scrolling.
Provides abstraction for camera motion in different modes:
- Vertical: traditional upward scroll
- Horizontal: left/right movement
- Omni: combination of both
- Floating: sinusoidal/bobbing motion
"""
import math
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import Enum, auto
class CameraMode(Enum):
VERTICAL = auto()
HORIZONTAL = auto()
OMNI = auto()
FLOATING = auto()
@dataclass
class Camera:
"""Camera for viewport scrolling.
Attributes:
x: Current horizontal offset (positive = scroll left)
y: Current vertical offset (positive = scroll up)
mode: Current camera mode
speed: Base scroll speed
custom_update: Optional custom update function
"""
x: int = 0
y: int = 0
mode: CameraMode = CameraMode.VERTICAL
speed: float = 1.0
custom_update: Callable[["Camera", float], None] | None = None
_time: float = field(default=0.0, repr=False)
def update(self, dt: float) -> None:
"""Update camera position based on mode.
Args:
dt: Delta time in seconds
"""
self._time += dt
if self.custom_update:
self.custom_update(self, dt)
return
if self.mode == CameraMode.VERTICAL:
self._update_vertical(dt)
elif self.mode == CameraMode.HORIZONTAL:
self._update_horizontal(dt)
elif self.mode == CameraMode.OMNI:
self._update_omni(dt)
elif self.mode == CameraMode.FLOATING:
self._update_floating(dt)
def _update_vertical(self, dt: float) -> None:
self.y += int(self.speed * dt * 60)
def _update_horizontal(self, dt: float) -> None:
self.x += int(self.speed * dt * 60)
def _update_omni(self, dt: float) -> None:
speed = self.speed * dt * 60
self.y += int(speed)
self.x += int(speed * 0.5)
def _update_floating(self, dt: float) -> None:
base = self.speed * 30
self.y = int(math.sin(self._time * 2) * base)
self.x = int(math.cos(self._time * 1.5) * base * 0.5)
def reset(self) -> None:
"""Reset camera position."""
self.x = 0
self.y = 0
self._time = 0.0
@classmethod
def vertical(cls, speed: float = 1.0) -> "Camera":
"""Create a vertical scrolling camera."""
return cls(mode=CameraMode.VERTICAL, speed=speed)
@classmethod
def horizontal(cls, speed: float = 1.0) -> "Camera":
"""Create a horizontal scrolling camera."""
return cls(mode=CameraMode.HORIZONTAL, speed=speed)
@classmethod
def omni(cls, speed: float = 1.0) -> "Camera":
"""Create an omnidirectional scrolling camera."""
return cls(mode=CameraMode.OMNI, speed=speed)
@classmethod
def floating(cls, speed: float = 1.0) -> "Camera":
"""Create a floating/bobbing camera."""
return cls(mode=CameraMode.FLOATING, speed=speed)
@classmethod
def custom(cls, update_fn: Callable[["Camera", float], None]) -> "Camera":
"""Create a camera with custom update function."""
return cls(custom_update=update_fn)

View File

@@ -6,6 +6,7 @@ from engine.effects.legacy import (
glitch_bar, glitch_bar,
next_headline, next_headline,
noise, noise,
vis_offset,
vis_trunc, vis_trunc,
) )
from engine.effects.performance import PerformanceMonitor, get_monitor, set_monitor from engine.effects.performance import PerformanceMonitor, get_monitor, set_monitor
@@ -45,4 +46,5 @@ __all__ = [
"noise", "noise",
"next_headline", "next_headline",
"vis_trunc", "vis_trunc",
"vis_offset",
] ]

View File

@@ -82,6 +82,37 @@ def vis_trunc(s, w):
return "".join(result) return "".join(result)
def vis_offset(s, offset):
"""Offset string by skipping first offset visual characters, skipping ANSI escape codes."""
if offset <= 0:
return s
result = []
vw = 0
i = 0
skipping = True
while i < len(s):
if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[":
j = i + 2
while j < len(s) and not s[j].isalpha():
j += 1
if skipping:
i = j + 1
continue
result.append(s[i : j + 1])
i = j + 1
else:
if skipping:
if vw >= offset:
skipping = False
result.append(s[i])
vw += 1
i += 1
else:
result.append(s[i])
i += 1
return "".join(result)
def next_headline(pool, items, seen): def next_headline(pool, items, seen):
"""Pull the next unique headline from pool, refilling as needed.""" """Pull the next unique headline from pool, refilling as needed."""
while True: while True:

View File

@@ -29,10 +29,11 @@ class EffectContext:
terminal_height: int terminal_height: int
scroll_cam: int scroll_cam: int
ticker_height: int ticker_height: int
mic_excess: float camera_x: int = 0
grad_offset: float mic_excess: float = 0.0
frame_number: int grad_offset: float = 0.0
has_message: bool frame_number: int = 0
has_message: bool = False
items: list = field(default_factory=list) items: list = field(default_factory=list)

View File

@@ -16,6 +16,7 @@ from engine.effects import (
firehose_line, firehose_line,
glitch_bar, glitch_bar,
noise, noise,
vis_offset,
vis_trunc, vis_trunc,
) )
from engine.render import big_wrap, lr_gradient, lr_gradient_opposite from engine.render import big_wrap, lr_gradient, lr_gradient_opposite
@@ -94,16 +95,18 @@ def render_message_overlay(
def render_ticker_zone( def render_ticker_zone(
active: list, active: list,
scroll_cam: int, scroll_cam: int,
ticker_h: int, camera_x: int = 0,
w: int, ticker_h: int = 0,
noise_cache: dict, w: int = 80,
grad_offset: float, noise_cache: dict | None = None,
grad_offset: float = 0.0,
) -> tuple[list[str], dict]: ) -> tuple[list[str], dict]:
"""Render the ticker scroll zone. """Render the ticker scroll zone.
Args: Args:
active: list of (content_rows, color, canvas_y, meta_idx) active: list of (content_rows, color, canvas_y, meta_idx)
scroll_cam: camera position (viewport top) scroll_cam: camera position (viewport top)
camera_x: horizontal camera offset
ticker_h: height of ticker zone ticker_h: height of ticker zone
w: terminal width w: terminal width
noise_cache: dict of cy -> noise string noise_cache: dict of cy -> noise string
@@ -112,6 +115,8 @@ def render_ticker_zone(
Returns: Returns:
(list of ANSI strings, updated noise_cache) (list of ANSI strings, updated noise_cache)
""" """
if noise_cache is None:
noise_cache = {}
buf = [] buf = []
top_zone = max(1, int(ticker_h * 0.25)) top_zone = max(1, int(ticker_h * 0.25))
bot_zone = max(1, int(ticker_h * 0.10)) bot_zone = max(1, int(ticker_h * 0.10))
@@ -137,7 +142,7 @@ def render_ticker_zone(
colored = lr_gradient([raw], grad_offset)[0] colored = lr_gradient([raw], grad_offset)[0]
else: else:
colored = raw colored = raw
ln = vis_trunc(colored, w) ln = vis_trunc(vis_offset(colored, camera_x), w)
if row_fade < 1.0: if row_fade < 1.0:
ln = fade_line(ln, row_fade) ln = fade_line(ln, row_fade)
@@ -228,11 +233,12 @@ def process_effects(
h: int, h: int,
scroll_cam: int, scroll_cam: int,
ticker_h: int, ticker_h: int,
mic_excess: float, camera_x: int = 0,
grad_offset: float, mic_excess: float = 0.0,
frame_number: int, grad_offset: float = 0.0,
has_message: bool, frame_number: int = 0,
items: list, has_message: bool = False,
items: list | None = None,
) -> list[str]: ) -> list[str]:
"""Process buffer through effect chain.""" """Process buffer through effect chain."""
if _effect_chain is None: if _effect_chain is None:
@@ -242,12 +248,13 @@ def process_effects(
terminal_width=w, terminal_width=w,
terminal_height=h, terminal_height=h,
scroll_cam=scroll_cam, scroll_cam=scroll_cam,
camera_x=camera_x,
ticker_height=ticker_h, ticker_height=ticker_h,
mic_excess=mic_excess, mic_excess=mic_excess,
grad_offset=grad_offset, grad_offset=grad_offset,
frame_number=frame_number, frame_number=frame_number,
has_message=has_message, has_message=has_message,
items=items, items=items or [],
) )
return _effect_chain.process(buf, ctx) return _effect_chain.process(buf, ctx)

View File

@@ -7,6 +7,7 @@ import random
import time import time
from engine import config from engine import config
from engine.camera import Camera
from engine.display import ( from engine.display import (
Display, Display,
TerminalDisplay, TerminalDisplay,
@@ -27,10 +28,19 @@ from engine.viewport import th, tw
USE_EFFECT_CHAIN = True USE_EFFECT_CHAIN = True
def stream(items, ntfy_poller, mic_monitor, display: Display | None = None): def stream(
items,
ntfy_poller,
mic_monitor,
display: Display | None = None,
camera: Camera | None = None,
):
"""Main render loop with four layers: message, ticker, scroll motion, firehose.""" """Main render loop with four layers: message, ticker, scroll motion, firehose."""
if display is None: if display is None:
display = TerminalDisplay() display = TerminalDisplay()
if camera is None:
camera = Camera.vertical()
random.shuffle(items) random.shuffle(items)
pool = list(items) pool = list(items)
seen = set() seen = set()
@@ -46,7 +56,6 @@ def stream(items, ntfy_poller, mic_monitor, display: Display | None = None):
scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h) scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h)
active = [] active = []
scroll_cam = 0
ticker_next_y = ticker_view_h ticker_next_y = ticker_view_h
noise_cache = {} noise_cache = {}
scroll_motion_accum = 0.0 scroll_motion_accum = 0.0
@@ -72,10 +81,10 @@ def stream(items, ntfy_poller, mic_monitor, display: Display | None = None):
scroll_motion_accum += config.FRAME_DT scroll_motion_accum += config.FRAME_DT
while scroll_motion_accum >= scroll_step_interval: while scroll_motion_accum >= scroll_step_interval:
scroll_motion_accum -= scroll_step_interval scroll_motion_accum -= scroll_step_interval
scroll_cam += 1 camera.update(config.FRAME_DT)
while ( while (
ticker_next_y < scroll_cam + ticker_view_h + 10 ticker_next_y < camera.y + ticker_view_h + 10
and queued < config.HEADLINE_LIMIT and queued < config.HEADLINE_LIMIT
): ):
from engine.effects import next_headline from engine.effects import next_headline
@@ -88,17 +97,17 @@ def stream(items, ntfy_poller, mic_monitor, display: Display | None = None):
queued += 1 queued += 1
active = [ active = [
(c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > scroll_cam (c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > camera.y
] ]
for k in list(noise_cache): for k in list(noise_cache):
if k < scroll_cam: if k < camera.y:
del noise_cache[k] del noise_cache[k]
grad_offset = (time.monotonic() * config.GRAD_SPEED) % 1.0 grad_offset = (time.monotonic() * config.GRAD_SPEED) % 1.0
ticker_buf_start = len(buf) ticker_buf_start = len(buf)
ticker_buf, noise_cache = render_ticker_zone( ticker_buf, noise_cache = render_ticker_zone(
active, scroll_cam, ticker_h, w, noise_cache, grad_offset active, camera.y, camera.x, ticker_h, w, noise_cache, grad_offset
) )
buf.extend(ticker_buf) buf.extend(ticker_buf)
@@ -110,8 +119,9 @@ def stream(items, ntfy_poller, mic_monitor, display: Display | None = None):
buf, buf,
w, w,
h, h,
scroll_cam, camera.y,
ticker_h, ticker_h,
camera.x,
mic_excess, mic_excess,
grad_offset, grad_offset,
frame_number, frame_number,

69
tests/test_camera.py Normal file
View File

@@ -0,0 +1,69 @@
from engine.camera import Camera, CameraMode
def test_camera_vertical_default():
"""Test default vertical camera."""
cam = Camera()
assert cam.mode == CameraMode.VERTICAL
assert cam.x == 0
assert cam.y == 0
def test_camera_vertical_factory():
"""Test vertical factory method."""
cam = Camera.vertical(speed=2.0)
assert cam.mode == CameraMode.VERTICAL
assert cam.speed == 2.0
def test_camera_horizontal():
"""Test horizontal camera."""
cam = Camera.horizontal(speed=1.5)
assert cam.mode == CameraMode.HORIZONTAL
cam.update(1.0)
assert cam.x > 0
def test_camera_omni():
"""Test omnidirectional camera."""
cam = Camera.omni(speed=1.0)
assert cam.mode == CameraMode.OMNI
cam.update(1.0)
assert cam.x > 0
assert cam.y > 0
def test_camera_floating():
"""Test floating camera with sinusoidal motion."""
cam = Camera.floating(speed=1.0)
assert cam.mode == CameraMode.FLOATING
y_before = cam.y
cam.update(0.5)
y_after = cam.y
assert y_before != y_after
def test_camera_reset():
"""Test camera reset."""
cam = Camera.vertical()
cam.update(1.0)
assert cam.y > 0
cam.reset()
assert cam.x == 0
assert cam.y == 0
def test_camera_custom_update():
"""Test custom update function."""
call_count = 0
def custom_update(camera, dt):
nonlocal call_count
call_count += 1
camera.x += int(10 * dt)
cam = Camera.custom(custom_update)
cam.update(1.0)
assert call_count == 1
assert cam.x == 10

View File

@@ -87,10 +87,26 @@ class TestRenderTickerZone:
def test_returns_list(self): def test_returns_list(self):
"""Returns a list of strings.""" """Returns a list of strings."""
result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0) result, cache = layers.render_ticker_zone(
[],
scroll_cam=0,
camera_x=0,
ticker_h=10,
w=80,
noise_cache={},
grad_offset=0.0,
)
assert isinstance(result, list) assert isinstance(result, list)
def test_returns_dict_for_cache(self): def test_returns_dict_for_cache(self):
"""Returns a dict for the noise cache.""" """Returns a dict for the noise cache."""
result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0) result, cache = layers.render_ticker_zone(
[],
scroll_cam=0,
camera_x=0,
ticker_h=10,
w=80,
noise_cache={},
grad_offset=0.0,
)
assert isinstance(cache, dict) assert isinstance(cache, dict)

32
tests/test_vis_offset.py Normal file
View File

@@ -0,0 +1,32 @@
from engine.effects.legacy import vis_offset, vis_trunc
def test_vis_offset_no_change():
"""vis_offset with offset 0 returns original."""
result = vis_offset("hello", 0)
assert result == "hello"
def test_vis_offset_trims_start():
"""vis_offset skips first N characters."""
result = vis_offset("hello world", 6)
assert result == "world"
def test_vis_offset_handles_ansi():
"""vis_offset handles ANSI codes correctly."""
result = vis_offset("\033[31mhello\033[0m", 3)
assert result == "lo\x1b[0m" or "lo" in result
def test_vis_offset_greater_than_length():
"""vis_offset with offset > length returns empty-ish."""
result = vis_offset("hi", 10)
assert result == ""
def test_vis_trunc_still_works():
"""Ensure vis_trunc still works after changes."""
result = vis_trunc("hello world", 5)
assert result == "hello"