diff --git a/engine/pipeline/adapters/message_overlay.py b/engine/pipeline/adapters/message_overlay.py new file mode 100644 index 0000000..2433a38 --- /dev/null +++ b/engine/pipeline/adapters/message_overlay.py @@ -0,0 +1,185 @@ +""" +Message overlay stage - Renders ntfy messages as an overlay on the buffer. + +This stage provides message overlay capability for displaying ntfy.sh messages +as a centered panel with pink/magenta gradient, matching upstream/main aesthetics. +""" + +import re +import time +from dataclasses import dataclass +from datetime import datetime + +from engine import config +from engine.effects.legacy import vis_trunc +from engine.pipeline.core import DataType, PipelineContext, Stage +from engine.render.blocks import big_wrap +from engine.render.gradient import msg_gradient + + +@dataclass +class MessageOverlayConfig: + """Configuration for MessageOverlayStage.""" + + enabled: bool = True + display_secs: int = 30 # How long to display messages + topic_url: str | None = None # Ntfy topic URL (None = use config default) + + +class MessageOverlayStage(Stage): + """Stage that renders ntfy message overlay on the buffer. + + Provides: + - message.overlay capability (optional) + - Renders centered panel with pink/magenta gradient + - Shows title, body, timestamp, and remaining time + """ + + name = "message_overlay" + category = "overlay" + + def __init__( + self, config: MessageOverlayConfig | None = None, name: str = "message_overlay" + ): + self.config = config or MessageOverlayConfig() + self._ntfy_poller = None + self._msg_cache = (None, None) # (cache_key, rendered_rows) + + @property + def capabilities(self) -> set[str]: + """Provides message overlay capability.""" + return {"message.overlay"} if self.config.enabled else set() + + @property + def dependencies(self) -> set[str]: + """Needs rendered buffer and camera transformation to overlay onto.""" + return {"render.output", "camera"} + + @property + def inlet_types(self) -> set: + return {DataType.TEXT_BUFFER} + + @property + def outlet_types(self) -> set: + return {DataType.TEXT_BUFFER} + + def init(self, ctx: PipelineContext) -> bool: + """Initialize ntfy poller if topic URL is configured.""" + if not self.config.enabled: + return True + + # Get or create ntfy poller + topic_url = self.config.topic_url or config.NTFY_TOPIC + if topic_url: + from engine.ntfy import NtfyPoller + + self._ntfy_poller = NtfyPoller( + topic_url=topic_url, + reconnect_delay=getattr(config, "NTFY_RECONNECT_DELAY", 5), + display_secs=self.config.display_secs, + ) + self._ntfy_poller.start() + ctx.set("ntfy_poller", self._ntfy_poller) + + return True + + def process(self, data: list[str], ctx: PipelineContext) -> list[str]: + """Render message overlay on the buffer.""" + if not self.config.enabled or not data: + return data + + # Get active message from poller + msg = None + if self._ntfy_poller: + msg = self._ntfy_poller.get_active_message() + + if msg is None: + return data + + # Render overlay + w = ctx.terminal_width if hasattr(ctx, "terminal_width") else 80 + h = ctx.terminal_height if hasattr(ctx, "terminal_height") else 24 + + overlay, self._msg_cache = self._render_message_overlay( + msg, w, h, self._msg_cache + ) + + # Composite overlay onto buffer + result = list(data) + for line in overlay: + # Overlay uses ANSI cursor positioning, just append + result.append(line) + + return result + + def _render_message_overlay( + self, + msg: tuple[str, str, float] | None, + w: int, + h: int, + msg_cache: tuple, + ) -> tuple[list[str], tuple]: + """Render ntfy message overlay. + + Args: + msg: (title, body, timestamp) or None + w: terminal width + h: terminal height + msg_cache: (cache_key, rendered_rows) for caching + + Returns: + (list of ANSI strings, updated cache) + """ + overlay = [] + if msg is None: + return overlay, msg_cache + + m_title, m_body, m_ts = msg + display_text = m_body or m_title or "(empty)" + display_text = re.sub(r"\s+", " ", display_text.upper()) + + cache_key = (display_text, w) + if msg_cache[0] != cache_key: + msg_rows = big_wrap(display_text, w - 4) + msg_cache = (cache_key, msg_rows) + else: + msg_rows = msg_cache[1] + + msg_rows = msg_gradient(msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0) + + elapsed_s = int(time.monotonic() - m_ts) + remaining = max(0, self.config.display_secs - elapsed_s) + ts_str = datetime.now().strftime("%H:%M:%S") + panel_h = len(msg_rows) + 2 + panel_top = max(0, (h - panel_h) // 2) + + row_idx = 0 + for mr in msg_rows: + ln = vis_trunc(mr, w) + overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K") + row_idx += 1 + + meta_parts = [] + if m_title and m_title != m_body: + meta_parts.append(m_title) + meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s") + meta = ( + " " + " \u00b7 ".join(meta_parts) + if len(meta_parts) > 1 + else " " + meta_parts[0] + ) + overlay.append( + f"\033[{panel_top + row_idx + 1};1H\033[38;5;245m{meta}\033[0m\033[K" + ) + row_idx += 1 + + bar = "\u2500" * (w - 4) + overlay.append( + f"\033[{panel_top + row_idx + 1};1H \033[2;38;5;37m{bar}\033[0m\033[K" + ) + + return overlay, msg_cache + + def cleanup(self) -> None: + """Cleanup resources.""" + pass