diff --git a/TODO.md b/TODO.md
index d9e0b01..6165dfb 100644
--- a/TODO.md
+++ b/TODO.md
@@ -19,6 +19,42 @@
- [x] Enumerate all effect plugin parameters automatically for UI control (intensity, decay, etc.)
- [ ] Implement pipeline hot-rebuild when stage toggles or params change, preserving camera and display state [#43](https://git.notsosm.art/david/Mainline/issues/43)
+## Test Suite Cleanup & Feature Implementation
+### Phase 1: Test Suite Cleanup (In Progress)
+- [x] Port figment feature to modern pipeline architecture
+- [x] Create `engine/effects/plugins/figment.py` (full port)
+- [x] Add `figment.py` to `engine/effects/plugins/`
+- [x] Copy SVG files to `figments/` directory
+- [x] Update `pyproject.toml` with figment extra
+- [x] Add `test-figment` preset to `presets.toml`
+- [x] Update pipeline adapters for overlay effects
+- [x] Clean up `test_adapters.py` (removed 18 mock-only tests)
+- [x] Verify all tests pass (652 passing, 20 skipped, 58% coverage)
+- [ ] Review remaining mock-heavy tests in `test_pipeline.py`
+- [ ] Review `test_effects.py` for implementation detail tests
+- [ ] Identify additional tests to remove/consolidate
+- [ ] Target: ~600 tests total
+
+### Phase 2: Acceptance Test Expansion (Planned)
+- [ ] Create `test_message_overlay.py` for message rendering
+- [ ] Create `test_firehose.py` for firehose rendering
+- [ ] Create `test_pipeline_order.py` for execution order verification
+- [ ] Expand `test_figment_effect.py` for animation phases
+- [ ] Target: 10-15 new acceptance tests
+
+### Phase 3: Post-Branch Features (Planned)
+- [ ] Port message overlay system from `upstream_layers.py`
+- [ ] Port firehose rendering from `upstream_layers.py`
+- [ ] Create `MessageOverlayStage` for pipeline integration
+- [ ] Verify figment renders in correct order (effects → figment → messages → display)
+
+### Phase 4: Visual Quality Improvements (Planned)
+- [ ] Compare upstream vs current pipeline output
+- [ ] Implement easing functions for figment animations
+- [ ] Add animated gradient shifts
+- [ ] Improve strobe effect patterns
+- [ ] Use introspection to match visual style
+
## Gitea Issues Tracking
- [#37](https://git.notsosm.art/david/Mainline/issues/37): Refactor app.py and adapter.py for better maintainability
- [#35](https://git.notsosm.art/david/Mainline/issues/35): Epic: Pipeline Mutation API for Stage Hot-Swapping
diff --git a/engine/display/backends/animation_report.py b/engine/display/backends/animation_report.py
new file mode 100644
index 0000000..e9ef2fc
--- /dev/null
+++ b/engine/display/backends/animation_report.py
@@ -0,0 +1,656 @@
+"""
+Animation Report Display Backend
+
+Captures frames from pipeline stages and generates an interactive HTML report
+showing before/after states for each transformative stage.
+"""
+
+import time
+from dataclasses import dataclass, field
+from datetime import datetime
+from pathlib import Path
+from typing import Any
+
+from engine.display.streaming import compute_diff
+
+
+@dataclass
+class CapturedFrame:
+ """A captured frame with metadata."""
+
+ stage: str
+ buffer: list[str]
+ timestamp: float
+ frame_number: int
+ diff_from_previous: dict[str, Any] | None = None
+
+
+@dataclass
+class StageCapture:
+ """Captures frames for a single pipeline stage."""
+
+ name: str
+ frames: list[CapturedFrame] = field(default_factory=list)
+ start_time: float = field(default_factory=time.time)
+ end_time: float = 0.0
+
+ def add_frame(
+ self,
+ buffer: list[str],
+ frame_number: int,
+ previous_buffer: list[str] | None = None,
+ ) -> None:
+ """Add a captured frame."""
+ timestamp = time.time()
+ diff = None
+ if previous_buffer is not None:
+ diff_data = compute_diff(previous_buffer, buffer)
+ diff = {
+ "changed_lines": len(diff_data.changed_lines),
+ "total_lines": len(buffer),
+ "width": diff_data.width,
+ "height": diff_data.height,
+ }
+
+ frame = CapturedFrame(
+ stage=self.name,
+ buffer=list(buffer),
+ timestamp=timestamp,
+ frame_number=frame_number,
+ diff_from_previous=diff,
+ )
+ self.frames.append(frame)
+
+ def finish(self) -> None:
+ """Mark capture as finished."""
+ self.end_time = time.time()
+
+
+class AnimationReportDisplay:
+ """
+ Display backend that captures frames for animation report generation.
+
+ Instead of rendering to terminal, this display captures the buffer at each
+ stage and stores it for later HTML report generation.
+ """
+
+ width: int = 80
+ height: int = 24
+
+ def __init__(self, output_dir: str = "./reports"):
+ """
+ Initialize the animation report display.
+
+ Args:
+ output_dir: Directory where reports will be saved
+ """
+ self.output_dir = Path(output_dir)
+ self.output_dir.mkdir(parents=True, exist_ok=True)
+
+ self._stages: dict[str, StageCapture] = {}
+ self._current_stage: str = ""
+ self._previous_buffer: list[str] | None = None
+ self._frame_number: int = 0
+ self._total_frames: int = 0
+ self._start_time: float = 0.0
+
+ def init(self, width: int, height: int, reuse: bool = False) -> None:
+ """Initialize display with dimensions."""
+ self.width = width
+ self.height = height
+ self._start_time = time.time()
+
+ def show(self, buffer: list[str], border: bool = False) -> None:
+ """
+ Capture a frame for the current stage.
+
+ Args:
+ buffer: The frame buffer to capture
+ border: Border flag (ignored)
+ """
+ if not self._current_stage:
+ # If no stage is set, use a default name
+ self._current_stage = "final"
+
+ if self._current_stage not in self._stages:
+ self._stages[self._current_stage] = StageCapture(self._current_stage)
+
+ stage = self._stages[self._current_stage]
+ stage.add_frame(buffer, self._frame_number, self._previous_buffer)
+
+ self._previous_buffer = list(buffer)
+ self._frame_number += 1
+ self._total_frames += 1
+
+ def start_stage(self, stage_name: str) -> None:
+ """
+ Start capturing frames for a new stage.
+
+ Args:
+ stage_name: Name of the stage (e.g., "noise", "fade", "firehose")
+ """
+ if self._current_stage and self._current_stage in self._stages:
+ # Finish previous stage
+ self._stages[self._current_stage].finish()
+
+ self._current_stage = stage_name
+ self._previous_buffer = None # Reset for new stage
+
+ def clear(self) -> None:
+ """Clear the display (no-op for report display)."""
+ pass
+
+ def cleanup(self) -> None:
+ """Cleanup resources."""
+ # Finish current stage
+ if self._current_stage and self._current_stage in self._stages:
+ self._stages[self._current_stage].finish()
+
+ def get_dimensions(self) -> tuple[int, int]:
+ """Get current dimensions."""
+ return (self.width, self.height)
+
+ def get_stages(self) -> dict[str, StageCapture]:
+ """Get all captured stages."""
+ return self._stages
+
+ def generate_report(self, title: str = "Animation Report") -> Path:
+ """
+ Generate an HTML report with captured frames and animations.
+
+ Args:
+ title: Title of the report
+
+ Returns:
+ Path to the generated HTML file
+ """
+ report_path = self.output_dir / f"animation_report_{int(time.time())}.html"
+ html_content = self._build_html(title)
+ report_path.write_text(html_content)
+ return report_path
+
+ def _build_html(self, title: str) -> str:
+ """Build the HTML content for the report."""
+ # Collect all frames across stages
+ all_frames = []
+ for stage_name, stage in self._stages.items():
+ for frame in stage.frames:
+ all_frames.append(frame)
+
+ # Sort frames by timestamp
+ all_frames.sort(key=lambda f: f.timestamp)
+
+ # Build stage sections
+ stages_html = ""
+ for stage_name, stage in self._stages.items():
+ stages_html += self._build_stage_section(stage_name, stage)
+
+ # Build full HTML
+ html = f"""
+
+
+
+
+
+ {title}
+
+
+
+
+
+
+
+
+ {stages_html}
+
+
+
+
+
+
+
+"""
+ return html
+
+ def _build_stage_section(self, stage_name: str, stage: StageCapture) -> str:
+ """Build HTML for a single stage section."""
+ frames_html = ""
+ for i, frame in enumerate(stage.frames):
+ diff_info = ""
+ if frame.diff_from_previous:
+ changed = frame.diff_from_previous.get("changed_lines", 0)
+ total = frame.diff_from_previous.get("total_lines", 0)
+ diff_info = f'Δ {changed}/{total}'
+
+ frames_html += f"""
+
+
+
{self._escape_html("".join(frame.buffer))}
+
+ """
+
+ return f"""
+
+ """
+
+ def _build_timeline(self, all_frames: list[CapturedFrame]) -> str:
+ """Build timeline HTML."""
+ if not all_frames:
+ return ""
+
+ markers_html = ""
+ for i, frame in enumerate(all_frames):
+ left_percent = (i / len(all_frames)) * 100
+ markers_html += f''
+
+ return markers_html
+
+ def _build_stage_colors(self) -> str:
+ """Build stage color mapping for JavaScript."""
+ colors = [
+ "#00d4ff",
+ "#00ff88",
+ "#ff6b6b",
+ "#ffd93d",
+ "#a855f7",
+ "#ec4899",
+ "#14b8a6",
+ "#f97316",
+ "#8b5cf6",
+ "#06b6d4",
+ ]
+ color_map = ""
+ for i, stage_name in enumerate(self._stages.keys()):
+ color = colors[i % len(colors)]
+ color_map += f' "{stage_name}": "{color}",\n'
+ return color_map.rstrip(",\n")
+
+ def _build_timeline_markers(self, all_frames: list[CapturedFrame]) -> str:
+ """Build timeline markers in JavaScript."""
+ if not all_frames:
+ return ""
+
+ markers_js = ""
+ for i, frame in enumerate(all_frames):
+ left_percent = (i / len(all_frames)) * 100
+ stage_color = f"stageColors['{frame.stage}']"
+ markers_js += f"""
+ const marker{i} = document.createElement('div');
+ marker{i}.className = 'timeline-marker stage-{{frame.stage}}';
+ marker{i}.style.left = '{left_percent}%';
+ marker{i}.style.setProperty('--stage-color', {stage_color});
+ marker{i}.onclick = () => {{
+ currentFrame = {i};
+ updateFrameDisplay();
+ }};
+ timeline.appendChild(marker{i});
+ """
+
+ return markers_js
+
+ def _escape_html(self, text: str) -> str:
+ """Escape HTML special characters."""
+ return (
+ text.replace("&", "&")
+ .replace("<", "<")
+ .replace(">", ">")
+ .replace('"', """)
+ .replace("'", "'")
+ )
diff --git a/engine/effects/plugins/figment.py b/engine/effects/plugins/figment.py
new file mode 100644
index 0000000..062fd8b
--- /dev/null
+++ b/engine/effects/plugins/figment.py
@@ -0,0 +1,332 @@
+"""
+Figment overlay effect for modern pipeline architecture.
+
+Provides periodic SVG glyph overlays with reveal/hold/dissolve animation phases.
+Integrates directly with the pipeline's effect system without legacy dependencies.
+"""
+
+import random
+from dataclasses import dataclass
+from enum import Enum, auto
+from pathlib import Path
+
+from engine import config
+from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
+from engine.figment_render import rasterize_svg
+from engine.figment_trigger import FigmentAction, FigmentCommand, FigmentTrigger
+from engine.terminal import RST
+from engine.themes import THEME_REGISTRY
+
+
+class FigmentPhase(Enum):
+ """Animation phases for figment overlay."""
+
+ REVEAL = auto()
+ HOLD = auto()
+ DISSOLVE = auto()
+
+
+@dataclass
+class FigmentState:
+ """State of a figment overlay at a given frame."""
+
+ phase: FigmentPhase
+ progress: float
+ rows: list[str]
+ gradient: list[int]
+ center_row: int
+ center_col: int
+
+
+def _color_codes_to_ansi(gradient: list[int]) -> list[str]:
+ """Convert gradient list to ANSI color codes.
+
+ Args:
+ gradient: List of 256-color palette codes
+
+ Returns:
+ List of ANSI escape code strings
+ """
+ codes = []
+ for color in gradient:
+ if isinstance(color, int):
+ codes.append(f"\033[38;5;{color}m")
+ else:
+ # Fallback to green
+ codes.append("\033[38;5;46m")
+ return codes if codes else ["\033[38;5;46m"]
+
+
+def render_figment_overlay(figment_state: FigmentState, w: int, h: int) -> list[str]:
+ """Render figment overlay as ANSI cursor-positioning commands.
+
+ Args:
+ figment_state: FigmentState with phase, progress, rows, gradient, centering.
+ w: terminal width
+ h: terminal height
+
+ Returns:
+ List of ANSI strings to append to display buffer.
+ """
+ rows = figment_state.rows
+ if not rows:
+ return []
+
+ phase = figment_state.phase
+ progress = figment_state.progress
+ gradient = figment_state.gradient
+ center_row = figment_state.center_row
+ center_col = figment_state.center_col
+
+ cols = _color_codes_to_ansi(gradient)
+
+ # Build a list of non-space cell positions
+ cell_positions = []
+ for r_idx, row in enumerate(rows):
+ for c_idx, ch in enumerate(row):
+ if ch != " ":
+ cell_positions.append((r_idx, c_idx))
+
+ n_cells = len(cell_positions)
+ if n_cells == 0:
+ return []
+
+ # Use a deterministic seed so the reveal/dissolve pattern is stable per-figment
+ rng = random.Random(hash(tuple(rows[0][:10])) if rows[0] else 42)
+ shuffled = list(cell_positions)
+ rng.shuffle(shuffled)
+
+ # Phase-dependent visibility
+ if phase == FigmentPhase.REVEAL:
+ visible_count = int(n_cells * progress)
+ visible = set(shuffled[:visible_count])
+ elif phase == FigmentPhase.HOLD:
+ visible = set(cell_positions)
+ # Strobe: dim some cells periodically
+ if int(progress * 20) % 3 == 0:
+ dim_count = int(n_cells * 0.3)
+ visible -= set(shuffled[:dim_count])
+ elif phase == FigmentPhase.DISSOLVE:
+ remaining_count = int(n_cells * (1.0 - progress))
+ visible = set(shuffled[:remaining_count])
+ else:
+ visible = set(cell_positions)
+
+ # Build overlay commands
+ overlay: list[str] = []
+ n_cols = len(cols)
+ max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1)
+
+ for r_idx, row in enumerate(rows):
+ scr_row = center_row + r_idx + 1 # 1-indexed
+ if scr_row < 1 or scr_row > h:
+ continue
+
+ line_buf: list[str] = []
+ has_content = False
+
+ for c_idx, ch in enumerate(row):
+ scr_col = center_col + c_idx + 1
+ if scr_col < 1 or scr_col > w:
+ continue
+
+ if ch != " " and (r_idx, c_idx) in visible:
+ # Apply gradient color
+ shifted = (c_idx / max(max_x - 1, 1)) % 1.0
+ idx = min(round(shifted * (n_cols - 1)), n_cols - 1)
+ line_buf.append(f"{cols[idx]}{ch}{RST}")
+ has_content = True
+ else:
+ line_buf.append(" ")
+
+ if has_content:
+ line_str = "".join(line_buf).rstrip()
+ if line_str.strip():
+ overlay.append(f"\033[{scr_row};{center_col + 1}H{line_str}{RST}")
+
+ return overlay
+
+
+class FigmentEffect(EffectPlugin):
+ """Figment overlay effect for pipeline architecture.
+
+ Provides periodic SVG overlays with reveal/hold/dissolve animation.
+ """
+
+ name = "figment"
+ config = EffectConfig(
+ enabled=True,
+ intensity=1.0,
+ params={
+ "interval_secs": 60,
+ "display_secs": 4.5,
+ "figment_dir": "figments",
+ },
+ )
+ supports_partial_updates = False
+ is_overlay = True # Figment is an overlay effect that composes on top of the buffer
+
+ def __init__(
+ self,
+ figment_dir: str | None = None,
+ triggers: list[FigmentTrigger] | None = None,
+ ):
+ self.config = EffectConfig(
+ enabled=True,
+ intensity=1.0,
+ params={
+ "interval_secs": 60,
+ "display_secs": 4.5,
+ "figment_dir": figment_dir or "figments",
+ },
+ )
+ self._triggers = triggers or []
+ self._phase: FigmentPhase | None = None
+ self._progress: float = 0.0
+ self._rows: list[str] = []
+ self._gradient: list[int] = []
+ self._center_row: int = 0
+ self._center_col: int = 0
+ self._timer: float = 0.0
+ self._last_svg: str | None = None
+ self._svg_files: list[str] = []
+ self._scan_svgs()
+
+ def _scan_svgs(self) -> None:
+ """Scan figment directory for SVG files."""
+ figment_dir = Path(self.config.params["figment_dir"])
+ if figment_dir.is_dir():
+ self._svg_files = sorted(str(p) for p in figment_dir.glob("*.svg"))
+
+ def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
+ """Add figment overlay to buffer."""
+ if not self.config.enabled:
+ return buf
+
+ # Get figment state using frame number from context
+ figment_state = self.get_figment_state(
+ ctx.frame_number, ctx.terminal_width, ctx.terminal_height
+ )
+
+ if figment_state:
+ # Render overlay and append to buffer
+ overlay = render_figment_overlay(
+ figment_state, ctx.terminal_width, ctx.terminal_height
+ )
+ buf = buf + overlay
+
+ return buf
+
+ def configure(self, config: EffectConfig) -> None:
+ """Configure the effect."""
+ # Preserve figment_dir if the new config doesn't supply one
+ figment_dir = config.params.get(
+ "figment_dir", self.config.params.get("figment_dir", "figments")
+ )
+ self.config = config
+ if "figment_dir" not in self.config.params:
+ self.config.params["figment_dir"] = figment_dir
+ self._scan_svgs()
+
+ def trigger(self, w: int, h: int) -> None:
+ """Manually trigger a figment display."""
+ if not self._svg_files:
+ return
+
+ # Pick a random SVG, avoid repeating
+ candidates = [s for s in self._svg_files if s != self._last_svg]
+ if not candidates:
+ candidates = self._svg_files
+ svg_path = random.choice(candidates)
+ self._last_svg = svg_path
+
+ # Rasterize
+ try:
+ self._rows = rasterize_svg(svg_path, w, h)
+ except Exception:
+ return
+
+ # Pick random theme gradient
+ theme_key = random.choice(list(THEME_REGISTRY.keys()))
+ self._gradient = THEME_REGISTRY[theme_key].main_gradient
+
+ # Center in viewport
+ figment_h = len(self._rows)
+ figment_w = max((len(r) for r in self._rows), default=0)
+ self._center_row = max(0, (h - figment_h) // 2)
+ self._center_col = max(0, (w - figment_w) // 2)
+
+ # Start reveal phase
+ self._phase = FigmentPhase.REVEAL
+ self._progress = 0.0
+
+ def get_figment_state(
+ self, frame_number: int, w: int, h: int
+ ) -> FigmentState | None:
+ """Tick the state machine and return current state, or None if idle."""
+ if not self.config.enabled:
+ return None
+
+ # Poll triggers
+ for trig in self._triggers:
+ cmd = trig.poll()
+ if cmd is not None:
+ self._handle_command(cmd, w, h)
+
+ # Tick timer when idle
+ if self._phase is None:
+ self._timer += config.FRAME_DT
+ interval = self.config.params.get("interval_secs", 60)
+ if self._timer >= interval:
+ self._timer = 0.0
+ self.trigger(w, h)
+
+ # Tick animation — snapshot current phase/progress, then advance
+ if self._phase is not None:
+ # Capture the state at the start of this frame
+ current_phase = self._phase
+ current_progress = self._progress
+
+ # Advance for next frame
+ display_secs = self.config.params.get("display_secs", 4.5)
+ phase_duration = display_secs / 3.0
+ self._progress += config.FRAME_DT / phase_duration
+
+ if self._progress >= 1.0:
+ self._progress = 0.0
+ if self._phase == FigmentPhase.REVEAL:
+ self._phase = FigmentPhase.HOLD
+ elif self._phase == FigmentPhase.HOLD:
+ self._phase = FigmentPhase.DISSOLVE
+ elif self._phase == FigmentPhase.DISSOLVE:
+ self._phase = None
+
+ return FigmentState(
+ phase=current_phase,
+ progress=current_progress,
+ rows=self._rows,
+ gradient=self._gradient,
+ center_row=self._center_row,
+ center_col=self._center_col,
+ )
+
+ return None
+
+ def _handle_command(self, cmd: FigmentCommand, w: int, h: int) -> None:
+ """Handle a figment command."""
+ if cmd.action == FigmentAction.TRIGGER:
+ self.trigger(w, h)
+ elif cmd.action == FigmentAction.SET_INTENSITY and isinstance(
+ cmd.value, (int, float)
+ ):
+ self.config.intensity = float(cmd.value)
+ elif cmd.action == FigmentAction.SET_INTERVAL and isinstance(
+ cmd.value, (int, float)
+ ):
+ self.config.params["interval_secs"] = float(cmd.value)
+ elif cmd.action == FigmentAction.SET_COLOR and isinstance(cmd.value, str):
+ if cmd.value in THEME_REGISTRY:
+ self._gradient = THEME_REGISTRY[cmd.value].main_gradient
+ elif cmd.action == FigmentAction.STOP:
+ self._phase = None
+ self._progress = 0.0
diff --git a/engine/figment_render.py b/engine/figment_render.py
new file mode 100644
index 0000000..0b9e0ea
--- /dev/null
+++ b/engine/figment_render.py
@@ -0,0 +1,90 @@
+"""
+SVG to half-block terminal art rasterization.
+
+Pipeline: SVG -> cairosvg -> PIL -> greyscale threshold -> half-block encode.
+Follows the same pixel-pair approach as engine/render.py for OTF fonts.
+"""
+
+from __future__ import annotations
+
+import os
+import sys
+from io import BytesIO
+
+# cairocffi (used by cairosvg) calls dlopen() to find the Cairo C library.
+# On macOS with Homebrew, Cairo lives in /opt/homebrew/lib (Apple Silicon) or
+# /usr/local/lib (Intel), which are not in dyld's default search path.
+# Setting DYLD_LIBRARY_PATH before the import directs dlopen() to those paths.
+if sys.platform == "darwin" and not os.environ.get("DYLD_LIBRARY_PATH"):
+ for _brew_lib in ("/opt/homebrew/lib", "/usr/local/lib"):
+ if os.path.exists(os.path.join(_brew_lib, "libcairo.2.dylib")):
+ os.environ["DYLD_LIBRARY_PATH"] = _brew_lib
+ break
+
+import cairosvg
+from PIL import Image
+
+_cache: dict[tuple[str, int, int], list[str]] = {}
+
+
+def rasterize_svg(svg_path: str, width: int, height: int) -> list[str]:
+ """Convert SVG file to list of half-block terminal rows (uncolored).
+
+ Args:
+ svg_path: Path to SVG file.
+ width: Target terminal width in columns.
+ height: Target terminal height in rows.
+
+ Returns:
+ List of strings, one per terminal row, containing block characters.
+ """
+ cache_key = (svg_path, width, height)
+ if cache_key in _cache:
+ return _cache[cache_key]
+
+ # SVG -> PNG in memory
+ png_bytes = cairosvg.svg2png(
+ url=svg_path,
+ output_width=width,
+ output_height=height * 2, # 2 pixel rows per terminal row
+ )
+
+ # PNG -> greyscale PIL image
+ # Composite RGBA onto white background so transparent areas become white (255)
+ # and drawn pixels retain their luminance values.
+ img_rgba = Image.open(BytesIO(png_bytes)).convert("RGBA")
+ img_rgba = img_rgba.resize((width, height * 2), Image.Resampling.LANCZOS)
+ background = Image.new("RGBA", img_rgba.size, (255, 255, 255, 255))
+ background.paste(img_rgba, mask=img_rgba.split()[3])
+ img = background.convert("L")
+
+ data = img.tobytes()
+ pix_w = width
+ pix_h = height * 2
+ # White (255) = empty space, dark (< threshold) = filled pixel
+ threshold = 128
+
+ # Half-block encode: walk pixel pairs
+ rows: list[str] = []
+ for y in range(0, pix_h, 2):
+ row: list[str] = []
+ for x in range(pix_w):
+ top = data[y * pix_w + x] < threshold
+ bot = data[(y + 1) * pix_w + x] < threshold if y + 1 < pix_h else False
+ if top and bot:
+ row.append("█")
+ elif top:
+ row.append("▀")
+ elif bot:
+ row.append("▄")
+ else:
+ row.append(" ")
+ rows.append("".join(row))
+
+ _cache[cache_key] = rows
+ return rows
+
+
+def clear_cache() -> None:
+ """Clear the rasterization cache (e.g., on terminal resize)."""
+ _cache.clear()
diff --git a/engine/figment_trigger.py b/engine/figment_trigger.py
new file mode 100644
index 0000000..d3aac9c
--- /dev/null
+++ b/engine/figment_trigger.py
@@ -0,0 +1,36 @@
+"""
+Figment trigger protocol and command types.
+
+Defines the extensible input abstraction for triggering figment displays
+from any control surface (ntfy, MQTT, serial, etc.).
+"""
+
+from __future__ import annotations
+
+from dataclasses import dataclass
+from enum import Enum
+from typing import Protocol
+
+
+class FigmentAction(Enum):
+ TRIGGER = "trigger"
+ SET_INTENSITY = "set_intensity"
+ SET_INTERVAL = "set_interval"
+ SET_COLOR = "set_color"
+ STOP = "stop"
+
+
+@dataclass
+class FigmentCommand:
+ action: FigmentAction
+ value: float | str | None = None
+
+
+class FigmentTrigger(Protocol):
+ """Protocol for figment trigger sources.
+
+ Any input source (ntfy, MQTT, serial) can implement this
+ to trigger and control figment displays.
+ """
+
+ def poll(self) -> FigmentCommand | None: ...
diff --git a/engine/pipeline/adapters/effect_plugin.py b/engine/pipeline/adapters/effect_plugin.py
index 2e42c95..1185021 100644
--- a/engine/pipeline/adapters/effect_plugin.py
+++ b/engine/pipeline/adapters/effect_plugin.py
@@ -27,9 +27,9 @@ class EffectPluginStage(Stage):
def stage_type(self) -> str:
"""Return stage_type based on effect name.
- HUD effects are overlays.
+ Overlay effects have stage_type "overlay".
"""
- if self.name == "hud":
+ if self.is_overlay:
return "overlay"
return self.category
@@ -37,19 +37,26 @@ class EffectPluginStage(Stage):
def render_order(self) -> int:
"""Return render_order based on effect type.
- HUD effects have high render_order to appear on top.
+ Overlay effects have high render_order to appear on top.
"""
- if self.name == "hud":
+ if self.is_overlay:
return 100 # High order for overlays
return 0
@property
def is_overlay(self) -> bool:
- """Return True for HUD effects.
+ """Return True for overlay effects.
- HUD is an overlay - it composes on top of the buffer
+ Overlay effects compose on top of the buffer
rather than transforming it for the next stage.
"""
+ # Check if the effect has an is_overlay attribute that is explicitly True
+ # (not just any truthy value from a mock object)
+ if hasattr(self._effect, "is_overlay"):
+ effect_overlay = self._effect.is_overlay
+ # Only return True if it's explicitly set to True
+ if effect_overlay is True:
+ return True
return self.name == "hud"
@property
diff --git a/engine/pipeline/adapters/frame_capture.py b/engine/pipeline/adapters/frame_capture.py
new file mode 100644
index 0000000..03d909a
--- /dev/null
+++ b/engine/pipeline/adapters/frame_capture.py
@@ -0,0 +1,165 @@
+"""
+Frame Capture Stage Adapter
+
+Wraps pipeline stages to capture frames for animation report generation.
+"""
+
+from typing import Any
+
+from engine.display.backends.animation_report import AnimationReportDisplay
+from engine.pipeline.core import PipelineContext, Stage
+
+
+class FrameCaptureStage(Stage):
+ """
+ Wrapper stage that captures frames before and after a wrapped stage.
+
+ This allows generating animation reports showing how each stage
+ transforms the data.
+ """
+
+ def __init__(
+ self,
+ wrapped_stage: Stage,
+ display: AnimationReportDisplay,
+ name: str | None = None,
+ ):
+ """
+ Initialize frame capture stage.
+
+ Args:
+ wrapped_stage: The stage to wrap and capture frames from
+ display: The animation report display to send frames to
+ name: Optional name for this capture stage
+ """
+ self._wrapped_stage = wrapped_stage
+ self._display = display
+ self.name = name or f"capture_{wrapped_stage.name}"
+ self.category = wrapped_stage.category
+ self.optional = wrapped_stage.optional
+
+ # Capture state
+ self._captured_input = False
+ self._captured_output = False
+
+ @property
+ def stage_type(self) -> str:
+ return self._wrapped_stage.stage_type
+
+ @property
+ def capabilities(self) -> set[str]:
+ return self._wrapped_stage.capabilities
+
+ @property
+ def dependencies(self) -> set[str]:
+ return self._wrapped_stage.dependencies
+
+ @property
+ def inlet_types(self) -> set:
+ return self._wrapped_stage.inlet_types
+
+ @property
+ def outlet_types(self) -> set:
+ return self._wrapped_stage.outlet_types
+
+ def init(self, ctx: PipelineContext) -> bool:
+ """Initialize the wrapped stage."""
+ return self._wrapped_stage.init(ctx)
+
+ def process(self, data: Any, ctx: PipelineContext) -> Any:
+ """
+ Process data through wrapped stage and capture frames.
+
+ Args:
+ data: Input data (typically a text buffer)
+ ctx: Pipeline context
+
+ Returns:
+ Output data from wrapped stage
+ """
+ # Capture input frame (before stage processing)
+ if isinstance(data, list) and all(isinstance(line, str) for line in data):
+ self._display.start_stage(f"{self._wrapped_stage.name}_input")
+ self._display.show(data)
+ self._captured_input = True
+
+ # Process through wrapped stage
+ result = self._wrapped_stage.process(data, ctx)
+
+ # Capture output frame (after stage processing)
+ if isinstance(result, list) and all(isinstance(line, str) for line in result):
+ self._display.start_stage(f"{self._wrapped_stage.name}_output")
+ self._display.show(result)
+ self._captured_output = True
+
+ return result
+
+ def cleanup(self) -> None:
+ """Cleanup the wrapped stage."""
+ self._wrapped_stage.cleanup()
+
+
+class FrameCaptureController:
+ """
+ Controller for managing frame capture across the pipeline.
+
+ This class provides an easy way to enable frame capture for
+ specific stages or the entire pipeline.
+ """
+
+ def __init__(self, display: AnimationReportDisplay):
+ """
+ Initialize frame capture controller.
+
+ Args:
+ display: The animation report display to use for capture
+ """
+ self._display = display
+ self._captured_stages: list[FrameCaptureStage] = []
+
+ def wrap_stage(self, stage: Stage, name: str | None = None) -> FrameCaptureStage:
+ """
+ Wrap a stage with frame capture.
+
+ Args:
+ stage: The stage to wrap
+ name: Optional name for the capture stage
+
+ Returns:
+ Wrapped stage that captures frames
+ """
+ capture_stage = FrameCaptureStage(stage, self._display, name)
+ self._captured_stages.append(capture_stage)
+ return capture_stage
+
+ def wrap_stages(self, stages: dict[str, Stage]) -> dict[str, Stage]:
+ """
+ Wrap multiple stages with frame capture.
+
+ Args:
+ stages: Dictionary of stage names to stages
+
+ Returns:
+ Dictionary of stage names to wrapped stages
+ """
+ wrapped = {}
+ for name, stage in stages.items():
+ wrapped[name] = self.wrap_stage(stage, name)
+ return wrapped
+
+ def get_captured_stages(self) -> list[FrameCaptureStage]:
+ """Get list of all captured stages."""
+ return self._captured_stages
+
+ def generate_report(self, title: str = "Pipeline Animation Report") -> str:
+ """
+ Generate the animation report.
+
+ Args:
+ title: Title for the report
+
+ Returns:
+ Path to the generated HTML file
+ """
+ report_path = self._display.generate_report(title)
+ return str(report_path)
diff --git a/engine/themes.py b/engine/themes.py
new file mode 100644
index 0000000..a6d3432
--- /dev/null
+++ b/engine/themes.py
@@ -0,0 +1,60 @@
+"""
+Theme definitions with color gradients for terminal rendering.
+
+This module is data-only and does not import config or render
+to prevent circular dependencies.
+"""
+
+
+class Theme:
+ """Represents a color theme with two gradients."""
+
+ def __init__(self, name, main_gradient, message_gradient):
+ """Initialize a theme with name and color gradients.
+
+ Args:
+ name: Theme identifier string
+ main_gradient: List of 12 ANSI 256-color codes for main gradient
+ message_gradient: List of 12 ANSI 256-color codes for message gradient
+ """
+ self.name = name
+ self.main_gradient = main_gradient
+ self.message_gradient = message_gradient
+
+
+# ─── GRADIENT DEFINITIONS ─────────────────────────────────────────────────
+# Each gradient is 12 ANSI 256-color codes in sequence
+# Format: [light...] → [medium...] → [dark...] → [black]
+
+_GREEN_MAIN = [231, 195, 123, 118, 82, 46, 40, 34, 28, 22, 22, 235]
+_GREEN_MSG = [231, 225, 219, 213, 207, 201, 165, 161, 125, 89, 89, 235]
+
+_ORANGE_MAIN = [231, 215, 209, 208, 202, 166, 130, 94, 58, 94, 94, 235]
+_ORANGE_MSG = [231, 195, 33, 27, 21, 21, 21, 18, 18, 18, 18, 235]
+
+_PURPLE_MAIN = [231, 225, 177, 171, 165, 135, 129, 93, 57, 57, 57, 235]
+_PURPLE_MSG = [231, 226, 226, 220, 220, 184, 184, 178, 178, 172, 172, 235]
+
+
+# ─── THEME REGISTRY ───────────────────────────────────────────────────────
+
+THEME_REGISTRY = {
+ "green": Theme("green", _GREEN_MAIN, _GREEN_MSG),
+ "orange": Theme("orange", _ORANGE_MAIN, _ORANGE_MSG),
+ "purple": Theme("purple", _PURPLE_MAIN, _PURPLE_MSG),
+}
+
+
+def get_theme(theme_id):
+ """Retrieve a theme by ID.
+
+ Args:
+ theme_id: Theme identifier string
+
+ Returns:
+ Theme object matching the ID
+
+ Raises:
+ KeyError: If theme_id is not in registry
+ """
+ return THEME_REGISTRY[theme_id]
diff --git a/figments/animal-head-symbol-of-mexico-antique-cultures-svgrepo-com.svg b/figments/animal-head-symbol-of-mexico-antique-cultures-svgrepo-com.svg
new file mode 100644
index 0000000..264eb08
--- /dev/null
+++ b/figments/animal-head-symbol-of-mexico-antique-cultures-svgrepo-com.svg
@@ -0,0 +1,32 @@
+
+
+
+
\ No newline at end of file
diff --git a/figments/mayan-mask-of-mexico-svgrepo-com.svg b/figments/mayan-mask-of-mexico-svgrepo-com.svg
new file mode 100644
index 0000000..75fca60
--- /dev/null
+++ b/figments/mayan-mask-of-mexico-svgrepo-com.svg
@@ -0,0 +1,60 @@
+
+
+
+
\ No newline at end of file
diff --git a/figments/mayan-symbol-of-mexico-svgrepo-com.svg b/figments/mayan-symbol-of-mexico-svgrepo-com.svg
new file mode 100644
index 0000000..a396536
--- /dev/null
+++ b/figments/mayan-symbol-of-mexico-svgrepo-com.svg
@@ -0,0 +1,110 @@
+
+
+
+
\ No newline at end of file
diff --git a/presets.toml b/presets.toml
index 635bffd..7fbb5e7 100644
--- a/presets.toml
+++ b/presets.toml
@@ -40,6 +40,15 @@ camera_speed = 0.5
viewport_width = 80
viewport_height = 24
+[presets.test-figment]
+description = "Test: Figment overlay effect"
+source = "empty"
+display = "terminal"
+camera = "feed"
+effects = ["figment"]
+viewport_width = 80
+viewport_height = 24
+
# ============================================
# DEMO PRESETS (for demonstration and exploration)
# ============================================
diff --git a/pyproject.toml b/pyproject.toml
index c238079..a128407 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -40,6 +40,9 @@ pygame = [
browser = [
"playwright>=1.40.0",
]
+figment = [
+ "cairosvg>=2.7.0",
+]
dev = [
"pytest>=8.0.0",
"pytest-benchmark>=4.0.0",
diff --git a/tests/test_adapters.py b/tests/test_adapters.py
index 3bd7024..8745e4a 100644
--- a/tests/test_adapters.py
+++ b/tests/test_adapters.py
@@ -1,17 +1,16 @@
"""
Tests for engine/pipeline/adapters.py - Stage adapters for the pipeline.
-Tests Stage adapters that bridge existing components to the Stage interface:
-- DataSourceStage: Wraps DataSource objects
-- DisplayStage: Wraps Display backends
-- PassthroughStage: Simple pass-through stage for pre-rendered data
-- SourceItemsToBufferStage: Converts SourceItem objects to text buffers
-- EffectPluginStage: Wraps effect plugins
+Tests Stage adapters that bridge existing components to the Stage interface.
+Focuses on behavior testing rather than mock interactions.
"""
from unittest.mock import MagicMock
from engine.data_sources.sources import SourceItem
+from engine.display.backends.null import NullDisplay
+from engine.effects.plugins import discover_plugins
+from engine.effects.registry import get_registry
from engine.pipeline.adapters import (
DataSourceStage,
DisplayStage,
@@ -25,28 +24,14 @@ from engine.pipeline.core import PipelineContext
class TestDataSourceStage:
"""Test DataSourceStage adapter."""
- def test_datasource_stage_name(self):
- """DataSourceStage stores name correctly."""
+ def test_datasource_stage_properties(self):
+ """DataSourceStage has correct name, category, and capabilities."""
mock_source = MagicMock()
stage = DataSourceStage(mock_source, name="headlines")
+
assert stage.name == "headlines"
-
- def test_datasource_stage_category(self):
- """DataSourceStage has 'source' category."""
- mock_source = MagicMock()
- stage = DataSourceStage(mock_source, name="headlines")
assert stage.category == "source"
-
- def test_datasource_stage_capabilities(self):
- """DataSourceStage advertises source capability."""
- mock_source = MagicMock()
- stage = DataSourceStage(mock_source, name="headlines")
assert "source.headlines" in stage.capabilities
-
- def test_datasource_stage_dependencies(self):
- """DataSourceStage has no dependencies."""
- mock_source = MagicMock()
- stage = DataSourceStage(mock_source, name="headlines")
assert stage.dependencies == set()
def test_datasource_stage_process_calls_get_items(self):
@@ -64,7 +49,7 @@ class TestDataSourceStage:
assert result == mock_items
mock_source.get_items.assert_called_once()
- def test_datasource_stage_process_fallback_returns_data(self):
+ def test_datasource_stage_process_fallback(self):
"""DataSourceStage.process() returns data if no get_items method."""
mock_source = MagicMock(spec=[]) # No get_items method
stage = DataSourceStage(mock_source, name="headlines")
@@ -76,124 +61,68 @@ class TestDataSourceStage:
class TestDisplayStage:
- """Test DisplayStage adapter."""
+ """Test DisplayStage adapter using NullDisplay for real behavior."""
+
+ def test_display_stage_properties(self):
+ """DisplayStage has correct name, category, and capabilities."""
+ display = NullDisplay()
+ stage = DisplayStage(display, name="terminal")
- def test_display_stage_name(self):
- """DisplayStage stores name correctly."""
- mock_display = MagicMock()
- stage = DisplayStage(mock_display, name="terminal")
assert stage.name == "terminal"
-
- def test_display_stage_category(self):
- """DisplayStage has 'display' category."""
- mock_display = MagicMock()
- stage = DisplayStage(mock_display, name="terminal")
assert stage.category == "display"
-
- def test_display_stage_capabilities(self):
- """DisplayStage advertises display capability."""
- mock_display = MagicMock()
- stage = DisplayStage(mock_display, name="terminal")
assert "display.output" in stage.capabilities
-
- def test_display_stage_dependencies(self):
- """DisplayStage depends on render.output."""
- mock_display = MagicMock()
- stage = DisplayStage(mock_display, name="terminal")
assert "render.output" in stage.dependencies
- def test_display_stage_init(self):
- """DisplayStage.init() calls display.init() with dimensions."""
- mock_display = MagicMock()
- mock_display.init.return_value = True
- stage = DisplayStage(mock_display, name="terminal")
+ def test_display_stage_init_and_process(self):
+ """DisplayStage initializes display and processes buffer."""
+ from engine.pipeline.params import PipelineParams
+
+ display = NullDisplay()
+ stage = DisplayStage(display, name="terminal")
ctx = PipelineContext()
- ctx.params = MagicMock()
- ctx.params.viewport_width = 100
- ctx.params.viewport_height = 30
+ ctx.params = PipelineParams()
+ ctx.params.viewport_width = 80
+ ctx.params.viewport_height = 24
+ # Initialize
result = stage.init(ctx)
-
assert result is True
- mock_display.init.assert_called_once_with(100, 30, reuse=False)
- def test_display_stage_init_uses_defaults(self):
- """DisplayStage.init() uses defaults when params missing."""
- mock_display = MagicMock()
- mock_display.init.return_value = True
- stage = DisplayStage(mock_display, name="terminal")
+ # Process buffer
+ buffer = ["Line 1", "Line 2", "Line 3"]
+ output = stage.process(buffer, ctx)
+ assert output == buffer
- ctx = PipelineContext()
- ctx.params = None
+ # Verify display captured the buffer
+ assert display._last_buffer == buffer
- result = stage.init(ctx)
-
- assert result is True
- mock_display.init.assert_called_once_with(80, 24, reuse=False)
-
- def test_display_stage_process_calls_show(self):
- """DisplayStage.process() calls display.show() with data."""
- mock_display = MagicMock()
- stage = DisplayStage(mock_display, name="terminal")
-
- test_buffer = [[["A", "red"] for _ in range(80)] for _ in range(24)]
- ctx = PipelineContext()
- result = stage.process(test_buffer, ctx)
-
- assert result == test_buffer
- mock_display.show.assert_called_once_with(test_buffer)
-
- def test_display_stage_process_skips_none_data(self):
+ def test_display_stage_skips_none_data(self):
"""DisplayStage.process() skips show() if data is None."""
- mock_display = MagicMock()
- stage = DisplayStage(mock_display, name="terminal")
+ display = NullDisplay()
+ stage = DisplayStage(display, name="terminal")
ctx = PipelineContext()
result = stage.process(None, ctx)
assert result is None
- mock_display.show.assert_not_called()
-
- def test_display_stage_cleanup(self):
- """DisplayStage.cleanup() calls display.cleanup()."""
- mock_display = MagicMock()
- stage = DisplayStage(mock_display, name="terminal")
-
- stage.cleanup()
-
- mock_display.cleanup.assert_called_once()
+ assert display._last_buffer is None
class TestPassthroughStage:
"""Test PassthroughStage adapter."""
- def test_passthrough_stage_name(self):
- """PassthroughStage stores name correctly."""
+ def test_passthrough_stage_properties(self):
+ """PassthroughStage has correct properties."""
stage = PassthroughStage(name="test")
+
assert stage.name == "test"
-
- def test_passthrough_stage_category(self):
- """PassthroughStage has 'render' category."""
- stage = PassthroughStage()
assert stage.category == "render"
-
- def test_passthrough_stage_is_optional(self):
- """PassthroughStage is optional."""
- stage = PassthroughStage()
assert stage.optional is True
-
- def test_passthrough_stage_capabilities(self):
- """PassthroughStage advertises render output capability."""
- stage = PassthroughStage()
assert "render.output" in stage.capabilities
-
- def test_passthrough_stage_dependencies(self):
- """PassthroughStage depends on source."""
- stage = PassthroughStage()
assert "source" in stage.dependencies
- def test_passthrough_stage_process_returns_data_unchanged(self):
+ def test_passthrough_stage_process_unchanged(self):
"""PassthroughStage.process() returns data unchanged."""
stage = PassthroughStage()
ctx = PipelineContext()
@@ -210,32 +139,17 @@ class TestPassthroughStage:
class TestSourceItemsToBufferStage:
"""Test SourceItemsToBufferStage adapter."""
- def test_source_items_to_buffer_stage_name(self):
- """SourceItemsToBufferStage stores name correctly."""
+ def test_source_items_to_buffer_stage_properties(self):
+ """SourceItemsToBufferStage has correct properties."""
stage = SourceItemsToBufferStage(name="custom-name")
+
assert stage.name == "custom-name"
-
- def test_source_items_to_buffer_stage_category(self):
- """SourceItemsToBufferStage has 'render' category."""
- stage = SourceItemsToBufferStage()
assert stage.category == "render"
-
- def test_source_items_to_buffer_stage_is_optional(self):
- """SourceItemsToBufferStage is optional."""
- stage = SourceItemsToBufferStage()
assert stage.optional is True
-
- def test_source_items_to_buffer_stage_capabilities(self):
- """SourceItemsToBufferStage advertises render output capability."""
- stage = SourceItemsToBufferStage()
assert "render.output" in stage.capabilities
-
- def test_source_items_to_buffer_stage_dependencies(self):
- """SourceItemsToBufferStage depends on source."""
- stage = SourceItemsToBufferStage()
assert "source" in stage.dependencies
- def test_source_items_to_buffer_stage_process_single_line_item(self):
+ def test_source_items_to_buffer_stage_process_single_line(self):
"""SourceItemsToBufferStage converts single-line SourceItem."""
stage = SourceItemsToBufferStage()
ctx = PipelineContext()
@@ -247,10 +161,10 @@ class TestSourceItemsToBufferStage:
assert isinstance(result, list)
assert len(result) >= 1
- # Result should be lines of text
assert all(isinstance(line, str) for line in result)
+ assert "Single line content" in result[0]
- def test_source_items_to_buffer_stage_process_multiline_item(self):
+ def test_source_items_to_buffer_stage_process_multiline(self):
"""SourceItemsToBufferStage splits multiline SourceItem content."""
stage = SourceItemsToBufferStage()
ctx = PipelineContext()
@@ -283,63 +197,76 @@ class TestSourceItemsToBufferStage:
class TestEffectPluginStage:
- """Test EffectPluginStage adapter."""
+ """Test EffectPluginStage adapter with real effect plugins."""
- def test_effect_plugin_stage_name(self):
- """EffectPluginStage stores name correctly."""
- mock_effect = MagicMock()
- stage = EffectPluginStage(mock_effect, name="blur")
- assert stage.name == "blur"
+ def test_effect_plugin_stage_properties(self):
+ """EffectPluginStage has correct properties for real effects."""
+ discover_plugins()
+ registry = get_registry()
+ effect = registry.get("noise")
- def test_effect_plugin_stage_category(self):
- """EffectPluginStage has 'effect' category."""
- mock_effect = MagicMock()
- stage = EffectPluginStage(mock_effect, name="blur")
+ stage = EffectPluginStage(effect, name="noise")
+
+ assert stage.name == "noise"
assert stage.category == "effect"
-
- def test_effect_plugin_stage_is_not_optional(self):
- """EffectPluginStage is required when configured."""
- mock_effect = MagicMock()
- stage = EffectPluginStage(mock_effect, name="blur")
assert stage.optional is False
-
- def test_effect_plugin_stage_capabilities(self):
- """EffectPluginStage advertises effect capability with name."""
- mock_effect = MagicMock()
- stage = EffectPluginStage(mock_effect, name="blur")
- assert "effect.blur" in stage.capabilities
-
- def test_effect_plugin_stage_dependencies(self):
- """EffectPluginStage has no static dependencies."""
- mock_effect = MagicMock()
- stage = EffectPluginStage(mock_effect, name="blur")
- # EffectPluginStage has empty dependencies - they are resolved dynamically
- assert stage.dependencies == set()
-
- def test_effect_plugin_stage_stage_type(self):
- """EffectPluginStage.stage_type returns effect for non-HUD."""
- mock_effect = MagicMock()
- stage = EffectPluginStage(mock_effect, name="blur")
- assert stage.stage_type == "effect"
+ assert "effect.noise" in stage.capabilities
def test_effect_plugin_stage_hud_special_handling(self):
"""EffectPluginStage has special handling for HUD effect."""
- mock_effect = MagicMock()
- stage = EffectPluginStage(mock_effect, name="hud")
+ discover_plugins()
+ registry = get_registry()
+ hud_effect = registry.get("hud")
+
+ stage = EffectPluginStage(hud_effect, name="hud")
+
assert stage.stage_type == "overlay"
assert stage.is_overlay is True
assert stage.render_order == 100
- def test_effect_plugin_stage_process(self):
- """EffectPluginStage.process() calls effect.process()."""
- mock_effect = MagicMock()
- mock_effect.process.return_value = "processed_data"
+ def test_effect_plugin_stage_process_real_effect(self):
+ """EffectPluginStage.process() calls real effect.process()."""
+ from engine.pipeline.params import PipelineParams
- stage = EffectPluginStage(mock_effect, name="blur")
+ discover_plugins()
+ registry = get_registry()
+ effect = registry.get("noise")
+
+ stage = EffectPluginStage(effect, name="noise")
ctx = PipelineContext()
- test_buffer = "test_buffer"
+ ctx.params = PipelineParams()
+ ctx.params.viewport_width = 80
+ ctx.params.viewport_height = 24
+ ctx.params.frame_number = 0
+ test_buffer = ["Line 1", "Line 2", "Line 3"]
result = stage.process(test_buffer, ctx)
- assert result == "processed_data"
- mock_effect.process.assert_called_once()
+ # Should return a list (possibly modified buffer)
+ assert isinstance(result, list)
+ # Noise effect should preserve line count
+ assert len(result) == len(test_buffer)
+
+ def test_effect_plugin_stage_process_with_real_figment(self):
+ """EffectPluginStage processes figment effect correctly."""
+ from engine.pipeline.params import PipelineParams
+
+ discover_plugins()
+ registry = get_registry()
+ figment = registry.get("figment")
+
+ stage = EffectPluginStage(figment, name="figment")
+ ctx = PipelineContext()
+ ctx.params = PipelineParams()
+ ctx.params.viewport_width = 80
+ ctx.params.viewport_height = 24
+ ctx.params.frame_number = 0
+
+ test_buffer = ["Line 1", "Line 2", "Line 3"]
+ result = stage.process(test_buffer, ctx)
+
+ # Figment is an overlay effect
+ assert stage.is_overlay is True
+ assert stage.stage_type == "overlay"
+ # Result should be a list
+ assert isinstance(result, list)
diff --git a/tests/test_figment_effect.py b/tests/test_figment_effect.py
new file mode 100644
index 0000000..0542a96
--- /dev/null
+++ b/tests/test_figment_effect.py
@@ -0,0 +1,103 @@
+"""
+Tests for the FigmentOverlayEffect plugin.
+"""
+
+import pytest
+
+from engine.effects.plugins import discover_plugins
+from engine.effects.registry import get_registry
+from engine.effects.types import EffectConfig, create_effect_context
+from engine.pipeline.adapters import EffectPluginStage
+
+
+class TestFigmentEffect:
+ """Tests for FigmentOverlayEffect."""
+
+ def setup_method(self):
+ """Discover plugins before each test."""
+ discover_plugins()
+
+ def test_figment_plugin_discovered(self):
+ """Figment plugin is discovered and registered."""
+ registry = get_registry()
+ figment = registry.get("figment")
+ assert figment is not None
+ assert figment.name == "figment"
+
+ def test_figment_plugin_enabled_by_default(self):
+ """Figment plugin is enabled by default."""
+ registry = get_registry()
+ figment = registry.get("figment")
+ assert figment.config.enabled is True
+
+ def test_figment_renders_overlay(self):
+ """Figment renders SVG overlay after interval."""
+ registry = get_registry()
+ figment = registry.get("figment")
+
+ # Configure with short interval for testing
+ config = EffectConfig(
+ enabled=True,
+ intensity=1.0,
+ params={
+ "interval_secs": 0.1, # 100ms
+ "display_secs": 1.0,
+ "figment_dir": "figments",
+ },
+ )
+ figment.configure(config)
+
+ # Create test buffer
+ buf = [" " * 80 for _ in range(24)]
+
+ # Create context
+ ctx = create_effect_context(
+ terminal_width=80,
+ terminal_height=24,
+ frame_number=0,
+ )
+
+ # Process frames until figment renders
+ for i in range(20):
+ result = figment.process(buf, ctx)
+ if len(result) > len(buf):
+ # Figment rendered overlay
+ assert len(result) > len(buf)
+ # Check that overlay lines contain ANSI escape codes
+ overlay_lines = result[len(buf) :]
+ assert len(overlay_lines) > 0
+ # First overlay line should contain cursor positioning
+ assert "\x1b[" in overlay_lines[0]
+ assert "H" in overlay_lines[0]
+ return
+ ctx.frame_number += 1
+
+ pytest.fail("Figment did not render in 20 frames")
+
+ def test_figment_stage_capabilities(self):
+ """EffectPluginStage wraps figment correctly."""
+ registry = get_registry()
+ figment = registry.get("figment")
+
+ stage = EffectPluginStage(figment, name="figment")
+ assert "effect.figment" in stage.capabilities
+
+ def test_figment_configure_preserves_params(self):
+ """Figment configuration preserves figment_dir."""
+ registry = get_registry()
+ figment = registry.get("figment")
+
+ # Configure without figment_dir
+ config = EffectConfig(
+ enabled=True,
+ intensity=1.0,
+ params={
+ "interval_secs": 30.0,
+ "display_secs": 3.0,
+ },
+ )
+ figment.configure(config)
+
+ # figment_dir should be preserved
+ assert "figment_dir" in figment.config.params
+ assert figment.config.params["figment_dir"] == "figments"
diff --git a/tests/test_figment_pipeline.py b/tests/test_figment_pipeline.py
new file mode 100644
index 0000000..90bca46
--- /dev/null
+++ b/tests/test_figment_pipeline.py
@@ -0,0 +1,79 @@
+"""
+Integration tests for figment effect in the pipeline.
+"""
+
+from engine.effects.plugins import discover_plugins
+from engine.effects.registry import get_registry
+from engine.pipeline import Pipeline, PipelineConfig, get_preset
+from engine.pipeline.adapters import (
+ EffectPluginStage,
+ SourceItemsToBufferStage,
+ create_stage_from_display,
+)
+from engine.pipeline.controller import PipelineRunner
+
+
+class TestFigmentPipeline:
+ """Tests for figment effect in pipeline integration."""
+
+ def setup_method(self):
+ """Discover plugins before each test."""
+ discover_plugins()
+
+ def test_figment_in_pipeline(self):
+ """Figment effect can be added to pipeline."""
+ registry = get_registry()
+ figment = registry.get("figment")
+
+ pipeline = Pipeline(
+ config=PipelineConfig(
+ source="empty",
+ display="null",
+ camera="feed",
+ effects=["figment"],
+ )
+ )
+
+ # Add source stage
+ from engine.data_sources.sources import EmptyDataSource
+ from engine.pipeline.adapters import DataSourceStage
+
+ empty_source = EmptyDataSource(width=80, height=24)
+ pipeline.add_stage("source", DataSourceStage(empty_source, name="empty"))
+
+ # Add render stage
+ pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
+
+ # Add figment effect stage
+ pipeline.add_stage("effect_figment", EffectPluginStage(figment, name="figment"))
+
+ # Add display stage
+ from engine.display import DisplayRegistry
+
+ display = DisplayRegistry.create("null")
+ display.init(0, 0)
+ pipeline.add_stage("display", create_stage_from_display(display, "null"))
+
+ # Build and initialize pipeline
+ pipeline.build()
+ assert pipeline.initialize()
+
+ # Use PipelineRunner to step through frames
+ runner = PipelineRunner(pipeline)
+ runner.start()
+
+ # Run pipeline for a few frames
+ for i in range(10):
+ runner.step()
+ # Result might be None for null display, but that's okay
+
+ # Verify pipeline ran without errors
+ assert pipeline.context.params.frame_number == 10
+
+ def test_figment_preset(self):
+ """Figment preset is properly configured."""
+ preset = get_preset("test-figment")
+ assert preset is not None
+ assert preset.source == "empty"
+ assert preset.display == "terminal"
+ assert "figment" in preset.effects
diff --git a/tests/test_figment_render.py b/tests/test_figment_render.py
new file mode 100644
index 0000000..910fdb6
--- /dev/null
+++ b/tests/test_figment_render.py
@@ -0,0 +1,104 @@
+"""
+Tests to verify figment rendering in the pipeline.
+"""
+
+from engine.effects.plugins import discover_plugins
+from engine.effects.registry import get_registry
+from engine.effects.types import EffectConfig
+from engine.pipeline import Pipeline, PipelineConfig
+from engine.pipeline.adapters import (
+ EffectPluginStage,
+ SourceItemsToBufferStage,
+ create_stage_from_display,
+)
+from engine.pipeline.controller import PipelineRunner
+
+
+def test_figment_renders_in_pipeline():
+ """Verify figment renders overlay in pipeline."""
+ # Discover plugins
+ discover_plugins()
+
+ # Get figment plugin
+ registry = get_registry()
+ figment = registry.get("figment")
+
+ # Configure with short interval for testing
+ config = EffectConfig(
+ enabled=True,
+ intensity=1.0,
+ params={
+ "interval_secs": 0.1, # 100ms
+ "display_secs": 1.0,
+ "figment_dir": "figments",
+ },
+ )
+ figment.configure(config)
+
+ # Create pipeline
+ pipeline = Pipeline(
+ config=PipelineConfig(
+ source="empty",
+ display="null",
+ camera="feed",
+ effects=["figment"],
+ )
+ )
+
+ # Add source stage
+ from engine.data_sources.sources import EmptyDataSource
+ from engine.pipeline.adapters import DataSourceStage
+
+ empty_source = EmptyDataSource(width=80, height=24)
+ pipeline.add_stage("source", DataSourceStage(empty_source, name="empty"))
+
+ # Add render stage
+ pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
+
+ # Add figment effect stage
+ pipeline.add_stage("effect_figment", EffectPluginStage(figment, name="figment"))
+
+ # Add display stage
+ from engine.display import DisplayRegistry
+
+ display = DisplayRegistry.create("null")
+ display.init(0, 0)
+ pipeline.add_stage("display", create_stage_from_display(display, "null"))
+
+ # Build and initialize pipeline
+ pipeline.build()
+ assert pipeline.initialize()
+
+ # Use PipelineRunner to step through frames
+ runner = PipelineRunner(pipeline)
+ runner.start()
+
+ # Run pipeline until figment renders (or timeout)
+ figment_rendered = False
+ for i in range(30):
+ runner.step()
+
+ # Check if figment rendered by inspecting the display's internal buffer
+ # The null display stores the last rendered buffer
+ if hasattr(display, "_last_buffer") and display._last_buffer:
+ buffer = display._last_buffer
+ # Check if buffer contains ANSI escape codes (indicating figment overlay)
+ # Figment adds overlay lines at the end of the buffer
+ for line in buffer:
+ if "\x1b[" in line:
+ figment_rendered = True
+ print(f"Figment rendered at frame {i}")
+ # Print first few lines containing escape codes
+ for j, line in enumerate(buffer[:10]):
+ if "\x1b[" in line:
+ print(f"Line {j}: {repr(line[:80])}")
+ break
+ if figment_rendered:
+ break
+
+ assert figment_rendered, "Figment did not render in 30 frames"
+
+
+if __name__ == "__main__":
+ test_figment_renders_in_pipeline()
+ print("Test passed!")