Reviewed-on: #53 Co-authored-by: David Gwilliam <dhgwilliam@gmail.com> Co-committed-by: David Gwilliam <dhgwilliam@gmail.com>
490 lines
16 KiB
Python
490 lines
16 KiB
Python
"""Frame capture utilities for upstream vs sideline comparison.
|
|
|
|
This module provides functions to capture frames from both upstream and sideline
|
|
implementations for visual comparison and performance analysis.
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Tuple
|
|
|
|
import tomli
|
|
|
|
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext
|
|
from engine.pipeline.params import PipelineParams
|
|
|
|
|
|
def load_comparison_preset(preset_name: str) -> Any:
|
|
"""Load a comparison preset from comparison_presets.toml.
|
|
|
|
Args:
|
|
preset_name: Name of the preset to load
|
|
|
|
Returns:
|
|
Preset configuration dictionary
|
|
"""
|
|
presets_file = Path("tests/comparison_presets.toml")
|
|
if not presets_file.exists():
|
|
raise FileNotFoundError(f"Comparison presets file not found: {presets_file}")
|
|
|
|
with open(presets_file, "rb") as f:
|
|
config = tomli.load(f)
|
|
|
|
presets = config.get("presets", {})
|
|
full_name = (
|
|
f"presets.{preset_name}"
|
|
if not preset_name.startswith("presets.")
|
|
else preset_name
|
|
)
|
|
simple_name = (
|
|
preset_name.replace("presets.", "")
|
|
if preset_name.startswith("presets.")
|
|
else preset_name
|
|
)
|
|
|
|
if full_name in presets:
|
|
return presets[full_name]
|
|
elif simple_name in presets:
|
|
return presets[simple_name]
|
|
else:
|
|
raise ValueError(
|
|
f"Preset '{preset_name}' not found in {presets_file}. Available: {list(presets.keys())}"
|
|
)
|
|
|
|
|
|
def capture_frames(
|
|
preset_name: str,
|
|
frame_count: int = 30,
|
|
output_dir: Path = Path("tests/comparison_output"),
|
|
) -> Dict[str, Any]:
|
|
"""Capture frames from sideline pipeline using a preset.
|
|
|
|
Args:
|
|
preset_name: Name of preset to use
|
|
frame_count: Number of frames to capture
|
|
output_dir: Directory to save captured frames
|
|
|
|
Returns:
|
|
Dictionary with captured frames and metadata
|
|
"""
|
|
from engine.pipeline.presets import get_preset
|
|
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Load preset - try comparison presets first, then built-in presets
|
|
try:
|
|
preset = load_comparison_preset(preset_name)
|
|
# Convert dict to object-like access
|
|
from types import SimpleNamespace
|
|
|
|
preset = SimpleNamespace(**preset)
|
|
except (FileNotFoundError, ValueError):
|
|
# Fall back to built-in presets
|
|
preset = get_preset(preset_name)
|
|
if not preset:
|
|
raise ValueError(
|
|
f"Preset '{preset_name}' not found in comparison or built-in presets"
|
|
)
|
|
|
|
# Create pipeline config from preset
|
|
config = PipelineConfig(
|
|
source=preset.source,
|
|
display="null", # Always use null display for capture
|
|
camera=preset.camera,
|
|
effects=preset.effects,
|
|
)
|
|
|
|
# Create pipeline
|
|
ctx = PipelineContext()
|
|
ctx.terminal_width = preset.viewport_width
|
|
ctx.terminal_height = preset.viewport_height
|
|
pipeline = Pipeline(config=config, context=ctx)
|
|
|
|
# Create params
|
|
params = PipelineParams(
|
|
viewport_width=preset.viewport_width,
|
|
viewport_height=preset.viewport_height,
|
|
)
|
|
ctx.params = params
|
|
|
|
# Add stages based on source type (similar to pipeline_runner)
|
|
from engine.display import DisplayRegistry
|
|
from engine.pipeline.adapters import create_stage_from_display
|
|
from engine.data_sources.sources import EmptyDataSource
|
|
from engine.pipeline.adapters import DataSourceStage
|
|
|
|
# Add source stage
|
|
if preset.source == "empty":
|
|
source_stage = DataSourceStage(
|
|
EmptyDataSource(width=preset.viewport_width, height=preset.viewport_height),
|
|
name="empty",
|
|
)
|
|
else:
|
|
# For headlines/poetry, use the actual source
|
|
from engine.data_sources.sources import HeadlinesDataSource, PoetryDataSource
|
|
|
|
if preset.source == "headlines":
|
|
source_stage = DataSourceStage(HeadlinesDataSource(), name="headlines")
|
|
elif preset.source == "poetry":
|
|
source_stage = DataSourceStage(PoetryDataSource(), name="poetry")
|
|
else:
|
|
# Fallback to empty
|
|
source_stage = DataSourceStage(
|
|
EmptyDataSource(
|
|
width=preset.viewport_width, height=preset.viewport_height
|
|
),
|
|
name="empty",
|
|
)
|
|
pipeline.add_stage("source", source_stage)
|
|
|
|
# Add font stage for headlines/poetry (with viewport filter)
|
|
if preset.source in ["headlines", "poetry"]:
|
|
from engine.pipeline.adapters import FontStage, ViewportFilterStage
|
|
|
|
# Add viewport filter to prevent rendering all items
|
|
pipeline.add_stage(
|
|
"viewport_filter", ViewportFilterStage(name="viewport-filter")
|
|
)
|
|
# Add font stage for block character rendering
|
|
pipeline.add_stage("font", FontStage(name="font"))
|
|
else:
|
|
# Fallback to simple conversion for empty/other sources
|
|
from engine.pipeline.adapters import SourceItemsToBufferStage
|
|
|
|
pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))
|
|
|
|
# Add camera stage
|
|
from engine.camera import Camera
|
|
from engine.pipeline.adapters import CameraStage, CameraClockStage
|
|
|
|
# Create camera based on preset
|
|
if preset.camera == "feed":
|
|
camera = Camera.feed()
|
|
elif preset.camera == "scroll":
|
|
camera = Camera.scroll(speed=0.1)
|
|
elif preset.camera == "horizontal":
|
|
camera = Camera.horizontal(speed=0.1)
|
|
else:
|
|
camera = Camera.feed()
|
|
|
|
camera.set_canvas_size(preset.viewport_width, preset.viewport_height * 2)
|
|
|
|
# Add camera update (for animation)
|
|
pipeline.add_stage("camera_update", CameraClockStage(camera, name="camera-clock"))
|
|
# Add camera stage
|
|
pipeline.add_stage("camera", CameraStage(camera, name=preset.camera))
|
|
|
|
# Add effects
|
|
if preset.effects:
|
|
from engine.effects.registry import EffectRegistry
|
|
from engine.pipeline.adapters import create_stage_from_effect
|
|
|
|
effect_registry = EffectRegistry()
|
|
for effect_name in preset.effects:
|
|
effect = effect_registry.get(effect_name)
|
|
if effect:
|
|
pipeline.add_stage(
|
|
f"effect_{effect_name}",
|
|
create_stage_from_effect(effect, effect_name),
|
|
)
|
|
|
|
# Add message overlay stage if enabled (BEFORE display)
|
|
if getattr(preset, "enable_message_overlay", False):
|
|
from engine.pipeline.adapters import MessageOverlayConfig, MessageOverlayStage
|
|
|
|
overlay_config = MessageOverlayConfig(
|
|
enabled=True,
|
|
display_secs=30,
|
|
)
|
|
pipeline.add_stage(
|
|
"message_overlay", MessageOverlayStage(config=overlay_config)
|
|
)
|
|
|
|
# Add null display stage (LAST)
|
|
null_display = DisplayRegistry.create("null")
|
|
if null_display:
|
|
pipeline.add_stage("display", create_stage_from_display(null_display, "null"))
|
|
|
|
# Build pipeline
|
|
pipeline.build()
|
|
|
|
# Enable recording on null display if available
|
|
display_stage = pipeline._stages.get("display")
|
|
if display_stage and hasattr(display_stage, "_display"):
|
|
backend = display_stage._display
|
|
if hasattr(backend, "start_recording"):
|
|
backend.start_recording()
|
|
|
|
# Capture frames
|
|
frames = []
|
|
start_time = time.time()
|
|
|
|
for i in range(frame_count):
|
|
frame_start = time.time()
|
|
stage_result = pipeline.execute()
|
|
frame_time = time.time() - frame_start
|
|
|
|
# Get frames from display recording
|
|
display_stage = pipeline._stages.get("display")
|
|
if display_stage and hasattr(display_stage, "_display"):
|
|
backend = display_stage._display
|
|
if hasattr(backend, "get_recorded_data"):
|
|
recorded_frames = backend.get_recorded_data()
|
|
# Add render_time_ms to each frame
|
|
for frame in recorded_frames:
|
|
frame["render_time_ms"] = frame_time * 1000
|
|
frames = recorded_frames
|
|
|
|
# Fallback: create empty frames if no recording
|
|
if not frames:
|
|
for i in range(frame_count):
|
|
frames.append(
|
|
{
|
|
"frame_number": i,
|
|
"buffer": [],
|
|
"width": preset.viewport_width,
|
|
"height": preset.viewport_height,
|
|
"render_time_ms": frame_time * 1000,
|
|
}
|
|
)
|
|
|
|
# Stop recording on null display
|
|
display_stage = pipeline._stages.get("display")
|
|
if display_stage and hasattr(display_stage, "_display"):
|
|
backend = display_stage._display
|
|
if hasattr(backend, "stop_recording"):
|
|
backend.stop_recording()
|
|
|
|
total_time = time.time() - start_time
|
|
|
|
# Save captured data
|
|
output_file = output_dir / f"{preset_name}_sideline.json"
|
|
captured_data = {
|
|
"preset": preset_name,
|
|
"config": {
|
|
"source": preset.source,
|
|
"camera": preset.camera,
|
|
"effects": preset.effects,
|
|
"viewport_width": preset.viewport_width,
|
|
"viewport_height": preset.viewport_height,
|
|
"enable_message_overlay": getattr(preset, "enable_message_overlay", False),
|
|
},
|
|
"capture_stats": {
|
|
"frame_count": frame_count,
|
|
"total_time_ms": total_time * 1000,
|
|
"avg_frame_time_ms": (total_time * 1000) / frame_count,
|
|
"fps": frame_count / total_time if total_time > 0 else 0,
|
|
},
|
|
"frames": frames,
|
|
}
|
|
|
|
with open(output_file, "w") as f:
|
|
json.dump(captured_data, f, indent=2)
|
|
|
|
return captured_data
|
|
|
|
|
|
def compare_captured_outputs(
|
|
sideline_file: Path,
|
|
upstream_file: Path,
|
|
output_dir: Path = Path("tests/comparison_output"),
|
|
) -> Dict[str, Any]:
|
|
"""Compare captured outputs from sideline and upstream.
|
|
|
|
Args:
|
|
sideline_file: Path to sideline captured output
|
|
upstream_file: Path to upstream captured output
|
|
output_dir: Directory to save comparison results
|
|
|
|
Returns:
|
|
Dictionary with comparison results
|
|
"""
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Load captured data
|
|
with open(sideline_file) as f:
|
|
sideline_data = json.load(f)
|
|
|
|
with open(upstream_file) as f:
|
|
upstream_data = json.load(f)
|
|
|
|
# Compare configurations
|
|
config_diff = {}
|
|
for key in [
|
|
"source",
|
|
"camera",
|
|
"effects",
|
|
"viewport_width",
|
|
"viewport_height",
|
|
"enable_message_overlay",
|
|
]:
|
|
sideline_val = sideline_data["config"].get(key)
|
|
upstream_val = upstream_data["config"].get(key)
|
|
if sideline_val != upstream_val:
|
|
config_diff[key] = {"sideline": sideline_val, "upstream": upstream_val}
|
|
|
|
# Compare frame counts
|
|
sideline_frames = len(sideline_data["frames"])
|
|
upstream_frames = len(upstream_data["frames"])
|
|
frame_count_match = sideline_frames == upstream_frames
|
|
|
|
# Compare individual frames
|
|
frame_comparisons = []
|
|
total_diff = 0
|
|
max_diff = 0
|
|
identical_frames = 0
|
|
|
|
min_frames = min(sideline_frames, upstream_frames)
|
|
for i in range(min_frames):
|
|
sideline_frame = sideline_data["frames"][i]
|
|
upstream_frame = upstream_data["frames"][i]
|
|
|
|
sideline_buffer = sideline_frame["buffer"]
|
|
upstream_buffer = upstream_frame["buffer"]
|
|
|
|
# Compare buffers line by line
|
|
line_diffs = []
|
|
frame_diff = 0
|
|
max_lines = max(len(sideline_buffer), len(upstream_buffer))
|
|
|
|
for line_idx in range(max_lines):
|
|
sideline_line = (
|
|
sideline_buffer[line_idx] if line_idx < len(sideline_buffer) else ""
|
|
)
|
|
upstream_line = (
|
|
upstream_buffer[line_idx] if line_idx < len(upstream_buffer) else ""
|
|
)
|
|
|
|
if sideline_line != upstream_line:
|
|
line_diffs.append(
|
|
{
|
|
"line": line_idx,
|
|
"sideline": sideline_line,
|
|
"upstream": upstream_line,
|
|
}
|
|
)
|
|
frame_diff += 1
|
|
|
|
if frame_diff == 0:
|
|
identical_frames += 1
|
|
|
|
total_diff += frame_diff
|
|
max_diff = max(max_diff, frame_diff)
|
|
|
|
frame_comparisons.append(
|
|
{
|
|
"frame_number": i,
|
|
"differences": frame_diff,
|
|
"line_diffs": line_diffs[
|
|
:5
|
|
], # Only store first 5 differences per frame
|
|
"render_time_diff_ms": sideline_frame.get("render_time_ms", 0)
|
|
- upstream_frame.get("render_time_ms", 0),
|
|
}
|
|
)
|
|
|
|
# Calculate statistics
|
|
stats = {
|
|
"total_frames_compared": min_frames,
|
|
"identical_frames": identical_frames,
|
|
"frames_with_differences": min_frames - identical_frames,
|
|
"total_differences": total_diff,
|
|
"max_differences_per_frame": max_diff,
|
|
"avg_differences_per_frame": total_diff / min_frames if min_frames > 0 else 0,
|
|
"match_percentage": (identical_frames / min_frames * 100)
|
|
if min_frames > 0
|
|
else 0,
|
|
}
|
|
|
|
# Compare performance stats
|
|
sideline_stats = sideline_data.get("capture_stats", {})
|
|
upstream_stats = upstream_data.get("capture_stats", {})
|
|
performance_comparison = {
|
|
"sideline": {
|
|
"total_time_ms": sideline_stats.get("total_time_ms", 0),
|
|
"avg_frame_time_ms": sideline_stats.get("avg_frame_time_ms", 0),
|
|
"fps": sideline_stats.get("fps", 0),
|
|
},
|
|
"upstream": {
|
|
"total_time_ms": upstream_stats.get("total_time_ms", 0),
|
|
"avg_frame_time_ms": upstream_stats.get("avg_frame_time_ms", 0),
|
|
"fps": upstream_stats.get("fps", 0),
|
|
},
|
|
"diff": {
|
|
"total_time_ms": sideline_stats.get("total_time_ms", 0)
|
|
- upstream_stats.get("total_time_ms", 0),
|
|
"avg_frame_time_ms": sideline_stats.get("avg_frame_time_ms", 0)
|
|
- upstream_stats.get("avg_frame_time_ms", 0),
|
|
"fps": sideline_stats.get("fps", 0) - upstream_stats.get("fps", 0),
|
|
},
|
|
}
|
|
|
|
# Build comparison result
|
|
result = {
|
|
"preset": sideline_data["preset"],
|
|
"config_diff": config_diff,
|
|
"frame_count_match": frame_count_match,
|
|
"stats": stats,
|
|
"performance_comparison": performance_comparison,
|
|
"frame_comparisons": frame_comparisons,
|
|
"sideline_file": str(sideline_file),
|
|
"upstream_file": str(upstream_file),
|
|
}
|
|
|
|
# Save comparison result
|
|
output_file = output_dir / f"{sideline_data['preset']}_comparison.json"
|
|
with open(output_file, "w") as f:
|
|
json.dump(result, f, indent=2)
|
|
|
|
return result
|
|
|
|
|
|
def generate_html_report(
|
|
comparison_results: List[Dict[str, Any]],
|
|
output_dir: Path = Path("tests/comparison_output"),
|
|
) -> Path:
|
|
"""Generate HTML report from comparison results using acceptance_report.py.
|
|
|
|
Args:
|
|
comparison_results: List of comparison results
|
|
output_dir: Directory to save HTML report
|
|
|
|
Returns:
|
|
Path to generated HTML report
|
|
"""
|
|
from tests.acceptance_report import save_index_report
|
|
|
|
output_dir.mkdir(parents=True, exist_ok=True)
|
|
|
|
# Generate index report with links to all comparison results
|
|
reports = []
|
|
for result in comparison_results:
|
|
reports.append(
|
|
{
|
|
"test_name": f"comparison-{result['preset']}",
|
|
"status": "PASS" if result.get("status") == "success" else "FAIL",
|
|
"frame_count": result["stats"]["total_frames_compared"],
|
|
"duration_ms": result["performance_comparison"]["sideline"][
|
|
"total_time_ms"
|
|
],
|
|
}
|
|
)
|
|
|
|
# Save index report
|
|
index_file = save_index_report(reports, str(output_dir))
|
|
|
|
# Also save a summary JSON file for programmatic access
|
|
summary_file = output_dir / "comparison_summary.json"
|
|
with open(summary_file, "w") as f:
|
|
json.dump(
|
|
{
|
|
"timestamp": __import__("datetime").datetime.now().isoformat(),
|
|
"results": comparison_results,
|
|
},
|
|
f,
|
|
indent=2,
|
|
)
|
|
|
|
return Path(index_file)
|