feat(effects): add plugin architecture with performance monitoring

This commit is contained in:
2026-03-15 18:42:42 -07:00
parent 3551cc249f
commit 35e5a71930
14 changed files with 1182 additions and 0 deletions

View File

@@ -0,0 +1,35 @@
from pathlib import Path
PLUGIN_DIR = Path(__file__).parent
def discover_plugins():
from engine.effects.registry import get_registry
registry = get_registry()
imported = {}
for file_path in PLUGIN_DIR.glob("*.py"):
if file_path.name.startswith("_"):
continue
module_name = file_path.stem
if module_name in ("base", "types"):
continue
try:
module = __import__(f"effects_plugins.{module_name}", fromlist=[""])
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (
isinstance(attr, type)
and hasattr(attr, "name")
and hasattr(attr, "process")
and attr_name.endswith("Effect")
):
plugin = attr()
registry.register(plugin)
imported[plugin.name] = plugin
except Exception:
pass
return imported

58
effects_plugins/fade.py Normal file
View File

@@ -0,0 +1,58 @@
import random
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
class FadeEffect:
name = "fade"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
if not ctx.ticker_height:
return buf
result = list(buf)
intensity = self.config.intensity
top_zone = max(1, int(ctx.ticker_height * 0.25))
bot_zone = max(1, int(ctx.ticker_height * 0.10))
for r in range(len(result)):
if r >= ctx.ticker_height:
continue
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
bot_f = (
min(1.0, (ctx.ticker_height - 1 - r) / bot_zone)
if bot_zone > 0
else 1.0
)
row_fade = min(top_f, bot_f) * intensity
if row_fade < 1.0 and result[r].strip():
result[r] = self._fade_line(result[r], row_fade)
return result
def _fade_line(self, s: str, fade: float) -> str:
if fade >= 1.0:
return s
if fade <= 0.0:
return ""
result = []
i = 0
while i < len(s):
if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[":
j = i + 2
while j < len(s) and not s[j].isalpha():
j += 1
result.append(s[i : j + 1])
i = j + 1
elif s[i] == " ":
result.append(" ")
i += 1
else:
result.append(s[i] if random.random() < fade else " ")
i += 1
return "".join(result)
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

View File

@@ -0,0 +1,72 @@
import random
from datetime import datetime
from engine import config
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
from engine.sources import FEEDS, POETRY_SOURCES
from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST
class FirehoseEffect:
name = "firehose"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
firehose_h = config.FIREHOSE_H if config.FIREHOSE else 0
if firehose_h <= 0 or not ctx.items:
return buf
result = list(buf)
intensity = self.config.intensity
h = ctx.terminal_height
for fr in range(firehose_h):
scr_row = h - firehose_h + fr + 1
fline = self._firehose_line(ctx.items, ctx.terminal_width, intensity)
result.append(f"\033[{scr_row};1H{fline}\033[K")
return result
def _firehose_line(self, items: list, w: int, intensity: float) -> str:
r = random.random()
if r < 0.35 * intensity:
title, src, ts = random.choice(items)
text = title[: w - 1]
color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM])
return f"{color}{text}{RST}"
elif r < 0.55 * intensity:
d = random.choice([0.45, 0.55, 0.65, 0.75])
return "".join(
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
f"{random.choice(config.GLITCH + config.KATA)}{RST}"
if random.random() < d
else " "
for _ in range(w)
)
elif r < 0.78 * intensity:
sources = FEEDS if config.MODE == "news" else POETRY_SOURCES
src = random.choice(list(sources.keys()))
msgs = [
f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}",
f" ░░ FEED ACTIVE :: {src}",
f" >> DECODE 0x{random.randint(0x1000, 0xFFFF):04X} :: {src[:24]}",
f" ▒▒ ACQUIRE :: {random.choice(['TCP', 'UDP', 'RSS', 'ATOM', 'XML'])} :: {src}",
f" {''.join(random.choice(config.KATA) for _ in range(3))} STRM "
f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}",
]
text = random.choice(msgs)[: w - 1]
color = random.choice([G_LO, G_DIM, W_GHOST])
return f"{color}{text}{RST}"
else:
title, _, _ = random.choice(items)
start = random.randint(0, max(0, len(title) - 20))
frag = title[start : start + random.randint(10, 35)]
pad = random.randint(0, max(0, w - len(frag) - 8))
gp = "".join(
random.choice(config.GLITCH) for _ in range(random.randint(1, 3))
)
text = (" " * pad + gp + " " + frag)[: w - 1]
color = random.choice([G_LO, C_DIM, W_GHOST])
return f"{color}{text}{RST}"
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

