"""
Layer compositing — message overlay, ticker zone, firehose, noise.
Depends on: config, render, effects.
"""

import random
import re
import time
from datetime import datetime

from engine import config
from engine.effects import (
    EffectChain,
    EffectContext,
    fade_line,
    firehose_line,
    glitch_bar,
    noise,
    vis_trunc,
)
from engine.render import big_wrap, lr_gradient, msg_gradient
from engine.terminal import RST, W_COOL

# SGR prefix for the message metadata line (256-colour index 245).
MSG_META = "\033[38;5;245m"
# SGR prefix for the rule under the message panel (dim, 256-colour index 37).
MSG_BORDER = "\033[2;38;5;37m"


def render_message_overlay(
    msg: tuple[str, str, float] | None,
    w: int,
    h: int,
    msg_cache: tuple,
) -> tuple[list[str], tuple]:
    """Render ntfy message overlay.

    The panel consists of the wrapped message rows, one metadata line and
    one horizontal rule, and is centred vertically in the terminal.

    Args:
        msg: (title, body, timestamp) or None. The timestamp is subtracted
            from ``time.monotonic()`` to compute the remaining display time.
        w: terminal width
        h: terminal height
        msg_cache: (cache_key, rendered_rows) for caching; the key is
            (normalised display text, w).

    Returns:
        (list of ANSI strings, updated cache). The list is empty when
        ``msg`` is None.
    """
    if msg is None:
        return [], msg_cache

    m_title, m_body, m_ts = msg
    display_text = m_body or m_title or "(empty)"
    display_text = re.sub(r"\s+", " ", display_text.upper())

    # Re-wrap only when the text or the width changed; the gradient below is
    # time-dependent and is therefore re-applied on every call.
    cache_key = (display_text, w)
    if msg_cache[0] != cache_key:
        msg_rows = big_wrap(display_text, w - 4)
        msg_cache = (cache_key, msg_rows)
    else:
        msg_rows = msg_cache[1]

    msg_rows = msg_gradient(
        msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0
    )

    elapsed_s = int(time.monotonic() - m_ts)
    remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s)
    ts_str = datetime.now().strftime("%H:%M:%S")

    # +2 accounts for the metadata line and the rule below the message rows.
    panel_h = len(msg_rows) + 2
    panel_top = max(0, (h - panel_h) // 2)

    overlay = []
    for row_idx, mr in enumerate(msg_rows):
        ln = vis_trunc(mr, w)
        overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K")

    # 1-indexed screen row directly below the last message row.
    meta_row = panel_top + len(msg_rows) + 1

    meta_parts = []
    if m_title and m_title != m_body:
        # Only show the title when it adds information beyond the body.
        meta_parts.append(m_title)
    meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
    meta = " " + " \u00b7 ".join(meta_parts)
    overlay.append(f"\033[{meta_row};1H{MSG_META}{meta}\033[0m\033[K")

    # A non-positive repeat count yields an empty rule on very narrow terminals.
    bar = "\u2500" * (w - 4)
    overlay.append(f"\033[{meta_row + 1};1H {MSG_BORDER}{bar}\033[0m\033[K")

    return overlay, msg_cache


def render_ticker_zone(
    active: list,
    scroll_cam: int,
    ticker_h: int,
    w: int,
    noise_cache: dict,
    grad_offset: float,
) -> tuple[list[str], dict]:
    """Render the ticker scroll zone.

    Each screen row shows the first active block that covers it; rows not
    covered by any block show cached noise or are cleared. Rows near the top
    and bottom edges of the zone are faded.

    Args:
        active: list of (content_rows, color, canvas_y, meta_idx). The color
            entry is not used by this function.
        scroll_cam: camera position (viewport top)
        ticker_h: height of ticker zone
        w: terminal width
        noise_cache: dict of cy -> noise string (or None for "no noise on
            this row"); updated in place.
        grad_offset: gradient animation offset

    Returns:
        (list of ANSI strings, updated noise_cache)
    """
    buf = []
    # Fade bands: top 25% and bottom 10% of the zone, at least one row each.
    top_zone = max(1, int(ticker_h * 0.25))
    bot_zone = max(1, int(ticker_h * 0.10))

    def noise_at(cy):
        # Decide once per canvas row whether it carries noise (15% chance) so
        # the result stays the same on later frames while the row is cached.
        if cy not in noise_cache:
            noise_cache[cy] = noise(w) if random.random() < 0.15 else None
        return noise_cache[cy]

    for r in range(ticker_h):
        scr_row = r + 1
        cy = scroll_cam + r
        # Both zones are >= 1, so these divisions are always defined.
        top_f = min(1.0, r / top_zone)
        bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone)
        row_fade = min(top_f, bot_f)

        for content, _hc, by, midx in active:
            cr = cy - by
            if not 0 <= cr < len(content):
                continue

            raw = content[cr]
            is_meta = cr == midx
            # The metadata row keeps its own text and is tinted with W_COOL;
            # every other row gets the left-to-right gradient.
            colored = raw if is_meta else lr_gradient([raw], grad_offset)[0]
            ln = vis_trunc(colored, w)
            if row_fade < 1.0:
                ln = fade_line(ln, row_fade)

            if is_meta:
                buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K")
            elif ln.strip():
                buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K")
            else:
                buf.append(f"\033[{scr_row};1H\033[K")
            break
        else:
            # No active block covers this row: draw noise or clear the line.
            n = noise_at(cy)
            if row_fade < 1.0 and n:
                n = fade_line(n, row_fade)
            if n:
                buf.append(f"\033[{scr_row};1H{n}")
            else:
                buf.append(f"\033[{scr_row};1H\033[K")

    return buf, noise_cache


