"""
Layer compositing — message overlay, ticker zone, firehose, noise.

Every public function here returns a list of ANSI strings; each string
starts with an absolute cursor-positioning sequence so the caller can
write them to the terminal in any order.

Depends on: config, render, effects.
"""

import random
import re
import time
from datetime import datetime

from engine import config
from engine.effects import (
    fade_line,
    firehose_line,
    glitch_bar,
    noise,
    vis_trunc,
)
from engine.render import big_wrap, lr_gradient, lr_gradient_opposite
from engine.terminal import RST, W_COOL

# SGR colour prefixes for the overlay's metadata line and its bottom rule.
MSG_META = "\033[38;5;245m"
MSG_BORDER = "\033[2;38;5;37m"


def _cursor_to(row: int) -> str:
    """Return the escape sequence that moves the cursor to column 1 of *row*."""
    return f"\033[{row};1H"


def render_message_overlay(
    msg: tuple[str, str, float] | None,
    w: int,
    h: int,
    msg_cache: tuple,
) -> tuple[list[str], tuple]:
    """Build the vertically centred ntfy message panel.

    The panel is the wrapped message text, one metadata line
    (optional title, current wall-clock time, seconds remaining) and a
    horizontal rule.

    Args:
        msg: (title, body, timestamp) or None. The timestamp is compared
            against ``time.monotonic()`` to compute the remaining time.
        w: terminal width
        h: terminal height
        msg_cache: (cache_key, rendered_rows) from the previous call

    Returns:
        (list of ANSI strings, updated cache). With ``msg`` set to None the
        list is empty and the cache is handed back untouched.
    """
    if msg is None:
        return [], msg_cache

    m_title, m_body, m_ts = msg

    # Body wins over title; whitespace runs are squeezed to single spaces.
    display_text = re.sub(r"\s+", " ", (m_body or m_title or "(empty)").upper())

    # The wrapped rows are reused until the text or the width changes.
    cache_key = (display_text, w)
    if msg_cache[0] == cache_key:
        msg_rows = msg_cache[1]
    else:
        msg_rows = big_wrap(display_text, w - 4)
        msg_cache = (cache_key, msg_rows)

    # The gradient is applied after the cache lookup, so it is recomputed on
    # every call while the wrapped rows stay cached.
    msg_rows = lr_gradient_opposite(
        msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0
    )

    elapsed_s = int(time.monotonic() - m_ts)
    remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s)
    ts_str = datetime.now().strftime("%H:%M:%S")

    # Two extra rows: the metadata line and the rule.
    panel_h = len(msg_rows) + 2
    panel_top = max(0, (h - panel_h) // 2)

    overlay = [
        f"{_cursor_to(panel_top + idx + 1)} {vis_trunc(row, w)}\033[0m\033[K"
        for idx, row in enumerate(msg_rows)
    ]
    next_row = panel_top + len(overlay) + 1

    # The title is shown only when it adds something beyond the body.
    meta_parts = [m_title] if m_title and m_title != m_body else []
    meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
    meta = " " + " \u00b7 ".join(meta_parts)
    overlay.append(f"{_cursor_to(next_row)}{MSG_META}{meta}\033[0m\033[K")

    rule = "\u2500" * (w - 4)
    overlay.append(f"{_cursor_to(next_row + 1)} {MSG_BORDER}{rule}\033[0m\033[K")

    return overlay, msg_cache


def render_ticker_zone(
    active: list,
    scroll_cam: int,
    ticker_h: int,
    w: int,
    noise_cache: dict,
    grad_offset: float,
) -> tuple[list[str], dict]:
    """Render the scrolling ticker viewport, one string per screen row.

    Each screen row shows the first block in ``active`` that covers it.
    Rows covered by no block show cached noise or are cleared.

    Args:
        active: list of (content_rows, color, canvas_y, meta_idx)
        scroll_cam: camera position (canvas row shown at the viewport top)
        ticker_h: height of ticker zone
        w: terminal width
        noise_cache: dict of canvas row -> noise string or None; filled in
            place for rows that have not been seen yet
        grad_offset: gradient animation offset

    Returns:
        (list of ANSI strings, the same noise_cache object)
    """
    # Fade bands: top quarter and bottom tenth of the zone, at least one row.
    top_zone = max(1, int(ticker_h * 0.25))
    bot_zone = max(1, int(ticker_h * 0.10))

    def noise_at(canvas_y):
        # The noise-or-nothing choice is made once per canvas row, then reused.
        if canvas_y not in noise_cache:
            noise_cache[canvas_y] = noise(w) if random.random() < 0.15 else None
        return noise_cache[canvas_y]

    lines = []
    for offset in range(ticker_h):
        goto = _cursor_to(offset + 1)
        canvas_y = scroll_cam + offset
        # Both zones are >= 1, so the divisions are always safe.
        fade = min(1.0, offset / top_zone, (ticker_h - 1 - offset) / bot_zone)

        for content, _color, base_y, meta_idx in active:
            local = canvas_y - base_y
            if not 0 <= local < len(content):
                continue

            # The metadata row skips the gradient and gets W_COOL instead.
            is_meta = local == meta_idx
            text = content[local]
            if not is_meta:
                text = lr_gradient([text], grad_offset)[0]
            text = vis_trunc(text, w)
            if fade < 1.0:
                text = fade_line(text, fade)

            if is_meta:
                lines.append(f"{goto}{W_COOL}{text}{RST}\033[K")
            elif text.strip():
                lines.append(f"{goto}{text}{RST}\033[K")
            else:
                lines.append(f"{goto}\033[K")
            break
        else:
            # No block covers this row: show noise if any, otherwise clear it.
            filler = noise_at(canvas_y)
            if fade < 1.0 and filler:
                filler = fade_line(filler, fade)
            lines.append(f"{goto}{filler}" if filler else f"{goto}\033[K")

    return lines, noise_cache


def apply_glitch(
    buf: list[str],
    ticker_buf_start: int,
    mic_excess: float,
    w: int,
) -> list[str]:
    """Randomly overwrite ticker rows in *buf* with glitch bars.

    A single random draw decides whether this call glitches at all; the
    probability and the number of hits both grow with ``mic_excess``.
    Replaced entries are positioned at screen row ``index + 1``, where
    ``index`` is counted from ``ticker_buf_start``.

    Args:
        buf: current buffer (modified in place)
        ticker_buf_start: index where ticker starts in buffer
        mic_excess: mic level above threshold
        w: terminal width

    Returns:
        The same buffer object, with any glitches applied.
    """
    glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
    n_hits = 4 + int(mic_excess / 2)
    ticker_rows = len(buf) - ticker_buf_start

    # The random draw stays first so the RNG is consumed even with no rows.
    if not (random.random() < glitch_prob and ticker_rows > 0):
        return buf

    # Hits may land on the same row twice; the later bar simply wins.
    for _ in range(min(n_hits, ticker_rows)):
        hit = random.randint(0, ticker_rows - 1)
        buf[ticker_buf_start + hit] = f"{_cursor_to(hit + 1)}{glitch_bar(w)}"
    return buf


def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]:
    """Render the firehose strip on the last *fh* rows of an *h*-row screen.

    Args:
        items: passed through to ``firehose_line`` for every row
        w: terminal width
        fh: strip height in rows; nothing is rendered unless it is positive
        h: terminal height

    Returns:
        One ANSI string per strip row, ordered top to bottom.
    """
    if not fh > 0:
        return []

    first_row = h - fh + 1
    return [
        f"{_cursor_to(first_row + idx)}{firehose_line(items, w)}\033[K"
        for idx in range(fh)
    ]