Compare commits
5 Commits
feat/code-
...
backup/fea
| Author | SHA1 | Date | |
|---|---|---|---|
| 2650f7245e | |||
| b1f2b9d2be | |||
| c08a7d3cb0 | |||
| d5a3edba97 | |||
| fb35458718 |
250
cmdline.py
Normal file
250
cmdline.py
Normal file
@@ -0,0 +1,250 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Command-line utility for interacting with mainline via ntfy.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
python cmdline.py # Interactive TUI mode
|
||||||
|
python cmdline.py --help # Show help
|
||||||
|
python cmdline.py /effects list # Send single command via ntfy
|
||||||
|
python cmdline.py /effects stats # Get performance stats via ntfy
|
||||||
|
python cmdline.py -w /effects stats # Watch mode (polls for stats)
|
||||||
|
|
||||||
|
The TUI mode provides:
|
||||||
|
- Arrow keys to navigate command history
|
||||||
|
- Tab completion for commands
|
||||||
|
- Auto-refresh for performance stats
|
||||||
|
|
||||||
|
C&C works like a serial port:
|
||||||
|
1. Send command to ntfy_cc_topic
|
||||||
|
2. Mainline receives, processes, responds to same topic
|
||||||
|
3. Cmdline polls for response
|
||||||
|
"""
|
||||||
|
|
||||||
|
import argparse
|
||||||
|
import json
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
import threading
|
||||||
|
import urllib.request
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
from engine import config
|
||||||
|
from engine.terminal import CLR, CURSOR_OFF, CURSOR_ON, G_DIM, G_HI, RST, W_GHOST
|
||||||
|
|
||||||
|
try:
|
||||||
|
CC_CMD_TOPIC = config.NTFY_CC_CMD_TOPIC
|
||||||
|
CC_RESP_TOPIC = config.NTFY_CC_RESP_TOPIC
|
||||||
|
except AttributeError:
|
||||||
|
CC_CMD_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json"
|
||||||
|
CC_RESP_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json"
|
||||||
|
|
||||||
|
|
||||||
|
class NtfyResponsePoller:
|
||||||
|
"""Polls ntfy for command responses."""
|
||||||
|
|
||||||
|
def __init__(self, cmd_topic: str, resp_topic: str, timeout: float = 10.0):
|
||||||
|
self.cmd_topic = cmd_topic
|
||||||
|
self.resp_topic = resp_topic
|
||||||
|
self.timeout = timeout
|
||||||
|
self._last_id = None
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
def _build_url(self) -> str:
|
||||||
|
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
|
||||||
|
|
||||||
|
parsed = urlparse(self.resp_topic)
|
||||||
|
params = parse_qs(parsed.query, keep_blank_values=True)
|
||||||
|
params["since"] = [self._last_id if self._last_id else "20s"]
|
||||||
|
new_query = urlencode({k: v[0] for k, v in params.items()})
|
||||||
|
return urlunparse(parsed._replace(query=new_query))
|
||||||
|
|
||||||
|
def send_and_wait(self, cmd: str) -> str:
|
||||||
|
"""Send command and wait for response."""
|
||||||
|
url = self.cmd_topic.replace("/json", "")
|
||||||
|
data = cmd.encode("utf-8")
|
||||||
|
|
||||||
|
req = urllib.request.Request(
|
||||||
|
url,
|
||||||
|
data=data,
|
||||||
|
headers={
|
||||||
|
"User-Agent": "mainline-cmdline/0.1",
|
||||||
|
"Content-Type": "text/plain",
|
||||||
|
},
|
||||||
|
method="POST",
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
urllib.request.urlopen(req, timeout=5)
|
||||||
|
except Exception as e:
|
||||||
|
return f"Error sending command: {e}"
|
||||||
|
|
||||||
|
return self._wait_for_response(cmd)
|
||||||
|
|
||||||
|
def _wait_for_response(self, expected_cmd: str = "") -> str:
|
||||||
|
"""Poll for response message."""
|
||||||
|
start = time.time()
|
||||||
|
while time.time() - start < self.timeout:
|
||||||
|
try:
|
||||||
|
url = self._build_url()
|
||||||
|
req = urllib.request.Request(
|
||||||
|
url, headers={"User-Agent": "mainline-cmdline/0.1"}
|
||||||
|
)
|
||||||
|
with urllib.request.urlopen(req, timeout=10) as resp:
|
||||||
|
for line in resp:
|
||||||
|
try:
|
||||||
|
data = json.loads(line.decode("utf-8", errors="replace"))
|
||||||
|
except json.JSONDecodeError:
|
||||||
|
continue
|
||||||
|
if data.get("event") == "message":
|
||||||
|
self._last_id = data.get("id")
|
||||||
|
msg = data.get("message", "")
|
||||||
|
if msg:
|
||||||
|
return msg
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
time.sleep(0.5)
|
||||||
|
return "Timeout waiting for response"
|
||||||
|
|
||||||
|
|
||||||
|
AVAILABLE_COMMANDS = """Available commands:
|
||||||
|
/effects list - List all effects and status
|
||||||
|
/effects <name> on - Enable an effect
|
||||||
|
/effects <name> off - Disable an effect
|
||||||
|
/effects <name> intensity <0.0-1.0> - Set effect intensity
|
||||||
|
/effects reorder <name1>,<name2>,... - Reorder pipeline
|
||||||
|
/effects stats - Show performance statistics
|
||||||
|
/help - Show this help
|
||||||
|
/quit - Exit
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def print_header():
|
||||||
|
w = 60
|
||||||
|
print(CLR, end="")
|
||||||
|
print(CURSOR_OFF, end="")
|
||||||
|
print(f"\033[1;1H", end="")
|
||||||
|
print(f" \033[1;38;5;231m╔{'═' * (w - 6)}╗\033[0m")
|
||||||
|
print(
|
||||||
|
f" \033[1;38;5;231m║\033[0m \033[1;38;5;82mMAINLINE\033[0m \033[3;38;5;245mCommand Center\033[0m \033[1;38;5;231m ║\033[0m"
|
||||||
|
)
|
||||||
|
print(f" \033[1;38;5;231m╚{'═' * (w - 6)}╝\033[0m")
|
||||||
|
print(f" \033[2;38;5;37mCMD: {CC_CMD_TOPIC.split('/')[-2]}\033[0m")
|
||||||
|
print(f" \033[2;38;5;37mRESP: {CC_RESP_TOPIC.split('/')[-2]}\033[0m")
|
||||||
|
print()
|
||||||
|
|
||||||
|
|
||||||
|
def print_response(response: str, is_error: bool = False) -> None:
|
||||||
|
"""Print response with nice formatting."""
|
||||||
|
print()
|
||||||
|
if is_error:
|
||||||
|
print(f" \033[1;38;5;196m✗ Error\033[0m")
|
||||||
|
print(f" \033[38;5;196m{'─' * 40}\033[0m")
|
||||||
|
else:
|
||||||
|
print(f" \033[1;38;5;82m✓ Response\033[0m")
|
||||||
|
print(f" \033[38;5;37m{'─' * 40}\033[0m")
|
||||||
|
|
||||||
|
for line in response.split("\n"):
|
||||||
|
print(f" {line}")
|
||||||
|
print()
|
||||||
|
|
||||||
|
|
||||||
|
def interactive_mode():
|
||||||
|
"""Interactive TUI for sending commands."""
|
||||||
|
import readline
|
||||||
|
|
||||||
|
print_header()
|
||||||
|
poller = NtfyResponsePoller(CC_CMD_TOPIC, CC_RESP_TOPIC)
|
||||||
|
|
||||||
|
print(f" \033[38;5;245mType /help for commands, /quit to exit\033[0m")
|
||||||
|
print()
|
||||||
|
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
cmd = input(f" \033[1;38;5;82m❯\033[0m {G_HI}").strip()
|
||||||
|
except (EOFError, KeyboardInterrupt):
|
||||||
|
print()
|
||||||
|
break
|
||||||
|
|
||||||
|
if not cmd:
|
||||||
|
continue
|
||||||
|
|
||||||
|
if cmd.startswith("/"):
|
||||||
|
if cmd == "/quit" or cmd == "/exit":
|
||||||
|
print(f"\n \033[1;38;5;245mGoodbye!{RST}\n")
|
||||||
|
break
|
||||||
|
|
||||||
|
if cmd == "/help":
|
||||||
|
print(f"\n{AVAILABLE_COMMANDS}\n")
|
||||||
|
continue
|
||||||
|
|
||||||
|
print(f" \033[38;5;245m⟳ Sending to mainline...{RST}")
|
||||||
|
result = poller.send_and_wait(cmd)
|
||||||
|
print_response(result, is_error=result.startswith("Error"))
|
||||||
|
else:
|
||||||
|
print(f"\n \033[1;38;5;196m⚠ Commands must start with /{RST}\n")
|
||||||
|
|
||||||
|
print(CURSOR_ON, end="")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
parser = argparse.ArgumentParser(
|
||||||
|
description="Mainline command-line interface",
|
||||||
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
||||||
|
epilog=AVAILABLE_COMMANDS,
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"command",
|
||||||
|
nargs="?",
|
||||||
|
default=None,
|
||||||
|
help="Command to send (e.g., /effects list)",
|
||||||
|
)
|
||||||
|
parser.add_argument(
|
||||||
|
"--watch",
|
||||||
|
"-w",
|
||||||
|
action="store_true",
|
||||||
|
help="Watch mode: continuously poll for stats (Ctrl+C to exit)",
|
||||||
|
)
|
||||||
|
|
||||||
|
args = parser.parse_args()
|
||||||
|
|
||||||
|
if args.command is None:
|
||||||
|
return interactive_mode()
|
||||||
|
|
||||||
|
poller = NtfyResponsePoller(CC_CMD_TOPIC, CC_RESP_TOPIC)
|
||||||
|
|
||||||
|
if args.watch and "/effects stats" in args.command:
|
||||||
|
import signal
|
||||||
|
|
||||||
|
def handle_sigterm(*_):
|
||||||
|
print(f"\n \033[1;38;5;245mStopped watching{RST}")
|
||||||
|
print(CURSOR_ON, end="")
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
signal.signal(signal.SIGTERM, handle_sigterm)
|
||||||
|
|
||||||
|
print_header()
|
||||||
|
print(f" \033[38;5;245mWatching /effects stats (Ctrl+C to exit)...{RST}\n")
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
result = poller.send_and_wait(args.command)
|
||||||
|
print(f"\033[2J\033[1;1H", end="")
|
||||||
|
print(
|
||||||
|
f" \033[1;38;5;82m❯\033[0m Performance Stats - \033[1;38;5;245m{time.strftime('%H:%M:%S')}{RST}"
|
||||||
|
)
|
||||||
|
print(f" \033[38;5;37m{'─' * 44}{RST}")
|
||||||
|
for line in result.split("\n"):
|
||||||
|
print(f" {line}")
|
||||||
|
time.sleep(2)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
print(f"\n \033[1;38;5;245mStopped watching{RST}")
|
||||||
|
return 0
|
||||||
|
return 0
|
||||||
|
|
||||||
|
result = poller.send_and_wait(args.command)
|
||||||
|
print(result)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
main()
|
||||||
35
effects_plugins/__init__.py
Normal file
35
effects_plugins/__init__.py
Normal file
@@ -0,0 +1,35 @@
|
|||||||
|
from pathlib import Path
|
||||||
|
|
||||||
|
PLUGIN_DIR = Path(__file__).parent
|
||||||
|
|
||||||
|
|
||||||
|
def discover_plugins():
|
||||||
|
from engine.effects.registry import get_registry
|
||||||
|
|
||||||
|
registry = get_registry()
|
||||||
|
imported = {}
|
||||||
|
|
||||||
|
for file_path in PLUGIN_DIR.glob("*.py"):
|
||||||
|
if file_path.name.startswith("_"):
|
||||||
|
continue
|
||||||
|
module_name = file_path.stem
|
||||||
|
if module_name in ("base", "types"):
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
module = __import__(f"effects_plugins.{module_name}", fromlist=[""])
|
||||||
|
for attr_name in dir(module):
|
||||||
|
attr = getattr(module, attr_name)
|
||||||
|
if (
|
||||||
|
isinstance(attr, type)
|
||||||
|
and hasattr(attr, "name")
|
||||||
|
and hasattr(attr, "process")
|
||||||
|
and attr_name.endswith("Effect")
|
||||||
|
):
|
||||||
|
plugin = attr()
|
||||||
|
registry.register(plugin)
|
||||||
|
imported[plugin.name] = plugin
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return imported
|
||||||
58
effects_plugins/fade.py
Normal file
58
effects_plugins/fade.py
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
import random
|
||||||
|
|
||||||
|
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
|
||||||
|
|
||||||
|
|
||||||
|
class FadeEffect:
|
||||||
|
name = "fade"
|
||||||
|
config = EffectConfig(enabled=True, intensity=1.0)
|
||||||
|
|
||||||
|
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||||
|
if not ctx.ticker_height:
|
||||||
|
return buf
|
||||||
|
result = list(buf)
|
||||||
|
intensity = self.config.intensity
|
||||||
|
|
||||||
|
top_zone = max(1, int(ctx.ticker_height * 0.25))
|
||||||
|
bot_zone = max(1, int(ctx.ticker_height * 0.10))
|
||||||
|
|
||||||
|
for r in range(len(result)):
|
||||||
|
if r >= ctx.ticker_height:
|
||||||
|
continue
|
||||||
|
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
|
||||||
|
bot_f = (
|
||||||
|
min(1.0, (ctx.ticker_height - 1 - r) / bot_zone)
|
||||||
|
if bot_zone > 0
|
||||||
|
else 1.0
|
||||||
|
)
|
||||||
|
row_fade = min(top_f, bot_f) * intensity
|
||||||
|
|
||||||
|
if row_fade < 1.0 and result[r].strip():
|
||||||
|
result[r] = self._fade_line(result[r], row_fade)
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _fade_line(self, s: str, fade: float) -> str:
|
||||||
|
if fade >= 1.0:
|
||||||
|
return s
|
||||||
|
if fade <= 0.0:
|
||||||
|
return ""
|
||||||
|
result = []
|
||||||
|
i = 0
|
||||||
|
while i < len(s):
|
||||||
|
if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[":
|
||||||
|
j = i + 2
|
||||||
|
while j < len(s) and not s[j].isalpha():
|
||||||
|
j += 1
|
||||||
|
result.append(s[i : j + 1])
|
||||||
|
i = j + 1
|
||||||
|
elif s[i] == " ":
|
||||||
|
result.append(" ")
|
||||||
|
i += 1
|
||||||
|
else:
|
||||||
|
result.append(s[i] if random.random() < fade else " ")
|
||||||
|
i += 1
|
||||||
|
return "".join(result)
|
||||||
|
|
||||||
|
def configure(self, cfg: EffectConfig) -> None:
|
||||||
|
self.config = cfg
|
||||||
72
effects_plugins/firehose.py
Normal file
72
effects_plugins/firehose.py
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
import random
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from engine import config
|
||||||
|
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
|
||||||
|
from engine.sources import FEEDS, POETRY_SOURCES
|
||||||
|
from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST
|
||||||
|
|
||||||
|
|
||||||
|
class FirehoseEffect:
|
||||||
|
name = "firehose"
|
||||||
|
config = EffectConfig(enabled=True, intensity=1.0)
|
||||||
|
|
||||||
|
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||||
|
firehose_h = config.FIREHOSE_H if config.FIREHOSE else 0
|
||||||
|
if firehose_h <= 0 or not ctx.items:
|
||||||
|
return buf
|
||||||
|
|
||||||
|
result = list(buf)
|
||||||
|
intensity = self.config.intensity
|
||||||
|
h = ctx.terminal_height
|
||||||
|
|
||||||
|
for fr in range(firehose_h):
|
||||||
|
scr_row = h - firehose_h + fr + 1
|
||||||
|
fline = self._firehose_line(ctx.items, ctx.terminal_width, intensity)
|
||||||
|
result.append(f"\033[{scr_row};1H{fline}\033[K")
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _firehose_line(self, items: list, w: int, intensity: float) -> str:
|
||||||
|
r = random.random()
|
||||||
|
if r < 0.35 * intensity:
|
||||||
|
title, src, ts = random.choice(items)
|
||||||
|
text = title[: w - 1]
|
||||||
|
color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM])
|
||||||
|
return f"{color}{text}{RST}"
|
||||||
|
elif r < 0.55 * intensity:
|
||||||
|
d = random.choice([0.45, 0.55, 0.65, 0.75])
|
||||||
|
return "".join(
|
||||||
|
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
|
||||||
|
f"{random.choice(config.GLITCH + config.KATA)}{RST}"
|
||||||
|
if random.random() < d
|
||||||
|
else " "
|
||||||
|
for _ in range(w)
|
||||||
|
)
|
||||||
|
elif r < 0.78 * intensity:
|
||||||
|
sources = FEEDS if config.MODE == "news" else POETRY_SOURCES
|
||||||
|
src = random.choice(list(sources.keys()))
|
||||||
|
msgs = [
|
||||||
|
f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}",
|
||||||
|
f" ░░ FEED ACTIVE :: {src}",
|
||||||
|
f" >> DECODE 0x{random.randint(0x1000, 0xFFFF):04X} :: {src[:24]}",
|
||||||
|
f" ▒▒ ACQUIRE :: {random.choice(['TCP', 'UDP', 'RSS', 'ATOM', 'XML'])} :: {src}",
|
||||||
|
f" {''.join(random.choice(config.KATA) for _ in range(3))} STRM "
|
||||||
|
f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}",
|
||||||
|
]
|
||||||
|
text = random.choice(msgs)[: w - 1]
|
||||||
|
color = random.choice([G_LO, G_DIM, W_GHOST])
|
||||||
|
return f"{color}{text}{RST}"
|
||||||
|
else:
|
||||||
|
title, _, _ = random.choice(items)
|
||||||
|
start = random.randint(0, max(0, len(title) - 20))
|
||||||
|
frag = title[start : start + random.randint(10, 35)]
|
||||||
|
pad = random.randint(0, max(0, w - len(frag) - 8))
|
||||||
|
gp = "".join(
|
||||||
|
random.choice(config.GLITCH) for _ in range(random.randint(1, 3))
|
||||||
|
)
|
||||||
|
text = (" " * pad + gp + " " + frag)[: w - 1]
|
||||||
|
color = random.choice([G_LO, C_DIM, W_GHOST])
|
||||||
|
return f"{color}{text}{RST}"
|
||||||
|
|
||||||
|
def configure(self, cfg: EffectConfig) -> None:
|
||||||
|
self.config = cfg
|
||||||
37
effects_plugins/glitch.py
Normal file
37
effects_plugins/glitch.py
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
import random
|
||||||
|
|
||||||
|
from engine import config
|
||||||
|
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
|
||||||
|
from engine.terminal import C_DIM, DIM, G_DIM, G_LO, RST
|
||||||
|
|
||||||
|
|
||||||
|
class GlitchEffect:
|
||||||
|
name = "glitch"
|
||||||
|
config = EffectConfig(enabled=True, intensity=1.0)
|
||||||
|
|
||||||
|
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||||
|
if not buf:
|
||||||
|
return buf
|
||||||
|
result = list(buf)
|
||||||
|
intensity = self.config.intensity
|
||||||
|
|
||||||
|
glitch_prob = 0.32 + min(0.9, ctx.mic_excess * 0.16)
|
||||||
|
glitch_prob = glitch_prob * intensity
|
||||||
|
n_hits = 4 + int(ctx.mic_excess / 2)
|
||||||
|
n_hits = int(n_hits * intensity)
|
||||||
|
|
||||||
|
if random.random() < glitch_prob:
|
||||||
|
for _ in range(min(n_hits, len(result))):
|
||||||
|
gi = random.randint(0, len(result) - 1)
|
||||||
|
scr_row = gi + 1
|
||||||
|
result[gi] = f"\033[{scr_row};1H{self._glitch_bar(ctx.terminal_width)}"
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _glitch_bar(self, w: int) -> str:
|
||||||
|
c = random.choice(["░", "▒", "─", "\xc2"])
|
||||||
|
n = random.randint(3, w // 2)
|
||||||
|
o = random.randint(0, w - n)
|
||||||
|
return " " * o + f"{G_LO}{DIM}" + c * n + RST
|
||||||
|
|
||||||
|
def configure(self, cfg: EffectConfig) -> None:
|
||||||
|
self.config = cfg
|
||||||
36
effects_plugins/noise.py
Normal file
36
effects_plugins/noise.py
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
import random
|
||||||
|
|
||||||
|
from engine import config
|
||||||
|
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
|
||||||
|
from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST
|
||||||
|
|
||||||
|
|
||||||
|
class NoiseEffect:
|
||||||
|
name = "noise"
|
||||||
|
config = EffectConfig(enabled=True, intensity=0.15)
|
||||||
|
|
||||||
|
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||||
|
if not ctx.ticker_height:
|
||||||
|
return buf
|
||||||
|
result = list(buf)
|
||||||
|
intensity = self.config.intensity
|
||||||
|
probability = intensity * 0.15
|
||||||
|
|
||||||
|
for r in range(len(result)):
|
||||||
|
cy = ctx.scroll_cam + r
|
||||||
|
if random.random() < probability:
|
||||||
|
result[r] = self._generate_noise(ctx.terminal_width, cy)
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _generate_noise(self, w: int, cy: int) -> str:
|
||||||
|
d = random.choice([0.15, 0.25, 0.35, 0.12])
|
||||||
|
return "".join(
|
||||||
|
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
|
||||||
|
f"{random.choice(config.GLITCH + config.KATA)}{RST}"
|
||||||
|
if random.random() < d
|
||||||
|
else " "
|
||||||
|
for _ in range(w)
|
||||||
|
)
|
||||||
|
|
||||||
|
def configure(self, cfg: EffectConfig) -> None:
|
||||||
|
self.config = cfg
|
||||||
102
engine/display.py
Normal file
102
engine/display.py
Normal file
@@ -0,0 +1,102 @@
|
|||||||
|
"""
|
||||||
|
Display output abstraction - allows swapping output backends.
|
||||||
|
|
||||||
|
Protocol:
|
||||||
|
- init(width, height): Initialize display with terminal dimensions
|
||||||
|
- show(buffer): Render buffer (list of strings) to display
|
||||||
|
- clear(): Clear the display
|
||||||
|
- cleanup(): Shutdown display
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
from typing import Protocol
|
||||||
|
|
||||||
|
|
||||||
|
class Display(Protocol):
|
||||||
|
"""Protocol for display backends."""
|
||||||
|
|
||||||
|
def init(self, width: int, height: int) -> None:
|
||||||
|
"""Initialize display with dimensions."""
|
||||||
|
...
|
||||||
|
|
||||||
|
def show(self, buffer: list[str]) -> None:
|
||||||
|
"""Show buffer on display."""
|
||||||
|
...
|
||||||
|
|
||||||
|
def clear(self) -> None:
|
||||||
|
"""Clear display."""
|
||||||
|
...
|
||||||
|
|
||||||
|
def cleanup(self) -> None:
|
||||||
|
"""Shutdown display."""
|
||||||
|
...
|
||||||
|
|
||||||
|
|
||||||
|
def get_monitor():
|
||||||
|
"""Get the performance monitor."""
|
||||||
|
try:
|
||||||
|
from engine.effects.performance import get_monitor as _get_monitor
|
||||||
|
|
||||||
|
return _get_monitor()
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
class TerminalDisplay:
|
||||||
|
"""ANSI terminal display backend."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.width = 80
|
||||||
|
self.height = 24
|
||||||
|
|
||||||
|
def init(self, width: int, height: int) -> None:
|
||||||
|
from engine.terminal import CURSOR_OFF
|
||||||
|
|
||||||
|
self.width = width
|
||||||
|
self.height = height
|
||||||
|
print(CURSOR_OFF, end="", flush=True)
|
||||||
|
|
||||||
|
def show(self, buffer: list[str]) -> None:
|
||||||
|
import sys
|
||||||
|
|
||||||
|
t0 = time.perf_counter()
|
||||||
|
sys.stdout.buffer.write("".join(buffer).encode())
|
||||||
|
sys.stdout.flush()
|
||||||
|
elapsed_ms = (time.perf_counter() - t0) * 1000
|
||||||
|
|
||||||
|
monitor = get_monitor()
|
||||||
|
if monitor:
|
||||||
|
chars_in = sum(len(line) for line in buffer)
|
||||||
|
monitor.record_effect("terminal_display", elapsed_ms, chars_in, chars_in)
|
||||||
|
|
||||||
|
def clear(self) -> None:
|
||||||
|
from engine.terminal import CLR
|
||||||
|
|
||||||
|
print(CLR, end="", flush=True)
|
||||||
|
|
||||||
|
def cleanup(self) -> None:
|
||||||
|
from engine.terminal import CURSOR_ON
|
||||||
|
|
||||||
|
print(CURSOR_ON, end="", flush=True)
|
||||||
|
|
||||||
|
|
||||||
|
class NullDisplay:
|
||||||
|
"""Headless/null display - discards all output."""
|
||||||
|
|
||||||
|
def init(self, width: int, height: int) -> None:
|
||||||
|
self.width = width
|
||||||
|
self.height = height
|
||||||
|
|
||||||
|
def show(self, buffer: list[str]) -> None:
|
||||||
|
monitor = get_monitor()
|
||||||
|
if monitor:
|
||||||
|
t0 = time.perf_counter()
|
||||||
|
chars_in = sum(len(line) for line in buffer)
|
||||||
|
elapsed_ms = (time.perf_counter() - t0) * 1000
|
||||||
|
monitor.record_effect("null_display", elapsed_ms, chars_in, chars_in)
|
||||||
|
|
||||||
|
def clear(self) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def cleanup(self) -> None:
|
||||||
|
pass
|
||||||
42
engine/effects/__init__.py
Normal file
42
engine/effects/__init__.py
Normal file
@@ -0,0 +1,42 @@
|
|||||||
|
from engine.effects.chain import EffectChain
|
||||||
|
from engine.effects.controller import handle_effects_command, show_effects_menu
|
||||||
|
from engine.effects.legacy import (
|
||||||
|
fade_line,
|
||||||
|
firehose_line,
|
||||||
|
glitch_bar,
|
||||||
|
next_headline,
|
||||||
|
noise,
|
||||||
|
vis_trunc,
|
||||||
|
)
|
||||||
|
from engine.effects.performance import PerformanceMonitor, get_monitor, set_monitor
|
||||||
|
from engine.effects.registry import EffectRegistry, get_registry, set_registry
|
||||||
|
from engine.effects.types import EffectConfig, EffectContext, PipelineConfig
|
||||||
|
|
||||||
|
|
||||||
|
def get_effect_chain():
|
||||||
|
from engine.layers import get_effect_chain as _chain
|
||||||
|
|
||||||
|
return _chain()
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"EffectChain",
|
||||||
|
"EffectRegistry",
|
||||||
|
"EffectConfig",
|
||||||
|
"EffectContext",
|
||||||
|
"PipelineConfig",
|
||||||
|
"get_registry",
|
||||||
|
"set_registry",
|
||||||
|
"get_effect_chain",
|
||||||
|
"get_monitor",
|
||||||
|
"set_monitor",
|
||||||
|
"PerformanceMonitor",
|
||||||
|
"handle_effects_command",
|
||||||
|
"show_effects_menu",
|
||||||
|
"fade_line",
|
||||||
|
"firehose_line",
|
||||||
|
"glitch_bar",
|
||||||
|
"noise",
|
||||||
|
"next_headline",
|
||||||
|
"vis_trunc",
|
||||||
|
]
|
||||||
71
engine/effects/chain.py
Normal file
71
engine/effects/chain.py
Normal file
@@ -0,0 +1,71 @@
|
|||||||
|
import time
|
||||||
|
|
||||||
|
from engine.effects.performance import PerformanceMonitor, get_monitor
|
||||||
|
from engine.effects.registry import EffectRegistry
|
||||||
|
from engine.effects.types import EffectContext
|
||||||
|
|
||||||
|
|
||||||
|
class EffectChain:
|
||||||
|
def __init__(
|
||||||
|
self, registry: EffectRegistry, monitor: PerformanceMonitor | None = None
|
||||||
|
):
|
||||||
|
self._registry = registry
|
||||||
|
self._order: list[str] = []
|
||||||
|
self._monitor = monitor
|
||||||
|
|
||||||
|
def _get_monitor(self) -> PerformanceMonitor:
|
||||||
|
if self._monitor is not None:
|
||||||
|
return self._monitor
|
||||||
|
return get_monitor()
|
||||||
|
|
||||||
|
def set_order(self, names: list[str]) -> None:
|
||||||
|
self._order = list(names)
|
||||||
|
|
||||||
|
def get_order(self) -> list[str]:
|
||||||
|
return self._order.copy()
|
||||||
|
|
||||||
|
def add_effect(self, name: str, position: int | None = None) -> bool:
|
||||||
|
if name not in self._registry.list_all():
|
||||||
|
return False
|
||||||
|
if position is None:
|
||||||
|
self._order.append(name)
|
||||||
|
else:
|
||||||
|
self._order.insert(position, name)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def remove_effect(self, name: str) -> bool:
|
||||||
|
if name in self._order:
|
||||||
|
self._order.remove(name)
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def reorder(self, new_order: list[str]) -> bool:
|
||||||
|
all_plugins = set(self._registry.list_all().keys())
|
||||||
|
if not all(name in all_plugins for name in new_order):
|
||||||
|
return False
|
||||||
|
self._order = list(new_order)
|
||||||
|
return True
|
||||||
|
|
||||||
|
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||||
|
monitor = self._get_monitor()
|
||||||
|
frame_number = ctx.frame_number
|
||||||
|
monitor.start_frame(frame_number)
|
||||||
|
|
||||||
|
frame_start = time.perf_counter()
|
||||||
|
result = list(buf)
|
||||||
|
for name in self._order:
|
||||||
|
plugin = self._registry.get(name)
|
||||||
|
if plugin and plugin.config.enabled:
|
||||||
|
chars_in = sum(len(line) for line in result)
|
||||||
|
effect_start = time.perf_counter()
|
||||||
|
try:
|
||||||
|
result = plugin.process(result, ctx)
|
||||||
|
except Exception:
|
||||||
|
plugin.config.enabled = False
|
||||||
|
elapsed = time.perf_counter() - effect_start
|
||||||
|
chars_out = sum(len(line) for line in result)
|
||||||
|
monitor.record_effect(name, elapsed * 1000, chars_in, chars_out)
|
||||||
|
|
||||||
|
total_elapsed = time.perf_counter() - frame_start
|
||||||
|
monitor.end_frame(frame_number, total_elapsed * 1000)
|
||||||
|
return result
|
||||||
144
engine/effects/controller.py
Normal file
144
engine/effects/controller.py
Normal file
@@ -0,0 +1,144 @@
|
|||||||
|
from engine.effects.performance import get_monitor
|
||||||
|
from engine.effects.registry import get_registry
|
||||||
|
|
||||||
|
_effect_chain_ref = None
|
||||||
|
|
||||||
|
|
||||||
|
def _get_effect_chain():
|
||||||
|
global _effect_chain_ref
|
||||||
|
if _effect_chain_ref is not None:
|
||||||
|
return _effect_chain_ref
|
||||||
|
try:
|
||||||
|
from engine.layers import get_effect_chain as _chain
|
||||||
|
|
||||||
|
return _chain()
|
||||||
|
except Exception:
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def set_effect_chain_ref(chain) -> None:
|
||||||
|
global _effect_chain_ref
|
||||||
|
_effect_chain_ref = chain
|
||||||
|
|
||||||
|
|
||||||
|
def handle_effects_command(cmd: str) -> str:
|
||||||
|
"""Handle /effects command from NTFY message.
|
||||||
|
|
||||||
|
Commands:
|
||||||
|
/effects list - list all effects and their status
|
||||||
|
/effects <name> on - enable an effect
|
||||||
|
/effects <name> off - disable an effect
|
||||||
|
/effects <name> intensity <0.0-1.0> - set intensity
|
||||||
|
/effects reorder <name1>,<name2>,... - reorder pipeline
|
||||||
|
/effects stats - show performance statistics
|
||||||
|
"""
|
||||||
|
parts = cmd.strip().split()
|
||||||
|
if not parts or parts[0] != "/effects":
|
||||||
|
return "Unknown command"
|
||||||
|
|
||||||
|
registry = get_registry()
|
||||||
|
chain = _get_effect_chain()
|
||||||
|
|
||||||
|
if len(parts) == 1 or parts[1] == "list":
|
||||||
|
result = ["Effects:"]
|
||||||
|
for name, plugin in registry.list_all().items():
|
||||||
|
status = "ON" if plugin.config.enabled else "OFF"
|
||||||
|
intensity = plugin.config.intensity
|
||||||
|
result.append(f" {name}: {status} (intensity={intensity})")
|
||||||
|
if chain:
|
||||||
|
result.append(f"Order: {chain.get_order()}")
|
||||||
|
return "\n".join(result)
|
||||||
|
|
||||||
|
if parts[1] == "stats":
|
||||||
|
return _format_stats()
|
||||||
|
|
||||||
|
if parts[1] == "reorder" and len(parts) >= 3:
|
||||||
|
new_order = parts[2].split(",")
|
||||||
|
if chain and chain.reorder(new_order):
|
||||||
|
return f"Reordered pipeline: {new_order}"
|
||||||
|
return "Failed to reorder pipeline"
|
||||||
|
|
||||||
|
if len(parts) < 3:
|
||||||
|
return "Usage: /effects <name> on|off|intensity <value>"
|
||||||
|
|
||||||
|
effect_name = parts[1]
|
||||||
|
action = parts[2]
|
||||||
|
|
||||||
|
if effect_name not in registry.list_all():
|
||||||
|
return f"Unknown effect: {effect_name}"
|
||||||
|
|
||||||
|
if action == "on":
|
||||||
|
registry.enable(effect_name)
|
||||||
|
return f"Enabled: {effect_name}"
|
||||||
|
|
||||||
|
if action == "off":
|
||||||
|
registry.disable(effect_name)
|
||||||
|
return f"Disabled: {effect_name}"
|
||||||
|
|
||||||
|
if action == "intensity" and len(parts) >= 4:
|
||||||
|
try:
|
||||||
|
value = float(parts[3])
|
||||||
|
if not 0.0 <= value <= 1.0:
|
||||||
|
return "Intensity must be between 0.0 and 1.0"
|
||||||
|
plugin = registry.get(effect_name)
|
||||||
|
if plugin:
|
||||||
|
plugin.config.intensity = value
|
||||||
|
return f"Set {effect_name} intensity to {value}"
|
||||||
|
except ValueError:
|
||||||
|
return "Invalid intensity value"
|
||||||
|
|
||||||
|
return f"Unknown action: {action}"
|
||||||
|
|
||||||
|
|
||||||
|
def _format_stats() -> str:
|
||||||
|
monitor = get_monitor()
|
||||||
|
stats = monitor.get_stats()
|
||||||
|
|
||||||
|
if "error" in stats:
|
||||||
|
return stats["error"]
|
||||||
|
|
||||||
|
lines = ["Performance Stats:"]
|
||||||
|
|
||||||
|
pipeline = stats["pipeline"]
|
||||||
|
lines.append(
|
||||||
|
f" Pipeline: avg={pipeline['avg_ms']:.2f}ms min={pipeline['min_ms']:.2f}ms max={pipeline['max_ms']:.2f}ms (over {stats['frame_count']} frames)"
|
||||||
|
)
|
||||||
|
|
||||||
|
if stats["effects"]:
|
||||||
|
lines.append(" Per-effect (avg ms):")
|
||||||
|
for name, effect_stats in stats["effects"].items():
|
||||||
|
lines.append(
|
||||||
|
f" {name}: avg={effect_stats['avg_ms']:.2f}ms min={effect_stats['min_ms']:.2f}ms max={effect_stats['max_ms']:.2f}ms"
|
||||||
|
)
|
||||||
|
|
||||||
|
return "\n".join(lines)
|
||||||
|
|
||||||
|
|
||||||
|
def show_effects_menu() -> str:
|
||||||
|
"""Generate effects menu text for display."""
|
||||||
|
registry = get_registry()
|
||||||
|
chain = _get_effect_chain()
|
||||||
|
|
||||||
|
lines = [
|
||||||
|
"\033[1;38;5;231m=== EFFECTS MENU ===\033[0m",
|
||||||
|
"",
|
||||||
|
"Effects:",
|
||||||
|
]
|
||||||
|
|
||||||
|
for name, plugin in registry.list_all().items():
|
||||||
|
status = "ON" if plugin.config.enabled else "OFF"
|
||||||
|
intensity = plugin.config.intensity
|
||||||
|
lines.append(f" [{status:3}] {name}: intensity={intensity:.2f}")
|
||||||
|
|
||||||
|
if chain:
|
||||||
|
lines.append("")
|
||||||
|
lines.append(f"Pipeline order: {' -> '.join(chain.get_order())}")
|
||||||
|
|
||||||
|
lines.append("")
|
||||||
|
lines.append("Controls:")
|
||||||
|
lines.append(" /effects <name> on|off")
|
||||||
|
lines.append(" /effects <name> intensity <0.0-1.0>")
|
||||||
|
lines.append(" /effects reorder name1,name2,...")
|
||||||
|
lines.append("")
|
||||||
|
|
||||||
|
return "\n".join(lines)
|
||||||
103
engine/effects/performance.py
Normal file
103
engine/effects/performance.py
Normal file
@@ -0,0 +1,103 @@
|
|||||||
|
from collections import deque
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class EffectTiming:
|
||||||
|
name: str
|
||||||
|
duration_ms: float
|
||||||
|
buffer_chars_in: int
|
||||||
|
buffer_chars_out: int
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class FrameTiming:
|
||||||
|
frame_number: int
|
||||||
|
total_ms: float
|
||||||
|
effects: list[EffectTiming]
|
||||||
|
|
||||||
|
|
||||||
|
class PerformanceMonitor:
|
||||||
|
"""Collects and stores performance metrics for effect pipeline."""
|
||||||
|
|
||||||
|
def __init__(self, max_frames: int = 60):
|
||||||
|
self._max_frames = max_frames
|
||||||
|
self._frames: deque[FrameTiming] = deque(maxlen=max_frames)
|
||||||
|
self._current_frame: list[EffectTiming] = []
|
||||||
|
|
||||||
|
def start_frame(self, frame_number: int) -> None:
|
||||||
|
self._current_frame = []
|
||||||
|
|
||||||
|
def record_effect(
|
||||||
|
self, name: str, duration_ms: float, chars_in: int, chars_out: int
|
||||||
|
) -> None:
|
||||||
|
self._current_frame.append(
|
||||||
|
EffectTiming(
|
||||||
|
name=name,
|
||||||
|
duration_ms=duration_ms,
|
||||||
|
buffer_chars_in=chars_in,
|
||||||
|
buffer_chars_out=chars_out,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def end_frame(self, frame_number: int, total_ms: float) -> None:
|
||||||
|
self._frames.append(
|
||||||
|
FrameTiming(
|
||||||
|
frame_number=frame_number,
|
||||||
|
total_ms=total_ms,
|
||||||
|
effects=self._current_frame,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_stats(self) -> dict:
|
||||||
|
if not self._frames:
|
||||||
|
return {"error": "No timing data available"}
|
||||||
|
|
||||||
|
total_times = [f.total_ms for f in self._frames]
|
||||||
|
avg_total = sum(total_times) / len(total_times)
|
||||||
|
min_total = min(total_times)
|
||||||
|
max_total = max(total_times)
|
||||||
|
|
||||||
|
effect_stats: dict[str, dict] = {}
|
||||||
|
for frame in self._frames:
|
||||||
|
for effect in frame.effects:
|
||||||
|
if effect.name not in effect_stats:
|
||||||
|
effect_stats[effect.name] = {"times": [], "total_chars": 0}
|
||||||
|
effect_stats[effect.name]["times"].append(effect.duration_ms)
|
||||||
|
effect_stats[effect.name]["total_chars"] += effect.buffer_chars_out
|
||||||
|
|
||||||
|
for name, stats in effect_stats.items():
|
||||||
|
times = stats["times"]
|
||||||
|
stats["avg_ms"] = sum(times) / len(times)
|
||||||
|
stats["min_ms"] = min(times)
|
||||||
|
stats["max_ms"] = max(times)
|
||||||
|
del stats["times"]
|
||||||
|
|
||||||
|
return {
|
||||||
|
"frame_count": len(self._frames),
|
||||||
|
"pipeline": {
|
||||||
|
"avg_ms": avg_total,
|
||||||
|
"min_ms": min_total,
|
||||||
|
"max_ms": max_total,
|
||||||
|
},
|
||||||
|
"effects": effect_stats,
|
||||||
|
}
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
self._frames.clear()
|
||||||
|
self._current_frame = []
|
||||||
|
|
||||||
|
|
||||||
|
_monitor: PerformanceMonitor | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_monitor() -> PerformanceMonitor:
|
||||||
|
global _monitor
|
||||||
|
if _monitor is None:
|
||||||
|
_monitor = PerformanceMonitor()
|
||||||
|
return _monitor
|
||||||
|
|
||||||
|
|
||||||
|
def set_monitor(monitor: PerformanceMonitor) -> None:
|
||||||
|
global _monitor
|
||||||
|
_monitor = monitor
|
||||||
59
engine/effects/registry.py
Normal file
59
engine/effects/registry.py
Normal file
@@ -0,0 +1,59 @@
|
|||||||
|
from engine.effects.types import EffectConfig, EffectPlugin
|
||||||
|
|
||||||
|
|
||||||
|
class EffectRegistry:
|
||||||
|
def __init__(self):
|
||||||
|
self._plugins: dict[str, EffectPlugin] = {}
|
||||||
|
self._discovered: bool = False
|
||||||
|
|
||||||
|
def register(self, plugin: EffectPlugin) -> None:
|
||||||
|
self._plugins[plugin.name] = plugin
|
||||||
|
|
||||||
|
def get(self, name: str) -> EffectPlugin | None:
|
||||||
|
return self._plugins.get(name)
|
||||||
|
|
||||||
|
def list_all(self) -> dict[str, EffectPlugin]:
|
||||||
|
return self._plugins.copy()
|
||||||
|
|
||||||
|
def list_enabled(self) -> list[EffectPlugin]:
|
||||||
|
return [p for p in self._plugins.values() if p.config.enabled]
|
||||||
|
|
||||||
|
def enable(self, name: str) -> bool:
|
||||||
|
plugin = self._plugins.get(name)
|
||||||
|
if plugin:
|
||||||
|
plugin.config.enabled = True
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def disable(self, name: str) -> bool:
|
||||||
|
plugin = self._plugins.get(name)
|
||||||
|
if plugin:
|
||||||
|
plugin.config.enabled = False
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def configure(self, name: str, config: EffectConfig) -> bool:
|
||||||
|
plugin = self._plugins.get(name)
|
||||||
|
if plugin:
|
||||||
|
plugin.configure(config)
|
||||||
|
return True
|
||||||
|
return False
|
||||||
|
|
||||||
|
def is_enabled(self, name: str) -> bool:
|
||||||
|
plugin = self._plugins.get(name)
|
||||||
|
return plugin.config.enabled if plugin else False
|
||||||
|
|
||||||
|
|
||||||
|
_registry: EffectRegistry | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_registry() -> EffectRegistry:
|
||||||
|
global _registry
|
||||||
|
if _registry is None:
|
||||||
|
_registry = EffectRegistry()
|
||||||
|
return _registry
|
||||||
|
|
||||||
|
|
||||||
|
def set_registry(registry: EffectRegistry) -> None:
|
||||||
|
global _registry
|
||||||
|
_registry = registry
|
||||||
39
engine/effects/types.py
Normal file
39
engine/effects/types.py
Normal file
@@ -0,0 +1,39 @@
|
|||||||
|
from dataclasses import dataclass, field
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class EffectContext:
|
||||||
|
terminal_width: int
|
||||||
|
terminal_height: int
|
||||||
|
scroll_cam: int
|
||||||
|
ticker_height: int
|
||||||
|
mic_excess: float
|
||||||
|
grad_offset: float
|
||||||
|
frame_number: int
|
||||||
|
has_message: bool
|
||||||
|
items: list = field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class EffectConfig:
|
||||||
|
enabled: bool = True
|
||||||
|
intensity: float = 1.0
|
||||||
|
params: dict[str, Any] = field(default_factory=dict)
|
||||||
|
|
||||||
|
|
||||||
|
class EffectPlugin:
|
||||||
|
name: str
|
||||||
|
config: EffectConfig
|
||||||
|
|
||||||
|
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def configure(self, config: EffectConfig) -> None:
|
||||||
|
raise NotImplementedError
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class PipelineConfig:
|
||||||
|
order: list[str] = field(default_factory=list)
|
||||||
|
effects: dict[str, EffectConfig] = field(default_factory=dict)
|
||||||
@@ -10,6 +10,8 @@ from datetime import datetime
|
|||||||
|
|
||||||
from engine import config
|
from engine import config
|
||||||
from engine.effects import (
|
from engine.effects import (
|
||||||
|
EffectChain,
|
||||||
|
EffectContext,
|
||||||
fade_line,
|
fade_line,
|
||||||
firehose_line,
|
firehose_line,
|
||||||
glitch_bar,
|
glitch_bar,
|
||||||
@@ -199,3 +201,60 @@ def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]:
|
|||||||
fline = firehose_line(items, w)
|
fline = firehose_line(items, w)
|
||||||
buf.append(f"\033[{scr_row};1H{fline}\033[K")
|
buf.append(f"\033[{scr_row};1H{fline}\033[K")
|
||||||
return buf
|
return buf
|
||||||
|
|
||||||
|
|
||||||
|
_effect_chain = None
|
||||||
|
|
||||||
|
|
||||||
|
def init_effects() -> None:
|
||||||
|
"""Initialize effect plugins and chain."""
|
||||||
|
global _effect_chain
|
||||||
|
from engine.effects import EffectChain, get_registry
|
||||||
|
|
||||||
|
registry = get_registry()
|
||||||
|
|
||||||
|
import effects_plugins
|
||||||
|
|
||||||
|
effects_plugins.discover_plugins()
|
||||||
|
|
||||||
|
chain = EffectChain(registry)
|
||||||
|
chain.set_order(["noise", "fade", "glitch", "firehose"])
|
||||||
|
_effect_chain = chain
|
||||||
|
|
||||||
|
|
||||||
|
def process_effects(
|
||||||
|
buf: list[str],
|
||||||
|
w: int,
|
||||||
|
h: int,
|
||||||
|
scroll_cam: int,
|
||||||
|
ticker_h: int,
|
||||||
|
mic_excess: float,
|
||||||
|
grad_offset: float,
|
||||||
|
frame_number: int,
|
||||||
|
has_message: bool,
|
||||||
|
items: list,
|
||||||
|
) -> list[str]:
|
||||||
|
"""Process buffer through effect chain."""
|
||||||
|
if _effect_chain is None:
|
||||||
|
init_effects()
|
||||||
|
|
||||||
|
ctx = EffectContext(
|
||||||
|
terminal_width=w,
|
||||||
|
terminal_height=h,
|
||||||
|
scroll_cam=scroll_cam,
|
||||||
|
ticker_height=ticker_h,
|
||||||
|
mic_excess=mic_excess,
|
||||||
|
grad_offset=grad_offset,
|
||||||
|
frame_number=frame_number,
|
||||||
|
has_message=has_message,
|
||||||
|
items=items,
|
||||||
|
)
|
||||||
|
return _effect_chain.process(buf, ctx)
|
||||||
|
|
||||||
|
|
||||||
|
def get_effect_chain() -> EffectChain | None:
|
||||||
|
"""Get the effect chain instance."""
|
||||||
|
global _effect_chain
|
||||||
|
if _effect_chain is None:
|
||||||
|
init_effects()
|
||||||
|
return _effect_chain
|
||||||
|
|||||||
@@ -4,33 +4,42 @@ Orchestrates viewport, frame timing, and layers.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import random
|
import random
|
||||||
import sys
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from engine import config
|
from engine import config
|
||||||
|
from engine.display import (
|
||||||
|
Display,
|
||||||
|
TerminalDisplay,
|
||||||
|
)
|
||||||
|
from engine.display import (
|
||||||
|
get_monitor as _get_display_monitor,
|
||||||
|
)
|
||||||
from engine.frame import calculate_scroll_step
|
from engine.frame import calculate_scroll_step
|
||||||
from engine.layers import (
|
from engine.layers import (
|
||||||
apply_glitch,
|
apply_glitch,
|
||||||
|
process_effects,
|
||||||
render_firehose,
|
render_firehose,
|
||||||
render_message_overlay,
|
render_message_overlay,
|
||||||
render_ticker_zone,
|
render_ticker_zone,
|
||||||
)
|
)
|
||||||
from engine.terminal import CLR
|
|
||||||
from engine.viewport import th, tw
|
from engine.viewport import th, tw
|
||||||
|
|
||||||
|
USE_EFFECT_CHAIN = True
|
||||||
|
|
||||||
def stream(items, ntfy_poller, mic_monitor):
|
|
||||||
|
def stream(items, ntfy_poller, mic_monitor, display: Display | None = None):
|
||||||
"""Main render loop with four layers: message, ticker, scroll motion, firehose."""
|
"""Main render loop with four layers: message, ticker, scroll motion, firehose."""
|
||||||
|
if display is None:
|
||||||
|
display = TerminalDisplay()
|
||||||
random.shuffle(items)
|
random.shuffle(items)
|
||||||
pool = list(items)
|
pool = list(items)
|
||||||
seen = set()
|
seen = set()
|
||||||
queued = 0
|
queued = 0
|
||||||
|
|
||||||
time.sleep(0.5)
|
time.sleep(0.5)
|
||||||
sys.stdout.write(CLR)
|
|
||||||
sys.stdout.flush()
|
|
||||||
|
|
||||||
w, h = tw(), th()
|
w, h = tw(), th()
|
||||||
|
display.init(w, h)
|
||||||
|
display.clear()
|
||||||
fh = config.FIREHOSE_H if config.FIREHOSE else 0
|
fh = config.FIREHOSE_H if config.FIREHOSE else 0
|
||||||
ticker_view_h = h - fh
|
ticker_view_h = h - fh
|
||||||
GAP = 3
|
GAP = 3
|
||||||
@@ -42,6 +51,7 @@ def stream(items, ntfy_poller, mic_monitor):
|
|||||||
noise_cache = {}
|
noise_cache = {}
|
||||||
scroll_motion_accum = 0.0
|
scroll_motion_accum = 0.0
|
||||||
msg_cache = (None, None)
|
msg_cache = (None, None)
|
||||||
|
frame_number = 0
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if queued >= config.HEADLINE_LIMIT and not active:
|
if queued >= config.HEADLINE_LIMIT and not active:
|
||||||
@@ -93,19 +103,39 @@ def stream(items, ntfy_poller, mic_monitor):
|
|||||||
buf.extend(ticker_buf)
|
buf.extend(ticker_buf)
|
||||||
|
|
||||||
mic_excess = mic_monitor.excess
|
mic_excess = mic_monitor.excess
|
||||||
buf = apply_glitch(buf, ticker_buf_start, mic_excess, w)
|
render_start = time.perf_counter()
|
||||||
|
|
||||||
|
if USE_EFFECT_CHAIN:
|
||||||
|
buf = process_effects(
|
||||||
|
buf,
|
||||||
|
w,
|
||||||
|
h,
|
||||||
|
scroll_cam,
|
||||||
|
ticker_h,
|
||||||
|
mic_excess,
|
||||||
|
grad_offset,
|
||||||
|
frame_number,
|
||||||
|
msg is not None,
|
||||||
|
items,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
buf = apply_glitch(buf, ticker_buf_start, mic_excess, w)
|
||||||
firehose_buf = render_firehose(items, w, fh, h)
|
firehose_buf = render_firehose(items, w, fh, h)
|
||||||
buf.extend(firehose_buf)
|
buf.extend(firehose_buf)
|
||||||
|
|
||||||
if msg_overlay:
|
if msg_overlay:
|
||||||
buf.extend(msg_overlay)
|
buf.extend(msg_overlay)
|
||||||
|
|
||||||
sys.stdout.buffer.write("".join(buf).encode())
|
render_elapsed = (time.perf_counter() - render_start) * 1000
|
||||||
sys.stdout.flush()
|
monitor = _get_display_monitor()
|
||||||
|
if monitor:
|
||||||
|
chars = sum(len(line) for line in buf)
|
||||||
|
monitor.record_effect("render", render_elapsed, chars, chars)
|
||||||
|
|
||||||
|
display.show(buf)
|
||||||
|
|
||||||
elapsed = time.monotonic() - t0
|
elapsed = time.monotonic() - t0
|
||||||
time.sleep(max(0, config.FRAME_DT - elapsed))
|
time.sleep(max(0, config.FRAME_DT - elapsed))
|
||||||
|
frame_number += 1
|
||||||
|
|
||||||
sys.stdout.write(CLR)
|
display.cleanup()
|
||||||
sys.stdout.flush()
|
|
||||||
|
|||||||
@@ -83,3 +83,35 @@ class TestStreamControllerCleanup:
|
|||||||
controller.cleanup()
|
controller.cleanup()
|
||||||
|
|
||||||
mock_mic_instance.stop.assert_called_once()
|
mock_mic_instance.stop.assert_called_once()
|
||||||
|
|
||||||
|
|
||||||
|
class TestStreamControllerWarmup:
|
||||||
|
"""Tests for StreamController topic warmup."""
|
||||||
|
|
||||||
|
def test_warmup_topics_idempotent(self):
|
||||||
|
"""warmup_topics can be called multiple times."""
|
||||||
|
StreamController._topics_warmed = False
|
||||||
|
|
||||||
|
with patch("urllib.request.urlopen") as mock_urlopen:
|
||||||
|
StreamController.warmup_topics()
|
||||||
|
StreamController.warmup_topics()
|
||||||
|
|
||||||
|
assert mock_urlopen.call_count >= 3
|
||||||
|
|
||||||
|
def test_warmup_topics_sets_flag(self):
|
||||||
|
"""warmup_topics sets the warmed flag."""
|
||||||
|
StreamController._topics_warmed = False
|
||||||
|
|
||||||
|
with patch("urllib.request.urlopen"):
|
||||||
|
StreamController.warmup_topics()
|
||||||
|
|
||||||
|
assert StreamController._topics_warmed is True
|
||||||
|
|
||||||
|
def test_warmup_topics_skips_after_first(self):
|
||||||
|
"""warmup_topics skips after first call."""
|
||||||
|
StreamController._topics_warmed = True
|
||||||
|
|
||||||
|
with patch("urllib.request.urlopen") as mock_urlopen:
|
||||||
|
StreamController.warmup_topics()
|
||||||
|
|
||||||
|
mock_urlopen.assert_not_called()
|
||||||
|
|||||||
79
tests/test_display.py
Normal file
79
tests/test_display.py
Normal file
@@ -0,0 +1,79 @@
|
|||||||
|
"""
|
||||||
|
Tests for engine.display module.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from engine.display import NullDisplay, TerminalDisplay
|
||||||
|
|
||||||
|
|
||||||
|
class TestDisplayProtocol:
|
||||||
|
"""Test that display backends satisfy the Display protocol."""
|
||||||
|
|
||||||
|
def test_terminal_display_is_display(self):
|
||||||
|
"""TerminalDisplay satisfies Display protocol."""
|
||||||
|
display = TerminalDisplay()
|
||||||
|
assert hasattr(display, "init")
|
||||||
|
assert hasattr(display, "show")
|
||||||
|
assert hasattr(display, "clear")
|
||||||
|
assert hasattr(display, "cleanup")
|
||||||
|
|
||||||
|
def test_null_display_is_display(self):
|
||||||
|
"""NullDisplay satisfies Display protocol."""
|
||||||
|
display = NullDisplay()
|
||||||
|
assert hasattr(display, "init")
|
||||||
|
assert hasattr(display, "show")
|
||||||
|
assert hasattr(display, "clear")
|
||||||
|
assert hasattr(display, "cleanup")
|
||||||
|
|
||||||
|
|
||||||
|
class TestTerminalDisplay:
|
||||||
|
"""Tests for TerminalDisplay class."""
|
||||||
|
|
||||||
|
def test_init_sets_dimensions(self):
|
||||||
|
"""init stores terminal dimensions."""
|
||||||
|
display = TerminalDisplay()
|
||||||
|
display.init(80, 24)
|
||||||
|
assert display.width == 80
|
||||||
|
assert display.height == 24
|
||||||
|
|
||||||
|
def test_show_returns_none(self):
|
||||||
|
"""show returns None after writing to stdout."""
|
||||||
|
display = TerminalDisplay()
|
||||||
|
display.width = 80
|
||||||
|
display.height = 24
|
||||||
|
display.show(["line1", "line2"])
|
||||||
|
|
||||||
|
def test_clear_does_not_error(self):
|
||||||
|
"""clear works without error."""
|
||||||
|
display = TerminalDisplay()
|
||||||
|
display.clear()
|
||||||
|
|
||||||
|
def test_cleanup_does_not_error(self):
|
||||||
|
"""cleanup works without error."""
|
||||||
|
display = TerminalDisplay()
|
||||||
|
display.cleanup()
|
||||||
|
|
||||||
|
|
||||||
|
class TestNullDisplay:
|
||||||
|
"""Tests for NullDisplay class."""
|
||||||
|
|
||||||
|
def test_init_stores_dimensions(self):
|
||||||
|
"""init stores dimensions."""
|
||||||
|
display = NullDisplay()
|
||||||
|
display.init(100, 50)
|
||||||
|
assert display.width == 100
|
||||||
|
assert display.height == 50
|
||||||
|
|
||||||
|
def test_show_does_nothing(self):
|
||||||
|
"""show discards buffer without error."""
|
||||||
|
display = NullDisplay()
|
||||||
|
display.show(["line1", "line2", "line3"])
|
||||||
|
|
||||||
|
def test_clear_does_nothing(self):
|
||||||
|
"""clear does nothing."""
|
||||||
|
display = NullDisplay()
|
||||||
|
display.clear()
|
||||||
|
|
||||||
|
def test_cleanup_does_nothing(self):
|
||||||
|
"""cleanup does nothing."""
|
||||||
|
display = NullDisplay()
|
||||||
|
display.cleanup()
|
||||||
427
tests/test_effects.py
Normal file
427
tests/test_effects.py
Normal file
@@ -0,0 +1,427 @@
|
|||||||
|
"""
|
||||||
|
Tests for engine.effects module.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from engine.effects import EffectChain, EffectConfig, EffectContext, EffectRegistry
|
||||||
|
|
||||||
|
|
||||||
|
class MockEffect:
|
||||||
|
name = "mock"
|
||||||
|
config = EffectConfig(enabled=True, intensity=1.0)
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self.processed = False
|
||||||
|
self.last_ctx = None
|
||||||
|
|
||||||
|
def process(self, buf, ctx):
|
||||||
|
self.processed = True
|
||||||
|
self.last_ctx = ctx
|
||||||
|
return buf + ["processed"]
|
||||||
|
|
||||||
|
def configure(self, config):
|
||||||
|
self.config = config
|
||||||
|
|
||||||
|
|
||||||
|
class TestEffectConfig:
|
||||||
|
def test_defaults(self):
|
||||||
|
cfg = EffectConfig()
|
||||||
|
assert cfg.enabled is True
|
||||||
|
assert cfg.intensity == 1.0
|
||||||
|
assert cfg.params == {}
|
||||||
|
|
||||||
|
def test_custom_values(self):
|
||||||
|
cfg = EffectConfig(enabled=False, intensity=0.5, params={"key": "value"})
|
||||||
|
assert cfg.enabled is False
|
||||||
|
assert cfg.intensity == 0.5
|
||||||
|
assert cfg.params == {"key": "value"}
|
||||||
|
|
||||||
|
|
||||||
|
class TestEffectContext:
|
||||||
|
def test_defaults(self):
|
||||||
|
ctx = EffectContext(
|
||||||
|
terminal_width=80,
|
||||||
|
terminal_height=24,
|
||||||
|
scroll_cam=0,
|
||||||
|
ticker_height=20,
|
||||||
|
mic_excess=0.0,
|
||||||
|
grad_offset=0.0,
|
||||||
|
frame_number=0,
|
||||||
|
has_message=False,
|
||||||
|
)
|
||||||
|
assert ctx.terminal_width == 80
|
||||||
|
assert ctx.terminal_height == 24
|
||||||
|
assert ctx.ticker_height == 20
|
||||||
|
assert ctx.items == []
|
||||||
|
|
||||||
|
def test_with_items(self):
|
||||||
|
items = [("Title", "Source", "12:00")]
|
||||||
|
ctx = EffectContext(
|
||||||
|
terminal_width=80,
|
||||||
|
terminal_height=24,
|
||||||
|
scroll_cam=0,
|
||||||
|
ticker_height=20,
|
||||||
|
mic_excess=0.0,
|
||||||
|
grad_offset=0.0,
|
||||||
|
frame_number=0,
|
||||||
|
has_message=False,
|
||||||
|
items=items,
|
||||||
|
)
|
||||||
|
assert ctx.items == items
|
||||||
|
|
||||||
|
|
||||||
|
class TestEffectRegistry:
|
||||||
|
def test_init_empty(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
assert len(registry.list_all()) == 0
|
||||||
|
|
||||||
|
def test_register(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
effect = MockEffect()
|
||||||
|
registry.register(effect)
|
||||||
|
assert "mock" in registry.list_all()
|
||||||
|
|
||||||
|
def test_get(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
effect = MockEffect()
|
||||||
|
registry.register(effect)
|
||||||
|
retrieved = registry.get("mock")
|
||||||
|
assert retrieved is effect
|
||||||
|
|
||||||
|
def test_get_nonexistent(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
assert registry.get("nonexistent") is None
|
||||||
|
|
||||||
|
def test_enable(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
effect = MockEffect()
|
||||||
|
effect.config.enabled = False
|
||||||
|
registry.register(effect)
|
||||||
|
registry.enable("mock")
|
||||||
|
assert effect.config.enabled is True
|
||||||
|
|
||||||
|
def test_disable(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
effect = MockEffect()
|
||||||
|
effect.config.enabled = True
|
||||||
|
registry.register(effect)
|
||||||
|
registry.disable("mock")
|
||||||
|
assert effect.config.enabled is False
|
||||||
|
|
||||||
|
def test_list_enabled(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
|
||||||
|
class EnabledEffect:
|
||||||
|
name = "enabled_effect"
|
||||||
|
config = EffectConfig(enabled=True, intensity=1.0)
|
||||||
|
|
||||||
|
class DisabledEffect:
|
||||||
|
name = "disabled_effect"
|
||||||
|
config = EffectConfig(enabled=False, intensity=1.0)
|
||||||
|
|
||||||
|
registry.register(EnabledEffect())
|
||||||
|
registry.register(DisabledEffect())
|
||||||
|
enabled = registry.list_enabled()
|
||||||
|
assert len(enabled) == 1
|
||||||
|
assert enabled[0].name == "enabled_effect"
|
||||||
|
|
||||||
|
def test_configure(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
effect = MockEffect()
|
||||||
|
registry.register(effect)
|
||||||
|
new_config = EffectConfig(enabled=False, intensity=0.3)
|
||||||
|
registry.configure("mock", new_config)
|
||||||
|
assert effect.config.enabled is False
|
||||||
|
assert effect.config.intensity == 0.3
|
||||||
|
|
||||||
|
def test_is_enabled(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
effect = MockEffect()
|
||||||
|
effect.config.enabled = True
|
||||||
|
registry.register(effect)
|
||||||
|
assert registry.is_enabled("mock") is True
|
||||||
|
assert registry.is_enabled("nonexistent") is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestEffectChain:
|
||||||
|
def test_init(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
chain = EffectChain(registry)
|
||||||
|
assert chain.get_order() == []
|
||||||
|
|
||||||
|
def test_set_order(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
effect1 = MockEffect()
|
||||||
|
effect1.name = "effect1"
|
||||||
|
effect2 = MockEffect()
|
||||||
|
effect2.name = "effect2"
|
||||||
|
registry.register(effect1)
|
||||||
|
registry.register(effect2)
|
||||||
|
chain = EffectChain(registry)
|
||||||
|
chain.set_order(["effect1", "effect2"])
|
||||||
|
assert chain.get_order() == ["effect1", "effect2"]
|
||||||
|
|
||||||
|
def test_add_effect(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
effect = MockEffect()
|
||||||
|
effect.name = "test_effect"
|
||||||
|
registry.register(effect)
|
||||||
|
chain = EffectChain(registry)
|
||||||
|
chain.add_effect("test_effect")
|
||||||
|
assert "test_effect" in chain.get_order()
|
||||||
|
|
||||||
|
def test_add_effect_invalid(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
chain = EffectChain(registry)
|
||||||
|
result = chain.add_effect("nonexistent")
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
def test_remove_effect(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
effect = MockEffect()
|
||||||
|
effect.name = "test_effect"
|
||||||
|
registry.register(effect)
|
||||||
|
chain = EffectChain(registry)
|
||||||
|
chain.set_order(["test_effect"])
|
||||||
|
chain.remove_effect("test_effect")
|
||||||
|
assert "test_effect" not in chain.get_order()
|
||||||
|
|
||||||
|
def test_reorder(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
effect1 = MockEffect()
|
||||||
|
effect1.name = "effect1"
|
||||||
|
effect2 = MockEffect()
|
||||||
|
effect2.name = "effect2"
|
||||||
|
effect3 = MockEffect()
|
||||||
|
effect3.name = "effect3"
|
||||||
|
registry.register(effect1)
|
||||||
|
registry.register(effect2)
|
||||||
|
registry.register(effect3)
|
||||||
|
chain = EffectChain(registry)
|
||||||
|
chain.set_order(["effect1", "effect2", "effect3"])
|
||||||
|
result = chain.reorder(["effect3", "effect1", "effect2"])
|
||||||
|
assert result is True
|
||||||
|
assert chain.get_order() == ["effect3", "effect1", "effect2"]
|
||||||
|
|
||||||
|
def test_reorder_invalid(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
effect = MockEffect()
|
||||||
|
effect.name = "effect1"
|
||||||
|
registry.register(effect)
|
||||||
|
chain = EffectChain(registry)
|
||||||
|
result = chain.reorder(["effect1", "nonexistent"])
|
||||||
|
assert result is False
|
||||||
|
|
||||||
|
def test_process_empty_chain(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
chain = EffectChain(registry)
|
||||||
|
buf = ["line1", "line2"]
|
||||||
|
ctx = EffectContext(
|
||||||
|
terminal_width=80,
|
||||||
|
terminal_height=24,
|
||||||
|
scroll_cam=0,
|
||||||
|
ticker_height=20,
|
||||||
|
mic_excess=0.0,
|
||||||
|
grad_offset=0.0,
|
||||||
|
frame_number=0,
|
||||||
|
has_message=False,
|
||||||
|
)
|
||||||
|
result = chain.process(buf, ctx)
|
||||||
|
assert result == buf
|
||||||
|
|
||||||
|
def test_process_with_effects(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
effect = MockEffect()
|
||||||
|
effect.name = "test_effect"
|
||||||
|
registry.register(effect)
|
||||||
|
chain = EffectChain(registry)
|
||||||
|
chain.set_order(["test_effect"])
|
||||||
|
buf = ["line1", "line2"]
|
||||||
|
ctx = EffectContext(
|
||||||
|
terminal_width=80,
|
||||||
|
terminal_height=24,
|
||||||
|
scroll_cam=0,
|
||||||
|
ticker_height=20,
|
||||||
|
mic_excess=0.0,
|
||||||
|
grad_offset=0.0,
|
||||||
|
frame_number=0,
|
||||||
|
has_message=False,
|
||||||
|
)
|
||||||
|
result = chain.process(buf, ctx)
|
||||||
|
assert result == ["line1", "line2", "processed"]
|
||||||
|
assert effect.processed is True
|
||||||
|
assert effect.last_ctx is ctx
|
||||||
|
|
||||||
|
def test_process_disabled_effect(self):
|
||||||
|
registry = EffectRegistry()
|
||||||
|
effect = MockEffect()
|
||||||
|
effect.name = "test_effect"
|
||||||
|
effect.config.enabled = False
|
||||||
|
registry.register(effect)
|
||||||
|
chain = EffectChain(registry)
|
||||||
|
chain.set_order(["test_effect"])
|
||||||
|
buf = ["line1"]
|
||||||
|
ctx = EffectContext(
|
||||||
|
terminal_width=80,
|
||||||
|
terminal_height=24,
|
||||||
|
scroll_cam=0,
|
||||||
|
ticker_height=20,
|
||||||
|
mic_excess=0.0,
|
||||||
|
grad_offset=0.0,
|
||||||
|
frame_number=0,
|
||||||
|
has_message=False,
|
||||||
|
)
|
||||||
|
result = chain.process(buf, ctx)
|
||||||
|
assert result == ["line1"]
|
||||||
|
assert effect.processed is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestEffectsExports:
|
||||||
|
def test_all_exports_are_importable(self):
|
||||||
|
"""Verify all exports in __all__ can actually be imported."""
|
||||||
|
import engine.effects as effects_module
|
||||||
|
|
||||||
|
for name in effects_module.__all__:
|
||||||
|
getattr(effects_module, name)
|
||||||
|
|
||||||
|
|
||||||
|
class TestPerformanceMonitor:
|
||||||
|
def test_empty_stats(self):
|
||||||
|
from engine.effects.performance import PerformanceMonitor
|
||||||
|
|
||||||
|
monitor = PerformanceMonitor()
|
||||||
|
stats = monitor.get_stats()
|
||||||
|
assert "error" in stats
|
||||||
|
|
||||||
|
def test_record_and_retrieve(self):
|
||||||
|
from engine.effects.performance import PerformanceMonitor
|
||||||
|
|
||||||
|
monitor = PerformanceMonitor()
|
||||||
|
monitor.start_frame(1)
|
||||||
|
monitor.record_effect("test_effect", 1.5, 100, 150)
|
||||||
|
monitor.end_frame(1, 2.0)
|
||||||
|
|
||||||
|
stats = monitor.get_stats()
|
||||||
|
assert "error" not in stats
|
||||||
|
assert stats["frame_count"] == 1
|
||||||
|
assert "test_effect" in stats["effects"]
|
||||||
|
|
||||||
|
def test_multiple_frames(self):
|
||||||
|
from engine.effects.performance import PerformanceMonitor
|
||||||
|
|
||||||
|
monitor = PerformanceMonitor(max_frames=3)
|
||||||
|
for i in range(5):
|
||||||
|
monitor.start_frame(i)
|
||||||
|
monitor.record_effect("effect1", 1.0, 100, 100)
|
||||||
|
monitor.record_effect("effect2", 0.5, 100, 100)
|
||||||
|
monitor.end_frame(i, 1.5)
|
||||||
|
|
||||||
|
stats = monitor.get_stats()
|
||||||
|
assert stats["frame_count"] == 3
|
||||||
|
assert "effect1" in stats["effects"]
|
||||||
|
assert "effect2" in stats["effects"]
|
||||||
|
|
||||||
|
def test_reset(self):
|
||||||
|
from engine.effects.performance import PerformanceMonitor
|
||||||
|
|
||||||
|
monitor = PerformanceMonitor()
|
||||||
|
monitor.start_frame(1)
|
||||||
|
monitor.record_effect("test", 1.0, 100, 100)
|
||||||
|
monitor.end_frame(1, 1.0)
|
||||||
|
|
||||||
|
monitor.reset()
|
||||||
|
stats = monitor.get_stats()
|
||||||
|
assert "error" in stats
|
||||||
|
|
||||||
|
|
||||||
|
class TestEffectPipelinePerformance:
|
||||||
|
def test_pipeline_stays_within_frame_budget(self):
|
||||||
|
"""Verify effect pipeline completes within frame budget (33ms for 30fps)."""
|
||||||
|
from engine.effects import (
|
||||||
|
EffectChain,
|
||||||
|
EffectConfig,
|
||||||
|
EffectContext,
|
||||||
|
EffectRegistry,
|
||||||
|
)
|
||||||
|
|
||||||
|
class DummyEffect:
|
||||||
|
name = "dummy"
|
||||||
|
config = EffectConfig(enabled=True, intensity=1.0)
|
||||||
|
|
||||||
|
def process(self, buf, ctx):
|
||||||
|
return [line * 2 for line in buf]
|
||||||
|
|
||||||
|
registry = EffectRegistry()
|
||||||
|
registry.register(DummyEffect())
|
||||||
|
|
||||||
|
from engine.effects.performance import PerformanceMonitor
|
||||||
|
|
||||||
|
monitor = PerformanceMonitor(max_frames=10)
|
||||||
|
chain = EffectChain(registry, monitor)
|
||||||
|
chain.set_order(["dummy"])
|
||||||
|
|
||||||
|
buf = ["x" * 80] * 20
|
||||||
|
|
||||||
|
for i in range(10):
|
||||||
|
ctx = EffectContext(
|
||||||
|
terminal_width=80,
|
||||||
|
terminal_height=24,
|
||||||
|
scroll_cam=0,
|
||||||
|
ticker_height=20,
|
||||||
|
mic_excess=0.0,
|
||||||
|
grad_offset=0.0,
|
||||||
|
frame_number=i,
|
||||||
|
has_message=False,
|
||||||
|
)
|
||||||
|
chain.process(buf, ctx)
|
||||||
|
|
||||||
|
stats = monitor.get_stats()
|
||||||
|
assert "error" not in stats
|
||||||
|
assert stats["pipeline"]["max_ms"] < 33.0
|
||||||
|
|
||||||
|
def test_individual_effects_performance(self):
|
||||||
|
"""Verify individual effects don't exceed 10ms per frame."""
|
||||||
|
from engine.effects import (
|
||||||
|
EffectChain,
|
||||||
|
EffectConfig,
|
||||||
|
EffectContext,
|
||||||
|
EffectRegistry,
|
||||||
|
)
|
||||||
|
|
||||||
|
class SlowEffect:
|
||||||
|
name = "slow"
|
||||||
|
config = EffectConfig(enabled=True, intensity=1.0)
|
||||||
|
|
||||||
|
def process(self, buf, ctx):
|
||||||
|
result = []
|
||||||
|
for line in buf:
|
||||||
|
result.append(line)
|
||||||
|
result.append(line + line)
|
||||||
|
return result
|
||||||
|
|
||||||
|
registry = EffectRegistry()
|
||||||
|
registry.register(SlowEffect())
|
||||||
|
|
||||||
|
from engine.effects.performance import PerformanceMonitor
|
||||||
|
|
||||||
|
monitor = PerformanceMonitor(max_frames=5)
|
||||||
|
chain = EffectChain(registry, monitor)
|
||||||
|
chain.set_order(["slow"])
|
||||||
|
|
||||||
|
buf = ["x" * 80] * 10
|
||||||
|
|
||||||
|
for i in range(5):
|
||||||
|
ctx = EffectContext(
|
||||||
|
terminal_width=80,
|
||||||
|
terminal_height=24,
|
||||||
|
scroll_cam=0,
|
||||||
|
ticker_height=20,
|
||||||
|
mic_excess=0.0,
|
||||||
|
grad_offset=0.0,
|
||||||
|
frame_number=i,
|
||||||
|
has_message=False,
|
||||||
|
)
|
||||||
|
chain.process(buf, ctx)
|
||||||
|
|
||||||
|
stats = monitor.get_stats()
|
||||||
|
assert "error" not in stats
|
||||||
|
assert stats["effects"]["slow"]["max_ms"] < 10.0
|
||||||
117
tests/test_effects_controller.py
Normal file
117
tests/test_effects_controller.py
Normal file
@@ -0,0 +1,117 @@
|
|||||||
|
"""
|
||||||
|
Tests for engine.effects.controller module.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
from engine.effects.controller import (
|
||||||
|
handle_effects_command,
|
||||||
|
set_effect_chain_ref,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestHandleEffectsCommand:
|
||||||
|
"""Tests for handle_effects_command function."""
|
||||||
|
|
||||||
|
def test_list_effects(self):
|
||||||
|
"""list command returns formatted effects list."""
|
||||||
|
with patch("engine.effects.controller.get_registry") as mock_registry:
|
||||||
|
mock_plugin = MagicMock()
|
||||||
|
mock_plugin.config.enabled = True
|
||||||
|
mock_plugin.config.intensity = 0.5
|
||||||
|
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
|
||||||
|
|
||||||
|
with patch("engine.effects.controller._get_effect_chain") as mock_chain:
|
||||||
|
mock_chain.return_value.get_order.return_value = ["noise"]
|
||||||
|
|
||||||
|
result = handle_effects_command("/effects list")
|
||||||
|
|
||||||
|
assert "noise: ON" in result
|
||||||
|
assert "intensity=0.5" in result
|
||||||
|
|
||||||
|
def test_enable_effect(self):
|
||||||
|
"""enable command calls registry.enable."""
|
||||||
|
with patch("engine.effects.controller.get_registry") as mock_registry:
|
||||||
|
mock_plugin = MagicMock()
|
||||||
|
mock_registry.return_value.get.return_value = mock_plugin
|
||||||
|
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
|
||||||
|
|
||||||
|
result = handle_effects_command("/effects noise on")
|
||||||
|
|
||||||
|
assert "Enabled: noise" in result
|
||||||
|
mock_registry.return_value.enable.assert_called_once_with("noise")
|
||||||
|
|
||||||
|
def test_disable_effect(self):
|
||||||
|
"""disable command calls registry.disable."""
|
||||||
|
with patch("engine.effects.controller.get_registry") as mock_registry:
|
||||||
|
mock_plugin = MagicMock()
|
||||||
|
mock_registry.return_value.get.return_value = mock_plugin
|
||||||
|
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
|
||||||
|
|
||||||
|
result = handle_effects_command("/effects noise off")
|
||||||
|
|
||||||
|
assert "Disabled: noise" in result
|
||||||
|
mock_registry.return_value.disable.assert_called_once_with("noise")
|
||||||
|
|
||||||
|
def test_set_intensity(self):
|
||||||
|
"""intensity command sets plugin intensity."""
|
||||||
|
with patch("engine.effects.controller.get_registry") as mock_registry:
|
||||||
|
mock_plugin = MagicMock()
|
||||||
|
mock_plugin.config.intensity = 0.5
|
||||||
|
mock_registry.return_value.get.return_value = mock_plugin
|
||||||
|
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
|
||||||
|
|
||||||
|
result = handle_effects_command("/effects noise intensity 0.8")
|
||||||
|
|
||||||
|
assert "intensity to 0.8" in result
|
||||||
|
assert mock_plugin.config.intensity == 0.8
|
||||||
|
|
||||||
|
def test_invalid_intensity_range(self):
|
||||||
|
"""intensity outside 0.0-1.0 returns error."""
|
||||||
|
with patch("engine.effects.controller.get_registry") as mock_registry:
|
||||||
|
mock_plugin = MagicMock()
|
||||||
|
mock_registry.return_value.get.return_value = mock_plugin
|
||||||
|
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
|
||||||
|
|
||||||
|
result = handle_effects_command("/effects noise intensity 1.5")
|
||||||
|
|
||||||
|
assert "between 0.0 and 1.0" in result
|
||||||
|
|
||||||
|
def test_reorder_pipeline(self):
|
||||||
|
"""reorder command calls chain.reorder."""
|
||||||
|
with patch("engine.effects.controller.get_registry") as mock_registry:
|
||||||
|
mock_registry.return_value.list_all.return_value = {}
|
||||||
|
|
||||||
|
with patch("engine.effects.controller._get_effect_chain") as mock_chain:
|
||||||
|
mock_chain_instance = MagicMock()
|
||||||
|
mock_chain_instance.reorder.return_value = True
|
||||||
|
mock_chain.return_value = mock_chain_instance
|
||||||
|
|
||||||
|
result = handle_effects_command("/effects reorder noise,fade")
|
||||||
|
|
||||||
|
assert "Reordered pipeline" in result
|
||||||
|
mock_chain_instance.reorder.assert_called_once_with(["noise", "fade"])
|
||||||
|
|
||||||
|
def test_unknown_command(self):
|
||||||
|
"""unknown command returns error."""
|
||||||
|
result = handle_effects_command("/unknown")
|
||||||
|
assert "Unknown command" in result
|
||||||
|
|
||||||
|
def test_non_effects_command(self):
|
||||||
|
"""non-effects command returns error."""
|
||||||
|
result = handle_effects_command("not a command")
|
||||||
|
assert "Unknown command" in result
|
||||||
|
|
||||||
|
|
||||||
|
class TestSetEffectChainRef:
|
||||||
|
"""Tests for set_effect_chain_ref function."""
|
||||||
|
|
||||||
|
def test_sets_global_ref(self):
|
||||||
|
"""set_effect_chain_ref updates global reference."""
|
||||||
|
mock_chain = MagicMock()
|
||||||
|
set_effect_chain_ref(mock_chain)
|
||||||
|
|
||||||
|
from engine.effects.controller import _get_effect_chain
|
||||||
|
|
||||||
|
result = _get_effect_chain()
|
||||||
|
assert result == mock_chain
|
||||||
Reference in New Issue
Block a user