#!/usr/bin/env python3 """ Benchmark runner for mainline - tests performance across effects and displays. Usage: python -m engine.benchmark python -m engine.benchmark --output report.md python -m engine.benchmark --displays terminal,websocket --effects glitch,fade python -m engine.benchmark --format json --output benchmark.json Headless mode (default): suppress all terminal output during benchmarks. """ import argparse import json import sys import time from dataclasses import dataclass, field from datetime import datetime from io import StringIO from pathlib import Path from typing import Any import numpy as np @dataclass class BenchmarkResult: """Result of a single benchmark run.""" name: str display: str effect: str | None iterations: int total_time_ms: float avg_time_ms: float std_dev_ms: float min_ms: float max_ms: float fps: float chars_processed: int chars_per_sec: float @dataclass class BenchmarkReport: """Complete benchmark report.""" timestamp: str python_version: str results: list[BenchmarkResult] = field(default_factory=list) summary: dict[str, Any] = field(default_factory=dict) def get_sample_buffer(width: int = 80, height: int = 24) -> list[str]: """Generate a sample buffer for benchmarking.""" lines = [] for i in range(height): line = f"\x1b[32mLine {i}\x1b[0m " + "A" * (width - 10) lines.append(line) return lines def benchmark_display( display_class, buffer: list[str], iterations: int = 100, display=None, reuse: bool = False, ) -> BenchmarkResult | None: """Benchmark a single display. Args: display_class: Display class to instantiate buffer: Buffer to display iterations: Number of iterations display: Optional existing display instance to reuse reuse: If True and display provided, use reuse mode """ old_stdout = sys.stdout old_stderr = sys.stderr try: sys.stdout = StringIO() sys.stderr = StringIO() if display is None: display = display_class() display.init(80, 24, reuse=False) should_cleanup = True else: should_cleanup = False times = [] chars = sum(len(line) for line in buffer) for _ in range(iterations): t0 = time.perf_counter() display.show(buffer) elapsed = (time.perf_counter() - t0) * 1000 times.append(elapsed) if should_cleanup and hasattr(display, "cleanup"): display.cleanup(quit_pygame=False) except Exception: return None finally: sys.stdout = old_stdout sys.stderr = old_stderr times_arr = np.array(times) return BenchmarkResult( name=f"display_{display_class.__name__}", display=display_class.__name__, effect=None, iterations=iterations, total_time_ms=sum(times), avg_time_ms=float(np.mean(times_arr)), std_dev_ms=float(np.std(times_arr)), min_ms=float(np.min(times_arr)), max_ms=float(np.max(times_arr)), fps=float(1000.0 / np.mean(times_arr)) if np.mean(times_arr) > 0 else 0.0, chars_processed=chars * iterations, chars_per_sec=float((chars * iterations) / (sum(times) / 1000)) if sum(times) > 0 else 0.0, ) def benchmark_effect_with_display( effect_class, display, buffer: list[str], iterations: int = 100, reuse: bool = False ) -> BenchmarkResult | None: """Benchmark an effect with a display. Args: effect_class: Effect class to instantiate display: Display instance to use buffer: Buffer to process and display iterations: Number of iterations reuse: If True, use reuse mode for display """ old_stdout = sys.stdout old_stderr = sys.stderr try: from engine.effects.types import EffectConfig, EffectContext sys.stdout = StringIO() sys.stderr = StringIO() effect = effect_class() effect.configure(EffectConfig(enabled=True, intensity=1.0)) ctx = EffectContext( terminal_width=80, terminal_height=24, scroll_cam=0, ticker_height=0, mic_excess=0.0, grad_offset=0.0, frame_number=0, has_message=False, ) times = [] chars = sum(len(line) for line in buffer) for _ in range(iterations): processed = effect.process(buffer, ctx) t0 = time.perf_counter() display.show(processed) elapsed = (time.perf_counter() - t0) * 1000 times.append(elapsed) if not reuse and hasattr(display, "cleanup"): display.cleanup(quit_pygame=False) except Exception: return None finally: sys.stdout = old_stdout sys.stderr = old_stderr times_arr = np.array(times) return BenchmarkResult( name=f"effect_{effect_class.__name__}_with_{display.__class__.__name__}", display=display.__class__.__name__, effect=effect_class.__name__, iterations=iterations, total_time_ms=sum(times), avg_time_ms=float(np.mean(times_arr)), std_dev_ms=float(np.std(times_arr)), min_ms=float(np.min(times_arr)), max_ms=float(np.max(times_arr)), fps=float(1000.0 / np.mean(times_arr)) if np.mean(times_arr) > 0 else 0.0, chars_processed=chars * iterations, chars_per_sec=float((chars * iterations) / (sum(times) / 1000)) if sum(times) > 0 else 0.0, ) def get_available_displays(): """Get available display classes.""" from engine.display import ( DisplayRegistry, NullDisplay, TerminalDisplay, ) DisplayRegistry.initialize() displays = [ ("null", NullDisplay), ("terminal", TerminalDisplay), ] try: from engine.display.backends.websocket import WebSocketDisplay displays.append(("websocket", WebSocketDisplay)) except Exception: pass try: from engine.display.backends.sixel import SixelDisplay displays.append(("sixel", SixelDisplay)) except Exception: pass try: from engine.display.backends.pygame import PygameDisplay displays.append(("pygame", PygameDisplay)) except Exception: pass return displays def get_available_effects(): """Get available effect classes.""" try: from engine.effects import get_registry try: from effects_plugins import discover_plugins discover_plugins() except Exception: pass except Exception: return [] effects = [] registry = get_registry() for name, effect in registry.list_all().items(): if effect: effect_cls = type(effect) effects.append((name, effect_cls)) return effects def run_benchmarks( displays: list[tuple[str, Any]] | None = None, effects: list[tuple[str, Any]] | None = None, iterations: int = 100, verbose: bool = False, ) -> BenchmarkReport: """Run all benchmarks and return report.""" from datetime import datetime if displays is None: displays = get_available_displays() if effects is None: effects = get_available_effects() buffer = get_sample_buffer(80, 24) results = [] if verbose: print(f"Running benchmarks ({iterations} iterations each)...") pygame_display = None for name, display_class in displays: if verbose: print(f"Benchmarking display: {name}") result = benchmark_display(display_class, buffer, iterations) if result: results.append(result) if verbose: print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg") if name == "pygame": pygame_display = result if verbose: print() pygame_instance = None if pygame_display: try: from engine.display.backends.pygame import PygameDisplay PygameDisplay.reset_state() pygame_instance = PygameDisplay() pygame_instance.init(80, 24, reuse=False) except Exception: pygame_instance = None for effect_name, effect_class in effects: for display_name, display_class in displays: if display_name == "websocket": continue if display_name == "pygame": if verbose: print(f"Benchmarking effect: {effect_name} with {display_name}") if pygame_instance: result = benchmark_effect_with_display( effect_class, pygame_instance, buffer, iterations, reuse=True ) if result: results.append(result) if verbose: print( f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg" ) continue if verbose: print(f"Benchmarking effect: {effect_name} with {display_name}") display = display_class() display.init(80, 24) result = benchmark_effect_with_display( effect_class, display, buffer, iterations ) if result: results.append(result) if verbose: print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg") if pygame_instance: try: pygame_instance.cleanup(quit_pygame=True) except Exception: pass summary = generate_summary(results) return BenchmarkReport( timestamp=datetime.now().isoformat(), python_version=sys.version, results=results, summary=summary, ) def generate_summary(results: list[BenchmarkResult]) -> dict[str, Any]: """Generate summary statistics from results.""" by_display: dict[str, list[BenchmarkResult]] = {} by_effect: dict[str, list[BenchmarkResult]] = {} for r in results: if r.display not in by_display: by_display[r.display] = [] by_display[r.display].append(r) if r.effect: if r.effect not in by_effect: by_effect[r.effect] = [] by_effect[r.effect].append(r) summary = { "by_display": {}, "by_effect": {}, "overall": { "total_tests": len(results), "displays_tested": len(by_display), "effects_tested": len(by_effect), }, } for display, res in by_display.items(): fps_values = [r.fps for r in res] summary["by_display"][display] = { "avg_fps": float(np.mean(fps_values)), "min_fps": float(np.min(fps_values)), "max_fps": float(np.max(fps_values)), "tests": len(res), } for effect, res in by_effect.items(): fps_values = [r.fps for r in res] summary["by_effect"][effect] = { "avg_fps": float(np.mean(fps_values)), "min_fps": float(np.min(fps_values)), "max_fps": float(np.max(fps_values)), "tests": len(res), } return summary DEFAULT_CACHE_PATH = Path.home() / ".mainline_benchmark_cache.json" def load_baseline(cache_path: Path | None = None) -> dict[str, Any] | None: """Load baseline benchmark results from cache.""" path = cache_path or DEFAULT_CACHE_PATH if not path.exists(): return None try: with open(path) as f: return json.load(f) except Exception: return None def save_baseline( results: list[BenchmarkResult], cache_path: Path | None = None, ) -> None: """Save benchmark results as baseline to cache.""" path = cache_path or DEFAULT_CACHE_PATH baseline = { "timestamp": datetime.now().isoformat(), "results": { r.name: { "fps": r.fps, "avg_time_ms": r.avg_time_ms, "chars_per_sec": r.chars_per_sec, } for r in results }, } path.parent.mkdir(parents=True, exist_ok=True) with open(path, "w") as f: json.dump(baseline, f, indent=2) def compare_with_baseline( results: list[BenchmarkResult], baseline: dict[str, Any], threshold: float = 0.2, verbose: bool = True, ) -> tuple[bool, list[str]]: """Compare current results with baseline. Returns (pass, messages).""" baseline_results = baseline.get("results", {}) failures = [] warnings = [] for r in results: if r.name not in baseline_results: warnings.append(f"New test: {r.name} (no baseline)") continue b = baseline_results[r.name] if b["fps"] == 0: continue degradation = (b["fps"] - r.fps) / b["fps"] if degradation > threshold: failures.append( f"{r.name}: FPS degraded {degradation * 100:.1f}% " f"(baseline: {b['fps']:.1f}, current: {r.fps:.1f})" ) elif verbose: print(f" {r.name}: {r.fps:.1f} FPS (baseline: {b['fps']:.1f})") passed = len(failures) == 0 messages = [] if failures: messages.extend(failures) if warnings: messages.extend(warnings) return passed, messages def run_hook_mode( displays: list[tuple[str, Any]] | None = None, effects: list[tuple[str, Any]] | None = None, iterations: int = 20, threshold: float = 0.2, cache_path: Path | None = None, verbose: bool = False, ) -> int: """Run in hook mode: compare against baseline, exit 0 on pass, 1 on fail.""" baseline = load_baseline(cache_path) if baseline is None: print("No baseline found. Run with --baseline to create one.") return 1 report = run_benchmarks(displays, effects, iterations, verbose) passed, messages = compare_with_baseline( report.results, baseline, threshold, verbose ) print("\n=== Benchmark Hook Results ===") if passed: print("PASSED - No significant performance degradation") return 0 else: print("FAILED - Performance degradation detected:") for msg in messages: print(f" - {msg}") return 1 def format_report_text(report: BenchmarkReport) -> str: """Format report as human-readable text.""" lines = [ "# Mainline Performance Benchmark Report", "", f"Generated: {report.timestamp}", f"Python: {report.python_version}", "", "## Summary", "", f"Total tests: {report.summary['overall']['total_tests']}", f"Displays tested: {report.summary['overall']['displays_tested']}", f"Effects tested: {report.summary['overall']['effects_tested']}", "", "## By Display", "", ] for display, stats in report.summary["by_display"].items(): lines.append(f"### {display}") lines.append(f"- Avg FPS: {stats['avg_fps']:.1f}") lines.append(f"- Min FPS: {stats['min_fps']:.1f}") lines.append(f"- Max FPS: {stats['max_fps']:.1f}") lines.append(f"- Tests: {stats['tests']}") lines.append("") if report.summary["by_effect"]: lines.append("## By Effect") lines.append("") for effect, stats in report.summary["by_effect"].items(): lines.append(f"### {effect}") lines.append(f"- Avg FPS: {stats['avg_fps']:.1f}") lines.append(f"- Min FPS: {stats['min_fps']:.1f}") lines.append(f"- Max FPS: {stats['max_fps']:.1f}") lines.append(f"- Tests: {stats['tests']}") lines.append("") lines.append("## Detailed Results") lines.append("") lines.append("| Display | Effect | FPS | Avg ms | StdDev ms | Min ms | Max ms |") lines.append("|---------|--------|-----|--------|-----------|--------|--------|") for r in report.results: effect_col = r.effect if r.effect else "-" lines.append( f"| {r.display} | {effect_col} | {r.fps:.1f} | {r.avg_time_ms:.2f} | " f"{r.std_dev_ms:.2f} | {r.min_ms:.2f} | {r.max_ms:.2f} |" ) return "\n".join(lines) def format_report_json(report: BenchmarkReport) -> str: """Format report as JSON.""" data = { "timestamp": report.timestamp, "python_version": report.python_version, "summary": report.summary, "results": [ { "name": r.name, "display": r.display, "effect": r.effect, "iterations": r.iterations, "total_time_ms": r.total_time_ms, "avg_time_ms": r.avg_time_ms, "std_dev_ms": r.std_dev_ms, "min_ms": r.min_ms, "max_ms": r.max_ms, "fps": r.fps, "chars_processed": r.chars_processed, "chars_per_sec": r.chars_per_sec, } for r in report.results ], } return json.dumps(data, indent=2) def main(): parser = argparse.ArgumentParser(description="Run mainline benchmarks") parser.add_argument( "--displays", help="Comma-separated list of displays to test (default: all)", ) parser.add_argument( "--effects", help="Comma-separated list of effects to test (default: all)", ) parser.add_argument( "--iterations", type=int, default=100, help="Number of iterations per test (default: 100)", ) parser.add_argument( "--output", help="Output file path (default: stdout)", ) parser.add_argument( "--format", choices=["text", "json"], default="text", help="Output format (default: text)", ) parser.add_argument( "--verbose", "-v", action="store_true", help="Show progress during benchmarking", ) parser.add_argument( "--hook", action="store_true", help="Run in hook mode: compare against baseline, exit 0 pass, 1 fail", ) parser.add_argument( "--baseline", action="store_true", help="Save current results as baseline for future hook comparisons", ) parser.add_argument( "--threshold", type=float, default=0.2, help="Performance degradation threshold for hook mode (default: 0.2 = 20%%)", ) parser.add_argument( "--cache", type=str, default=None, help="Path to baseline cache file (default: ~/.mainline_benchmark_cache.json)", ) args = parser.parse_args() cache_path = Path(args.cache) if args.cache else DEFAULT_CACHE_PATH if args.hook: displays = None if args.displays: display_map = dict(get_available_displays()) displays = [ (name, display_map[name]) for name in args.displays.split(",") if name in display_map ] effects = None if args.effects: effect_map = dict(get_available_effects()) effects = [ (name, effect_map[name]) for name in args.effects.split(",") if name in effect_map ] return run_hook_mode( displays, effects, iterations=args.iterations, threshold=args.threshold, cache_path=cache_path, verbose=args.verbose, ) displays = None if args.displays: display_map = dict(get_available_displays()) displays = [ (name, display_map[name]) for name in args.displays.split(",") if name in display_map ] effects = None if args.effects: effect_map = dict(get_available_effects()) effects = [ (name, effect_map[name]) for name in args.effects.split(",") if name in effect_map ] report = run_benchmarks(displays, effects, args.iterations, args.verbose) if args.baseline: save_baseline(report.results, cache_path) print(f"Baseline saved to {cache_path}") return 0 if args.format == "json": output = format_report_json(report) else: output = format_report_text(report) if args.output: with open(args.output, "w") as f: f.write(output) else: print(output) return 0 if __name__ == "__main__": sys.exit(main())