forked from genewildish/Mainline
feat: Implement scrolling camera with layout-aware filtering
- Rename VERTICAL camera mode to FEED (rapid single-item view) - Add SCROLL camera mode with float accumulation for smooth movie-credits style scrolling - Add estimate_block_height() for cheap layout calculation without full rendering - Replace ViewportFilterStage with layout-aware filtering that tracks camera position - Add render caching to FontStage to avoid re-rendering items - Fix CameraStage to use global canvas height for scrolling bounds - Add horizontal padding in Camera.apply() to prevent ghosting - Add get_dimensions() to MultiDisplay for proper viewport sizing - Fix PygameDisplay to auto-detect viewport from window size - Update presets to use scroll camera with appropriate speeds
This commit is contained in:
@@ -120,7 +120,7 @@ def run_pipeline_mode(preset_name: str = "demo"):
|
||||
print(f" \033[38;5;196mFailed to create display: {display_name}\033[0m")
|
||||
sys.exit(1)
|
||||
|
||||
display.init(80, 24)
|
||||
display.init(0, 0)
|
||||
|
||||
effect_registry = get_registry()
|
||||
|
||||
@@ -171,16 +171,21 @@ def run_pipeline_mode(preset_name: str = "demo"):
|
||||
from engine.pipeline.adapters import CameraStage
|
||||
|
||||
camera = None
|
||||
if preset.camera == "vertical":
|
||||
camera = Camera.vertical()
|
||||
speed = getattr(preset, "camera_speed", 1.0)
|
||||
if preset.camera == "feed":
|
||||
camera = Camera.feed(speed=speed)
|
||||
elif preset.camera == "scroll":
|
||||
camera = Camera.scroll(speed=speed)
|
||||
elif preset.camera == "vertical":
|
||||
camera = Camera.scroll(speed=speed) # Backwards compat
|
||||
elif preset.camera == "horizontal":
|
||||
camera = Camera.horizontal()
|
||||
camera = Camera.horizontal(speed=speed)
|
||||
elif preset.camera == "omni":
|
||||
camera = Camera.omni()
|
||||
camera = Camera.omni(speed=speed)
|
||||
elif preset.camera == "floating":
|
||||
camera = Camera.floating()
|
||||
camera = Camera.floating(speed=speed)
|
||||
elif preset.camera == "bounce":
|
||||
camera = Camera.bounce()
|
||||
camera = Camera.bounce(speed=speed)
|
||||
|
||||
if camera:
|
||||
pipeline.add_stage("camera", CameraStage(camera, name=preset.camera))
|
||||
@@ -213,6 +218,7 @@ def run_pipeline_mode(preset_name: str = "demo"):
|
||||
ctx.set("items", items)
|
||||
ctx.set("pipeline", pipeline)
|
||||
ctx.set("pipeline_order", pipeline.execution_order)
|
||||
ctx.set("camera_y", 0)
|
||||
|
||||
current_width = 80
|
||||
current_height = 24
|
||||
|
||||
101
engine/camera.py
101
engine/camera.py
@@ -17,7 +17,8 @@ from enum import Enum, auto
|
||||
|
||||
|
||||
class CameraMode(Enum):
|
||||
VERTICAL = auto()
|
||||
FEED = auto() # Single item view (static or rapid cycling)
|
||||
SCROLL = auto() # Smooth vertical scrolling (movie credits style)
|
||||
HORIZONTAL = auto()
|
||||
OMNI = auto()
|
||||
FLOATING = auto()
|
||||
@@ -55,12 +56,14 @@ class Camera:
|
||||
|
||||
x: int = 0
|
||||
y: int = 0
|
||||
mode: CameraMode = CameraMode.VERTICAL
|
||||
mode: CameraMode = CameraMode.FEED
|
||||
speed: float = 1.0
|
||||
zoom: float = 1.0
|
||||
canvas_width: int = 200 # Larger than viewport for scrolling
|
||||
canvas_height: int = 200
|
||||
custom_update: Callable[["Camera", float], None] | None = None
|
||||
_x_float: float = field(default=0.0, repr=False)
|
||||
_y_float: float = field(default=0.0, repr=False)
|
||||
_time: float = field(default=0.0, repr=False)
|
||||
|
||||
@property
|
||||
@@ -128,8 +131,10 @@ class Camera:
|
||||
self.custom_update(self, dt)
|
||||
return
|
||||
|
||||
if self.mode == CameraMode.VERTICAL:
|
||||
self._update_vertical(dt)
|
||||
if self.mode == CameraMode.FEED:
|
||||
self._update_feed(dt)
|
||||
elif self.mode == CameraMode.SCROLL:
|
||||
self._update_scroll(dt)
|
||||
elif self.mode == CameraMode.HORIZONTAL:
|
||||
self._update_horizontal(dt)
|
||||
elif self.mode == CameraMode.OMNI:
|
||||
@@ -159,9 +164,15 @@ class Camera:
|
||||
if vh < self.canvas_height:
|
||||
self.y = max(0, min(self.y, self.canvas_height - vh))
|
||||
|
||||
def _update_vertical(self, dt: float) -> None:
|
||||
def _update_feed(self, dt: float) -> None:
|
||||
"""Feed mode: rapid scrolling (1 row per frame at speed=1.0)."""
|
||||
self.y += int(self.speed * dt * 60)
|
||||
|
||||
def _update_scroll(self, dt: float) -> None:
|
||||
"""Scroll mode: smooth vertical scrolling with float accumulation."""
|
||||
self._y_float += self.speed * dt * 60
|
||||
self.y = int(self._y_float)
|
||||
|
||||
def _update_horizontal(self, dt: float) -> None:
|
||||
self.x += int(self.speed * dt * 60)
|
||||
|
||||
@@ -230,10 +241,86 @@ class Camera:
|
||||
self.canvas_height = height
|
||||
self._clamp_to_bounds()
|
||||
|
||||
def apply(
|
||||
self, buffer: list[str], viewport_width: int, viewport_height: int | None = None
|
||||
) -> list[str]:
|
||||
"""Apply camera viewport to a text buffer.
|
||||
|
||||
Slices the buffer based on camera position (x, y) and viewport dimensions.
|
||||
Handles ANSI escape codes correctly for colored/styled text.
|
||||
|
||||
Args:
|
||||
buffer: List of strings representing lines of text
|
||||
viewport_width: Width of the visible viewport in characters
|
||||
viewport_height: Height of the visible viewport (overrides camera's viewport_height if provided)
|
||||
|
||||
Returns:
|
||||
Sliced buffer containing only the visible lines and columns
|
||||
"""
|
||||
from engine.effects.legacy import vis_offset, vis_trunc
|
||||
|
||||
if not buffer:
|
||||
return buffer
|
||||
|
||||
# Get current viewport bounds (clamped to canvas size)
|
||||
viewport = self.get_viewport()
|
||||
|
||||
# Use provided viewport_height if given, otherwise use camera's viewport
|
||||
vh = viewport_height if viewport_height is not None else viewport.height
|
||||
|
||||
# Vertical slice: extract lines that fit in viewport height
|
||||
start_y = viewport.y
|
||||
end_y = min(viewport.y + vh, len(buffer))
|
||||
|
||||
if start_y >= len(buffer):
|
||||
# Scrolled past end of buffer, return empty viewport
|
||||
return [""] * vh
|
||||
|
||||
vertical_slice = buffer[start_y:end_y]
|
||||
|
||||
# Horizontal slice: apply horizontal offset and truncate to width
|
||||
horizontal_slice = []
|
||||
for line in vertical_slice:
|
||||
# Apply horizontal offset (skip first x characters, handling ANSI)
|
||||
offset_line = vis_offset(line, viewport.x)
|
||||
# Truncate to viewport width (handling ANSI)
|
||||
truncated_line = vis_trunc(offset_line, viewport_width)
|
||||
|
||||
# Pad line to full viewport width to prevent ghosting when panning
|
||||
import re
|
||||
|
||||
visible_len = len(re.sub(r"\x1b\[[0-9;]*m", "", truncated_line))
|
||||
if visible_len < viewport_width:
|
||||
truncated_line += " " * (viewport_width - visible_len)
|
||||
|
||||
horizontal_slice.append(truncated_line)
|
||||
|
||||
# Pad with empty lines if needed to fill viewport height
|
||||
while len(horizontal_slice) < vh:
|
||||
horizontal_slice.append("")
|
||||
|
||||
return horizontal_slice
|
||||
|
||||
@classmethod
|
||||
def feed(cls, speed: float = 1.0) -> "Camera":
|
||||
"""Create a feed camera (rapid single-item scrolling, 1 row/frame at speed=1.0)."""
|
||||
return cls(mode=CameraMode.FEED, speed=speed, canvas_height=200)
|
||||
|
||||
@classmethod
|
||||
def scroll(cls, speed: float = 0.5) -> "Camera":
|
||||
"""Create a smooth scrolling camera (movie credits style).
|
||||
|
||||
Uses float accumulation for sub-integer speeds.
|
||||
Sets canvas_width=0 so it matches viewport_width for proper text wrapping.
|
||||
"""
|
||||
return cls(
|
||||
mode=CameraMode.SCROLL, speed=speed, canvas_width=0, canvas_height=200
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def vertical(cls, speed: float = 1.0) -> "Camera":
|
||||
"""Create a vertical scrolling camera."""
|
||||
return cls(mode=CameraMode.VERTICAL, speed=speed, canvas_height=200)
|
||||
"""Deprecated: Use feed() or scroll() instead."""
|
||||
return cls(mode=CameraMode.FEED, speed=speed, canvas_height=200)
|
||||
|
||||
@classmethod
|
||||
def horizontal(cls, speed: float = 1.0) -> "Camera":
|
||||
|
||||
@@ -38,6 +38,13 @@ class MultiDisplay:
|
||||
for d in self.displays:
|
||||
d.clear()
|
||||
|
||||
def get_dimensions(self) -> tuple[int, int]:
|
||||
"""Get dimensions from the first child display that supports it."""
|
||||
for d in self.displays:
|
||||
if hasattr(d, "get_dimensions"):
|
||||
return d.get_dimensions()
|
||||
return (self.width, self.height)
|
||||
|
||||
def cleanup(self) -> None:
|
||||
for d in self.displays:
|
||||
d.cleanup()
|
||||
|
||||
@@ -122,6 +122,10 @@ class PygameDisplay:
|
||||
self._pygame = pygame
|
||||
PygameDisplay._pygame_initialized = True
|
||||
|
||||
# Calculate character dimensions from actual window size
|
||||
self.width = max(1, self.window_width // self.cell_width)
|
||||
self.height = max(1, self.window_height // self.cell_height)
|
||||
|
||||
font_path = self._get_font_path()
|
||||
if font_path:
|
||||
try:
|
||||
|
||||
@@ -314,9 +314,7 @@ class CameraStage(Stage):
|
||||
|
||||
@property
|
||||
def dependencies(self) -> set[str]:
|
||||
return {
|
||||
"source"
|
||||
} # Prefix match any source (source.headlines, source.poetry, etc.)
|
||||
return {"render.output"} # Depend on rendered output from font or render stage
|
||||
|
||||
@property
|
||||
def inlet_types(self) -> set:
|
||||
@@ -335,9 +333,54 @@ class CameraStage(Stage):
|
||||
if data is None:
|
||||
return None
|
||||
if hasattr(self._camera, "apply"):
|
||||
return self._camera.apply(
|
||||
data, ctx.params.viewport_width if ctx.params else 80
|
||||
viewport_width = ctx.params.viewport_width if ctx.params else 80
|
||||
viewport_height = ctx.params.viewport_height if ctx.params else 24
|
||||
buffer_height = len(data) if isinstance(data, list) else 0
|
||||
|
||||
# Get global layout height for canvas (enables full scrolling range)
|
||||
total_layout_height = ctx.get("total_layout_height", buffer_height)
|
||||
|
||||
# Preserve camera's configured canvas width, but ensure it's at least viewport_width
|
||||
# This allows horizontal/omni/floating/bounce cameras to scroll properly
|
||||
canvas_width = max(
|
||||
viewport_width, getattr(self._camera, "canvas_width", viewport_width)
|
||||
)
|
||||
|
||||
# Update camera's viewport dimensions so it knows its actual bounds
|
||||
if hasattr(self._camera, "viewport_width"):
|
||||
self._camera.viewport_width = viewport_width
|
||||
self._camera.viewport_height = viewport_height
|
||||
|
||||
# Set canvas to full layout height so camera can scroll through all content
|
||||
self._camera.set_canvas_size(width=canvas_width, height=total_layout_height)
|
||||
|
||||
# Update camera position (scroll) - uses global canvas for clamping
|
||||
if hasattr(self._camera, "update"):
|
||||
self._camera.update(1 / 60)
|
||||
|
||||
# Store camera_y in context for ViewportFilterStage (global y position)
|
||||
ctx.set("camera_y", self._camera.y)
|
||||
|
||||
# Apply camera viewport slicing to the partial buffer
|
||||
# The buffer starts at render_offset_y in global coordinates
|
||||
render_offset_y = ctx.get("render_offset_y", 0)
|
||||
|
||||
# Temporarily shift camera to local buffer coordinates for apply()
|
||||
real_y = self._camera.y
|
||||
local_y = max(0, real_y - render_offset_y)
|
||||
|
||||
# Temporarily shrink canvas to local buffer size so apply() works correctly
|
||||
self._camera.set_canvas_size(width=canvas_width, height=buffer_height)
|
||||
self._camera.y = local_y
|
||||
|
||||
# Apply slicing
|
||||
result = self._camera.apply(data, viewport_width, viewport_height)
|
||||
|
||||
# Restore global canvas and camera position for next frame
|
||||
self._camera.set_canvas_size(width=canvas_width, height=total_layout_height)
|
||||
self._camera.y = real_y
|
||||
|
||||
return result
|
||||
return data
|
||||
|
||||
def cleanup(self) -> None:
|
||||
@@ -346,20 +389,20 @@ class CameraStage(Stage):
|
||||
|
||||
|
||||
class ViewportFilterStage(Stage):
|
||||
"""Stage that limits items to fit in viewport.
|
||||
"""Stage that limits items based on layout calculation.
|
||||
|
||||
Filters the input list of items to only include as many as can fit
|
||||
in the visible viewport. This prevents FontStage from rendering
|
||||
thousands of items when only a few are visible, reducing processing time.
|
||||
|
||||
Estimate: each rendered item typically takes 5-8 terminal lines.
|
||||
For a 24-line viewport, we limit to ~4 items (conservative estimate).
|
||||
Computes cumulative y-offsets for all items using cheap height estimation,
|
||||
then returns only items that overlap the camera's viewport window.
|
||||
This prevents FontStage from rendering thousands of items when only a few
|
||||
are visible, while still allowing camera scrolling through all content.
|
||||
"""
|
||||
|
||||
def __init__(self, name: str = "viewport-filter"):
|
||||
self.name = name
|
||||
self.category = "filter"
|
||||
self.optional = False
|
||||
self._cached_count = 0
|
||||
self._layout: list[tuple[int, int]] = []
|
||||
|
||||
@property
|
||||
def stage_type(self) -> str:
|
||||
@@ -386,21 +429,60 @@ class ViewportFilterStage(Stage):
|
||||
return {DataType.SOURCE_ITEMS}
|
||||
|
||||
def process(self, data: Any, ctx: PipelineContext) -> Any:
|
||||
"""Filter items to viewport-fitting count."""
|
||||
"""Filter items based on layout and camera position."""
|
||||
if data is None or not isinstance(data, list):
|
||||
return data
|
||||
|
||||
# Estimate: each rendered headline takes 5-8 lines
|
||||
# Use a conservative factor to ensure we don't run out of space
|
||||
lines_per_item = 6
|
||||
viewport_height = ctx.params.viewport_height if ctx.params else 24
|
||||
viewport_width = ctx.params.viewport_width if ctx.params else 80
|
||||
camera_y = ctx.get("camera_y", 0)
|
||||
|
||||
# Calculate how many items we need to fill the viewport
|
||||
# Add 1 extra to account for padding/spacing
|
||||
max_items = max(1, viewport_height // lines_per_item + 1)
|
||||
# Recompute layout only when item count changes
|
||||
if len(data) != self._cached_count:
|
||||
self._layout = []
|
||||
y = 0
|
||||
from engine.render.blocks import estimate_block_height
|
||||
|
||||
# Return only the items that fit in the viewport
|
||||
return data[:max_items]
|
||||
for item in data:
|
||||
if hasattr(item, "content"):
|
||||
title = item.content
|
||||
elif isinstance(item, tuple):
|
||||
title = str(item[0]) if item else ""
|
||||
else:
|
||||
title = str(item)
|
||||
h = estimate_block_height(title, viewport_width)
|
||||
self._layout.append((y, h))
|
||||
y += h
|
||||
self._cached_count = len(data)
|
||||
|
||||
# Find items visible in [camera_y - buffer, camera_y + viewport_height + buffer]
|
||||
buffer_zone = viewport_height
|
||||
vis_start = max(0, camera_y - buffer_zone)
|
||||
vis_end = camera_y + viewport_height + buffer_zone
|
||||
|
||||
visible_items = []
|
||||
render_offset_y = 0
|
||||
first_visible_found = False
|
||||
for i, (start_y, height) in enumerate(self._layout):
|
||||
item_end = start_y + height
|
||||
if item_end > vis_start and start_y < vis_end:
|
||||
if not first_visible_found:
|
||||
render_offset_y = start_y
|
||||
first_visible_found = True
|
||||
visible_items.append(data[i])
|
||||
|
||||
# Compute total layout height for the canvas
|
||||
total_layout_height = 0
|
||||
if self._layout:
|
||||
last_start, last_height = self._layout[-1]
|
||||
total_layout_height = last_start + last_height
|
||||
|
||||
# Store metadata for CameraStage
|
||||
ctx.set("render_offset_y", render_offset_y)
|
||||
ctx.set("total_layout_height", total_layout_height)
|
||||
|
||||
# Always return at least one item to avoid empty buffer errors
|
||||
return visible_items if visible_items else data[:1]
|
||||
|
||||
|
||||
class FontStage(Stage):
|
||||
@@ -434,6 +516,7 @@ class FontStage(Stage):
|
||||
self._font_size = font_size
|
||||
self._font_ref = font_ref
|
||||
self._font = None
|
||||
self._render_cache: dict[tuple[str, str, int], list[str]] = {}
|
||||
|
||||
@property
|
||||
def stage_type(self) -> str:
|
||||
@@ -504,8 +587,15 @@ class FontStage(Stage):
|
||||
src = "unknown"
|
||||
ts = "0"
|
||||
|
||||
# Check cache first
|
||||
cache_key = (title, src, ts, w)
|
||||
if cache_key in self._render_cache:
|
||||
result.extend(self._render_cache[cache_key])
|
||||
continue
|
||||
|
||||
try:
|
||||
block_lines, color_code, meta_idx = make_block(title, src, ts, w)
|
||||
self._render_cache[cache_key] = block_lines
|
||||
result.extend(block_lines)
|
||||
except Exception:
|
||||
result.append(title)
|
||||
|
||||
@@ -45,7 +45,7 @@ class PipelinePreset:
|
||||
description: str = ""
|
||||
source: str = "headlines"
|
||||
display: str = "terminal"
|
||||
camera: str = "vertical"
|
||||
camera: str = "scroll"
|
||||
effects: list[str] = field(default_factory=list)
|
||||
border: bool = False
|
||||
|
||||
@@ -79,7 +79,7 @@ DEMO_PRESET = PipelinePreset(
|
||||
description="Demo mode with effect cycling and camera modes",
|
||||
source="headlines",
|
||||
display="pygame",
|
||||
camera="vertical",
|
||||
camera="scroll",
|
||||
effects=["noise", "fade", "glitch", "firehose", "hud"],
|
||||
)
|
||||
|
||||
@@ -88,7 +88,7 @@ POETRY_PRESET = PipelinePreset(
|
||||
description="Poetry feed with subtle effects",
|
||||
source="poetry",
|
||||
display="pygame",
|
||||
camera="vertical",
|
||||
camera="scroll",
|
||||
effects=["fade", "hud"],
|
||||
)
|
||||
|
||||
@@ -106,7 +106,7 @@ WEBSOCKET_PRESET = PipelinePreset(
|
||||
description="WebSocket display mode",
|
||||
source="headlines",
|
||||
display="websocket",
|
||||
camera="vertical",
|
||||
camera="scroll",
|
||||
effects=["noise", "fade", "glitch", "hud"],
|
||||
)
|
||||
|
||||
@@ -115,7 +115,7 @@ SIXEL_PRESET = PipelinePreset(
|
||||
description="Sixel graphics display mode",
|
||||
source="headlines",
|
||||
display="sixel",
|
||||
camera="vertical",
|
||||
camera="scroll",
|
||||
effects=["noise", "fade", "glitch", "hud"],
|
||||
)
|
||||
|
||||
@@ -124,7 +124,7 @@ FIREHOSE_PRESET = PipelinePreset(
|
||||
description="High-speed firehose mode",
|
||||
source="headlines",
|
||||
display="pygame",
|
||||
camera="vertical",
|
||||
camera="scroll",
|
||||
effects=["noise", "fade", "glitch", "firehose", "hud"],
|
||||
)
|
||||
|
||||
|
||||
@@ -13,6 +13,50 @@ from engine import config
|
||||
from engine.sources import NO_UPPER, SCRIPT_FONTS, SOURCE_LANGS
|
||||
from engine.translate import detect_location_language, translate_headline
|
||||
|
||||
|
||||
def estimate_block_height(title: str, width: int, fnt=None) -> int:
|
||||
"""Estimate rendered block height without full PIL rendering.
|
||||
|
||||
Uses font bbox measurement to count wrapped lines, then computes:
|
||||
height = num_lines * RENDER_H + (num_lines - 1) + 2
|
||||
|
||||
Args:
|
||||
title: Headline text to measure
|
||||
width: Terminal width in characters
|
||||
fnt: Optional PIL font (uses default if None)
|
||||
|
||||
Returns:
|
||||
Estimated height in terminal rows
|
||||
"""
|
||||
if fnt is None:
|
||||
fnt = font()
|
||||
text = re.sub(r"\s+", " ", title.upper())
|
||||
words = text.split()
|
||||
lines = 0
|
||||
cur = ""
|
||||
for word in words:
|
||||
test = f"{cur} {word}".strip() if cur else word
|
||||
bbox = fnt.getbbox(test)
|
||||
if bbox:
|
||||
img_h = bbox[3] - bbox[1] + 8
|
||||
pix_h = config.RENDER_H * 2
|
||||
scale = pix_h / max(img_h, 1)
|
||||
term_w = int((bbox[2] - bbox[0] + 8) * scale)
|
||||
else:
|
||||
term_w = 0
|
||||
max_term_w = width - 4 - 4
|
||||
if term_w > max_term_w and cur:
|
||||
lines += 1
|
||||
cur = word
|
||||
else:
|
||||
cur = test
|
||||
if cur:
|
||||
lines += 1
|
||||
if lines == 0:
|
||||
lines = 1
|
||||
return lines * config.RENDER_H + max(0, lines - 1) + 2
|
||||
|
||||
|
||||
# ─── FONT LOADING ─────────────────────────────────────────
|
||||
_FONT_OBJ = None
|
||||
_FONT_OBJ_KEY = None
|
||||
|
||||
Reference in New Issue
Block a user