feat(cmdline): use C&C topic with response polling

- Rewrite cmdline to send commands via ntfy and wait for response
- Add NtfyResponsePoller class for serial-port-like interface
- Add integration tests for ntfy topics (test read/write)
- Add NTFY_CC_TOPIC export to config
This commit is contained in:
2026-03-15 17:46:40 -07:00
parent 3324adb07a
commit 648326cd37
3 changed files with 179 additions and 79 deletions

View File

@@ -5,24 +5,30 @@ Command-line utility for interacting with mainline via ntfy.
Usage:
python cmdline.py # Interactive TUI mode
python cmdline.py --help # Show help
python cmdline.py /effects list # Send single command
python cmdline.py /effects stats # Get performance stats
python cmdline.py /effects list # Send single command via ntfy
python cmdline.py /effects stats # Get performance stats via ntfy
python cmdline.py -w /effects stats # Watch mode (polls for stats)
The TUI mode provides:
- Arrow keys to navigate command history
- Tab completion for commands
- Auto-refresh for performance stats
C&C works like a serial port:
1. Send command to ntfy_cc_topic
2. Mainline receives, processes, responds to same topic
3. Cmdline polls for response
"""
import argparse
import json
import sys
import time
import threading
import urllib.request
from pathlib import Path
from engine import config
from engine.effects.controller import handle_effects_command
from engine.terminal import CLR, CURSOR_OFF, CURSOR_ON, G_DIM, G_HI, RST, W_GHOST
try:
@@ -33,64 +39,70 @@ except AttributeError:
TOPIC = CC_TOPIC
def send_command(cmd: str) -> str:
"""Send a command to the ntfy topic and return the response."""
if not cmd.startswith("/"):
return "Commands must start with /"
class NtfyResponsePoller:
"""Polls ntfy for command responses."""
url = TOPIC.replace("/json", "")
data = cmd.encode("utf-8")
def __init__(self, topic_url: str, timeout: float = 10.0):
self.topic_url = topic_url
self.timeout = timeout
self._last_id = None
self._lock = threading.Lock()
req = urllib.request.Request(
url,
data=data,
headers={
"User-Agent": "mainline-cmdline/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
def _build_url(self) -> str:
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
try:
with urllib.request.urlopen(req, timeout=10) as resp:
return f"Command sent: {cmd}\n(Response would appear on mainline display)"
except Exception as e:
return f"Error sending command: {e}"
parsed = urlparse(self.topic_url)
params = parse_qs(parsed.query, keep_blank_values=True)
params["since"] = [self._last_id if self._last_id else "20s"]
new_query = urlencode({k: v[0] for k, v in params.items()})
return urlunparse(parsed._replace(query=new_query))
def send_and_wait(self, cmd: str) -> str:
"""Send command and wait for response."""
url = self.topic_url.replace("/json", "")
data = cmd.encode("utf-8")
req = urllib.request.Request(
url,
data=data,
headers={
"User-Agent": "mainline-cmdline/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
def local_command(cmd: str) -> str:
"""Handle command locally without sending to ntfy."""
if cmd.startswith("/effects"):
try:
import effects_plugins
from engine.effects.registry import get_registry
from engine.effects.chain import EffectChain
from engine.effects.controller import set_effect_chain_ref
effects_plugins.discover_plugins()
registry = get_registry()
chain = EffectChain(registry)
chain.set_order(["noise", "fade", "glitch", "firehose"])
set_effect_chain_ref(chain)
from engine.effects.controller import handle_effects_command
return handle_effects_command(cmd)
except ImportError as e:
return f"Error: {e}\n(Try: pip install Pillow)"
urllib.request.urlopen(req, timeout=5)
except Exception as e:
return f"Error: {type(e).__name__}: {e}"
if cmd == "/help":
return AVAILABLE_COMMANDS
if cmd == "/quit" or cmd == "/exit":
return "GOODBYE"
return f"Unknown command: {cmd}"
if cmd == "/help":
return AVAILABLE_COMMANDS
if cmd == "/quit" or cmd == "/exit":
return "GOODBYE"
return f"Unknown command: {cmd}"
return f"Error sending command: {e}"
return self._wait_for_response()
def _wait_for_response(self) -> str:
"""Poll for response message."""
start = time.time()
while time.time() - start < self.timeout:
try:
url = self._build_url()
req = urllib.request.Request(
url, headers={"User-Agent": "mainline-cmdline/0.1"}
)
with urllib.request.urlopen(req, timeout=10) as resp:
for line in resp:
try:
data = json.loads(line.decode("utf-8", errors="replace"))
except json.JSONDecodeError:
continue
if data.get("event") == "message":
self._last_id = data.get("id")
msg = data.get("message", "")
if msg:
return msg
except Exception:
pass
time.sleep(0.5)
return "Timeout waiting for response"
AVAILABLE_COMMANDS = """Available commands:
@@ -102,9 +114,6 @@ AVAILABLE_COMMANDS = """Available commands:
/effects stats - Show performance statistics
/help - Show this help
/quit - Exit
Local commands (don't require running mainline):
/effects * - All /effects commands work locally
"""
@@ -124,9 +133,7 @@ def interactive_mode():
import readline
print_header()
history = []
history_index = -1
poller = NtfyResponsePoller(TOPIC)
print(f"{G_DIM}Type /help for available commands, /quit to exit{RST}")
print()
@@ -142,20 +149,20 @@ def interactive_mode():
continue
if cmd.startswith("/"):
history.append(cmd)
history_index = len(history)
if cmd == "/quit" or cmd == "/exit":
print(f"{G_DIM}Goodbye!{RST}")
break
result = local_command(cmd)
if cmd == "/help":
print(f"\n{AVAILABLE_COMMANDS}\n")
continue
print(f"{G_DIM}Sending to mainline...{RST}")
result = poller.send_and_wait(cmd)
print(f"\n{result}\n")
else:
print(
f"{G_DIM}Commands must start with / - type /help for available commands{RST}\n"
)
print(f"{G_DIM}Commands must start with / - type /help{RST}\n")
print(CURSOR_ON, end="")
@@ -172,12 +179,6 @@ def main():
default=None,
help="Command to send (e.g., /effects list)",
)
parser.add_argument(
"--local",
"-l",
action="store_true",
help="Run command locally (no ntfy required)",
)
parser.add_argument(
"--watch",
"-w",
@@ -191,17 +192,14 @@ def main():
interactive_mode()
return
if args.local:
result = local_command(args.command)
print(result)
return
poller = NtfyResponsePoller(TOPIC)
if args.watch and "/effects stats" in args.command:
print_header()
print(f"{G_DIM}Watching /effects stats (Ctrl+C to exit)...{RST}\n")
try:
while True:
result = local_command(args.command)
result = poller.send_and_wait(args.command)
print(f"\033[2J\033[1;1H", end="")
print(f"{G_HI}Performance Stats - {time.strftime('%H:%M:%S')}{RST}")
print(f"{G_DIM}{'' * 40}{RST}")
@@ -211,7 +209,7 @@ def main():
print(f"\n{G_DIM}Stopped watching{RST}")
return
result = send_command(args.command)
result = poller.send_and_wait(args.command)
print(result)