diff --git a/engine/app.py b/engine/app.py index 8bc6089..806e2a7 100644 --- a/engine/app.py +++ b/engine/app.py @@ -352,10 +352,11 @@ def pick_effects_config(): def run_demo_mode(): - """Run demo mode - showcases effects with real content and pygame display.""" + """Run demo mode - showcases effects and camera modes with real content.""" import random from engine import config + from engine.camera import Camera, CameraMode from engine.display import DisplayRegistry from engine.effects import ( EffectContext, @@ -409,7 +410,6 @@ def run_demo_mode(): pool = list(items) seen = set() active = [] - scroll_cam = 0 ticker_next_y = 0 noise_cache = {} scroll_motion_accum = 0.0 @@ -418,6 +418,8 @@ def run_demo_mode(): GAP = 3 scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, h) + camera = Camera.vertical(speed=1.0) + effects_to_demo = ["noise", "fade", "glitch", "firehose"] effect_idx = 0 effect_name = effects_to_demo[effect_idx] @@ -425,12 +427,22 @@ def run_demo_mode(): current_intensity = 0.0 ramping_up = True - print(" \033[38;5;82mStarting effect demo...\033[0m") + camera_modes = [ + (CameraMode.VERTICAL, "vertical"), + (CameraMode.HORIZONTAL, "horizontal"), + (CameraMode.OMNI, "omni"), + (CameraMode.FLOATING, "floating"), + ] + camera_mode_idx = 0 + camera_start_time = time.time() + + print(" \033[38;5;82mStarting effect & camera demo...\033[0m") print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n") try: while True: elapsed = time.time() - effect_start_time + camera_elapsed = time.time() - camera_start_time duration = config.DEMO_EFFECT_DURATION if elapsed >= duration: @@ -441,6 +453,13 @@ def run_demo_mode(): current_intensity = 0.0 ramping_up = True + if camera_elapsed >= duration * 2: + camera_mode_idx = (camera_mode_idx + 1) % len(camera_modes) + mode, mode_name = camera_modes[camera_mode_idx] + camera = Camera(mode=mode, speed=1.0) + camera_start_time = time.time() + camera_elapsed = 0 + progress = elapsed / duration if ramping_up: current_intensity = progress @@ -458,18 +477,21 @@ def run_demo_mode(): hud_effect = registry.get("hud") if hud_effect: - hud_effect.config.params["display_effect"] = effect_name + mode_name = camera_modes[camera_mode_idx][1] + hud_effect.config.params["display_effect"] = ( + f"{effect_name} / {mode_name}" + ) hud_effect.config.params["display_intensity"] = current_intensity scroll_motion_accum += config.FRAME_DT while scroll_motion_accum >= scroll_step_interval: scroll_motion_accum -= scroll_step_interval - scroll_cam += 1 + camera.update(config.FRAME_DT) - from engine.effects import next_headline - from engine.render import make_block + while ticker_next_y < camera.y + h + 10 and len(active) < 50: + from engine.effects import next_headline + from engine.render import make_block - while ticker_next_y < scroll_cam + h + 10: t, src, ts = next_headline(pool, items, seen) ticker_content, hc, midx = make_block(t, src, ts, w) active.append((ticker_content, hc, ticker_next_y, midx)) @@ -478,10 +500,10 @@ def run_demo_mode(): active = [ (c, hc, by, mi) for c, hc, by, mi in active - if by + len(c) > scroll_cam + if by + len(c) > camera.y ] for k in list(noise_cache): - if k < scroll_cam: + if k < camera.y: del noise_cache[k] grad_offset = (time.time() * config.GRAD_SPEED) % 1.0 @@ -489,7 +511,13 @@ def run_demo_mode(): from engine.layers import render_ticker_zone buf, noise_cache = render_ticker_zone( - active, scroll_cam, h, w, noise_cache, grad_offset + active, + scroll_cam=camera.y, + camera_x=camera.x, + ticker_h=h, + w=w, + noise_cache=noise_cache, + grad_offset=grad_offset, ) from engine.layers import render_firehose @@ -500,8 +528,9 @@ def run_demo_mode(): ctx = EffectContext( terminal_width=w, terminal_height=h, - scroll_cam=scroll_cam, + scroll_cam=camera.y, ticker_height=h, + camera_x=camera.x, mic_excess=0.0, grad_offset=grad_offset, frame_number=frame_number, diff --git a/engine/camera.py b/engine/camera.py new file mode 100644 index 0000000..12d95f2 --- /dev/null +++ b/engine/camera.py @@ -0,0 +1,109 @@ +""" +Camera system for viewport scrolling. + +Provides abstraction for camera motion in different modes: +- Vertical: traditional upward scroll +- Horizontal: left/right movement +- Omni: combination of both +- Floating: sinusoidal/bobbing motion +""" + +import math +from collections.abc import Callable +from dataclasses import dataclass, field +from enum import Enum, auto + + +class CameraMode(Enum): + VERTICAL = auto() + HORIZONTAL = auto() + OMNI = auto() + FLOATING = auto() + + +@dataclass +class Camera: + """Camera for viewport scrolling. + + Attributes: + x: Current horizontal offset (positive = scroll left) + y: Current vertical offset (positive = scroll up) + mode: Current camera mode + speed: Base scroll speed + custom_update: Optional custom update function + """ + + x: int = 0 + y: int = 0 + mode: CameraMode = CameraMode.VERTICAL + speed: float = 1.0 + custom_update: Callable[["Camera", float], None] | None = None + _time: float = field(default=0.0, repr=False) + + def update(self, dt: float) -> None: + """Update camera position based on mode. + + Args: + dt: Delta time in seconds + """ + self._time += dt + + if self.custom_update: + self.custom_update(self, dt) + return + + if self.mode == CameraMode.VERTICAL: + self._update_vertical(dt) + elif self.mode == CameraMode.HORIZONTAL: + self._update_horizontal(dt) + elif self.mode == CameraMode.OMNI: + self._update_omni(dt) + elif self.mode == CameraMode.FLOATING: + self._update_floating(dt) + + def _update_vertical(self, dt: float) -> None: + self.y += int(self.speed * dt * 60) + + def _update_horizontal(self, dt: float) -> None: + self.x += int(self.speed * dt * 60) + + def _update_omni(self, dt: float) -> None: + speed = self.speed * dt * 60 + self.y += int(speed) + self.x += int(speed * 0.5) + + def _update_floating(self, dt: float) -> None: + base = self.speed * 30 + self.y = int(math.sin(self._time * 2) * base) + self.x = int(math.cos(self._time * 1.5) * base * 0.5) + + def reset(self) -> None: + """Reset camera position.""" + self.x = 0 + self.y = 0 + self._time = 0.0 + + @classmethod + def vertical(cls, speed: float = 1.0) -> "Camera": + """Create a vertical scrolling camera.""" + return cls(mode=CameraMode.VERTICAL, speed=speed) + + @classmethod + def horizontal(cls, speed: float = 1.0) -> "Camera": + """Create a horizontal scrolling camera.""" + return cls(mode=CameraMode.HORIZONTAL, speed=speed) + + @classmethod + def omni(cls, speed: float = 1.0) -> "Camera": + """Create an omnidirectional scrolling camera.""" + return cls(mode=CameraMode.OMNI, speed=speed) + + @classmethod + def floating(cls, speed: float = 1.0) -> "Camera": + """Create a floating/bobbing camera.""" + return cls(mode=CameraMode.FLOATING, speed=speed) + + @classmethod + def custom(cls, update_fn: Callable[["Camera", float], None]) -> "Camera": + """Create a camera with custom update function.""" + return cls(custom_update=update_fn) diff --git a/engine/effects/__init__.py b/engine/effects/__init__.py index 7f89c3b..55f8370 100644 --- a/engine/effects/__init__.py +++ b/engine/effects/__init__.py @@ -6,6 +6,7 @@ from engine.effects.legacy import ( glitch_bar, next_headline, noise, + vis_offset, vis_trunc, ) from engine.effects.performance import PerformanceMonitor, get_monitor, set_monitor @@ -45,4 +46,5 @@ __all__ = [ "noise", "next_headline", "vis_trunc", + "vis_offset", ] diff --git a/engine/effects/legacy.py b/engine/effects/legacy.py index 2887452..ac82096 100644 --- a/engine/effects/legacy.py +++ b/engine/effects/legacy.py @@ -82,6 +82,37 @@ def vis_trunc(s, w): return "".join(result) +def vis_offset(s, offset): + """Offset string by skipping first offset visual characters, skipping ANSI escape codes.""" + if offset <= 0: + return s + result = [] + vw = 0 + i = 0 + skipping = True + while i < len(s): + if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[": + j = i + 2 + while j < len(s) and not s[j].isalpha(): + j += 1 + if skipping: + i = j + 1 + continue + result.append(s[i : j + 1]) + i = j + 1 + else: + if skipping: + if vw >= offset: + skipping = False + result.append(s[i]) + vw += 1 + i += 1 + else: + result.append(s[i]) + i += 1 + return "".join(result) + + def next_headline(pool, items, seen): """Pull the next unique headline from pool, refilling as needed.""" while True: diff --git a/engine/effects/types.py b/engine/effects/types.py index d544dec..2d35dcb 100644 --- a/engine/effects/types.py +++ b/engine/effects/types.py @@ -29,10 +29,11 @@ class EffectContext: terminal_height: int scroll_cam: int ticker_height: int - mic_excess: float - grad_offset: float - frame_number: int - has_message: bool + camera_x: int = 0 + mic_excess: float = 0.0 + grad_offset: float = 0.0 + frame_number: int = 0 + has_message: bool = False items: list = field(default_factory=list) diff --git a/engine/layers.py b/engine/layers.py index b5ac428..0d8fe95 100644 --- a/engine/layers.py +++ b/engine/layers.py @@ -16,6 +16,7 @@ from engine.effects import ( firehose_line, glitch_bar, noise, + vis_offset, vis_trunc, ) from engine.render import big_wrap, lr_gradient, lr_gradient_opposite @@ -94,16 +95,18 @@ def render_message_overlay( def render_ticker_zone( active: list, scroll_cam: int, - ticker_h: int, - w: int, - noise_cache: dict, - grad_offset: float, + camera_x: int = 0, + ticker_h: int = 0, + w: int = 80, + noise_cache: dict | None = None, + grad_offset: float = 0.0, ) -> tuple[list[str], dict]: """Render the ticker scroll zone. Args: active: list of (content_rows, color, canvas_y, meta_idx) scroll_cam: camera position (viewport top) + camera_x: horizontal camera offset ticker_h: height of ticker zone w: terminal width noise_cache: dict of cy -> noise string @@ -112,6 +115,8 @@ def render_ticker_zone( Returns: (list of ANSI strings, updated noise_cache) """ + if noise_cache is None: + noise_cache = {} buf = [] top_zone = max(1, int(ticker_h * 0.25)) bot_zone = max(1, int(ticker_h * 0.10)) @@ -137,7 +142,7 @@ def render_ticker_zone( colored = lr_gradient([raw], grad_offset)[0] else: colored = raw - ln = vis_trunc(colored, w) + ln = vis_trunc(vis_offset(colored, camera_x), w) if row_fade < 1.0: ln = fade_line(ln, row_fade) @@ -228,11 +233,12 @@ def process_effects( h: int, scroll_cam: int, ticker_h: int, - mic_excess: float, - grad_offset: float, - frame_number: int, - has_message: bool, - items: list, + camera_x: int = 0, + mic_excess: float = 0.0, + grad_offset: float = 0.0, + frame_number: int = 0, + has_message: bool = False, + items: list | None = None, ) -> list[str]: """Process buffer through effect chain.""" if _effect_chain is None: @@ -242,12 +248,13 @@ def process_effects( terminal_width=w, terminal_height=h, scroll_cam=scroll_cam, + camera_x=camera_x, ticker_height=ticker_h, mic_excess=mic_excess, grad_offset=grad_offset, frame_number=frame_number, has_message=has_message, - items=items, + items=items or [], ) return _effect_chain.process(buf, ctx) diff --git a/engine/scroll.py b/engine/scroll.py index d13408b..6911a6b 100644 --- a/engine/scroll.py +++ b/engine/scroll.py @@ -7,6 +7,7 @@ import random import time from engine import config +from engine.camera import Camera from engine.display import ( Display, TerminalDisplay, @@ -27,10 +28,19 @@ from engine.viewport import th, tw USE_EFFECT_CHAIN = True -def stream(items, ntfy_poller, mic_monitor, display: Display | None = None): +def stream( + items, + ntfy_poller, + mic_monitor, + display: Display | None = None, + camera: Camera | None = None, +): """Main render loop with four layers: message, ticker, scroll motion, firehose.""" if display is None: display = TerminalDisplay() + if camera is None: + camera = Camera.vertical() + random.shuffle(items) pool = list(items) seen = set() @@ -46,7 +56,6 @@ def stream(items, ntfy_poller, mic_monitor, display: Display | None = None): scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h) active = [] - scroll_cam = 0 ticker_next_y = ticker_view_h noise_cache = {} scroll_motion_accum = 0.0 @@ -72,10 +81,10 @@ def stream(items, ntfy_poller, mic_monitor, display: Display | None = None): scroll_motion_accum += config.FRAME_DT while scroll_motion_accum >= scroll_step_interval: scroll_motion_accum -= scroll_step_interval - scroll_cam += 1 + camera.update(config.FRAME_DT) while ( - ticker_next_y < scroll_cam + ticker_view_h + 10 + ticker_next_y < camera.y + ticker_view_h + 10 and queued < config.HEADLINE_LIMIT ): from engine.effects import next_headline @@ -88,17 +97,17 @@ def stream(items, ntfy_poller, mic_monitor, display: Display | None = None): queued += 1 active = [ - (c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > scroll_cam + (c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > camera.y ] for k in list(noise_cache): - if k < scroll_cam: + if k < camera.y: del noise_cache[k] grad_offset = (time.monotonic() * config.GRAD_SPEED) % 1.0 ticker_buf_start = len(buf) ticker_buf, noise_cache = render_ticker_zone( - active, scroll_cam, ticker_h, w, noise_cache, grad_offset + active, camera.y, camera.x, ticker_h, w, noise_cache, grad_offset ) buf.extend(ticker_buf) @@ -110,8 +119,9 @@ def stream(items, ntfy_poller, mic_monitor, display: Display | None = None): buf, w, h, - scroll_cam, + camera.y, ticker_h, + camera.x, mic_excess, grad_offset, frame_number, diff --git a/tests/test_camera.py b/tests/test_camera.py new file mode 100644 index 0000000..b55a968 --- /dev/null +++ b/tests/test_camera.py @@ -0,0 +1,69 @@ + +from engine.camera import Camera, CameraMode + + +def test_camera_vertical_default(): + """Test default vertical camera.""" + cam = Camera() + assert cam.mode == CameraMode.VERTICAL + assert cam.x == 0 + assert cam.y == 0 + + +def test_camera_vertical_factory(): + """Test vertical factory method.""" + cam = Camera.vertical(speed=2.0) + assert cam.mode == CameraMode.VERTICAL + assert cam.speed == 2.0 + + +def test_camera_horizontal(): + """Test horizontal camera.""" + cam = Camera.horizontal(speed=1.5) + assert cam.mode == CameraMode.HORIZONTAL + cam.update(1.0) + assert cam.x > 0 + + +def test_camera_omni(): + """Test omnidirectional camera.""" + cam = Camera.omni(speed=1.0) + assert cam.mode == CameraMode.OMNI + cam.update(1.0) + assert cam.x > 0 + assert cam.y > 0 + + +def test_camera_floating(): + """Test floating camera with sinusoidal motion.""" + cam = Camera.floating(speed=1.0) + assert cam.mode == CameraMode.FLOATING + y_before = cam.y + cam.update(0.5) + y_after = cam.y + assert y_before != y_after + + +def test_camera_reset(): + """Test camera reset.""" + cam = Camera.vertical() + cam.update(1.0) + assert cam.y > 0 + cam.reset() + assert cam.x == 0 + assert cam.y == 0 + + +def test_camera_custom_update(): + """Test custom update function.""" + call_count = 0 + + def custom_update(camera, dt): + nonlocal call_count + call_count += 1 + camera.x += int(10 * dt) + + cam = Camera.custom(custom_update) + cam.update(1.0) + assert call_count == 1 + assert cam.x == 10 diff --git a/tests/test_layers.py b/tests/test_layers.py index afe9c07..a2205a6 100644 --- a/tests/test_layers.py +++ b/tests/test_layers.py @@ -87,10 +87,26 @@ class TestRenderTickerZone: def test_returns_list(self): """Returns a list of strings.""" - result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0) + result, cache = layers.render_ticker_zone( + [], + scroll_cam=0, + camera_x=0, + ticker_h=10, + w=80, + noise_cache={}, + grad_offset=0.0, + ) assert isinstance(result, list) def test_returns_dict_for_cache(self): """Returns a dict for the noise cache.""" - result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0) + result, cache = layers.render_ticker_zone( + [], + scroll_cam=0, + camera_x=0, + ticker_h=10, + w=80, + noise_cache={}, + grad_offset=0.0, + ) assert isinstance(cache, dict) diff --git a/tests/test_vis_offset.py b/tests/test_vis_offset.py new file mode 100644 index 0000000..e4ba282 --- /dev/null +++ b/tests/test_vis_offset.py @@ -0,0 +1,32 @@ + +from engine.effects.legacy import vis_offset, vis_trunc + + +def test_vis_offset_no_change(): + """vis_offset with offset 0 returns original.""" + result = vis_offset("hello", 0) + assert result == "hello" + + +def test_vis_offset_trims_start(): + """vis_offset skips first N characters.""" + result = vis_offset("hello world", 6) + assert result == "world" + + +def test_vis_offset_handles_ansi(): + """vis_offset handles ANSI codes correctly.""" + result = vis_offset("\033[31mhello\033[0m", 3) + assert result == "lo\x1b[0m" or "lo" in result + + +def test_vis_offset_greater_than_length(): + """vis_offset with offset > length returns empty-ish.""" + result = vis_offset("hi", 10) + assert result == "" + + +def test_vis_trunc_still_works(): + """Ensure vis_trunc still works after changes.""" + result = vis_trunc("hello world", 5) + assert result == "hello"