feat(figment): complete pipeline integration with native effect plugin

- Add engine/effects/plugins/figment.py (native pipeline implementation)
- Add engine/figment_render.py, engine/figment_trigger.py, engine/themes.py
- Add 3 SVG assets in figments/ (Mexican/Aztec motif)
- Add engine/display/backends/animation_report.py for debugging
- Add engine/pipeline/adapters/frame_capture.py for frame capture
- Add test-figment preset to presets.toml
- Add cairosvg optional dependency to pyproject.toml
- Update EffectPluginStage to support is_overlay attribute (for overlay effects)
- Add comprehensive tests: test_figment_effect.py, test_figment_pipeline.py, test_figment_render.py
- Remove obsolete test_ui_simple.py
- Update TODO.md with test cleanup plan
- Refactor test_adapters.py to use real components instead of mocks

This completes the figment SVG overlay feature integration using the modern pipeline architecture, avoiding legacy effects_plugins. All tests pass (758 total).
This commit is contained in:
2026-03-21 13:09:37 -07:00
parent ef0c43266a
commit 7185005f9b
17 changed files with 1990 additions and 181 deletions

View File

@@ -0,0 +1,656 @@
"""
Animation Report Display Backend
Captures frames from pipeline stages and generates an interactive HTML report
showing before/after states for each transformative stage.
"""
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
from engine.display.streaming import compute_diff
@dataclass
class CapturedFrame:
"""A captured frame with metadata."""
stage: str
buffer: list[str]
timestamp: float
frame_number: int
diff_from_previous: dict[str, Any] | None = None
@dataclass
class StageCapture:
"""Captures frames for a single pipeline stage."""
name: str
frames: list[CapturedFrame] = field(default_factory=list)
start_time: float = field(default_factory=time.time)
end_time: float = 0.0
def add_frame(
self,
buffer: list[str],
frame_number: int,
previous_buffer: list[str] | None = None,
) -> None:
"""Add a captured frame."""
timestamp = time.time()
diff = None
if previous_buffer is not None:
diff_data = compute_diff(previous_buffer, buffer)
diff = {
"changed_lines": len(diff_data.changed_lines),
"total_lines": len(buffer),
"width": diff_data.width,
"height": diff_data.height,
}
frame = CapturedFrame(
stage=self.name,
buffer=list(buffer),
timestamp=timestamp,
frame_number=frame_number,
diff_from_previous=diff,
)
self.frames.append(frame)
def finish(self) -> None:
"""Mark capture as finished."""
self.end_time = time.time()
class AnimationReportDisplay:
"""
Display backend that captures frames for animation report generation.
Instead of rendering to terminal, this display captures the buffer at each
stage and stores it for later HTML report generation.
"""
width: int = 80
height: int = 24
def __init__(self, output_dir: str = "./reports"):
"""
Initialize the animation report display.
Args:
output_dir: Directory where reports will be saved
"""
self.output_dir = Path(output_dir)
self.output_dir.mkdir(parents=True, exist_ok=True)
self._stages: dict[str, StageCapture] = {}
self._current_stage: str = ""
self._previous_buffer: list[str] | None = None
self._frame_number: int = 0
self._total_frames: int = 0
self._start_time: float = 0.0
def init(self, width: int, height: int, reuse: bool = False) -> None:
"""Initialize display with dimensions."""
self.width = width
self.height = height
self._start_time = time.time()
def show(self, buffer: list[str], border: bool = False) -> None:
"""
Capture a frame for the current stage.
Args:
buffer: The frame buffer to capture
border: Border flag (ignored)
"""
if not self._current_stage:
# If no stage is set, use a default name
self._current_stage = "final"
if self._current_stage not in self._stages:
self._stages[self._current_stage] = StageCapture(self._current_stage)
stage = self._stages[self._current_stage]
stage.add_frame(buffer, self._frame_number, self._previous_buffer)
self._previous_buffer = list(buffer)
self._frame_number += 1
self._total_frames += 1
def start_stage(self, stage_name: str) -> None:
"""
Start capturing frames for a new stage.
Args:
stage_name: Name of the stage (e.g., "noise", "fade", "firehose")
"""
if self._current_stage and self._current_stage in self._stages:
# Finish previous stage
self._stages[self._current_stage].finish()
self._current_stage = stage_name
self._previous_buffer = None # Reset for new stage
def clear(self) -> None:
"""Clear the display (no-op for report display)."""
pass
def cleanup(self) -> None:
"""Cleanup resources."""
# Finish current stage
if self._current_stage and self._current_stage in self._stages:
self._stages[self._current_stage].finish()
def get_dimensions(self) -> tuple[int, int]:
"""Get current dimensions."""
return (self.width, self.height)
def get_stages(self) -> dict[str, StageCapture]:
"""Get all captured stages."""
return self._stages
def generate_report(self, title: str = "Animation Report") -> Path:
"""
Generate an HTML report with captured frames and animations.
Args:
title: Title of the report
Returns:
Path to the generated HTML file
"""
report_path = self.output_dir / f"animation_report_{int(time.time())}.html"
html_content = self._build_html(title)
report_path.write_text(html_content)
return report_path
def _build_html(self, title: str) -> str:
"""Build the HTML content for the report."""
# Collect all frames across stages
all_frames = []
for stage_name, stage in self._stages.items():
for frame in stage.frames:
all_frames.append(frame)
# Sort frames by timestamp
all_frames.sort(key=lambda f: f.timestamp)
# Build stage sections
stages_html = ""
for stage_name, stage in self._stages.items():
stages_html += self._build_stage_section(stage_name, stage)
# Build full HTML
html = f"""
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{title}</title>
<style>
* {{
box-sizing: border-box;
margin: 0;
padding: 0;
}}
body {{
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, sans-serif;
background: #1a1a2e;
color: #eee;
padding: 20px;
line-height: 1.6;
}}
.container {{
max-width: 1400px;
margin: 0 auto;
}}
.header {{
background: linear-gradient(135deg, #16213e 0%, #1a1a2e 100%);
padding: 30px;
border-radius: 12px;
margin-bottom: 30px;
text-align: center;
box-shadow: 0 4px 20px rgba(0,0,0,0.3);
}}
.header h1 {{
font-size: 2.5em;
margin-bottom: 10px;
background: linear-gradient(90deg, #00d4ff, #00ff88);
-webkit-background-clip: text;
-webkit-text-fill-color: transparent;
background-clip: text;
}}
.header .meta {{
color: #888;
font-size: 0.9em;
}}
.stats-grid {{
display: grid;
grid-template-columns: repeat(auto-fit, minmax(150px, 1fr));
gap: 15px;
margin: 20px 0;
}}
.stat-card {{
background: #16213e;
padding: 15px;
border-radius: 8px;
text-align: center;
}}
.stat-value {{
font-size: 1.8em;
font-weight: bold;
color: #00ff88;
}}
.stat-label {{
color: #888;
font-size: 0.85em;
margin-top: 5px;
}}
.stage-section {{
background: #16213e;
border-radius: 12px;
margin-bottom: 25px;
overflow: hidden;
box-shadow: 0 2px 10px rgba(0,0,0,0.2);
}}
.stage-header {{
background: #1f2a48;
padding: 15px 20px;
display: flex;
justify-content: space-between;
align-items: center;
cursor: pointer;
user-select: none;
}}
.stage-header:hover {{
background: #253252;
}}
.stage-name {{
font-weight: bold;
font-size: 1.1em;
color: #00d4ff;
}}
.stage-info {{
color: #888;
font-size: 0.9em;
}}
.stage-content {{
padding: 20px;
}}
.frames-container {{
display: grid;
grid-template-columns: repeat(auto-fill, minmax(300px, 1fr));
gap: 15px;
}}
.frame-card {{
background: #0f0f1a;
border-radius: 8px;
overflow: hidden;
border: 1px solid #333;
transition: transform 0.2s, box-shadow 0.2s;
}}
.frame-card:hover {{
transform: translateY(-2px);
box-shadow: 0 4px 15px rgba(0,212,255,0.2);
}}
.frame-header {{
background: #1a1a2e;
padding: 10px 15px;
font-size: 0.85em;
color: #888;
border-bottom: 1px solid #333;
display: flex;
justify-content: space-between;
}}
.frame-number {{
color: #00ff88;
}}
.frame-diff {{
color: #ff6b6b;
}}
.frame-content {{
padding: 10px;
font-family: 'Fira Code', 'Consolas', 'Monaco', monospace;
font-size: 11px;
line-height: 1.3;
white-space: pre;
overflow-x: auto;
max-height: 200px;
overflow-y: auto;
}}
.timeline-section {{
background: #16213e;
border-radius: 12px;
padding: 20px;
margin-bottom: 25px;
}}
.timeline-header {{
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 15px;
}}
.timeline-title {{
font-weight: bold;
color: #00d4ff;
}}
.timeline-controls {{
display: flex;
gap: 10px;
}}
.timeline-controls button {{
background: #1f2a48;
border: 1px solid #333;
color: #eee;
padding: 8px 15px;
border-radius: 6px;
cursor: pointer;
transition: all 0.2s;
}}
.timeline-controls button:hover {{
background: #253252;
border-color: #00d4ff;
}}
.timeline-controls button.active {{
background: #00d4ff;
color: #000;
}}
.timeline-canvas {{
width: 100%;
height: 100px;
background: #0f0f1a;
border-radius: 8px;
position: relative;
overflow: hidden;
}}
.timeline-track {{
position: absolute;
top: 50%;
left: 0;
right: 0;
height: 4px;
background: #333;
transform: translateY(-50%);
}}
.timeline-marker {{
position: absolute;
top: 50%;
transform: translate(-50%, -50%);
width: 12px;
height: 12px;
background: #00d4ff;
border-radius: 50%;
cursor: pointer;
transition: all 0.2s;
}}
.timeline-marker:hover {{
transform: translate(-50%, -50%) scale(1.3);
box-shadow: 0 0 10px #00d4ff;
}}
.timeline-marker.stage-{{stage_name}} {{
background: var(--stage-color, #00d4ff);
}}
.comparison-view {{
display: grid;
grid-template-columns: 1fr 1fr;
gap: 20px;
margin-top: 20px;
}}
.comparison-panel {{
background: #0f0f1a;
border-radius: 8px;
padding: 15px;
border: 1px solid #333;
}}
.comparison-panel h4 {{
color: #888;
margin-bottom: 10px;
font-size: 0.9em;
}}
.comparison-content {{
font-family: 'Fira Code', 'Consolas', 'Monaco', monospace;
font-size: 11px;
line-height: 1.3;
white-space: pre;
}}
.diff-added {{
background: rgba(0, 255, 136, 0.2);
}}
.diff-removed {{
background: rgba(255, 107, 107, 0.2);
}}
@keyframes pulse {{
0%, 100% {{ opacity: 1; }}
50% {{ opacity: 0.7; }}
}}
.animating {{
animation: pulse 1s infinite;
}}
.footer {{
text-align: center;
color: #666;
padding: 20px;
font-size: 0.9em;
}}
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🎬 {title}</h1>
<div class="meta">
Generated: {datetime.now().strftime("%Y-%m-%d %H:%M:%S")} |
Total Frames: {self._total_frames} |
Duration: {time.time() - self._start_time:.2f}s
</div>
<div class="stats-grid">
<div class="stat-card">
<div class="stat-value">{len(self._stages)}</div>
<div class="stat-label">Pipeline Stages</div>
</div>
<div class="stat-card">
<div class="stat-value">{self._total_frames}</div>
<div class="stat-label">Total Frames</div>
</div>
<div class="stat-card">
<div class="stat-value">{time.time() - self._start_time:.2f}s</div>
<div class="stat-label">Capture Duration</div>
</div>
<div class="stat-card">
<div class="stat-value">{self.width}x{self.height}</div>
<div class="stat-label">Resolution</div>
</div>
</div>
</div>
<div class="timeline-section">
<div class="timeline-header">
<div class="timeline-title">Timeline</div>
<div class="timeline-controls">
<button onclick="playAnimation()">▶ Play</button>
<button onclick="pauseAnimation()">⏸ Pause</button>
<button onclick="stepForward()">⏭ Step</button>
</div>
</div>
<div class="timeline-canvas" id="timeline">
<div class="timeline-track"></div>
<!-- Timeline markers will be added by JavaScript -->
</div>
</div>
{stages_html}
<div class="footer">
<p>Animation Report generated by Mainline</p>
<p>Use the timeline controls above to play/pause the animation</p>
</div>
</div>
<script>
// Animation state
let currentFrame = 0;
let isPlaying = false;
let animationInterval = null;
const totalFrames = {len(all_frames)};
// Stage colors for timeline markers
const stageColors = {{
{self._build_stage_colors()}
}};
// Initialize timeline
function initTimeline() {{
const timeline = document.getElementById('timeline');
const track = timeline.querySelector('.timeline-track');
{self._build_timeline_markers(all_frames)}
}}
function playAnimation() {{
if (isPlaying) return;
isPlaying = true;
animationInterval = setInterval(() => {{
currentFrame = (currentFrame + 1) % totalFrames;
updateFrameDisplay();
}}, 100);
}}
function pauseAnimation() {{
isPlaying = false;
if (animationInterval) {{
clearInterval(animationInterval);
animationInterval = null;
}}
}}
function stepForward() {{
currentFrame = (currentFrame + 1) % totalFrames;
updateFrameDisplay();
}}
function updateFrameDisplay() {{
// Highlight current frame in timeline
const markers = document.querySelectorAll('.timeline-marker');
markers.forEach((marker, index) => {{
if (index === currentFrame) {{
marker.style.transform = 'translate(-50%, -50%) scale(1.5)';
marker.style.boxShadow = '0 0 15px #00ff88';
}} else {{
marker.style.transform = 'translate(-50%, -50%) scale(1)';
marker.style.boxShadow = 'none';
}}
}});
}}
// Initialize on page load
document.addEventListener('DOMContentLoaded', initTimeline);
</script>
</body>
</html>
"""
return html
def _build_stage_section(self, stage_name: str, stage: StageCapture) -> str:
"""Build HTML for a single stage section."""
frames_html = ""
for i, frame in enumerate(stage.frames):
diff_info = ""
if frame.diff_from_previous:
changed = frame.diff_from_previous.get("changed_lines", 0)
total = frame.diff_from_previous.get("total_lines", 0)
diff_info = f'<span class="frame-diff"{changed}/{total}</span>'
frames_html += f"""
<div class="frame-card">
<div class="frame-header">
<span>Frame <span class="frame-number">{frame.frame_number}</span></span>
{diff_info}
</div>
<div class="frame-content">{self._escape_html("".join(frame.buffer))}</div>
</div>
"""
return f"""
<div class="stage-section">
<div class="stage-header" onclick="this.nextElementSibling.classList.toggle('hidden')">
<span class="stage-name">{stage_name}</span>
<span class="stage-info">{len(stage.frames)} frames</span>
</div>
<div class="stage-content">
<div class="frames-container">
{frames_html}
</div>
</div>
</div>
"""
def _build_timeline(self, all_frames: list[CapturedFrame]) -> str:
"""Build timeline HTML."""
if not all_frames:
return ""
markers_html = ""
for i, frame in enumerate(all_frames):
left_percent = (i / len(all_frames)) * 100
markers_html += f'<div class="timeline-marker" style="left: {left_percent}%" data-frame="{i}"></div>'
return markers_html
def _build_stage_colors(self) -> str:
"""Build stage color mapping for JavaScript."""
colors = [
"#00d4ff",
"#00ff88",
"#ff6b6b",
"#ffd93d",
"#a855f7",
"#ec4899",
"#14b8a6",
"#f97316",
"#8b5cf6",
"#06b6d4",
]
color_map = ""
for i, stage_name in enumerate(self._stages.keys()):
color = colors[i % len(colors)]
color_map += f' "{stage_name}": "{color}",\n'
return color_map.rstrip(",\n")
def _build_timeline_markers(self, all_frames: list[CapturedFrame]) -> str:
"""Build timeline markers in JavaScript."""
if not all_frames:
return ""
markers_js = ""
for i, frame in enumerate(all_frames):
left_percent = (i / len(all_frames)) * 100
stage_color = f"stageColors['{frame.stage}']"
markers_js += f"""
const marker{i} = document.createElement('div');
marker{i}.className = 'timeline-marker stage-{{frame.stage}}';
marker{i}.style.left = '{left_percent}%';
marker{i}.style.setProperty('--stage-color', {stage_color});
marker{i}.onclick = () => {{
currentFrame = {i};
updateFrameDisplay();
}};
timeline.appendChild(marker{i});
"""
return markers_js
def _escape_html(self, text: str) -> str:
"""Escape HTML special characters."""
return (
text.replace("&", "&amp;")
.replace("<", "&lt;")
.replace(">", "&gt;")
.replace('"', "&quot;")
.replace("'", "&#39;")
)

