feat(cmdline): C&C with separate topics and rich output

This commit is contained in:
2026-03-15 18:43:04 -07:00
parent dd282653b5
commit 020eb663ea
8 changed files with 544 additions and 22 deletions

1
.gitignore vendored
View File

@@ -9,3 +9,4 @@ htmlcov/
.coverage .coverage
.pytest_cache/ .pytest_cache/
*.egg-info/ *.egg-info/
coverage.xml

159
AGENTS.md
View File

@@ -22,13 +22,37 @@ uv sync
### Available Commands ### Available Commands
```bash ```bash
# Development
mise run test # Run tests mise run test # Run tests
mise run test-v # Run tests verbose mise run test-v # Run tests verbose
mise run test-cov # Run tests with coverage report mise run test-cov # Run tests with coverage report
mise run lint # Run ruff linter mise run lint # Run ruff linter
mise run lint-fix # Run ruff with auto-fix mise run lint-fix # Run ruff with auto-fix
mise run format # Run ruff formatter mise run format # Run ruff formatter
mise run ci # Full CI pipeline (sync + test + coverage) mise run ci # Full CI pipeline
# Runtime
mise run run # Interactive terminal mode (news)
mise run run-poetry # Interactive terminal mode (poetry)
mise run run-firehose # Dense headline mode
# Daemon mode (recommended for long-running)
mise run daemon # Start mainline in background
mise run daemon-stop # Stop daemon
mise run daemon-restart # Restart daemon
# Command & Control
mise run cmd # Interactive CLI
mise run cmd "/cmd" # Send single command
mise run cmd-stats # Watch performance stats
mise run topics-init # Initialize ntfy topics
# Environment
mise run install # Install dependencies
mise run sync # Sync dependencies
mise run sync-all # Sync with all extras
mise run clean # Clean cache files
mise run clobber # Aggressive cleanup (git clean -fdx + caches)
``` ```
## Git Hooks ## Git Hooks
@@ -108,3 +132,136 @@ The project uses pytest with strict marker enforcement. Test configuration is in
- **eventbus.py** provides thread-safe event publishing for decoupled communication - **eventbus.py** provides thread-safe event publishing for decoupled communication
- **controller.py** coordinates ntfy/mic monitoring - **controller.py** coordinates ntfy/mic monitoring
- The render pipeline: fetch → render → effects → scroll → terminal output - The render pipeline: fetch → render → effects → scroll → terminal output
- **display.py** provides swappable display backends (TerminalDisplay, NullDisplay)
## Operating Modes
Mainline can run in two modes:
### 1. Standalone Mode (Original)
Run directly as a terminal application with interactive pickers:
```bash
mise run run # news stream
mise run run-poetry # poetry mode
mise run run-firehose # dense headline mode
```
This runs the full interactive experience with font picker and effects picker at startup.
### 2. Daemon + Command Mode (Recommended for Long-Running)
The recommended approach for persistent displays:
```bash
# Start the daemon (headless rendering)
mise run daemon
# Send commands via ntfy
mise run cmd "/effects list"
mise run cmd "/effects noise off"
mise run cmd "/effects stats"
# Watch mode (continuous stats polling)
mise run cmd-stats
# Stop the daemon
mise run daemon-stop
```
#### How It Works
- **Daemon**: Runs `mainline.py` in the background, renders to terminal
- **C&C Topics**: Uses separate ntfy topics (like UART serial):
- `klubhaus_terminal_mainline_cc_cmd` - commands TO mainline
- `klubhaus_terminal_mainline_cc_resp` - responses FROM mainline
- **Topics are auto-warmed** on first daemon start
#### Available Commands
```
/effects list - List all effects and status
/effects <name> on - Enable an effect
/effects <name> off - Disable an effect
/effects <name> intensity 0.5 - Set effect intensity (0.0-1.0)
/effects reorder noise,fade,glitch,firehose - Reorder pipeline
/effects stats - Show performance statistics
```
## Effects Plugin System
The effects system is implemented as a plugin architecture in `engine/effects/`.
### Core Components
| Module | Purpose |
|--------|---------|
| `effects/types.py` | `EffectConfig`, `EffectContext` dataclasses and `EffectPlugin` protocol |
| `effects/registry.py` | Plugin discovery and management (`EffectRegistry`) |
| `effects/chain.py` | Ordered pipeline execution (`EffectChain`) |
| `effects_plugins/*.py` | Externalized effect plugins |
### Creating a New Effect
Create a file in `effects_plugins/` with a class ending in `Effect`:
```python
from engine.effects.types import EffectConfig, EffectContext
class MyEffect:
name = "myeffect"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
# Process buffer and return modified buffer
return buf
def configure(self, config: EffectConfig) -> None:
self.config = config
```
### NTFY Commands
Send commands via `cmdline.py` or directly to the C&C topic:
```bash
# Using cmdline tool (recommended)
mise run cmd "/effects list"
mise run cmd "/effects noise on"
mise run cmd "/effects noise intensity 0.5"
mise run cmd "/effects reorder noise,glitch,fade,firehose"
# Or directly via curl
curl -d "/effects list" https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd
```
The cmdline tool polls the response topic for the daemon's reply.
## Conventional Commits
Commit messages follow the [Conventional Commits](https://www.conventionalcommits.org/) specification:
```
<type>(<scope>): <description>
[optional body]
[optional footer(s)]
```
### Types
- `feat`: A new feature
- `fix`: A bug fix
- `docs`: Documentation only changes
- `style`: Changes that don't affect code meaning (formatting)
- `refactor`: Code change that neither fixes a bug nor adds a feature
- `test`: Adding or updating tests
- `chore`: Changes to build process, dependencies, etc.
### Examples
```
feat(effects): add plugin architecture for visual effects
fix(layers): resolve glitch effect not applying on empty buffer
docs(AGENTS.md): add effects plugin system documentation
test(effects): add tests for EffectChain pipeline ordering
```

View File

@@ -24,6 +24,46 @@ First run bootstraps a local `.mainline_venv/` and installs deps (`feedparser`,
--- ---
## Daemon Mode (Recommended for Long-Running)
For persistent displays (e.g., always-on terminal), use daemon mode with command-and-control over ntfy:
```bash
# Start the daemon (runs in background, auto-warms ntfy topics)
mise run daemon
# Send commands via cmdline
mise run cmd "/effects list"
mise run cmd "/effects noise off"
mise run cmd "/effects noise intensity 0.5"
# Watch performance stats continuously
mise run cmd-stats
# Stop the daemon
mise run daemon-stop
```
### How It Works
- **Topics**: Uses separate ntfy topics for serial-like communication:
- `klubhaus_terminal_mainline_cc_cmd` - commands TO mainline
- `klubhaus_terminal_mainline_cc_resp` - responses FROM mainline
- Topics are automatically created on first daemon start
### Available Commands
```
/effects list - List all effects and status
/effects <name> on - Enable an effect
/effects <name> off - Disable an effect
/effects <name> intensity 0.5 - Set effect intensity (0.0-1.0)
/effects reorder noise,fade,glitch,firehose - Reorder pipeline
/effects stats - Show performance statistics
```
---
## Config ## Config
All constants live in `engine/config.py`: All constants live in `engine/config.py`:
@@ -87,7 +127,15 @@ engine/
filter.py HTML stripping, content filter filter.py HTML stripping, content filter
translate.py Google Translate wrapper + region detection translate.py Google Translate wrapper + region detection
render.py OTF → half-block pipeline (SSAA, gradient) render.py OTF → half-block pipeline (SSAA, gradient)
effects.py noise, glitch_bar, fade, firehose effects/ plugin-based effects system
types.py EffectConfig, EffectContext, EffectPlugin protocol
registry.py Plugin discovery and management
chain.py Ordered pipeline execution
performance.py Performance monitoring
controller.py NTFY command handler
legacy.py Original effects (noise, glitch, fade, firehose)
effects_plugins/ External effect plugins (noise, glitch, fade, firehose)
display.py Swappable display backends (TerminalDisplay, NullDisplay)
fetch.py RSS/Gutenberg fetching + cache load/save fetch.py RSS/Gutenberg fetching + cache load/save
ntfy.py NtfyPoller — standalone, zero internal deps ntfy.py NtfyPoller — standalone, zero internal deps
mic.py MicMonitor — standalone, graceful fallback mic.py MicMonitor — standalone, graceful fallback

View File

@@ -11,10 +11,8 @@ import time
import tty import tty
from engine import config, render from engine import config, render
from engine.controller import StreamController
from engine.fetch import fetch_all, fetch_poetry, load_cache, save_cache from engine.fetch import fetch_all, fetch_poetry, load_cache, save_cache
from engine.mic import MicMonitor
from engine.ntfy import NtfyPoller
from engine.scroll import stream
from engine.terminal import ( from engine.terminal import (
CLR, CLR,
CURSOR_OFF, CURSOR_OFF,
@@ -249,6 +247,110 @@ def pick_font_face():
print() print()
def pick_effects_config():
"""Interactive picker for configuring effects pipeline."""
import effects_plugins
from engine.effects import get_effect_chain, get_registry
effects_plugins.discover_plugins()
registry = get_registry()
chain = get_effect_chain()
chain.set_order(["noise", "fade", "glitch", "firehose"])
effects = list(registry.list_all().values())
if not effects:
return
selected = 0
editing_intensity = False
intensity_value = 1.0
def _draw_effects_picker():
w = tw()
print(CLR, end="")
print("\033[1;1H", end="")
print(" \033[1;38;5;231mEFFECTS CONFIG\033[0m")
print(f" \033[2;38;5;37m{'' * (w - 4)}\033[0m")
print()
for i, effect in enumerate(effects):
prefix = " > " if i == selected else " "
marker = "[*]" if effect.config.enabled else "[ ]"
if editing_intensity and i == selected:
print(
f"{prefix}{marker} \033[1;38;5;82m{effect.name}\033[0m: intensity={intensity_value:.2f} (use +/- to adjust, Enter to confirm)"
)
else:
print(
f"{prefix}{marker} {effect.name}: intensity={effect.config.intensity:.2f}"
)
print()
print(f" \033[2;38;5;37m{'' * (w - 4)}\033[0m")
print(
" \033[38;5;245mControls: space=toggle on/off | +/-=adjust intensity | arrows=move | Enter=next effect | q=done\033[0m"
)
def _read_effects_key():
ch = sys.stdin.read(1)
if ch == "\x03":
return "interrupt"
if ch in ("\r", "\n"):
return "enter"
if ch == " ":
return "toggle"
if ch == "q":
return "quit"
if ch == "+" or ch == "=":
return "up"
if ch == "-" or ch == "_":
return "down"
if ch == "\x1b":
c1 = sys.stdin.read(1)
if c1 != "[":
return None
c2 = sys.stdin.read(1)
if c2 == "A":
return "up"
if c2 == "B":
return "down"
return None
return None
if not sys.stdin.isatty():
return
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setcbreak(fd)
while True:
_draw_effects_picker()
key = _read_effects_key()
if key == "quit" or key == "enter":
break
elif key == "up" and editing_intensity:
intensity_value = min(1.0, intensity_value + 0.1)
effects[selected].config.intensity = intensity_value
elif key == "down" and editing_intensity:
intensity_value = max(0.0, intensity_value - 0.1)
effects[selected].config.intensity = intensity_value
elif key == "up":
selected = max(0, selected - 1)
intensity_value = effects[selected].config.intensity
elif key == "down":
selected = min(len(effects) - 1, selected + 1)
intensity_value = effects[selected].config.intensity
elif key == "toggle":
effects[selected].config.enabled = not effects[selected].config.enabled
elif key == "interrupt":
raise KeyboardInterrupt
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
def main(): def main():
atexit.register(lambda: print(CURSOR_ON, end="", flush=True)) atexit.register(lambda: print(CURSOR_ON, end="", flush=True))
@@ -259,10 +361,13 @@ def main():
signal.signal(signal.SIGINT, handle_sigint) signal.signal(signal.SIGINT, handle_sigint)
StreamController.warmup_topics()
w = tw() w = tw()
print(CLR, end="") print(CLR, end="")
print(CURSOR_OFF, end="") print(CURSOR_OFF, end="")
pick_font_face() pick_font_face()
pick_effects_config()
w = tw() w = tw()
print() print()
time.sleep(0.4) time.sleep(0.4)
@@ -314,9 +419,10 @@ def main():
sys.exit(1) sys.exit(1)
print() print()
mic = MicMonitor(threshold_db=config.MIC_THRESHOLD_DB) controller = StreamController()
mic_ok = mic.start() mic_ok, ntfy_ok = controller.initialize_sources()
if mic.available:
if controller.mic and controller.mic.available:
boot_ln( boot_ln(
"Microphone", "Microphone",
"ACTIVE" "ACTIVE"
@@ -325,12 +431,6 @@ def main():
bool(mic_ok), bool(mic_ok),
) )
ntfy = NtfyPoller(
config.NTFY_TOPIC,
reconnect_delay=config.NTFY_RECONNECT_DELAY,
display_secs=config.MESSAGE_DISPLAY_SECS,
)
ntfy_ok = ntfy.start()
boot_ln("ntfy", "LISTENING" if ntfy_ok else "OFFLINE", ntfy_ok) boot_ln("ntfy", "LISTENING" if ntfy_ok else "OFFLINE", ntfy_ok)
if config.FIREHOSE: if config.FIREHOSE:
@@ -343,7 +443,7 @@ def main():
print() print()
time.sleep(0.4) time.sleep(0.4)
stream(items, ntfy, mic) controller.run(items)
print() print()
print(f" {W_GHOST}{'' * (tw() - 4)}{RST}") print(f" {W_GHOST}{'' * (tw() - 4)}{RST}")

View File

@@ -105,6 +105,8 @@ class Config:
firehose: bool = False firehose: bool = False
ntfy_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline/json" ntfy_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline/json"
ntfy_cc_cmd_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json"
ntfy_cc_resp_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json"
ntfy_reconnect_delay: int = 5 ntfy_reconnect_delay: int = 5
message_display_secs: int = 30 message_display_secs: int = 30
@@ -148,6 +150,8 @@ class Config:
mode="poetry" if "--poetry" in argv or "-p" in argv else "news", mode="poetry" if "--poetry" in argv or "-p" in argv else "news",
firehose="--firehose" in argv, firehose="--firehose" in argv,
ntfy_topic="https://ntfy.sh/klubhaus_terminal_mainline/json", ntfy_topic="https://ntfy.sh/klubhaus_terminal_mainline/json",
ntfy_cc_cmd_topic="https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json",
ntfy_cc_resp_topic="https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json",
ntfy_reconnect_delay=5, ntfy_reconnect_delay=5,
message_display_secs=30, message_display_secs=30,
font_dir=font_dir, font_dir=font_dir,
@@ -193,6 +197,8 @@ FIREHOSE = "--firehose" in sys.argv
# ─── NTFY MESSAGE QUEUE ────────────────────────────────── # ─── NTFY MESSAGE QUEUE ──────────────────────────────────
NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json" NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json"
NTFY_CC_CMD_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json"
NTFY_CC_RESP_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json"
NTFY_RECONNECT_DELAY = 5 # seconds before reconnecting after a dropped stream NTFY_RECONNECT_DELAY = 5 # seconds before reconnecting after a dropped stream
MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen

View File

@@ -3,6 +3,7 @@ Stream controller - manages input sources and orchestrates the render stream.
""" """
from engine.config import Config, get_config from engine.config import Config, get_config
from engine.effects.controller import handle_effects_command
from engine.eventbus import EventBus from engine.eventbus import EventBus
from engine.events import EventType, StreamEvent from engine.events import EventType, StreamEvent
from engine.mic import MicMonitor from engine.mic import MicMonitor
@@ -13,11 +14,45 @@ from engine.scroll import stream
class StreamController: class StreamController:
"""Controls the stream lifecycle - initializes sources and runs the stream.""" """Controls the stream lifecycle - initializes sources and runs the stream."""
_topics_warmed = False
def __init__(self, config: Config | None = None, event_bus: EventBus | None = None): def __init__(self, config: Config | None = None, event_bus: EventBus | None = None):
self.config = config or get_config() self.config = config or get_config()
self.event_bus = event_bus self.event_bus = event_bus
self.mic: MicMonitor | None = None self.mic: MicMonitor | None = None
self.ntfy: NtfyPoller | None = None self.ntfy: NtfyPoller | None = None
self.ntfy_cc: NtfyPoller | None = None
@classmethod
def warmup_topics(cls) -> None:
"""Warm up ntfy topics lazily (creates them if they don't exist)."""
if cls._topics_warmed:
return
import urllib.request
topics = [
"https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd",
"https://ntfy.sh/klubhaus_terminal_mainline_cc_resp",
"https://ntfy.sh/klubhaus_terminal_mainline",
]
for topic in topics:
try:
req = urllib.request.Request(
topic,
data=b"init",
headers={
"User-Agent": "mainline/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
urllib.request.urlopen(req, timeout=5)
except Exception:
pass
cls._topics_warmed = True
def initialize_sources(self) -> tuple[bool, bool]: def initialize_sources(self) -> tuple[bool, bool]:
"""Initialize microphone and ntfy sources. """Initialize microphone and ntfy sources.
@@ -35,7 +70,38 @@ class StreamController:
) )
ntfy_ok = self.ntfy.start() ntfy_ok = self.ntfy.start()
return bool(mic_ok), ntfy_ok self.ntfy_cc = NtfyPoller(
self.config.ntfy_cc_cmd_topic,
reconnect_delay=self.config.ntfy_reconnect_delay,
display_secs=5,
)
self.ntfy_cc.subscribe(self._handle_cc_message)
ntfy_cc_ok = self.ntfy_cc.start()
return bool(mic_ok), ntfy_ok and ntfy_cc_ok
def _handle_cc_message(self, event) -> None:
"""Handle incoming C&C message - like a serial port control interface."""
import urllib.request
cmd = event.body.strip() if hasattr(event, "body") else str(event).strip()
if not cmd.startswith("/"):
return
response = handle_effects_command(cmd)
topic_url = self.config.ntfy_cc_resp_topic.replace("/json", "")
data = response.encode("utf-8")
req = urllib.request.Request(
topic_url,
data=data,
headers={"User-Agent": "mainline/0.1", "Content-Type": "text/plain"},
method="POST",
)
try:
urllib.request.urlopen(req, timeout=5)
except Exception:
pass
def run(self, items: list) -> None: def run(self, items: list) -> None:
"""Run the stream with initialized sources.""" """Run the stream with initialized sources."""

View File

@@ -25,24 +25,41 @@ run = "uv run mainline.py"
run-poetry = "uv run mainline.py --poetry" run-poetry = "uv run mainline.py --poetry"
run-firehose = "uv run mainline.py --firehose" run-firehose = "uv run mainline.py --firehose"
daemon = "nohup uv run mainline.py > /dev/null 2>&1 &"
daemon-stop = "pkill -f 'uv run mainline.py' 2>/dev/null || true"
daemon-restart = "mise run daemon-stop && sleep 2 && mise run daemon"
# =====================
# Command & Control
# =====================
cmd = "uv run cmdline.py"
cmd-stats = "bash -c 'uv run cmdline.py -w \"/effects stats\"';:"
# Initialize ntfy topics (warm up before first use - also done automatically by mainline)
topics-init = "curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd > /dev/null && curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline_cc_resp > /dev/null && curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline > /dev/null"
# ===================== # =====================
# Environment # Environment
# ===================== # =====================
sync = "uv sync" sync = "uv sync"
sync-all = "uv sync --all-extras" sync-all = "uv sync --all-extras"
install = "uv sync" install = "mise run sync"
install-dev = "uv sync --group dev" install-dev = "mise run sync && uv sync --group dev"
bootstrap = "uv sync && uv run mainline.py --help" bootstrap = "uv sync && uv run mainline.py --help"
clean = "rm -rf .venv htmlcov .coverage tests/.pytest_cache" clean = "rm -rf .venv htmlcov .coverage tests/.pytest_cache .mainline_cache_*.json nohup.out"
# Aggressive cleanup - removes all generated files, caches, and venv
clobber = "git clean -fdx && rm -rf .venv htmlcov .coverage tests/.pytest_cache .mainline_cache_*.json nohup.out"
# ===================== # =====================
# CI/CD # CI/CD
# ===================== # =====================
ci = "uv sync --group dev && uv run pytest --cov=engine --cov-report=term-missing --cov-report=xml" ci = "mise run topics-init && uv sync --group dev && uv run pytest --cov=engine --cov-report=term-missing --cov-report=xml"
ci-lint = "uv run ruff check engine/ mainline.py" ci-lint = "uv run ruff check engine/ mainline.py"
# ===================== # =====================

View File

@@ -0,0 +1,127 @@
"""
Integration tests for ntfy topics.
"""
import json
import time
import urllib.request
class TestNtfyTopics:
def test_cc_cmd_topic_exists_and_writable(self):
"""Verify C&C CMD topic exists and accepts messages."""
from engine.config import NTFY_CC_CMD_TOPIC
topic_url = NTFY_CC_CMD_TOPIC.replace("/json", "")
test_message = f"test_{int(time.time())}"
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
assert resp.status == 200
except Exception as e:
raise AssertionError(f"Failed to write to C&C CMD topic: {e}") from e
def test_cc_resp_topic_exists_and_writable(self):
"""Verify C&C RESP topic exists and accepts messages."""
from engine.config import NTFY_CC_RESP_TOPIC
topic_url = NTFY_CC_RESP_TOPIC.replace("/json", "")
test_message = f"test_{int(time.time())}"
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
assert resp.status == 200
except Exception as e:
raise AssertionError(f"Failed to write to C&C RESP topic: {e}") from e
def test_message_topic_exists_and_writable(self):
"""Verify message topic exists and accepts messages."""
from engine.config import NTFY_TOPIC
topic_url = NTFY_TOPIC.replace("/json", "")
test_message = f"test_{int(time.time())}"
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
assert resp.status == 200
except Exception as e:
raise AssertionError(f"Failed to write to message topic: {e}") from e
def test_cc_cmd_topic_readable(self):
"""Verify we can read messages from C&C CMD topic."""
from engine.config import NTFY_CC_CMD_TOPIC
test_message = f"integration_test_{int(time.time())}"
topic_url = NTFY_CC_CMD_TOPIC.replace("/json", "")
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
urllib.request.urlopen(req, timeout=10)
except Exception as e:
raise AssertionError(f"Failed to write to C&C CMD topic: {e}") from e
time.sleep(1)
poll_url = f"{NTFY_CC_CMD_TOPIC}?poll=1&limit=1"
req = urllib.request.Request(
poll_url,
headers={"User-Agent": "mainline-test/0.1"},
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
body = resp.read().decode("utf-8")
if body.strip():
data = json.loads(body.split("\n")[0])
assert isinstance(data, dict)
except Exception as e:
raise AssertionError(f"Failed to read from C&C CMD topic: {e}") from e
def test_topics_are_different(self):
"""Verify C&C CMD/RESP and message topics are different."""
from engine.config import NTFY_CC_CMD_TOPIC, NTFY_CC_RESP_TOPIC, NTFY_TOPIC
assert NTFY_CC_CMD_TOPIC != NTFY_TOPIC
assert NTFY_CC_RESP_TOPIC != NTFY_TOPIC
assert NTFY_CC_CMD_TOPIC != NTFY_CC_RESP_TOPIC
assert "_cc_cmd" in NTFY_CC_CMD_TOPIC
assert "_cc_resp" in NTFY_CC_RESP_TOPIC