feat(message-overlay): Add MessageOverlayStage for pipeline integration
- Create MessageOverlayStage adapter for ntfy message overlay - Integrates NtfyPoller with pipeline architecture - Uses centered panel with pink/magenta gradient for messages - Provides message.overlay capability
This commit is contained in:
185
engine/pipeline/adapters/message_overlay.py
Normal file
185
engine/pipeline/adapters/message_overlay.py
Normal file
@@ -0,0 +1,185 @@
|
|||||||
|
"""
|
||||||
|
Message overlay stage - Renders ntfy messages as an overlay on the buffer.
|
||||||
|
|
||||||
|
This stage provides message overlay capability for displaying ntfy.sh messages
|
||||||
|
as a centered panel with pink/magenta gradient, matching upstream/main aesthetics.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from engine import config
|
||||||
|
from engine.effects.legacy import vis_trunc
|
||||||
|
from engine.pipeline.core import DataType, PipelineContext, Stage
|
||||||
|
from engine.render.blocks import big_wrap
|
||||||
|
from engine.render.gradient import msg_gradient
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MessageOverlayConfig:
|
||||||
|
"""Configuration for MessageOverlayStage."""
|
||||||
|
|
||||||
|
enabled: bool = True
|
||||||
|
display_secs: int = 30 # How long to display messages
|
||||||
|
topic_url: str | None = None # Ntfy topic URL (None = use config default)
|
||||||
|
|
||||||
|
|
||||||
|
class MessageOverlayStage(Stage):
|
||||||
|
"""Stage that renders ntfy message overlay on the buffer.
|
||||||
|
|
||||||
|
Provides:
|
||||||
|
- message.overlay capability (optional)
|
||||||
|
- Renders centered panel with pink/magenta gradient
|
||||||
|
- Shows title, body, timestamp, and remaining time
|
||||||
|
"""
|
||||||
|
|
||||||
|
name = "message_overlay"
|
||||||
|
category = "overlay"
|
||||||
|
|
||||||
|
def __init__(
|
||||||
|
self, config: MessageOverlayConfig | None = None, name: str = "message_overlay"
|
||||||
|
):
|
||||||
|
self.config = config or MessageOverlayConfig()
|
||||||
|
self._ntfy_poller = None
|
||||||
|
self._msg_cache = (None, None) # (cache_key, rendered_rows)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def capabilities(self) -> set[str]:
|
||||||
|
"""Provides message overlay capability."""
|
||||||
|
return {"message.overlay"} if self.config.enabled else set()
|
||||||
|
|
||||||
|
@property
|
||||||
|
def dependencies(self) -> set[str]:
|
||||||
|
"""Needs rendered buffer and camera transformation to overlay onto."""
|
||||||
|
return {"render.output", "camera"}
|
||||||
|
|
||||||
|
@property
|
||||||
|
def inlet_types(self) -> set:
|
||||||
|
return {DataType.TEXT_BUFFER}
|
||||||
|
|
||||||
|
@property
|
||||||
|
def outlet_types(self) -> set:
|
||||||
|
return {DataType.TEXT_BUFFER}
|
||||||
|
|
||||||
|
def init(self, ctx: PipelineContext) -> bool:
|
||||||
|
"""Initialize ntfy poller if topic URL is configured."""
|
||||||
|
if not self.config.enabled:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Get or create ntfy poller
|
||||||
|
topic_url = self.config.topic_url or config.NTFY_TOPIC
|
||||||
|
if topic_url:
|
||||||
|
from engine.ntfy import NtfyPoller
|
||||||
|
|
||||||
|
self._ntfy_poller = NtfyPoller(
|
||||||
|
topic_url=topic_url,
|
||||||
|
reconnect_delay=getattr(config, "NTFY_RECONNECT_DELAY", 5),
|
||||||
|
display_secs=self.config.display_secs,
|
||||||
|
)
|
||||||
|
self._ntfy_poller.start()
|
||||||
|
ctx.set("ntfy_poller", self._ntfy_poller)
|
||||||
|
|
||||||
|
return True
|
||||||
|
|
||||||
|
def process(self, data: list[str], ctx: PipelineContext) -> list[str]:
|
||||||
|
"""Render message overlay on the buffer."""
|
||||||
|
if not self.config.enabled or not data:
|
||||||
|
return data
|
||||||
|
|
||||||
|
# Get active message from poller
|
||||||
|
msg = None
|
||||||
|
if self._ntfy_poller:
|
||||||
|
msg = self._ntfy_poller.get_active_message()
|
||||||
|
|
||||||
|
if msg is None:
|
||||||
|
return data
|
||||||
|
|
||||||
|
# Render overlay
|
||||||
|
w = ctx.terminal_width if hasattr(ctx, "terminal_width") else 80
|
||||||
|
h = ctx.terminal_height if hasattr(ctx, "terminal_height") else 24
|
||||||
|
|
||||||
|
overlay, self._msg_cache = self._render_message_overlay(
|
||||||
|
msg, w, h, self._msg_cache
|
||||||
|
)
|
||||||
|
|
||||||
|
# Composite overlay onto buffer
|
||||||
|
result = list(data)
|
||||||
|
for line in overlay:
|
||||||
|
# Overlay uses ANSI cursor positioning, just append
|
||||||
|
result.append(line)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _render_message_overlay(
|
||||||
|
self,
|
||||||
|
msg: tuple[str, str, float] | None,
|
||||||
|
w: int,
|
||||||
|
h: int,
|
||||||
|
msg_cache: tuple,
|
||||||
|
) -> tuple[list[str], tuple]:
|
||||||
|
"""Render ntfy message overlay.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg: (title, body, timestamp) or None
|
||||||
|
w: terminal width
|
||||||
|
h: terminal height
|
||||||
|
msg_cache: (cache_key, rendered_rows) for caching
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(list of ANSI strings, updated cache)
|
||||||
|
"""
|
||||||
|
overlay = []
|
||||||
|
if msg is None:
|
||||||
|
return overlay, msg_cache
|
||||||
|
|
||||||
|
m_title, m_body, m_ts = msg
|
||||||
|
display_text = m_body or m_title or "(empty)"
|
||||||
|
display_text = re.sub(r"\s+", " ", display_text.upper())
|
||||||
|
|
||||||
|
cache_key = (display_text, w)
|
||||||
|
if msg_cache[0] != cache_key:
|
||||||
|
msg_rows = big_wrap(display_text, w - 4)
|
||||||
|
msg_cache = (cache_key, msg_rows)
|
||||||
|
else:
|
||||||
|
msg_rows = msg_cache[1]
|
||||||
|
|
||||||
|
msg_rows = msg_gradient(msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0)
|
||||||
|
|
||||||
|
elapsed_s = int(time.monotonic() - m_ts)
|
||||||
|
remaining = max(0, self.config.display_secs - elapsed_s)
|
||||||
|
ts_str = datetime.now().strftime("%H:%M:%S")
|
||||||
|
panel_h = len(msg_rows) + 2
|
||||||
|
panel_top = max(0, (h - panel_h) // 2)
|
||||||
|
|
||||||
|
row_idx = 0
|
||||||
|
for mr in msg_rows:
|
||||||
|
ln = vis_trunc(mr, w)
|
||||||
|
overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K")
|
||||||
|
row_idx += 1
|
||||||
|
|
||||||
|
meta_parts = []
|
||||||
|
if m_title and m_title != m_body:
|
||||||
|
meta_parts.append(m_title)
|
||||||
|
meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
|
||||||
|
meta = (
|
||||||
|
" " + " \u00b7 ".join(meta_parts)
|
||||||
|
if len(meta_parts) > 1
|
||||||
|
else " " + meta_parts[0]
|
||||||
|
)
|
||||||
|
overlay.append(
|
||||||
|
f"\033[{panel_top + row_idx + 1};1H\033[38;5;245m{meta}\033[0m\033[K"
|
||||||
|
)
|
||||||
|
row_idx += 1
|
||||||
|
|
||||||
|
bar = "\u2500" * (w - 4)
|
||||||
|
overlay.append(
|
||||||
|
f"\033[{panel_top + row_idx + 1};1H \033[2;38;5;37m{bar}\033[0m\033[K"
|
||||||
|
)
|
||||||
|
|
||||||
|
return overlay, msg_cache
|
||||||
|
|
||||||
|
def cleanup(self) -> None:
|
||||||
|
"""Cleanup resources."""
|
||||||
|
pass
|
||||||
Reference in New Issue
Block a user