261 lines
6.9 KiB
Python
261 lines
6.9 KiB
Python
"""
|
|
Layer compositing — message overlay, ticker zone, firehose, noise.
|
|
Depends on: config, render, effects, terminal.
|
|
"""
|
|
|
|
import random
|
|
import re
|
|
import time
|
|
from datetime import datetime
|
|
|
|
from engine import config
|
|
from engine.effects import (
|
|
EffectChain,
|
|
EffectContext,
|
|
fade_line,
|
|
firehose_line,
|
|
glitch_bar,
|
|
noise,
|
|
vis_trunc,
|
|
)
|
|
from engine.render import big_wrap, lr_gradient, lr_gradient_opposite
|
|
from engine.terminal import RST, W_COOL
|
|
|
|
# SGR prefix applied to the overlay's metadata line
# (38;5;245 selects 256-colour foreground index 245).
MSG_META = "\033[38;5;245m"
# SGR prefix applied to the overlay's horizontal rule
# (2 = faint intensity, 38;5;37 = 256-colour foreground index 37).
MSG_BORDER = "\033[2;38;5;37m"
|
|
|
|
|
|
def render_message_overlay(
|
|
msg: tuple[str, str, float] | None,
|
|
w: int,
|
|
h: int,
|
|
msg_cache: tuple,
|
|
) -> tuple[list[str], tuple]:
|
|
"""Render ntfy message overlay.
|
|
|
|
Args:
|
|
msg: (title, body, timestamp) or None
|
|
w: terminal width
|
|
h: terminal height
|
|
msg_cache: (cache_key, rendered_rows) for caching
|
|
|
|
Returns:
|
|
(list of ANSI strings, updated cache)
|
|
"""
|
|
overlay = []
|
|
if msg is None:
|
|
return overlay, msg_cache
|
|
|
|
m_title, m_body, m_ts = msg
|
|
display_text = m_body or m_title or "(empty)"
|
|
display_text = re.sub(r"\s+", " ", display_text.upper())
|
|
|
|
cache_key = (display_text, w)
|
|
if msg_cache[0] != cache_key:
|
|
msg_rows = big_wrap(display_text, w - 4)
|
|
msg_cache = (cache_key, msg_rows)
|
|
else:
|
|
msg_rows = msg_cache[1]
|
|
|
|
msg_rows = lr_gradient_opposite(
|
|
msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0
|
|
)
|
|
|
|
elapsed_s = int(time.monotonic() - m_ts)
|
|
remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s)
|
|
ts_str = datetime.now().strftime("%H:%M:%S")
|
|
panel_h = len(msg_rows) + 2
|
|
panel_top = max(0, (h - panel_h) // 2)
|
|
|
|
row_idx = 0
|
|
for mr in msg_rows:
|
|
ln = vis_trunc(mr, w)
|
|
overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K")
|
|
row_idx += 1
|
|
|
|
meta_parts = []
|
|
if m_title and m_title != m_body:
|
|
meta_parts.append(m_title)
|
|
meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
|
|
meta = (
|
|
" " + " \u00b7 ".join(meta_parts)
|
|
if len(meta_parts) > 1
|
|
else " " + meta_parts[0]
|
|
)
|
|
overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}\033[0m\033[K")
|
|
row_idx += 1
|
|
|
|
bar = "\u2500" * (w - 4)
|
|
overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}\033[0m\033[K")
|
|
|
|
return overlay, msg_cache
|
|
|
|
|
|
def render_ticker_zone(
    active: list,
    scroll_cam: int,
    ticker_h: int,
    w: int,
    noise_cache: dict,
    grad_offset: float,
) -> tuple[list[str], dict]:
    """Render the ticker scroll zone.

    For every screen row of the zone, the first entry in ``active`` that
    covers the corresponding canvas row is drawn; rows covered by no entry
    show cached noise or are cleared. Rows near the top and bottom edges
    are faded.

    Args:
        active: list of (content_rows, color, canvas_y, meta_idx). The
            color element is unpacked but not used here.
        scroll_cam: camera position (viewport top)
        ticker_h: height of ticker zone
        w: terminal width
        noise_cache: dict of cy -> noise string (or None for "no noise on
            this canvas row"). It is filled in place for rows not seen yet.
        grad_offset: gradient animation offset

    Returns:
        (list of ANSI strings, updated noise_cache). The returned dict is
        the same object that was passed in.
    """
    buf = []
    # Both fade zones are clamped to at least one row, so the divisions
    # below can never divide by zero.
    top_zone = max(1, int(ticker_h * 0.25))
    bot_zone = max(1, int(ticker_h * 0.10))

    def noise_at(cy):
        # Decide once per canvas row whether it carries noise (15% chance)
        # and remember the decision so the row is stable across calls.
        if cy not in noise_cache:
            noise_cache[cy] = noise(w) if random.random() < 0.15 else None
        return noise_cache[cy]

    for r in range(ticker_h):
        scr_row = r + 1  # screen rows are 1-based
        cy = scroll_cam + r  # canvas row shown on this screen row
        top_f = min(1.0, r / top_zone)
        bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone)
        row_fade = min(top_f, bot_f)

        for content, _hc, by, midx in active:
            cr = cy - by  # row index inside this entry
            if not 0 <= cr < len(content):
                continue

            is_meta = cr == midx
            raw = content[cr]
            # The meta row keeps its own text and gets W_COOL below; every
            # other row is run through the gradient.
            colored = raw if is_meta else lr_gradient([raw], grad_offset)[0]
            ln = vis_trunc(colored, w)
            if row_fade < 1.0:
                ln = fade_line(ln, row_fade)

            if is_meta:
                buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K")
            elif ln.strip():
                buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K")
            else:
                buf.append(f"\033[{scr_row};1H\033[K")
            break  # first covering entry wins
        else:
            # No entry covers this canvas row: show noise or clear the row.
            n = noise_at(cy)
            if row_fade < 1.0 and n:
                n = fade_line(n, row_fade)
            if n:
                buf.append(f"\033[{scr_row};1H{n}")
            else:
                buf.append(f"\033[{scr_row};1H\033[K")

    return buf, noise_cache
|
|
|
|
|
|
def apply_glitch(
    buf: list[str],
    ticker_buf_start: int,
    mic_excess: float,
    w: int,
) -> list[str]:
    """Randomly overwrite ticker rows in ``buf`` with glitch bars.

    Args:
        buf: current buffer; modified in place
        ticker_buf_start: index where ticker starts in buffer
        mic_excess: mic level above threshold; raises both the chance of a
            glitch pass and the number of rows hit
        w: terminal width

    Returns:
        The same ``buf`` list, with zero or more ticker rows replaced.
    """
    probability = 0.32 + min(0.9, mic_excess * 0.16)
    max_hits = 4 + int(mic_excess / 2)
    span = len(buf) - ticker_buf_start

    # The random draw happens unconditionally, before the emptiness check,
    # so the RNG is advanced on every call.
    roll = random.random()
    if not (roll < probability and span > 0):
        return buf

    for _ in range(min(max_hits, span)):
        offset = random.randint(0, span - 1)
        # The screen row is derived from the offset within the ticker
        # section (1-based), not from the absolute buffer index.
        buf[ticker_buf_start + offset] = f"\033[{offset + 1};1H{glitch_bar(w)}"

    return buf
|
|
|
|
|
|
def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]:
    """Render firehose strip at bottom of screen.

    Args:
        items: source items handed to ``firehose_line`` for every row
        w: terminal width
        fh: height of the firehose strip in rows; zero or negative yields
            an empty result
        h: terminal height

    Returns:
        One cursor-addressed ANSI string per strip row, covering the last
        ``fh`` screen rows from top to bottom.
    """
    # range() is already empty for fh <= 0, so no separate guard is needed.
    first_row = h - fh + 1  # 1-based screen row of the strip's top line
    return [
        f"\033[{first_row + fr};1H{firehose_line(items, w)}\033[K"
        for fr in range(fh)
    ]
|
|
|
|
|
|
# Module-wide effect chain; stays None until init_effects() has run.
_effect_chain = None


def init_effects() -> None:
    """Discover effect plugins and install the module-wide effect chain.

    Fetches the effect registry, asks ``effects_plugins`` to discover its
    plugins, builds an ``EffectChain`` on that registry with a fixed order,
    and stores it in the module-level ``_effect_chain``.
    """
    global _effect_chain

    # Imports are done at call time rather than module import time.
    # NOTE(review): presumably to defer plugin loading — confirm.
    from engine.effects import EffectChain, get_registry

    effect_registry = get_registry()

    import effects_plugins as plugin_pkg

    plugin_pkg.discover_plugins()

    pipeline = EffectChain(effect_registry)
    pipeline.set_order(["noise", "fade", "glitch", "firehose"])
    _effect_chain = pipeline
|
|
|
|
|
|
def process_effects(
    buf: list[str],
    w: int,
    h: int,
    scroll_cam: int,
    ticker_h: int,
    mic_excess: float,
    grad_offset: float,
    frame_number: int,
    has_message: bool,
    items: list,
) -> list[str]:
    """Run ``buf`` through the module-wide effect chain.

    The chain is created on first use. All per-frame values are packed
    into an ``EffectContext`` and handed to the chain together with the
    buffer.

    Returns:
        Whatever the chain's ``process`` call returns for this buffer.
    """
    if _effect_chain is None:
        init_effects()

    # Map this function's short parameter names onto the context's fields.
    frame_state = {
        "terminal_width": w,
        "terminal_height": h,
        "scroll_cam": scroll_cam,
        "ticker_height": ticker_h,
        "mic_excess": mic_excess,
        "grad_offset": grad_offset,
        "frame_number": frame_number,
        "has_message": has_message,
        "items": items,
    }
    return _effect_chain.process(buf, EffectContext(**frame_state))
|
|
|
|
|
|
def get_effect_chain() -> EffectChain | None:
    """Return the module-wide effect chain, creating it on first access."""
    if _effect_chain is not None:
        return _effect_chain
    init_effects()
    # Re-read the module global: init_effects() has just assigned it.
    return _effect_chain
|