"""
Figment overlay effect for modern pipeline architecture.

Provides periodic SVG glyph overlays with reveal/hold/dissolve animation phases.
Integrates directly with the pipeline's effect system without legacy dependencies.
"""

import random
from dataclasses import dataclass
from enum import Enum, auto
from pathlib import Path

from engine import config
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
from engine.figment_render import rasterize_svg
from engine.figment_trigger import FigmentAction, FigmentCommand, FigmentTrigger
from engine.terminal import RST
from engine.themes import THEME_REGISTRY


class FigmentPhase(Enum):
    """Animation phases for figment overlay."""

    REVEAL = auto()
    HOLD = auto()
    DISSOLVE = auto()


@dataclass
class FigmentState:
    """State of a figment overlay at a given frame."""

    phase: FigmentPhase  # which animation phase this frame belongs to
    progress: float  # position within the phase; starts at 0.0, phase ends at >= 1.0
    rows: list[str]  # rasterized glyph rows; " " marks an empty cell
    gradient: list[int]  # 256-color palette codes applied left to right
    center_row: int  # 0-based screen row of the figment's first row
    center_col: int  # 0-based screen column of the figment's first column


def _color_codes_to_ansi(gradient: list[int]) -> list[str]:
    """Convert gradient list to ANSI color codes.

    Args:
        gradient: List of 256-color palette codes

    Returns:
        List of ANSI escape code strings. Non-integer entries are replaced by
        green (palette code 46); an empty gradient yields a single green entry
        so callers can always index the result.
    """
    fallback = "\033[38;5;46m"
    codes = [
        f"\033[38;5;{color}m" if isinstance(color, int) else fallback
        for color in gradient
    ]
    return codes or [fallback]


def render_figment_overlay(figment_state: FigmentState, w: int, h: int) -> list[str]:
    """Render figment overlay as ANSI cursor-positioning commands.

    Args:
        figment_state: FigmentState with phase, progress, rows, gradient, centering.
        w: terminal width
        h: terminal height

    Returns:
        List of ANSI strings to append to display buffer. Empty when the
        figment has no rows or no non-space cells.
    """
    rows = figment_state.rows
    if not rows:
        return []

    phase = figment_state.phase
    progress = figment_state.progress
    center_row = figment_state.center_row
    center_col = figment_state.center_col

    cols = _color_codes_to_ansi(figment_state.gradient)

    # Build a list of non-space cell positions (row-major order).
    cell_positions = [
        (r_idx, c_idx)
        for r_idx, row in enumerate(rows)
        for c_idx, ch in enumerate(row)
        if ch != " "
    ]
    n_cells = len(cell_positions)
    if n_cells == 0:
        return []

    # Seed from the figment's content so the reveal/dissolve pattern is the
    # same on every frame of one figment. NOTE: str hashes are salted per
    # interpreter run (unless PYTHONHASHSEED is fixed), so the pattern is
    # stable within a run, not across runs.
    rng = random.Random(hash(tuple(rows[0][:10])) if rows[0] else 42)
    shuffled = list(cell_positions)
    rng.shuffle(shuffled)

    # Phase-dependent visibility
    if phase == FigmentPhase.REVEAL:
        visible = set(shuffled[: int(n_cells * progress)])
    elif phase == FigmentPhase.HOLD:
        visible = set(cell_positions)
        # Strobe: dim some cells periodically
        if int(progress * 20) % 3 == 0:
            visible -= set(shuffled[: int(n_cells * 0.3)])
    elif phase == FigmentPhase.DISSOLVE:
        visible = set(shuffled[: int(n_cells * (1.0 - progress))])
    else:
        visible = set(cell_positions)

    # Build overlay commands
    overlay: list[str] = []
    n_cols = len(cols)
    max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1)
    gradient_span = max(max_x - 1, 1)

    for r_idx, row in enumerate(rows):
        scr_row = center_row + r_idx + 1  # 1-indexed
        if scr_row < 1 or scr_row > h:
            continue

        line_buf: list[str] = []
        has_content = False

        for c_idx, ch in enumerate(row):
            scr_col = center_col + c_idx + 1
            if scr_col < 1 or scr_col > w:
                continue

            if ch != " " and (r_idx, c_idx) in visible:
                # Apply gradient color. The ratio is clamped rather than
                # wrapped with "% 1.0": wrapping sends the rightmost column
                # (ratio == 1.0) back to 0.0, painting it with the first
                # gradient color instead of the last.
                ratio = min(c_idx / gradient_span, 1.0)
                idx = min(round(ratio * (n_cols - 1)), n_cols - 1)
                line_buf.append(f"{cols[idx]}{ch}{RST}")
                has_content = True
            else:
                line_buf.append(" ")

        if has_content:
            line_str = "".join(line_buf).rstrip()
            if line_str.strip():
                overlay.append(f"\033[{scr_row};{center_col + 1}H{line_str}{RST}")

    return overlay


