From ba050ada24b93b80334a652dd9fbb29e4de3d853 Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sun, 15 Mar 2026 18:43:04 -0700 Subject: [PATCH] feat(cmdline): C&C with separate topics and rich output --- engine/app.py | 153 ++++++++++++++++++++++++--------- engine/config.py | 6 ++ engine/controller.py | 68 ++++++++++++++- mise.toml | 25 +++++- tests/test_ntfy_integration.py | 127 +++++++++++++++++++++++++++ 5 files changed, 334 insertions(+), 45 deletions(-) create mode 100644 tests/test_ntfy_integration.py diff --git a/engine/app.py b/engine/app.py index 3bd8952..3770bd3 100644 --- a/engine/app.py +++ b/engine/app.py @@ -11,10 +11,8 @@ import time import tty from engine import config, render +from engine.controller import StreamController from engine.fetch import fetch_all, fetch_poetry, load_cache, save_cache -from engine.mic import MicMonitor -from engine.ntfy import NtfyPoller -from engine.scroll import stream from engine.terminal import ( CLR, CURSOR_OFF, @@ -29,30 +27,6 @@ from engine.terminal import ( slow_print, tw, ) -from engine.websocket_display import WebSocketDisplay - - -def _get_display(): - """Get the appropriate display(s) based on config.""" - from engine.display import MultiDisplay, TerminalDisplay - - displays = [] - - if config.DISPLAY in ("terminal", "both"): - displays.append(TerminalDisplay()) - - if config.DISPLAY in ("websocket", "both") or config.WEBSOCKET: - ws = WebSocketDisplay(host="0.0.0.0", port=config.WEBSOCKET_PORT) - ws.start_server() - ws.start_http_server() - displays.append(ws) - - if not displays: - return None - if len(displays) == 1: - return displays[0] - return MultiDisplay(displays) - TITLE = [ " ███╗ ███╗ █████╗ ██╗███╗ ██╗██╗ ██╗███╗ ██╗███████╗", @@ -273,6 +247,110 @@ def pick_font_face(): print() +def pick_effects_config(): + """Interactive picker for configuring effects pipeline.""" + import effects_plugins + from engine.effects import get_effect_chain, get_registry + + effects_plugins.discover_plugins() + + registry = get_registry() + chain = get_effect_chain() + chain.set_order(["noise", "fade", "glitch", "firehose"]) + + effects = list(registry.list_all().values()) + if not effects: + return + + selected = 0 + editing_intensity = False + intensity_value = 1.0 + + def _draw_effects_picker(): + w = tw() + print(CLR, end="") + print("\033[1;1H", end="") + print(" \033[1;38;5;231mEFFECTS CONFIG\033[0m") + print(f" \033[2;38;5;37m{'─' * (w - 4)}\033[0m") + print() + + for i, effect in enumerate(effects): + prefix = " > " if i == selected else " " + marker = "[*]" if effect.config.enabled else "[ ]" + if editing_intensity and i == selected: + print( + f"{prefix}{marker} \033[1;38;5;82m{effect.name}\033[0m: intensity={intensity_value:.2f} (use +/- to adjust, Enter to confirm)" + ) + else: + print( + f"{prefix}{marker} {effect.name}: intensity={effect.config.intensity:.2f}" + ) + + print() + print(f" \033[2;38;5;37m{'─' * (w - 4)}\033[0m") + print( + " \033[38;5;245mControls: space=toggle on/off | +/-=adjust intensity | arrows=move | Enter=next effect | q=done\033[0m" + ) + + def _read_effects_key(): + ch = sys.stdin.read(1) + if ch == "\x03": + return "interrupt" + if ch in ("\r", "\n"): + return "enter" + if ch == " ": + return "toggle" + if ch == "q": + return "quit" + if ch == "+" or ch == "=": + return "up" + if ch == "-" or ch == "_": + return "down" + if ch == "\x1b": + c1 = sys.stdin.read(1) + if c1 != "[": + return None + c2 = sys.stdin.read(1) + if c2 == "A": + return "up" + if c2 == "B": + return "down" + return None + return None + + if not sys.stdin.isatty(): + return + + fd = sys.stdin.fileno() + old_settings = termios.tcgetattr(fd) + try: + tty.setcbreak(fd) + while True: + _draw_effects_picker() + key = _read_effects_key() + + if key == "quit" or key == "enter": + break + elif key == "up" and editing_intensity: + intensity_value = min(1.0, intensity_value + 0.1) + effects[selected].config.intensity = intensity_value + elif key == "down" and editing_intensity: + intensity_value = max(0.0, intensity_value - 0.1) + effects[selected].config.intensity = intensity_value + elif key == "up": + selected = max(0, selected - 1) + intensity_value = effects[selected].config.intensity + elif key == "down": + selected = min(len(effects) - 1, selected + 1) + intensity_value = effects[selected].config.intensity + elif key == "toggle": + effects[selected].config.enabled = not effects[selected].config.enabled + elif key == "interrupt": + raise KeyboardInterrupt + finally: + termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) + + def main(): atexit.register(lambda: print(CURSOR_ON, end="", flush=True)) @@ -283,10 +361,13 @@ def main(): signal.signal(signal.SIGINT, handle_sigint) + StreamController.warmup_topics() + w = tw() print(CLR, end="") print(CURSOR_OFF, end="") pick_font_face() + pick_effects_config() w = tw() print() time.sleep(0.4) @@ -338,9 +419,10 @@ def main(): sys.exit(1) print() - mic = MicMonitor(threshold_db=config.MIC_THRESHOLD_DB) - mic_ok = mic.start() - if mic.available: + controller = StreamController() + mic_ok, ntfy_ok = controller.initialize_sources() + + if controller.mic and controller.mic.available: boot_ln( "Microphone", "ACTIVE" @@ -349,12 +431,6 @@ def main(): bool(mic_ok), ) - ntfy = NtfyPoller( - config.NTFY_TOPIC, - reconnect_delay=config.NTFY_RECONNECT_DELAY, - display_secs=config.MESSAGE_DISPLAY_SECS, - ) - ntfy_ok = ntfy.start() boot_ln("ntfy", "LISTENING" if ntfy_ok else "OFFLINE", ntfy_ok) if config.FIREHOSE: @@ -367,10 +443,7 @@ def main(): print() time.sleep(0.4) - display = _get_display() - stream(items, ntfy, mic, display) - if display: - display.cleanup() + controller.run(items) print() print(f" {W_GHOST}{'─' * (tw() - 4)}{RST}") diff --git a/engine/config.py b/engine/config.py index 795367a..efce6ca 100644 --- a/engine/config.py +++ b/engine/config.py @@ -105,6 +105,8 @@ class Config: firehose: bool = False ntfy_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline/json" + ntfy_cc_cmd_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json" + ntfy_cc_resp_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json" ntfy_reconnect_delay: int = 5 message_display_secs: int = 30 @@ -152,6 +154,8 @@ class Config: mode="poetry" if "--poetry" in argv or "-p" in argv else "news", firehose="--firehose" in argv, ntfy_topic="https://ntfy.sh/klubhaus_terminal_mainline/json", + ntfy_cc_cmd_topic="https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json", + ntfy_cc_resp_topic="https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json", ntfy_reconnect_delay=5, message_display_secs=30, font_dir=font_dir, @@ -200,6 +204,8 @@ FIREHOSE = "--firehose" in sys.argv # ─── NTFY MESSAGE QUEUE ────────────────────────────────── NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json" +NTFY_CC_CMD_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json" +NTFY_CC_RESP_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json" NTFY_RECONNECT_DELAY = 5 # seconds before reconnecting after a dropped stream MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen diff --git a/engine/controller.py b/engine/controller.py index 5b07e67..3cbb71e 100644 --- a/engine/controller.py +++ b/engine/controller.py @@ -3,6 +3,7 @@ Stream controller - manages input sources and orchestrates the render stream. """ from engine.config import Config, get_config +from engine.effects.controller import handle_effects_command from engine.eventbus import EventBus from engine.events import EventType, StreamEvent from engine.mic import MicMonitor @@ -24,11 +25,45 @@ def _get_display(config: Config): class StreamController: """Controls the stream lifecycle - initializes sources and runs the stream.""" + _topics_warmed = False + def __init__(self, config: Config | None = None, event_bus: EventBus | None = None): self.config = config or get_config() self.event_bus = event_bus self.mic: MicMonitor | None = None self.ntfy: NtfyPoller | None = None + self.ntfy_cc: NtfyPoller | None = None + + @classmethod + def warmup_topics(cls) -> None: + """Warm up ntfy topics lazily (creates them if they don't exist).""" + if cls._topics_warmed: + return + + import urllib.request + + topics = [ + "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd", + "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp", + "https://ntfy.sh/klubhaus_terminal_mainline", + ] + + for topic in topics: + try: + req = urllib.request.Request( + topic, + data=b"init", + headers={ + "User-Agent": "mainline/0.1", + "Content-Type": "text/plain", + }, + method="POST", + ) + urllib.request.urlopen(req, timeout=5) + except Exception: + pass + + cls._topics_warmed = True def initialize_sources(self) -> tuple[bool, bool]: """Initialize microphone and ntfy sources. @@ -46,7 +81,38 @@ class StreamController: ) ntfy_ok = self.ntfy.start() - return bool(mic_ok), ntfy_ok + self.ntfy_cc = NtfyPoller( + self.config.ntfy_cc_cmd_topic, + reconnect_delay=self.config.ntfy_reconnect_delay, + display_secs=5, + ) + self.ntfy_cc.subscribe(self._handle_cc_message) + ntfy_cc_ok = self.ntfy_cc.start() + + return bool(mic_ok), ntfy_ok and ntfy_cc_ok + + def _handle_cc_message(self, event) -> None: + """Handle incoming C&C message - like a serial port control interface.""" + import urllib.request + + cmd = event.body.strip() if hasattr(event, "body") else str(event).strip() + if not cmd.startswith("/"): + return + + response = handle_effects_command(cmd) + + topic_url = self.config.ntfy_cc_resp_topic.replace("/json", "") + data = response.encode("utf-8") + req = urllib.request.Request( + topic_url, + data=data, + headers={"User-Agent": "mainline/0.1", "Content-Type": "text/plain"}, + method="POST", + ) + try: + urllib.request.urlopen(req, timeout=5) + except Exception: + pass def run(self, items: list) -> None: """Run the stream with initialized sources.""" diff --git a/mise.toml b/mise.toml index ea0f6ed..76d1fc6 100644 --- a/mise.toml +++ b/mise.toml @@ -31,24 +31,41 @@ run-websocket = { run = "uv run mainline.py --websocket", depends = ["sync-all"] run-both = { run = "uv run mainline.py --display both", depends = ["sync-all"] } run-client = { run = "uv run mainline.py --display both & WEBSOCKET_PID=$! && sleep 2 && case $(uname -s) in Darwin) open http://localhost:8766 ;; Linux) xdg-open http://localhost:8766 ;; CYGWIN*) cmd /c start http://localhost:8766 ;; *) echo 'Unknown platform' ;; esac && wait $WEBSOCKET_PID", depends = ["sync-all"] } +daemon = "nohup uv run mainline.py > /dev/null 2>&1 &" +daemon-stop = "pkill -f 'uv run mainline.py' 2>/dev/null || true" +daemon-restart = "mise run daemon-stop && sleep 2 && mise run daemon" + +# ===================== +# Command & Control +# ===================== + +cmd = "uv run cmdline.py" +cmd-stats = "bash -c 'uv run cmdline.py -w \"/effects stats\"';:" + +# Initialize ntfy topics (warm up before first use - also done automatically by mainline) +topics-init = "curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd > /dev/null && curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline_cc_resp > /dev/null && curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline > /dev/null" + # ===================== # Environment # ===================== sync = "uv sync" sync-all = "uv sync --all-extras" -install = "uv sync" -install-dev = "uv sync --group dev" +install = "mise run sync" +install-dev = "mise run sync && uv sync --group dev" bootstrap = "uv sync && uv run mainline.py --help" -clean = "rm -rf .venv htmlcov .coverage tests/.pytest_cache" +clean = "rm -rf .venv htmlcov .coverage tests/.pytest_cache .mainline_cache_*.json nohup.out" + +# Aggressive cleanup - removes all generated files, caches, and venv +clobber = "git clean -fdx && rm -rf .venv htmlcov .coverage tests/.pytest_cache .mainline_cache_*.json nohup.out" # ===================== # CI/CD # ===================== -ci = "uv sync --group dev && uv run pytest --cov=engine --cov-report=term-missing --cov-report=xml" +ci = "mise run topics-init && uv sync --group dev && uv run pytest --cov=engine --cov-report=term-missing --cov-report=xml" ci-lint = "uv run ruff check engine/ mainline.py" # ===================== diff --git a/tests/test_ntfy_integration.py b/tests/test_ntfy_integration.py new file mode 100644 index 0000000..d21acab --- /dev/null +++ b/tests/test_ntfy_integration.py @@ -0,0 +1,127 @@ +""" +Integration tests for ntfy topics. +""" + +import json +import time +import urllib.request + + +class TestNtfyTopics: + def test_cc_cmd_topic_exists_and_writable(self): + """Verify C&C CMD topic exists and accepts messages.""" + from engine.config import NTFY_CC_CMD_TOPIC + + topic_url = NTFY_CC_CMD_TOPIC.replace("/json", "") + test_message = f"test_{int(time.time())}" + + req = urllib.request.Request( + topic_url, + data=test_message.encode("utf-8"), + headers={ + "User-Agent": "mainline-test/0.1", + "Content-Type": "text/plain", + }, + method="POST", + ) + + try: + with urllib.request.urlopen(req, timeout=10) as resp: + assert resp.status == 200 + except Exception as e: + raise AssertionError(f"Failed to write to C&C CMD topic: {e}") from e + + def test_cc_resp_topic_exists_and_writable(self): + """Verify C&C RESP topic exists and accepts messages.""" + from engine.config import NTFY_CC_RESP_TOPIC + + topic_url = NTFY_CC_RESP_TOPIC.replace("/json", "") + test_message = f"test_{int(time.time())}" + + req = urllib.request.Request( + topic_url, + data=test_message.encode("utf-8"), + headers={ + "User-Agent": "mainline-test/0.1", + "Content-Type": "text/plain", + }, + method="POST", + ) + + try: + with urllib.request.urlopen(req, timeout=10) as resp: + assert resp.status == 200 + except Exception as e: + raise AssertionError(f"Failed to write to C&C RESP topic: {e}") from e + + def test_message_topic_exists_and_writable(self): + """Verify message topic exists and accepts messages.""" + from engine.config import NTFY_TOPIC + + topic_url = NTFY_TOPIC.replace("/json", "") + test_message = f"test_{int(time.time())}" + + req = urllib.request.Request( + topic_url, + data=test_message.encode("utf-8"), + headers={ + "User-Agent": "mainline-test/0.1", + "Content-Type": "text/plain", + }, + method="POST", + ) + + try: + with urllib.request.urlopen(req, timeout=10) as resp: + assert resp.status == 200 + except Exception as e: + raise AssertionError(f"Failed to write to message topic: {e}") from e + + def test_cc_cmd_topic_readable(self): + """Verify we can read messages from C&C CMD topic.""" + from engine.config import NTFY_CC_CMD_TOPIC + + test_message = f"integration_test_{int(time.time())}" + topic_url = NTFY_CC_CMD_TOPIC.replace("/json", "") + + req = urllib.request.Request( + topic_url, + data=test_message.encode("utf-8"), + headers={ + "User-Agent": "mainline-test/0.1", + "Content-Type": "text/plain", + }, + method="POST", + ) + + try: + urllib.request.urlopen(req, timeout=10) + except Exception as e: + raise AssertionError(f"Failed to write to C&C CMD topic: {e}") from e + + time.sleep(1) + + poll_url = f"{NTFY_CC_CMD_TOPIC}?poll=1&limit=1" + req = urllib.request.Request( + poll_url, + headers={"User-Agent": "mainline-test/0.1"}, + ) + + try: + with urllib.request.urlopen(req, timeout=10) as resp: + body = resp.read().decode("utf-8") + if body.strip(): + data = json.loads(body.split("\n")[0]) + assert isinstance(data, dict) + except Exception as e: + raise AssertionError(f"Failed to read from C&C CMD topic: {e}") from e + + def test_topics_are_different(self): + """Verify C&C CMD/RESP and message topics are different.""" + from engine.config import NTFY_CC_CMD_TOPIC, NTFY_CC_RESP_TOPIC, NTFY_TOPIC + + assert NTFY_CC_CMD_TOPIC != NTFY_TOPIC + assert NTFY_CC_RESP_TOPIC != NTFY_TOPIC + assert NTFY_CC_CMD_TOPIC != NTFY_CC_RESP_TOPIC + assert "_cc_cmd" in NTFY_CC_CMD_TOPIC + assert "_cc_resp" in NTFY_CC_RESP_TOPIC