"""Tests for engine.benchmark module - performance regression tests."""

import os
from unittest.mock import patch

import pytest

from engine.display import MultiDisplay, NullDisplay, TerminalDisplay
from engine.effects import EffectContext, get_registry
from engine.effects.plugins import discover_plugins


def _is_coverage_active():
|
|
"""Check if coverage is active."""
|
|
# Check if coverage module is loaded
|
|
import sys
|
|
|
|
return "coverage" in sys.modules or "cov" in sys.modules
|
|
|
|
|
|
def _get_min_fps_threshold(base_threshold: int) -> int:
    """
    Get minimum FPS threshold adjusted for coverage mode.

    Coverage instrumentation typically slows execution by 2-5x.
    We adjust thresholds accordingly to avoid false positives.
    """
    if not _is_coverage_active():
        return base_threshold
    # Under coverage, accept 25% of the normal threshold to absorb the
    # instrumentation overhead, but never go below a floor of 500 FPS.
    scaled = int(base_threshold * 0.25)
    return scaled if scaled > 500 else 500
def _get_iterations() -> int:
|
|
"""Get number of iterations for benchmarks."""
|
|
# Check for environment variable override
|
|
env_iterations = os.environ.get("BENCHMARK_ITERATIONS")
|
|
if env_iterations:
|
|
try:
|
|
return int(env_iterations)
|
|
except ValueError:
|
|
pass
|
|
|
|
# Default based on coverage mode
|
|
if _is_coverage_active():
|
|
return 100 # Fewer iterations when coverage is active
|
|
return 500 # Default iterations
|
|
|
|
|
|
class TestBenchmarkNullDisplay:
    """Performance tests for NullDisplay - regression tests."""

    @pytest.mark.benchmark
    def test_null_display_minimum_fps(self):
        """NullDisplay should meet minimum performance threshold."""
        import time

        display = NullDisplay()
        display.init(80, 24)
        buffer = ["x" * 80 for _ in range(24)]

        iterations = _get_iterations()
        start = time.perf_counter()
        for _ in range(iterations):
            display.show(buffer)
        elapsed = time.perf_counter() - start

        fps = iterations / elapsed
        min_fps = _get_min_fps_threshold(20000)

        assert fps >= min_fps, f"NullDisplay FPS {fps:.0f} below minimum {min_fps}"

    @pytest.mark.benchmark
    def test_effects_minimum_throughput(self):
        """Effects should meet minimum processing throughput."""
        import time

        # EffectContext, get_registry and discover_plugins are imported at
        # module level; the former local re-imports were redundant.
        discover_plugins()
        registry = get_registry()
        effect = registry.get("noise")
        assert effect is not None, "Noise effect should be registered"

        buffer = ["x" * 80 for _ in range(24)]
        ctx = EffectContext(
            terminal_width=80,
            terminal_height=24,
            scroll_cam=0,
            ticker_height=20,
            mic_excess=0.0,
            grad_offset=0.0,
            frame_number=0,
            has_message=False,
        )

        iterations = _get_iterations()
        start = time.perf_counter()
        for _ in range(iterations):
            effect.process(buffer, ctx)
        elapsed = time.perf_counter() - start

        fps = iterations / elapsed
        min_fps = _get_min_fps_threshold(10000)

        assert fps >= min_fps, (
            f"Effect processing FPS {fps:.0f} below minimum {min_fps}"
        )
class TestBenchmarkWebSocketDisplay:
    """Performance tests for WebSocketDisplay."""

    @pytest.mark.benchmark
    def test_websocket_display_minimum_fps(self):
        """WebSocketDisplay should meet minimum performance threshold."""
        import time

        # Stub out the websockets dependency so no real server is started.
        with patch("engine.display.backends.websocket.websockets", None):
            from engine.display import WebSocketDisplay

            ws = WebSocketDisplay()
            ws.init(80, 24)
            frame = ["x" * 80 for _ in range(24)]

            loops = _get_iterations()
            t0 = time.perf_counter()
            for _ in range(loops):
                ws.show(frame)
            duration = time.perf_counter() - t0

            fps = loops / duration
            floor = _get_min_fps_threshold(10000)

            assert fps >= floor, (
                f"WebSocketDisplay FPS {fps:.0f} below minimum {floor}"
            )
class TestBenchmarkTerminalDisplay:
    """Performance tests for TerminalDisplay."""

    @pytest.mark.benchmark
    def test_terminal_display_minimum_fps(self):
        """TerminalDisplay should meet minimum performance threshold."""
        import time

        term = TerminalDisplay()
        term.init(80, 24)
        frame = ["x" * 80 for _ in range(24)]

        # Time a fixed number of show() calls and derive frames-per-second.
        loops = _get_iterations()
        t0 = time.perf_counter()
        for _ in range(loops):
            term.show(frame)
        duration = time.perf_counter() - t0

        fps = loops / duration
        floor = _get_min_fps_threshold(10000)

        assert fps >= floor, f"TerminalDisplay FPS {fps:.0f} below minimum {floor}"
