refactor: phase 2 - modularization of scroll engine
Split monolithic scroll.py into focused modules: - viewport.py: terminal size (tw/th), ANSI positioning helpers - frame.py: FrameTimer class, scroll step calculation - layers.py: message overlay, ticker zone, firehose rendering - scroll.py: simplified orchestrator, imports from new modules Add stream controller and event types for future event-driven architecture: - controller.py: StreamController for source initialization and stream lifecycle - events.py: EventType enum and event dataclasses (HeadlineEvent, FrameTickEvent, etc.) Added tests for new modules: - test_viewport.py: 8 tests for viewport utilities - test_frame.py: 10 tests for frame timing - test_layers.py: 13 tests for layer compositing - test_events.py: 11 tests for event types - test_controller.py: 6 tests for stream controller This enables: - Testable chunks with clear responsibilities - Reusable viewport utilities across modules - Better separation of concerns in render pipeline - Foundation for future event-driven architecture Also includes Phase 1 documentation updates in code comments.
This commit is contained in:
201
engine/layers.py
Normal file
201
engine/layers.py
Normal file
@@ -0,0 +1,201 @@
|
||||
"""
|
||||
Layer compositing — message overlay, ticker zone, firehose, noise.
|
||||
Depends on: config, render, effects.
|
||||
"""
|
||||
|
||||
import random
|
||||
import re
|
||||
import time
|
||||
from datetime import datetime
|
||||
|
||||
from engine import config
|
||||
from engine.effects import (
|
||||
fade_line,
|
||||
firehose_line,
|
||||
glitch_bar,
|
||||
noise,
|
||||
vis_trunc,
|
||||
)
|
||||
from engine.render import big_wrap, lr_gradient, lr_gradient_opposite
|
||||
from engine.terminal import RST, W_COOL
|
||||
|
||||
MSG_META = "\033[38;5;245m"
|
||||
MSG_BORDER = "\033[2;38;5;37m"
|
||||
|
||||
|
||||
def render_message_overlay(
|
||||
msg: tuple[str, str, float] | None,
|
||||
w: int,
|
||||
h: int,
|
||||
msg_cache: tuple,
|
||||
) -> tuple[list[str], tuple]:
|
||||
"""Render ntfy message overlay.
|
||||
|
||||
Args:
|
||||
msg: (title, body, timestamp) or None
|
||||
w: terminal width
|
||||
h: terminal height
|
||||
msg_cache: (cache_key, rendered_rows) for caching
|
||||
|
||||
Returns:
|
||||
(list of ANSI strings, updated cache)
|
||||
"""
|
||||
overlay = []
|
||||
if msg is None:
|
||||
return overlay, msg_cache
|
||||
|
||||
m_title, m_body, m_ts = msg
|
||||
display_text = m_body or m_title or "(empty)"
|
||||
display_text = re.sub(r"\s+", " ", display_text.upper())
|
||||
|
||||
cache_key = (display_text, w)
|
||||
if msg_cache[0] != cache_key:
|
||||
msg_rows = big_wrap(display_text, w - 4)
|
||||
msg_cache = (cache_key, msg_rows)
|
||||
else:
|
||||
msg_rows = msg_cache[1]
|
||||
|
||||
msg_rows = lr_gradient_opposite(
|
||||
msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0
|
||||
)
|
||||
|
||||
elapsed_s = int(time.monotonic() - m_ts)
|
||||
remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s)
|
||||
ts_str = datetime.now().strftime("%H:%M:%S")
|
||||
panel_h = len(msg_rows) + 2
|
||||
panel_top = max(0, (h - panel_h) // 2)
|
||||
|
||||
row_idx = 0
|
||||
for mr in msg_rows:
|
||||
ln = vis_trunc(mr, w)
|
||||
overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K")
|
||||
row_idx += 1
|
||||
|
||||
meta_parts = []
|
||||
if m_title and m_title != m_body:
|
||||
meta_parts.append(m_title)
|
||||
meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
|
||||
meta = (
|
||||
" " + " \u00b7 ".join(meta_parts)
|
||||
if len(meta_parts) > 1
|
||||
else " " + meta_parts[0]
|
||||
)
|
||||
overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}\033[0m\033[K")
|
||||
row_idx += 1
|
||||
|
||||
bar = "\u2500" * (w - 4)
|
||||
overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}\033[0m\033[K")
|
||||
|
||||
return overlay, msg_cache
|
||||
|
||||
|
||||
def render_ticker_zone(
|
||||
active: list,
|
||||
scroll_cam: int,
|
||||
ticker_h: int,
|
||||
w: int,
|
||||
noise_cache: dict,
|
||||
grad_offset: float,
|
||||
) -> tuple[list[str], dict]:
|
||||
"""Render the ticker scroll zone.
|
||||
|
||||
Args:
|
||||
active: list of (content_rows, color, canvas_y, meta_idx)
|
||||
scroll_cam: camera position (viewport top)
|
||||
ticker_h: height of ticker zone
|
||||
w: terminal width
|
||||
noise_cache: dict of cy -> noise string
|
||||
grad_offset: gradient animation offset
|
||||
|
||||
Returns:
|
||||
(list of ANSI strings, updated noise_cache)
|
||||
"""
|
||||
buf = []
|
||||
top_zone = max(1, int(ticker_h * 0.25))
|
||||
bot_zone = max(1, int(ticker_h * 0.10))
|
||||
|
||||
def noise_at(cy):
|
||||
if cy not in noise_cache:
|
||||
noise_cache[cy] = noise(w) if random.random() < 0.15 else None
|
||||
return noise_cache[cy]
|
||||
|
||||
for r in range(ticker_h):
|
||||
scr_row = r + 1
|
||||
cy = scroll_cam + r
|
||||
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
|
||||
bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0
|
||||
row_fade = min(top_f, bot_f)
|
||||
drawn = False
|
||||
|
||||
for content, hc, by, midx in active:
|
||||
cr = cy - by
|
||||
if 0 <= cr < len(content):
|
||||
raw = content[cr]
|
||||
if cr != midx:
|
||||
colored = lr_gradient([raw], grad_offset)[0]
|
||||
else:
|
||||
colored = raw
|
||||
ln = vis_trunc(colored, w)
|
||||
if row_fade < 1.0:
|
||||
ln = fade_line(ln, row_fade)
|
||||
|
||||
if cr == midx:
|
||||
buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K")
|
||||
elif ln.strip():
|
||||
buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K")
|
||||
else:
|
||||
buf.append(f"\033[{scr_row};1H\033[K")
|
||||
drawn = True
|
||||
break
|
||||
|
||||
if not drawn:
|
||||
n = noise_at(cy)
|
||||
if row_fade < 1.0 and n:
|
||||
n = fade_line(n, row_fade)
|
||||
if n:
|
||||
buf.append(f"\033[{scr_row};1H{n}")
|
||||
else:
|
||||
buf.append(f"\033[{scr_row};1H\033[K")
|
||||
|
||||
return buf, noise_cache
|
||||
|
||||
|
||||
def apply_glitch(
|
||||
buf: list[str],
|
||||
ticker_buf_start: int,
|
||||
mic_excess: float,
|
||||
w: int,
|
||||
) -> list[str]:
|
||||
"""Apply glitch effect to ticker buffer.
|
||||
|
||||
Args:
|
||||
buf: current buffer
|
||||
ticker_buf_start: index where ticker starts in buffer
|
||||
mic_excess: mic level above threshold
|
||||
w: terminal width
|
||||
|
||||
Returns:
|
||||
Updated buffer with glitches applied
|
||||
"""
|
||||
glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
|
||||
n_hits = 4 + int(mic_excess / 2)
|
||||
ticker_buf_len = len(buf) - ticker_buf_start
|
||||
|
||||
if random.random() < glitch_prob and ticker_buf_len > 0:
|
||||
for _ in range(min(n_hits, ticker_buf_len)):
|
||||
gi = random.randint(0, ticker_buf_len - 1)
|
||||
scr_row = gi + 1
|
||||
buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}"
|
||||
|
||||
return buf
|
||||
|
||||
|
||||
def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]:
|
||||
"""Render firehose strip at bottom of screen."""
|
||||
buf = []
|
||||
if fh > 0:
|
||||
for fr in range(fh):
|
||||
scr_row = h - fh + fr + 1
|
||||
fline = firehose_line(items, w)
|
||||
buf.append(f"\033[{scr_row};1H{fline}\033[K")
|
||||
return buf
|
||||
Reference in New Issue
Block a user