""" Layer compositing — message overlay, ticker zone, firehose, noise. Depends on: config, render, effects. .. deprecated:: This module contains legacy rendering code. New pipeline code should use the Stage-based pipeline architecture instead. This module is maintained for backwards compatibility with the demo mode. """ import random import re import time from datetime import datetime from engine import config from engine.effects import ( EffectChain, EffectContext, fade_line, firehose_line, glitch_bar, noise, vis_offset, vis_trunc, ) from engine.render import big_wrap, lr_gradient, lr_gradient_opposite from engine.terminal import RST, W_COOL MSG_META = "\033[38;5;245m" MSG_BORDER = "\033[2;38;5;37m" def render_message_overlay( msg: tuple[str, str, float] | None, w: int, h: int, msg_cache: tuple, ) -> tuple[list[str], tuple]: """Render ntfy message overlay. Args: msg: (title, body, timestamp) or None w: terminal width h: terminal height msg_cache: (cache_key, rendered_rows) for caching Returns: (list of ANSI strings, updated cache) """ overlay = [] if msg is None: return overlay, msg_cache m_title, m_body, m_ts = msg display_text = m_body or m_title or "(empty)" display_text = re.sub(r"\s+", " ", display_text.upper()) cache_key = (display_text, w) if msg_cache[0] != cache_key: msg_rows = big_wrap(display_text, w - 4) msg_cache = (cache_key, msg_rows) else: msg_rows = msg_cache[1] msg_rows = lr_gradient_opposite( msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0 ) elapsed_s = int(time.monotonic() - m_ts) remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s) ts_str = datetime.now().strftime("%H:%M:%S") panel_h = len(msg_rows) + 2 panel_top = max(0, (h - panel_h) // 2) row_idx = 0 for mr in msg_rows: ln = vis_trunc(mr, w) overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K") row_idx += 1 meta_parts = [] if m_title and m_title != m_body: meta_parts.append(m_title) meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s") meta = ( " " + " \u00b7 ".join(meta_parts) if len(meta_parts) > 1 else " " + meta_parts[0] ) overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}\033[0m\033[K") row_idx += 1 bar = "\u2500" * (w - 4) overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}\033[0m\033[K") return overlay, msg_cache def render_ticker_zone( active: list, scroll_cam: int, camera_x: int = 0, ticker_h: int = 0, w: int = 80, noise_cache: dict | None = None, grad_offset: float = 0.0, ) -> tuple[list[str], dict]: """Render the ticker scroll zone. Args: active: list of (content_rows, color, canvas_y, meta_idx) scroll_cam: camera position (viewport top) camera_x: horizontal camera offset ticker_h: height of ticker zone w: terminal width noise_cache: dict of cy -> noise string grad_offset: gradient animation offset Returns: (list of ANSI strings, updated noise_cache) """ if noise_cache is None: noise_cache = {} buf = [] top_zone = max(1, int(ticker_h * 0.25)) bot_zone = max(1, int(ticker_h * 0.10)) def noise_at(cy): if cy not in noise_cache: noise_cache[cy] = noise(w) if random.random() < 0.15 else None return noise_cache[cy] for r in range(ticker_h): scr_row = r + 1 cy = scroll_cam + r top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0 bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0 row_fade = min(top_f, bot_f) drawn = False for content, hc, by, midx in active: cr = cy - by if 0 <= cr < len(content): raw = content[cr] if cr != midx: colored = lr_gradient([raw], grad_offset)[0] else: colored = raw ln = vis_trunc(vis_offset(colored, camera_x), w) if row_fade < 1.0: ln = fade_line(ln, row_fade) if cr == midx: buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K") elif ln.strip(): buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K") else: buf.append(f"\033[{scr_row};1H\033[K") drawn = True break if not drawn: n = noise_at(cy) if row_fade < 1.0 and n: n = fade_line(n, row_fade) if n: buf.append(f"\033[{scr_row};1H{n}") else: buf.append(f"\033[{scr_row};1H\033[K") return buf, noise_cache def apply_glitch( buf: list[str], ticker_buf_start: int, mic_excess: float, w: int, ) -> list[str]: """Apply glitch effect to ticker buffer. Args: buf: current buffer ticker_buf_start: index where ticker starts in buffer mic_excess: mic level above threshold w: terminal width Returns: Updated buffer with glitches applied """ glitch_prob = 0.32 + min(0.9, mic_excess * 0.16) n_hits = 4 + int(mic_excess / 2) ticker_buf_len = len(buf) - ticker_buf_start if random.random() < glitch_prob and ticker_buf_len > 0: for _ in range(min(n_hits, ticker_buf_len)): gi = random.randint(0, ticker_buf_len - 1) scr_row = gi + 1 buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}" return buf def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]: """Render firehose strip at bottom of screen.""" buf = [] if fh > 0: for fr in range(fh): scr_row = h - fh + fr + 1 fline = firehose_line(items, w) buf.append(f"\033[{scr_row};1H{fline}\033[K") return buf _effect_chain = None def init_effects() -> None: """Initialize effect plugins and chain.""" global _effect_chain from engine.effects import EffectChain, get_registry registry = get_registry() import effects_plugins effects_plugins.discover_plugins() chain = EffectChain(registry) chain.set_order(["noise", "fade", "glitch", "firehose"]) _effect_chain = chain def process_effects( buf: list[str], w: int, h: int, scroll_cam: int, ticker_h: int, camera_x: int = 0, mic_excess: float = 0.0, grad_offset: float = 0.0, frame_number: int = 0, has_message: bool = False, items: list | None = None, ) -> list[str]: """Process buffer through effect chain.""" if _effect_chain is None: init_effects() ctx = EffectContext( terminal_width=w, terminal_height=h, scroll_cam=scroll_cam, camera_x=camera_x, ticker_height=ticker_h, mic_excess=mic_excess, grad_offset=grad_offset, frame_number=frame_number, has_message=has_message, items=items or [], ) return _effect_chain.process(buf, ctx) def get_effect_chain() -> EffectChain | None: """Get the effect chain instance.""" global _effect_chain if _effect_chain is None: init_effects() return _effect_chain