def apply_glitch(
    buf: list[str],
    ticker_buf_start: int,
    mic_excess: float,
    w: int,
) -> list[str]:
    """Apply glitch effect to ticker buffer.

    With a probability that grows with ``mic_excess``, overwrites randomly
    chosen ticker entries of ``buf`` with glitch bars. The same entry may be
    picked more than once. The screen row of a replaced entry is its offset
    within the ticker section plus one.

    Args:
        buf: current buffer; modified in place
        ticker_buf_start: index where ticker starts in buffer
        mic_excess: mic level above threshold
        w: terminal width

    Returns:
        Updated buffer with glitches applied (the same list object as ``buf``)
    """
    glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
    n_hits = 4 + int(mic_excess / 2)
    ticker_buf_len = len(buf) - ticker_buf_start

    if random.random() < glitch_prob and ticker_buf_len > 0:
        for _ in range(min(n_hits, ticker_buf_len)):
            gi = random.randint(0, ticker_buf_len - 1)
            scr_row = gi + 1
            buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}"

    return buf


def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]:
    """Render firehose strip at bottom of screen.

    Args:
        items: items handed to ``firehose_line`` for every row
        w: terminal width
        fh: height of the firehose strip in rows; values <= 0 yield no rows
        h: terminal height

    Returns:
        One ANSI string per firehose row, covering the last ``fh`` screen rows.
    """
    return [
        f"\033[{h - fh + fr + 1};1H{firehose_line(items, w)}\033[K"
        for fr in range(fh)
    ]


# Effect chain shared by this module; created on first use by init_effects().
_effect_chain = None


def init_effects() -> None:
    """Initialize effect plugins and chain.

    Fetches the effect registry, runs plugin discovery, and stores a new
    chain with the fixed order noise -> fade -> glitch -> firehose in the
    module-level ``_effect_chain``.
    """
    global _effect_chain
    # Imports are deferred to call time.
    # NOTE(review): presumably to avoid an import cycle — confirm.
    from engine.effects import EffectChain, get_registry

    registry = get_registry()

    import effects_plugins

    effects_plugins.discover_plugins()

    chain = EffectChain(registry)
    chain.set_order(["noise", "fade", "glitch", "firehose"])
    _effect_chain = chain


def process_effects(
    buf: list[str],
    w: int,
    h: int,
    scroll_cam: int,
    ticker_h: int,
    mic_excess: float,
    grad_offset: float,
    frame_number: int,
    has_message: bool,
    items: list,
) -> list[str]:
    """Process buffer through effect chain.

    Initializes the chain on first use, packs the frame parameters into an
    ``EffectContext`` and returns whatever the chain's ``process`` returns.

    Args:
        buf: current display buffer
        w: terminal width
        h: terminal height
        scroll_cam: camera position (viewport top)
        ticker_h: height of ticker zone
        mic_excess: mic level above threshold
        grad_offset: gradient animation offset
        frame_number: number of the frame being rendered
        has_message: whether a message overlay is active
        items: items forwarded to the effects through the context

    Returns:
        The buffer returned by the effect chain.
    """
    if _effect_chain is None:
        init_effects()

    ctx = EffectContext(
        terminal_width=w,
        terminal_height=h,
        scroll_cam=scroll_cam,
        ticker_height=ticker_h,
        mic_excess=mic_excess,
        grad_offset=grad_offset,
        frame_number=frame_number,
        has_message=has_message,
        items=items,
    )
    return _effect_chain.process(buf, ctx)