class TestBenchmarkMultiDisplay:
    """Performance tests for MultiDisplay."""

    @pytest.mark.benchmark
    def test_multi_display_minimum_fps(self):
        """MultiDisplay should meet minimum performance threshold."""
        import time

        # Stub out the websockets dependency so no real server is started.
        with patch("engine.display.backends.websocket.websockets", None):
            from engine.display import WebSocketDisplay

            null_sink = NullDisplay()
            null_sink.init(80, 24)
            ws_sink = WebSocketDisplay()
            ws_sink.init(80, 24)

            # Fan out one frame to both backends per show() call.
            fanout = MultiDisplay([null_sink, ws_sink])
            fanout.init(80, 24)
            frame = ["x" * 80 for _ in range(24)]

            loops = _get_iterations()
            t0 = time.perf_counter()
            for _ in range(loops):
                fanout.show(frame)
            duration = time.perf_counter() - t0

            fps = loops / duration
            floor = _get_min_fps_threshold(5000)

            assert fps >= floor, f"MultiDisplay FPS {fps:.0f} below minimum {floor}"
class TestBenchmarkEffects:
    """Performance tests for various effects.

    All four effect benchmarks share the same setup (plugin discovery,
    registry lookup, an 80x24 buffer and context, and a timed processing
    loop); that machinery lives in ``_benchmark_effect`` so each test
    supplies only the effect name, its display label, and its threshold.
    """

    @staticmethod
    def _make_context() -> EffectContext:
        """Build the standard 80x24 effect context used by every benchmark."""
        return EffectContext(
            terminal_width=80,
            terminal_height=24,
            scroll_cam=0,
            ticker_height=20,
            mic_excess=0.0,
            grad_offset=0.0,
            frame_number=0,
            has_message=False,
        )

    def _benchmark_effect(self, name: str, label: str, base_threshold: int) -> None:
        """Run the shared timing loop for one registered effect.

        Args:
            name: Registry key of the effect under test.
            label: Human-readable name used in assertion messages
                (e.g. ``"Fade effect"``), matching the original wording.
            base_threshold: Minimum FPS before coverage adjustment.
        """
        import time

        discover_plugins()
        registry = get_registry()
        effect = registry.get(name)
        assert effect is not None, f"{label} should be registered"

        buffer = ["x" * 80 for _ in range(24)]
        ctx = self._make_context()

        iterations = _get_iterations()
        start = time.perf_counter()
        for _ in range(iterations):
            effect.process(buffer, ctx)
        elapsed = time.perf_counter() - start

        fps = iterations / elapsed
        min_fps = _get_min_fps_threshold(base_threshold)

        assert fps >= min_fps, f"{label} FPS {fps:.0f} below minimum {min_fps}"

    @pytest.mark.benchmark
    def test_fade_effect_minimum_fps(self):
        """Fade effect should meet minimum performance threshold."""
        self._benchmark_effect("fade", "Fade effect", 7000)

    @pytest.mark.benchmark
    def test_glitch_effect_minimum_fps(self):
        """Glitch effect should meet minimum performance threshold."""
        self._benchmark_effect("glitch", "Glitch effect", 5000)

    @pytest.mark.benchmark
    def test_border_effect_minimum_fps(self):
        """Border effect should meet minimum performance threshold."""
        self._benchmark_effect("border", "Border effect", 5000)

    @pytest.mark.benchmark
    def test_tint_effect_minimum_fps(self):
        """Tint effect should meet minimum performance threshold."""
        self._benchmark_effect("tint", "Tint effect", 8000)
class TestBenchmarkPipeline:
    """Performance tests for pipeline execution."""

    @pytest.mark.benchmark
    def test_pipeline_execution_minimum_fps(self):
        """Pipeline execution should meet minimum performance threshold."""
        import time

        from engine.data_sources.sources import EmptyDataSource
        from engine.pipeline import Pipeline, StageRegistry, discover_stages
        from engine.pipeline.adapters import DataSourceStage, SourceItemsToBufferStage

        discover_stages()

        # Smallest useful pipeline: empty source -> render -> null display.
        # The empty source is built directly (it is not in the stage
        # registry) and guarantees no network calls during timing.
        source = DataSourceStage(EmptyDataSource(width=80, height=24), name="empty")
        renderer = SourceItemsToBufferStage(name="items-to-buffer")
        sink = StageRegistry.create("display", "null")
        assert sink is not None, "null display should be registered"

        pipe = Pipeline()
        pipe.add_stage("source", source)
        pipe.add_stage("render", renderer)
        pipe.add_stage("display", sink)
        pipe.build()

        loops = _get_iterations()
        t0 = time.perf_counter()
        for _ in range(loops):
            pipe.execute()
        duration = time.perf_counter() - t0

        fps = loops / duration
        floor = _get_min_fps_threshold(1000)

        assert fps >= floor, (
            f"Pipeline execution FPS {fps:.0f} below minimum {floor}"
        )