View File

@@ -0,0 +1,332 @@
"""
Figment overlay effect for modern pipeline architecture.
Provides periodic SVG glyph overlays with reveal/hold/dissolve animation phases.
Integrates directly with the pipeline's effect system without legacy dependencies.
"""
import random
from dataclasses import dataclass
from enum import Enum, auto
from pathlib import Path
from engine import config
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
from engine.figment_render import rasterize_svg
from engine.figment_trigger import FigmentAction, FigmentCommand, FigmentTrigger
from engine.terminal import RST
from engine.themes import THEME_REGISTRY
class FigmentPhase(Enum):
"""Animation phases for figment overlay."""
REVEAL = auto()
HOLD = auto()
DISSOLVE = auto()
@dataclass
class FigmentState:
"""State of a figment overlay at a given frame."""
phase: FigmentPhase
progress: float
rows: list[str]
gradient: list[int]
center_row: int
center_col: int
def _color_codes_to_ansi(gradient: list[int]) -> list[str]:
"""Convert gradient list to ANSI color codes.
Args:
gradient: List of 256-color palette codes
Returns:
List of ANSI escape code strings
"""
codes = []
for color in gradient:
if isinstance(color, int):
codes.append(f"\033[38;5;{color}m")
else:
# Fallback to green
codes.append("\033[38;5;46m")
return codes if codes else ["\033[38;5;46m"]
def render_figment_overlay(figment_state: FigmentState, w: int, h: int) -> list[str]:
"""Render figment overlay as ANSI cursor-positioning commands.
Args:
figment_state: FigmentState with phase, progress, rows, gradient, centering.
w: terminal width
h: terminal height
Returns:
List of ANSI strings to append to display buffer.
"""
rows = figment_state.rows
if not rows:
return []
phase = figment_state.phase
progress = figment_state.progress
gradient = figment_state.gradient
center_row = figment_state.center_row
center_col = figment_state.center_col
cols = _color_codes_to_ansi(gradient)
# Build a list of non-space cell positions
cell_positions = []
for r_idx, row in enumerate(rows):
for c_idx, ch in enumerate(row):
if ch != " ":
cell_positions.append((r_idx, c_idx))
n_cells = len(cell_positions)
if n_cells == 0:
return []
# Use a deterministic seed so the reveal/dissolve pattern is stable per-figment
rng = random.Random(hash(tuple(rows[0][:10])) if rows[0] else 42)
shuffled = list(cell_positions)
rng.shuffle(shuffled)
# Phase-dependent visibility
if phase == FigmentPhase.REVEAL:
visible_count = int(n_cells * progress)
visible = set(shuffled[:visible_count])
elif phase == FigmentPhase.HOLD:
visible = set(cell_positions)
# Strobe: dim some cells periodically
if int(progress * 20) % 3 == 0:
dim_count = int(n_cells * 0.3)
visible -= set(shuffled[:dim_count])
elif phase == FigmentPhase.DISSOLVE:
remaining_count = int(n_cells * (1.0 - progress))
visible = set(shuffled[:remaining_count])
else:
visible = set(cell_positions)
# Build overlay commands
overlay: list[str] = []
n_cols = len(cols)
max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1)
for r_idx, row in enumerate(rows):
scr_row = center_row + r_idx + 1 # 1-indexed
if scr_row < 1 or scr_row > h:
continue
line_buf: list[str] = []
has_content = False
for c_idx, ch in enumerate(row):
scr_col = center_col + c_idx + 1
if scr_col < 1 or scr_col > w:
continue
if ch != " " and (r_idx, c_idx) in visible:
# Apply gradient color
shifted = (c_idx / max(max_x - 1, 1)) % 1.0
idx = min(round(shifted * (n_cols - 1)), n_cols - 1)
line_buf.append(f"{cols[idx]}{ch}{RST}")
has_content = True
else:
line_buf.append(" ")
if has_content:
line_str = "".join(line_buf).rstrip()
if line_str.strip():
overlay.append(f"\033[{scr_row};{center_col + 1}H{line_str}{RST}")
return overlay
class FigmentEffect(EffectPlugin):
"""Figment overlay effect for pipeline architecture.
Provides periodic SVG overlays with reveal/hold/dissolve animation.
"""
name = "figment"
config = EffectConfig(
enabled=True,
intensity=1.0,
params={
"interval_secs": 60,
"display_secs": 4.5,
"figment_dir": "figments",
},
)
supports_partial_updates = False
is_overlay = True # Figment is an overlay effect that composes on top of the buffer
def __init__(
self,
figment_dir: str | None = None,
triggers: list[FigmentTrigger] | None = None,
):
self.config = EffectConfig(
enabled=True,
intensity=1.0,
params={
"interval_secs": 60,
"display_secs": 4.5,
"figment_dir": figment_dir or "figments",
},
)
self._triggers = triggers or []
self._phase: FigmentPhase | None = None
self._progress: float = 0.0
self._rows: list[str] = []
self._gradient: list[int] = []
self._center_row: int = 0
self._center_col: int = 0
self._timer: float = 0.0
self._last_svg: str | None = None
self._svg_files: list[str] = []
self._scan_svgs()
def _scan_svgs(self) -> None:
"""Scan figment directory for SVG files."""
figment_dir = Path(self.config.params["figment_dir"])
if figment_dir.is_dir():
self._svg_files = sorted(str(p) for p in figment_dir.glob("*.svg"))
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
"""Add figment overlay to buffer."""
if not self.config.enabled:
return buf
# Get figment state using frame number from context
figment_state = self.get_figment_state(
ctx.frame_number, ctx.terminal_width, ctx.terminal_height
)
if figment_state:
# Render overlay and append to buffer
overlay = render_figment_overlay(
figment_state, ctx.terminal_width, ctx.terminal_height
)
buf = buf + overlay
return buf
def configure(self, config: EffectConfig) -> None:
"""Configure the effect."""
# Preserve figment_dir if the new config doesn't supply one
figment_dir = config.params.get(
"figment_dir", self.config.params.get("figment_dir", "figments")
)
self.config = config
if "figment_dir" not in self.config.params:
self.config.params["figment_dir"] = figment_dir
self._scan_svgs()
def trigger(self, w: int, h: int) -> None:
"""Manually trigger a figment display."""
if not self._svg_files:
return
# Pick a random SVG, avoid repeating
candidates = [s for s in self._svg_files if s != self._last_svg]
if not candidates:
candidates = self._svg_files
svg_path = random.choice(candidates)
self._last_svg = svg_path
# Rasterize
try:
self._rows = rasterize_svg(svg_path, w, h)
except Exception:
return
# Pick random theme gradient
theme_key = random.choice(list(THEME_REGISTRY.keys()))
self._gradient = THEME_REGISTRY[theme_key].main_gradient
# Center in viewport
figment_h = len(self._rows)
figment_w = max((len(r) for r in self._rows), default=0)
self._center_row = max(0, (h - figment_h) // 2)
self._center_col = max(0, (w - figment_w) // 2)
# Start reveal phase
self._phase = FigmentPhase.REVEAL
self._progress = 0.0
def get_figment_state(
self, frame_number: int, w: int, h: int
) -> FigmentState | None:
"""Tick the state machine and return current state, or None if idle."""
if not self.config.enabled:
return None
# Poll triggers
for trig in self._triggers:
cmd = trig.poll()
if cmd is not None:
self._handle_command(cmd, w, h)
# Tick timer when idle
if self._phase is None:
self._timer += config.FRAME_DT
interval = self.config.params.get("interval_secs", 60)
if self._timer >= interval:
self._timer = 0.0
self.trigger(w, h)
# Tick animation — snapshot current phase/progress, then advance
if self._phase is not None:
# Capture the state at the start of this frame
current_phase = self._phase
current_progress = self._progress
# Advance for next frame
display_secs = self.config.params.get("display_secs", 4.5)
phase_duration = display_secs / 3.0
self._progress += config.FRAME_DT / phase_duration
if self._progress >= 1.0:
self._progress = 0.0
if self._phase == FigmentPhase.REVEAL:
self._phase = FigmentPhase.HOLD
elif self._phase == FigmentPhase.HOLD:
self._phase = FigmentPhase.DISSOLVE
elif self._phase == FigmentPhase.DISSOLVE:
self._phase = None
return FigmentState(
phase=current_phase,
progress=current_progress,
rows=self._rows,
gradient=self._gradient,
center_row=self._center_row,
center_col=self._center_col,
)
return None
def _handle_command(self, cmd: FigmentCommand, w: int, h: int) -> None:
"""Handle a figment command."""
if cmd.action == FigmentAction.TRIGGER:
self.trigger(w, h)
elif cmd.action == FigmentAction.SET_INTENSITY and isinstance(
cmd.value, (int, float)
):
self.config.intensity = float(cmd.value)
elif cmd.action == FigmentAction.SET_INTERVAL and isinstance(
cmd.value, (int, float)
):
self.config.params["interval_secs"] = float(cmd.value)
elif cmd.action == FigmentAction.SET_COLOR and isinstance(cmd.value, str):
if cmd.value in THEME_REGISTRY:
self._gradient = THEME_REGISTRY[cmd.value].main_gradient
elif cmd.action == FigmentAction.STOP:
self._phase = None
self._progress = 0.0

90
engine/figment_render.py Normal file
View File

@@ -0,0 +1,90 @@
"""
SVG to half-block terminal art rasterization.
Pipeline: SVG -> cairosvg -> PIL -> greyscale threshold -> half-block encode.
Follows the same pixel-pair approach as engine/render.py for OTF fonts.
"""
from __future__ import annotations
import os
import sys
from io import BytesIO
# cairocffi (used by cairosvg) calls dlopen() to find the Cairo C library.
# On macOS with Homebrew, Cairo lives in /opt/homebrew/lib (Apple Silicon) or
# /usr/local/lib (Intel), which are not in dyld's default search path.
# Setting DYLD_LIBRARY_PATH before the import directs dlopen() to those paths.
if sys.platform == "darwin" and not os.environ.get("DYLD_LIBRARY_PATH"):
for _brew_lib in ("/opt/homebrew/lib", "/usr/local/lib"):
if os.path.exists(os.path.join(_brew_lib, "libcairo.2.dylib")):
os.environ["DYLD_LIBRARY_PATH"] = _brew_lib
break
import cairosvg
from PIL import Image
_cache: dict[tuple[str, int, int], list[str]] = {}
def rasterize_svg(svg_path: str, width: int, height: int) -> list[str]:
"""Convert SVG file to list of half-block terminal rows (uncolored).
Args:
svg_path: Path to SVG file.
width: Target terminal width in columns.
height: Target terminal height in rows.
Returns:
List of strings, one per terminal row, containing block characters.
"""
cache_key = (svg_path, width, height)
if cache_key in _cache:
return _cache[cache_key]
# SVG -> PNG in memory
png_bytes = cairosvg.svg2png(
url=svg_path,
output_width=width,
output_height=height * 2, # 2 pixel rows per terminal row
)
# PNG -> greyscale PIL image
# Composite RGBA onto white background so transparent areas become white (255)
# and drawn pixels retain their luminance values.
img_rgba = Image.open(BytesIO(png_bytes)).convert("RGBA")
img_rgba = img_rgba.resize((width, height * 2), Image.Resampling.LANCZOS)
background = Image.new("RGBA", img_rgba.size, (255, 255, 255, 255))
background.paste(img_rgba, mask=img_rgba.split()[3])
img = background.convert("L")
data = img.tobytes()
pix_w = width
pix_h = height * 2
# White (255) = empty space, dark (< threshold) = filled pixel
threshold = 128
# Half-block encode: walk pixel pairs
rows: list[str] = []
for y in range(0, pix_h, 2):
row: list[str] = []
for x in range(pix_w):
top = data[y * pix_w + x] < threshold
bot = data[(y + 1) * pix_w + x] < threshold if y + 1 < pix_h else False
if top and bot:
row.append("")
elif top:
row.append("")
elif bot:
row.append("")
else:
row.append(" ")
rows.append("".join(row))
_cache[cache_key] = rows
return rows
def clear_cache() -> None:
"""Clear the rasterization cache (e.g., on terminal resize)."""
_cache.clear()

36
engine/figment_trigger.py Normal file
View File

@@ -0,0 +1,36 @@
"""
Figment trigger protocol and command types.
Defines the extensible input abstraction for triggering figment displays
from any control surface (ntfy, MQTT, serial, etc.).
"""
from __future__ import annotations
from dataclasses import dataclass
from enum import Enum
from typing import Protocol
class FigmentAction(Enum):
TRIGGER = "trigger"
SET_INTENSITY = "set_intensity"
SET_INTERVAL = "set_interval"
SET_COLOR = "set_color"
STOP = "stop"
@dataclass
class FigmentCommand:
action: FigmentAction
value: float | str | None = None
class FigmentTrigger(Protocol):
"""Protocol for figment trigger sources.
Any input source (ntfy, MQTT, serial) can implement this
to trigger and control figment displays.
"""
def poll(self) -> FigmentCommand | None: ...

View File

@@ -27,9 +27,9 @@ class EffectPluginStage(Stage):
def stage_type(self) -> str:
"""Return stage_type based on effect name.
HUD effects are overlays.
Overlay effects have stage_type "overlay".
"""
if self.name == "hud":
if self.is_overlay:
return "overlay"
return self.category
@@ -37,19 +37,26 @@ class EffectPluginStage(Stage):
def render_order(self) -> int:
"""Return render_order based on effect type.
HUD effects have high render_order to appear on top.
Overlay effects have high render_order to appear on top.
"""
if self.name == "hud":
if self.is_overlay:
return 100 # High order for overlays
return 0
@property
def is_overlay(self) -> bool:
"""Return True for HUD effects.
"""Return True for overlay effects.
HUD is an overlay - it composes on top of the buffer
Overlay effects compose on top of the buffer
rather than transforming it for the next stage.
"""
# Check if the effect has an is_overlay attribute that is explicitly True
# (not just any truthy value from a mock object)
if hasattr(self._effect, "is_overlay"):
effect_overlay = self._effect.is_overlay
# Only return True if it's explicitly set to True
if effect_overlay is True:
return True
return self.name == "hud"
@property

View File

@@ -0,0 +1,165 @@
"""
Frame Capture Stage Adapter
Wraps pipeline stages to capture frames for animation report generation.
"""
from typing import Any
from engine.display.backends.animation_report import AnimationReportDisplay
from engine.pipeline.core import PipelineContext, Stage
class FrameCaptureStage(Stage):
"""
Wrapper stage that captures frames before and after a wrapped stage.
This allows generating animation reports showing how each stage
transforms the data.
"""
def __init__(
self,
wrapped_stage: Stage,
display: AnimationReportDisplay,
name: str | None = None,
):
"""
Initialize frame capture stage.
Args:
wrapped_stage: The stage to wrap and capture frames from
display: The animation report display to send frames to
name: Optional name for this capture stage
"""
self._wrapped_stage = wrapped_stage
self._display = display
self.name = name or f"capture_{wrapped_stage.name}"
self.category = wrapped_stage.category
self.optional = wrapped_stage.optional
# Capture state
self._captured_input = False
self._captured_output = False
@property
def stage_type(self) -> str:
return self._wrapped_stage.stage_type
@property
def capabilities(self) -> set[str]:
return self._wrapped_stage.capabilities
@property
def dependencies(self) -> set[str]:
return self._wrapped_stage.dependencies
@property
def inlet_types(self) -> set:
return self._wrapped_stage.inlet_types
@property
def outlet_types(self) -> set:
return self._wrapped_stage.outlet_types
def init(self, ctx: PipelineContext) -> bool:
"""Initialize the wrapped stage."""
return self._wrapped_stage.init(ctx)
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""
Process data through wrapped stage and capture frames.
Args:
data: Input data (typically a text buffer)
ctx: Pipeline context
Returns:
Output data from wrapped stage
"""
# Capture input frame (before stage processing)
if isinstance(data, list) and all(isinstance(line, str) for line in data):
self._display.start_stage(f"{self._wrapped_stage.name}_input")
self._display.show(data)
self._captured_input = True
# Process through wrapped stage
result = self._wrapped_stage.process(data, ctx)
# Capture output frame (after stage processing)
if isinstance(result, list) and all(isinstance(line, str) for line in result):
self._display.start_stage(f"{self._wrapped_stage.name}_output")
self._display.show(result)
self._captured_output = True
return result
def cleanup(self) -> None:
"""Cleanup the wrapped stage."""
self._wrapped_stage.cleanup()
class FrameCaptureController:
"""
Controller for managing frame capture across the pipeline.
This class provides an easy way to enable frame capture for
specific stages or the entire pipeline.
"""
def __init__(self, display: AnimationReportDisplay):
"""
Initialize frame capture controller.
Args:
display: The animation report display to use for capture
"""
self._display = display
self._captured_stages: list[FrameCaptureStage] = []
def wrap_stage(self, stage: Stage, name: str | None = None) -> FrameCaptureStage:
"""
Wrap a stage with frame capture.
Args:
stage: The stage to wrap
name: Optional name for the capture stage
Returns:
Wrapped stage that captures frames
"""
capture_stage = FrameCaptureStage(stage, self._display, name)
self._captured_stages.append(capture_stage)
return capture_stage
def wrap_stages(self, stages: dict[str, Stage]) -> dict[str, Stage]:
"""
Wrap multiple stages with frame capture.
Args:
stages: Dictionary of stage names to stages
Returns:
Dictionary of stage names to wrapped stages
"""
wrapped = {}
for name, stage in stages.items():
wrapped[name] = self.wrap_stage(stage, name)
return wrapped
def get_captured_stages(self) -> list[FrameCaptureStage]:
"""Get list of all captured stages."""
return self._captured_stages
def generate_report(self, title: str = "Pipeline Animation Report") -> str:
"""
Generate the animation report.
Args:
title: Title for the report
Returns:
Path to the generated HTML file
"""
report_path = self._display.generate_report(title)
return str(report_path)

60
engine/themes.py Normal file
View File

@@ -0,0 +1,60 @@
"""
Theme definitions with color gradients for terminal rendering.
This module is data-only and does not import config or render
to prevent circular dependencies.
"""
class Theme:
"""Represents a color theme with two gradients."""
def __init__(self, name, main_gradient, message_gradient):
"""Initialize a theme with name and color gradients.
Args:
name: Theme identifier string
main_gradient: List of 12 ANSI 256-color codes for main gradient
message_gradient: List of 12 ANSI 256-color codes for message gradient
"""
self.name = name
self.main_gradient = main_gradient
self.message_gradient = message_gradient
# ─── GRADIENT DEFINITIONS ─────────────────────────────────────────────────
# Each gradient is 12 ANSI 256-color codes in sequence
# Format: [light...] → [medium...] → [dark...] → [black]
_GREEN_MAIN = [231, 195, 123, 118, 82, 46, 40, 34, 28, 22, 22, 235]
_GREEN_MSG = [231, 225, 219, 213, 207, 201, 165, 161, 125, 89, 89, 235]
_ORANGE_MAIN = [231, 215, 209, 208, 202, 166, 130, 94, 58, 94, 94, 235]
_ORANGE_MSG = [231, 195, 33, 27, 21, 21, 21, 18, 18, 18, 18, 235]
_PURPLE_MAIN = [231, 225, 177, 171, 165, 135, 129, 93, 57, 57, 57, 235]
_PURPLE_MSG = [231, 226, 226, 220, 220, 184, 184, 178, 178, 172, 172, 235]
# ─── THEME REGISTRY ───────────────────────────────────────────────────────
THEME_REGISTRY = {
"green": Theme("green", _GREEN_MAIN, _GREEN_MSG),
"orange": Theme("orange", _ORANGE_MAIN, _ORANGE_MSG),
"purple": Theme("purple", _PURPLE_MAIN, _PURPLE_MSG),
}
def get_theme(theme_id):
"""Retrieve a theme by ID.
Args:
theme_id: Theme identifier string
Returns:
Theme object matching the ID
Raises:
KeyError: If theme_id is not in registry
"""
return THEME_REGISTRY[theme_id]