- Add engine/effects/plugins/figment.py (native pipeline implementation) - Add engine/figment_render.py, engine/figment_trigger.py, engine/themes.py - Add 3 SVG assets in figments/ (Mexican/Aztec motif) - Add engine/display/backends/animation_report.py for debugging - Add engine/pipeline/adapters/frame_capture.py for frame capture - Add test-figment preset to presets.toml - Add cairosvg optional dependency to pyproject.toml - Update EffectPluginStage to support is_overlay attribute (for overlay effects) - Add comprehensive tests: test_figment_effect.py, test_figment_pipeline.py, test_figment_render.py - Remove obsolete test_ui_simple.py - Update TODO.md with test cleanup plan - Refactor test_adapters.py to use real components instead of mocks This completes the figment SVG overlay feature integration using the modern pipeline architecture, avoiding legacy effects_plugins. All tests pass (758 total).
333 lines
11 KiB
Python
333 lines
11 KiB
Python
"""
|
|
Figment overlay effect for modern pipeline architecture.
|
|
|
|
Provides periodic SVG glyph overlays with reveal/hold/dissolve animation phases.
|
|
Integrates directly with the pipeline's effect system without legacy dependencies.
|
|
"""
|
|
|
|
import random
|
|
from dataclasses import dataclass
|
|
from enum import Enum, auto
|
|
from pathlib import Path
|
|
|
|
from engine import config
|
|
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
|
|
from engine.figment_render import rasterize_svg
|
|
from engine.figment_trigger import FigmentAction, FigmentCommand, FigmentTrigger
|
|
from engine.terminal import RST
|
|
from engine.themes import THEME_REGISTRY
|
|
|
|
|
|
class FigmentPhase(Enum):
|
|
"""Animation phases for figment overlay."""
|
|
|
|
REVEAL = auto()
|
|
HOLD = auto()
|
|
DISSOLVE = auto()
|
|
|
|
|
|
@dataclass
|
|
class FigmentState:
|
|
"""State of a figment overlay at a given frame."""
|
|
|
|
phase: FigmentPhase
|
|
progress: float
|
|
rows: list[str]
|
|
gradient: list[int]
|
|
center_row: int
|
|
center_col: int
|
|
|
|
|
|
def _color_codes_to_ansi(gradient: list[int]) -> list[str]:
|
|
"""Convert gradient list to ANSI color codes.
|
|
|
|
Args:
|
|
gradient: List of 256-color palette codes
|
|
|
|
Returns:
|
|
List of ANSI escape code strings
|
|
"""
|
|
codes = []
|
|
for color in gradient:
|
|
if isinstance(color, int):
|
|
codes.append(f"\033[38;5;{color}m")
|
|
else:
|
|
# Fallback to green
|
|
codes.append("\033[38;5;46m")
|
|
return codes if codes else ["\033[38;5;46m"]
|
|
|
|
|
|
def render_figment_overlay(figment_state: FigmentState, w: int, h: int) -> list[str]:
|
|
"""Render figment overlay as ANSI cursor-positioning commands.
|
|
|
|
Args:
|
|
figment_state: FigmentState with phase, progress, rows, gradient, centering.
|
|
w: terminal width
|
|
h: terminal height
|
|
|
|
Returns:
|
|
List of ANSI strings to append to display buffer.
|
|
"""
|
|
rows = figment_state.rows
|
|
if not rows:
|
|
return []
|
|
|
|
phase = figment_state.phase
|
|
progress = figment_state.progress
|
|
gradient = figment_state.gradient
|
|
center_row = figment_state.center_row
|
|
center_col = figment_state.center_col
|
|
|
|
cols = _color_codes_to_ansi(gradient)
|
|
|
|
# Build a list of non-space cell positions
|
|
cell_positions = []
|
|
for r_idx, row in enumerate(rows):
|
|
for c_idx, ch in enumerate(row):
|
|
if ch != " ":
|
|
cell_positions.append((r_idx, c_idx))
|
|
|
|
n_cells = len(cell_positions)
|
|
if n_cells == 0:
|
|
return []
|
|
|
|
# Use a deterministic seed so the reveal/dissolve pattern is stable per-figment
|
|
rng = random.Random(hash(tuple(rows[0][:10])) if rows[0] else 42)
|
|
shuffled = list(cell_positions)
|
|
rng.shuffle(shuffled)
|
|
|
|
# Phase-dependent visibility
|
|
if phase == FigmentPhase.REVEAL:
|
|
visible_count = int(n_cells * progress)
|
|
visible = set(shuffled[:visible_count])
|
|
elif phase == FigmentPhase.HOLD:
|
|
visible = set(cell_positions)
|
|
# Strobe: dim some cells periodically
|
|
if int(progress * 20) % 3 == 0:
|
|
dim_count = int(n_cells * 0.3)
|
|
visible -= set(shuffled[:dim_count])
|
|
elif phase == FigmentPhase.DISSOLVE:
|
|
remaining_count = int(n_cells * (1.0 - progress))
|
|
visible = set(shuffled[:remaining_count])
|
|
else:
|
|
visible = set(cell_positions)
|
|
|
|
# Build overlay commands
|
|
overlay: list[str] = []
|
|
n_cols = len(cols)
|
|
max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1)
|
|
|
|
for r_idx, row in enumerate(rows):
|
|
scr_row = center_row + r_idx + 1 # 1-indexed
|
|
if scr_row < 1 or scr_row > h:
|
|
continue
|
|
|
|
line_buf: list[str] = []
|
|
has_content = False
|
|
|
|
for c_idx, ch in enumerate(row):
|
|
scr_col = center_col + c_idx + 1
|
|
if scr_col < 1 or scr_col > w:
|
|
continue
|
|
|
|
if ch != " " and (r_idx, c_idx) in visible:
|
|
# Apply gradient color
|
|
shifted = (c_idx / max(max_x - 1, 1)) % 1.0
|
|
idx = min(round(shifted * (n_cols - 1)), n_cols - 1)
|
|
line_buf.append(f"{cols[idx]}{ch}{RST}")
|
|
has_content = True
|
|
else:
|
|
line_buf.append(" ")
|
|
|
|
if has_content:
|
|
line_str = "".join(line_buf).rstrip()
|
|
if line_str.strip():
|
|
overlay.append(f"\033[{scr_row};{center_col + 1}H{line_str}{RST}")
|
|
|
|
return overlay
|
|
|
|
|
|
class FigmentEffect(EffectPlugin):
|
|
"""Figment overlay effect for pipeline architecture.
|
|
|
|
Provides periodic SVG overlays with reveal/hold/dissolve animation.
|
|
"""
|
|
|
|
name = "figment"
|
|
config = EffectConfig(
|
|
enabled=True,
|
|
intensity=1.0,
|
|
params={
|
|
"interval_secs": 60,
|
|
"display_secs": 4.5,
|
|
"figment_dir": "figments",
|
|
},
|
|
)
|
|
supports_partial_updates = False
|
|
is_overlay = True # Figment is an overlay effect that composes on top of the buffer
|
|
|
|
def __init__(
|
|
self,
|
|
figment_dir: str | None = None,
|
|
triggers: list[FigmentTrigger] | None = None,
|
|
):
|
|
self.config = EffectConfig(
|
|
enabled=True,
|
|
intensity=1.0,
|
|
params={
|
|
"interval_secs": 60,
|
|
"display_secs": 4.5,
|
|
"figment_dir": figment_dir or "figments",
|
|
},
|
|
)
|
|
self._triggers = triggers or []
|
|
self._phase: FigmentPhase | None = None
|
|
self._progress: float = 0.0
|
|
self._rows: list[str] = []
|
|
self._gradient: list[int] = []
|
|
self._center_row: int = 0
|
|
self._center_col: int = 0
|
|
self._timer: float = 0.0
|
|
self._last_svg: str | None = None
|
|
self._svg_files: list[str] = []
|
|
self._scan_svgs()
|
|
|
|
def _scan_svgs(self) -> None:
|
|
"""Scan figment directory for SVG files."""
|
|
figment_dir = Path(self.config.params["figment_dir"])
|
|
if figment_dir.is_dir():
|
|
self._svg_files = sorted(str(p) for p in figment_dir.glob("*.svg"))
|
|
|
|
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
|
"""Add figment overlay to buffer."""
|
|
if not self.config.enabled:
|
|
return buf
|
|
|
|
# Get figment state using frame number from context
|
|
figment_state = self.get_figment_state(
|
|
ctx.frame_number, ctx.terminal_width, ctx.terminal_height
|
|
)
|
|
|
|
if figment_state:
|
|
# Render overlay and append to buffer
|
|
overlay = render_figment_overlay(
|
|
figment_state, ctx.terminal_width, ctx.terminal_height
|
|
)
|
|
buf = buf + overlay
|
|
|
|
return buf
|
|
|
|
def configure(self, config: EffectConfig) -> None:
|
|
"""Configure the effect."""
|
|
# Preserve figment_dir if the new config doesn't supply one
|
|
figment_dir = config.params.get(
|
|
"figment_dir", self.config.params.get("figment_dir", "figments")
|
|
)
|
|
self.config = config
|
|
if "figment_dir" not in self.config.params:
|
|
self.config.params["figment_dir"] = figment_dir
|
|
self._scan_svgs()
|
|
|
|
def trigger(self, w: int, h: int) -> None:
|
|
"""Manually trigger a figment display."""
|
|
if not self._svg_files:
|
|
return
|
|
|
|
# Pick a random SVG, avoid repeating
|
|
candidates = [s for s in self._svg_files if s != self._last_svg]
|
|
if not candidates:
|
|
candidates = self._svg_files
|
|
svg_path = random.choice(candidates)
|
|
self._last_svg = svg_path
|
|
|
|
# Rasterize
|
|
try:
|
|
self._rows = rasterize_svg(svg_path, w, h)
|
|
except Exception:
|
|
return
|
|
|
|
# Pick random theme gradient
|
|
theme_key = random.choice(list(THEME_REGISTRY.keys()))
|
|
self._gradient = THEME_REGISTRY[theme_key].main_gradient
|
|
|
|
# Center in viewport
|
|
figment_h = len(self._rows)
|
|
figment_w = max((len(r) for r in self._rows), default=0)
|
|
self._center_row = max(0, (h - figment_h) // 2)
|
|
self._center_col = max(0, (w - figment_w) // 2)
|
|
|
|
# Start reveal phase
|
|
self._phase = FigmentPhase.REVEAL
|
|
self._progress = 0.0
|
|
|
|
def get_figment_state(
|
|
self, frame_number: int, w: int, h: int
|
|
) -> FigmentState | None:
|
|
"""Tick the state machine and return current state, or None if idle."""
|
|
if not self.config.enabled:
|
|
return None
|
|
|
|
# Poll triggers
|
|
for trig in self._triggers:
|
|
cmd = trig.poll()
|
|
if cmd is not None:
|
|
self._handle_command(cmd, w, h)
|
|
|
|
# Tick timer when idle
|
|
if self._phase is None:
|
|
self._timer += config.FRAME_DT
|
|
interval = self.config.params.get("interval_secs", 60)
|
|
if self._timer >= interval:
|
|
self._timer = 0.0
|
|
self.trigger(w, h)
|
|
|
|
# Tick animation — snapshot current phase/progress, then advance
|
|
if self._phase is not None:
|
|
# Capture the state at the start of this frame
|
|
current_phase = self._phase
|
|
current_progress = self._progress
|
|
|
|
# Advance for next frame
|
|
display_secs = self.config.params.get("display_secs", 4.5)
|
|
phase_duration = display_secs / 3.0
|
|
self._progress += config.FRAME_DT / phase_duration
|
|
|
|
if self._progress >= 1.0:
|
|
self._progress = 0.0
|
|
if self._phase == FigmentPhase.REVEAL:
|
|
self._phase = FigmentPhase.HOLD
|
|
elif self._phase == FigmentPhase.HOLD:
|
|
self._phase = FigmentPhase.DISSOLVE
|
|
elif self._phase == FigmentPhase.DISSOLVE:
|
|
self._phase = None
|
|
|
|
return FigmentState(
|
|
phase=current_phase,
|
|
progress=current_progress,
|
|
rows=self._rows,
|
|
gradient=self._gradient,
|
|
center_row=self._center_row,
|
|
center_col=self._center_col,
|
|
)
|
|
|
|
return None
|
|
|
|
def _handle_command(self, cmd: FigmentCommand, w: int, h: int) -> None:
|
|
"""Handle a figment command."""
|
|
if cmd.action == FigmentAction.TRIGGER:
|
|
self.trigger(w, h)
|
|
elif cmd.action == FigmentAction.SET_INTENSITY and isinstance(
|
|
cmd.value, (int, float)
|
|
):
|
|
self.config.intensity = float(cmd.value)
|
|
elif cmd.action == FigmentAction.SET_INTERVAL and isinstance(
|
|
cmd.value, (int, float)
|
|
):
|
|
self.config.params["interval_secs"] = float(cmd.value)
|
|
elif cmd.action == FigmentAction.SET_COLOR and isinstance(cmd.value, str):
|
|
if cmd.value in THEME_REGISTRY:
|
|
self._gradient = THEME_REGISTRY[cmd.value].main_gradient
|
|
elif cmd.action == FigmentAction.STOP:
|
|
self._phase = None
|
|
self._progress = 0.0
|