class FigmentEffect(EffectPlugin):
    """Figment overlay effect for pipeline architecture.

    Provides periodic SVG overlays with reveal/hold/dissolve animation.
    """

    name = "figment"
    config = EffectConfig(
        enabled=True,
        intensity=1.0,
        params={
            "interval_secs": 60,
            "display_secs": 4.5,
            "figment_dir": "figments",
        },
    )
    supports_partial_updates = False
    is_overlay = True  # Figment is an overlay effect that composes on top of the buffer

    def __init__(
        self,
        figment_dir: str | None = None,
        triggers: list[FigmentTrigger] | None = None,
    ):
        """Create the effect and scan the figment directory for SVG files.

        Args:
            figment_dir: Directory containing ``*.svg`` files; defaults to
                ``"figments"`` when omitted or empty.
            triggers: Trigger sources polled on every state tick.
        """
        # Per-instance config so instances do not share the class-level params dict.
        self.config = EffectConfig(
            enabled=True,
            intensity=1.0,
            params={
                "interval_secs": 60,
                "display_secs": 4.5,
                "figment_dir": figment_dir or "figments",
            },
        )
        self._triggers = triggers or []
        self._phase: FigmentPhase | None = None  # None means idle
        self._progress: float = 0.0
        self._rows: list[str] = []
        self._gradient: list[int] = []
        self._center_row: int = 0
        self._center_col: int = 0
        self._timer: float = 0.0  # seconds accumulated while idle
        self._last_svg: str | None = None
        self._svg_files: list[str] = []
        self._scan_svgs()

    def _scan_svgs(self) -> None:
        """Scan figment directory for SVG files.

        Replaces ``_svg_files`` with the sorted ``*.svg`` paths found in the
        configured directory. When the path is not a directory the list is
        cleared, so reconfiguring to a missing directory does not keep
        serving files from the previously configured one.
        """
        figment_dir = Path(self.config.params["figment_dir"])
        if figment_dir.is_dir():
            self._svg_files = sorted(str(p) for p in figment_dir.glob("*.svg"))
        else:
            self._svg_files = []

    def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
        """Add figment overlay to buffer.

        Args:
            buf: Current display buffer.
            ctx: Effect context; ``frame_number``, ``terminal_width`` and
                ``terminal_height`` are read from it.

        Returns:
            ``buf`` unchanged when the effect is disabled or idle; otherwise a
            new list consisting of ``buf`` followed by the overlay commands.
        """
        if not self.config.enabled:
            return buf

        # Get figment state using frame number from context
        figment_state = self.get_figment_state(
            ctx.frame_number, ctx.terminal_width, ctx.terminal_height
        )

        if figment_state:
            # Render overlay and append to buffer
            overlay = render_figment_overlay(
                figment_state, ctx.terminal_width, ctx.terminal_height
            )
            buf = buf + overlay

        return buf

    def configure(self, config: EffectConfig) -> None:
        """Configure the effect.

        Adopts ``config`` as the active configuration and rescans the figment
        directory. If ``config.params`` has no ``"figment_dir"`` entry, the
        current directory is carried over by writing it into ``config.params``.

        Args:
            config: New effect configuration.
        """
        # Preserve figment_dir if the new config doesn't supply one
        figment_dir = config.params.get(
            "figment_dir", self.config.params.get("figment_dir", "figments")
        )
        self.config = config
        if "figment_dir" not in self.config.params:
            self.config.params["figment_dir"] = figment_dir
        self._scan_svgs()

    def trigger(self, w: int, h: int) -> None:
        """Manually trigger a figment display.

        Picks an SVG (avoiding the previous pick when another is available),
        rasterizes it, assigns a random theme gradient, centers it and enters
        the REVEAL phase. Does nothing when no SVG files are known or when
        rasterization fails.

        Args:
            w: terminal width
            h: terminal height
        """
        if not self._svg_files:
            return

        # Pick a random SVG, avoid repeating
        candidates = [s for s in self._svg_files if s != self._last_svg]
        if not candidates:
            candidates = self._svg_files
        svg_path = random.choice(candidates)
        self._last_svg = svg_path

        # Rasterize. Best-effort: a file that fails to rasterize skips this
        # trigger instead of breaking the render loop.
        try:
            self._rows = rasterize_svg(svg_path, w, h)
        except Exception:
            return

        # Pick random theme gradient
        theme_key = random.choice(list(THEME_REGISTRY.keys()))
        self._gradient = THEME_REGISTRY[theme_key].main_gradient

        # Center in viewport
        figment_h = len(self._rows)
        figment_w = max((len(r) for r in self._rows), default=0)
        self._center_row = max(0, (h - figment_h) // 2)
        self._center_col = max(0, (w - figment_w) // 2)

        # Start reveal phase
        self._phase = FigmentPhase.REVEAL
        self._progress = 0.0

    def get_figment_state(
        self, frame_number: int, w: int, h: int
    ) -> FigmentState | None:
        """Tick the state machine and return current state, or None if idle.

        Each call polls the triggers, advances the idle timer or the running
        animation by ``config.FRAME_DT``, and returns the state as it was at
        the start of the frame (before advancing).

        Args:
            frame_number: Not used by the state machine; timing advances by
                ``config.FRAME_DT`` per call.
            w: terminal width
            h: terminal height

        Returns:
            A FigmentState snapshot while a figment is animating, else None.
        """
        if not self.config.enabled:
            return None

        # Poll triggers
        for trig in self._triggers:
            cmd = trig.poll()
            if cmd is not None:
                self._handle_command(cmd, w, h)

        # Tick timer when idle
        if self._phase is None:
            self._timer += config.FRAME_DT
            interval = self.config.params.get("interval_secs", 60)
            if self._timer >= interval:
                self._timer = 0.0
                self.trigger(w, h)

        if self._phase is None:
            return None

        # Tick animation — snapshot current phase/progress, then advance
        current_phase = self._phase
        current_progress = self._progress

        # Advance for next frame; the display time is split evenly over the
        # three phases.
        display_secs = self.config.params.get("display_secs", 4.5)
        phase_duration = display_secs / 3.0
        if phase_duration > 0:
            self._progress += config.FRAME_DT / phase_duration
        else:
            # A zero duration would divide by zero and a negative one would
            # move progress backwards forever; finish the phase this frame.
            self._progress = 1.0

        if self._progress >= 1.0:
            self._progress = 0.0
            if self._phase == FigmentPhase.REVEAL:
                self._phase = FigmentPhase.HOLD
            elif self._phase == FigmentPhase.HOLD:
                self._phase = FigmentPhase.DISSOLVE
            elif self._phase == FigmentPhase.DISSOLVE:
                self._phase = None

        return FigmentState(
            phase=current_phase,
            progress=current_progress,
            rows=self._rows,
            gradient=self._gradient,
            center_row=self._center_row,
            center_col=self._center_col,
        )

    def _handle_command(self, cmd: FigmentCommand, w: int, h: int) -> None:
        """Handle a figment command.

        Commands whose value has the wrong type, and SET_COLOR with a theme
        name not in THEME_REGISTRY, are ignored.

        Args:
            cmd: Command with an ``action`` and a ``value``.
            w: terminal width (used when the command triggers a figment)
            h: terminal height (used when the command triggers a figment)
        """
        if cmd.action == FigmentAction.TRIGGER:
            self.trigger(w, h)
        elif cmd.action == FigmentAction.SET_INTENSITY and isinstance(
            cmd.value, (int, float)
        ):
            self.config.intensity = float(cmd.value)
        elif cmd.action == FigmentAction.SET_INTERVAL and isinstance(
            cmd.value, (int, float)
        ):
            self.config.params["interval_secs"] = float(cmd.value)
        elif cmd.action == FigmentAction.SET_COLOR and isinstance(cmd.value, str):
            if cmd.value in THEME_REGISTRY:
                self._gradient = THEME_REGISTRY[cmd.value].main_gradient
        elif cmd.action == FigmentAction.STOP:
            # Return to idle; the idle timer keeps its current value.
            self._phase = None
            self._progress = 0.0