- Fix lint errors and LSP issues in benchmark.py
- Add --hook mode to compare against saved baseline
- Add --baseline flag to save results as baseline
- Add --threshold to configure degradation threshold (default 20%)
- Add benchmark step to pre-push hook in hk.pkl
- Update AGENTS.md with hk documentation links and benchmark runner docs
660 lines
18 KiB
Python
660 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Benchmark runner for mainline - tests performance across effects and displays.
|
|
|
|
Usage:
|
|
python -m engine.benchmark
|
|
python -m engine.benchmark --output report.md
|
|
python -m engine.benchmark --displays terminal,websocket --effects glitch,fade
|
|
python -m engine.benchmark --format json --output benchmark.json
|
|
|
|
Headless mode (default): suppress all terminal output during benchmarks.
|
|
"""
|
|
|
|
import argparse
|
|
import json
|
|
import sys
|
|
import time
|
|
from dataclasses import dataclass, field
|
|
from datetime import datetime
|
|
from io import StringIO
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
import numpy as np
|
|
|
|
|
|
@dataclass
class BenchmarkResult:
    """Result of a single benchmark run."""

    # Unique test id, e.g. "display_TerminalDisplay" or
    # "effect_Glitch_with_NullDisplay" (also the key used in the baseline cache).
    name: str
    # Display class name used for this run.
    display: str
    # Effect class name, or None for display-only benchmarks.
    effect: str | None
    # Number of timed display.show() calls.
    iterations: int
    # Timing statistics over the iterations, all in milliseconds.
    total_time_ms: float
    avg_time_ms: float
    std_dev_ms: float
    min_ms: float
    max_ms: float
    # Frames per second derived from avg_time_ms (0.0 when avg time is 0).
    fps: float
    # Total characters pushed through the display across all iterations.
    chars_processed: int
    # Throughput: chars_processed divided by total elapsed seconds (0.0 if no time).
    chars_per_sec: float
|
|
|
|
|
|
@dataclass
class BenchmarkReport:
    """Complete benchmark report."""

    # ISO-8601 timestamp of when the run finished.
    timestamp: str
    # Full sys.version string of the interpreter that produced the results.
    python_version: str
    # One entry per (display) or (effect, display) combination benchmarked.
    results: list[BenchmarkResult] = field(default_factory=list)
    # Aggregated stats, as produced by generate_summary().
    summary: dict[str, Any] = field(default_factory=dict)
|
|
|
|
|
|
def get_sample_buffer(width: int = 80, height: int = 24) -> list[str]:
    """Build a synthetic screen buffer for timing runs.

    Each row is an ANSI-green "Line N" label followed by 'A' filler so the
    benchmarks exercise both escape-sequence and plain-text handling.
    """
    return [
        f"\x1b[32mLine {row}\x1b[0m " + "A" * (width - 10)
        for row in range(height)
    ]
|
|
|
|
|
|
def benchmark_display(
    display_class, buffer: list[str], iterations: int = 100
) -> BenchmarkResult | None:
    """Time display_class.show(buffer) for `iterations` frames.

    stdout/stderr are redirected to scratch buffers for the duration so a
    terminal backend cannot pollute the report output (headless mode).

    Returns None when the display cannot be constructed, initialized, or
    driven (e.g. its backend dependencies are missing) — callers treat that
    as "skip this display", not as an error.
    """
    old_stdout = sys.stdout
    old_stderr = sys.stderr

    try:
        # Headless mode: swallow anything the display prints.
        sys.stdout = StringIO()
        sys.stderr = StringIO()

        display = display_class()
        display.init(80, 24)

        times: list[float] = []
        chars = sum(len(line) for line in buffer)

        for _ in range(iterations):
            t0 = time.perf_counter()
            display.show(buffer)
            times.append((time.perf_counter() - t0) * 1000)

        display.cleanup()

    except Exception:
        # Best-effort: an unusable display is skipped, not fatal.
        return None
    finally:
        sys.stdout = old_stdout
        sys.stderr = old_stderr

    times_arr = np.array(times)
    total_ms = sum(times)
    avg_ms = float(np.mean(times_arr))  # hoisted: reused for avg_time_ms and fps

    return BenchmarkResult(
        name=f"display_{display_class.__name__}",
        display=display_class.__name__,
        effect=None,
        iterations=iterations,
        total_time_ms=total_ms,
        avg_time_ms=avg_ms,
        std_dev_ms=float(np.std(times_arr)),
        min_ms=float(np.min(times_arr)),
        max_ms=float(np.max(times_arr)),
        fps=1000.0 / avg_ms if avg_ms > 0 else 0.0,
        chars_processed=chars * iterations,
        chars_per_sec=(chars * iterations) / (total_ms / 1000) if total_ms > 0 else 0.0,
    )
|
|
|
|
|
|
def benchmark_effect_with_display(
    effect_class, display, buffer: list[str], iterations: int = 100
) -> BenchmarkResult | None:
    """Time display.show() on frames pre-processed by effect_class.

    NOTE(review): only display.show() is inside the timer; effect.process()
    runs outside it, so this measures display cost on effect-processed frames
    rather than effect cost itself — presumably intentional, TODO confirm.

    stdout/stderr are redirected for the duration (headless mode). Returns
    None when the effect or display raises (best-effort skip). On success
    the caller-provided display is cleaned up.
    """
    old_stdout = sys.stdout
    old_stderr = sys.stderr

    try:
        from engine.effects.types import EffectConfig, EffectContext

        # Headless mode: swallow anything printed during the run.
        sys.stdout = StringIO()
        sys.stderr = StringIO()

        effect = effect_class()
        effect.configure(EffectConfig(enabled=True, intensity=1.0))

        # Fixed 80x24 context with all dynamic inputs zeroed.
        ctx = EffectContext(
            terminal_width=80,
            terminal_height=24,
            scroll_cam=0,
            ticker_height=0,
            mic_excess=0.0,
            grad_offset=0.0,
            frame_number=0,
            has_message=False,
        )

        times: list[float] = []
        chars = sum(len(line) for line in buffer)

        for _ in range(iterations):
            processed = effect.process(buffer, ctx)
            t0 = time.perf_counter()
            display.show(processed)
            times.append((time.perf_counter() - t0) * 1000)

        display.cleanup()

    except Exception:
        # Unusable effect/display combination: skip, not fatal.
        return None
    finally:
        sys.stdout = old_stdout
        sys.stderr = old_stderr

    times_arr = np.array(times)
    total_ms = sum(times)
    avg_ms = float(np.mean(times_arr))  # hoisted: reused for avg_time_ms and fps

    return BenchmarkResult(
        name=f"effect_{effect_class.__name__}_with_{display.__class__.__name__}",
        display=display.__class__.__name__,
        effect=effect_class.__name__,
        iterations=iterations,
        total_time_ms=total_ms,
        avg_time_ms=avg_ms,
        std_dev_ms=float(np.std(times_arr)),
        min_ms=float(np.min(times_arr)),
        max_ms=float(np.max(times_arr)),
        fps=1000.0 / avg_ms if avg_ms > 0 else 0.0,
        chars_processed=chars * iterations,
        chars_per_sec=(chars * iterations) / (total_ms / 1000) if total_ms > 0 else 0.0,
    )
|
|
|
|
|
|
def get_available_displays():
    """Return (name, class) pairs for every display backend that imports cleanly.

    The null and terminal displays are always present; websocket and sixel
    backends are optional and silently skipped when their imports fail.
    """
    from engine.display import (
        DisplayRegistry,
        NullDisplay,
        TerminalDisplay,
    )

    DisplayRegistry.initialize()

    found = [
        ("null", NullDisplay),
        ("terminal", TerminalDisplay),
    ]

    try:
        from engine.display.backends.websocket import WebSocketDisplay
    except Exception:
        pass
    else:
        found.append(("websocket", WebSocketDisplay))

    try:
        from engine.display.backends.sixel import SixelDisplay
    except Exception:
        pass
    else:
        found.append(("sixel", SixelDisplay))

    return found
|
|
|
|
|
|
def get_available_effects():
    """Return (name, class) pairs for every registered effect.

    Returns an empty list when the effects package itself cannot be imported;
    plugin discovery is attempted but optional.
    """
    try:
        from engine.effects import get_registry

        # Plugin discovery is best-effort: its absence is not an error.
        try:
            from effects_plugins import discover_plugins

            discover_plugins()
        except Exception:
            pass
    except Exception:
        return []

    registry = get_registry()
    return [
        (name, type(instance))
        for name, instance in registry.list_all().items()
        if instance
    ]
|
|
|
|
|
|
def run_benchmarks(
    displays: list[tuple[str, Any]] | None = None,
    effects: list[tuple[str, Any]] | None = None,
    iterations: int = 100,
    verbose: bool = False,
) -> BenchmarkReport:
    """Run all display and effect-with-display benchmarks and return a report.

    displays/effects are (name, class) pairs; None means "benchmark everything
    available". The websocket display is skipped for effect runs.

    Uses the module-level `datetime` import (a redundant local
    `from datetime import datetime` was removed).
    """
    if displays is None:
        displays = get_available_displays()
    if effects is None:
        effects = get_available_effects()

    buffer = get_sample_buffer(80, 24)
    results: list[BenchmarkResult] = []

    if verbose:
        print(f"Running benchmarks ({iterations} iterations each)...")

    # Pass 1: raw display throughput.
    for name, display_class in displays:
        if verbose:
            print(f"Benchmarking display: {name}")

        result = benchmark_display(display_class, buffer, iterations)
        if result:
            results.append(result)
            if verbose:
                print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg")

    if verbose:
        print()

    # Pass 2: each effect against each display (websocket excluded).
    for effect_name, effect_class in effects:
        for display_name, display_class in displays:
            if display_name == "websocket":
                continue
            if verbose:
                print(f"Benchmarking effect: {effect_name} with {display_name}")

            display = display_class()
            display.init(80, 24)
            result = benchmark_effect_with_display(
                effect_class, display, buffer, iterations
            )
            if result:
                results.append(result)
                if verbose:
                    print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg")

    return BenchmarkReport(
        timestamp=datetime.now().isoformat(),
        python_version=sys.version,
        results=results,
        summary=generate_summary(results),
    )
|
|
|
|
|
|
def generate_summary(results: list[BenchmarkResult]) -> dict[str, Any]:
    """Aggregate per-display and per-effect FPS statistics from raw results."""
    by_display: dict[str, list[BenchmarkResult]] = {}
    by_effect: dict[str, list[BenchmarkResult]] = {}

    # Group results by display name, and (when present) by effect name.
    for item in results:
        by_display.setdefault(item.display, []).append(item)
        if item.effect:
            by_effect.setdefault(item.effect, []).append(item)

    def _stats(group: list[BenchmarkResult]) -> dict[str, Any]:
        # FPS aggregates for one display or effect group.
        fps_values = [r.fps for r in group]
        return {
            "avg_fps": float(np.mean(fps_values)),
            "min_fps": float(np.min(fps_values)),
            "max_fps": float(np.max(fps_values)),
            "tests": len(group),
        }

    return {
        "by_display": {name: _stats(group) for name, group in by_display.items()},
        "by_effect": {name: _stats(group) for name, group in by_effect.items()},
        "overall": {
            "total_tests": len(results),
            "displays_tested": len(by_display),
            "effects_tested": len(by_effect),
        },
    }
|
|
|
|
|
|
# Default baseline cache location: written by --baseline, read by --hook.
DEFAULT_CACHE_PATH = Path.home() / ".mainline_benchmark_cache.json"
|
|
|
|
|
|
def load_baseline(cache_path: Path | None = None) -> dict[str, Any] | None:
    """Load baseline benchmark results from the cache file.

    Returns the parsed JSON dict, or None when the file is missing or
    unreadable (I/O error, corrupt/undecodable JSON). The former broad
    `except Exception` is narrowed to the errors this can actually raise.
    """
    path = cache_path or DEFAULT_CACHE_PATH
    if not path.exists():
        return None
    try:
        with open(path) as f:
            return json.load(f)
    except (OSError, ValueError):
        # ValueError covers json.JSONDecodeError and UnicodeDecodeError.
        return None
|
|
|
|
|
|
def save_baseline(
    results: list[BenchmarkResult],
    cache_path: Path | None = None,
) -> None:
    """Persist per-test fps/latency/throughput numbers as the hook baseline."""
    target = cache_path or DEFAULT_CACHE_PATH

    # Keyed by result name so --hook can match current runs to the baseline.
    snapshot: dict[str, dict[str, float]] = {}
    for item in results:
        snapshot[item.name] = {
            "fps": item.fps,
            "avg_time_ms": item.avg_time_ms,
            "chars_per_sec": item.chars_per_sec,
        }

    payload = {
        "timestamp": datetime.now().isoformat(),
        "results": snapshot,
    }

    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "w") as fh:
        json.dump(payload, fh, indent=2)
|
|
|
|
|
|
def compare_with_baseline(
    results: list[BenchmarkResult],
    baseline: dict[str, Any],
    threshold: float = 0.2,
    verbose: bool = True,
) -> tuple[bool, list[str]]:
    """Compare current results against the saved baseline.

    Returns (passed, messages): passed is False when any test's FPS dropped
    by more than `threshold` (fraction); messages lists degradations first,
    then notes about tests with no baseline entry.
    """
    baseline_results = baseline.get("results", {})
    failures: list[str] = []
    warnings: list[str] = []

    for current in results:
        entry = baseline_results.get(current.name)
        if entry is None:
            warnings.append(f"New test: {current.name} (no baseline)")
            continue

        base_fps = entry["fps"]
        if base_fps == 0:
            # Can't compute a relative drop against a zero baseline.
            continue

        drop = (base_fps - current.fps) / base_fps
        if drop > threshold:
            failures.append(
                f"{current.name}: FPS degraded {drop * 100:.1f}% "
                f"(baseline: {base_fps:.1f}, current: {current.fps:.1f})"
            )
        elif verbose:
            print(f" {current.name}: {current.fps:.1f} FPS (baseline: {base_fps:.1f})")

    return not failures, failures + warnings
|
|
|
|
|
|
def run_hook_mode(
    displays: list[tuple[str, Any]] | None = None,
    effects: list[tuple[str, Any]] | None = None,
    iterations: int = 20,
    threshold: float = 0.2,
    cache_path: Path | None = None,
    verbose: bool = False,
) -> int:
    """Hook entry point: benchmark and compare against the saved baseline.

    Returns a process exit code: 0 when no test degraded past `threshold`,
    1 when degradation was detected or no baseline exists yet.
    """
    baseline = load_baseline(cache_path)
    if baseline is None:
        # A baseline must exist before the hook can judge anything.
        print("No baseline found. Run with --baseline to create one.")
        return 1

    report = run_benchmarks(displays, effects, iterations, verbose)
    passed, messages = compare_with_baseline(
        report.results, baseline, threshold, verbose
    )

    print("\n=== Benchmark Hook Results ===")
    if not passed:
        print("FAILED - Performance degradation detected:")
        for msg in messages:
            print(f" - {msg}")
        return 1

    print("PASSED - No significant performance degradation")
    return 0
|
|
|
|
|
|
def format_report_text(report: BenchmarkReport) -> str:
    """Render a BenchmarkReport as a markdown document."""
    out = [
        "# Mainline Performance Benchmark Report",
        "",
        f"Generated: {report.timestamp}",
        f"Python: {report.python_version}",
        "",
        "## Summary",
        "",
        f"Total tests: {report.summary['overall']['total_tests']}",
        f"Displays tested: {report.summary['overall']['displays_tested']}",
        f"Effects tested: {report.summary['overall']['effects_tested']}",
        "",
        "## By Display",
        "",
    ]

    def _stat_block(title: str, stats: dict) -> list[str]:
        # One "### name" section with the FPS aggregates for a group.
        return [
            f"### {title}",
            f"- Avg FPS: {stats['avg_fps']:.1f}",
            f"- Min FPS: {stats['min_fps']:.1f}",
            f"- Max FPS: {stats['max_fps']:.1f}",
            f"- Tests: {stats['tests']}",
            "",
        ]

    for display, stats in report.summary["by_display"].items():
        out.extend(_stat_block(display, stats))

    # Effect sections only appear when at least one effect was benchmarked.
    if report.summary["by_effect"]:
        out.extend(["## By Effect", ""])
        for effect, stats in report.summary["by_effect"].items():
            out.extend(_stat_block(effect, stats))

    out.extend(
        [
            "## Detailed Results",
            "",
            "| Display | Effect | FPS | Avg ms | StdDev ms | Min ms | Max ms |",
            "|---------|--------|-----|--------|-----------|--------|--------|",
        ]
    )
    for row in report.results:
        effect_col = row.effect if row.effect else "-"
        out.append(
            f"| {row.display} | {effect_col} | {row.fps:.1f} | {row.avg_time_ms:.2f} | "
            f"{row.std_dev_ms:.2f} | {row.min_ms:.2f} | {row.max_ms:.2f} |"
        )

    return "\n".join(out)
|
|
|
|
|
|
def format_report_json(report: BenchmarkReport) -> str:
    """Render a BenchmarkReport as pretty-printed JSON."""
    # Field order matches BenchmarkResult's declaration order so the JSON
    # key order is identical to the previous hand-written dict.
    result_keys = (
        "name",
        "display",
        "effect",
        "iterations",
        "total_time_ms",
        "avg_time_ms",
        "std_dev_ms",
        "min_ms",
        "max_ms",
        "fps",
        "chars_processed",
        "chars_per_sec",
    )
    payload = {
        "timestamp": report.timestamp,
        "python_version": report.python_version,
        "summary": report.summary,
        "results": [
            {key: getattr(item, key) for key in result_keys}
            for item in report.results
        ],
    }
    return json.dumps(payload, indent=2)
|
|
|
|
|
|
def _select_named(spec: str | None, loader) -> list[tuple[str, Any]] | None:
    """Resolve a comma-separated CLI name list against available (name, class) pairs.

    Returns None when spec is falsy (meaning "use everything"); otherwise the
    subset of loader()'s pairs whose names appear in spec. Unknown names are
    silently dropped, matching the original CLI behavior. Extracted because
    this logic was duplicated verbatim in the hook and normal branches.
    """
    if not spec:
        return None
    available = dict(loader())
    return [
        (name, available[name])
        for name in spec.split(",")
        if name in available
    ]


def main():
    """CLI entry point for the benchmark runner. Returns a process exit code."""
    parser = argparse.ArgumentParser(description="Run mainline benchmarks")
    parser.add_argument(
        "--displays",
        help="Comma-separated list of displays to test (default: all)",
    )
    parser.add_argument(
        "--effects",
        help="Comma-separated list of effects to test (default: all)",
    )
    parser.add_argument(
        "--iterations",
        type=int,
        default=100,
        help="Number of iterations per test (default: 100)",
    )
    parser.add_argument(
        "--output",
        help="Output file path (default: stdout)",
    )
    parser.add_argument(
        "--format",
        choices=["text", "json"],
        default="text",
        help="Output format (default: text)",
    )
    parser.add_argument(
        "--verbose",
        "-v",
        action="store_true",
        help="Show progress during benchmarking",
    )
    parser.add_argument(
        "--hook",
        action="store_true",
        help="Run in hook mode: compare against baseline, exit 0 pass, 1 fail",
    )
    parser.add_argument(
        "--baseline",
        action="store_true",
        help="Save current results as baseline for future hook comparisons",
    )
    parser.add_argument(
        "--threshold",
        type=float,
        default=0.2,
        help="Performance degradation threshold for hook mode (default: 0.2 = 20%%)",
    )
    parser.add_argument(
        "--cache",
        type=str,
        default=None,
        help="Path to baseline cache file (default: ~/.mainline_benchmark_cache.json)",
    )

    args = parser.parse_args()

    cache_path = Path(args.cache) if args.cache else DEFAULT_CACHE_PATH

    # Same name-filtering for both the hook and the normal path.
    displays = _select_named(args.displays, get_available_displays)
    effects = _select_named(args.effects, get_available_effects)

    if args.hook:
        return run_hook_mode(
            displays,
            effects,
            iterations=args.iterations,
            threshold=args.threshold,
            cache_path=cache_path,
            verbose=args.verbose,
        )

    report = run_benchmarks(displays, effects, args.iterations, args.verbose)

    if args.baseline:
        save_baseline(report.results, cache_path)
        print(f"Baseline saved to {cache_path}")
        return 0

    if args.format == "json":
        output = format_report_json(report)
    else:
        output = format_report_text(report)

    if args.output:
        with open(args.output, "w") as f:
            f.write(output)
    else:
        print(output)

    return 0
|
|
|
|
|
|
# Script entry point: exit with main()'s return code (0 = success / hook passed).
if __name__ == "__main__":
    sys.exit(main())
|