feat/figment: periodic SVG glyph overlays with CLI flag #34
190
effects_plugins/figment.py
Normal file
190
effects_plugins/figment.py
Normal file
@@ -0,0 +1,190 @@
|
||||
"""
|
||||
Figment effect plugin — periodic SVG glyph overlay.
|
||||
|
||||
Owns the figment lifecycle: timer, SVG selection, state machine.
|
||||
Delegates rendering to render_figment_overlay() in engine/layers.py.
|
||||
|
||||
Named FigmentEffect (not FigmentPlugin) to match the *Effect discovery
|
||||
convention in effects_plugins/__init__.py.
|
||||
|
||||
NOT added to the EffectChain order — process() is a no-op. The overlay
|
||||
rendering is handled by scroll.py calling get_figment_state().
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import random
|
||||
from dataclasses import dataclass
|
||||
from enum import Enum, auto
|
||||
from pathlib import Path
|
||||
|
||||
from engine import config
|
||||
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
|
||||
from engine.figment_render import rasterize_svg
|
||||
from engine.figment_trigger import FigmentAction, FigmentCommand, FigmentTrigger
|
||||
from engine.themes import THEME_REGISTRY
|
||||
|
||||
|
||||
class FigmentPhase(Enum):
|
||||
REVEAL = auto()
|
||||
HOLD = auto()
|
||||
DISSOLVE = auto()
|
||||
|
||||
|
||||
@dataclass
|
||||
class FigmentState:
|
||||
phase: FigmentPhase
|
||||
progress: float
|
||||
rows: list[str]
|
||||
gradient: list[int]
|
||||
center_row: int
|
||||
center_col: int
|
||||
|
||||
|
||||
class FigmentEffect(EffectPlugin):
|
||||
name = "figment"
|
||||
config = EffectConfig(
|
||||
enabled=False,
|
||||
intensity=1.0,
|
||||
params={
|
||||
"interval_secs": 60,
|
||||
"display_secs": 4.5,
|
||||
"figment_dir": "figments",
|
||||
},
|
||||
)
|
||||
|
||||
def __init__(self, figment_dir: str | None = None, triggers: list[FigmentTrigger] | None = None):
|
||||
self.config = EffectConfig(
|
||||
enabled=False,
|
||||
intensity=1.0,
|
||||
params={
|
||||
"interval_secs": 60,
|
||||
"display_secs": 4.5,
|
||||
"figment_dir": figment_dir or "figments",
|
||||
},
|
||||
)
|
||||
self._triggers = triggers or []
|
||||
self._phase: FigmentPhase | None = None
|
||||
self._progress: float = 0.0
|
||||
self._rows: list[str] = []
|
||||
self._gradient: list[int] = []
|
||||
self._center_row: int = 0
|
||||
self._center_col: int = 0
|
||||
self._timer: float = 0.0
|
||||
self._last_svg: str | None = None
|
||||
self._svg_files: list[str] = []
|
||||
self._scan_svgs()
|
||||
|
||||
def _scan_svgs(self) -> None:
|
||||
figment_dir = Path(self.config.params["figment_dir"])
|
||||
if figment_dir.is_dir():
|
||||
self._svg_files = sorted(str(p) for p in figment_dir.glob("*.svg"))
|
||||
|
||||
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||
return buf
|
||||
|
||||
def configure(self, cfg: EffectConfig) -> None:
|
||||
# Preserve figment_dir if the new config doesn't supply one
|
||||
figment_dir = cfg.params.get(
|
||||
"figment_dir", self.config.params.get("figment_dir", "figments")
|
||||
)
|
||||
self.config = cfg
|
||||
if "figment_dir" not in self.config.params:
|
||||
self.config.params["figment_dir"] = figment_dir
|
||||
self._scan_svgs()
|
||||
|
||||
def trigger(self, w: int, h: int) -> None:
|
||||
"""Manually trigger a figment display."""
|
||||
if not self._svg_files:
|
||||
return
|
||||
|
||||
# Pick a random SVG, avoid repeating
|
||||
candidates = [s for s in self._svg_files if s != self._last_svg]
|
||||
if not candidates:
|
||||
candidates = self._svg_files
|
||||
svg_path = random.choice(candidates)
|
||||
self._last_svg = svg_path
|
||||
|
||||
# Rasterize
|
||||
try:
|
||||
self._rows = rasterize_svg(svg_path, w, h)
|
||||
except Exception:
|
||||
return
|
||||
|
||||
# Pick random theme gradient
|
||||
theme_key = random.choice(list(THEME_REGISTRY.keys()))
|
||||
self._gradient = THEME_REGISTRY[theme_key].main_gradient
|
||||
|
||||
# Center in viewport
|
||||
figment_h = len(self._rows)
|
||||
figment_w = max((len(r) for r in self._rows), default=0)
|
||||
self._center_row = max(0, (h - figment_h) // 2)
|
||||
self._center_col = max(0, (w - figment_w) // 2)
|
||||
|
||||
# Start reveal phase
|
||||
self._phase = FigmentPhase.REVEAL
|
||||
self._progress = 0.0
|
||||
|
||||
def get_figment_state(self, frame_number: int, w: int, h: int) -> FigmentState | None:
|
||||
"""Tick the state machine and return current state, or None if idle."""
|
||||
if not self.config.enabled:
|
||||
return None
|
||||
|
||||
# Poll triggers
|
||||
for trig in self._triggers:
|
||||
cmd = trig.poll()
|
||||
if cmd is not None:
|
||||
self._handle_command(cmd, w, h)
|
||||
|
||||
# Tick timer when idle
|
||||
if self._phase is None:
|
||||
self._timer += config.FRAME_DT
|
||||
interval = self.config.params.get("interval_secs", 60)
|
||||
if self._timer >= interval:
|
||||
self._timer = 0.0
|
||||
self.trigger(w, h)
|
||||
|
||||
# Tick animation — snapshot current phase/progress, then advance
|
||||
if self._phase is not None:
|
||||
# Capture the state at the start of this frame
|
||||
current_phase = self._phase
|
||||
current_progress = self._progress
|
||||
|
||||
# Advance for next frame
|
||||
display_secs = self.config.params.get("display_secs", 4.5)
|
||||
phase_duration = display_secs / 3.0
|
||||
self._progress += config.FRAME_DT / phase_duration
|
||||
|
||||
if self._progress >= 1.0:
|
||||
self._progress = 0.0
|
||||
if self._phase == FigmentPhase.REVEAL:
|
||||
self._phase = FigmentPhase.HOLD
|
||||
elif self._phase == FigmentPhase.HOLD:
|
||||
self._phase = FigmentPhase.DISSOLVE
|
||||
elif self._phase == FigmentPhase.DISSOLVE:
|
||||
self._phase = None
|
||||
|
||||
return FigmentState(
|
||||
phase=current_phase,
|
||||
progress=current_progress,
|
||||
rows=self._rows,
|
||||
gradient=self._gradient,
|
||||
center_row=self._center_row,
|
||||
center_col=self._center_col,
|
||||
)
|
||||
|
||||
return None
|
||||
|
||||
def _handle_command(self, cmd: FigmentCommand, w: int, h: int) -> None:
|
||||
if cmd.action == FigmentAction.TRIGGER:
|
||||
self.trigger(w, h)
|
||||
elif cmd.action == FigmentAction.SET_INTENSITY and isinstance(cmd.value, (int, float)):
|
||||
self.config.intensity = float(cmd.value)
|
||||
elif cmd.action == FigmentAction.SET_INTERVAL and isinstance(cmd.value, (int, float)):
|
||||
self.config.params["interval_secs"] = float(cmd.value)
|
||||
elif cmd.action == FigmentAction.SET_COLOR and isinstance(cmd.value, str):
|
||||
if cmd.value in THEME_REGISTRY:
|
||||
self._gradient = THEME_REGISTRY[cmd.value].main_gradient
|
||||
elif cmd.action == FigmentAction.STOP:
|
||||
self._phase = None
|
||||
self._progress = 0.0
|
||||
155
tests/test_figment.py
Normal file
155
tests/test_figment.py
Normal file
@@ -0,0 +1,155 @@
|
||||
"""Tests for the FigmentEffect plugin."""
|
||||
|
||||
import os
|
||||
from enum import Enum
|
||||
from unittest.mock import patch
|
||||
|
||||
import pytest
|
||||
|
||||
from effects_plugins.figment import FigmentEffect, FigmentPhase, FigmentState
|
||||
from engine.effects.types import EffectConfig, EffectContext
|
||||
|
||||
|
||||
FIXTURE_SVG = os.path.join(
|
||||
os.path.dirname(__file__), "fixtures", "test.svg"
|
||||
)
|
||||
FIGMENTS_DIR = os.path.join(os.path.dirname(__file__), "fixtures")
|
||||
|
||||
|
||||
class TestFigmentPhase:
|
||||
def test_is_enum(self):
|
||||
assert issubclass(FigmentPhase, Enum)
|
||||
|
||||
def test_has_all_phases(self):
|
||||
assert hasattr(FigmentPhase, "REVEAL")
|
||||
assert hasattr(FigmentPhase, "HOLD")
|
||||
assert hasattr(FigmentPhase, "DISSOLVE")
|
||||
|
||||
|
||||
class TestFigmentState:
|
||||
def test_creation(self):
|
||||
state = FigmentState(
|
||||
phase=FigmentPhase.REVEAL,
|
||||
progress=0.5,
|
||||
rows=["█▀▄", " █ "],
|
||||
gradient=[46, 40, 34, 28, 22, 22, 34, 40, 46, 82, 118, 231],
|
||||
center_row=5,
|
||||
center_col=10,
|
||||
)
|
||||
assert state.phase == FigmentPhase.REVEAL
|
||||
assert state.progress == 0.5
|
||||
assert len(state.rows) == 2
|
||||
|
||||
|
||||
class TestFigmentEffectInit:
|
||||
def test_name(self):
|
||||
effect = FigmentEffect(figment_dir=FIGMENTS_DIR)
|
||||
assert effect.name == "figment"
|
||||
|
||||
def test_default_config(self):
|
||||
effect = FigmentEffect(figment_dir=FIGMENTS_DIR)
|
||||
assert effect.config.enabled is False
|
||||
assert effect.config.intensity == 1.0
|
||||
assert effect.config.params["interval_secs"] == 60
|
||||
assert effect.config.params["display_secs"] == 4.5
|
||||
|
||||
def test_process_is_noop(self):
|
||||
effect = FigmentEffect(figment_dir=FIGMENTS_DIR)
|
||||
buf = ["line1", "line2"]
|
||||
ctx = EffectContext(
|
||||
terminal_width=80,
|
||||
terminal_height=24,
|
||||
scroll_cam=0,
|
||||
ticker_height=20,
|
||||
)
|
||||
result = effect.process(buf, ctx)
|
||||
assert result == buf
|
||||
assert result is buf
|
||||
|
||||
def test_configure(self):
|
||||
effect = FigmentEffect(figment_dir=FIGMENTS_DIR)
|
||||
new_cfg = EffectConfig(enabled=True, intensity=0.5)
|
||||
effect.configure(new_cfg)
|
||||
assert effect.config.enabled is True
|
||||
assert effect.config.intensity == 0.5
|
||||
|
||||
|
||||
class TestFigmentStateMachine:
|
||||
def test_idle_initially(self):
|
||||
effect = FigmentEffect(figment_dir=FIGMENTS_DIR)
|
||||
effect.config.enabled = True
|
||||
state = effect.get_figment_state(0, 80, 24)
|
||||
# Timer hasn't fired yet, should be None (idle)
|
||||
assert state is None
|
||||
|
||||
def test_trigger_starts_reveal(self):
|
||||
effect = FigmentEffect(figment_dir=FIGMENTS_DIR)
|
||||
effect.config.enabled = True
|
||||
effect.trigger(80, 24)
|
||||
state = effect.get_figment_state(1, 80, 24)
|
||||
assert state is not None
|
||||
assert state.phase == FigmentPhase.REVEAL
|
||||
|
||||
def test_full_cycle(self):
|
||||
effect = FigmentEffect(figment_dir=FIGMENTS_DIR)
|
||||
effect.config.enabled = True
|
||||
effect.config.params["display_secs"] = 0.15 # 3 phases x 0.05s
|
||||
|
||||
effect.trigger(40, 20)
|
||||
|
||||
# Advance through reveal (30 frames at 0.05s = 1.5s, but we shrunk it)
|
||||
# With display_secs=0.15, each phase is 0.05s = 1 frame
|
||||
state = effect.get_figment_state(1, 40, 20)
|
||||
assert state is not None
|
||||
assert state.phase == FigmentPhase.REVEAL
|
||||
|
||||
# Advance enough frames to get through all phases
|
||||
last_state = None
|
||||
for frame in range(2, 100):
|
||||
state = effect.get_figment_state(frame, 40, 20)
|
||||
if state is None:
|
||||
break
|
||||
last_state = state
|
||||
|
||||
# Should have completed the full cycle back to idle
|
||||
assert state is None
|
||||
|
||||
def test_timer_fires_at_interval(self):
|
||||
effect = FigmentEffect(figment_dir=FIGMENTS_DIR)
|
||||
effect.config.enabled = True
|
||||
effect.config.params["interval_secs"] = 0.1 # 2 frames at 20fps
|
||||
|
||||
# Frame 0: idle
|
||||
state = effect.get_figment_state(0, 40, 20)
|
||||
assert state is None
|
||||
|
||||
# Advance past interval (0.1s = 2 frames)
|
||||
state = effect.get_figment_state(1, 40, 20)
|
||||
state = effect.get_figment_state(2, 40, 20)
|
||||
state = effect.get_figment_state(3, 40, 20)
|
||||
# Timer should have fired by now
|
||||
assert state is not None
|
||||
|
||||
|
||||
class TestFigmentEdgeCases:
|
||||
def test_empty_figment_dir(self, tmp_path):
|
||||
effect = FigmentEffect(figment_dir=str(tmp_path))
|
||||
effect.config.enabled = True
|
||||
effect.trigger(40, 20)
|
||||
state = effect.get_figment_state(1, 40, 20)
|
||||
# No SVGs available — should stay idle
|
||||
assert state is None
|
||||
|
||||
def test_missing_figment_dir(self):
|
||||
effect = FigmentEffect(figment_dir="/nonexistent/path")
|
||||
effect.config.enabled = True
|
||||
effect.trigger(40, 20)
|
||||
state = effect.get_figment_state(1, 40, 20)
|
||||
assert state is None
|
||||
|
||||
def test_disabled_ignores_trigger(self):
|
||||
effect = FigmentEffect(figment_dir=FIGMENTS_DIR)
|
||||
effect.config.enabled = False
|
||||
effect.trigger(80, 24)
|
||||
state = effect.get_figment_state(1, 80, 24)
|
||||
assert state is None
|
||||
Reference in New Issue
Block a user