diff --git a/cmdline.py b/cmdline.py new file mode 100644 index 0000000..9ee9ba6 --- /dev/null +++ b/cmdline.py @@ -0,0 +1,250 @@ +#!/usr/bin/env python3 +""" +Command-line utility for interacting with mainline via ntfy. + +Usage: + python cmdline.py # Interactive TUI mode + python cmdline.py --help # Show help + python cmdline.py /effects list # Send single command via ntfy + python cmdline.py /effects stats # Get performance stats via ntfy + python cmdline.py -w /effects stats # Watch mode (polls for stats) + +The TUI mode provides: + - Arrow keys to navigate command history + - Tab completion for commands + - Auto-refresh for performance stats + +C&C works like a serial port: + 1. Send command to ntfy_cc_topic + 2. Mainline receives, processes, responds to same topic + 3. Cmdline polls for response +""" + +import argparse +import json +import sys +import time +import threading +import urllib.request +from pathlib import Path + +from engine import config +from engine.terminal import CLR, CURSOR_OFF, CURSOR_ON, G_DIM, G_HI, RST, W_GHOST + +try: + CC_CMD_TOPIC = config.NTFY_CC_CMD_TOPIC + CC_RESP_TOPIC = config.NTFY_CC_RESP_TOPIC +except AttributeError: + CC_CMD_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json" + CC_RESP_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json" + + +class NtfyResponsePoller: + """Polls ntfy for command responses.""" + + def __init__(self, cmd_topic: str, resp_topic: str, timeout: float = 10.0): + self.cmd_topic = cmd_topic + self.resp_topic = resp_topic + self.timeout = timeout + self._last_id = None + self._lock = threading.Lock() + + def _build_url(self) -> str: + from urllib.parse import parse_qs, urlencode, urlparse, urlunparse + + parsed = urlparse(self.resp_topic) + params = parse_qs(parsed.query, keep_blank_values=True) + params["since"] = [self._last_id if self._last_id else "20s"] + new_query = urlencode({k: v[0] for k, v in params.items()}) + return urlunparse(parsed._replace(query=new_query)) + + def send_and_wait(self, cmd: str) -> str: + """Send command and wait for response.""" + url = self.cmd_topic.replace("/json", "") + data = cmd.encode("utf-8") + + req = urllib.request.Request( + url, + data=data, + headers={ + "User-Agent": "mainline-cmdline/0.1", + "Content-Type": "text/plain", + }, + method="POST", + ) + + try: + urllib.request.urlopen(req, timeout=5) + except Exception as e: + return f"Error sending command: {e}" + + return self._wait_for_response(cmd) + + def _wait_for_response(self, expected_cmd: str = "") -> str: + """Poll for response message.""" + start = time.time() + while time.time() - start < self.timeout: + try: + url = self._build_url() + req = urllib.request.Request( + url, headers={"User-Agent": "mainline-cmdline/0.1"} + ) + with urllib.request.urlopen(req, timeout=10) as resp: + for line in resp: + try: + data = json.loads(line.decode("utf-8", errors="replace")) + except json.JSONDecodeError: + continue + if data.get("event") == "message": + self._last_id = data.get("id") + msg = data.get("message", "") + if msg: + return msg + except Exception: + pass + time.sleep(0.5) + return "Timeout waiting for response" + + +AVAILABLE_COMMANDS = """Available commands: + /effects list - List all effects and status + /effects on - Enable an effect + /effects off - Disable an effect + /effects intensity <0.0-1.0> - Set effect intensity + /effects reorder ,,... - Reorder pipeline + /effects stats - Show performance statistics + /help - Show this help + /quit - Exit +""" + + +def print_header(): + w = 60 + print(CLR, end="") + print(CURSOR_OFF, end="") + print(f"\033[1;1H", end="") + print(f" \033[1;38;5;231m╔{'═' * (w - 6)}╗\033[0m") + print( + f" \033[1;38;5;231m║\033[0m \033[1;38;5;82mMAINLINE\033[0m \033[3;38;5;245mCommand Center\033[0m \033[1;38;5;231m ║\033[0m" + ) + print(f" \033[1;38;5;231m╚{'═' * (w - 6)}╝\033[0m") + print(f" \033[2;38;5;37mCMD: {CC_CMD_TOPIC.split('/')[-2]}\033[0m") + print(f" \033[2;38;5;37mRESP: {CC_RESP_TOPIC.split('/')[-2]}\033[0m") + print() + + +def print_response(response: str, is_error: bool = False) -> None: + """Print response with nice formatting.""" + print() + if is_error: + print(f" \033[1;38;5;196m✗ Error\033[0m") + print(f" \033[38;5;196m{'─' * 40}\033[0m") + else: + print(f" \033[1;38;5;82m✓ Response\033[0m") + print(f" \033[38;5;37m{'─' * 40}\033[0m") + + for line in response.split("\n"): + print(f" {line}") + print() + + +def interactive_mode(): + """Interactive TUI for sending commands.""" + import readline + + print_header() + poller = NtfyResponsePoller(CC_CMD_TOPIC, CC_RESP_TOPIC) + + print(f" \033[38;5;245mType /help for commands, /quit to exit\033[0m") + print() + + while True: + try: + cmd = input(f" \033[1;38;5;82m❯\033[0m {G_HI}").strip() + except (EOFError, KeyboardInterrupt): + print() + break + + if not cmd: + continue + + if cmd.startswith("/"): + if cmd == "/quit" or cmd == "/exit": + print(f"\n \033[1;38;5;245mGoodbye!{RST}\n") + break + + if cmd == "/help": + print(f"\n{AVAILABLE_COMMANDS}\n") + continue + + print(f" \033[38;5;245m⟳ Sending to mainline...{RST}") + result = poller.send_and_wait(cmd) + print_response(result, is_error=result.startswith("Error")) + else: + print(f"\n \033[1;38;5;196m⚠ Commands must start with /{RST}\n") + + print(CURSOR_ON, end="") + return 0 + + +def main(): + parser = argparse.ArgumentParser( + description="Mainline command-line interface", + formatter_class=argparse.RawDescriptionHelpFormatter, + epilog=AVAILABLE_COMMANDS, + ) + parser.add_argument( + "command", + nargs="?", + default=None, + help="Command to send (e.g., /effects list)", + ) + parser.add_argument( + "--watch", + "-w", + action="store_true", + help="Watch mode: continuously poll for stats (Ctrl+C to exit)", + ) + + args = parser.parse_args() + + if args.command is None: + return interactive_mode() + + poller = NtfyResponsePoller(CC_CMD_TOPIC, CC_RESP_TOPIC) + + if args.watch and "/effects stats" in args.command: + import signal + + def handle_sigterm(*_): + print(f"\n \033[1;38;5;245mStopped watching{RST}") + print(CURSOR_ON, end="") + sys.exit(0) + + signal.signal(signal.SIGTERM, handle_sigterm) + + print_header() + print(f" \033[38;5;245mWatching /effects stats (Ctrl+C to exit)...{RST}\n") + try: + while True: + result = poller.send_and_wait(args.command) + print(f"\033[2J\033[1;1H", end="") + print( + f" \033[1;38;5;82m❯\033[0m Performance Stats - \033[1;38;5;245m{time.strftime('%H:%M:%S')}{RST}" + ) + print(f" \033[38;5;37m{'─' * 44}{RST}") + for line in result.split("\n"): + print(f" {line}") + time.sleep(2) + except KeyboardInterrupt: + print(f"\n \033[1;38;5;245mStopped watching{RST}") + return 0 + return 0 + + result = poller.send_and_wait(args.command) + print(result) + return 0 + + +if __name__ == "__main__": + main() diff --git a/effects_plugins/__init__.py b/effects_plugins/__init__.py new file mode 100644 index 0000000..fc3c8d5 --- /dev/null +++ b/effects_plugins/__init__.py @@ -0,0 +1,35 @@ +from pathlib import Path + +PLUGIN_DIR = Path(__file__).parent + + +def discover_plugins(): + from engine.effects.registry import get_registry + + registry = get_registry() + imported = {} + + for file_path in PLUGIN_DIR.glob("*.py"): + if file_path.name.startswith("_"): + continue + module_name = file_path.stem + if module_name in ("base", "types"): + continue + + try: + module = __import__(f"effects_plugins.{module_name}", fromlist=[""]) + for attr_name in dir(module): + attr = getattr(module, attr_name) + if ( + isinstance(attr, type) + and hasattr(attr, "name") + and hasattr(attr, "process") + and attr_name.endswith("Effect") + ): + plugin = attr() + registry.register(plugin) + imported[plugin.name] = plugin + except Exception: + pass + + return imported diff --git a/effects_plugins/fade.py b/effects_plugins/fade.py new file mode 100644 index 0000000..98ede65 --- /dev/null +++ b/effects_plugins/fade.py @@ -0,0 +1,58 @@ +import random + +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin + + +class FadeEffect: + name = "fade" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + if not ctx.ticker_height: + return buf + result = list(buf) + intensity = self.config.intensity + + top_zone = max(1, int(ctx.ticker_height * 0.25)) + bot_zone = max(1, int(ctx.ticker_height * 0.10)) + + for r in range(len(result)): + if r >= ctx.ticker_height: + continue + top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0 + bot_f = ( + min(1.0, (ctx.ticker_height - 1 - r) / bot_zone) + if bot_zone > 0 + else 1.0 + ) + row_fade = min(top_f, bot_f) * intensity + + if row_fade < 1.0 and result[r].strip(): + result[r] = self._fade_line(result[r], row_fade) + + return result + + def _fade_line(self, s: str, fade: float) -> str: + if fade >= 1.0: + return s + if fade <= 0.0: + return "" + result = [] + i = 0 + while i < len(s): + if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[": + j = i + 2 + while j < len(s) and not s[j].isalpha(): + j += 1 + result.append(s[i : j + 1]) + i = j + 1 + elif s[i] == " ": + result.append(" ") + i += 1 + else: + result.append(s[i] if random.random() < fade else " ") + i += 1 + return "".join(result) + + def configure(self, cfg: EffectConfig) -> None: + self.config = cfg diff --git a/effects_plugins/firehose.py b/effects_plugins/firehose.py new file mode 100644 index 0000000..4be520b --- /dev/null +++ b/effects_plugins/firehose.py @@ -0,0 +1,72 @@ +import random +from datetime import datetime + +from engine import config +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin +from engine.sources import FEEDS, POETRY_SOURCES +from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST + + +class FirehoseEffect: + name = "firehose" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + firehose_h = config.FIREHOSE_H if config.FIREHOSE else 0 + if firehose_h <= 0 or not ctx.items: + return buf + + result = list(buf) + intensity = self.config.intensity + h = ctx.terminal_height + + for fr in range(firehose_h): + scr_row = h - firehose_h + fr + 1 + fline = self._firehose_line(ctx.items, ctx.terminal_width, intensity) + result.append(f"\033[{scr_row};1H{fline}\033[K") + return result + + def _firehose_line(self, items: list, w: int, intensity: float) -> str: + r = random.random() + if r < 0.35 * intensity: + title, src, ts = random.choice(items) + text = title[: w - 1] + color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM]) + return f"{color}{text}{RST}" + elif r < 0.55 * intensity: + d = random.choice([0.45, 0.55, 0.65, 0.75]) + return "".join( + f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}" + f"{random.choice(config.GLITCH + config.KATA)}{RST}" + if random.random() < d + else " " + for _ in range(w) + ) + elif r < 0.78 * intensity: + sources = FEEDS if config.MODE == "news" else POETRY_SOURCES + src = random.choice(list(sources.keys())) + msgs = [ + f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}", + f" ░░ FEED ACTIVE :: {src}", + f" >> DECODE 0x{random.randint(0x1000, 0xFFFF):04X} :: {src[:24]}", + f" ▒▒ ACQUIRE :: {random.choice(['TCP', 'UDP', 'RSS', 'ATOM', 'XML'])} :: {src}", + f" {''.join(random.choice(config.KATA) for _ in range(3))} STRM " + f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}", + ] + text = random.choice(msgs)[: w - 1] + color = random.choice([G_LO, G_DIM, W_GHOST]) + return f"{color}{text}{RST}" + else: + title, _, _ = random.choice(items) + start = random.randint(0, max(0, len(title) - 20)) + frag = title[start : start + random.randint(10, 35)] + pad = random.randint(0, max(0, w - len(frag) - 8)) + gp = "".join( + random.choice(config.GLITCH) for _ in range(random.randint(1, 3)) + ) + text = (" " * pad + gp + " " + frag)[: w - 1] + color = random.choice([G_LO, C_DIM, W_GHOST]) + return f"{color}{text}{RST}" + + def configure(self, cfg: EffectConfig) -> None: + self.config = cfg diff --git a/effects_plugins/glitch.py b/effects_plugins/glitch.py new file mode 100644 index 0000000..d23244a --- /dev/null +++ b/effects_plugins/glitch.py @@ -0,0 +1,37 @@ +import random + +from engine import config +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin +from engine.terminal import C_DIM, DIM, G_DIM, G_LO, RST + + +class GlitchEffect: + name = "glitch" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + if not buf: + return buf + result = list(buf) + intensity = self.config.intensity + + glitch_prob = 0.32 + min(0.9, ctx.mic_excess * 0.16) + glitch_prob = glitch_prob * intensity + n_hits = 4 + int(ctx.mic_excess / 2) + n_hits = int(n_hits * intensity) + + if random.random() < glitch_prob: + for _ in range(min(n_hits, len(result))): + gi = random.randint(0, len(result) - 1) + scr_row = gi + 1 + result[gi] = f"\033[{scr_row};1H{self._glitch_bar(ctx.terminal_width)}" + return result + + def _glitch_bar(self, w: int) -> str: + c = random.choice(["░", "▒", "─", "\xc2"]) + n = random.randint(3, w // 2) + o = random.randint(0, w - n) + return " " * o + f"{G_LO}{DIM}" + c * n + RST + + def configure(self, cfg: EffectConfig) -> None: + self.config = cfg diff --git a/effects_plugins/noise.py b/effects_plugins/noise.py new file mode 100644 index 0000000..d7bf316 --- /dev/null +++ b/effects_plugins/noise.py @@ -0,0 +1,36 @@ +import random + +from engine import config +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin +from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST + + +class NoiseEffect: + name = "noise" + config = EffectConfig(enabled=True, intensity=0.15) + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + if not ctx.ticker_height: + return buf + result = list(buf) + intensity = self.config.intensity + probability = intensity * 0.15 + + for r in range(len(result)): + cy = ctx.scroll_cam + r + if random.random() < probability: + result[r] = self._generate_noise(ctx.terminal_width, cy) + return result + + def _generate_noise(self, w: int, cy: int) -> str: + d = random.choice([0.15, 0.25, 0.35, 0.12]) + return "".join( + f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}" + f"{random.choice(config.GLITCH + config.KATA)}{RST}" + if random.random() < d + else " " + for _ in range(w) + ) + + def configure(self, cfg: EffectConfig) -> None: + self.config = cfg diff --git a/engine/display.py b/engine/display.py new file mode 100644 index 0000000..32eb09e --- /dev/null +++ b/engine/display.py @@ -0,0 +1,102 @@ +""" +Display output abstraction - allows swapping output backends. + +Protocol: + - init(width, height): Initialize display with terminal dimensions + - show(buffer): Render buffer (list of strings) to display + - clear(): Clear the display + - cleanup(): Shutdown display +""" + +import time +from typing import Protocol + + +class Display(Protocol): + """Protocol for display backends.""" + + def init(self, width: int, height: int) -> None: + """Initialize display with dimensions.""" + ... + + def show(self, buffer: list[str]) -> None: + """Show buffer on display.""" + ... + + def clear(self) -> None: + """Clear display.""" + ... + + def cleanup(self) -> None: + """Shutdown display.""" + ... + + +def get_monitor(): + """Get the performance monitor.""" + try: + from engine.effects.performance import get_monitor as _get_monitor + + return _get_monitor() + except Exception: + return None + + +class TerminalDisplay: + """ANSI terminal display backend.""" + + def __init__(self): + self.width = 80 + self.height = 24 + + def init(self, width: int, height: int) -> None: + from engine.terminal import CURSOR_OFF + + self.width = width + self.height = height + print(CURSOR_OFF, end="", flush=True) + + def show(self, buffer: list[str]) -> None: + import sys + + t0 = time.perf_counter() + sys.stdout.buffer.write("".join(buffer).encode()) + sys.stdout.flush() + elapsed_ms = (time.perf_counter() - t0) * 1000 + + monitor = get_monitor() + if monitor: + chars_in = sum(len(line) for line in buffer) + monitor.record_effect("terminal_display", elapsed_ms, chars_in, chars_in) + + def clear(self) -> None: + from engine.terminal import CLR + + print(CLR, end="", flush=True) + + def cleanup(self) -> None: + from engine.terminal import CURSOR_ON + + print(CURSOR_ON, end="", flush=True) + + +class NullDisplay: + """Headless/null display - discards all output.""" + + def init(self, width: int, height: int) -> None: + self.width = width + self.height = height + + def show(self, buffer: list[str]) -> None: + monitor = get_monitor() + if monitor: + t0 = time.perf_counter() + chars_in = sum(len(line) for line in buffer) + elapsed_ms = (time.perf_counter() - t0) * 1000 + monitor.record_effect("null_display", elapsed_ms, chars_in, chars_in) + + def clear(self) -> None: + pass + + def cleanup(self) -> None: + pass diff --git a/engine/effects/__init__.py b/engine/effects/__init__.py new file mode 100644 index 0000000..923d361 --- /dev/null +++ b/engine/effects/__init__.py @@ -0,0 +1,42 @@ +from engine.effects.chain import EffectChain +from engine.effects.controller import handle_effects_command, show_effects_menu +from engine.effects.legacy import ( + fade_line, + firehose_line, + glitch_bar, + next_headline, + noise, + vis_trunc, +) +from engine.effects.performance import PerformanceMonitor, get_monitor, set_monitor +from engine.effects.registry import EffectRegistry, get_registry, set_registry +from engine.effects.types import EffectConfig, EffectContext, PipelineConfig + + +def get_effect_chain(): + from engine.layers import get_effect_chain as _chain + + return _chain() + + +__all__ = [ + "EffectChain", + "EffectRegistry", + "EffectConfig", + "EffectContext", + "PipelineConfig", + "get_registry", + "set_registry", + "get_effect_chain", + "get_monitor", + "set_monitor", + "PerformanceMonitor", + "handle_effects_command", + "show_effects_menu", + "fade_line", + "firehose_line", + "glitch_bar", + "noise", + "next_headline", + "vis_trunc", +] diff --git a/engine/effects/chain.py b/engine/effects/chain.py new file mode 100644 index 0000000..c687266 --- /dev/null +++ b/engine/effects/chain.py @@ -0,0 +1,71 @@ +import time + +from engine.effects.performance import PerformanceMonitor, get_monitor +from engine.effects.registry import EffectRegistry +from engine.effects.types import EffectContext + + +class EffectChain: + def __init__( + self, registry: EffectRegistry, monitor: PerformanceMonitor | None = None + ): + self._registry = registry + self._order: list[str] = [] + self._monitor = monitor + + def _get_monitor(self) -> PerformanceMonitor: + if self._monitor is not None: + return self._monitor + return get_monitor() + + def set_order(self, names: list[str]) -> None: + self._order = list(names) + + def get_order(self) -> list[str]: + return self._order.copy() + + def add_effect(self, name: str, position: int | None = None) -> bool: + if name not in self._registry.list_all(): + return False + if position is None: + self._order.append(name) + else: + self._order.insert(position, name) + return True + + def remove_effect(self, name: str) -> bool: + if name in self._order: + self._order.remove(name) + return True + return False + + def reorder(self, new_order: list[str]) -> bool: + all_plugins = set(self._registry.list_all().keys()) + if not all(name in all_plugins for name in new_order): + return False + self._order = list(new_order) + return True + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + monitor = self._get_monitor() + frame_number = ctx.frame_number + monitor.start_frame(frame_number) + + frame_start = time.perf_counter() + result = list(buf) + for name in self._order: + plugin = self._registry.get(name) + if plugin and plugin.config.enabled: + chars_in = sum(len(line) for line in result) + effect_start = time.perf_counter() + try: + result = plugin.process(result, ctx) + except Exception: + plugin.config.enabled = False + elapsed = time.perf_counter() - effect_start + chars_out = sum(len(line) for line in result) + monitor.record_effect(name, elapsed * 1000, chars_in, chars_out) + + total_elapsed = time.perf_counter() - frame_start + monitor.end_frame(frame_number, total_elapsed * 1000) + return result diff --git a/engine/effects/controller.py b/engine/effects/controller.py new file mode 100644 index 0000000..3e72881 --- /dev/null +++ b/engine/effects/controller.py @@ -0,0 +1,144 @@ +from engine.effects.performance import get_monitor +from engine.effects.registry import get_registry + +_effect_chain_ref = None + + +def _get_effect_chain(): + global _effect_chain_ref + if _effect_chain_ref is not None: + return _effect_chain_ref + try: + from engine.layers import get_effect_chain as _chain + + return _chain() + except Exception: + return None + + +def set_effect_chain_ref(chain) -> None: + global _effect_chain_ref + _effect_chain_ref = chain + + +def handle_effects_command(cmd: str) -> str: + """Handle /effects command from NTFY message. + + Commands: + /effects list - list all effects and their status + /effects on - enable an effect + /effects off - disable an effect + /effects intensity <0.0-1.0> - set intensity + /effects reorder ,,... - reorder pipeline + /effects stats - show performance statistics + """ + parts = cmd.strip().split() + if not parts or parts[0] != "/effects": + return "Unknown command" + + registry = get_registry() + chain = _get_effect_chain() + + if len(parts) == 1 or parts[1] == "list": + result = ["Effects:"] + for name, plugin in registry.list_all().items(): + status = "ON" if plugin.config.enabled else "OFF" + intensity = plugin.config.intensity + result.append(f" {name}: {status} (intensity={intensity})") + if chain: + result.append(f"Order: {chain.get_order()}") + return "\n".join(result) + + if parts[1] == "stats": + return _format_stats() + + if parts[1] == "reorder" and len(parts) >= 3: + new_order = parts[2].split(",") + if chain and chain.reorder(new_order): + return f"Reordered pipeline: {new_order}" + return "Failed to reorder pipeline" + + if len(parts) < 3: + return "Usage: /effects on|off|intensity " + + effect_name = parts[1] + action = parts[2] + + if effect_name not in registry.list_all(): + return f"Unknown effect: {effect_name}" + + if action == "on": + registry.enable(effect_name) + return f"Enabled: {effect_name}" + + if action == "off": + registry.disable(effect_name) + return f"Disabled: {effect_name}" + + if action == "intensity" and len(parts) >= 4: + try: + value = float(parts[3]) + if not 0.0 <= value <= 1.0: + return "Intensity must be between 0.0 and 1.0" + plugin = registry.get(effect_name) + if plugin: + plugin.config.intensity = value + return f"Set {effect_name} intensity to {value}" + except ValueError: + return "Invalid intensity value" + + return f"Unknown action: {action}" + + +def _format_stats() -> str: + monitor = get_monitor() + stats = monitor.get_stats() + + if "error" in stats: + return stats["error"] + + lines = ["Performance Stats:"] + + pipeline = stats["pipeline"] + lines.append( + f" Pipeline: avg={pipeline['avg_ms']:.2f}ms min={pipeline['min_ms']:.2f}ms max={pipeline['max_ms']:.2f}ms (over {stats['frame_count']} frames)" + ) + + if stats["effects"]: + lines.append(" Per-effect (avg ms):") + for name, effect_stats in stats["effects"].items(): + lines.append( + f" {name}: avg={effect_stats['avg_ms']:.2f}ms min={effect_stats['min_ms']:.2f}ms max={effect_stats['max_ms']:.2f}ms" + ) + + return "\n".join(lines) + + +def show_effects_menu() -> str: + """Generate effects menu text for display.""" + registry = get_registry() + chain = _get_effect_chain() + + lines = [ + "\033[1;38;5;231m=== EFFECTS MENU ===\033[0m", + "", + "Effects:", + ] + + for name, plugin in registry.list_all().items(): + status = "ON" if plugin.config.enabled else "OFF" + intensity = plugin.config.intensity + lines.append(f" [{status:3}] {name}: intensity={intensity:.2f}") + + if chain: + lines.append("") + lines.append(f"Pipeline order: {' -> '.join(chain.get_order())}") + + lines.append("") + lines.append("Controls:") + lines.append(" /effects on|off") + lines.append(" /effects intensity <0.0-1.0>") + lines.append(" /effects reorder name1,name2,...") + lines.append("") + + return "\n".join(lines) diff --git a/engine/effects.py b/engine/effects/legacy.py similarity index 100% rename from engine/effects.py rename to engine/effects/legacy.py diff --git a/engine/effects/performance.py b/engine/effects/performance.py new file mode 100644 index 0000000..7a26bb9 --- /dev/null +++ b/engine/effects/performance.py @@ -0,0 +1,103 @@ +from collections import deque +from dataclasses import dataclass + + +@dataclass +class EffectTiming: + name: str + duration_ms: float + buffer_chars_in: int + buffer_chars_out: int + + +@dataclass +class FrameTiming: + frame_number: int + total_ms: float + effects: list[EffectTiming] + + +class PerformanceMonitor: + """Collects and stores performance metrics for effect pipeline.""" + + def __init__(self, max_frames: int = 60): + self._max_frames = max_frames + self._frames: deque[FrameTiming] = deque(maxlen=max_frames) + self._current_frame: list[EffectTiming] = [] + + def start_frame(self, frame_number: int) -> None: + self._current_frame = [] + + def record_effect( + self, name: str, duration_ms: float, chars_in: int, chars_out: int + ) -> None: + self._current_frame.append( + EffectTiming( + name=name, + duration_ms=duration_ms, + buffer_chars_in=chars_in, + buffer_chars_out=chars_out, + ) + ) + + def end_frame(self, frame_number: int, total_ms: float) -> None: + self._frames.append( + FrameTiming( + frame_number=frame_number, + total_ms=total_ms, + effects=self._current_frame, + ) + ) + + def get_stats(self) -> dict: + if not self._frames: + return {"error": "No timing data available"} + + total_times = [f.total_ms for f in self._frames] + avg_total = sum(total_times) / len(total_times) + min_total = min(total_times) + max_total = max(total_times) + + effect_stats: dict[str, dict] = {} + for frame in self._frames: + for effect in frame.effects: + if effect.name not in effect_stats: + effect_stats[effect.name] = {"times": [], "total_chars": 0} + effect_stats[effect.name]["times"].append(effect.duration_ms) + effect_stats[effect.name]["total_chars"] += effect.buffer_chars_out + + for name, stats in effect_stats.items(): + times = stats["times"] + stats["avg_ms"] = sum(times) / len(times) + stats["min_ms"] = min(times) + stats["max_ms"] = max(times) + del stats["times"] + + return { + "frame_count": len(self._frames), + "pipeline": { + "avg_ms": avg_total, + "min_ms": min_total, + "max_ms": max_total, + }, + "effects": effect_stats, + } + + def reset(self) -> None: + self._frames.clear() + self._current_frame = [] + + +_monitor: PerformanceMonitor | None = None + + +def get_monitor() -> PerformanceMonitor: + global _monitor + if _monitor is None: + _monitor = PerformanceMonitor() + return _monitor + + +def set_monitor(monitor: PerformanceMonitor) -> None: + global _monitor + _monitor = monitor diff --git a/engine/effects/registry.py b/engine/effects/registry.py new file mode 100644 index 0000000..bdf13d8 --- /dev/null +++ b/engine/effects/registry.py @@ -0,0 +1,59 @@ +from engine.effects.types import EffectConfig, EffectPlugin + + +class EffectRegistry: + def __init__(self): + self._plugins: dict[str, EffectPlugin] = {} + self._discovered: bool = False + + def register(self, plugin: EffectPlugin) -> None: + self._plugins[plugin.name] = plugin + + def get(self, name: str) -> EffectPlugin | None: + return self._plugins.get(name) + + def list_all(self) -> dict[str, EffectPlugin]: + return self._plugins.copy() + + def list_enabled(self) -> list[EffectPlugin]: + return [p for p in self._plugins.values() if p.config.enabled] + + def enable(self, name: str) -> bool: + plugin = self._plugins.get(name) + if plugin: + plugin.config.enabled = True + return True + return False + + def disable(self, name: str) -> bool: + plugin = self._plugins.get(name) + if plugin: + plugin.config.enabled = False + return True + return False + + def configure(self, name: str, config: EffectConfig) -> bool: + plugin = self._plugins.get(name) + if plugin: + plugin.configure(config) + return True + return False + + def is_enabled(self, name: str) -> bool: + plugin = self._plugins.get(name) + return plugin.config.enabled if plugin else False + + +_registry: EffectRegistry | None = None + + +def get_registry() -> EffectRegistry: + global _registry + if _registry is None: + _registry = EffectRegistry() + return _registry + + +def set_registry(registry: EffectRegistry) -> None: + global _registry + _registry = registry diff --git a/engine/effects/types.py b/engine/effects/types.py new file mode 100644 index 0000000..1d2c340 --- /dev/null +++ b/engine/effects/types.py @@ -0,0 +1,39 @@ +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class EffectContext: + terminal_width: int + terminal_height: int + scroll_cam: int + ticker_height: int + mic_excess: float + grad_offset: float + frame_number: int + has_message: bool + items: list = field(default_factory=list) + + +@dataclass +class EffectConfig: + enabled: bool = True + intensity: float = 1.0 + params: dict[str, Any] = field(default_factory=dict) + + +class EffectPlugin: + name: str + config: EffectConfig + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + raise NotImplementedError + + def configure(self, config: EffectConfig) -> None: + raise NotImplementedError + + +@dataclass +class PipelineConfig: + order: list[str] = field(default_factory=list) + effects: dict[str, EffectConfig] = field(default_factory=dict) diff --git a/engine/layers.py b/engine/layers.py index ebc53ef..b5ac428 100644 --- a/engine/layers.py +++ b/engine/layers.py @@ -10,6 +10,8 @@ from datetime import datetime from engine import config from engine.effects import ( + EffectChain, + EffectContext, fade_line, firehose_line, glitch_bar, @@ -199,3 +201,60 @@ def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]: fline = firehose_line(items, w) buf.append(f"\033[{scr_row};1H{fline}\033[K") return buf + + +_effect_chain = None + + +def init_effects() -> None: + """Initialize effect plugins and chain.""" + global _effect_chain + from engine.effects import EffectChain, get_registry + + registry = get_registry() + + import effects_plugins + + effects_plugins.discover_plugins() + + chain = EffectChain(registry) + chain.set_order(["noise", "fade", "glitch", "firehose"]) + _effect_chain = chain + + +def process_effects( + buf: list[str], + w: int, + h: int, + scroll_cam: int, + ticker_h: int, + mic_excess: float, + grad_offset: float, + frame_number: int, + has_message: bool, + items: list, +) -> list[str]: + """Process buffer through effect chain.""" + if _effect_chain is None: + init_effects() + + ctx = EffectContext( + terminal_width=w, + terminal_height=h, + scroll_cam=scroll_cam, + ticker_height=ticker_h, + mic_excess=mic_excess, + grad_offset=grad_offset, + frame_number=frame_number, + has_message=has_message, + items=items, + ) + return _effect_chain.process(buf, ctx) + + +def get_effect_chain() -> EffectChain | None: + """Get the effect chain instance.""" + global _effect_chain + if _effect_chain is None: + init_effects() + return _effect_chain diff --git a/engine/scroll.py b/engine/scroll.py index 41445ad..d13408b 100644 --- a/engine/scroll.py +++ b/engine/scroll.py @@ -4,33 +4,42 @@ Orchestrates viewport, frame timing, and layers. """ import random -import sys import time from engine import config +from engine.display import ( + Display, + TerminalDisplay, +) +from engine.display import ( + get_monitor as _get_display_monitor, +) from engine.frame import calculate_scroll_step from engine.layers import ( apply_glitch, + process_effects, render_firehose, render_message_overlay, render_ticker_zone, ) -from engine.terminal import CLR from engine.viewport import th, tw +USE_EFFECT_CHAIN = True -def stream(items, ntfy_poller, mic_monitor): + +def stream(items, ntfy_poller, mic_monitor, display: Display | None = None): """Main render loop with four layers: message, ticker, scroll motion, firehose.""" + if display is None: + display = TerminalDisplay() random.shuffle(items) pool = list(items) seen = set() queued = 0 time.sleep(0.5) - sys.stdout.write(CLR) - sys.stdout.flush() - w, h = tw(), th() + display.init(w, h) + display.clear() fh = config.FIREHOSE_H if config.FIREHOSE else 0 ticker_view_h = h - fh GAP = 3 @@ -42,6 +51,7 @@ def stream(items, ntfy_poller, mic_monitor): noise_cache = {} scroll_motion_accum = 0.0 msg_cache = (None, None) + frame_number = 0 while True: if queued >= config.HEADLINE_LIMIT and not active: @@ -93,19 +103,39 @@ def stream(items, ntfy_poller, mic_monitor): buf.extend(ticker_buf) mic_excess = mic_monitor.excess - buf = apply_glitch(buf, ticker_buf_start, mic_excess, w) + render_start = time.perf_counter() - firehose_buf = render_firehose(items, w, fh, h) - buf.extend(firehose_buf) + if USE_EFFECT_CHAIN: + buf = process_effects( + buf, + w, + h, + scroll_cam, + ticker_h, + mic_excess, + grad_offset, + frame_number, + msg is not None, + items, + ) + else: + buf = apply_glitch(buf, ticker_buf_start, mic_excess, w) + firehose_buf = render_firehose(items, w, fh, h) + buf.extend(firehose_buf) if msg_overlay: buf.extend(msg_overlay) - sys.stdout.buffer.write("".join(buf).encode()) - sys.stdout.flush() + render_elapsed = (time.perf_counter() - render_start) * 1000 + monitor = _get_display_monitor() + if monitor: + chars = sum(len(line) for line in buf) + monitor.record_effect("render", render_elapsed, chars, chars) + + display.show(buf) elapsed = time.monotonic() - t0 time.sleep(max(0, config.FRAME_DT - elapsed)) + frame_number += 1 - sys.stdout.write(CLR) - sys.stdout.flush() + display.cleanup() diff --git a/tests/test_controller.py b/tests/test_controller.py index 96ef02d..0f08b9b 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -83,3 +83,35 @@ class TestStreamControllerCleanup: controller.cleanup() mock_mic_instance.stop.assert_called_once() + + +class TestStreamControllerWarmup: + """Tests for StreamController topic warmup.""" + + def test_warmup_topics_idempotent(self): + """warmup_topics can be called multiple times.""" + StreamController._topics_warmed = False + + with patch("urllib.request.urlopen") as mock_urlopen: + StreamController.warmup_topics() + StreamController.warmup_topics() + + assert mock_urlopen.call_count >= 3 + + def test_warmup_topics_sets_flag(self): + """warmup_topics sets the warmed flag.""" + StreamController._topics_warmed = False + + with patch("urllib.request.urlopen"): + StreamController.warmup_topics() + + assert StreamController._topics_warmed is True + + def test_warmup_topics_skips_after_first(self): + """warmup_topics skips after first call.""" + StreamController._topics_warmed = True + + with patch("urllib.request.urlopen") as mock_urlopen: + StreamController.warmup_topics() + + mock_urlopen.assert_not_called() diff --git a/tests/test_display.py b/tests/test_display.py new file mode 100644 index 0000000..e2c08b4 --- /dev/null +++ b/tests/test_display.py @@ -0,0 +1,79 @@ +""" +Tests for engine.display module. +""" + +from engine.display import NullDisplay, TerminalDisplay + + +class TestDisplayProtocol: + """Test that display backends satisfy the Display protocol.""" + + def test_terminal_display_is_display(self): + """TerminalDisplay satisfies Display protocol.""" + display = TerminalDisplay() + assert hasattr(display, "init") + assert hasattr(display, "show") + assert hasattr(display, "clear") + assert hasattr(display, "cleanup") + + def test_null_display_is_display(self): + """NullDisplay satisfies Display protocol.""" + display = NullDisplay() + assert hasattr(display, "init") + assert hasattr(display, "show") + assert hasattr(display, "clear") + assert hasattr(display, "cleanup") + + +class TestTerminalDisplay: + """Tests for TerminalDisplay class.""" + + def test_init_sets_dimensions(self): + """init stores terminal dimensions.""" + display = TerminalDisplay() + display.init(80, 24) + assert display.width == 80 + assert display.height == 24 + + def test_show_returns_none(self): + """show returns None after writing to stdout.""" + display = TerminalDisplay() + display.width = 80 + display.height = 24 + display.show(["line1", "line2"]) + + def test_clear_does_not_error(self): + """clear works without error.""" + display = TerminalDisplay() + display.clear() + + def test_cleanup_does_not_error(self): + """cleanup works without error.""" + display = TerminalDisplay() + display.cleanup() + + +class TestNullDisplay: + """Tests for NullDisplay class.""" + + def test_init_stores_dimensions(self): + """init stores dimensions.""" + display = NullDisplay() + display.init(100, 50) + assert display.width == 100 + assert display.height == 50 + + def test_show_does_nothing(self): + """show discards buffer without error.""" + display = NullDisplay() + display.show(["line1", "line2", "line3"]) + + def test_clear_does_nothing(self): + """clear does nothing.""" + display = NullDisplay() + display.clear() + + def test_cleanup_does_nothing(self): + """cleanup does nothing.""" + display = NullDisplay() + display.cleanup() diff --git a/tests/test_effects.py b/tests/test_effects.py new file mode 100644 index 0000000..12d41a5 --- /dev/null +++ b/tests/test_effects.py @@ -0,0 +1,427 @@ +""" +Tests for engine.effects module. +""" + +from engine.effects import EffectChain, EffectConfig, EffectContext, EffectRegistry + + +class MockEffect: + name = "mock" + config = EffectConfig(enabled=True, intensity=1.0) + + def __init__(self): + self.processed = False + self.last_ctx = None + + def process(self, buf, ctx): + self.processed = True + self.last_ctx = ctx + return buf + ["processed"] + + def configure(self, config): + self.config = config + + +class TestEffectConfig: + def test_defaults(self): + cfg = EffectConfig() + assert cfg.enabled is True + assert cfg.intensity == 1.0 + assert cfg.params == {} + + def test_custom_values(self): + cfg = EffectConfig(enabled=False, intensity=0.5, params={"key": "value"}) + assert cfg.enabled is False + assert cfg.intensity == 0.5 + assert cfg.params == {"key": "value"} + + +class TestEffectContext: + def test_defaults(self): + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + ) + assert ctx.terminal_width == 80 + assert ctx.terminal_height == 24 + assert ctx.ticker_height == 20 + assert ctx.items == [] + + def test_with_items(self): + items = [("Title", "Source", "12:00")] + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + items=items, + ) + assert ctx.items == items + + +class TestEffectRegistry: + def test_init_empty(self): + registry = EffectRegistry() + assert len(registry.list_all()) == 0 + + def test_register(self): + registry = EffectRegistry() + effect = MockEffect() + registry.register(effect) + assert "mock" in registry.list_all() + + def test_get(self): + registry = EffectRegistry() + effect = MockEffect() + registry.register(effect) + retrieved = registry.get("mock") + assert retrieved is effect + + def test_get_nonexistent(self): + registry = EffectRegistry() + assert registry.get("nonexistent") is None + + def test_enable(self): + registry = EffectRegistry() + effect = MockEffect() + effect.config.enabled = False + registry.register(effect) + registry.enable("mock") + assert effect.config.enabled is True + + def test_disable(self): + registry = EffectRegistry() + effect = MockEffect() + effect.config.enabled = True + registry.register(effect) + registry.disable("mock") + assert effect.config.enabled is False + + def test_list_enabled(self): + registry = EffectRegistry() + + class EnabledEffect: + name = "enabled_effect" + config = EffectConfig(enabled=True, intensity=1.0) + + class DisabledEffect: + name = "disabled_effect" + config = EffectConfig(enabled=False, intensity=1.0) + + registry.register(EnabledEffect()) + registry.register(DisabledEffect()) + enabled = registry.list_enabled() + assert len(enabled) == 1 + assert enabled[0].name == "enabled_effect" + + def test_configure(self): + registry = EffectRegistry() + effect = MockEffect() + registry.register(effect) + new_config = EffectConfig(enabled=False, intensity=0.3) + registry.configure("mock", new_config) + assert effect.config.enabled is False + assert effect.config.intensity == 0.3 + + def test_is_enabled(self): + registry = EffectRegistry() + effect = MockEffect() + effect.config.enabled = True + registry.register(effect) + assert registry.is_enabled("mock") is True + assert registry.is_enabled("nonexistent") is False + + +class TestEffectChain: + def test_init(self): + registry = EffectRegistry() + chain = EffectChain(registry) + assert chain.get_order() == [] + + def test_set_order(self): + registry = EffectRegistry() + effect1 = MockEffect() + effect1.name = "effect1" + effect2 = MockEffect() + effect2.name = "effect2" + registry.register(effect1) + registry.register(effect2) + chain = EffectChain(registry) + chain.set_order(["effect1", "effect2"]) + assert chain.get_order() == ["effect1", "effect2"] + + def test_add_effect(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "test_effect" + registry.register(effect) + chain = EffectChain(registry) + chain.add_effect("test_effect") + assert "test_effect" in chain.get_order() + + def test_add_effect_invalid(self): + registry = EffectRegistry() + chain = EffectChain(registry) + result = chain.add_effect("nonexistent") + assert result is False + + def test_remove_effect(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "test_effect" + registry.register(effect) + chain = EffectChain(registry) + chain.set_order(["test_effect"]) + chain.remove_effect("test_effect") + assert "test_effect" not in chain.get_order() + + def test_reorder(self): + registry = EffectRegistry() + effect1 = MockEffect() + effect1.name = "effect1" + effect2 = MockEffect() + effect2.name = "effect2" + effect3 = MockEffect() + effect3.name = "effect3" + registry.register(effect1) + registry.register(effect2) + registry.register(effect3) + chain = EffectChain(registry) + chain.set_order(["effect1", "effect2", "effect3"]) + result = chain.reorder(["effect3", "effect1", "effect2"]) + assert result is True + assert chain.get_order() == ["effect3", "effect1", "effect2"] + + def test_reorder_invalid(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "effect1" + registry.register(effect) + chain = EffectChain(registry) + result = chain.reorder(["effect1", "nonexistent"]) + assert result is False + + def test_process_empty_chain(self): + registry = EffectRegistry() + chain = EffectChain(registry) + buf = ["line1", "line2"] + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + ) + result = chain.process(buf, ctx) + assert result == buf + + def test_process_with_effects(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "test_effect" + registry.register(effect) + chain = EffectChain(registry) + chain.set_order(["test_effect"]) + buf = ["line1", "line2"] + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + ) + result = chain.process(buf, ctx) + assert result == ["line1", "line2", "processed"] + assert effect.processed is True + assert effect.last_ctx is ctx + + def test_process_disabled_effect(self): + registry = EffectRegistry() + effect = MockEffect() + effect.name = "test_effect" + effect.config.enabled = False + registry.register(effect) + chain = EffectChain(registry) + chain.set_order(["test_effect"]) + buf = ["line1"] + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=0, + has_message=False, + ) + result = chain.process(buf, ctx) + assert result == ["line1"] + assert effect.processed is False + + +class TestEffectsExports: + def test_all_exports_are_importable(self): + """Verify all exports in __all__ can actually be imported.""" + import engine.effects as effects_module + + for name in effects_module.__all__: + getattr(effects_module, name) + + +class TestPerformanceMonitor: + def test_empty_stats(self): + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor() + stats = monitor.get_stats() + assert "error" in stats + + def test_record_and_retrieve(self): + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor() + monitor.start_frame(1) + monitor.record_effect("test_effect", 1.5, 100, 150) + monitor.end_frame(1, 2.0) + + stats = monitor.get_stats() + assert "error" not in stats + assert stats["frame_count"] == 1 + assert "test_effect" in stats["effects"] + + def test_multiple_frames(self): + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor(max_frames=3) + for i in range(5): + monitor.start_frame(i) + monitor.record_effect("effect1", 1.0, 100, 100) + monitor.record_effect("effect2", 0.5, 100, 100) + monitor.end_frame(i, 1.5) + + stats = monitor.get_stats() + assert stats["frame_count"] == 3 + assert "effect1" in stats["effects"] + assert "effect2" in stats["effects"] + + def test_reset(self): + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor() + monitor.start_frame(1) + monitor.record_effect("test", 1.0, 100, 100) + monitor.end_frame(1, 1.0) + + monitor.reset() + stats = monitor.get_stats() + assert "error" in stats + + +class TestEffectPipelinePerformance: + def test_pipeline_stays_within_frame_budget(self): + """Verify effect pipeline completes within frame budget (33ms for 30fps).""" + from engine.effects import ( + EffectChain, + EffectConfig, + EffectContext, + EffectRegistry, + ) + + class DummyEffect: + name = "dummy" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf, ctx): + return [line * 2 for line in buf] + + registry = EffectRegistry() + registry.register(DummyEffect()) + + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor(max_frames=10) + chain = EffectChain(registry, monitor) + chain.set_order(["dummy"]) + + buf = ["x" * 80] * 20 + + for i in range(10): + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=i, + has_message=False, + ) + chain.process(buf, ctx) + + stats = monitor.get_stats() + assert "error" not in stats + assert stats["pipeline"]["max_ms"] < 33.0 + + def test_individual_effects_performance(self): + """Verify individual effects don't exceed 10ms per frame.""" + from engine.effects import ( + EffectChain, + EffectConfig, + EffectContext, + EffectRegistry, + ) + + class SlowEffect: + name = "slow" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf, ctx): + result = [] + for line in buf: + result.append(line) + result.append(line + line) + return result + + registry = EffectRegistry() + registry.register(SlowEffect()) + + from engine.effects.performance import PerformanceMonitor + + monitor = PerformanceMonitor(max_frames=5) + chain = EffectChain(registry, monitor) + chain.set_order(["slow"]) + + buf = ["x" * 80] * 10 + + for i in range(5): + ctx = EffectContext( + terminal_width=80, + terminal_height=24, + scroll_cam=0, + ticker_height=20, + mic_excess=0.0, + grad_offset=0.0, + frame_number=i, + has_message=False, + ) + chain.process(buf, ctx) + + stats = monitor.get_stats() + assert "error" not in stats + assert stats["effects"]["slow"]["max_ms"] < 10.0 diff --git a/tests/test_effects_controller.py b/tests/test_effects_controller.py new file mode 100644 index 0000000..fd17fe8 --- /dev/null +++ b/tests/test_effects_controller.py @@ -0,0 +1,117 @@ +""" +Tests for engine.effects.controller module. +""" + +from unittest.mock import MagicMock, patch + +from engine.effects.controller import ( + handle_effects_command, + set_effect_chain_ref, +) + + +class TestHandleEffectsCommand: + """Tests for handle_effects_command function.""" + + def test_list_effects(self): + """list command returns formatted effects list.""" + with patch("engine.effects.controller.get_registry") as mock_registry: + mock_plugin = MagicMock() + mock_plugin.config.enabled = True + mock_plugin.config.intensity = 0.5 + mock_registry.return_value.list_all.return_value = {"noise": mock_plugin} + + with patch("engine.effects.controller._get_effect_chain") as mock_chain: + mock_chain.return_value.get_order.return_value = ["noise"] + + result = handle_effects_command("/effects list") + + assert "noise: ON" in result + assert "intensity=0.5" in result + + def test_enable_effect(self): + """enable command calls registry.enable.""" + with patch("engine.effects.controller.get_registry") as mock_registry: + mock_plugin = MagicMock() + mock_registry.return_value.get.return_value = mock_plugin + mock_registry.return_value.list_all.return_value = {"noise": mock_plugin} + + result = handle_effects_command("/effects noise on") + + assert "Enabled: noise" in result + mock_registry.return_value.enable.assert_called_once_with("noise") + + def test_disable_effect(self): + """disable command calls registry.disable.""" + with patch("engine.effects.controller.get_registry") as mock_registry: + mock_plugin = MagicMock() + mock_registry.return_value.get.return_value = mock_plugin + mock_registry.return_value.list_all.return_value = {"noise": mock_plugin} + + result = handle_effects_command("/effects noise off") + + assert "Disabled: noise" in result + mock_registry.return_value.disable.assert_called_once_with("noise") + + def test_set_intensity(self): + """intensity command sets plugin intensity.""" + with patch("engine.effects.controller.get_registry") as mock_registry: + mock_plugin = MagicMock() + mock_plugin.config.intensity = 0.5 + mock_registry.return_value.get.return_value = mock_plugin + mock_registry.return_value.list_all.return_value = {"noise": mock_plugin} + + result = handle_effects_command("/effects noise intensity 0.8") + + assert "intensity to 0.8" in result + assert mock_plugin.config.intensity == 0.8 + + def test_invalid_intensity_range(self): + """intensity outside 0.0-1.0 returns error.""" + with patch("engine.effects.controller.get_registry") as mock_registry: + mock_plugin = MagicMock() + mock_registry.return_value.get.return_value = mock_plugin + mock_registry.return_value.list_all.return_value = {"noise": mock_plugin} + + result = handle_effects_command("/effects noise intensity 1.5") + + assert "between 0.0 and 1.0" in result + + def test_reorder_pipeline(self): + """reorder command calls chain.reorder.""" + with patch("engine.effects.controller.get_registry") as mock_registry: + mock_registry.return_value.list_all.return_value = {} + + with patch("engine.effects.controller._get_effect_chain") as mock_chain: + mock_chain_instance = MagicMock() + mock_chain_instance.reorder.return_value = True + mock_chain.return_value = mock_chain_instance + + result = handle_effects_command("/effects reorder noise,fade") + + assert "Reordered pipeline" in result + mock_chain_instance.reorder.assert_called_once_with(["noise", "fade"]) + + def test_unknown_command(self): + """unknown command returns error.""" + result = handle_effects_command("/unknown") + assert "Unknown command" in result + + def test_non_effects_command(self): + """non-effects command returns error.""" + result = handle_effects_command("not a command") + assert "Unknown command" in result + + +class TestSetEffectChainRef: + """Tests for set_effect_chain_ref function.""" + + def test_sets_global_ref(self): + """set_effect_chain_ref updates global reference.""" + mock_chain = MagicMock() + set_effect_chain_ref(mock_chain) + + from engine.effects.controller import _get_effect_chain + + result = _get_effect_chain() + assert result == mock_chain