"""Frame capture utilities for upstream vs sideline comparison. This module provides functions to capture frames from both upstream and sideline implementations for visual comparison and performance analysis. """ import json import time from pathlib import Path from typing import Any, Dict, List, Tuple import tomli from engine.pipeline import Pipeline, PipelineConfig, PipelineContext from engine.pipeline.params import PipelineParams def load_comparison_preset(preset_name: str) -> Any: """Load a comparison preset from comparison_presets.toml. Args: preset_name: Name of the preset to load Returns: Preset configuration dictionary """ presets_file = Path("tests/comparison_presets.toml") if not presets_file.exists(): raise FileNotFoundError(f"Comparison presets file not found: {presets_file}") with open(presets_file, "rb") as f: config = tomli.load(f) presets = config.get("presets", {}) full_name = ( f"presets.{preset_name}" if not preset_name.startswith("presets.") else preset_name ) simple_name = ( preset_name.replace("presets.", "") if preset_name.startswith("presets.") else preset_name ) if full_name in presets: return presets[full_name] elif simple_name in presets: return presets[simple_name] else: raise ValueError( f"Preset '{preset_name}' not found in {presets_file}. Available: {list(presets.keys())}" ) def capture_frames( preset_name: str, frame_count: int = 30, output_dir: Path = Path("tests/comparison_output"), ) -> Dict[str, Any]: """Capture frames from sideline pipeline using a preset. Args: preset_name: Name of preset to use frame_count: Number of frames to capture output_dir: Directory to save captured frames Returns: Dictionary with captured frames and metadata """ from engine.pipeline.presets import get_preset output_dir.mkdir(parents=True, exist_ok=True) # Load preset - try comparison presets first, then built-in presets try: preset = load_comparison_preset(preset_name) # Convert dict to object-like access from types import SimpleNamespace preset = SimpleNamespace(**preset) except (FileNotFoundError, ValueError): # Fall back to built-in presets preset = get_preset(preset_name) if not preset: raise ValueError( f"Preset '{preset_name}' not found in comparison or built-in presets" ) # Create pipeline config from preset config = PipelineConfig( source=preset.source, display="null", # Always use null display for capture camera=preset.camera, effects=preset.effects, ) # Create pipeline ctx = PipelineContext() ctx.terminal_width = preset.viewport_width ctx.terminal_height = preset.viewport_height pipeline = Pipeline(config=config, context=ctx) # Create params params = PipelineParams( viewport_width=preset.viewport_width, viewport_height=preset.viewport_height, ) ctx.params = params # Add stages based on source type (similar to pipeline_runner) from engine.display import DisplayRegistry from engine.pipeline.adapters import create_stage_from_display from engine.data_sources.sources import EmptyDataSource from engine.pipeline.adapters import DataSourceStage # Add source stage if preset.source == "empty": source_stage = DataSourceStage( EmptyDataSource(width=preset.viewport_width, height=preset.viewport_height), name="empty", ) else: # For headlines/poetry, use the actual source from engine.data_sources.sources import HeadlinesDataSource, PoetryDataSource if preset.source == "headlines": source_stage = DataSourceStage(HeadlinesDataSource(), name="headlines") elif preset.source == "poetry": source_stage = DataSourceStage(PoetryDataSource(), name="poetry") else: # Fallback to empty source_stage = DataSourceStage( EmptyDataSource( width=preset.viewport_width, height=preset.viewport_height ), name="empty", ) pipeline.add_stage("source", source_stage) # Add font stage for headlines/poetry (with viewport filter) if preset.source in ["headlines", "poetry"]: from engine.pipeline.adapters import FontStage, ViewportFilterStage # Add viewport filter to prevent rendering all items pipeline.add_stage( "viewport_filter", ViewportFilterStage(name="viewport-filter") ) # Add font stage for block character rendering pipeline.add_stage("font", FontStage(name="font")) else: # Fallback to simple conversion for empty/other sources from engine.pipeline.adapters import SourceItemsToBufferStage pipeline.add_stage("render", SourceItemsToBufferStage(name="items-to-buffer")) # Add camera stage from engine.camera import Camera from engine.pipeline.adapters import CameraStage, CameraClockStage # Create camera based on preset if preset.camera == "feed": camera = Camera.feed() elif preset.camera == "scroll": camera = Camera.scroll(speed=0.1) elif preset.camera == "horizontal": camera = Camera.horizontal(speed=0.1) else: camera = Camera.feed() camera.set_canvas_size(preset.viewport_width, preset.viewport_height * 2) # Add camera update (for animation) pipeline.add_stage("camera_update", CameraClockStage(camera, name="camera-clock")) # Add camera stage pipeline.add_stage("camera", CameraStage(camera, name=preset.camera)) # Add effects if preset.effects: from engine.effects.registry import EffectRegistry from engine.pipeline.adapters import create_stage_from_effect effect_registry = EffectRegistry() for effect_name in preset.effects: effect = effect_registry.get(effect_name) if effect: pipeline.add_stage( f"effect_{effect_name}", create_stage_from_effect(effect, effect_name), ) # Add message overlay stage if enabled (BEFORE display) if getattr(preset, "enable_message_overlay", False): from engine.pipeline.adapters import MessageOverlayConfig, MessageOverlayStage overlay_config = MessageOverlayConfig( enabled=True, display_secs=30, ) pipeline.add_stage( "message_overlay", MessageOverlayStage(config=overlay_config) ) # Add null display stage (LAST) null_display = DisplayRegistry.create("null") if null_display: pipeline.add_stage("display", create_stage_from_display(null_display, "null")) # Build pipeline pipeline.build() # Enable recording on null display if available display_stage = pipeline._stages.get("display") if display_stage and hasattr(display_stage, "_display"): backend = display_stage._display if hasattr(backend, "start_recording"): backend.start_recording() # Capture frames frames = [] start_time = time.time() for i in range(frame_count): frame_start = time.time() stage_result = pipeline.execute() frame_time = time.time() - frame_start # Get frames from display recording display_stage = pipeline._stages.get("display") if display_stage and hasattr(display_stage, "_display"): backend = display_stage._display if hasattr(backend, "get_recorded_data"): recorded_frames = backend.get_recorded_data() # Add render_time_ms to each frame for frame in recorded_frames: frame["render_time_ms"] = frame_time * 1000 frames = recorded_frames # Fallback: create empty frames if no recording if not frames: for i in range(frame_count): frames.append( { "frame_number": i, "buffer": [], "width": preset.viewport_width, "height": preset.viewport_height, "render_time_ms": frame_time * 1000, } ) # Stop recording on null display display_stage = pipeline._stages.get("display") if display_stage and hasattr(display_stage, "_display"): backend = display_stage._display if hasattr(backend, "stop_recording"): backend.stop_recording() total_time = time.time() - start_time # Save captured data output_file = output_dir / f"{preset_name}_sideline.json" captured_data = { "preset": preset_name, "config": { "source": preset.source, "camera": preset.camera, "effects": preset.effects, "viewport_width": preset.viewport_width, "viewport_height": preset.viewport_height, "enable_message_overlay": getattr(preset, "enable_message_overlay", False), }, "capture_stats": { "frame_count": frame_count, "total_time_ms": total_time * 1000, "avg_frame_time_ms": (total_time * 1000) / frame_count, "fps": frame_count / total_time if total_time > 0 else 0, }, "frames": frames, } with open(output_file, "w") as f: json.dump(captured_data, f, indent=2) return captured_data def compare_captured_outputs( sideline_file: Path, upstream_file: Path, output_dir: Path = Path("tests/comparison_output"), ) -> Dict[str, Any]: """Compare captured outputs from sideline and upstream. Args: sideline_file: Path to sideline captured output upstream_file: Path to upstream captured output output_dir: Directory to save comparison results Returns: Dictionary with comparison results """ output_dir.mkdir(parents=True, exist_ok=True) # Load captured data with open(sideline_file) as f: sideline_data = json.load(f) with open(upstream_file) as f: upstream_data = json.load(f) # Compare configurations config_diff = {} for key in [ "source", "camera", "effects", "viewport_width", "viewport_height", "enable_message_overlay", ]: sideline_val = sideline_data["config"].get(key) upstream_val = upstream_data["config"].get(key) if sideline_val != upstream_val: config_diff[key] = {"sideline": sideline_val, "upstream": upstream_val} # Compare frame counts sideline_frames = len(sideline_data["frames"]) upstream_frames = len(upstream_data["frames"]) frame_count_match = sideline_frames == upstream_frames # Compare individual frames frame_comparisons = [] total_diff = 0 max_diff = 0 identical_frames = 0 min_frames = min(sideline_frames, upstream_frames) for i in range(min_frames): sideline_frame = sideline_data["frames"][i] upstream_frame = upstream_data["frames"][i] sideline_buffer = sideline_frame["buffer"] upstream_buffer = upstream_frame["buffer"] # Compare buffers line by line line_diffs = [] frame_diff = 0 max_lines = max(len(sideline_buffer), len(upstream_buffer)) for line_idx in range(max_lines): sideline_line = ( sideline_buffer[line_idx] if line_idx < len(sideline_buffer) else "" ) upstream_line = ( upstream_buffer[line_idx] if line_idx < len(upstream_buffer) else "" ) if sideline_line != upstream_line: line_diffs.append( { "line": line_idx, "sideline": sideline_line, "upstream": upstream_line, } ) frame_diff += 1 if frame_diff == 0: identical_frames += 1 total_diff += frame_diff max_diff = max(max_diff, frame_diff) frame_comparisons.append( { "frame_number": i, "differences": frame_diff, "line_diffs": line_diffs[ :5 ], # Only store first 5 differences per frame "render_time_diff_ms": sideline_frame.get("render_time_ms", 0) - upstream_frame.get("render_time_ms", 0), } ) # Calculate statistics stats = { "total_frames_compared": min_frames, "identical_frames": identical_frames, "frames_with_differences": min_frames - identical_frames, "total_differences": total_diff, "max_differences_per_frame": max_diff, "avg_differences_per_frame": total_diff / min_frames if min_frames > 0 else 0, "match_percentage": (identical_frames / min_frames * 100) if min_frames > 0 else 0, } # Compare performance stats sideline_stats = sideline_data.get("capture_stats", {}) upstream_stats = upstream_data.get("capture_stats", {}) performance_comparison = { "sideline": { "total_time_ms": sideline_stats.get("total_time_ms", 0), "avg_frame_time_ms": sideline_stats.get("avg_frame_time_ms", 0), "fps": sideline_stats.get("fps", 0), }, "upstream": { "total_time_ms": upstream_stats.get("total_time_ms", 0), "avg_frame_time_ms": upstream_stats.get("avg_frame_time_ms", 0), "fps": upstream_stats.get("fps", 0), }, "diff": { "total_time_ms": sideline_stats.get("total_time_ms", 0) - upstream_stats.get("total_time_ms", 0), "avg_frame_time_ms": sideline_stats.get("avg_frame_time_ms", 0) - upstream_stats.get("avg_frame_time_ms", 0), "fps": sideline_stats.get("fps", 0) - upstream_stats.get("fps", 0), }, } # Build comparison result result = { "preset": sideline_data["preset"], "config_diff": config_diff, "frame_count_match": frame_count_match, "stats": stats, "performance_comparison": performance_comparison, "frame_comparisons": frame_comparisons, "sideline_file": str(sideline_file), "upstream_file": str(upstream_file), } # Save comparison result output_file = output_dir / f"{sideline_data['preset']}_comparison.json" with open(output_file, "w") as f: json.dump(result, f, indent=2) return result def generate_html_report( comparison_results: List[Dict[str, Any]], output_dir: Path = Path("tests/comparison_output"), ) -> Path: """Generate HTML report from comparison results. Args: comparison_results: List of comparison results output_dir: Directory to save HTML report Returns: Path to generated HTML report """ output_dir.mkdir(parents=True, exist_ok=True) html_content = """ Mainline Comparison Report

Mainline Pipeline Comparison Report

Generated: {{timestamp}}

Summary

0
Presets Tested
0%
Average Match Rate
0
Total Frames Compared
""" # Generate comparison data for JavaScript comparison_data_json = json.dumps(comparison_results) # Calculate summary statistics total_presets = len(comparison_results) total_frames = sum(r["stats"]["total_frames_compared"] for r in comparison_results) total_identical = sum(r["stats"]["identical_frames"] for r in comparison_results) average_match = (total_identical / total_frames * 100) if total_frames > 0 else 0 summary = { "total_presets": total_presets, "total_frames": total_frames, "total_identical": total_identical, "average_match": average_match, } # Replace placeholders html_content = html_content.replace( "{{timestamp}}", time.strftime("%Y-%m-%d %H:%M:%S") ) html_content = html_content.replace("{{comparison_data}}", comparison_data_json) html_content = html_content.replace("{{summary}}", json.dumps(summary)) # Save HTML report output_file = output_dir / "comparison_report.html" with open(output_file, "w") as f: f.write(html_content) return output_file