forked from genewildish/Mainline
Compare commits
3 Commits
f91082186c
...
effects_pl
| Author | SHA1 | Date | |
|---|---|---|---|
| 4228400c43 | |||
| 05cc475858 | |||
| cfd7e8931e |
250
cmdline.py
Normal file
250
cmdline.py
Normal file
@@ -0,0 +1,250 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Command-line utility for interacting with mainline via ntfy.
|
||||
|
||||
Usage:
|
||||
python cmdline.py # Interactive TUI mode
|
||||
python cmdline.py --help # Show help
|
||||
python cmdline.py /effects list # Send single command via ntfy
|
||||
python cmdline.py /effects stats # Get performance stats via ntfy
|
||||
python cmdline.py -w /effects stats # Watch mode (polls for stats)
|
||||
|
||||
The TUI mode provides:
|
||||
- Arrow keys to navigate command history
|
||||
- Tab completion for commands
|
||||
- Auto-refresh for performance stats
|
||||
|
||||
C&C works like a serial port:
|
||||
1. Send command to ntfy_cc_topic
|
||||
2. Mainline receives, processes, responds to same topic
|
||||
3. Cmdline polls for response
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
import threading
|
||||
import urllib.request
|
||||
from pathlib import Path
|
||||
|
||||
from engine import config
|
||||
from engine.terminal import CLR, CURSOR_OFF, CURSOR_ON, G_DIM, G_HI, RST, W_GHOST
|
||||
|
||||
try:
|
||||
CC_CMD_TOPIC = config.NTFY_CC_CMD_TOPIC
|
||||
CC_RESP_TOPIC = config.NTFY_CC_RESP_TOPIC
|
||||
except AttributeError:
|
||||
CC_CMD_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json"
|
||||
CC_RESP_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json"
|
||||
|
||||
|
||||
class NtfyResponsePoller:
|
||||
"""Polls ntfy for command responses."""
|
||||
|
||||
def __init__(self, cmd_topic: str, resp_topic: str, timeout: float = 10.0):
|
||||
self.cmd_topic = cmd_topic
|
||||
self.resp_topic = resp_topic
|
||||
self.timeout = timeout
|
||||
self._last_id = None
|
||||
self._lock = threading.Lock()
|
||||
|
||||
def _build_url(self) -> str:
|
||||
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
|
||||
|
||||
parsed = urlparse(self.resp_topic)
|
||||
params = parse_qs(parsed.query, keep_blank_values=True)
|
||||
params["since"] = [self._last_id if self._last_id else "20s"]
|
||||
new_query = urlencode({k: v[0] for k, v in params.items()})
|
||||
return urlunparse(parsed._replace(query=new_query))
|
||||
|
||||
def send_and_wait(self, cmd: str) -> str:
|
||||
"""Send command and wait for response."""
|
||||
url = self.cmd_topic.replace("/json", "")
|
||||
data = cmd.encode("utf-8")
|
||||
|
||||
req = urllib.request.Request(
|
||||
url,
|
||||
data=data,
|
||||
headers={
|
||||
"User-Agent": "mainline-cmdline/0.1",
|
||||
"Content-Type": "text/plain",
|
||||
},
|
||||
method="POST",
|
||||
)
|
||||
|
||||
try:
|
||||
urllib.request.urlopen(req, timeout=5)
|
||||
except Exception as e:
|
||||
return f"Error sending command: {e}"
|
||||
|
||||
return self._wait_for_response(cmd)
|
||||
|
||||
def _wait_for_response(self, expected_cmd: str = "") -> str:
|
||||
"""Poll for response message."""
|
||||
start = time.time()
|
||||
while time.time() - start < self.timeout:
|
||||
try:
|
||||
url = self._build_url()
|
||||
req = urllib.request.Request(
|
||||
url, headers={"User-Agent": "mainline-cmdline/0.1"}
|
||||
)
|
||||
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||
for line in resp:
|
||||
try:
|
||||
data = json.loads(line.decode("utf-8", errors="replace"))
|
||||
except json.JSONDecodeError:
|
||||
continue
|
||||
if data.get("event") == "message":
|
||||
self._last_id = data.get("id")
|
||||
msg = data.get("message", "")
|
||||
if msg:
|
||||
return msg
|
||||
except Exception:
|
||||
pass
|
||||
time.sleep(0.5)
|
||||
return "Timeout waiting for response"
|
||||
|
||||
|
||||
AVAILABLE_COMMANDS = """Available commands:
|
||||
/effects list - List all effects and status
|
||||
/effects <name> on - Enable an effect
|
||||
/effects <name> off - Disable an effect
|
||||
/effects <name> intensity <0.0-1.0> - Set effect intensity
|
||||
/effects reorder <name1>,<name2>,... - Reorder pipeline
|
||||
/effects stats - Show performance statistics
|
||||
/help - Show this help
|
||||
/quit - Exit
|
||||
"""
|
||||
|
||||
|
||||
def print_header():
|
||||
w = 60
|
||||
print(CLR, end="")
|
||||
print(CURSOR_OFF, end="")
|
||||
print(f"\033[1;1H", end="")
|
||||
print(f" \033[1;38;5;231m╔{'═' * (w - 6)}╗\033[0m")
|
||||
print(
|
||||
f" \033[1;38;5;231m║\033[0m \033[1;38;5;82mMAINLINE\033[0m \033[3;38;5;245mCommand Center\033[0m \033[1;38;5;231m ║\033[0m"
|
||||
)
|
||||
print(f" \033[1;38;5;231m╚{'═' * (w - 6)}╝\033[0m")
|
||||
print(f" \033[2;38;5;37mCMD: {CC_CMD_TOPIC.split('/')[-2]}\033[0m")
|
||||
print(f" \033[2;38;5;37mRESP: {CC_RESP_TOPIC.split('/')[-2]}\033[0m")
|
||||
print()
|
||||
|
||||
|
||||
def print_response(response: str, is_error: bool = False) -> None:
|
||||
"""Print response with nice formatting."""
|
||||
print()
|
||||
if is_error:
|
||||
print(f" \033[1;38;5;196m✗ Error\033[0m")
|
||||
print(f" \033[38;5;196m{'─' * 40}\033[0m")
|
||||
else:
|
||||
print(f" \033[1;38;5;82m✓ Response\033[0m")
|
||||
print(f" \033[38;5;37m{'─' * 40}\033[0m")
|
||||
|
||||
for line in response.split("\n"):
|
||||
print(f" {line}")
|
||||
print()
|
||||
|
||||
|
||||
def interactive_mode():
|
||||
"""Interactive TUI for sending commands."""
|
||||
import readline
|
||||
|
||||
print_header()
|
||||
poller = NtfyResponsePoller(CC_CMD_TOPIC, CC_RESP_TOPIC)
|
||||
|
||||
print(f" \033[38;5;245mType /help for commands, /quit to exit\033[0m")
|
||||
print()
|
||||
|
||||
while True:
|
||||
try:
|
||||
cmd = input(f" \033[1;38;5;82m❯\033[0m {G_HI}").strip()
|
||||
except (EOFError, KeyboardInterrupt):
|
||||
print()
|
||||
break
|
||||
|
||||
if not cmd:
|
||||
continue
|
||||
|
||||
if cmd.startswith("/"):
|
||||
if cmd == "/quit" or cmd == "/exit":
|
||||
print(f"\n \033[1;38;5;245mGoodbye!{RST}\n")
|
||||
break
|
||||
|
||||
if cmd == "/help":
|
||||
print(f"\n{AVAILABLE_COMMANDS}\n")
|
||||
continue
|
||||
|
||||
print(f" \033[38;5;245m⟳ Sending to mainline...{RST}")
|
||||
result = poller.send_and_wait(cmd)
|
||||
print_response(result, is_error=result.startswith("Error"))
|
||||
else:
|
||||
print(f"\n \033[1;38;5;196m⚠ Commands must start with /{RST}\n")
|
||||
|
||||
print(CURSOR_ON, end="")
|
||||
return 0
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Mainline command-line interface",
|
||||
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||
epilog=AVAILABLE_COMMANDS,
|
||||
)
|
||||
parser.add_argument(
|
||||
"command",
|
||||
nargs="?",
|
||||
default=None,
|
||||
help="Command to send (e.g., /effects list)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--watch",
|
||||
"-w",
|
||||
action="store_true",
|
||||
help="Watch mode: continuously poll for stats (Ctrl+C to exit)",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.command is None:
|
||||
return interactive_mode()
|
||||
|
||||
poller = NtfyResponsePoller(CC_CMD_TOPIC, CC_RESP_TOPIC)
|
||||
|
||||
if args.watch and "/effects stats" in args.command:
|
||||
import signal
|
||||
|
||||
def handle_sigterm(*_):
|
||||
print(f"\n \033[1;38;5;245mStopped watching{RST}")
|
||||
print(CURSOR_ON, end="")
|
||||
sys.exit(0)
|
||||
|
||||
signal.signal(signal.SIGTERM, handle_sigterm)
|
||||
|
||||
print_header()
|
||||
print(f" \033[38;5;245mWatching /effects stats (Ctrl+C to exit)...{RST}\n")
|
||||
try:
|
||||
while True:
|
||||
result = poller.send_and_wait(args.command)
|
||||
print(f"\033[2J\033[1;1H", end="")
|
||||
print(
|
||||
f" \033[1;38;5;82m❯\033[0m Performance Stats - \033[1;38;5;245m{time.strftime('%H:%M:%S')}{RST}"
|
||||
)
|
||||
print(f" \033[38;5;37m{'─' * 44}{RST}")
|
||||
for line in result.split("\n"):
|
||||
print(f" {line}")
|
||||
time.sleep(2)
|
||||
except KeyboardInterrupt:
|
||||
print(f"\n \033[1;38;5;245mStopped watching{RST}")
|
||||
return 0
|
||||
return 0
|
||||
|
||||
result = poller.send_and_wait(args.command)
|
||||
print(result)
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
35
effects_plugins/__init__.py
Normal file
35
effects_plugins/__init__.py
Normal file
@@ -0,0 +1,35 @@
|
||||
from pathlib import Path
|
||||
|
||||
PLUGIN_DIR = Path(__file__).parent
|
||||
|
||||
|
||||
def discover_plugins():
|
||||
from engine.effects.registry import get_registry
|
||||
|
||||
registry = get_registry()
|
||||
imported = {}
|
||||
|
||||
for file_path in PLUGIN_DIR.glob("*.py"):
|
||||
if file_path.name.startswith("_"):
|
||||
continue
|
||||
module_name = file_path.stem
|
||||
if module_name in ("base", "types"):
|
||||
continue
|
||||
|
||||
try:
|
||||
module = __import__(f"effects_plugins.{module_name}", fromlist=[""])
|
||||
for attr_name in dir(module):
|
||||
attr = getattr(module, attr_name)
|
||||
if (
|
||||
isinstance(attr, type)
|
||||
and hasattr(attr, "name")
|
||||
and hasattr(attr, "process")
|
||||
and attr_name.endswith("Effect")
|
||||
):
|
||||
plugin = attr()
|
||||
registry.register(plugin)
|
||||
imported[plugin.name] = plugin
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return imported
|
||||
58
effects_plugins/fade.py
Normal file
58
effects_plugins/fade.py
Normal file
@@ -0,0 +1,58 @@
|
||||
import random
|
||||
|
||||
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
|
||||
|
||||
|
||||
class FadeEffect:
|
||||
name = "fade"
|
||||
config = EffectConfig(enabled=True, intensity=1.0)
|
||||
|
||||
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||
if not ctx.ticker_height:
|
||||
return buf
|
||||
result = list(buf)
|
||||
intensity = self.config.intensity
|
||||
|
||||
top_zone = max(1, int(ctx.ticker_height * 0.25))
|
||||
bot_zone = max(1, int(ctx.ticker_height * 0.10))
|
||||
|
||||
for r in range(len(result)):
|
||||
if r >= ctx.ticker_height:
|
||||
continue
|
||||
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
|
||||
bot_f = (
|
||||
min(1.0, (ctx.ticker_height - 1 - r) / bot_zone)
|
||||
if bot_zone > 0
|
||||
else 1.0
|
||||
)
|
||||
row_fade = min(top_f, bot_f) * intensity
|
||||
|
||||
if row_fade < 1.0 and result[r].strip():
|
||||
result[r] = self._fade_line(result[r], row_fade)
|
||||
|
||||
return result
|
||||
|
||||
def _fade_line(self, s: str, fade: float) -> str:
|
||||
if fade >= 1.0:
|
||||
return s
|
||||
if fade <= 0.0:
|
||||
return ""
|
||||
result = []
|
||||
i = 0
|
||||
while i < len(s):
|
||||
if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[":
|
||||
j = i + 2
|
||||
while j < len(s) and not s[j].isalpha():
|
||||
j += 1
|
||||
result.append(s[i : j + 1])
|
||||
i = j + 1
|
||||
elif s[i] == " ":
|
||||
result.append(" ")
|
||||
i += 1
|
||||
else:
|
||||
result.append(s[i] if random.random() < fade else " ")
|
||||
i += 1
|
||||
return "".join(result)
|
||||
|
||||
def configure(self, cfg: EffectConfig) -> None:
|
||||
self.config = cfg
|
||||
72
effects_plugins/firehose.py
Normal file
72
effects_plugins/firehose.py
Normal file
@@ -0,0 +1,72 @@
|
||||
import random
|
||||
from datetime import datetime
|
||||
|
||||
from engine import config
|
||||
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
|
||||
from engine.sources import FEEDS, POETRY_SOURCES
|
||||
from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST
|
||||
|
||||
|
||||
class FirehoseEffect:
|
||||
name = "firehose"
|
||||
config = EffectConfig(enabled=True, intensity=1.0)
|
||||
|
||||
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||
firehose_h = config.FIREHOSE_H if config.FIREHOSE else 0
|
||||
if firehose_h <= 0 or not ctx.items:
|
||||
return buf
|
||||
|
||||
result = list(buf)
|
||||
intensity = self.config.intensity
|
||||
h = ctx.terminal_height
|
||||
|
||||
for fr in range(firehose_h):
|
||||
scr_row = h - firehose_h + fr + 1
|
||||
fline = self._firehose_line(ctx.items, ctx.terminal_width, intensity)
|
||||
result.append(f"\033[{scr_row};1H{fline}\033[K")
|
||||
return result
|
||||
|
||||
def _firehose_line(self, items: list, w: int, intensity: float) -> str:
|
||||
r = random.random()
|
||||
if r < 0.35 * intensity:
|
||||
title, src, ts = random.choice(items)
|
||||
text = title[: w - 1]
|
||||
color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM])
|
||||
return f"{color}{text}{RST}"
|
||||
elif r < 0.55 * intensity:
|
||||
d = random.choice([0.45, 0.55, 0.65, 0.75])
|
||||
return "".join(
|
||||
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
|
||||
f"{random.choice(config.GLITCH + config.KATA)}{RST}"
|
||||
if random.random() < d
|
||||
else " "
|
||||
for _ in range(w)
|
||||
)
|
||||
elif r < 0.78 * intensity:
|
||||
sources = FEEDS if config.MODE == "news" else POETRY_SOURCES
|
||||
src = random.choice(list(sources.keys()))
|
||||
msgs = [
|
||||
f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}",
|
||||
f" ░░ FEED ACTIVE :: {src}",
|
||||
f" >> DECODE 0x{random.randint(0x1000, 0xFFFF):04X} :: {src[:24]}",
|
||||
f" ▒▒ ACQUIRE :: {random.choice(['TCP', 'UDP', 'RSS', 'ATOM', 'XML'])} :: {src}",
|
||||
f" {''.join(random.choice(config.KATA) for _ in range(3))} STRM "
|
||||
f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}",
|
||||
]
|
||||
text = random.choice(msgs)[: w - 1]
|
||||
color = random.choice([G_LO, G_DIM, W_GHOST])
|
||||
return f"{color}{text}{RST}"
|
||||
else:
|
||||
title, _, _ = random.choice(items)
|
||||
start = random.randint(0, max(0, len(title) - 20))
|
||||
frag = title[start : start + random.randint(10, 35)]
|
||||
pad = random.randint(0, max(0, w - len(frag) - 8))
|
||||
gp = "".join(
|
||||
random.choice(config.GLITCH) for _ in range(random.randint(1, 3))
|
||||
)
|
||||
text = (" " * pad + gp + " " + frag)[: w - 1]
|
||||
color = random.choice([G_LO, C_DIM, W_GHOST])
|
||||
return f"{color}{text}{RST}"
|
||||
|
||||
def configure(self, cfg: EffectConfig) -> None:
|
||||
self.config = cfg
|
||||
37
effects_plugins/glitch.py
Normal file
37
effects_plugins/glitch.py
Normal file
@@ -0,0 +1,37 @@
|
||||
import random
|
||||
|
||||
from engine import config
|
||||
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
|
||||
from engine.terminal import C_DIM, DIM, G_DIM, G_LO, RST
|
||||
|
||||
|
||||
class GlitchEffect:
|
||||
name = "glitch"
|
||||
config = EffectConfig(enabled=True, intensity=1.0)
|
||||
|
||||
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||
if not buf:
|
||||
return buf
|
||||
result = list(buf)
|
||||
intensity = self.config.intensity
|
||||
|
||||
glitch_prob = 0.32 + min(0.9, ctx.mic_excess * 0.16)
|
||||
glitch_prob = glitch_prob * intensity
|
||||
n_hits = 4 + int(ctx.mic_excess / 2)
|
||||
n_hits = int(n_hits * intensity)
|
||||
|
||||
if random.random() < glitch_prob:
|
||||
for _ in range(min(n_hits, len(result))):
|
||||
gi = random.randint(0, len(result) - 1)
|
||||
scr_row = gi + 1
|
||||
result[gi] = f"\033[{scr_row};1H{self._glitch_bar(ctx.terminal_width)}"
|
||||
return result
|
||||
|
||||
def _glitch_bar(self, w: int) -> str:
|
||||
c = random.choice(["░", "▒", "─", "\xc2"])
|
||||
n = random.randint(3, w // 2)
|
||||
o = random.randint(0, w - n)
|
||||
return " " * o + f"{G_LO}{DIM}" + c * n + RST
|
||||
|
||||
def configure(self, cfg: EffectConfig) -> None:
|
||||
self.config = cfg
|
||||
36
effects_plugins/noise.py
Normal file
36
effects_plugins/noise.py
Normal file
@@ -0,0 +1,36 @@
|
||||
import random
|
||||
|
||||
from engine import config
|
||||
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
|
||||
from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST
|
||||
|
||||
|
||||
class NoiseEffect:
|
||||
name = "noise"
|
||||
config = EffectConfig(enabled=True, intensity=0.15)
|
||||
|
||||
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||
if not ctx.ticker_height:
|
||||
return buf
|
||||
result = list(buf)
|
||||
intensity = self.config.intensity
|
||||
probability = intensity * 0.15
|
||||
|
||||
for r in range(len(result)):
|
||||
cy = ctx.scroll_cam + r
|
||||
if random.random() < probability:
|
||||
result[r] = self._generate_noise(ctx.terminal_width, cy)
|
||||
return result
|
||||
|
||||
def _generate_noise(self, w: int, cy: int) -> str:
|
||||
d = random.choice([0.15, 0.25, 0.35, 0.12])
|
||||
return "".join(
|
||||
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
|
||||
f"{random.choice(config.GLITCH + config.KATA)}{RST}"
|
||||
if random.random() < d
|
||||
else " "
|
||||
for _ in range(w)
|
||||
)
|
||||
|
||||
def configure(self, cfg: EffectConfig) -> None:
|
||||
self.config = cfg
|
||||
102
engine/display.py
Normal file
102
engine/display.py
Normal file
@@ -0,0 +1,102 @@
|
||||
"""
|
||||
Display output abstraction - allows swapping output backends.
|
||||
|
||||
Protocol:
|
||||
- init(width, height): Initialize display with terminal dimensions
|
||||
- show(buffer): Render buffer (list of strings) to display
|
||||
- clear(): Clear the display
|
||||
- cleanup(): Shutdown display
|
||||
"""
|
||||
|
||||
import time
|
||||
from typing import Protocol
|
||||
|
||||
|
||||
class Display(Protocol):
|
||||
"""Protocol for display backends."""
|
||||
|
||||
def init(self, width: int, height: int) -> None:
|
||||
"""Initialize display with dimensions."""
|
||||
...
|
||||
|
||||
def show(self, buffer: list[str]) -> None:
|
||||
"""Show buffer on display."""
|
||||
...
|
||||
|
||||
def clear(self) -> None:
|
||||
"""Clear display."""
|
||||
...
|
||||
|
||||
def cleanup(self) -> None:
|
||||
"""Shutdown display."""
|
||||
...
|
||||
|
||||
|
||||
def get_monitor():
|
||||
"""Get the performance monitor."""
|
||||
try:
|
||||
from engine.effects.performance import get_monitor as _get_monitor
|
||||
|
||||
return _get_monitor()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
class TerminalDisplay:
|
||||
"""ANSI terminal display backend."""
|
||||
|
||||
def __init__(self):
|
||||
self.width = 80
|
||||
self.height = 24
|
||||
|
||||
def init(self, width: int, height: int) -> None:
|
||||
from engine.terminal import CURSOR_OFF
|
||||
|
||||
self.width = width
|
||||
self.height = height
|
||||
print(CURSOR_OFF, end="", flush=True)
|
||||
|
||||
def show(self, buffer: list[str]) -> None:
|
||||
import sys
|
||||
|
||||
t0 = time.perf_counter()
|
||||
sys.stdout.buffer.write("".join(buffer).encode())
|
||||
sys.stdout.flush()
|
||||
elapsed_ms = (time.perf_counter() - t0) * 1000
|
||||
|
||||
monitor = get_monitor()
|
||||
if monitor:
|
||||
chars_in = sum(len(line) for line in buffer)
|
||||
monitor.record_effect("terminal_display", elapsed_ms, chars_in, chars_in)
|
||||
|
||||
def clear(self) -> None:
|
||||
from engine.terminal import CLR
|
||||
|
||||
print(CLR, end="", flush=True)
|
||||
|
||||
def cleanup(self) -> None:
|
||||
from engine.terminal import CURSOR_ON
|
||||
|
||||
print(CURSOR_ON, end="", flush=True)
|
||||
|
||||
|
||||
class NullDisplay:
|
||||
"""Headless/null display - discards all output."""
|
||||
|
||||
def init(self, width: int, height: int) -> None:
|
||||
self.width = width
|
||||
self.height = height
|
||||
|
||||
def show(self, buffer: list[str]) -> None:
|
||||
monitor = get_monitor()
|
||||
if monitor:
|
||||
t0 = time.perf_counter()
|
||||
chars_in = sum(len(line) for line in buffer)
|
||||
elapsed_ms = (time.perf_counter() - t0) * 1000
|
||||
monitor.record_effect("null_display", elapsed_ms, chars_in, chars_in)
|
||||
|
||||
def clear(self) -> None:
|
||||
pass
|
||||
|
||||
def cleanup(self) -> None:
|
||||
pass
|
||||
42
engine/effects/__init__.py
Normal file
42
engine/effects/__init__.py
Normal file
@@ -0,0 +1,42 @@
|
||||
from engine.effects.chain import EffectChain
|
||||
from engine.effects.controller import handle_effects_command, show_effects_menu
|
||||
from engine.effects.legacy import (
|
||||
fade_line,
|
||||
firehose_line,
|
||||
glitch_bar,
|
||||
next_headline,
|
||||
noise,
|
||||
vis_trunc,
|
||||
)
|
||||
from engine.effects.performance import PerformanceMonitor, get_monitor, set_monitor
|
||||
from engine.effects.registry import EffectRegistry, get_registry, set_registry
|
||||
from engine.effects.types import EffectConfig, EffectContext, PipelineConfig
|
||||
|
||||
|
||||
def get_effect_chain():
|
||||
from engine.layers import get_effect_chain as _chain
|
||||
|
||||
return _chain()
|
||||
|
||||
|
||||
__all__ = [
|
||||
"EffectChain",
|
||||
"EffectRegistry",
|
||||
"EffectConfig",
|
||||
"EffectContext",
|
||||
"PipelineConfig",
|
||||
"get_registry",
|
||||
"set_registry",
|
||||
"get_effect_chain",
|
||||
"get_monitor",
|
||||
"set_monitor",
|
||||
"PerformanceMonitor",
|
||||
"handle_effects_command",
|
||||
"show_effects_menu",
|
||||
"fade_line",
|
||||
"firehose_line",
|
||||
"glitch_bar",
|
||||
"noise",
|
||||
"next_headline",
|
||||
"vis_trunc",
|
||||
]
|
||||
71
engine/effects/chain.py
Normal file
71
engine/effects/chain.py
Normal file
@@ -0,0 +1,71 @@
|
||||
import time
|
||||
|
||||
from engine.effects.performance import PerformanceMonitor, get_monitor
|
||||
from engine.effects.registry import EffectRegistry
|
||||
from engine.effects.types import EffectContext
|
||||
|
||||
|
||||
class EffectChain:
|
||||
def __init__(
|
||||
self, registry: EffectRegistry, monitor: PerformanceMonitor | None = None
|
||||
):
|
||||
self._registry = registry
|
||||
self._order: list[str] = []
|
||||
self._monitor = monitor
|
||||
|
||||
def _get_monitor(self) -> PerformanceMonitor:
|
||||
if self._monitor is not None:
|
||||
return self._monitor
|
||||
return get_monitor()
|
||||
|
||||
def set_order(self, names: list[str]) -> None:
|
||||
self._order = list(names)
|
||||
|
||||
def get_order(self) -> list[str]:
|
||||
return self._order.copy()
|
||||
|
||||
def add_effect(self, name: str, position: int | None = None) -> bool:
|
||||
if name not in self._registry.list_all():
|
||||
return False
|
||||
if position is None:
|
||||
self._order.append(name)
|
||||
else:
|
||||
self._order.insert(position, name)
|
||||
return True
|
||||
|
||||
def remove_effect(self, name: str) -> bool:
|
||||
if name in self._order:
|
||||
self._order.remove(name)
|
||||
return True
|
||||
return False
|
||||
|
||||
def reorder(self, new_order: list[str]) -> bool:
|
||||
all_plugins = set(self._registry.list_all().keys())
|
||||
if not all(name in all_plugins for name in new_order):
|
||||
return False
|
||||
self._order = list(new_order)
|
||||
return True
|
||||
|
||||
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||
monitor = self._get_monitor()
|
||||
frame_number = ctx.frame_number
|
||||
monitor.start_frame(frame_number)
|
||||
|
||||
frame_start = time.perf_counter()
|
||||
result = list(buf)
|
||||
for name in self._order:
|
||||
plugin = self._registry.get(name)
|
||||
if plugin and plugin.config.enabled:
|
||||
chars_in = sum(len(line) for line in result)
|
||||
effect_start = time.perf_counter()
|
||||
try:
|
||||
result = plugin.process(result, ctx)
|
||||
except Exception:
|
||||
plugin.config.enabled = False
|
||||
elapsed = time.perf_counter() - effect_start
|
||||
chars_out = sum(len(line) for line in result)
|
||||
monitor.record_effect(name, elapsed * 1000, chars_in, chars_out)
|
||||
|
||||
total_elapsed = time.perf_counter() - frame_start
|
||||
monitor.end_frame(frame_number, total_elapsed * 1000)
|
||||
return result
|
||||
144
engine/effects/controller.py
Normal file
144
engine/effects/controller.py
Normal file
@@ -0,0 +1,144 @@
|
||||
from engine.effects.performance import get_monitor
|
||||
from engine.effects.registry import get_registry
|
||||
|
||||
_effect_chain_ref = None
|
||||
|
||||
|
||||
def _get_effect_chain():
|
||||
global _effect_chain_ref
|
||||
if _effect_chain_ref is not None:
|
||||
return _effect_chain_ref
|
||||
try:
|
||||
from engine.layers import get_effect_chain as _chain
|
||||
|
||||
return _chain()
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def set_effect_chain_ref(chain) -> None:
|
||||
global _effect_chain_ref
|
||||
_effect_chain_ref = chain
|
||||
|
||||
|
||||
def handle_effects_command(cmd: str) -> str:
|
||||
"""Handle /effects command from NTFY message.
|
||||
|
||||
Commands:
|
||||
/effects list - list all effects and their status
|
||||
/effects <name> on - enable an effect
|
||||
/effects <name> off - disable an effect
|
||||
/effects <name> intensity <0.0-1.0> - set intensity
|
||||
/effects reorder <name1>,<name2>,... - reorder pipeline
|
||||
/effects stats - show performance statistics
|
||||
"""
|
||||
parts = cmd.strip().split()
|
||||
if not parts or parts[0] != "/effects":
|
||||
return "Unknown command"
|
||||
|
||||
registry = get_registry()
|
||||
chain = _get_effect_chain()
|
||||
|
||||
if len(parts) == 1 or parts[1] == "list":
|
||||
result = ["Effects:"]
|
||||
for name, plugin in registry.list_all().items():
|
||||
status = "ON" if plugin.config.enabled else "OFF"
|
||||
intensity = plugin.config.intensity
|
||||
result.append(f" {name}: {status} (intensity={intensity})")
|
||||
if chain:
|
||||
result.append(f"Order: {chain.get_order()}")
|
||||
return "\n".join(result)
|
||||
|
||||
if parts[1] == "stats":
|
||||
return _format_stats()
|
||||
|
||||
if parts[1] == "reorder" and len(parts) >= 3:
|
||||
new_order = parts[2].split(",")
|
||||
if chain and chain.reorder(new_order):
|
||||
return f"Reordered pipeline: {new_order}"
|
||||
return "Failed to reorder pipeline"
|
||||
|
||||
if len(parts) < 3:
|
||||
return "Usage: /effects <name> on|off|intensity <value>"
|
||||
|
||||
effect_name = parts[1]
|
||||
action = parts[2]
|
||||
|
||||
if effect_name not in registry.list_all():
|
||||
return f"Unknown effect: {effect_name}"
|
||||
|
||||
if action == "on":
|
||||
registry.enable(effect_name)
|
||||
return f"Enabled: {effect_name}"
|
||||
|
||||
if action == "off":
|
||||
registry.disable(effect_name)
|
||||
return f"Disabled: {effect_name}"
|
||||
|
||||
if action == "intensity" and len(parts) >= 4:
|
||||
try:
|
||||
value = float(parts[3])
|
||||
if not 0.0 <= value <= 1.0:
|
||||
return "Intensity must be between 0.0 and 1.0"
|
||||
plugin = registry.get(effect_name)
|
||||
if plugin:
|
||||
plugin.config.intensity = value
|
||||
return f"Set {effect_name} intensity to {value}"
|
||||
except ValueError:
|
||||
return "Invalid intensity value"
|
||||
|
||||
return f"Unknown action: {action}"
|
||||
|
||||
|
||||
def _format_stats() -> str:
|
||||
monitor = get_monitor()
|
||||
stats = monitor.get_stats()
|
||||
|
||||
if "error" in stats:
|
||||
return stats["error"]
|
||||
|
||||
lines = ["Performance Stats:"]
|
||||
|
||||
pipeline = stats["pipeline"]
|
||||
lines.append(
|
||||
f" Pipeline: avg={pipeline['avg_ms']:.2f}ms min={pipeline['min_ms']:.2f}ms max={pipeline['max_ms']:.2f}ms (over {stats['frame_count']} frames)"
|
||||
)
|
||||
|
||||
if stats["effects"]:
|
||||
lines.append(" Per-effect (avg ms):")
|
||||
for name, effect_stats in stats["effects"].items():
|
||||
lines.append(
|
||||
f" {name}: avg={effect_stats['avg_ms']:.2f}ms min={effect_stats['min_ms']:.2f}ms max={effect_stats['max_ms']:.2f}ms"
|
||||
)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def show_effects_menu() -> str:
|
||||
"""Generate effects menu text for display."""
|
||||
registry = get_registry()
|
||||
chain = _get_effect_chain()
|
||||
|
||||
lines = [
|
||||
"\033[1;38;5;231m=== EFFECTS MENU ===\033[0m",
|
||||
"",
|
||||
"Effects:",
|
||||
]
|
||||
|
||||
for name, plugin in registry.list_all().items():
|
||||
status = "ON" if plugin.config.enabled else "OFF"
|
||||
intensity = plugin.config.intensity
|
||||
lines.append(f" [{status:3}] {name}: intensity={intensity:.2f}")
|
||||
|
||||
if chain:
|
||||
lines.append("")
|
||||
lines.append(f"Pipeline order: {' -> '.join(chain.get_order())}")
|
||||
|
||||
lines.append("")
|
||||
lines.append("Controls:")
|
||||
lines.append(" /effects <name> on|off")
|
||||
lines.append(" /effects <name> intensity <0.0-1.0>")
|
||||
lines.append(" /effects reorder name1,name2,...")
|
||||
lines.append("")
|
||||
|
||||
return "\n".join(lines)
|
||||
103
engine/effects/performance.py
Normal file
103
engine/effects/performance.py
Normal file
@@ -0,0 +1,103 @@
|
||||
from collections import deque
|
||||
from dataclasses import dataclass
|
||||
|
||||
|
||||
@dataclass
|
||||
class EffectTiming:
|
||||
name: str
|
||||
duration_ms: float
|
||||
buffer_chars_in: int
|
||||
buffer_chars_out: int
|
||||
|
||||
|
||||
@dataclass
|
||||
class FrameTiming:
|
||||
frame_number: int
|
||||
total_ms: float
|
||||
effects: list[EffectTiming]
|
||||
|
||||
|
||||
class PerformanceMonitor:
|
||||
"""Collects and stores performance metrics for effect pipeline."""
|
||||
|
||||
def __init__(self, max_frames: int = 60):
|
||||
self._max_frames = max_frames
|
||||
self._frames: deque[FrameTiming] = deque(maxlen=max_frames)
|
||||
self._current_frame: list[EffectTiming] = []
|
||||
|
||||
def start_frame(self, frame_number: int) -> None:
|
||||
self._current_frame = []
|
||||
|
||||
def record_effect(
|
||||
self, name: str, duration_ms: float, chars_in: int, chars_out: int
|
||||
) -> None:
|
||||
self._current_frame.append(
|
||||
EffectTiming(
|
||||
name=name,
|
||||
duration_ms=duration_ms,
|
||||
buffer_chars_in=chars_in,
|
||||
buffer_chars_out=chars_out,
|
||||
)
|
||||
)
|
||||
|
||||
def end_frame(self, frame_number: int, total_ms: float) -> None:
|
||||
self._frames.append(
|
||||
FrameTiming(
|
||||
frame_number=frame_number,
|
||||
total_ms=total_ms,
|
||||
effects=self._current_frame,
|
||||
)
|
||||
)
|
||||
|
||||
def get_stats(self) -> dict:
|
||||
if not self._frames:
|
||||
return {"error": "No timing data available"}
|
||||
|
||||
total_times = [f.total_ms for f in self._frames]
|
||||
avg_total = sum(total_times) / len(total_times)
|
||||
min_total = min(total_times)
|
||||
max_total = max(total_times)
|
||||
|
||||
effect_stats: dict[str, dict] = {}
|
||||
for frame in self._frames:
|
||||
for effect in frame.effects:
|
||||
if effect.name not in effect_stats:
|
||||
effect_stats[effect.name] = {"times": [], "total_chars": 0}
|
||||
effect_stats[effect.name]["times"].append(effect.duration_ms)
|
||||
effect_stats[effect.name]["total_chars"] += effect.buffer_chars_out
|
||||
|
||||
for name, stats in effect_stats.items():
|
||||
times = stats["times"]
|
||||
stats["avg_ms"] = sum(times) / len(times)
|
||||
stats["min_ms"] = min(times)
|
||||
stats["max_ms"] = max(times)
|
||||
del stats["times"]
|
||||
|
||||
return {
|
||||
"frame_count": len(self._frames),
|
||||
"pipeline": {
|
||||
"avg_ms": avg_total,
|
||||
"min_ms": min_total,
|
||||
"max_ms": max_total,
|
||||
},
|
||||
"effects": effect_stats,
|
||||
}
|
||||
|
||||
def reset(self) -> None:
|
||||
self._frames.clear()
|
||||
self._current_frame = []
|
||||
|
||||
|
||||
_monitor: PerformanceMonitor | None = None
|
||||
|
||||
|
||||
def get_monitor() -> PerformanceMonitor:
|
||||
global _monitor
|
||||
if _monitor is None:
|
||||
_monitor = PerformanceMonitor()
|
||||
return _monitor
|
||||
|
||||
|
||||
def set_monitor(monitor: PerformanceMonitor) -> None:
|
||||
global _monitor
|
||||
_monitor = monitor
|
||||
59
engine/effects/registry.py
Normal file
59
engine/effects/registry.py
Normal file
@@ -0,0 +1,59 @@
|
||||
from engine.effects.types import EffectConfig, EffectPlugin
|
||||
|
||||
|
||||
class EffectRegistry:
|
||||
def __init__(self):
|
||||
self._plugins: dict[str, EffectPlugin] = {}
|
||||
self._discovered: bool = False
|
||||
|
||||
def register(self, plugin: EffectPlugin) -> None:
|
||||
self._plugins[plugin.name] = plugin
|
||||
|
||||
def get(self, name: str) -> EffectPlugin | None:
|
||||
return self._plugins.get(name)
|
||||
|
||||
def list_all(self) -> dict[str, EffectPlugin]:
|
||||
return self._plugins.copy()
|
||||
|
||||
def list_enabled(self) -> list[EffectPlugin]:
|
||||
return [p for p in self._plugins.values() if p.config.enabled]
|
||||
|
||||
def enable(self, name: str) -> bool:
|
||||
plugin = self._plugins.get(name)
|
||||
if plugin:
|
||||
plugin.config.enabled = True
|
||||
return True
|
||||
return False
|
||||
|
||||
def disable(self, name: str) -> bool:
|
||||
plugin = self._plugins.get(name)
|
||||
if plugin:
|
||||
plugin.config.enabled = False
|
||||
return True
|
||||
return False
|
||||
|
||||
def configure(self, name: str, config: EffectConfig) -> bool:
|
||||
plugin = self._plugins.get(name)
|
||||
if plugin:
|
||||
plugin.configure(config)
|
||||
return True
|
||||
return False
|
||||
|
||||
def is_enabled(self, name: str) -> bool:
|
||||
plugin = self._plugins.get(name)
|
||||
return plugin.config.enabled if plugin else False
|
||||
|
||||
|
||||
_registry: EffectRegistry | None = None
|
||||
|
||||
|
||||
def get_registry() -> EffectRegistry:
|
||||
global _registry
|
||||
if _registry is None:
|
||||
_registry = EffectRegistry()
|
||||
return _registry
|
||||
|
||||
|
||||
def set_registry(registry: EffectRegistry) -> None:
|
||||
global _registry
|
||||
_registry = registry
|
||||
39
engine/effects/types.py
Normal file
39
engine/effects/types.py
Normal file
@@ -0,0 +1,39 @@
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
|
||||
@dataclass
|
||||
class EffectContext:
|
||||
terminal_width: int
|
||||
terminal_height: int
|
||||
scroll_cam: int
|
||||
ticker_height: int
|
||||
mic_excess: float
|
||||
grad_offset: float
|
||||
frame_number: int
|
||||
has_message: bool
|
||||
items: list = field(default_factory=list)
|
||||
|
||||
|
||||
@dataclass
|
||||
class EffectConfig:
|
||||
enabled: bool = True
|
||||
intensity: float = 1.0
|
||||
params: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
class EffectPlugin:
|
||||
name: str
|
||||
config: EffectConfig
|
||||
|
||||
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||
raise NotImplementedError
|
||||
|
||||
def configure(self, config: EffectConfig) -> None:
|
||||
raise NotImplementedError
|
||||
|
||||
|
||||
@dataclass
|
||||
class PipelineConfig:
|
||||
order: list[str] = field(default_factory=list)
|
||||
effects: dict[str, EffectConfig] = field(default_factory=dict)
|
||||
@@ -10,6 +10,8 @@ from datetime import datetime
|
||||
|
||||
from engine import config
|
||||
from engine.effects import (
|
||||
EffectChain,
|
||||
EffectContext,
|
||||
fade_line,
|
||||
firehose_line,
|
||||
glitch_bar,
|
||||
@@ -199,3 +201,60 @@ def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]:
|
||||
fline = firehose_line(items, w)
|
||||
buf.append(f"\033[{scr_row};1H{fline}\033[K")
|
||||
return buf
|
||||
|
||||
|
||||
_effect_chain = None
|
||||
|
||||
|
||||
def init_effects() -> None:
|
||||
"""Initialize effect plugins and chain."""
|
||||
global _effect_chain
|
||||
from engine.effects import EffectChain, get_registry
|
||||
|
||||
registry = get_registry()
|
||||
|
||||
import effects_plugins
|
||||
|
||||
effects_plugins.discover_plugins()
|
||||
|
||||
chain = EffectChain(registry)
|
||||
chain.set_order(["noise", "fade", "glitch", "firehose"])
|
||||
_effect_chain = chain
|
||||
|
||||
|
||||
def process_effects(
|
||||
buf: list[str],
|
||||
w: int,
|
||||
h: int,
|
||||
scroll_cam: int,
|
||||
ticker_h: int,
|
||||
mic_excess: float,
|
||||
grad_offset: float,
|
||||
frame_number: int,
|
||||
has_message: bool,
|
||||
items: list,
|
||||
) -> list[str]:
|
||||
"""Process buffer through effect chain."""
|
||||
if _effect_chain is None:
|
||||
init_effects()
|
||||
|
||||
ctx = EffectContext(
|
||||
terminal_width=w,
|
||||
terminal_height=h,
|
||||
scroll_cam=scroll_cam,
|
||||
ticker_height=ticker_h,
|
||||
mic_excess=mic_excess,
|
||||
grad_offset=grad_offset,
|
||||
frame_number=frame_number,
|
||||
has_message=has_message,
|
||||
items=items,
|
||||
)
|
||||
return _effect_chain.process(buf, ctx)
|
||||
|
||||
|
||||
def get_effect_chain() -> EffectChain | None:
|
||||
"""Get the effect chain instance."""
|
||||
global _effect_chain
|
||||
if _effect_chain is None:
|
||||
init_effects()
|
||||
return _effect_chain
|
||||
|
||||
@@ -4,33 +4,42 @@ Orchestrates viewport, frame timing, and layers.
|
||||
"""
|
||||
|
||||
import random
|
||||
import sys
|
||||
import time
|
||||
|
||||
from engine import config
|
||||
from engine.display import (
|
||||
Display,
|
||||
TerminalDisplay,
|
||||
)
|
||||
from engine.display import (
|
||||
get_monitor as _get_display_monitor,
|
||||
)
|
||||
from engine.frame import calculate_scroll_step
|
||||
from engine.layers import (
|
||||
apply_glitch,
|
||||
process_effects,
|
||||
render_firehose,
|
||||
render_message_overlay,
|
||||
render_ticker_zone,
|
||||
)
|
||||
from engine.terminal import CLR
|
||||
from engine.viewport import th, tw
|
||||
|
||||
USE_EFFECT_CHAIN = True
|
||||
|
||||
def stream(items, ntfy_poller, mic_monitor):
|
||||
|
||||
def stream(items, ntfy_poller, mic_monitor, display: Display | None = None):
|
||||
"""Main render loop with four layers: message, ticker, scroll motion, firehose."""
|
||||
if display is None:
|
||||
display = TerminalDisplay()
|
||||
random.shuffle(items)
|
||||
pool = list(items)
|
||||
seen = set()
|
||||
queued = 0
|
||||
|
||||
time.sleep(0.5)
|
||||
sys.stdout.write(CLR)
|
||||
sys.stdout.flush()
|
||||
|
||||
w, h = tw(), th()
|
||||
display.init(w, h)
|
||||
display.clear()
|
||||
fh = config.FIREHOSE_H if config.FIREHOSE else 0
|
||||
ticker_view_h = h - fh
|
||||
GAP = 3
|
||||
@@ -42,6 +51,7 @@ def stream(items, ntfy_poller, mic_monitor):
|
||||
noise_cache = {}
|
||||
scroll_motion_accum = 0.0
|
||||
msg_cache = (None, None)
|
||||
frame_number = 0
|
||||
|
||||
while True:
|
||||
if queued >= config.HEADLINE_LIMIT and not active:
|
||||
@@ -93,19 +103,39 @@ def stream(items, ntfy_poller, mic_monitor):
|
||||
buf.extend(ticker_buf)
|
||||
|
||||
mic_excess = mic_monitor.excess
|
||||
buf = apply_glitch(buf, ticker_buf_start, mic_excess, w)
|
||||
render_start = time.perf_counter()
|
||||
|
||||
firehose_buf = render_firehose(items, w, fh, h)
|
||||
buf.extend(firehose_buf)
|
||||
if USE_EFFECT_CHAIN:
|
||||
buf = process_effects(
|
||||
buf,
|
||||
w,
|
||||
h,
|
||||
scroll_cam,
|
||||
ticker_h,
|
||||
mic_excess,
|
||||
grad_offset,
|
||||
frame_number,
|
||||
msg is not None,
|
||||
items,
|
||||
)
|
||||
else:
|
||||
buf = apply_glitch(buf, ticker_buf_start, mic_excess, w)
|
||||
firehose_buf = render_firehose(items, w, fh, h)
|
||||
buf.extend(firehose_buf)
|
||||
|
||||
if msg_overlay:
|
||||
buf.extend(msg_overlay)
|
||||
|
||||
sys.stdout.buffer.write("".join(buf).encode())
|
||||
sys.stdout.flush()
|
||||
render_elapsed = (time.perf_counter() - render_start) * 1000
|
||||
monitor = _get_display_monitor()
|
||||
if monitor:
|
||||
chars = sum(len(line) for line in buf)
|
||||
monitor.record_effect("render", render_elapsed, chars, chars)
|
||||
|
||||
display.show(buf)
|
||||
|
||||
elapsed = time.monotonic() - t0
|
||||
time.sleep(max(0, config.FRAME_DT - elapsed))
|
||||
frame_number += 1
|
||||
|
||||
sys.stdout.write(CLR)
|
||||
sys.stdout.flush()
|
||||
display.cleanup()
|
||||
|
||||
@@ -83,3 +83,35 @@ class TestStreamControllerCleanup:
|
||||
controller.cleanup()
|
||||
|
||||
mock_mic_instance.stop.assert_called_once()
|
||||
|
||||
|
||||
class TestStreamControllerWarmup:
|
||||
"""Tests for StreamController topic warmup."""
|
||||
|
||||
def test_warmup_topics_idempotent(self):
|
||||
"""warmup_topics can be called multiple times."""
|
||||
StreamController._topics_warmed = False
|
||||
|
||||
with patch("urllib.request.urlopen") as mock_urlopen:
|
||||
StreamController.warmup_topics()
|
||||
StreamController.warmup_topics()
|
||||
|
||||
assert mock_urlopen.call_count >= 3
|
||||
|
||||
def test_warmup_topics_sets_flag(self):
|
||||
"""warmup_topics sets the warmed flag."""
|
||||
StreamController._topics_warmed = False
|
||||
|
||||
with patch("urllib.request.urlopen"):
|
||||
StreamController.warmup_topics()
|
||||
|
||||
assert StreamController._topics_warmed is True
|
||||
|
||||
def test_warmup_topics_skips_after_first(self):
|
||||
"""warmup_topics skips after first call."""
|
||||
StreamController._topics_warmed = True
|
||||
|
||||
with patch("urllib.request.urlopen") as mock_urlopen:
|
||||
StreamController.warmup_topics()
|
||||
|
||||
mock_urlopen.assert_not_called()
|
||||
|
||||
79
tests/test_display.py
Normal file
79
tests/test_display.py
Normal file
@@ -0,0 +1,79 @@
|
||||
"""
|
||||
Tests for engine.display module.
|
||||
"""
|
||||
|
||||
from engine.display import NullDisplay, TerminalDisplay
|
||||
|
||||
|
||||
class TestDisplayProtocol:
|
||||
"""Test that display backends satisfy the Display protocol."""
|
||||
|
||||
def test_terminal_display_is_display(self):
|
||||
"""TerminalDisplay satisfies Display protocol."""
|
||||
display = TerminalDisplay()
|
||||
assert hasattr(display, "init")
|
||||
assert hasattr(display, "show")
|
||||
assert hasattr(display, "clear")
|
||||
assert hasattr(display, "cleanup")
|
||||
|
||||
def test_null_display_is_display(self):
|
||||
"""NullDisplay satisfies Display protocol."""
|
||||
display = NullDisplay()
|
||||
assert hasattr(display, "init")
|
||||
assert hasattr(display, "show")
|
||||
assert hasattr(display, "clear")
|
||||
assert hasattr(display, "cleanup")
|
||||
|
||||
|
||||
class TestTerminalDisplay:
|
||||
"""Tests for TerminalDisplay class."""
|
||||
|
||||
def test_init_sets_dimensions(self):
|
||||
"""init stores terminal dimensions."""
|
||||
display = TerminalDisplay()
|
||||
display.init(80, 24)
|
||||
assert display.width == 80
|
||||
assert display.height == 24
|
||||
|
||||
def test_show_returns_none(self):
|
||||
"""show returns None after writing to stdout."""
|
||||
display = TerminalDisplay()
|
||||
display.width = 80
|
||||
display.height = 24
|
||||
display.show(["line1", "line2"])
|
||||
|
||||
def test_clear_does_not_error(self):
|
||||
"""clear works without error."""
|
||||
display = TerminalDisplay()
|
||||
display.clear()
|
||||
|
||||
def test_cleanup_does_not_error(self):
|
||||
"""cleanup works without error."""
|
||||
display = TerminalDisplay()
|
||||
display.cleanup()
|
||||
|
||||
|
||||
class TestNullDisplay:
|
||||
"""Tests for NullDisplay class."""
|
||||
|
||||
def test_init_stores_dimensions(self):
|
||||
"""init stores dimensions."""
|
||||
display = NullDisplay()
|
||||
display.init(100, 50)
|
||||
assert display.width == 100
|
||||
assert display.height == 50
|
||||
|
||||
def test_show_does_nothing(self):
|
||||
"""show discards buffer without error."""
|
||||
display = NullDisplay()
|
||||
display.show(["line1", "line2", "line3"])
|
||||
|
||||
def test_clear_does_nothing(self):
|
||||
"""clear does nothing."""
|
||||
display = NullDisplay()
|
||||
display.clear()
|
||||
|
||||
def test_cleanup_does_nothing(self):
|
||||
"""cleanup does nothing."""
|
||||
display = NullDisplay()
|
||||
display.cleanup()
|
||||
427
tests/test_effects.py
Normal file
427
tests/test_effects.py
Normal file
@@ -0,0 +1,427 @@
|
||||
"""
|
||||
Tests for engine.effects module.
|
||||
"""
|
||||
|
||||
from engine.effects import EffectChain, EffectConfig, EffectContext, EffectRegistry
|
||||
|
||||
|
||||
class MockEffect:
|
||||
name = "mock"
|
||||
config = EffectConfig(enabled=True, intensity=1.0)
|
||||
|
||||
def __init__(self):
|
||||
self.processed = False
|
||||
self.last_ctx = None
|
||||
|
||||
def process(self, buf, ctx):
|
||||
self.processed = True
|
||||
self.last_ctx = ctx
|
||||
return buf + ["processed"]
|
||||
|
||||
def configure(self, config):
|
||||
self.config = config
|
||||
|
||||
|
||||
class TestEffectConfig:
|
||||
def test_defaults(self):
|
||||
cfg = EffectConfig()
|
||||
assert cfg.enabled is True
|
||||
assert cfg.intensity == 1.0
|
||||
assert cfg.params == {}
|
||||
|
||||
def test_custom_values(self):
|
||||
cfg = EffectConfig(enabled=False, intensity=0.5, params={"key": "value"})
|
||||
assert cfg.enabled is False
|
||||
assert cfg.intensity == 0.5
|
||||
assert cfg.params == {"key": "value"}
|
||||
|
||||
|
||||
class TestEffectContext:
|
||||
def test_defaults(self):
|
||||
ctx = EffectContext(
|
||||
terminal_width=80,
|
||||
terminal_height=24,
|
||||
scroll_cam=0,
|
||||
ticker_height=20,
|
||||
mic_excess=0.0,
|
||||
grad_offset=0.0,
|
||||
frame_number=0,
|
||||
has_message=False,
|
||||
)
|
||||
assert ctx.terminal_width == 80
|
||||
assert ctx.terminal_height == 24
|
||||
assert ctx.ticker_height == 20
|
||||
assert ctx.items == []
|
||||
|
||||
def test_with_items(self):
|
||||
items = [("Title", "Source", "12:00")]
|
||||
ctx = EffectContext(
|
||||
terminal_width=80,
|
||||
terminal_height=24,
|
||||
scroll_cam=0,
|
||||
ticker_height=20,
|
||||
mic_excess=0.0,
|
||||
grad_offset=0.0,
|
||||
frame_number=0,
|
||||
has_message=False,
|
||||
items=items,
|
||||
)
|
||||
assert ctx.items == items
|
||||
|
||||
|
||||
class TestEffectRegistry:
|
||||
def test_init_empty(self):
|
||||
registry = EffectRegistry()
|
||||
assert len(registry.list_all()) == 0
|
||||
|
||||
def test_register(self):
|
||||
registry = EffectRegistry()
|
||||
effect = MockEffect()
|
||||
registry.register(effect)
|
||||
assert "mock" in registry.list_all()
|
||||
|
||||
def test_get(self):
|
||||
registry = EffectRegistry()
|
||||
effect = MockEffect()
|
||||
registry.register(effect)
|
||||
retrieved = registry.get("mock")
|
||||
assert retrieved is effect
|
||||
|
||||
def test_get_nonexistent(self):
|
||||
registry = EffectRegistry()
|
||||
assert registry.get("nonexistent") is None
|
||||
|
||||
def test_enable(self):
|
||||
registry = EffectRegistry()
|
||||
effect = MockEffect()
|
||||
effect.config.enabled = False
|
||||
registry.register(effect)
|
||||
registry.enable("mock")
|
||||
assert effect.config.enabled is True
|
||||
|
||||
def test_disable(self):
|
||||
registry = EffectRegistry()
|
||||
effect = MockEffect()
|
||||
effect.config.enabled = True
|
||||
registry.register(effect)
|
||||
registry.disable("mock")
|
||||
assert effect.config.enabled is False
|
||||
|
||||
def test_list_enabled(self):
|
||||
registry = EffectRegistry()
|
||||
|
||||
class EnabledEffect:
|
||||
name = "enabled_effect"
|
||||
config = EffectConfig(enabled=True, intensity=1.0)
|
||||
|
||||
class DisabledEffect:
|
||||
name = "disabled_effect"
|
||||
config = EffectConfig(enabled=False, intensity=1.0)
|
||||
|
||||
registry.register(EnabledEffect())
|
||||
registry.register(DisabledEffect())
|
||||
enabled = registry.list_enabled()
|
||||
assert len(enabled) == 1
|
||||
assert enabled[0].name == "enabled_effect"
|
||||
|
||||
def test_configure(self):
|
||||
registry = EffectRegistry()
|
||||
effect = MockEffect()
|
||||
registry.register(effect)
|
||||
new_config = EffectConfig(enabled=False, intensity=0.3)
|
||||
registry.configure("mock", new_config)
|
||||
assert effect.config.enabled is False
|
||||
assert effect.config.intensity == 0.3
|
||||
|
||||
def test_is_enabled(self):
|
||||
registry = EffectRegistry()
|
||||
effect = MockEffect()
|
||||
effect.config.enabled = True
|
||||
registry.register(effect)
|
||||
assert registry.is_enabled("mock") is True
|
||||
assert registry.is_enabled("nonexistent") is False
|
||||
|
||||
|
||||
class TestEffectChain:
|
||||
def test_init(self):
|
||||
registry = EffectRegistry()
|
||||
chain = EffectChain(registry)
|
||||
assert chain.get_order() == []
|
||||
|
||||
def test_set_order(self):
|
||||
registry = EffectRegistry()
|
||||
effect1 = MockEffect()
|
||||
effect1.name = "effect1"
|
||||
effect2 = MockEffect()
|
||||
effect2.name = "effect2"
|
||||
registry.register(effect1)
|
||||
registry.register(effect2)
|
||||
chain = EffectChain(registry)
|
||||
chain.set_order(["effect1", "effect2"])
|
||||
assert chain.get_order() == ["effect1", "effect2"]
|
||||
|
||||
def test_add_effect(self):
|
||||
registry = EffectRegistry()
|
||||
effect = MockEffect()
|
||||
effect.name = "test_effect"
|
||||
registry.register(effect)
|
||||
chain = EffectChain(registry)
|
||||
chain.add_effect("test_effect")
|
||||
assert "test_effect" in chain.get_order()
|
||||
|
||||
def test_add_effect_invalid(self):
|
||||
registry = EffectRegistry()
|
||||
chain = EffectChain(registry)
|
||||
result = chain.add_effect("nonexistent")
|
||||
assert result is False
|
||||
|
||||
def test_remove_effect(self):
|
||||
registry = EffectRegistry()
|
||||
effect = MockEffect()
|
||||
effect.name = "test_effect"
|
||||
registry.register(effect)
|
||||
chain = EffectChain(registry)
|
||||
chain.set_order(["test_effect"])
|
||||
chain.remove_effect("test_effect")
|
||||
assert "test_effect" not in chain.get_order()
|
||||
|
||||
def test_reorder(self):
|
||||
registry = EffectRegistry()
|
||||
effect1 = MockEffect()
|
||||
effect1.name = "effect1"
|
||||
effect2 = MockEffect()
|
||||
effect2.name = "effect2"
|
||||
effect3 = MockEffect()
|
||||
effect3.name = "effect3"
|
||||
registry.register(effect1)
|
||||
registry.register(effect2)
|
||||
registry.register(effect3)
|
||||
chain = EffectChain(registry)
|
||||
chain.set_order(["effect1", "effect2", "effect3"])
|
||||
result = chain.reorder(["effect3", "effect1", "effect2"])
|
||||
assert result is True
|
||||
assert chain.get_order() == ["effect3", "effect1", "effect2"]
|
||||
|
||||
def test_reorder_invalid(self):
|
||||
registry = EffectRegistry()
|
||||
effect = MockEffect()
|
||||
effect.name = "effect1"
|
||||
registry.register(effect)
|
||||
chain = EffectChain(registry)
|
||||
result = chain.reorder(["effect1", "nonexistent"])
|
||||
assert result is False
|
||||
|
||||
def test_process_empty_chain(self):
|
||||
registry = EffectRegistry()
|
||||
chain = EffectChain(registry)
|
||||
buf = ["line1", "line2"]
|
||||
ctx = EffectContext(
|
||||
terminal_width=80,
|
||||
terminal_height=24,
|
||||
scroll_cam=0,
|
||||
ticker_height=20,
|
||||
mic_excess=0.0,
|
||||
grad_offset=0.0,
|
||||
frame_number=0,
|
||||
has_message=False,
|
||||
)
|
||||
result = chain.process(buf, ctx)
|
||||
assert result == buf
|
||||
|
||||
def test_process_with_effects(self):
|
||||
registry = EffectRegistry()
|
||||
effect = MockEffect()
|
||||
effect.name = "test_effect"
|
||||
registry.register(effect)
|
||||
chain = EffectChain(registry)
|
||||
chain.set_order(["test_effect"])
|
||||
buf = ["line1", "line2"]
|
||||
ctx = EffectContext(
|
||||
terminal_width=80,
|
||||
terminal_height=24,
|
||||
scroll_cam=0,
|
||||
ticker_height=20,
|
||||
mic_excess=0.0,
|
||||
grad_offset=0.0,
|
||||
frame_number=0,
|
||||
has_message=False,
|
||||
)
|
||||
result = chain.process(buf, ctx)
|
||||
assert result == ["line1", "line2", "processed"]
|
||||
assert effect.processed is True
|
||||
assert effect.last_ctx is ctx
|
||||
|
||||
def test_process_disabled_effect(self):
|
||||
registry = EffectRegistry()
|
||||
effect = MockEffect()
|
||||
effect.name = "test_effect"
|
||||
effect.config.enabled = False
|
||||
registry.register(effect)
|
||||
chain = EffectChain(registry)
|
||||
chain.set_order(["test_effect"])
|
||||
buf = ["line1"]
|
||||
ctx = EffectContext(
|
||||
terminal_width=80,
|
||||
terminal_height=24,
|
||||
scroll_cam=0,
|
||||
ticker_height=20,
|
||||
mic_excess=0.0,
|
||||
grad_offset=0.0,
|
||||
frame_number=0,
|
||||
has_message=False,
|
||||
)
|
||||
result = chain.process(buf, ctx)
|
||||
assert result == ["line1"]
|
||||
assert effect.processed is False
|
||||
|
||||
|
||||
class TestEffectsExports:
|
||||
def test_all_exports_are_importable(self):
|
||||
"""Verify all exports in __all__ can actually be imported."""
|
||||
import engine.effects as effects_module
|
||||
|
||||
for name in effects_module.__all__:
|
||||
getattr(effects_module, name)
|
||||
|
||||
|
||||
class TestPerformanceMonitor:
|
||||
def test_empty_stats(self):
|
||||
from engine.effects.performance import PerformanceMonitor
|
||||
|
||||
monitor = PerformanceMonitor()
|
||||
stats = monitor.get_stats()
|
||||
assert "error" in stats
|
||||
|
||||
def test_record_and_retrieve(self):
|
||||
from engine.effects.performance import PerformanceMonitor
|
||||
|
||||
monitor = PerformanceMonitor()
|
||||
monitor.start_frame(1)
|
||||
monitor.record_effect("test_effect", 1.5, 100, 150)
|
||||
monitor.end_frame(1, 2.0)
|
||||
|
||||
stats = monitor.get_stats()
|
||||
assert "error" not in stats
|
||||
assert stats["frame_count"] == 1
|
||||
assert "test_effect" in stats["effects"]
|
||||
|
||||
def test_multiple_frames(self):
|
||||
from engine.effects.performance import PerformanceMonitor
|
||||
|
||||
monitor = PerformanceMonitor(max_frames=3)
|
||||
for i in range(5):
|
||||
monitor.start_frame(i)
|
||||
monitor.record_effect("effect1", 1.0, 100, 100)
|
||||
monitor.record_effect("effect2", 0.5, 100, 100)
|
||||
monitor.end_frame(i, 1.5)
|
||||
|
||||
stats = monitor.get_stats()
|
||||
assert stats["frame_count"] == 3
|
||||
assert "effect1" in stats["effects"]
|
||||
assert "effect2" in stats["effects"]
|
||||
|
||||
def test_reset(self):
|
||||
from engine.effects.performance import PerformanceMonitor
|
||||
|
||||
monitor = PerformanceMonitor()
|
||||
monitor.start_frame(1)
|
||||
monitor.record_effect("test", 1.0, 100, 100)
|
||||
monitor.end_frame(1, 1.0)
|
||||
|
||||
monitor.reset()
|
||||
stats = monitor.get_stats()
|
||||
assert "error" in stats
|
||||
|
||||
|
||||
class TestEffectPipelinePerformance:
|
||||
def test_pipeline_stays_within_frame_budget(self):
|
||||
"""Verify effect pipeline completes within frame budget (33ms for 30fps)."""
|
||||
from engine.effects import (
|
||||
EffectChain,
|
||||
EffectConfig,
|
||||
EffectContext,
|
||||
EffectRegistry,
|
||||
)
|
||||
|
||||
class DummyEffect:
|
||||
name = "dummy"
|
||||
config = EffectConfig(enabled=True, intensity=1.0)
|
||||
|
||||
def process(self, buf, ctx):
|
||||
return [line * 2 for line in buf]
|
||||
|
||||
registry = EffectRegistry()
|
||||
registry.register(DummyEffect())
|
||||
|
||||
from engine.effects.performance import PerformanceMonitor
|
||||
|
||||
monitor = PerformanceMonitor(max_frames=10)
|
||||
chain = EffectChain(registry, monitor)
|
||||
chain.set_order(["dummy"])
|
||||
|
||||
buf = ["x" * 80] * 20
|
||||
|
||||
for i in range(10):
|
||||
ctx = EffectContext(
|
||||
terminal_width=80,
|
||||
terminal_height=24,
|
||||
scroll_cam=0,
|
||||
ticker_height=20,
|
||||
mic_excess=0.0,
|
||||
grad_offset=0.0,
|
||||
frame_number=i,
|
||||
has_message=False,
|
||||
)
|
||||
chain.process(buf, ctx)
|
||||
|
||||
stats = monitor.get_stats()
|
||||
assert "error" not in stats
|
||||
assert stats["pipeline"]["max_ms"] < 33.0
|
||||
|
||||
def test_individual_effects_performance(self):
|
||||
"""Verify individual effects don't exceed 10ms per frame."""
|
||||
from engine.effects import (
|
||||
EffectChain,
|
||||
EffectConfig,
|
||||
EffectContext,
|
||||
EffectRegistry,
|
||||
)
|
||||
|
||||
class SlowEffect:
|
||||
name = "slow"
|
||||
config = EffectConfig(enabled=True, intensity=1.0)
|
||||
|
||||
def process(self, buf, ctx):
|
||||
result = []
|
||||
for line in buf:
|
||||
result.append(line)
|
||||
result.append(line + line)
|
||||
return result
|
||||
|
||||
registry = EffectRegistry()
|
||||
registry.register(SlowEffect())
|
||||
|
||||
from engine.effects.performance import PerformanceMonitor
|
||||
|
||||
monitor = PerformanceMonitor(max_frames=5)
|
||||
chain = EffectChain(registry, monitor)
|
||||
chain.set_order(["slow"])
|
||||
|
||||
buf = ["x" * 80] * 10
|
||||
|
||||
for i in range(5):
|
||||
ctx = EffectContext(
|
||||
terminal_width=80,
|
||||
terminal_height=24,
|
||||
scroll_cam=0,
|
||||
ticker_height=20,
|
||||
mic_excess=0.0,
|
||||
grad_offset=0.0,
|
||||
frame_number=i,
|
||||
has_message=False,
|
||||
)
|
||||
chain.process(buf, ctx)
|
||||
|
||||
stats = monitor.get_stats()
|
||||
assert "error" not in stats
|
||||
assert stats["effects"]["slow"]["max_ms"] < 10.0
|
||||
117
tests/test_effects_controller.py
Normal file
117
tests/test_effects_controller.py
Normal file
@@ -0,0 +1,117 @@
|
||||
"""
|
||||
Tests for engine.effects.controller module.
|
||||
"""
|
||||
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
from engine.effects.controller import (
|
||||
handle_effects_command,
|
||||
set_effect_chain_ref,
|
||||
)
|
||||
|
||||
|
||||
class TestHandleEffectsCommand:
|
||||
"""Tests for handle_effects_command function."""
|
||||
|
||||
def test_list_effects(self):
|
||||
"""list command returns formatted effects list."""
|
||||
with patch("engine.effects.controller.get_registry") as mock_registry:
|
||||
mock_plugin = MagicMock()
|
||||
mock_plugin.config.enabled = True
|
||||
mock_plugin.config.intensity = 0.5
|
||||
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
|
||||
|
||||
with patch("engine.effects.controller._get_effect_chain") as mock_chain:
|
||||
mock_chain.return_value.get_order.return_value = ["noise"]
|
||||
|
||||
result = handle_effects_command("/effects list")
|
||||
|
||||
assert "noise: ON" in result
|
||||
assert "intensity=0.5" in result
|
||||
|
||||
def test_enable_effect(self):
|
||||
"""enable command calls registry.enable."""
|
||||
with patch("engine.effects.controller.get_registry") as mock_registry:
|
||||
mock_plugin = MagicMock()
|
||||
mock_registry.return_value.get.return_value = mock_plugin
|
||||
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
|
||||
|
||||
result = handle_effects_command("/effects noise on")
|
||||
|
||||
assert "Enabled: noise" in result
|
||||
mock_registry.return_value.enable.assert_called_once_with("noise")
|
||||
|
||||
def test_disable_effect(self):
|
||||
"""disable command calls registry.disable."""
|
||||
with patch("engine.effects.controller.get_registry") as mock_registry:
|
||||
mock_plugin = MagicMock()
|
||||
mock_registry.return_value.get.return_value = mock_plugin
|
||||
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
|
||||
|
||||
result = handle_effects_command("/effects noise off")
|
||||
|
||||
assert "Disabled: noise" in result
|
||||
mock_registry.return_value.disable.assert_called_once_with("noise")
|
||||
|
||||
def test_set_intensity(self):
|
||||
"""intensity command sets plugin intensity."""
|
||||
with patch("engine.effects.controller.get_registry") as mock_registry:
|
||||
mock_plugin = MagicMock()
|
||||
mock_plugin.config.intensity = 0.5
|
||||
mock_registry.return_value.get.return_value = mock_plugin
|
||||
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
|
||||
|
||||
result = handle_effects_command("/effects noise intensity 0.8")
|
||||
|
||||
assert "intensity to 0.8" in result
|
||||
assert mock_plugin.config.intensity == 0.8
|
||||
|
||||
def test_invalid_intensity_range(self):
|
||||
"""intensity outside 0.0-1.0 returns error."""
|
||||
with patch("engine.effects.controller.get_registry") as mock_registry:
|
||||
mock_plugin = MagicMock()
|
||||
mock_registry.return_value.get.return_value = mock_plugin
|
||||
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
|
||||
|
||||
result = handle_effects_command("/effects noise intensity 1.5")
|
||||
|
||||
assert "between 0.0 and 1.0" in result
|
||||
|
||||
def test_reorder_pipeline(self):
|
||||
"""reorder command calls chain.reorder."""
|
||||
with patch("engine.effects.controller.get_registry") as mock_registry:
|
||||
mock_registry.return_value.list_all.return_value = {}
|
||||
|
||||
with patch("engine.effects.controller._get_effect_chain") as mock_chain:
|
||||
mock_chain_instance = MagicMock()
|
||||
mock_chain_instance.reorder.return_value = True
|
||||
mock_chain.return_value = mock_chain_instance
|
||||
|
||||
result = handle_effects_command("/effects reorder noise,fade")
|
||||
|
||||
assert "Reordered pipeline" in result
|
||||
mock_chain_instance.reorder.assert_called_once_with(["noise", "fade"])
|
||||
|
||||
def test_unknown_command(self):
|
||||
"""unknown command returns error."""
|
||||
result = handle_effects_command("/unknown")
|
||||
assert "Unknown command" in result
|
||||
|
||||
def test_non_effects_command(self):
|
||||
"""non-effects command returns error."""
|
||||
result = handle_effects_command("not a command")
|
||||
assert "Unknown command" in result
|
||||
|
||||
|
||||
class TestSetEffectChainRef:
|
||||
"""Tests for set_effect_chain_ref function."""
|
||||
|
||||
def test_sets_global_ref(self):
|
||||
"""set_effect_chain_ref updates global reference."""
|
||||
mock_chain = MagicMock()
|
||||
set_effect_chain_ref(mock_chain)
|
||||
|
||||
from engine.effects.controller import _get_effect_chain
|
||||
|
||||
result = _get_effect_chain()
|
||||
assert result == mock_chain
|
||||
Reference in New Issue
Block a user