From cfd7e8931e4b13b84316ae136c5c7da5c0d2871d Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sun, 15 Mar 2026 18:42:42 -0700 Subject: [PATCH] feat(effects): add plugin architecture with performance monitoring --- effects_plugins/__init__.py | 35 ++ effects_plugins/fade.py | 58 +++ effects_plugins/firehose.py | 72 ++++ effects_plugins/glitch.py | 37 ++ effects_plugins/noise.py | 36 ++ engine/effects/__init__.py | 42 +++ engine/effects/chain.py | 71 ++++ engine/effects/controller.py | 144 ++++++++ engine/{effects.py => effects/legacy.py} | 0 engine/effects/performance.py | 103 ++++++ engine/effects/registry.py | 59 ++++ engine/effects/types.py | 39 +++ engine/layers.py | 59 ++++ tests/test_effects.py | 427 +++++++++++++++++++++++ 14 files changed, 1182 insertions(+) create mode 100644 effects_plugins/__init__.py create mode 100644 effects_plugins/fade.py create mode 100644 effects_plugins/firehose.py create mode 100644 effects_plugins/glitch.py create mode 100644 effects_plugins/noise.py create mode 100644 engine/effects/__init__.py create mode 100644 engine/effects/chain.py create mode 100644 engine/effects/controller.py rename engine/{effects.py => effects/legacy.py} (100%) create mode 100644 engine/effects/performance.py create mode 100644 engine/effects/registry.py create mode 100644 engine/effects/types.py create mode 100644 tests/test_effects.py diff --git a/effects_plugins/__init__.py b/effects_plugins/__init__.py new file mode 100644 index 0000000..fc3c8d5 --- /dev/null +++ b/effects_plugins/__init__.py @@ -0,0 +1,35 @@ +from pathlib import Path + +PLUGIN_DIR = Path(__file__).parent + + +def discover_plugins(): + from engine.effects.registry import get_registry + + registry = get_registry() + imported = {} + + for file_path in PLUGIN_DIR.glob("*.py"): + if file_path.name.startswith("_"): + continue + module_name = file_path.stem + if module_name in ("base", "types"): + continue + + try: + module = __import__(f"effects_plugins.{module_name}", fromlist=[""]) + for attr_name in dir(module): + attr = getattr(module, attr_name) + if ( + isinstance(attr, type) + and hasattr(attr, "name") + and hasattr(attr, "process") + and attr_name.endswith("Effect") + ): + plugin = attr() + registry.register(plugin) + imported[plugin.name] = plugin + except Exception: + pass + + return imported diff --git a/effects_plugins/fade.py b/effects_plugins/fade.py new file mode 100644 index 0000000..98ede65 --- /dev/null +++ b/effects_plugins/fade.py @@ -0,0 +1,58 @@ +import random + +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin + + +class FadeEffect: + name = "fade" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + if not ctx.ticker_height: + return buf + result = list(buf) + intensity = self.config.intensity + + top_zone = max(1, int(ctx.ticker_height * 0.25)) + bot_zone = max(1, int(ctx.ticker_height * 0.10)) + + for r in range(len(result)): + if r >= ctx.ticker_height: + continue + top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0 + bot_f = ( + min(1.0, (ctx.ticker_height - 1 - r) / bot_zone) + if bot_zone > 0 + else 1.0 + ) + row_fade = min(top_f, bot_f) * intensity + + if row_fade < 1.0 and result[r].strip(): + result[r] = self._fade_line(result[r], row_fade) + + return result + + def _fade_line(self, s: str, fade: float) -> str: + if fade >= 1.0: + return s + if fade <= 0.0: + return "" + result = [] + i = 0 + while i < len(s): + if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[": + j = i + 2 + while j < len(s) and not s[j].isalpha(): + j += 1 + result.append(s[i : j + 1]) + i = j + 1 + elif s[i] == " ": + result.append(" ") + i += 1 + else: + result.append(s[i] if random.random() < fade else " ") + i += 1 + return "".join(result) + + def configure(self, cfg: EffectConfig) -> None: + self.config = cfg diff --git a/effects_plugins/firehose.py b/effects_plugins/firehose.py new file mode 100644 index 0000000..4be520b --- /dev/null +++ b/effects_plugins/firehose.py @@ -0,0 +1,72 @@ +import random +from datetime import datetime + +from engine import config +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin +from engine.sources import FEEDS, POETRY_SOURCES +from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST + + +class FirehoseEffect: + name = "firehose" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + firehose_h = config.FIREHOSE_H if config.FIREHOSE else 0 + if firehose_h <= 0 or not ctx.items: + return buf + + result = list(buf) + intensity = self.config.intensity + h = ctx.terminal_height + + for fr in range(firehose_h): + scr_row = h - firehose_h + fr + 1 + fline = self._firehose_line(ctx.items, ctx.terminal_width, intensity) + result.append(f"\033[{scr_row};1H{fline}\033[K") + return result + + def _firehose_line(self, items: list, w: int, intensity: float) -> str: + r = random.random() + if r < 0.35 * intensity: + title, src, ts = random.choice(items) + text = title[: w - 1] + color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM]) + return f"{color}{text}{RST}" + elif r < 0.55 * intensity: + d = random.choice([0.45, 0.55, 0.65, 0.75]) + return "".join( + f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}" + f"{random.choice(config.GLITCH + config.KATA)}{RST}" + if random.random() < d + else " " + for _ in range(w) + ) + elif r < 0.78 * intensity: + sources = FEEDS if config.MODE == "news" else POETRY_SOURCES + src = random.choice(list(sources.keys())) + msgs = [ + f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}", + f" ░░ FEED ACTIVE :: {src}", + f" >> DECODE 0x{random.randint(0x1000, 0xFFFF):04X} :: {src[:24]}", + f" ▒▒ ACQUIRE :: {random.choice(['TCP', 'UDP', 'RSS', 'ATOM', 'XML'])} :: {src}", + f" {''.join(random.choice(config.KATA) for _ in range(3))} STRM " + f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}", + ] + text = random.choice(msgs)[: w - 1] + color = random.choice([G_LO, G_DIM, W_GHOST]) + return f"{color}{text}{RST}" + else: + title, _, _ = random.choice(items) + start = random.randint(0, max(0, len(title) - 20)) + frag = title[start : start + random.randint(10, 35)] + pad = random.randint(0, max(0, w - len(frag) - 8)) + gp = "".join( + random.choice(config.GLITCH) for _ in range(random.randint(1, 3)) + ) + text = (" " * pad + gp + " " + frag)[: w - 1] + color = random.choice([G_LO, C_DIM, W_GHOST]) + return f"{color}{text}{RST}" + + def configure(self, cfg: EffectConfig) -> None: + self.config = cfg diff --git a/effects_plugins/glitch.py b/effects_plugins/glitch.py new file mode 100644 index 0000000..d23244a --- /dev/null +++ b/effects_plugins/glitch.py @@ -0,0 +1,37 @@ +import random + +from engine import config +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin +from engine.terminal import C_DIM, DIM, G_DIM, G_LO, RST + + +class GlitchEffect: + name = "glitch" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + if not buf: + return buf + result = list(buf) + intensity = self.config.intensity + + glitch_prob = 0.32 + min(0.9, ctx.mic_excess * 0.16) + glitch_prob = glitch_prob * intensity + n_hits = 4 + int(ctx.mic_excess / 2) + n_hits = int(n_hits * intensity) + + if random.random() < glitch_prob: + for _ in range(min(n_hits, len(result))): + gi = random.randint(0, len(result) - 1) + scr_row = gi + 1 + result[gi] = f"\033[{scr_row};1H{self._glitch_bar(ctx.terminal_width)}" + return result + + def _glitch_bar(self, w: int) -> str: + c = random.choice(["░", "▒", "─", "\xc2"]) + n = random.randint(3, w // 2) + o = random.randint(0, w - n) + return " " * o + f"{G_LO}{DIM}" + c * n + RST + + def configure(self, cfg: EffectConfig) -> None: + self.config = cfg diff --git a/effects_plugins/noise.py b/effects_plugins/noise.py new file mode 100644 index 0000000..d7bf316 --- /dev/null +++ b/effects_plugins/noise.py @@ -0,0 +1,36 @@ +import random + +from engine import config +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin +from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST + + +class NoiseEffect: + name = "noise" + config = EffectConfig(enabled=True, intensity=0.15) + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + if not ctx.ticker_height: + return buf + result = list(buf) + intensity = self.config.intensity + probability = intensity * 0.15 + + for r in range(len(result)): + cy = ctx.scroll_cam + r + if random.random() < probability: + result[r] = self._generate_noise(ctx.terminal_width, cy) + return result + + def _generate_noise(self, w: int, cy: int) -> str: + d = random.choice([0.15, 0.25, 0.35, 0.12]) + return "".join( + f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}" + f"{random.choice(config.GLITCH + config.KATA)}{RST}" + if random.random() < d + else " " + for _ in range(w) + ) + + def configure(self, cfg: EffectConfig) -> None: + self.config = cfg diff --git a/engine/effects/__init__.py b/engine/effects/__init__.py new file mode 100644 index 0000000..923d361 --- /dev/null +++ b/engine/effects/__init__.py @@ -0,0 +1,42 @@ +from engine.effects.chain import EffectChain +from engine.effects.controller import handle_effects_command, show_effects_menu +from engine.effects.legacy import ( + fade_line, + firehose_line, + glitch_bar, + next_headline, + noise, + vis_trunc, +) +from engine.effects.performance import PerformanceMonitor, get_monitor, set_monitor +from engine.effects.registry import EffectRegistry, get_registry, set_registry +from engine.effects.types import EffectConfig, EffectContext, PipelineConfig + + +def get_effect_chain(): + from engine.layers import get_effect_chain as _chain + + return _chain() + + +__all__ = [ + "EffectChain", + "EffectRegistry", + "EffectConfig", + "EffectContext", + "PipelineConfig", + "get_registry", + "set_registry", + "get_effect_chain", + "get_monitor", + "set_monitor", + "PerformanceMonitor", + "handle_effects_command", + "show_effects_menu", + "fade_line", + "firehose_line", + "glitch_bar", + "noise", + "next_headline", + "vis_trunc", +] diff --git a/engine/effects/chain.py b/engine/effects/chain.py new file mode 100644 index 0000000..c687266 --- /dev/null +++ b/engine/effects/chain.py @@ -0,0 +1,71 @@ +import time + +from engine.effects.performance import PerformanceMonitor, get_monitor +from engine.effects.registry import EffectRegistry +from engine.effects.types import EffectContext + + +class EffectChain: + def __init__( + self, registry: EffectRegistry, monitor: PerformanceMonitor | None = None + ): + self._registry = registry + self._order: list[str] = [] + self._monitor = monitor + + def _get_monitor(self) -> PerformanceMonitor: + if self._monitor is not None: + return self._monitor + return get_monitor() + + def set_order(self, names: list[str]) -> None: + self._order = list(names) + + def get_order(self) -> list[str]: + return self._order.copy() + + def add_effect(self, name: str, position: int | None = None) -> bool: + if name not in self._registry.list_all(): + return False + if position is None: + self._order.append(name) + else: + self._order.insert(position, name) + return True + + def remove_effect(self, name: str) -> bool: + if name in self._order: + self._order.remove(name) + return True + return False + + def reorder(self, new_order: list[str]) -> bool: + all_plugins = set(self._registry.list_all().keys()) + if not all(name in all_plugins for name in new_order): + return False + self._order = list(new_order) + return True + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + monitor = self._get_monitor() + frame_number = ctx.frame_number + monitor.start_frame(frame_number) + + frame_start = time.perf_counter() + result = list(buf) + for name in self._order: + plugin = self._registry.get(name) + if plugin and plugin.config.enabled: + chars_in = sum(len(line) for line in result) + effect_start = time.perf_counter() + try: + result = plugin.process(result, ctx) + except Exception: + plugin.config.enabled = False + elapsed = time.perf_counter() - effect_start + chars_out = sum(len(line) for line in result) + monitor.record_effect(name, elapsed * 1000, chars_in, chars_out) + + total_elapsed = time.perf_counter() - frame_start + monitor.end_frame(frame_number, total_elapsed * 1000) + return result diff --git a/engine/effects/controller.py b/engine/effects/controller.py new file mode 100644 index 0000000..3e72881 --- /dev/null +++ b/engine/effects/controller.py @@ -0,0 +1,144 @@ +from engine.effects.performance import get_monitor +from engine.effects.registry import get_registry + +_effect_chain_ref = None + + +def _get_effect_chain(): + global _effect_chain_ref + if _effect_chain_ref is not None: + return _effect_chain_ref + try: + from engine.layers import get_effect_chain as _chain + + return _chain() + except Exception: + return None + + +def set_effect_chain_ref(chain) -> None: + global _effect_chain_ref + _effect_chain_ref = chain + + +def handle_effects_command(cmd: str) -> str: + """Handle /effects command from NTFY message. + + Commands: + /effects list - list all effects and their status + /effects on - enable an effect + /effects off - disable an effect + /effects intensity <0.0-1.0> - set intensity + /effects reorder ,,... - reorder pipeline + /effects stats - show performance statistics + """ + parts = cmd.strip().split() + if not parts or parts[0] != "/effects": + return "Unknown command" + + registry = get_registry() + chain = _get_effect_chain() + + if len(parts) == 1 or parts[1] == "list": + result = ["Effects:"] + for name, plugin in registry.list_all().items(): + status = "ON" if plugin.config.enabled else "OFF" + intensity = plugin.config.intensity + result.append(f" {name}: {status} (intensity={intensity})") + if chain: + result.append(f"Order: {chain.get_order()}") + return "\n".join(result) + + if parts[1] == "stats": + return _format_stats() + + if parts[1] == "reorder" and len(parts) >= 3: + new_order = parts[2].split(",") + if chain and chain.reorder(new_order): + return f"Reordered pipeline: {new_order}" + return "Failed to reorder pipeline" + + if len(parts) < 3: + return "Usage: /effects on|off|intensity " + + effect_name = parts[1] + action = parts[2] + + if effect_name not in registry.list_all(): + return f"Unknown effect: {effect_name}" + + if action == "on": + registry.enable(effect_name) + return f"Enabled: {effect_name}" + + if action == "off": + registry.disable(effect_name) + return f"Disabled: {effect_name}" + + if action == "intensity" and len(parts) >= 4: + try: + value = float(parts[3]) + if not 0.0 <= value <= 1.0: + return "Intensity must be between 0.0 and 1.0" + plugin = registry.get(effect_name) + if plugin: + plugin.config.intensity = value + return f"Set {effect_name} intensity to {value}" + except ValueError: + return "Invalid intensity value" + + return f"Unknown action: {action}" + + +def _format_stats() -> str: + monitor = get_monitor() + stats = monitor.get_stats() + + if "error" in stats: + return stats["error"] + + lines = ["Performance Stats:"] + + pipeline = stats["pipeline"] + lines.append( + f" Pipeline: avg={pipeline['avg_ms']:.2f}ms min={pipeline['min_ms']:.2f}ms max={pipeline['max_ms']:.2f}ms (over {stats['frame_count']} frames)" + ) + + if stats["effects"]: + lines.append(" Per-effect (avg ms):") + for name, effect_stats in stats["effects"].items(): + lines.append( + f" {name}: avg={effect_stats['avg_ms']:.2f}ms min={effect_stats['min_ms']:.2f}ms max={effect_stats['max_ms']:.2f}ms" + ) + + return "\n".join(lines) + + +def show_effects_menu() -> str: + """Generate effects menu text for display.""" + registry = get_registry() + chain = _get_effect_chain() + + lines = [ + "\033[1;38;5;231m=== EFFECTS MENU ===\033[0m", + "", + "Effects:", + ] + + for name, plugin in registry.list_all().items(): + status = "ON" if plugin.config.enabled else "OFF" + intensity = plugin.config.intensity + lines.append(f" [{status:3}] {name}: intensity={intensity:.2f}") + + if chain: + lines.append("") + lines.append(f"Pipeline order: {' -> '.join(chain.get_order())}") + + lines.append("") + lines.append("Controls:") + lines.append(" /effects on|off") + lines.append(" /effects intensity <0.0-1.0>") + lines.append(" /effects reorder name1,name2,...") + lines.append("") + + return "\n".join(lines) diff --git a/engine/effects.py b/engine/effects/legacy.py similarity index 100% rename from engine/effects.py rename to engine/effects/legacy.py diff --git a/engine/effects/performance.py b/engine/effects/performance.py new file mode 100644 index 0000000..7a26bb9 --- /dev/null +++ b/engine/effects/performance.py @@ -0,0 +1,103 @@ +from collections import deque +from dataclasses import dataclass + + +@dataclass +class EffectTiming: + name: str + duration_ms: float + buffer_chars_in: int + buffer_chars_out: int + + +@dataclass +class FrameTiming: + frame_number: int + total_ms: float + effects: list[EffectTiming] + + +class PerformanceMonitor: + """Collects and stores performance metrics for effect pipeline.""" + + def __init__(self, max_frames: int = 60): + self._max_frames = max_frames + self._frames: deque[FrameTiming] = deque(maxlen=max_frames) + self._current_frame: list[EffectTiming] = [] + + def start_frame(self, frame_number: int) -> None: + self._current_frame = [] + + def record_effect( + self, name: str, duration_ms: float, chars_in: int, chars_out: int + ) -> None: + self._current_frame.append( + EffectTiming( + name=name, + duration_ms=duration_ms, + buffer_chars_in=chars_in, + buffer_chars_out=chars_out, + ) + ) + + def end_frame(self, frame_number: int, total_ms: float) -> None: + self._frames.append( + FrameTiming( + frame_number=frame_number, + total_ms=total_ms, + effects=self._current_frame, + ) + ) + + def get_stats(self) -> dict: + if not self._frames: + return {"error": "No timing data available"} + + total_times = [f.total_ms for f in self._frames] + avg_total = sum(total_times) / len(total_times) + min_total = min(total_times) + max_total = max(total_times) + + effect_stats: dict[str, dict] = {} + for frame in self._frames: + for effect in frame.effects: + if effect.name not in effect_stats: + effect_stats[effect.name] = {"times": [], "total_chars": 0} + effect_stats[effect.name]["times"].append(effect.duration_ms) + effect_stats[effect.name]["total_chars"] += effect.buffer_chars_out + + for name, stats in effect_stats.items(): + times = stats["times"] + stats["avg_ms"] = sum(times) / len(times) + stats["min_ms"] = min(times) + stats["max_ms"] = max(times) + del stats["times"] + + return { + "frame_count": len(self._frames), + "pipeline": { + "avg_ms": avg_total, + "min_ms": min_total, + "max_ms": max_total, + }, + "effects": effect_stats, + } + + def reset(self) -> None: + self._frames.clear() + self._current_frame = [] + + +_monitor: PerformanceMonitor | None = None + + +def get_monitor() -> PerformanceMonitor: + global _monitor + if _monitor is None: + _monitor = PerformanceMonitor() + return _monitor + + +def set_monitor(monitor: PerformanceMonitor) -> None: + global _monitor + _monitor = monitor diff --git a/engine/effects/registry.py b/engine/effects/registry.py new file mode 100644 index 0000000..bdf13d8 --- /dev/null +++ b/engine/effects/registry.py @@ -0,0 +1,59 @@ +from engine.effects.types import EffectConfig, EffectPlugin + + +class EffectRegistry: + def __init__(self): + self._plugins: dict[str, EffectPlugin] = {} + self._discovered: bool = False + + def register(self, plugin: EffectPlugin) -> None: + self._plugins[plugin.name] = plugin + + def get(self, name: str) -> EffectPlugin | None: + return self._plugins.get(name) + + def list_all(self) -> dict[str, EffectPlugin]: + return self._plugins.copy() + + def list_enabled(self) -> list[EffectPlugin]: + return [p for p in self._plugins.values() if p.config.enabled] + + def enable(self, name: str) -> bool: + plugin = self._plugins.get(name) + if plugin: + plugin.config.enabled = True + return True + return False + + def disable(self, name: str) -> bool: + plugin = self._plugins.get(name) + if plugin: + plugin.config.enabled = False + return True + return False + + def configure(self, name: str, config: EffectConfig) -> bool: + plugin = self._plugins.get(name) + if plugin: + plugin.configure(config) + return True + return False + + def is_enabled(self, name: str) -> bool: + plugin = self._plugins.get(name) + return plugin.config.enabled if plugin else False + + +_registry: EffectRegistry | None = None + + +def get_registry() -> EffectRegistry: + global _registry + if _registry is None: + _registry = EffectRegistry() + return _registry + + +def set_registry(registry: EffectRegistry) -> None: + global _registry + _registry = registry diff --git a/engine/effects/types.py b/engine/effects/types.py new file mode 100644 index 0000000..1d2c340 --- /dev/null +++ b/engine/effects/types.py @@ -0,0 +1,39 @@ +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class EffectContext: + terminal_width: int + terminal_height: int + scroll_cam: int + ticker_height: int + mic_excess: float + grad_offset: float + frame_number: int + has_message: bool + items: list = field(default_factory=list) + + +@dataclass +class EffectConfig: + enabled: bool = True + intensity: float = 1.0 + params: dict[str, Any] = field(default_factory=dict) + + +class EffectPlugin: + name: str + config: EffectConfig + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + raise NotImplementedError + + def configure(self, config: EffectConfig) -> None: + raise NotImplementedError + + +@dataclass +class PipelineConfig: + order: list[str] = field(default_factory=list) + effects: dict[str, EffectConfig] = field(default_factory=dict) diff --git a/engine/layers.py b/engine/layers.py index ebc53ef..b5ac428 100644 --- a/engine/layers.py +++ b/engine/layers.py @@ -10,6 +10,8 @@ from datetime import datetime from engine import config from engine.effects import ( + EffectChain, + EffectContext, fade_line, firehose_line, glitch_bar, @@ -199,3 +201,60 @@ def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]: fline = firehose_line(items, w) buf.append(f"\033[{scr_row};1H{fline}\033[K") return buf + + +_effect_chain = None + + +def init_effects() -> None: + """Initialize effect plugins and chain.""" + global _effect_chain + from engine.effects import EffectChain, get_registry + + registry = get_registry() + + import effects_plugins + + effects_plugins.discover_plugins() + + chain = EffectChain(registry) + chain.set_order(["noise", "fade", "glitch", "firehose"]) + _effect_chain = chain + + +def process_effects( + buf: list[str], + w: int, + h: int, + scroll_cam: int, + ticker_h: int, + mic_excess: float, + grad_offset: float, + frame_number: int, + has_message: bool, + items: list, +) -> list[str]: + """Process buffer through effect chain.""" + if _effect_chain is None: + init_effects() + + ctx = EffectContext( + terminal_width=w, + terminal_height=h, + scroll_cam=scroll_cam, + ticker_height=ticker_h, + mic_excess=mic_excess, + grad_offset=grad_offset, + frame_number=frame_number, + has_message=has_message, + items=items, + ) + return _effect_chain.process(buf, ctx) + + +def get_effect_chain() -> EffectChain | None: + """Get the effect chain instance.""" + global _effect_chain + if _effect_chain is None: + init_effects() + return _effect_chain diff --git a/tests/test_effects.py b/tests/test_effects.py new file mode 100644 index 0000000..12d41a5 --- /dev/null +++ b/tests/test_effects.py @@ -0,0 +1,427 @@ +""" +Tests for engine.effects module. +""" + +from engine.effects import EffectChain, EffectConfig, EffectContext, EffectRegistry + + +class MockEffect: + name = "mock" + config = EffectConfig(enabled=True, intensity=1.0) + + def __init__(self): + self.processed = False + self.last_ctx = None + + def process(self, buf, ctx): + self.processed = True + self.last_ctx = ctx + return buf + ["processed"] + + def configure(self, config): + self.config = config + + +class TestEffectConfig: + def test_defaults(self): + cfg = EffectConfig() + assert cfg.enabled is True + assert cfg.intensity == 1.0 + assert cfg.params == {} + + def test_custom_values(self): + cfg = EffectConfig(enabled=False, intensity=0.5, params={"key": "value"}) + assert cfg.enabled is False + assert cfg.intensity == 0.5 + assert cfg.params == {"key": "value"} + + +class TestEffectContext: + def test_defaults(self): + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + ) + assert ctx.terminal_width == 80 + assert ctx.terminal_height == 24 + assert ctx.ticker_height == 20 + assert ctx.items == [] + + def test_with_items(self): + items = [("Title", "Source", "12:00")] + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + items=items, + ) + assert ctx.items == items + + +class TestEffectRegistry: + def test_init_empty(self): + registry = EffectRegistry() + assert len(registry.list_all()) == 0 + + def test_register(self): + registry = EffectRegistry() + effect = MockEffect() + registry.register(effect) + assert "mock" in registry.list_all() + + def test_get(self): + registry = EffectRegistry() + effect = MockEffect() + registry.register(effect) + retrieved = registry.get("mock") + assert retrieved is effect + + def test_get_nonexistent(self): + registry = EffectRegistry() + assert registry.get("nonexistent") is None + + def test_enable(self): + registry = EffectRegistry() + effect = MockEffect() + effect.config.enabled = False + registry.register(effect) + registry.enable("mock") + assert effect.config.enabled is True + + def test_disable(self): + registry = EffectRegistry() + effect = MockEffect() + effect.config.enabled = True + registry.register(effect) + registry.disable("mock") + assert effect.config.enabled is False + + def test_list_enabled(self): + registry = EffectRegistry() + + class EnabledEffect: + name = "enabled_effect" + config = EffectConfig(enabled=True, intensity=1.0) + + class DisabledEffect: + name = "disabled_effect" + config = EffectConfig(enabled=False, intensity=1.0) + + registry.register(EnabledEffect()) + registry.register(DisabledEffect()) + enabled = registry.list_enabled() + assert len(enabled) == 1 + assert enabled[0].name == "enabled_effect" + + def test_configure(self): + registry = EffectRegistry() + effect = MockEffect() + registry.register(effect) + new_config = EffectConfig(enabled=False, intensity=0.3) + registry.configure("mock", new_config) + assert effect.config.enabled is False + assert effect.config.intensity == 0.3 + + def test_is_enabled(self): + registry = EffectRegistry() + effect = MockEffect() + effect.config.enabled = True + registry.register(effect) + assert registry.is_enabled("mock") is True + assert registry.is_enabled("nonexistent") is False + + +class TestEffectChain: + def test_init(self): + registry = EffectRegistry() + chain = EffectChain(registry) + assert chain.get_order() == [] + + def test_set_order(self): + registry = EffectRegistry() + effect1 = MockEffect() + effect1.name = "effect1" + effect2 = MockEffect() + effect2.name = "effect2" + registry.register(effect1) + registry.register(effect2) + chain = EffectChain(registry) + chain.set_order(["effect1", "effect2"]) + assert chain.get_order() == ["effect1", "effect2"] + + def test_add_effect(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "test_effect" + registry.register(effect) + chain = EffectChain(registry) + chain.add_effect("test_effect") + assert "test_effect" in chain.get_order() + + def test_add_effect_invalid(self): + registry = EffectRegistry() + chain = EffectChain(registry) + result = chain.add_effect("nonexistent") + assert result is False + + def test_remove_effect(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "test_effect" + registry.register(effect) + chain = EffectChain(registry) + chain.set_order(["test_effect"]) + chain.remove_effect("test_effect") + assert "test_effect" not in chain.get_order() + + def test_reorder(self): + registry = EffectRegistry() + effect1 = MockEffect() + effect1.name = "effect1" + effect2 = MockEffect() + effect2.name = "effect2" + effect3 = MockEffect() + effect3.name = "effect3" + registry.register(effect1) + registry.register(effect2) + registry.register(effect3) + chain = EffectChain(registry) + chain.set_order(["effect1", "effect2", "effect3"]) + result = chain.reorder(["effect3", "effect1", "effect2"]) + assert result is True + assert chain.get_order() == ["effect3", "effect1", "effect2"] + + def test_reorder_invalid(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "effect1" + registry.register(effect) + chain = EffectChain(registry) + result = chain.reorder(["effect1", "nonexistent"]) + assert result is False + + def test_process_empty_chain(self): + registry = EffectRegistry() + chain = EffectChain(registry) + buf = ["line1", "line2"] + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + ) + result = chain.process(buf, ctx) + assert result == buf + + def test_process_with_effects(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "test_effect" + registry.register(effect) + chain = EffectChain(registry) + chain.set_order(["test_effect"]) + buf = ["line1", "line2"] + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + ) + result = chain.process(buf, ctx) + assert result == ["line1", "line2", "processed"] + assert effect.processed is True + assert effect.last_ctx is ctx + + def test_process_disabled_effect(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "test_effect" + effect.config.enabled = False + registry.register(effect) + chain = EffectChain(registry) + chain.set_order(["test_effect"]) + buf = ["line1"] + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + ) + result = chain.process(buf, ctx) + assert result == ["line1"] + assert effect.processed is False + + +class TestEffectsExports: + def test_all_exports_are_importable(self): + """Verify all exports in __all__ can actually be imported.""" + import engine.effects as effects_module + + for name in effects_module.__all__: + getattr(effects_module, name) + + +class TestPerformanceMonitor: + def test_empty_stats(self): + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor() + stats = monitor.get_stats() + assert "error" in stats + + def test_record_and_retrieve(self): + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor() + monitor.start_frame(1) + monitor.record_effect("test_effect", 1.5, 100, 150) + monitor.end_frame(1, 2.0) + + stats = monitor.get_stats() + assert "error" not in stats + assert stats["frame_count"] == 1 + assert "test_effect" in stats["effects"] + + def test_multiple_frames(self): + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor(max_frames=3) + for i in range(5): + monitor.start_frame(i) + monitor.record_effect("effect1", 1.0, 100, 100) + monitor.record_effect("effect2", 0.5, 100, 100) + monitor.end_frame(i, 1.5) + + stats = monitor.get_stats() + assert stats["frame_count"] == 3 + assert "effect1" in stats["effects"] + assert "effect2" in stats["effects"] + + def test_reset(self): + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor() + monitor.start_frame(1) + monitor.record_effect("test", 1.0, 100, 100) + monitor.end_frame(1, 1.0) + + monitor.reset() + stats = monitor.get_stats() + assert "error" in stats + + +class TestEffectPipelinePerformance: + def test_pipeline_stays_within_frame_budget(self): + """Verify effect pipeline completes within frame budget (33ms for 30fps).""" + from engine.effects import ( + EffectChain, + EffectConfig, + EffectContext, + EffectRegistry, + ) + + class DummyEffect: + name = "dummy" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf, ctx): + return [line * 2 for line in buf] + + registry = EffectRegistry() + registry.register(DummyEffect()) + + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor(max_frames=10) + chain = EffectChain(registry, monitor) + chain.set_order(["dummy"]) + + buf = ["x" * 80] * 20 + + for i in range(10): + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=i, + has_message=False, + ) + chain.process(buf, ctx) + + stats = monitor.get_stats() + assert "error" not in stats + assert stats["pipeline"]["max_ms"] < 33.0 + + def test_individual_effects_performance(self): + """Verify individual effects don't exceed 10ms per frame.""" + from engine.effects import ( + EffectChain, + EffectConfig, + EffectContext, + EffectRegistry, + ) + + class SlowEffect: + name = "slow" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf, ctx): + result = [] + for line in buf: + result.append(line) + result.append(line + line) + return result + + registry = EffectRegistry() + registry.register(SlowEffect()) + + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor(max_frames=5) + chain = EffectChain(registry, monitor) + chain.set_order(["slow"]) + + buf = ["x" * 80] * 10 + + for i in range(5): + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=i, + has_message=False, + ) + chain.process(buf, ctx) + + stats = monitor.get_stats() + assert "error" not in stats + assert stats["effects"]["slow"]["max_ms"] < 10.0