def get_effect_chain() -> EffectChain | None:
    """Get the effect chain instance, initializing it on first use."""
    if _effect_chain is None:
        init_effects()
    return _effect_chain


def render_figment_overlay(
    figment_state,
    w: int,
    h: int,
) -> list[str]:
    """Render figment overlay as ANSI cursor-positioning commands.

    Non-space cells of the figment are revealed, held (with a periodic
    strobe) or dissolved in a shuffled order depending on the phase, and
    coloured left to right with the figment's gradient. Cells outside the
    ``w`` x ``h`` screen are clipped.

    Args:
        figment_state: FigmentState with phase, progress, rows, gradient,
            centering.
        w: terminal width
        h: terminal height

    Returns:
        List of ANSI strings to append to display buffer; empty when the
        figment has no rows or no non-space cells.
    """
    from engine.render import _color_codes_to_ansi

    rows = figment_state.rows
    if not rows:
        return []

    phase = figment_state.phase
    progress = figment_state.progress
    gradient = figment_state.gradient
    center_row = figment_state.center_row
    center_col = figment_state.center_col

    cols = _color_codes_to_ansi(gradient)

    # Build a list of non-space cell positions
    cell_positions = []
    for r_idx, row in enumerate(rows):
        for c_idx, ch in enumerate(row):
            if ch != " ":
                cell_positions.append((r_idx, c_idx))

    n_cells = len(cell_positions)
    if n_cells == 0:
        return []

    # Seed from the figment's first row so the reveal/dissolve order is the
    # same on every frame. String hashing is salted per interpreter run, so
    # the order is only stable within one process.
    rng = random.Random(hash(tuple(rows[0][:10])) if rows[0] else 42)
    shuffled = list(cell_positions)
    rng.shuffle(shuffled)

    # Phase-dependent visibility
    from effects_plugins.figment import FigmentPhase

    if phase == FigmentPhase.REVEAL:
        visible = set(shuffled[: int(n_cells * progress)])
    elif phase == FigmentPhase.HOLD:
        visible = set(cell_positions)
        # Strobe: dim some cells periodically
        if int(progress * 20) % 3 == 0:
            visible -= set(shuffled[: int(n_cells * 0.3)])
    elif phase == FigmentPhase.DISSOLVE:
        visible = set(shuffled[: int(n_cells * (1.0 - progress))])
    else:
        visible = set(cell_positions)

    # Build overlay commands
    overlay: list[str] = []
    n_cols = len(cols)
    max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1)
    span = max(max_x - 1, 1)
    # Cells left of the screen are skipped below, so the first emitted cell
    # sits at column 1 whenever center_col is negative. Clamping also keeps
    # the CSI column parameter positive.
    start_col = max(1, center_col + 1)

    for r_idx, row in enumerate(rows):
        scr_row = center_row + r_idx + 1  # 1-indexed
        if scr_row < 1 or scr_row > h:
            continue

        line_buf: list[str] = []
        has_content = False

        for c_idx, ch in enumerate(row):
            scr_col = center_col + c_idx + 1
            if scr_col < 1 or scr_col > w:
                continue

            if ch != " " and (r_idx, c_idx) in visible:
                if n_cols:
                    # Clamp rather than wrap: there is no animation offset
                    # here, and wrapping would give the rightmost column the
                    # first gradient colour.
                    shifted = min(c_idx / span, 1.0)
                    idx = min(round(shifted * (n_cols - 1)), n_cols - 1)
                    line_buf.append(f"{cols[idx]}{ch}{RST}")
                else:
                    # Empty gradient: draw the glyph uncoloured.
                    line_buf.append(ch)
                has_content = True
            else:
                line_buf.append(" ")

        if has_content:
            line_str = "".join(line_buf).rstrip()
            if line_str.strip():
                overlay.append(f"\033[{scr_row};{start_col}H{line_str}{RST}")

    return overlay