Instead of duplicating HTML generation code, use the existing acceptance_report.py infrastructure, which already has:

- ANSI code parsing for color rendering
- Frame capture and display
- Index report generation
- Comprehensive styling

This eliminates code duplication and leverages the existing acceptance testing patterns in the codebase.
"""Frame capture utilities for upstream vs sideline comparison.
|
|
|
|
This module provides functions to capture frames from both upstream and sideline
|
|
implementations for visual comparison and performance analysis.
|
|
"""
|
|
|
|
import json
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Tuple
|
|
|
|
import tomli
|
|
|
|
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext
|
|
from engine.pipeline.params import PipelineParams
|
|
|
|
|
|


def load_comparison_preset(preset_name: str) -> Any:
    """Load a comparison preset from comparison_presets.toml.

    Args:
        preset_name: Name of the preset to load

    Returns:
        Preset configuration dictionary
    """
    presets_file = Path("tests/comparison_presets.toml")
    if not presets_file.exists():
        raise FileNotFoundError(f"Comparison presets file not found: {presets_file}")

    with open(presets_file, "rb") as f:
        config = tomli.load(f)

    presets = config.get("presets", {})
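    # Accept either the fully qualified "presets.<name>" form or the bare
    # "<name>" form; both spellings are tried against the loaded presets table.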
    full_name = (
        f"presets.{preset_name}"
        if not preset_name.startswith("presets.")
        else preset_name
    )
    simple_name = (
        preset_name.replace("presets.", "")
        if preset_name.startswith("presets.")
        else preset_name
    )

    if full_name in presets:
        return presets[full_name]
    elif simple_name in presets:
        return presets[simple_name]
    else:
        raise ValueError(
            f"Preset '{preset_name}' not found in {presets_file}. Available: {list(presets.keys())}"
        )


def capture_frames(
    preset_name: str,
    frame_count: int = 30,
    output_dir: Path = Path("tests/comparison_output"),
) -> Dict[str, Any]:
    """Capture frames from sideline pipeline using a preset.

    Args:
        preset_name: Name of preset to use
        frame_count: Number of frames to capture
        output_dir: Directory to save captured frames

    Returns:
        Dictionary with captured frames and metadata
    """
    from engine.pipeline.presets import get_preset

    output_dir.mkdir(parents=True, exist_ok=True)

    # Load preset - try comparison presets first, then built-in presets
    try:
        preset = load_comparison_preset(preset_name)
        # Convert dict to object-like access
        from types import SimpleNamespace

        preset = SimpleNamespace(**preset)
    except (FileNotFoundError, ValueError):
        # Fall back to built-in presets
        preset = get_preset(preset_name)
        if not preset:
            raise ValueError(
                f"Preset '{preset_name}' not found in comparison or built-in presets"
            )

    # Create pipeline config from preset
    config = PipelineConfig(
        source=preset.source,
        display="null",  # Always use null display for capture
        camera=preset.camera,
        effects=preset.effects,
    )

    # Create pipeline
    ctx = PipelineContext()
    ctx.terminal_width = preset.viewport_width
    ctx.terminal_height = preset.viewport_height
    pipeline = Pipeline(config=config, context=ctx)

    # Create params
    params = PipelineParams(
        viewport_width=preset.viewport_width,
        viewport_height=preset.viewport_height,
    )
    ctx.params = params

    # Add stages based on source type (similar to pipeline_runner)
    from engine.display import DisplayRegistry
    from engine.data_sources.sources import EmptyDataSource
    from engine.pipeline.adapters import DataSourceStage, create_stage_from_display

    # Add source stage
    if preset.source == "empty":
        source_stage = DataSourceStage(
            EmptyDataSource(width=preset.viewport_width, height=preset.viewport_height),
            name="empty",
        )
    else:
        # For headlines/poetry, use the actual source
        from engine.data_sources.sources import HeadlinesDataSource, PoetryDataSource

        if preset.source == "headlines":
            source_stage = DataSourceStage(HeadlinesDataSource(), name="headlines")
        elif preset.source == "poetry":
            source_stage = DataSourceStage(PoetryDataSource(), name="poetry")
        else:
            # Fallback to empty
            source_stage = DataSourceStage(
                EmptyDataSource(
                    width=preset.viewport_width, height=preset.viewport_height
                ),
                name="empty",
            )
    pipeline.add_stage("source", source_stage)

    # Add font stage for headlines/poetry (with viewport filter)
    if preset.source in ["headlines", "poetry"]:
        from engine.pipeline.adapters import FontStage, ViewportFilterStage

        # Add viewport filter to prevent rendering all items
        pipeline.add_stage(
            "viewport_filter", ViewportFilterStage(name="viewport-filter")
        )
        # Add font stage for block character rendering
        pipeline.add_stage("font", FontStage(name="font"))
    else:
        # Fallback to simple conversion for empty/other sources
        from engine.pipeline.adapters import SourceItemsToBufferStage

        pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer"))

    # Add camera stage
    from engine.camera import Camera
    from engine.pipeline.adapters import CameraStage, CameraClockStage

    # Create camera based on preset
    if preset.camera == "feed":
        camera = Camera.feed()
    elif preset.camera == "scroll":
        camera = Camera.scroll(speed=0.1)
    elif preset.camera == "horizontal":
        camera = Camera.horizontal(speed=0.1)
    else:
        camera = Camera.feed()

    camera.set_canvas_size(preset.viewport_width, preset.viewport_height * 2)
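    # The canvas is sized to twice the viewport height, presumably so scroll and
    # feed cameras have off-screen rows to pan through; adjust if upstream sizes
    # its canvas differently.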

    # Add camera update (for animation)
    pipeline.add_stage("camera_update", CameraClockStage(camera, name="camera-clock"))
    # Add camera stage
    pipeline.add_stage("camera", CameraStage(camera, name=preset.camera))

    # Add effects
    if preset.effects:
        from engine.effects.registry import EffectRegistry
        from engine.pipeline.adapters import create_stage_from_effect

        effect_registry = EffectRegistry()
        for effect_name in preset.effects:
            effect = effect_registry.get(effect_name)
            if effect:
                pipeline.add_stage(
                    f"effect_{effect_name}",
                    create_stage_from_effect(effect, effect_name),
                )

    # Add message overlay stage if enabled (BEFORE display)
    if getattr(preset, "enable_message_overlay", False):
        from engine.pipeline.adapters import MessageOverlayConfig, MessageOverlayStage

        overlay_config = MessageOverlayConfig(
            enabled=True,
            display_secs=30,
        )
        pipeline.add_stage(
            "message_overlay", MessageOverlayStage(config=overlay_config)
        )

    # Add null display stage (LAST)
    null_display = DisplayRegistry.create("null")
    if null_display:
        pipeline.add_stage("display", create_stage_from_display(null_display, "null"))

    # Build pipeline
    pipeline.build()

    # Enable recording on null display if available
    display_stage = pipeline._stages.get("display")
    if display_stage and hasattr(display_stage, "_display"):
        backend = display_stage._display
        if hasattr(backend, "start_recording"):
            backend.start_recording()

    # Capture frames
    frames = []
    frame_time = 0.0  # Guard against frame_count == 0 in the fallback below
    start_time = time.time()

    for i in range(frame_count):
        frame_start = time.time()
        pipeline.execute()
        frame_time = time.time() - frame_start

        # Get frames from display recording
        display_stage = pipeline._stages.get("display")
        if display_stage and hasattr(display_stage, "_display"):
            backend = display_stage._display
            if hasattr(backend, "get_recorded_data"):
                recorded_frames = backend.get_recorded_data()
                # Add render_time_ms to each frame
                for frame in recorded_frames:
                    frame["render_time_ms"] = frame_time * 1000
                frames = recorded_frames

    # Fallback: create empty frames if no recording
    if not frames:
        for i in range(frame_count):
            frames.append(
                {
                    "frame_number": i,
                    "buffer": [],
                    "width": preset.viewport_width,
                    "height": preset.viewport_height,
                    "render_time_ms": frame_time * 1000,
                }
            )

    # Stop recording on null display
    display_stage = pipeline._stages.get("display")
    if display_stage and hasattr(display_stage, "_display"):
        backend = display_stage._display
        if hasattr(backend, "stop_recording"):
            backend.stop_recording()

    total_time = time.time() - start_time

    # Save captured data
    output_file = output_dir / f"{preset_name}_sideline.json"
    captured_data = {
        "preset": preset_name,
        "config": {
            "source": preset.source,
            "camera": preset.camera,
            "effects": preset.effects,
            "viewport_width": preset.viewport_width,
            "viewport_height": preset.viewport_height,
            "enable_message_overlay": getattr(preset, "enable_message_overlay", False),
        },
        "capture_stats": {
            "frame_count": frame_count,
            "total_time_ms": total_time * 1000,
            "avg_frame_time_ms": (
                (total_time * 1000) / frame_count if frame_count > 0 else 0
            ),
            "fps": frame_count / total_time if total_time > 0 else 0,
        },
        "frames": frames,
    }

    with open(output_file, "w") as f:
        json.dump(captured_data, f, indent=2)

    return captured_data


def compare_captured_outputs(
    sideline_file: Path,
    upstream_file: Path,
    output_dir: Path = Path("tests/comparison_output"),
) -> Dict[str, Any]:
    """Compare captured outputs from sideline and upstream.

    Args:
        sideline_file: Path to sideline captured output
        upstream_file: Path to upstream captured output
        output_dir: Directory to save comparison results

    Returns:
        Dictionary with comparison results
    """
    output_dir.mkdir(parents=True, exist_ok=True)

    # Load captured data
    with open(sideline_file) as f:
        sideline_data = json.load(f)

    with open(upstream_file) as f:
        upstream_data = json.load(f)

    # Compare configurations
    config_diff = {}
    for key in [
        "source",
        "camera",
        "effects",
        "viewport_width",
        "viewport_height",
        "enable_message_overlay",
    ]:
        sideline_val = sideline_data["config"].get(key)
        upstream_val = upstream_data["config"].get(key)
        if sideline_val != upstream_val:
            config_diff[key] = {"sideline": sideline_val, "upstream": upstream_val}

    # Compare frame counts
    sideline_frames = len(sideline_data["frames"])
    upstream_frames = len(upstream_data["frames"])
    frame_count_match = sideline_frames == upstream_frames

    # Compare individual frames
    frame_comparisons = []
    total_diff = 0
    max_diff = 0
    identical_frames = 0

    min_frames = min(sideline_frames, upstream_frames)
    for i in range(min_frames):
        sideline_frame = sideline_data["frames"][i]
        upstream_frame = upstream_data["frames"][i]

        sideline_buffer = sideline_frame["buffer"]
        upstream_buffer = upstream_frame["buffer"]

        # Compare buffers line by line
        line_diffs = []
        frame_diff = 0
        max_lines = max(len(sideline_buffer), len(upstream_buffer))

        for line_idx in range(max_lines):
            sideline_line = (
                sideline_buffer[line_idx] if line_idx < len(sideline_buffer) else ""
            )
            upstream_line = (
                upstream_buffer[line_idx] if line_idx < len(upstream_buffer) else ""
            )

            if sideline_line != upstream_line:
                line_diffs.append(
                    {
                        "line": line_idx,
                        "sideline": sideline_line,
                        "upstream": upstream_line,
                    }
                )
                frame_diff += 1

        if frame_diff == 0:
            identical_frames += 1

        total_diff += frame_diff
        max_diff = max(max_diff, frame_diff)

        frame_comparisons.append(
            {
                "frame_number": i,
                "differences": frame_diff,
                "line_diffs": line_diffs[
                    :5
                ],  # Only store first 5 differences per frame
                "render_time_diff_ms": sideline_frame.get("render_time_ms", 0)
                - upstream_frame.get("render_time_ms", 0),
            }
        )

    # Calculate statistics
    stats = {
        "total_frames_compared": min_frames,
        "identical_frames": identical_frames,
        "frames_with_differences": min_frames - identical_frames,
        "total_differences": total_diff,
        "max_differences_per_frame": max_diff,
        "avg_differences_per_frame": total_diff / min_frames if min_frames > 0 else 0,
        "match_percentage": (identical_frames / min_frames * 100)
        if min_frames > 0
        else 0,
    }

    # Compare performance stats
    sideline_stats = sideline_data.get("capture_stats", {})
    upstream_stats = upstream_data.get("capture_stats", {})
    performance_comparison = {
        "sideline": {
            "total_time_ms": sideline_stats.get("total_time_ms", 0),
            "avg_frame_time_ms": sideline_stats.get("avg_frame_time_ms", 0),
            "fps": sideline_stats.get("fps", 0),
        },
        "upstream": {
            "total_time_ms": upstream_stats.get("total_time_ms", 0),
            "avg_frame_time_ms": upstream_stats.get("avg_frame_time_ms", 0),
            "fps": upstream_stats.get("fps", 0),
        },
        "diff": {
            "total_time_ms": sideline_stats.get("total_time_ms", 0)
            - upstream_stats.get("total_time_ms", 0),
            "avg_frame_time_ms": sideline_stats.get("avg_frame_time_ms", 0)
            - upstream_stats.get("avg_frame_time_ms", 0),
            "fps": sideline_stats.get("fps", 0) - upstream_stats.get("fps", 0),
        },
    }

    # Build comparison result
    result = {
        "preset": sideline_data["preset"],
        "config_diff": config_diff,
        "frame_count_match": frame_count_match,
        "stats": stats,
        "performance_comparison": performance_comparison,
        "frame_comparisons": frame_comparisons,
        "sideline_file": str(sideline_file),
        "upstream_file": str(upstream_file),
    }

    # Save comparison result
    output_file = output_dir / f"{sideline_data['preset']}_comparison.json"
    with open(output_file, "w") as f:
        json.dump(result, f, indent=2)

    return result


def generate_html_report(
    comparison_results: List[Dict[str, Any]],
    output_dir: Path = Path("tests/comparison_output"),
) -> Path:
    """Generate HTML report from comparison results using acceptance_report.py.

    Args:
        comparison_results: List of comparison results
        output_dir: Directory to save HTML report

    Returns:
        Path to generated HTML report
    """
    from tests.acceptance_report import save_index_report

    output_dir.mkdir(parents=True, exist_ok=True)

    # Generate index report with links to all comparison results
    reports = []
    for result in comparison_results:
        # compare_captured_outputs() does not emit a "status" field, so derive
        # pass/fail from the comparison statistics instead.
        stats = result["stats"]
        passed = (
            not result["config_diff"]
            and stats["total_frames_compared"] > 0
            and stats["frames_with_differences"] == 0
        )
        reports.append(
            {
                "test_name": f"comparison-{result['preset']}",
                "status": "PASS" if passed else "FAIL",
                "frame_count": stats["total_frames_compared"],
                "duration_ms": result["performance_comparison"]["sideline"][
                    "total_time_ms"
                ],
            }
        )

    # Save index report
    index_file = save_index_report(reports, str(output_dir))

    # Also save a summary JSON file for programmatic access
    summary_file = output_dir / "comparison_summary.json"
    with open(summary_file, "w") as f:
        json.dump(
            {
                "timestamp": datetime.now().isoformat(),
                "results": comparison_results,
            },
            f,
            indent=2,
        )

    return Path(index_file)
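

if __name__ == "__main__":
    # Minimal usage sketch chaining the helpers above: capture sideline frames,
    # compare them against a previously captured upstream run, and emit the
    # index report. The preset name and upstream file below are placeholders;
    # substitute values that match your comparison_presets.toml and your actual
    # upstream capture.
    out_dir = Path("tests/comparison_output")
    preset = "demo"  # hypothetical preset name

    sideline = capture_frames(preset, frame_count=30, output_dir=out_dir)
    sideline_file = out_dir / f"{preset}_sideline.json"
    upstream_file = out_dir / f"{preset}_upstream.json"  # assumed to exist

    if upstream_file.exists():
        comparison = compare_captured_outputs(sideline_file, upstream_file, out_dir)
        report = generate_html_report([comparison], out_dir)
        print(f"Comparison report written to {report}")
    else:
        print(
            f"No upstream capture at {upstream_file}; captured "
            f"{len(sideline['frames'])} sideline frames only."
        )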