428 lines
13 KiB
Python
428 lines
13 KiB
Python
"""
|
|
Tests for engine.effects module.
|
|
"""
|
|
|
|
from engine.effects import EffectChain, EffectConfig, EffectContext, EffectRegistry
|
|
|
|
|
|
class MockEffect:
|
|
name = "mock"
|
|
config = EffectConfig(enabled=True, intensity=1.0)
|
|
|
|
def __init__(self):
|
|
self.processed = False
|
|
self.last_ctx = None
|
|
|
|
def process(self, buf, ctx):
|
|
self.processed = True
|
|
self.last_ctx = ctx
|
|
return buf + ["processed"]
|
|
|
|
def configure(self, config):
|
|
self.config = config
|
|
|
|
|
|
class TestEffectConfig:
|
|
def test_defaults(self):
|
|
cfg = EffectConfig()
|
|
assert cfg.enabled is True
|
|
assert cfg.intensity == 1.0
|
|
assert cfg.params == {}
|
|
|
|
def test_custom_values(self):
|
|
cfg = EffectConfig(enabled=False, intensity=0.5, params={"key": "value"})
|
|
assert cfg.enabled is False
|
|
assert cfg.intensity == 0.5
|
|
assert cfg.params == {"key": "value"}
|
|
|
|
|
|
class TestEffectContext:
|
|
def test_defaults(self):
|
|
ctx = EffectContext(
|
|
terminal_width=80,
|
|
terminal_height=24,
|
|
scroll_cam=0,
|
|
ticker_height=20,
|
|
mic_excess=0.0,
|
|
grad_offset=0.0,
|
|
frame_number=0,
|
|
has_message=False,
|
|
)
|
|
assert ctx.terminal_width == 80
|
|
assert ctx.terminal_height == 24
|
|
assert ctx.ticker_height == 20
|
|
assert ctx.items == []
|
|
|
|
def test_with_items(self):
|
|
items = [("Title", "Source", "12:00")]
|
|
ctx = EffectContext(
|
|
terminal_width=80,
|
|
terminal_height=24,
|
|
scroll_cam=0,
|
|
ticker_height=20,
|
|
mic_excess=0.0,
|
|
grad_offset=0.0,
|
|
frame_number=0,
|
|
has_message=False,
|
|
items=items,
|
|
)
|
|
assert ctx.items == items
|
|
|
|
|
|
class TestEffectRegistry:
|
|
def test_init_empty(self):
|
|
registry = EffectRegistry()
|
|
assert len(registry.list_all()) == 0
|
|
|
|
def test_register(self):
|
|
registry = EffectRegistry()
|
|
effect = MockEffect()
|
|
registry.register(effect)
|
|
assert "mock" in registry.list_all()
|
|
|
|
def test_get(self):
|
|
registry = EffectRegistry()
|
|
effect = MockEffect()
|
|
registry.register(effect)
|
|
retrieved = registry.get("mock")
|
|
assert retrieved is effect
|
|
|
|
def test_get_nonexistent(self):
|
|
registry = EffectRegistry()
|
|
assert registry.get("nonexistent") is None
|
|
|
|
def test_enable(self):
|
|
registry = EffectRegistry()
|
|
effect = MockEffect()
|
|
effect.config.enabled = False
|
|
registry.register(effect)
|
|
registry.enable("mock")
|
|
assert effect.config.enabled is True
|
|
|
|
def test_disable(self):
|
|
registry = EffectRegistry()
|
|
effect = MockEffect()
|
|
effect.config.enabled = True
|
|
registry.register(effect)
|
|
registry.disable("mock")
|
|
assert effect.config.enabled is False
|
|
|
|
def test_list_enabled(self):
|
|
registry = EffectRegistry()
|
|
|
|
class EnabledEffect:
|
|
name = "enabled_effect"
|
|
config = EffectConfig(enabled=True, intensity=1.0)
|
|
|
|
class DisabledEffect:
|
|
name = "disabled_effect"
|
|
config = EffectConfig(enabled=False, intensity=1.0)
|
|
|
|
registry.register(EnabledEffect())
|
|
registry.register(DisabledEffect())
|
|
enabled = registry.list_enabled()
|
|
assert len(enabled) == 1
|
|
assert enabled[0].name == "enabled_effect"
|
|
|
|
def test_configure(self):
|
|
registry = EffectRegistry()
|
|
effect = MockEffect()
|
|
registry.register(effect)
|
|
new_config = EffectConfig(enabled=False, intensity=0.3)
|
|
registry.configure("mock", new_config)
|
|
assert effect.config.enabled is False
|
|
assert effect.config.intensity == 0.3
|
|
|
|
def test_is_enabled(self):
|
|
registry = EffectRegistry()
|
|
effect = MockEffect()
|
|
effect.config.enabled = True
|
|
registry.register(effect)
|
|
assert registry.is_enabled("mock") is True
|
|
assert registry.is_enabled("nonexistent") is False
|
|
|
|
|
|
class TestEffectChain:
|
|
def test_init(self):
|
|
registry = EffectRegistry()
|
|
chain = EffectChain(registry)
|
|
assert chain.get_order() == []
|
|
|
|
def test_set_order(self):
|
|
registry = EffectRegistry()
|
|
effect1 = MockEffect()
|
|
effect1.name = "effect1"
|
|
effect2 = MockEffect()
|
|
effect2.name = "effect2"
|
|
registry.register(effect1)
|
|
registry.register(effect2)
|
|
chain = EffectChain(registry)
|
|
chain.set_order(["effect1", "effect2"])
|
|
assert chain.get_order() == ["effect1", "effect2"]
|
|
|
|
def test_add_effect(self):
|
|
registry = EffectRegistry()
|
|
effect = MockEffect()
|
|
effect.name = "test_effect"
|
|
registry.register(effect)
|
|
chain = EffectChain(registry)
|
|
chain.add_effect("test_effect")
|
|
assert "test_effect" in chain.get_order()
|
|
|
|
def test_add_effect_invalid(self):
|
|
registry = EffectRegistry()
|
|
chain = EffectChain(registry)
|
|
result = chain.add_effect("nonexistent")
|
|
assert result is False
|
|
|
|
def test_remove_effect(self):
|
|
registry = EffectRegistry()
|
|
effect = MockEffect()
|
|
effect.name = "test_effect"
|
|
registry.register(effect)
|
|
chain = EffectChain(registry)
|
|
chain.set_order(["test_effect"])
|
|
chain.remove_effect("test_effect")
|
|
assert "test_effect" not in chain.get_order()
|
|
|
|
def test_reorder(self):
|
|
registry = EffectRegistry()
|
|
effect1 = MockEffect()
|
|
effect1.name = "effect1"
|
|
effect2 = MockEffect()
|
|
effect2.name = "effect2"
|
|
effect3 = MockEffect()
|
|
effect3.name = "effect3"
|
|
registry.register(effect1)
|
|
registry.register(effect2)
|
|
registry.register(effect3)
|
|
chain = EffectChain(registry)
|
|
chain.set_order(["effect1", "effect2", "effect3"])
|
|
result = chain.reorder(["effect3", "effect1", "effect2"])
|
|
assert result is True
|
|
assert chain.get_order() == ["effect3", "effect1", "effect2"]
|
|
|
|
def test_reorder_invalid(self):
|
|
registry = EffectRegistry()
|
|
effect = MockEffect()
|
|
effect.name = "effect1"
|
|
registry.register(effect)
|
|
chain = EffectChain(registry)
|
|
result = chain.reorder(["effect1", "nonexistent"])
|
|
assert result is False
|
|
|
|
def test_process_empty_chain(self):
|
|
registry = EffectRegistry()
|
|
chain = EffectChain(registry)
|
|
buf = ["line1", "line2"]
|
|
ctx = EffectContext(
|
|
terminal_width=80,
|
|
terminal_height=24,
|
|
scroll_cam=0,
|
|
ticker_height=20,
|
|
mic_excess=0.0,
|
|
grad_offset=0.0,
|
|
frame_number=0,
|
|
has_message=False,
|
|
)
|
|
result = chain.process(buf, ctx)
|
|
assert result == buf
|
|
|
|
def test_process_with_effects(self):
|
|
registry = EffectRegistry()
|
|
effect = MockEffect()
|
|
effect.name = "test_effect"
|
|
registry.register(effect)
|
|
chain = EffectChain(registry)
|
|
chain.set_order(["test_effect"])
|
|
buf = ["line1", "line2"]
|
|
ctx = EffectContext(
|
|
terminal_width=80,
|
|
terminal_height=24,
|
|
scroll_cam=0,
|
|
ticker_height=20,
|
|
mic_excess=0.0,
|
|
grad_offset=0.0,
|
|
frame_number=0,
|
|
has_message=False,
|
|
)
|
|
result = chain.process(buf, ctx)
|
|
assert result == ["line1", "line2", "processed"]
|
|
assert effect.processed is True
|
|
assert effect.last_ctx is ctx
|
|
|
|
def test_process_disabled_effect(self):
|
|
registry = EffectRegistry()
|
|
effect = MockEffect()
|
|
effect.name = "test_effect"
|
|
effect.config.enabled = False
|
|
registry.register(effect)
|
|
chain = EffectChain(registry)
|
|
chain.set_order(["test_effect"])
|
|
buf = ["line1"]
|
|
ctx = EffectContext(
|
|
terminal_width=80,
|
|
terminal_height=24,
|
|
scroll_cam=0,
|
|
ticker_height=20,
|
|
mic_excess=0.0,
|
|
grad_offset=0.0,
|
|
frame_number=0,
|
|
has_message=False,
|
|
)
|
|
result = chain.process(buf, ctx)
|
|
assert result == ["line1"]
|
|
assert effect.processed is False
|
|
|
|
|
|
class TestEffectsExports:
|
|
def test_all_exports_are_importable(self):
|
|
"""Verify all exports in __all__ can actually be imported."""
|
|
import engine.effects as effects_module
|
|
|
|
for name in effects_module.__all__:
|
|
getattr(effects_module, name)
|
|
|
|
|
|
class TestPerformanceMonitor:
|
|
def test_empty_stats(self):
|
|
from engine.effects.performance import PerformanceMonitor
|
|
|
|
monitor = PerformanceMonitor()
|
|
stats = monitor.get_stats()
|
|
assert "error" in stats
|
|
|
|
def test_record_and_retrieve(self):
|
|
from engine.effects.performance import PerformanceMonitor
|
|
|
|
monitor = PerformanceMonitor()
|
|
monitor.start_frame(1)
|
|
monitor.record_effect("test_effect", 1.5, 100, 150)
|
|
monitor.end_frame(1, 2.0)
|
|
|
|
stats = monitor.get_stats()
|
|
assert "error" not in stats
|
|
assert stats["frame_count"] == 1
|
|
assert "test_effect" in stats["effects"]
|
|
|
|
def test_multiple_frames(self):
|
|
from engine.effects.performance import PerformanceMonitor
|
|
|
|
monitor = PerformanceMonitor(max_frames=3)
|
|
for i in range(5):
|
|
monitor.start_frame(i)
|
|
monitor.record_effect("effect1", 1.0, 100, 100)
|
|
monitor.record_effect("effect2", 0.5, 100, 100)
|
|
monitor.end_frame(i, 1.5)
|
|
|
|
stats = monitor.get_stats()
|
|
assert stats["frame_count"] == 3
|
|
assert "effect1" in stats["effects"]
|
|
assert "effect2" in stats["effects"]
|
|
|
|
def test_reset(self):
|
|
from engine.effects.performance import PerformanceMonitor
|
|
|
|
monitor = PerformanceMonitor()
|
|
monitor.start_frame(1)
|
|
monitor.record_effect("test", 1.0, 100, 100)
|
|
monitor.end_frame(1, 1.0)
|
|
|
|
monitor.reset()
|
|
stats = monitor.get_stats()
|
|
assert "error" in stats
|
|
|
|
|
|
class TestEffectPipelinePerformance:
|
|
def test_pipeline_stays_within_frame_budget(self):
|
|
"""Verify effect pipeline completes within frame budget (33ms for 30fps)."""
|
|
from engine.effects import (
|
|
EffectChain,
|
|
EffectConfig,
|
|
EffectContext,
|
|
EffectRegistry,
|
|
)
|
|
|
|
class DummyEffect:
|
|
name = "dummy"
|
|
config = EffectConfig(enabled=True, intensity=1.0)
|
|
|
|
def process(self, buf, ctx):
|
|
return [line * 2 for line in buf]
|
|
|
|
registry = EffectRegistry()
|
|
registry.register(DummyEffect())
|
|
|
|
from engine.effects.performance import PerformanceMonitor
|
|
|
|
monitor = PerformanceMonitor(max_frames=10)
|
|
chain = EffectChain(registry, monitor)
|
|
chain.set_order(["dummy"])
|
|
|
|
buf = ["x" * 80] * 20
|
|
|
|
for i in range(10):
|
|
ctx = EffectContext(
|
|
terminal_width=80,
|
|
terminal_height=24,
|
|
scroll_cam=0,
|
|
ticker_height=20,
|
|
mic_excess=0.0,
|
|
grad_offset=0.0,
|
|
frame_number=i,
|
|
has_message=False,
|
|
)
|
|
chain.process(buf, ctx)
|
|
|
|
stats = monitor.get_stats()
|
|
assert "error" not in stats
|
|
assert stats["pipeline"]["max_ms"] < 33.0
|
|
|
|
def test_individual_effects_performance(self):
|
|
"""Verify individual effects don't exceed 10ms per frame."""
|
|
from engine.effects import (
|
|
EffectChain,
|
|
EffectConfig,
|
|
EffectContext,
|
|
EffectRegistry,
|
|
)
|
|
|
|
class SlowEffect:
|
|
name = "slow"
|
|
config = EffectConfig(enabled=True, intensity=1.0)
|
|
|
|
def process(self, buf, ctx):
|
|
result = []
|
|
for line in buf:
|
|
result.append(line)
|
|
result.append(line + line)
|
|
return result
|
|
|
|
registry = EffectRegistry()
|
|
registry.register(SlowEffect())
|
|
|
|
from engine.effects.performance import PerformanceMonitor
|
|
|
|
monitor = PerformanceMonitor(max_frames=5)
|
|
chain = EffectChain(registry, monitor)
|
|
chain.set_order(["slow"])
|
|
|
|
buf = ["x" * 80] * 10
|
|
|
|
for i in range(5):
|
|
ctx = EffectContext(
|
|
terminal_width=80,
|
|
terminal_height=24,
|
|
scroll_cam=0,
|
|
ticker_height=20,
|
|
mic_excess=0.0,
|
|
grad_offset=0.0,
|
|
frame_number=i,
|
|
has_message=False,
|
|
)
|
|
chain.process(buf, ctx)
|
|
|
|
stats = monitor.get_stats()
|
|
assert "error" not in stats
|
|
assert stats["effects"]["slow"]["max_ms"] < 10.0
|