37
effects_plugins/glitch.py Normal file
View File

@@ -0,0 +1,37 @@
import random
from engine import config
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
from engine.terminal import C_DIM, DIM, G_DIM, G_LO, RST
class GlitchEffect:
name = "glitch"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
if not buf:
return buf
result = list(buf)
intensity = self.config.intensity
glitch_prob = 0.32 + min(0.9, ctx.mic_excess * 0.16)
glitch_prob = glitch_prob * intensity
n_hits = 4 + int(ctx.mic_excess / 2)
n_hits = int(n_hits * intensity)
if random.random() < glitch_prob:
for _ in range(min(n_hits, len(result))):
gi = random.randint(0, len(result) - 1)
scr_row = gi + 1
result[gi] = f"\033[{scr_row};1H{self._glitch_bar(ctx.terminal_width)}"
return result
def _glitch_bar(self, w: int) -> str:
c = random.choice(["", "", "", "\xc2"])
n = random.randint(3, w // 2)
o = random.randint(0, w - n)
return " " * o + f"{G_LO}{DIM}" + c * n + RST
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

36
effects_plugins/noise.py Normal file
View File

@@ -0,0 +1,36 @@
import random
from engine import config
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST
class NoiseEffect:
name = "noise"
config = EffectConfig(enabled=True, intensity=0.15)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
if not ctx.ticker_height:
return buf
result = list(buf)
intensity = self.config.intensity
probability = intensity * 0.15
for r in range(len(result)):
cy = ctx.scroll_cam + r
if random.random() < probability:
result[r] = self._generate_noise(ctx.terminal_width, cy)
return result
def _generate_noise(self, w: int, cy: int) -> str:
d = random.choice([0.15, 0.25, 0.35, 0.12])
return "".join(
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
f"{random.choice(config.GLITCH + config.KATA)}{RST}"
if random.random() < d
else " "
for _ in range(w)
)
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

View File

@@ -0,0 +1,42 @@
from engine.effects.chain import EffectChain
from engine.effects.controller import handle_effects_command, show_effects_menu
from engine.effects.legacy import (
fade_line,
firehose_line,
glitch_bar,
next_headline,
noise,
vis_trunc,
)
from engine.effects.performance import PerformanceMonitor, get_monitor, set_monitor
from engine.effects.registry import EffectRegistry, get_registry, set_registry
from engine.effects.types import EffectConfig, EffectContext, PipelineConfig
def get_effect_chain():
from engine.layers import get_effect_chain as _chain
return _chain()
__all__ = [
"EffectChain",
"EffectRegistry",
"EffectConfig",
"EffectContext",
"PipelineConfig",
"get_registry",
"set_registry",
"get_effect_chain",
"get_monitor",
"set_monitor",
"PerformanceMonitor",
"handle_effects_command",
"show_effects_menu",
"fade_line",
"firehose_line",
"glitch_bar",
"noise",
"next_headline",
"vis_trunc",
]

71
engine/effects/chain.py Normal file
View File

@@ -0,0 +1,71 @@
import time
from engine.effects.performance import PerformanceMonitor, get_monitor
from engine.effects.registry import EffectRegistry
from engine.effects.types import EffectContext
class EffectChain:
def __init__(
self, registry: EffectRegistry, monitor: PerformanceMonitor | None = None
):
self._registry = registry
self._order: list[str] = []
self._monitor = monitor
def _get_monitor(self) -> PerformanceMonitor:
if self._monitor is not None:
return self._monitor
return get_monitor()
def set_order(self, names: list[str]) -> None:
self._order = list(names)
def get_order(self) -> list[str]:
return self._order.copy()
def add_effect(self, name: str, position: int | None = None) -> bool:
if name not in self._registry.list_all():
return False
if position is None:
self._order.append(name)
else:
self._order.insert(position, name)
return True
def remove_effect(self, name: str) -> bool:
if name in self._order:
self._order.remove(name)
return True
return False
def reorder(self, new_order: list[str]) -> bool:
all_plugins = set(self._registry.list_all().keys())
if not all(name in all_plugins for name in new_order):
return False
self._order = list(new_order)
return True
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
monitor = self._get_monitor()
frame_number = ctx.frame_number
monitor.start_frame(frame_number)
frame_start = time.perf_counter()
result = list(buf)
for name in self._order:
plugin = self._registry.get(name)
if plugin and plugin.config.enabled:
chars_in = sum(len(line) for line in result)
effect_start = time.perf_counter()
try:
result = plugin.process(result, ctx)
except Exception:
plugin.config.enabled = False
elapsed = time.perf_counter() - effect_start
chars_out = sum(len(line) for line in result)
monitor.record_effect(name, elapsed * 1000, chars_in, chars_out)
total_elapsed = time.perf_counter() - frame_start
monitor.end_frame(frame_number, total_elapsed * 1000)
return result

View File

@@ -0,0 +1,144 @@
from engine.effects.performance import get_monitor
from engine.effects.registry import get_registry
_effect_chain_ref = None
def _get_effect_chain():
global _effect_chain_ref
if _effect_chain_ref is not None:
return _effect_chain_ref
try:
from engine.layers import get_effect_chain as _chain
return _chain()
except Exception:
return None
def set_effect_chain_ref(chain) -> None:
global _effect_chain_ref
_effect_chain_ref = chain
def handle_effects_command(cmd: str) -> str:
"""Handle /effects command from NTFY message.
Commands:
/effects list - list all effects and their status
/effects <name> on - enable an effect
/effects <name> off - disable an effect
/effects <name> intensity <0.0-1.0> - set intensity
/effects reorder <name1>,<name2>,... - reorder pipeline
/effects stats - show performance statistics
"""
parts = cmd.strip().split()
if not parts or parts[0] != "/effects":
return "Unknown command"
registry = get_registry()
chain = _get_effect_chain()
if len(parts) == 1 or parts[1] == "list":
result = ["Effects:"]
for name, plugin in registry.list_all().items():
status = "ON" if plugin.config.enabled else "OFF"
intensity = plugin.config.intensity
result.append(f" {name}: {status} (intensity={intensity})")
if chain:
result.append(f"Order: {chain.get_order()}")
return "\n".join(result)
if parts[1] == "stats":
return _format_stats()
if parts[1] == "reorder" and len(parts) >= 3:
new_order = parts[2].split(",")
if chain and chain.reorder(new_order):
return f"Reordered pipeline: {new_order}"
return "Failed to reorder pipeline"
if len(parts) < 3:
return "Usage: /effects <name> on|off|intensity <value>"
effect_name = parts[1]
action = parts[2]
if effect_name not in registry.list_all():
return f"Unknown effect: {effect_name}"
if action == "on":
registry.enable(effect_name)
return f"Enabled: {effect_name}"
if action == "off":
registry.disable(effect_name)
return f"Disabled: {effect_name}"
if action == "intensity" and len(parts) >= 4:
try:
value = float(parts[3])
if not 0.0 <= value <= 1.0:
return "Intensity must be between 0.0 and 1.0"
plugin = registry.get(effect_name)
if plugin:
plugin.config.intensity = value
return f"Set {effect_name} intensity to {value}"
except ValueError:
return "Invalid intensity value"
return f"Unknown action: {action}"
def _format_stats() -> str:
monitor = get_monitor()
stats = monitor.get_stats()
if "error" in stats:
return stats["error"]
lines = ["Performance Stats:"]
pipeline = stats["pipeline"]
lines.append(
f" Pipeline: avg={pipeline['avg_ms']:.2f}ms min={pipeline['min_ms']:.2f}ms max={pipeline['max_ms']:.2f}ms (over {stats['frame_count']} frames)"
)
if stats["effects"]:
lines.append(" Per-effect (avg ms):")
for name, effect_stats in stats["effects"].items():
lines.append(
f" {name}: avg={effect_stats['avg_ms']:.2f}ms min={effect_stats['min_ms']:.2f}ms max={effect_stats['max_ms']:.2f}ms"
)
return "\n".join(lines)
def show_effects_menu() -> str:
"""Generate effects menu text for display."""
registry = get_registry()
chain = _get_effect_chain()
lines = [
"\033[1;38;5;231m=== EFFECTS MENU ===\033[0m",
"",
"Effects:",
]
for name, plugin in registry.list_all().items():
status = "ON" if plugin.config.enabled else "OFF"
intensity = plugin.config.intensity
lines.append(f" [{status:3}] {name}: intensity={intensity:.2f}")
if chain:
lines.append("")
lines.append(f"Pipeline order: {' -> '.join(chain.get_order())}")
lines.append("")
lines.append("Controls:")
lines.append(" /effects <name> on|off")
lines.append(" /effects <name> intensity <0.0-1.0>")
lines.append(" /effects reorder name1,name2,...")
lines.append("")
return "\n".join(lines)

View File

@@ -0,0 +1,103 @@
from collections import deque
from dataclasses import dataclass
@dataclass
class EffectTiming:
name: str
duration_ms: float
buffer_chars_in: int
buffer_chars_out: int
@dataclass
class FrameTiming:
frame_number: int
total_ms: float
effects: list[EffectTiming]
class PerformanceMonitor:
"""Collects and stores performance metrics for effect pipeline."""
def __init__(self, max_frames: int = 60):
self._max_frames = max_frames
self._frames: deque[FrameTiming] = deque(maxlen=max_frames)
self._current_frame: list[EffectTiming] = []
def start_frame(self, frame_number: int) -> None:
self._current_frame = []
def record_effect(
self, name: str, duration_ms: float, chars_in: int, chars_out: int
) -> None:
self._current_frame.append(
EffectTiming(
name=name,
duration_ms=duration_ms,
buffer_chars_in=chars_in,
buffer_chars_out=chars_out,
)
)
def end_frame(self, frame_number: int, total_ms: float) -> None:
self._frames.append(
FrameTiming(
frame_number=frame_number,
total_ms=total_ms,
effects=self._current_frame,
)
)
def get_stats(self) -> dict:
if not self._frames:
return {"error": "No timing data available"}
total_times = [f.total_ms for f in self._frames]
avg_total = sum(total_times) / len(total_times)
min_total = min(total_times)
max_total = max(total_times)
effect_stats: dict[str, dict] = {}
for frame in self._frames:
for effect in frame.effects:
if effect.name not in effect_stats:
effect_stats[effect.name] = {"times": [], "total_chars": 0}
effect_stats[effect.name]["times"].append(effect.duration_ms)
effect_stats[effect.name]["total_chars"] += effect.buffer_chars_out
for name, stats in effect_stats.items():
times = stats["times"]
stats["avg_ms"] = sum(times) / len(times)
stats["min_ms"] = min(times)
stats["max_ms"] = max(times)
del stats["times"]
return {
"frame_count": len(self._frames),
"pipeline": {
"avg_ms": avg_total,
"min_ms": min_total,
"max_ms": max_total,
},
"effects": effect_stats,
}
def reset(self) -> None:
self._frames.clear()
self._current_frame = []
_monitor: PerformanceMonitor | None = None
def get_monitor() -> PerformanceMonitor:
global _monitor
if _monitor is None:
_monitor = PerformanceMonitor()
return _monitor
def set_monitor(monitor: PerformanceMonitor) -> None:
global _monitor
_monitor = monitor

View File

@@ -0,0 +1,59 @@
from engine.effects.types import EffectConfig, EffectPlugin
class EffectRegistry:
def __init__(self):
self._plugins: dict[str, EffectPlugin] = {}
self._discovered: bool = False
def register(self, plugin: EffectPlugin) -> None:
self._plugins[plugin.name] = plugin
def get(self, name: str) -> EffectPlugin | None:
return self._plugins.get(name)
def list_all(self) -> dict[str, EffectPlugin]:
return self._plugins.copy()
def list_enabled(self) -> list[EffectPlugin]:
return [p for p in self._plugins.values() if p.config.enabled]
def enable(self, name: str) -> bool:
plugin = self._plugins.get(name)
if plugin:
plugin.config.enabled = True
return True
return False
def disable(self, name: str) -> bool:
plugin = self._plugins.get(name)
if plugin:
plugin.config.enabled = False
return True
return False
def configure(self, name: str, config: EffectConfig) -> bool:
plugin = self._plugins.get(name)
if plugin:
plugin.configure(config)
return True
return False
def is_enabled(self, name: str) -> bool:
plugin = self._plugins.get(name)
return plugin.config.enabled if plugin else False
_registry: EffectRegistry | None = None
def get_registry() -> EffectRegistry:
global _registry
if _registry is None:
_registry = EffectRegistry()
return _registry
def set_registry(registry: EffectRegistry) -> None:
global _registry
_registry = registry

39
engine/effects/types.py Normal file
View File

@@ -0,0 +1,39 @@
from dataclasses import dataclass, field
from typing import Any
@dataclass
class EffectContext:
terminal_width: int
terminal_height: int
scroll_cam: int
ticker_height: int
mic_excess: float
grad_offset: float
frame_number: int
has_message: bool
items: list = field(default_factory=list)
@dataclass
class EffectConfig:
enabled: bool = True
intensity: float = 1.0
params: dict[str, Any] = field(default_factory=dict)
class EffectPlugin:
name: str
config: EffectConfig
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
raise NotImplementedError
def configure(self, config: EffectConfig) -> None:
raise NotImplementedError
@dataclass
class PipelineConfig:
order: list[str] = field(default_factory=list)
effects: dict[str, EffectConfig] = field(default_factory=dict)

View File

@@ -10,6 +10,8 @@ from datetime import datetime
from engine import config
from engine.effects import (
EffectChain,
EffectContext,
fade_line,
firehose_line,
glitch_bar,
@@ -199,3 +201,60 @@ def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]:
fline = firehose_line(items, w)
buf.append(f"\033[{scr_row};1H{fline}\033[K")
return buf
_effect_chain = None
def init_effects() -> None:
"""Initialize effect plugins and chain."""
global _effect_chain
from engine.effects import EffectChain, get_registry
registry = get_registry()
import effects_plugins
effects_plugins.discover_plugins()
chain = EffectChain(registry)
chain.set_order(["noise", "fade", "glitch", "firehose"])
_effect_chain = chain
def process_effects(
buf: list[str],
w: int,
h: int,
scroll_cam: int,
ticker_h: int,
mic_excess: float,
grad_offset: float,
frame_number: int,
has_message: bool,
items: list,
) -> list[str]:
"""Process buffer through effect chain."""
if _effect_chain is None:
init_effects()
ctx = EffectContext(
terminal_width=w,
terminal_height=h,
scroll_cam=scroll_cam,
ticker_height=ticker_h,
mic_excess=mic_excess,
grad_offset=grad_offset,
frame_number=frame_number,
has_message=has_message,
items=items,
)
return _effect_chain.process(buf, ctx)
def get_effect_chain() -> EffectChain | None:
"""Get the effect chain instance."""
global _effect_chain
if _effect_chain is None:
init_effects()
return _effect_chain

427
tests/test_effects.py Normal file
View File

@@ -0,0 +1,427 @@
"""
Tests for engine.effects module.
"""
from engine.effects import EffectChain, EffectConfig, EffectContext, EffectRegistry
class MockEffect:
name = "mock"
config = EffectConfig(enabled=True, intensity=1.0)
def __init__(self):
self.processed = False
self.last_ctx = None
def process(self, buf, ctx):
self.processed = True
self.last_ctx = ctx
return buf + ["processed"]
def configure(self, config):
self.config = config
class TestEffectConfig:
def test_defaults(self):
cfg = EffectConfig()
assert cfg.enabled is True
assert cfg.intensity == 1.0
assert cfg.params == {}
def test_custom_values(self):
cfg = EffectConfig(enabled=False, intensity=0.5, params={"key": "value"})
assert cfg.enabled is False
assert cfg.intensity == 0.5
assert cfg.params == {"key": "value"}
class TestEffectContext:
def test_defaults(self):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
assert ctx.terminal_width == 80
assert ctx.terminal_height == 24
assert ctx.ticker_height == 20
assert ctx.items == []
def test_with_items(self):
items = [("Title", "Source", "12:00")]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
items=items,
)
assert ctx.items == items
class TestEffectRegistry:
def test_init_empty(self):
registry = EffectRegistry()
assert len(registry.list_all()) == 0
def test_register(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
assert "mock" in registry.list_all()
def test_get(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
retrieved = registry.get("mock")
assert retrieved is effect
def test_get_nonexistent(self):
registry = EffectRegistry()
assert registry.get("nonexistent") is None
def test_enable(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = False
registry.register(effect)
registry.enable("mock")
assert effect.config.enabled is True
def test_disable(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = True
registry.register(effect)
registry.disable("mock")
assert effect.config.enabled is False
def test_list_enabled(self):
registry = EffectRegistry()
class EnabledEffect:
name = "enabled_effect"
config = EffectConfig(enabled=True, intensity=1.0)
class DisabledEffect:
name = "disabled_effect"
config = EffectConfig(enabled=False, intensity=1.0)
registry.register(EnabledEffect())
registry.register(DisabledEffect())
enabled = registry.list_enabled()
assert len(enabled) == 1
assert enabled[0].name == "enabled_effect"
def test_configure(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
new_config = EffectConfig(enabled=False, intensity=0.3)
registry.configure("mock", new_config)
assert effect.config.enabled is False
assert effect.config.intensity == 0.3
def test_is_enabled(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = True
registry.register(effect)
assert registry.is_enabled("mock") is True
assert registry.is_enabled("nonexistent") is False
class TestEffectChain:
def test_init(self):
registry = EffectRegistry()
chain = EffectChain(registry)
assert chain.get_order() == []
def test_set_order(self):
registry = EffectRegistry()
effect1 = MockEffect()
effect1.name = "effect1"
effect2 = MockEffect()
effect2.name = "effect2"
registry.register(effect1)
registry.register(effect2)
chain = EffectChain(registry)
chain.set_order(["effect1", "effect2"])
assert chain.get_order() == ["effect1", "effect2"]
def test_add_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.add_effect("test_effect")
assert "test_effect" in chain.get_order()
def test_add_effect_invalid(self):
registry = EffectRegistry()
chain = EffectChain(registry)
result = chain.add_effect("nonexistent")
assert result is False
def test_remove_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
chain.remove_effect("test_effect")
assert "test_effect" not in chain.get_order()
def test_reorder(self):
registry = EffectRegistry()
effect1 = MockEffect()
effect1.name = "effect1"
effect2 = MockEffect()
effect2.name = "effect2"
effect3 = MockEffect()
effect3.name = "effect3"
registry.register(effect1)
registry.register(effect2)
registry.register(effect3)
chain = EffectChain(registry)
chain.set_order(["effect1", "effect2", "effect3"])
result = chain.reorder(["effect3", "effect1", "effect2"])
assert result is True
assert chain.get_order() == ["effect3", "effect1", "effect2"]
def test_reorder_invalid(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "effect1"
registry.register(effect)
chain = EffectChain(registry)
result = chain.reorder(["effect1", "nonexistent"])
assert result is False
def test_process_empty_chain(self):
registry = EffectRegistry()
chain = EffectChain(registry)
buf = ["line1", "line2"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == buf
def test_process_with_effects(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
buf = ["line1", "line2"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == ["line1", "line2", "processed"]
assert effect.processed is True
assert effect.last_ctx is ctx
def test_process_disabled_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
effect.config.enabled = False
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
buf = ["line1"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == ["line1"]
assert effect.processed is False
class TestEffectsExports:
def test_all_exports_are_importable(self):
"""Verify all exports in __all__ can actually be imported."""
import engine.effects as effects_module
for name in effects_module.__all__:
getattr(effects_module, name)
class TestPerformanceMonitor:
def test_empty_stats(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
stats = monitor.get_stats()
assert "error" in stats
def test_record_and_retrieve(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
monitor.start_frame(1)
monitor.record_effect("test_effect", 1.5, 100, 150)
monitor.end_frame(1, 2.0)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["frame_count"] == 1
assert "test_effect" in stats["effects"]
def test_multiple_frames(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=3)
for i in range(5):
monitor.start_frame(i)
monitor.record_effect("effect1", 1.0, 100, 100)
monitor.record_effect("effect2", 0.5, 100, 100)
monitor.end_frame(i, 1.5)
stats = monitor.get_stats()
assert stats["frame_count"] == 3
assert "effect1" in stats["effects"]
assert "effect2" in stats["effects"]
def test_reset(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
monitor.start_frame(1)
monitor.record_effect("test", 1.0, 100, 100)
monitor.end_frame(1, 1.0)
monitor.reset()
stats = monitor.get_stats()
assert "error" in stats
class TestEffectPipelinePerformance:
def test_pipeline_stays_within_frame_budget(self):
"""Verify effect pipeline completes within frame budget (33ms for 30fps)."""
from engine.effects import (
EffectChain,
EffectConfig,
EffectContext,
EffectRegistry,
)
class DummyEffect:
name = "dummy"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf, ctx):
return [line * 2 for line in buf]
registry = EffectRegistry()
registry.register(DummyEffect())
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=10)
chain = EffectChain(registry, monitor)
chain.set_order(["dummy"])
buf = ["x" * 80] * 20
for i in range(10):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=i,
has_message=False,
)
chain.process(buf, ctx)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["pipeline"]["max_ms"] < 33.0
def test_individual_effects_performance(self):
"""Verify individual effects don't exceed 10ms per frame."""
from engine.effects import (
EffectChain,
EffectConfig,
EffectContext,
EffectRegistry,
)
class SlowEffect:
name = "slow"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf, ctx):
result = []
for line in buf:
result.append(line)
result.append(line + line)
return result
registry = EffectRegistry()
registry.register(SlowEffect())
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=5)
chain = EffectChain(registry, monitor)
chain.set_order(["slow"])
buf = ["x" * 80] * 10
for i in range(5):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=i,
has_message=False,
)
chain.process(buf, ctx)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["effects"]["slow"]["max_ms"] < 10.0