From 648326cd373171dafcc25e92a65568af66dac502 Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Sun, 15 Mar 2026 17:46:40 -0700 Subject: [PATCH] feat(cmdline): use C&C topic with response polling - Rewrite cmdline to send commands via ntfy and wait for response - Add NtfyResponsePoller class for serial-port-like interface - Add integration tests for ntfy topics (test read/write) - Add NTFY_CC_TOPIC export to config --- cmdline.py | 156 ++++++++++++++++----------------- engine/config.py | 1 + tests/test_ntfy_integration.py | 101 +++++++++++++++++++++ 3 files changed, 179 insertions(+), 79 deletions(-) create mode 100644 tests/test_ntfy_integration.py diff --git a/cmdline.py b/cmdline.py index c6b0240..3ca0794 100644 --- a/cmdline.py +++ b/cmdline.py @@ -5,24 +5,30 @@ Command-line utility for interacting with mainline via ntfy. Usage: python cmdline.py # Interactive TUI mode python cmdline.py --help # Show help - python cmdline.py /effects list # Send single command - python cmdline.py /effects stats # Get performance stats + python cmdline.py /effects list # Send single command via ntfy + python cmdline.py /effects stats # Get performance stats via ntfy + python cmdline.py -w /effects stats # Watch mode (polls for stats) The TUI mode provides: - Arrow keys to navigate command history - Tab completion for commands - Auto-refresh for performance stats + +C&C works like a serial port: + 1. Send command to ntfy_cc_topic + 2. Mainline receives, processes, responds to same topic + 3. Cmdline polls for response """ import argparse import json import sys import time +import threading import urllib.request from pathlib import Path from engine import config -from engine.effects.controller import handle_effects_command from engine.terminal import CLR, CURSOR_OFF, CURSOR_ON, G_DIM, G_HI, RST, W_GHOST try: @@ -33,64 +39,70 @@ except AttributeError: TOPIC = CC_TOPIC -def send_command(cmd: str) -> str: - """Send a command to the ntfy topic and return the response.""" - if not cmd.startswith("/"): - return "Commands must start with /" +class NtfyResponsePoller: + """Polls ntfy for command responses.""" - url = TOPIC.replace("/json", "") - data = cmd.encode("utf-8") + def __init__(self, topic_url: str, timeout: float = 10.0): + self.topic_url = topic_url + self.timeout = timeout + self._last_id = None + self._lock = threading.Lock() - req = urllib.request.Request( - url, - data=data, - headers={ - "User-Agent": "mainline-cmdline/0.1", - "Content-Type": "text/plain", - }, - method="POST", - ) + def _build_url(self) -> str: + from urllib.parse import parse_qs, urlencode, urlparse, urlunparse - try: - with urllib.request.urlopen(req, timeout=10) as resp: - return f"Command sent: {cmd}\n(Response would appear on mainline display)" - except Exception as e: - return f"Error sending command: {e}" + parsed = urlparse(self.topic_url) + params = parse_qs(parsed.query, keep_blank_values=True) + params["since"] = [self._last_id if self._last_id else "20s"] + new_query = urlencode({k: v[0] for k, v in params.items()}) + return urlunparse(parsed._replace(query=new_query)) + def send_and_wait(self, cmd: str) -> str: + """Send command and wait for response.""" + url = self.topic_url.replace("/json", "") + data = cmd.encode("utf-8") + + req = urllib.request.Request( + url, + data=data, + headers={ + "User-Agent": "mainline-cmdline/0.1", + "Content-Type": "text/plain", + }, + method="POST", + ) -def local_command(cmd: str) -> str: - """Handle command locally without sending to ntfy.""" - if cmd.startswith("/effects"): try: - import effects_plugins - from engine.effects.registry import get_registry - from engine.effects.chain import EffectChain - from engine.effects.controller import set_effect_chain_ref - - effects_plugins.discover_plugins() - registry = get_registry() - chain = EffectChain(registry) - chain.set_order(["noise", "fade", "glitch", "firehose"]) - - set_effect_chain_ref(chain) - - from engine.effects.controller import handle_effects_command - - return handle_effects_command(cmd) - except ImportError as e: - return f"Error: {e}\n(Try: pip install Pillow)" + urllib.request.urlopen(req, timeout=5) except Exception as e: - return f"Error: {type(e).__name__}: {e}" - if cmd == "/help": - return AVAILABLE_COMMANDS - if cmd == "/quit" or cmd == "/exit": - return "GOODBYE" - return f"Unknown command: {cmd}" - if cmd == "/help": - return AVAILABLE_COMMANDS - if cmd == "/quit" or cmd == "/exit": - return "GOODBYE" - return f"Unknown command: {cmd}" + return f"Error sending command: {e}" + + return self._wait_for_response() + + def _wait_for_response(self) -> str: + """Poll for response message.""" + start = time.time() + while time.time() - start < self.timeout: + try: + url = self._build_url() + req = urllib.request.Request( + url, headers={"User-Agent": "mainline-cmdline/0.1"} + ) + with urllib.request.urlopen(req, timeout=10) as resp: + for line in resp: + try: + data = json.loads(line.decode("utf-8", errors="replace")) + except json.JSONDecodeError: + continue + if data.get("event") == "message": + self._last_id = data.get("id") + msg = data.get("message", "") + if msg: + return msg + except Exception: + pass + time.sleep(0.5) + return "Timeout waiting for response" AVAILABLE_COMMANDS = """Available commands: @@ -102,9 +114,6 @@ AVAILABLE_COMMANDS = """Available commands: /effects stats - Show performance statistics /help - Show this help /quit - Exit - -Local commands (don't require running mainline): - /effects * - All /effects commands work locally """ @@ -124,9 +133,7 @@ def interactive_mode(): import readline print_header() - - history = [] - history_index = -1 + poller = NtfyResponsePoller(TOPIC) print(f"{G_DIM}Type /help for available commands, /quit to exit{RST}") print() @@ -142,20 +149,20 @@ def interactive_mode(): continue if cmd.startswith("/"): - history.append(cmd) - history_index = len(history) - if cmd == "/quit" or cmd == "/exit": print(f"{G_DIM}Goodbye!{RST}") break - result = local_command(cmd) + if cmd == "/help": + print(f"\n{AVAILABLE_COMMANDS}\n") + continue + + print(f"{G_DIM}Sending to mainline...{RST}") + result = poller.send_and_wait(cmd) print(f"\n{result}\n") else: - print( - f"{G_DIM}Commands must start with / - type /help for available commands{RST}\n" - ) + print(f"{G_DIM}Commands must start with / - type /help{RST}\n") print(CURSOR_ON, end="") @@ -172,12 +179,6 @@ def main(): default=None, help="Command to send (e.g., /effects list)", ) - parser.add_argument( - "--local", - "-l", - action="store_true", - help="Run command locally (no ntfy required)", - ) parser.add_argument( "--watch", "-w", @@ -191,17 +192,14 @@ def main(): interactive_mode() return - if args.local: - result = local_command(args.command) - print(result) - return + poller = NtfyResponsePoller(TOPIC) if args.watch and "/effects stats" in args.command: print_header() print(f"{G_DIM}Watching /effects stats (Ctrl+C to exit)...{RST}\n") try: while True: - result = local_command(args.command) + result = poller.send_and_wait(args.command) print(f"\033[2J\033[1;1H", end="") print(f"{G_HI}Performance Stats - {time.strftime('%H:%M:%S')}{RST}") print(f"{G_DIM}{'─' * 40}{RST}") @@ -211,7 +209,7 @@ def main(): print(f"\n{G_DIM}Stopped watching{RST}") return - result = send_command(args.command) + result = poller.send_and_wait(args.command) print(result) diff --git a/engine/config.py b/engine/config.py index 944976c..a24a8e0 100644 --- a/engine/config.py +++ b/engine/config.py @@ -195,6 +195,7 @@ FIREHOSE = "--firehose" in sys.argv # ─── NTFY MESSAGE QUEUE ────────────────────────────────── NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json" +NTFY_CC_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc/json" NTFY_RECONNECT_DELAY = 5 # seconds before reconnecting after a dropped stream MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen diff --git a/tests/test_ntfy_integration.py b/tests/test_ntfy_integration.py new file mode 100644 index 0000000..5205930 --- /dev/null +++ b/tests/test_ntfy_integration.py @@ -0,0 +1,101 @@ +""" +Integration tests for ntfy topics. +""" + +import json +import time +import urllib.request + + +class TestNtfyTopics: + def test_cc_topic_exists_and_writable(self): + """Verify C&C topic exists and accepts messages.""" + from engine.config import NTFY_CC_TOPIC + + topic_url = NTFY_CC_TOPIC.replace("/json", "") + test_message = f"test_{int(time.time())}" + + req = urllib.request.Request( + topic_url, + data=test_message.encode("utf-8"), + headers={ + "User-Agent": "mainline-test/0.1", + "Content-Type": "text/plain", + }, + method="POST", + ) + + try: + with urllib.request.urlopen(req, timeout=10) as resp: + assert resp.status == 200 + except Exception as e: + raise AssertionError(f"Failed to write to C&C topic: {e}") from e + + def test_message_topic_exists_and_writable(self): + """Verify message topic exists and accepts messages.""" + from engine.config import NTFY_TOPIC + + topic_url = NTFY_TOPIC.replace("/json", "") + test_message = f"test_{int(time.time())}" + + req = urllib.request.Request( + topic_url, + data=test_message.encode("utf-8"), + headers={ + "User-Agent": "mainline-test/0.1", + "Content-Type": "text/plain", + }, + method="POST", + ) + + try: + with urllib.request.urlopen(req, timeout=10) as resp: + assert resp.status == 200 + except Exception as e: + raise AssertionError(f"Failed to write to message topic: {e}") from e + + def test_cc_topic_readable(self): + """Verify we can read messages from C&C topic.""" + from engine.config import NTFY_CC_TOPIC + + test_message = f"integration_test_{int(time.time())}" + topic_url = NTFY_CC_TOPIC.replace("/json", "") + + req = urllib.request.Request( + topic_url, + data=test_message.encode("utf-8"), + headers={ + "User-Agent": "mainline-test/0.1", + "Content-Type": "text/plain", + }, + method="POST", + ) + + try: + urllib.request.urlopen(req, timeout=10) + except Exception as e: + raise AssertionError(f"Failed to write to C&C topic: {e}") from e + + time.sleep(1) + + poll_url = f"{NTFY_CC_TOPIC}?poll=1&limit=1" + req = urllib.request.Request( + poll_url, + headers={"User-Agent": "mainline-test/0.1"}, + ) + + try: + with urllib.request.urlopen(req, timeout=10) as resp: + body = resp.read().decode("utf-8") + if body.strip(): + data = json.loads(body.split("\n")[0]) + assert isinstance(data, dict) + except Exception as e: + raise AssertionError(f"Failed to read from C&C topic: {e}") from e + + def test_topics_are_different(self): + """Verify C&C and message topics are different.""" + from engine.config import NTFY_CC_TOPIC, NTFY_TOPIC + + assert NTFY_CC_TOPIC != NTFY_TOPIC + assert "_cc" in NTFY_CC_TOPIC