- Implements pipeline hot-rebuild with state preservation (issue #43) - Adds auto-injection of MVP stages for missing capabilities - Adds radial camera mode for polar coordinate scanning - Adds afterimage and motionblur effects using framebuffer history - Adds comprehensive acceptance tests for camera modes and pipeline rebuild - Updates presets.toml with new effect configurations Related to: #35 (Pipeline Mutation API epic) Closes: #43, #44, #45
184 lines
5.4 KiB
Python
184 lines
5.4 KiB
Python
"""
|
|
Null/headless display backend.
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
class NullDisplay:
|
|
"""Headless/null display - discards all output.
|
|
|
|
This display does nothing - useful for headless benchmarking
|
|
or when no display output is needed. Captures last buffer
|
|
for testing purposes. Supports frame recording for replay
|
|
and file export/import.
|
|
"""
|
|
|
|
width: int = 80
|
|
height: int = 24
|
|
_last_buffer: list[str] | None = None
|
|
|
|
def __init__(self):
|
|
self._last_buffer = None
|
|
self._is_recording = False
|
|
self._recorded_frames: list[dict[str, Any]] = []
|
|
self._frame_count = 0
|
|
|
|
def init(self, width: int, height: int, reuse: bool = False) -> None:
|
|
"""Initialize display with dimensions.
|
|
|
|
Args:
|
|
width: Terminal width in characters
|
|
height: Terminal height in rows
|
|
reuse: Ignored for NullDisplay (no resources to reuse)
|
|
"""
|
|
self.width = width
|
|
self.height = height
|
|
self._last_buffer = None
|
|
|
|
def show(self, buffer: list[str], border: bool = False) -> None:
|
|
import sys
|
|
|
|
from engine.display import get_monitor, render_border
|
|
|
|
fps = 0.0
|
|
frame_time = 0.0
|
|
monitor = get_monitor()
|
|
if monitor:
|
|
stats = monitor.get_stats()
|
|
avg_ms = stats.get("pipeline", {}).get("avg_ms", 0) if stats else 0
|
|
frame_count = stats.get("frame_count", 0) if stats else 0
|
|
if avg_ms and frame_count > 0:
|
|
fps = 1000.0 / avg_ms
|
|
frame_time = avg_ms
|
|
|
|
if border:
|
|
buffer = render_border(buffer, self.width, self.height, fps, frame_time)
|
|
|
|
self._last_buffer = buffer
|
|
|
|
if self._is_recording:
|
|
self._recorded_frames.append(
|
|
{
|
|
"frame_number": self._frame_count,
|
|
"buffer": buffer,
|
|
"width": self.width,
|
|
"height": self.height,
|
|
}
|
|
)
|
|
|
|
if self._frame_count <= 5 or self._frame_count % 10 == 0:
|
|
sys.stdout.write("\n" + "=" * 80 + "\n")
|
|
sys.stdout.write(
|
|
f"Frame {self._frame_count} (buffer height: {len(buffer)})\n"
|
|
)
|
|
sys.stdout.write("=" * 80 + "\n")
|
|
for i, line in enumerate(buffer[:30]):
|
|
sys.stdout.write(f"{i:2}: {line}\n")
|
|
if len(buffer) > 30:
|
|
sys.stdout.write(f"... ({len(buffer) - 30} more lines)\n")
|
|
sys.stdout.flush()
|
|
|
|
if monitor:
|
|
t0 = time.perf_counter()
|
|
chars_in = sum(len(line) for line in buffer)
|
|
elapsed_ms = (time.perf_counter() - t0) * 1000
|
|
monitor.record_effect("null_display", elapsed_ms, chars_in, chars_in)
|
|
|
|
self._frame_count += 1
|
|
|
|
def start_recording(self) -> None:
|
|
"""Begin recording frames."""
|
|
self._is_recording = True
|
|
self._recorded_frames = []
|
|
|
|
def stop_recording(self) -> None:
|
|
"""Stop recording frames."""
|
|
self._is_recording = False
|
|
|
|
def get_frames(self) -> list[list[str]]:
|
|
"""Get recorded frames as list of buffers.
|
|
|
|
Returns:
|
|
List of buffers, each buffer is a list of strings (lines)
|
|
"""
|
|
return [frame["buffer"] for frame in self._recorded_frames]
|
|
|
|
def get_recorded_data(self) -> list[dict[str, Any]]:
|
|
"""Get full recorded data including metadata.
|
|
|
|
Returns:
|
|
List of frame dicts with 'frame_number', 'buffer', 'width', 'height'
|
|
"""
|
|
return self._recorded_frames
|
|
|
|
def clear_recording(self) -> None:
|
|
"""Clear recorded frames."""
|
|
self._recorded_frames = []
|
|
|
|
def save_recording(self, filepath: str | Path) -> None:
|
|
"""Save recorded frames to a JSON file.
|
|
|
|
Args:
|
|
filepath: Path to save the recording
|
|
"""
|
|
path = Path(filepath)
|
|
data = {
|
|
"version": 1,
|
|
"display": "null",
|
|
"width": self.width,
|
|
"height": self.height,
|
|
"frame_count": len(self._recorded_frames),
|
|
"frames": self._recorded_frames,
|
|
}
|
|
path.write_text(json.dumps(data, indent=2))
|
|
|
|
def load_recording(self, filepath: str | Path) -> list[dict[str, Any]]:
|
|
"""Load recorded frames from a JSON file.
|
|
|
|
Args:
|
|
filepath: Path to load the recording from
|
|
|
|
Returns:
|
|
List of frame dicts
|
|
"""
|
|
path = Path(filepath)
|
|
data = json.loads(path.read_text())
|
|
self._recorded_frames = data.get("frames", [])
|
|
self.width = data.get("width", 80)
|
|
self.height = data.get("height", 24)
|
|
return self._recorded_frames
|
|
|
|
def replay_frames(self) -> list[list[str]]:
|
|
"""Get frames for replay.
|
|
|
|
Returns:
|
|
List of buffers for replay
|
|
"""
|
|
return self.get_frames()
|
|
|
|
def clear(self) -> None:
|
|
pass
|
|
|
|
def cleanup(self) -> None:
|
|
pass
|
|
|
|
def get_dimensions(self) -> tuple[int, int]:
|
|
"""Get current dimensions.
|
|
|
|
Returns:
|
|
(width, height) in character cells
|
|
"""
|
|
return (self.width, self.height)
|
|
|
|
def is_quit_requested(self) -> bool:
|
|
"""Check if quit was requested (optional protocol method)."""
|
|
return False
|
|
|
|
def clear_quit_request(self) -> None:
|
|
"""Clear quit request (optional protocol method)."""
|
|
pass
|