8 Commits

Author SHA1 Message Date
7ff78c66ed Merge pull request 'refactor to improve testability and modularization of the mainline terminal application' (#24) from david/Mainline:testability_modularization into main
Reviewed-on: #24
2026-03-16 09:11:13 +00:00
15de46722a refactor: phase 4 - event-driven architecture foundation
- Add EventBus class with pub/sub messaging (thread-safe)
- Add emitter Protocol classes (EventEmitter, Startable, Stoppable)
- Add event emission to NtfyPoller (NtfyMessageEvent)
- Add event emission to MicMonitor (MicLevelEvent)
- Update StreamController to publish stream start/end events
- Add comprehensive tests for eventbus and emitters modules
2026-03-15 19:15:08 -07:00
35e5c8d38b refactor: phase 3 - API efficiency improvements
Add typed dataclasses for tuple returns:
- types.py: HeadlineItem, FetchResult, Block dataclasses with legacy tuple converters
- fetch.py: Add type hints and HeadlineTuple type alias

Add pyright for static type checking:
- Add pyright to dependencies
- Verify type coverage with pyright (0 errors in core modules)

This enables:
- Named types instead of raw tuples (better IDE support, self-documenting)
- Type-safe APIs across modules
- Backward compatibility via to_tuple/from_tuple methods

Note: Lazy imports skipped for render.py - startup impact is minimal.
2026-03-15 19:13:32 -07:00
cdc8094de2 refactor: phase 2 - modularization of scroll engine
Split monolithic scroll.py into focused modules:
- viewport.py: terminal size (tw/th), ANSI positioning helpers
- frame.py: FrameTimer class, scroll step calculation
- layers.py: message overlay, ticker zone, firehose rendering
- scroll.py: simplified orchestrator, imports from new modules

Add stream controller and event types for future event-driven architecture:
- controller.py: StreamController for source initialization and stream lifecycle
- events.py: EventType enum and event dataclasses (HeadlineEvent, FrameTickEvent, etc.)

Added tests for new modules:
- test_viewport.py: 8 tests for viewport utilities
- test_frame.py: 10 tests for frame timing
- test_layers.py: 13 tests for layer compositing
- test_events.py: 11 tests for event types
- test_controller.py: 6 tests for stream controller

This enables:
- Testable chunks with clear responsibilities
- Reusable viewport utilities across modules
- Better separation of concerns in render pipeline
- Foundation for future event-driven architecture

Also includes Phase 1 documentation updates in code comments.
2026-03-15 19:13:32 -07:00
f170143939 refactor: phase 1 - testability improvements
- Add Config dataclass with get_config()/set_config() for injection
- Add Config.from_args() for CLI argument parsing (testable)
- Add platform font path detection (Darwin/Linux)
- Bound translate cache with @lru_cache(maxsize=500)
- Add fixtures for external dependencies (network, feeds, config)
- Add 15 tests for Config class, from_args, and platform detection

This enables testability by:
- Allowing config injection instead of global mutable state
- Supporting custom argv in from_args() for testing
- Providing reusable fixtures for mocking network/filesystem
- Preventing unbounded memory growth in translation cache

Fixes: _arg_value/_arg_int not accepting custom argv
2026-03-15 19:13:32 -07:00
19fb4bc4fe Merge pull request 'docs/update-readme' (#23) from docs/update-readme into main
Reviewed-on: #23
2026-03-16 00:09:10 +00:00
ae10fd78ca refactor: Restructure README, add uv and mise commands, and detail component extension and development workflows. 2026-03-15 17:08:32 -07:00
4afab642f7 docs: add README update design spec
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 16:56:58 -07:00
29 changed files with 305 additions and 2382 deletions

1
.gitignore vendored
View File

@@ -9,4 +9,3 @@ htmlcov/
.coverage
.pytest_cache/
*.egg-info/
coverage.xml

159
AGENTS.md
View File

@@ -22,37 +22,13 @@ uv sync
### Available Commands
```bash
# Development
mise run test # Run tests
mise run test-v # Run tests verbose
mise run test-cov # Run tests with coverage report
mise run lint # Run ruff linter
mise run lint-fix # Run ruff with auto-fix
mise run format # Run ruff formatter
mise run ci # Full CI pipeline
# Runtime
mise run run # Interactive terminal mode (news)
mise run run-poetry # Interactive terminal mode (poetry)
mise run run-firehose # Dense headline mode
# Daemon mode (recommended for long-running)
mise run daemon # Start mainline in background
mise run daemon-stop # Stop daemon
mise run daemon-restart # Restart daemon
# Command & Control
mise run cmd # Interactive CLI
mise run cmd "/cmd" # Send single command
mise run cmd-stats # Watch performance stats
mise run topics-init # Initialize ntfy topics
# Environment
mise run install # Install dependencies
mise run sync # Sync dependencies
mise run sync-all # Sync with all extras
mise run clean # Clean cache files
mise run clobber # Aggressive cleanup (git clean -fdx + caches)
mise run ci # Full CI pipeline (sync + test + coverage)
```
## Git Hooks
@@ -132,136 +108,3 @@ The project uses pytest with strict marker enforcement. Test configuration is in
- **eventbus.py** provides thread-safe event publishing for decoupled communication
- **controller.py** coordinates ntfy/mic monitoring
- The render pipeline: fetch → render → effects → scroll → terminal output
- **display.py** provides swappable display backends (TerminalDisplay, NullDisplay)
## Operating Modes
Mainline can run in two modes:
### 1. Standalone Mode (Original)
Run directly as a terminal application with interactive pickers:
```bash
mise run run # news stream
mise run run-poetry # poetry mode
mise run run-firehose # dense headline mode
```
This runs the full interactive experience with font picker and effects picker at startup.
### 2. Daemon + Command Mode (Recommended for Long-Running)
The recommended approach for persistent displays:
```bash
# Start the daemon (headless rendering)
mise run daemon
# Send commands via ntfy
mise run cmd "/effects list"
mise run cmd "/effects noise off"
mise run cmd "/effects stats"
# Watch mode (continuous stats polling)
mise run cmd-stats
# Stop the daemon
mise run daemon-stop
```
#### How It Works
- **Daemon**: Runs `mainline.py` in the background, renders to terminal
- **C&C Topics**: Uses separate ntfy topics (like UART serial):
- `klubhaus_terminal_mainline_cc_cmd` - commands TO mainline
- `klubhaus_terminal_mainline_cc_resp` - responses FROM mainline
- **Topics are auto-warmed** on first daemon start
#### Available Commands
```
/effects list - List all effects and status
/effects <name> on - Enable an effect
/effects <name> off - Disable an effect
/effects <name> intensity 0.5 - Set effect intensity (0.0-1.0)
/effects reorder noise,fade,glitch,firehose - Reorder pipeline
/effects stats - Show performance statistics
```
## Effects Plugin System
The effects system is implemented as a plugin architecture in `engine/effects/`.
### Core Components
| Module | Purpose |
|--------|---------|
| `effects/types.py` | `EffectConfig`, `EffectContext` dataclasses and `EffectPlugin` protocol |
| `effects/registry.py` | Plugin discovery and management (`EffectRegistry`) |
| `effects/chain.py` | Ordered pipeline execution (`EffectChain`) |
| `effects_plugins/*.py` | Externalized effect plugins |
### Creating a New Effect
Create a file in `effects_plugins/` with a class ending in `Effect`:
```python
from engine.effects.types import EffectConfig, EffectContext
class MyEffect:
name = "myeffect"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
# Process buffer and return modified buffer
return buf
def configure(self, config: EffectConfig) -> None:
self.config = config
```
### NTFY Commands
Send commands via `cmdline.py` or directly to the C&C topic:
```bash
# Using cmdline tool (recommended)
mise run cmd "/effects list"
mise run cmd "/effects noise on"
mise run cmd "/effects noise intensity 0.5"
mise run cmd "/effects reorder noise,glitch,fade,firehose"
# Or directly via curl
curl -d "/effects list" https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd
```
The cmdline tool polls the response topic for the daemon's reply.
## Conventional Commits
Commit messages follow the [Conventional Commits](https://www.conventionalcommits.org/) specification:
```
<type>(<scope>): <description>
[optional body]
[optional footer(s)]
```
### Types
- `feat`: A new feature
- `fix`: A bug fix
- `docs`: Documentation only changes
- `style`: Changes that don't affect code meaning (formatting)
- `refactor`: Code change that neither fixes a bug nor adds a feature
- `test`: Adding or updating tests
- `chore`: Changes to build process, dependencies, etc.
### Examples
```
feat(effects): add plugin architecture for visual effects
fix(layers): resolve glitch effect not applying on empty buffer
docs(AGENTS.md): add effects plugin system documentation
test(effects): add tests for EffectChain pipeline ordering
```

212
README.md
View File

@@ -6,7 +6,9 @@ A full-screen terminal news ticker that renders live global headlines in large O
---
## Run
## Using
### Run
```bash
python3 mainline.py # news stream
@@ -20,51 +22,15 @@ python3 mainline.py --font-dir ~/fonts # scan a different font folder
python3 mainline.py --font-index 1 # select face index within a collection
```
First run bootstraps a local `.mainline_venv/` and installs deps (`feedparser`, `Pillow`, `sounddevice`, `numpy`). Subsequent runs start immediately, loading from cache.
---
## Daemon Mode (Recommended for Long-Running)
For persistent displays (e.g., always-on terminal), use daemon mode with command-and-control over ntfy:
Or with uv:
```bash
# Start the daemon (runs in background, auto-warms ntfy topics)
mise run daemon
# Send commands via cmdline
mise run cmd "/effects list"
mise run cmd "/effects noise off"
mise run cmd "/effects noise intensity 0.5"
# Watch performance stats continuously
mise run cmd-stats
# Stop the daemon
mise run daemon-stop
uv run mainline.py
```
### How It Works
First run bootstraps a local `.mainline_venv/` and installs deps (`feedparser`, `Pillow`, `sounddevice`, `numpy`). Subsequent runs start immediately, loading from cache. With uv, run `uv sync` or `uv sync --all-extras` (includes mic support) instead.
- **Topics**: Uses separate ntfy topics for serial-like communication:
- `klubhaus_terminal_mainline_cc_cmd` - commands TO mainline
- `klubhaus_terminal_mainline_cc_resp` - responses FROM mainline
- Topics are automatically created on first daemon start
### Available Commands
```
/effects list - List all effects and status
/effects <name> on - Enable an effect
/effects <name> off - Disable an effect
/effects <name> intensity 0.5 - Set effect intensity (0.0-1.0)
/effects reorder noise,fade,glitch,firehose - Reorder pipeline
/effects stats - Show performance statistics
```
---
## Config
### Config
All constants live in `engine/config.py`:
@@ -84,23 +50,41 @@ All constants live in `engine/config.py`:
| `FRAME_DT` | `0.05` | Frame interval in seconds (20 FPS) |
| `GRAD_SPEED` | `0.08` | Gradient sweep speed (cycles/sec, ~12s full sweep) |
| `FIREHOSE_H` | `12` | Firehose zone height (terminal rows) |
| `NTFY_TOPIC` | klubhaus URL | ntfy.sh JSON endpoint to poll |
| `NTFY_POLL_INTERVAL` | `15` | Seconds between ntfy polls |
| `NTFY_TOPIC` | klubhaus URL | ntfy.sh JSON stream endpoint |
| `NTFY_RECONNECT_DELAY` | `5` | Seconds before reconnecting after a dropped SSE stream |
| `MESSAGE_DISPLAY_SECS` | `30` | How long an ntfy message holds the screen |
---
### Feeds
## Fonts
~25 sources across four categories: **Science & Technology**, **Economics & Business**, **World & Politics**, **Culture & Ideas**. Add or swap feeds in `engine/sources.py``FEEDS`.
A `fonts/` directory is bundled with demo faces (AlphatronDemo, CSBishopDrawn, CyberformDemo, KATA, Microbots, Neoform, Pixel Sparta, Robocops, Xeonic, and others). On startup, an interactive picker lists all discovered faces with a live half-block preview rendered at your configured size.
**Poetry mode** pulls from Project Gutenberg: Whitman, Dickinson, Thoreau, Emerson. Sources are in `engine/sources.py``POETRY_SOURCES`.
### Fonts
A `fonts/` directory is bundled with demo faces (AgorTechnoDemo, AlphatronDemo, CSBishopDrawn, CubaTechnologyDemo, CyberformDemo, KATA, Microbots, ModernSpaceDemo, Neoform, Pixel Sparta, RaceHugoDemo, Resond, Robocops, Synthetix, Xeonic, and others). On startup, an interactive picker lists all discovered faces with a live half-block preview rendered at your configured size.
Navigation: `↑`/`↓` or `j`/`k` to move, `Enter` or `q` to select. The selected face persists for that session.
To add your own fonts, drop `.otf`, `.ttf`, or `.ttc` files into `fonts/` (or point `--font-dir` at any other folder). Font collections (`.ttc`, multi-face `.otf`) are enumerated face-by-face.
### ntfy.sh
Mainline polls a configurable ntfy.sh topic in the background. When a message arrives, the scroll pauses and the message renders full-screen for `MESSAGE_DISPLAY_SECS` seconds, then the stream resumes.
To push a message:
```bash
curl -d "Body text" -H "Title: Alert title" https://ntfy.sh/your_topic
```
Update `NTFY_TOPIC` in `engine/config.py` to point at your own topic.
---
## How it works
## Internals
### How it works
- On launch, the font picker scans `fonts/` and presents a live-rendered TUI for face selection; `--no-font-picker` skips directly to stream
- Feeds are fetched and filtered on startup (sports and vapid content stripped); results are cached to `.mainline_cache_news.json` / `.mainline_cache_poetry.json` for fast restarts
@@ -109,11 +93,9 @@ To add your own fonts, drop `.otf`, `.ttf`, or `.ttc` files into `fonts/` (or po
- Subject-region detection runs a regex pass on each headline; matches trigger a Google Translate call and font swap to the appropriate script (CJK, Arabic, Devanagari, etc.) using macOS system fonts
- The mic stream runs in a background thread, feeding RMS dB into the glitch probability calculation each frame
- The viewport scrolls through a virtual canvas of pre-rendered blocks; fade zones at top and bottom dissolve characters probabilistically
- An ntfy.sh poller runs in a background thread; incoming messages interrupt the scroll and render full-screen until dismissed or expired
- An ntfy.sh SSE stream runs in a background thread; incoming messages interrupt the scroll and render full-screen until dismissed or expired
---
## Architecture
### Architecture
`mainline.py` is a thin entrypoint (venv bootstrap → `engine.app.main()`). All logic lives in the `engine/` package:
@@ -127,15 +109,7 @@ engine/
filter.py HTML stripping, content filter
translate.py Google Translate wrapper + region detection
render.py OTF → half-block pipeline (SSAA, gradient)
effects/ plugin-based effects system
types.py EffectConfig, EffectContext, EffectPlugin protocol
registry.py Plugin discovery and management
chain.py Ordered pipeline execution
performance.py Performance monitoring
controller.py NTFY command handler
legacy.py Original effects (noise, glitch, fade, firehose)
effects_plugins/ External effect plugins (noise, glitch, fade, firehose)
display.py Swappable display backends (TerminalDisplay, NullDisplay)
effects.py noise, glitch_bar, fade, firehose
fetch.py RSS/Gutenberg fetching + cache load/save
ntfy.py NtfyPoller — standalone, zero internal deps
mic.py MicMonitor — standalone, graceful fallback
@@ -144,7 +118,7 @@ engine/
frame.py scroll step calculation, timing
layers.py ticker zone, firehose, message overlay rendering
eventbus.py thread-safe event publishing for decoupled communication
events.py event types and definitions
events.py event types and definitions
controller.py coordinates ntfy/mic monitoring and event publishing
emitters.py background emitters for ntfy and mic
types.py type definitions and dataclasses
@@ -154,37 +128,108 @@ engine/
---
## Feeds
## Extending
~25 sources across four categories: **Science & Technology**, **Economics & Business**, **World & Politics**, **Culture & Ideas**. Add or swap feeds in `engine/sources.py``FEEDS`.
`ntfy.py` and `mic.py` are fully standalone and designed to be reused by any terminal visualizer. `engine.render` is the importable rendering pipeline for non-terminal targets.
**Poetry mode** pulls from Project Gutenberg: Whitman, Dickinson, Thoreau, Emerson. Sources are in `engine/sources.py``POETRY_SOURCES`.
---
## ntfy.sh Integration
Mainline polls a configurable ntfy.sh topic in the background. When a message arrives, the scroll pauses and the message renders full-screen for `MESSAGE_DISPLAY_SECS` seconds, then the stream resumes.
To push a message:
```bash
curl -d "Body text" -H "Title: Alert title" https://ntfy.sh/your_topic
```
Update `NTFY_TOPIC` in `engine/config.py` to point at your own topic. The `NtfyPoller` class is fully standalone and can be reused by other visualizers:
### NtfyPoller
```python
from engine.ntfy import NtfyPoller
poller = NtfyPoller("https://ntfy.sh/my_topic/json?since=20s&poll=1")
poller = NtfyPoller("https://ntfy.sh/my_topic/json")
poller.start()
# in render loop:
msg = poller.get_active_message() # returns (title, body, timestamp) or None
# in your render loop:
msg = poller.get_active_message() # → (title, body, timestamp) or None
if msg:
title, body, ts = msg
render_my_message(title, body) # visualizer-specific
```
Dependencies: `urllib.request`, `json`, `threading`, `time` — stdlib only. The `since=` parameter is managed automatically on reconnect.
### MicMonitor
```python
from engine.mic import MicMonitor
mic = MicMonitor(threshold_db=50)
result = mic.start() # None = sounddevice unavailable; False = stream failed; True = ok
if result:
excess = mic.excess # dB above threshold, clamped to 0
db = mic.db # raw RMS dB level
```
Dependencies: `sounddevice`, `numpy` — both optional; degrades gracefully if unavailable.
### Render pipeline
`engine.render` exposes the OTF → raster pipeline independently of the terminal scroll loop. The planned `serve.py` extension will import it directly to pre-render headlines as 1-bit bitmaps for an ESP32 thin client:
```python
# planned — serve.py does not yet exist
from engine.render import render_line, big_wrap
from engine.fetch import fetch_all
headlines = fetch_all()
for h in headlines:
rows = big_wrap(h.text, font, width=800) # list of half-block rows
# threshold to 1-bit, pack bytes, serve over HTTP
```
See `Mainline Renderer + ntfy Message Queue for ESP32.md` for the full server + thin client architecture.
---
## Ideas / Future
## Development
### Setup
Requires Python 3.10+ and [uv](https://docs.astral.sh/uv/).
```bash
uv sync # minimal (no mic)
uv sync --all-extras # with mic support (sounddevice + numpy)
uv sync --all-extras --group dev # full dev environment
```
### Tasks
With [mise](https://mise.jdx.dev/):
```bash
mise run test # run test suite
mise run test-cov # run with coverage report
mise run lint # ruff check
mise run lint-fix # ruff check --fix
mise run format # ruff format
mise run run # uv run mainline.py
mise run run-poetry # uv run mainline.py --poetry
mise run run-firehose # uv run mainline.py --firehose
```
### Testing
Tests live in `tests/` and cover `config`, `filter`, `mic`, `ntfy`, `sources`, and `terminal`.
```bash
uv run pytest
uv run pytest --cov=engine --cov-report=term-missing
```
### Linting
```bash
uv run ruff check engine/ mainline.py
uv run ruff format engine/ mainline.py
```
Pre-commit hooks run lint automatically via `hk`.
---
## Roadmap
### Performance
- **Concurrent feed fetching** — startup currently blocks sequentially on ~25 HTTP requests; `concurrent.futures.ThreadPoolExecutor` would cut load time to the slowest single feed
@@ -211,5 +256,4 @@ msg = poller.get_active_message() # returns (title, body, timestamp) or None
---
*macOS only (script/system font paths for translation are hardcoded). Primary display font is user-selectable via the bundled `fonts/` picker. Python 3.9+.*
# test
*macOS only (script/system font paths for translation are hardcoded). Primary display font is user-selectable via the bundled `fonts/` picker. Python 3.10+.*

View File

@@ -1,250 +0,0 @@
#!/usr/bin/env python3
"""
Command-line utility for interacting with mainline via ntfy.
Usage:
python cmdline.py # Interactive TUI mode
python cmdline.py --help # Show help
python cmdline.py /effects list # Send single command via ntfy
python cmdline.py /effects stats # Get performance stats via ntfy
python cmdline.py -w /effects stats # Watch mode (polls for stats)
The TUI mode provides:
- Arrow keys to navigate command history
- Tab completion for commands
- Auto-refresh for performance stats
C&C works like a serial port:
1. Send command to ntfy_cc_topic
2. Mainline receives, processes, responds to same topic
3. Cmdline polls for response
"""
import argparse
import json
import sys
import time
import threading
import urllib.request
from pathlib import Path
from engine import config
from engine.terminal import CLR, CURSOR_OFF, CURSOR_ON, G_DIM, G_HI, RST, W_GHOST
try:
CC_CMD_TOPIC = config.NTFY_CC_CMD_TOPIC
CC_RESP_TOPIC = config.NTFY_CC_RESP_TOPIC
except AttributeError:
CC_CMD_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json"
CC_RESP_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json"
class NtfyResponsePoller:
"""Polls ntfy for command responses."""
def __init__(self, cmd_topic: str, resp_topic: str, timeout: float = 10.0):
self.cmd_topic = cmd_topic
self.resp_topic = resp_topic
self.timeout = timeout
self._last_id = None
self._lock = threading.Lock()
def _build_url(self) -> str:
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
parsed = urlparse(self.resp_topic)
params = parse_qs(parsed.query, keep_blank_values=True)
params["since"] = [self._last_id if self._last_id else "20s"]
new_query = urlencode({k: v[0] for k, v in params.items()})
return urlunparse(parsed._replace(query=new_query))
def send_and_wait(self, cmd: str) -> str:
"""Send command and wait for response."""
url = self.cmd_topic.replace("/json", "")
data = cmd.encode("utf-8")
req = urllib.request.Request(
url,
data=data,
headers={
"User-Agent": "mainline-cmdline/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
urllib.request.urlopen(req, timeout=5)
except Exception as e:
return f"Error sending command: {e}"
return self._wait_for_response(cmd)
def _wait_for_response(self, expected_cmd: str = "") -> str:
"""Poll for response message."""
start = time.time()
while time.time() - start < self.timeout:
try:
url = self._build_url()
req = urllib.request.Request(
url, headers={"User-Agent": "mainline-cmdline/0.1"}
)
with urllib.request.urlopen(req, timeout=10) as resp:
for line in resp:
try:
data = json.loads(line.decode("utf-8", errors="replace"))
except json.JSONDecodeError:
continue
if data.get("event") == "message":
self._last_id = data.get("id")
msg = data.get("message", "")
if msg:
return msg
except Exception:
pass
time.sleep(0.5)
return "Timeout waiting for response"
AVAILABLE_COMMANDS = """Available commands:
/effects list - List all effects and status
/effects <name> on - Enable an effect
/effects <name> off - Disable an effect
/effects <name> intensity <0.0-1.0> - Set effect intensity
/effects reorder <name1>,<name2>,... - Reorder pipeline
/effects stats - Show performance statistics
/help - Show this help
/quit - Exit
"""
def print_header():
w = 60
print(CLR, end="")
print(CURSOR_OFF, end="")
print(f"\033[1;1H", end="")
print(f" \033[1;38;5;231m╔{'' * (w - 6)}\033[0m")
print(
f" \033[1;38;5;231m║\033[0m \033[1;38;5;82mMAINLINE\033[0m \033[3;38;5;245mCommand Center\033[0m \033[1;38;5;231m ║\033[0m"
)
print(f" \033[1;38;5;231m╚{'' * (w - 6)}\033[0m")
print(f" \033[2;38;5;37mCMD: {CC_CMD_TOPIC.split('/')[-2]}\033[0m")
print(f" \033[2;38;5;37mRESP: {CC_RESP_TOPIC.split('/')[-2]}\033[0m")
print()
def print_response(response: str, is_error: bool = False) -> None:
"""Print response with nice formatting."""
print()
if is_error:
print(f" \033[1;38;5;196m✗ Error\033[0m")
print(f" \033[38;5;196m{'' * 40}\033[0m")
else:
print(f" \033[1;38;5;82m✓ Response\033[0m")
print(f" \033[38;5;37m{'' * 40}\033[0m")
for line in response.split("\n"):
print(f" {line}")
print()
def interactive_mode():
"""Interactive TUI for sending commands."""
import readline
print_header()
poller = NtfyResponsePoller(CC_CMD_TOPIC, CC_RESP_TOPIC)
print(f" \033[38;5;245mType /help for commands, /quit to exit\033[0m")
print()
while True:
try:
cmd = input(f" \033[1;38;5;82m\033[0m {G_HI}").strip()
except (EOFError, KeyboardInterrupt):
print()
break
if not cmd:
continue
if cmd.startswith("/"):
if cmd == "/quit" or cmd == "/exit":
print(f"\n \033[1;38;5;245mGoodbye!{RST}\n")
break
if cmd == "/help":
print(f"\n{AVAILABLE_COMMANDS}\n")
continue
print(f" \033[38;5;245m⟳ Sending to mainline...{RST}")
result = poller.send_and_wait(cmd)
print_response(result, is_error=result.startswith("Error"))
else:
print(f"\n \033[1;38;5;196m⚠ Commands must start with /{RST}\n")
print(CURSOR_ON, end="")
return 0
def main():
parser = argparse.ArgumentParser(
description="Mainline command-line interface",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=AVAILABLE_COMMANDS,
)
parser.add_argument(
"command",
nargs="?",
default=None,
help="Command to send (e.g., /effects list)",
)
parser.add_argument(
"--watch",
"-w",
action="store_true",
help="Watch mode: continuously poll for stats (Ctrl+C to exit)",
)
args = parser.parse_args()
if args.command is None:
return interactive_mode()
poller = NtfyResponsePoller(CC_CMD_TOPIC, CC_RESP_TOPIC)
if args.watch and "/effects stats" in args.command:
import signal
def handle_sigterm(*_):
print(f"\n \033[1;38;5;245mStopped watching{RST}")
print(CURSOR_ON, end="")
sys.exit(0)
signal.signal(signal.SIGTERM, handle_sigterm)
print_header()
print(f" \033[38;5;245mWatching /effects stats (Ctrl+C to exit)...{RST}\n")
try:
while True:
result = poller.send_and_wait(args.command)
print(f"\033[2J\033[1;1H", end="")
print(
f" \033[1;38;5;82m\033[0m Performance Stats - \033[1;38;5;245m{time.strftime('%H:%M:%S')}{RST}"
)
print(f" \033[38;5;37m{'' * 44}{RST}")
for line in result.split("\n"):
print(f" {line}")
time.sleep(2)
except KeyboardInterrupt:
print(f"\n \033[1;38;5;245mStopped watching{RST}")
return 0
return 0
result = poller.send_and_wait(args.command)
print(result)
return 0
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,145 @@
# README Update Design — 2026-03-15
## Goal
Restructure and expand `README.md` to:
1. Align with the current codebase (Python 3.10+, uv/mise/pytest/ruff toolchain, 6 new fonts)
2. Add extensibility-focused content (`Extending` section)
3. Add developer workflow coverage (`Development` section)
4. Improve navigability via top-level grouping (Approach C)
---
## Proposed Structure
```
# MAINLINE
> tagline + description
## Using
### Run
### Config
### Feeds
### Fonts
### ntfy.sh
## Internals
### How it works
### Architecture
## Extending
### NtfyPoller
### MicMonitor
### Render pipeline
## Development
### Setup
### Tasks
### Testing
### Linting
## Roadmap
---
*footer*
```
---
## Section-by-section design
### Using
All existing content preserved verbatim. Two changes:
- **Run**: add `uv run mainline.py` as an alternative invocation; expand bootstrap note to mention `uv sync` / `uv sync --all-extras`
- **ntfy.sh**: remove `NtfyPoller` reuse code example (moves to Extending); keep push instructions and topic config
Subsections moved into Using (currently standalone):
- `Feeds` — it's configuration, not a concept
- `ntfy.sh` (usage half)
### Internals
All existing content preserved verbatim. One change:
- **Architecture**: append `tests/` directory listing to the module tree
### Extending
Entirely new section. Three subsections:
**NtfyPoller**
- Minimal working import + usage example
- Note: stdlib only dependencies
```python
from engine.ntfy import NtfyPoller
poller = NtfyPoller("https://ntfy.sh/my_topic/json?since=20s&poll=1")
poller.start()
# in your render loop:
msg = poller.get_active_message() # → (title, body, timestamp) or None
if msg:
title, body, ts = msg
render_my_message(title, body) # visualizer-specific
```
**MicMonitor**
- Minimal working import + usage example
- Note: sounddevice/numpy optional, degrades gracefully
```python
from engine.mic import MicMonitor
mic = MicMonitor(threshold_db=50)
if mic.start(): # returns False if sounddevice unavailable
excess = mic.excess # dB above threshold, clamped to 0
db = mic.db # raw RMS dB level
```
**Render pipeline**
- Brief prose about `engine.render` as importable pipeline
- Minimal sketch of serve.py / ESP32 usage pattern
- Reference to `Mainline Renderer + ntfy Message Queue for ESP32.md`
### Development
Entirely new section. Four subsections:
**Setup**
- Hard requirements: Python 3.10+, uv
- `uv sync` / `uv sync --all-extras` / `uv sync --group dev`
**Tasks** (via mise)
- `mise run test`, `test-cov`, `lint`, `lint-fix`, `format`, `run`, `run-poetry`, `run-firehose`
**Testing**
- Tests in `tests/` covering config, filter, mic, ntfy, sources, terminal
- `uv run pytest` and `uv run pytest --cov=engine --cov-report=term-missing`
**Linting**
- `uv run ruff check` and `uv run ruff format`
- Note: pre-commit hooks run lint via `hk`
### Roadmap
Existing `## Ideas / Future` content preserved verbatim. Only change: rename heading to `## Roadmap`.
### Footer
Update `Python 3.9+``Python 3.10+`.
---
## Files changed
- `README.md` — restructured and expanded as above
- No other files
---
## What is not changing
- All existing prose, examples, and config table values — preserved verbatim where retained
- The Ideas/Future content — kept intact under the new Roadmap heading
- The cyberpunk voice and terse style of the existing README

View File

@@ -1,35 +0,0 @@
from pathlib import Path
PLUGIN_DIR = Path(__file__).parent
def discover_plugins():
from engine.effects.registry import get_registry
registry = get_registry()
imported = {}
for file_path in PLUGIN_DIR.glob("*.py"):
if file_path.name.startswith("_"):
continue
module_name = file_path.stem
if module_name in ("base", "types"):
continue
try:
module = __import__(f"effects_plugins.{module_name}", fromlist=[""])
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (
isinstance(attr, type)
and hasattr(attr, "name")
and hasattr(attr, "process")
and attr_name.endswith("Effect")
):
plugin = attr()
registry.register(plugin)
imported[plugin.name] = plugin
except Exception:
pass
return imported

View File

@@ -1,58 +0,0 @@
import random
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
class FadeEffect:
name = "fade"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
if not ctx.ticker_height:
return buf
result = list(buf)
intensity = self.config.intensity
top_zone = max(1, int(ctx.ticker_height * 0.25))
bot_zone = max(1, int(ctx.ticker_height * 0.10))
for r in range(len(result)):
if r >= ctx.ticker_height:
continue
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
bot_f = (
min(1.0, (ctx.ticker_height - 1 - r) / bot_zone)
if bot_zone > 0
else 1.0
)
row_fade = min(top_f, bot_f) * intensity
if row_fade < 1.0 and result[r].strip():
result[r] = self._fade_line(result[r], row_fade)
return result
def _fade_line(self, s: str, fade: float) -> str:
if fade >= 1.0:
return s
if fade <= 0.0:
return ""
result = []
i = 0
while i < len(s):
if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[":
j = i + 2
while j < len(s) and not s[j].isalpha():
j += 1
result.append(s[i : j + 1])
i = j + 1
elif s[i] == " ":
result.append(" ")
i += 1
else:
result.append(s[i] if random.random() < fade else " ")
i += 1
return "".join(result)
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

View File

@@ -1,72 +0,0 @@
import random
from datetime import datetime
from engine import config
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
from engine.sources import FEEDS, POETRY_SOURCES
from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST
class FirehoseEffect:
name = "firehose"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
firehose_h = config.FIREHOSE_H if config.FIREHOSE else 0
if firehose_h <= 0 or not ctx.items:
return buf
result = list(buf)
intensity = self.config.intensity
h = ctx.terminal_height
for fr in range(firehose_h):
scr_row = h - firehose_h + fr + 1
fline = self._firehose_line(ctx.items, ctx.terminal_width, intensity)
result.append(f"\033[{scr_row};1H{fline}\033[K")
return result
def _firehose_line(self, items: list, w: int, intensity: float) -> str:
r = random.random()
if r < 0.35 * intensity:
title, src, ts = random.choice(items)
text = title[: w - 1]
color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM])
return f"{color}{text}{RST}"
elif r < 0.55 * intensity:
d = random.choice([0.45, 0.55, 0.65, 0.75])
return "".join(
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
f"{random.choice(config.GLITCH + config.KATA)}{RST}"
if random.random() < d
else " "
for _ in range(w)
)
elif r < 0.78 * intensity:
sources = FEEDS if config.MODE == "news" else POETRY_SOURCES
src = random.choice(list(sources.keys()))
msgs = [
f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}",
f" ░░ FEED ACTIVE :: {src}",
f" >> DECODE 0x{random.randint(0x1000, 0xFFFF):04X} :: {src[:24]}",
f" ▒▒ ACQUIRE :: {random.choice(['TCP', 'UDP', 'RSS', 'ATOM', 'XML'])} :: {src}",
f" {''.join(random.choice(config.KATA) for _ in range(3))} STRM "
f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}",
]
text = random.choice(msgs)[: w - 1]
color = random.choice([G_LO, G_DIM, W_GHOST])
return f"{color}{text}{RST}"
else:
title, _, _ = random.choice(items)
start = random.randint(0, max(0, len(title) - 20))
frag = title[start : start + random.randint(10, 35)]
pad = random.randint(0, max(0, w - len(frag) - 8))
gp = "".join(
random.choice(config.GLITCH) for _ in range(random.randint(1, 3))
)
text = (" " * pad + gp + " " + frag)[: w - 1]
color = random.choice([G_LO, C_DIM, W_GHOST])
return f"{color}{text}{RST}"
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

View File

@@ -1,37 +0,0 @@
import random
from engine import config
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
from engine.terminal import C_DIM, DIM, G_DIM, G_LO, RST
class GlitchEffect:
name = "glitch"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
if not buf:
return buf
result = list(buf)
intensity = self.config.intensity
glitch_prob = 0.32 + min(0.9, ctx.mic_excess * 0.16)
glitch_prob = glitch_prob * intensity
n_hits = 4 + int(ctx.mic_excess / 2)
n_hits = int(n_hits * intensity)
if random.random() < glitch_prob:
for _ in range(min(n_hits, len(result))):
gi = random.randint(0, len(result) - 1)
scr_row = gi + 1
result[gi] = f"\033[{scr_row};1H{self._glitch_bar(ctx.terminal_width)}"
return result
def _glitch_bar(self, w: int) -> str:
c = random.choice(["", "", "", "\xc2"])
n = random.randint(3, w // 2)
o = random.randint(0, w - n)
return " " * o + f"{G_LO}{DIM}" + c * n + RST
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

View File

@@ -1,36 +0,0 @@
import random
from engine import config
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST
class NoiseEffect:
name = "noise"
config = EffectConfig(enabled=True, intensity=0.15)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
if not ctx.ticker_height:
return buf
result = list(buf)
intensity = self.config.intensity
probability = intensity * 0.15
for r in range(len(result)):
cy = ctx.scroll_cam + r
if random.random() < probability:
result[r] = self._generate_noise(ctx.terminal_width, cy)
return result
def _generate_noise(self, w: int, cy: int) -> str:
d = random.choice([0.15, 0.25, 0.35, 0.12])
return "".join(
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
f"{random.choice(config.GLITCH + config.KATA)}{RST}"
if random.random() < d
else " "
for _ in range(w)
)
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

View File

@@ -11,8 +11,10 @@ import time
import tty
from engine import config, render
from engine.controller import StreamController
from engine.fetch import fetch_all, fetch_poetry, load_cache, save_cache
from engine.mic import MicMonitor
from engine.ntfy import NtfyPoller
from engine.scroll import stream
from engine.terminal import (
CLR,
CURSOR_OFF,
@@ -247,110 +249,6 @@ def pick_font_face():
print()
def pick_effects_config():
"""Interactive picker for configuring effects pipeline."""
import effects_plugins
from engine.effects import get_effect_chain, get_registry
effects_plugins.discover_plugins()
registry = get_registry()
chain = get_effect_chain()
chain.set_order(["noise", "fade", "glitch", "firehose"])
effects = list(registry.list_all().values())
if not effects:
return
selected = 0
editing_intensity = False
intensity_value = 1.0
def _draw_effects_picker():
w = tw()
print(CLR, end="")
print("\033[1;1H", end="")
print(" \033[1;38;5;231mEFFECTS CONFIG\033[0m")
print(f" \033[2;38;5;37m{'' * (w - 4)}\033[0m")
print()
for i, effect in enumerate(effects):
prefix = " > " if i == selected else " "
marker = "[*]" if effect.config.enabled else "[ ]"
if editing_intensity and i == selected:
print(
f"{prefix}{marker} \033[1;38;5;82m{effect.name}\033[0m: intensity={intensity_value:.2f} (use +/- to adjust, Enter to confirm)"
)
else:
print(
f"{prefix}{marker} {effect.name}: intensity={effect.config.intensity:.2f}"
)
print()
print(f" \033[2;38;5;37m{'' * (w - 4)}\033[0m")
print(
" \033[38;5;245mControls: space=toggle on/off | +/-=adjust intensity | arrows=move | Enter=next effect | q=done\033[0m"
)
def _read_effects_key():
ch = sys.stdin.read(1)
if ch == "\x03":
return "interrupt"
if ch in ("\r", "\n"):
return "enter"
if ch == " ":
return "toggle"
if ch == "q":
return "quit"
if ch == "+" or ch == "=":
return "up"
if ch == "-" or ch == "_":
return "down"
if ch == "\x1b":
c1 = sys.stdin.read(1)
if c1 != "[":
return None
c2 = sys.stdin.read(1)
if c2 == "A":
return "up"
if c2 == "B":
return "down"
return None
return None
if not sys.stdin.isatty():
return
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setcbreak(fd)
while True:
_draw_effects_picker()
key = _read_effects_key()
if key == "quit" or key == "enter":
break
elif key == "up" and editing_intensity:
intensity_value = min(1.0, intensity_value + 0.1)
effects[selected].config.intensity = intensity_value
elif key == "down" and editing_intensity:
intensity_value = max(0.0, intensity_value - 0.1)
effects[selected].config.intensity = intensity_value
elif key == "up":
selected = max(0, selected - 1)
intensity_value = effects[selected].config.intensity
elif key == "down":
selected = min(len(effects) - 1, selected + 1)
intensity_value = effects[selected].config.intensity
elif key == "toggle":
effects[selected].config.enabled = not effects[selected].config.enabled
elif key == "interrupt":
raise KeyboardInterrupt
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
def main():
atexit.register(lambda: print(CURSOR_ON, end="", flush=True))
@@ -361,13 +259,10 @@ def main():
signal.signal(signal.SIGINT, handle_sigint)
StreamController.warmup_topics()
w = tw()
print(CLR, end="")
print(CURSOR_OFF, end="")
pick_font_face()
pick_effects_config()
w = tw()
print()
time.sleep(0.4)
@@ -419,10 +314,9 @@ def main():
sys.exit(1)
print()
controller = StreamController()
mic_ok, ntfy_ok = controller.initialize_sources()
if controller.mic and controller.mic.available:
mic = MicMonitor(threshold_db=config.MIC_THRESHOLD_DB)
mic_ok = mic.start()
if mic.available:
boot_ln(
"Microphone",
"ACTIVE"
@@ -431,6 +325,12 @@ def main():
bool(mic_ok),
)
ntfy = NtfyPoller(
config.NTFY_TOPIC,
reconnect_delay=config.NTFY_RECONNECT_DELAY,
display_secs=config.MESSAGE_DISPLAY_SECS,
)
ntfy_ok = ntfy.start()
boot_ln("ntfy", "LISTENING" if ntfy_ok else "OFFLINE", ntfy_ok)
if config.FIREHOSE:
@@ -443,7 +343,7 @@ def main():
print()
time.sleep(0.4)
controller.run(items)
stream(items, ntfy, mic)
print()
print(f" {W_GHOST}{'' * (tw() - 4)}{RST}")

View File

@@ -105,8 +105,6 @@ class Config:
firehose: bool = False
ntfy_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline/json"
ntfy_cc_cmd_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json"
ntfy_cc_resp_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json"
ntfy_reconnect_delay: int = 5
message_display_secs: int = 30
@@ -150,8 +148,6 @@ class Config:
mode="poetry" if "--poetry" in argv or "-p" in argv else "news",
firehose="--firehose" in argv,
ntfy_topic="https://ntfy.sh/klubhaus_terminal_mainline/json",
ntfy_cc_cmd_topic="https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json",
ntfy_cc_resp_topic="https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir=font_dir,
@@ -197,8 +193,6 @@ FIREHOSE = "--firehose" in sys.argv
# ─── NTFY MESSAGE QUEUE ──────────────────────────────────
NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json"
NTFY_CC_CMD_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json"
NTFY_CC_RESP_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json"
NTFY_RECONNECT_DELAY = 5 # seconds before reconnecting after a dropped stream
MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen

View File

@@ -3,7 +3,6 @@ Stream controller - manages input sources and orchestrates the render stream.
"""
from engine.config import Config, get_config
from engine.effects.controller import handle_effects_command
from engine.eventbus import EventBus
from engine.events import EventType, StreamEvent
from engine.mic import MicMonitor
@@ -14,45 +13,11 @@ from engine.scroll import stream
class StreamController:
"""Controls the stream lifecycle - initializes sources and runs the stream."""
_topics_warmed = False
def __init__(self, config: Config | None = None, event_bus: EventBus | None = None):
self.config = config or get_config()
self.event_bus = event_bus
self.mic: MicMonitor | None = None
self.ntfy: NtfyPoller | None = None
self.ntfy_cc: NtfyPoller | None = None
@classmethod
def warmup_topics(cls) -> None:
"""Warm up ntfy topics lazily (creates them if they don't exist)."""
if cls._topics_warmed:
return
import urllib.request
topics = [
"https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd",
"https://ntfy.sh/klubhaus_terminal_mainline_cc_resp",
"https://ntfy.sh/klubhaus_terminal_mainline",
]
for topic in topics:
try:
req = urllib.request.Request(
topic,
data=b"init",
headers={
"User-Agent": "mainline/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
urllib.request.urlopen(req, timeout=5)
except Exception:
pass
cls._topics_warmed = True
def initialize_sources(self) -> tuple[bool, bool]:
"""Initialize microphone and ntfy sources.
@@ -70,38 +35,7 @@ class StreamController:
)
ntfy_ok = self.ntfy.start()
self.ntfy_cc = NtfyPoller(
self.config.ntfy_cc_cmd_topic,
reconnect_delay=self.config.ntfy_reconnect_delay,
display_secs=5,
)
self.ntfy_cc.subscribe(self._handle_cc_message)
ntfy_cc_ok = self.ntfy_cc.start()
return bool(mic_ok), ntfy_ok and ntfy_cc_ok
def _handle_cc_message(self, event) -> None:
"""Handle incoming C&C message - like a serial port control interface."""
import urllib.request
cmd = event.body.strip() if hasattr(event, "body") else str(event).strip()
if not cmd.startswith("/"):
return
response = handle_effects_command(cmd)
topic_url = self.config.ntfy_cc_resp_topic.replace("/json", "")
data = response.encode("utf-8")
req = urllib.request.Request(
topic_url,
data=data,
headers={"User-Agent": "mainline/0.1", "Content-Type": "text/plain"},
method="POST",
)
try:
urllib.request.urlopen(req, timeout=5)
except Exception:
pass
return bool(mic_ok), ntfy_ok
def run(self, items: list) -> None:
"""Run the stream with initialized sources."""

View File

@@ -1,102 +0,0 @@
"""
Display output abstraction - allows swapping output backends.
Protocol:
- init(width, height): Initialize display with terminal dimensions
- show(buffer): Render buffer (list of strings) to display
- clear(): Clear the display
- cleanup(): Shutdown display
"""
import time
from typing import Protocol
class Display(Protocol):
"""Protocol for display backends."""
def init(self, width: int, height: int) -> None:
"""Initialize display with dimensions."""
...
def show(self, buffer: list[str]) -> None:
"""Show buffer on display."""
...
def clear(self) -> None:
"""Clear display."""
...
def cleanup(self) -> None:
"""Shutdown display."""
...
def get_monitor():
"""Get the performance monitor."""
try:
from engine.effects.performance import get_monitor as _get_monitor
return _get_monitor()
except Exception:
return None
class TerminalDisplay:
"""ANSI terminal display backend."""
def __init__(self):
self.width = 80
self.height = 24
def init(self, width: int, height: int) -> None:
from engine.terminal import CURSOR_OFF
self.width = width
self.height = height
print(CURSOR_OFF, end="", flush=True)
def show(self, buffer: list[str]) -> None:
import sys
t0 = time.perf_counter()
sys.stdout.buffer.write("".join(buffer).encode())
sys.stdout.flush()
elapsed_ms = (time.perf_counter() - t0) * 1000
monitor = get_monitor()
if monitor:
chars_in = sum(len(line) for line in buffer)
monitor.record_effect("terminal_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
from engine.terminal import CLR
print(CLR, end="", flush=True)
def cleanup(self) -> None:
from engine.terminal import CURSOR_ON
print(CURSOR_ON, end="", flush=True)
class NullDisplay:
"""Headless/null display - discards all output."""
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
def show(self, buffer: list[str]) -> None:
monitor = get_monitor()
if monitor:
t0 = time.perf_counter()
chars_in = sum(len(line) for line in buffer)
elapsed_ms = (time.perf_counter() - t0) * 1000
monitor.record_effect("null_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
pass
def cleanup(self) -> None:
pass

View File

@@ -1,42 +0,0 @@
from engine.effects.chain import EffectChain
from engine.effects.controller import handle_effects_command, show_effects_menu
from engine.effects.legacy import (
fade_line,
firehose_line,
glitch_bar,
next_headline,
noise,
vis_trunc,
)
from engine.effects.performance import PerformanceMonitor, get_monitor, set_monitor
from engine.effects.registry import EffectRegistry, get_registry, set_registry
from engine.effects.types import EffectConfig, EffectContext, PipelineConfig
def get_effect_chain():
from engine.layers import get_effect_chain as _chain
return _chain()
__all__ = [
"EffectChain",
"EffectRegistry",
"EffectConfig",
"EffectContext",
"PipelineConfig",
"get_registry",
"set_registry",
"get_effect_chain",
"get_monitor",
"set_monitor",
"PerformanceMonitor",
"handle_effects_command",
"show_effects_menu",
"fade_line",
"firehose_line",
"glitch_bar",
"noise",
"next_headline",
"vis_trunc",
]

View File

@@ -1,71 +0,0 @@
import time
from engine.effects.performance import PerformanceMonitor, get_monitor
from engine.effects.registry import EffectRegistry
from engine.effects.types import EffectContext
class EffectChain:
def __init__(
self, registry: EffectRegistry, monitor: PerformanceMonitor | None = None
):
self._registry = registry
self._order: list[str] = []
self._monitor = monitor
def _get_monitor(self) -> PerformanceMonitor:
if self._monitor is not None:
return self._monitor
return get_monitor()
def set_order(self, names: list[str]) -> None:
self._order = list(names)
def get_order(self) -> list[str]:
return self._order.copy()
def add_effect(self, name: str, position: int | None = None) -> bool:
if name not in self._registry.list_all():
return False
if position is None:
self._order.append(name)
else:
self._order.insert(position, name)
return True
def remove_effect(self, name: str) -> bool:
if name in self._order:
self._order.remove(name)
return True
return False
def reorder(self, new_order: list[str]) -> bool:
all_plugins = set(self._registry.list_all().keys())
if not all(name in all_plugins for name in new_order):
return False
self._order = list(new_order)
return True
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
monitor = self._get_monitor()
frame_number = ctx.frame_number
monitor.start_frame(frame_number)
frame_start = time.perf_counter()
result = list(buf)
for name in self._order:
plugin = self._registry.get(name)
if plugin and plugin.config.enabled:
chars_in = sum(len(line) for line in result)
effect_start = time.perf_counter()
try:
result = plugin.process(result, ctx)
except Exception:
plugin.config.enabled = False
elapsed = time.perf_counter() - effect_start
chars_out = sum(len(line) for line in result)
monitor.record_effect(name, elapsed * 1000, chars_in, chars_out)
total_elapsed = time.perf_counter() - frame_start
monitor.end_frame(frame_number, total_elapsed * 1000)
return result

View File

@@ -1,144 +0,0 @@
from engine.effects.performance import get_monitor
from engine.effects.registry import get_registry
_effect_chain_ref = None
def _get_effect_chain():
global _effect_chain_ref
if _effect_chain_ref is not None:
return _effect_chain_ref
try:
from engine.layers import get_effect_chain as _chain
return _chain()
except Exception:
return None
def set_effect_chain_ref(chain) -> None:
global _effect_chain_ref
_effect_chain_ref = chain
def handle_effects_command(cmd: str) -> str:
"""Handle /effects command from NTFY message.
Commands:
/effects list - list all effects and their status
/effects <name> on - enable an effect
/effects <name> off - disable an effect
/effects <name> intensity <0.0-1.0> - set intensity
/effects reorder <name1>,<name2>,... - reorder pipeline
/effects stats - show performance statistics
"""
parts = cmd.strip().split()
if not parts or parts[0] != "/effects":
return "Unknown command"
registry = get_registry()
chain = _get_effect_chain()
if len(parts) == 1 or parts[1] == "list":
result = ["Effects:"]
for name, plugin in registry.list_all().items():
status = "ON" if plugin.config.enabled else "OFF"
intensity = plugin.config.intensity
result.append(f" {name}: {status} (intensity={intensity})")
if chain:
result.append(f"Order: {chain.get_order()}")
return "\n".join(result)
if parts[1] == "stats":
return _format_stats()
if parts[1] == "reorder" and len(parts) >= 3:
new_order = parts[2].split(",")
if chain and chain.reorder(new_order):
return f"Reordered pipeline: {new_order}"
return "Failed to reorder pipeline"
if len(parts) < 3:
return "Usage: /effects <name> on|off|intensity <value>"
effect_name = parts[1]
action = parts[2]
if effect_name not in registry.list_all():
return f"Unknown effect: {effect_name}"
if action == "on":
registry.enable(effect_name)
return f"Enabled: {effect_name}"
if action == "off":
registry.disable(effect_name)
return f"Disabled: {effect_name}"
if action == "intensity" and len(parts) >= 4:
try:
value = float(parts[3])
if not 0.0 <= value <= 1.0:
return "Intensity must be between 0.0 and 1.0"
plugin = registry.get(effect_name)
if plugin:
plugin.config.intensity = value
return f"Set {effect_name} intensity to {value}"
except ValueError:
return "Invalid intensity value"
return f"Unknown action: {action}"
def _format_stats() -> str:
monitor = get_monitor()
stats = monitor.get_stats()
if "error" in stats:
return stats["error"]
lines = ["Performance Stats:"]
pipeline = stats["pipeline"]
lines.append(
f" Pipeline: avg={pipeline['avg_ms']:.2f}ms min={pipeline['min_ms']:.2f}ms max={pipeline['max_ms']:.2f}ms (over {stats['frame_count']} frames)"
)
if stats["effects"]:
lines.append(" Per-effect (avg ms):")
for name, effect_stats in stats["effects"].items():
lines.append(
f" {name}: avg={effect_stats['avg_ms']:.2f}ms min={effect_stats['min_ms']:.2f}ms max={effect_stats['max_ms']:.2f}ms"
)
return "\n".join(lines)
def show_effects_menu() -> str:
"""Generate effects menu text for display."""
registry = get_registry()
chain = _get_effect_chain()
lines = [
"\033[1;38;5;231m=== EFFECTS MENU ===\033[0m",
"",
"Effects:",
]
for name, plugin in registry.list_all().items():
status = "ON" if plugin.config.enabled else "OFF"
intensity = plugin.config.intensity
lines.append(f" [{status:3}] {name}: intensity={intensity:.2f}")
if chain:
lines.append("")
lines.append(f"Pipeline order: {' -> '.join(chain.get_order())}")
lines.append("")
lines.append("Controls:")
lines.append(" /effects <name> on|off")
lines.append(" /effects <name> intensity <0.0-1.0>")
lines.append(" /effects reorder name1,name2,...")
lines.append("")
return "\n".join(lines)

View File

@@ -1,103 +0,0 @@
from collections import deque
from dataclasses import dataclass
@dataclass
class EffectTiming:
name: str
duration_ms: float
buffer_chars_in: int
buffer_chars_out: int
@dataclass
class FrameTiming:
frame_number: int
total_ms: float
effects: list[EffectTiming]
class PerformanceMonitor:
"""Collects and stores performance metrics for effect pipeline."""
def __init__(self, max_frames: int = 60):
self._max_frames = max_frames
self._frames: deque[FrameTiming] = deque(maxlen=max_frames)
self._current_frame: list[EffectTiming] = []
def start_frame(self, frame_number: int) -> None:
self._current_frame = []
def record_effect(
self, name: str, duration_ms: float, chars_in: int, chars_out: int
) -> None:
self._current_frame.append(
EffectTiming(
name=name,
duration_ms=duration_ms,
buffer_chars_in=chars_in,
buffer_chars_out=chars_out,
)
)
def end_frame(self, frame_number: int, total_ms: float) -> None:
self._frames.append(
FrameTiming(
frame_number=frame_number,
total_ms=total_ms,
effects=self._current_frame,
)
)
def get_stats(self) -> dict:
if not self._frames:
return {"error": "No timing data available"}
total_times = [f.total_ms for f in self._frames]
avg_total = sum(total_times) / len(total_times)
min_total = min(total_times)
max_total = max(total_times)
effect_stats: dict[str, dict] = {}
for frame in self._frames:
for effect in frame.effects:
if effect.name not in effect_stats:
effect_stats[effect.name] = {"times": [], "total_chars": 0}
effect_stats[effect.name]["times"].append(effect.duration_ms)
effect_stats[effect.name]["total_chars"] += effect.buffer_chars_out
for name, stats in effect_stats.items():
times = stats["times"]
stats["avg_ms"] = sum(times) / len(times)
stats["min_ms"] = min(times)
stats["max_ms"] = max(times)
del stats["times"]
return {
"frame_count": len(self._frames),
"pipeline": {
"avg_ms": avg_total,
"min_ms": min_total,
"max_ms": max_total,
},
"effects": effect_stats,
}
def reset(self) -> None:
self._frames.clear()
self._current_frame = []
_monitor: PerformanceMonitor | None = None
def get_monitor() -> PerformanceMonitor:
global _monitor
if _monitor is None:
_monitor = PerformanceMonitor()
return _monitor
def set_monitor(monitor: PerformanceMonitor) -> None:
global _monitor
_monitor = monitor

View File

@@ -1,59 +0,0 @@
from engine.effects.types import EffectConfig, EffectPlugin
class EffectRegistry:
def __init__(self):
self._plugins: dict[str, EffectPlugin] = {}
self._discovered: bool = False
def register(self, plugin: EffectPlugin) -> None:
self._plugins[plugin.name] = plugin
def get(self, name: str) -> EffectPlugin | None:
return self._plugins.get(name)
def list_all(self) -> dict[str, EffectPlugin]:
return self._plugins.copy()
def list_enabled(self) -> list[EffectPlugin]:
return [p for p in self._plugins.values() if p.config.enabled]
def enable(self, name: str) -> bool:
plugin = self._plugins.get(name)
if plugin:
plugin.config.enabled = True
return True
return False
def disable(self, name: str) -> bool:
plugin = self._plugins.get(name)
if plugin:
plugin.config.enabled = False
return True
return False
def configure(self, name: str, config: EffectConfig) -> bool:
plugin = self._plugins.get(name)
if plugin:
plugin.configure(config)
return True
return False
def is_enabled(self, name: str) -> bool:
plugin = self._plugins.get(name)
return plugin.config.enabled if plugin else False
_registry: EffectRegistry | None = None
def get_registry() -> EffectRegistry:
global _registry
if _registry is None:
_registry = EffectRegistry()
return _registry
def set_registry(registry: EffectRegistry) -> None:
global _registry
_registry = registry

View File

@@ -1,39 +0,0 @@
from dataclasses import dataclass, field
from typing import Any
@dataclass
class EffectContext:
terminal_width: int
terminal_height: int
scroll_cam: int
ticker_height: int
mic_excess: float
grad_offset: float
frame_number: int
has_message: bool
items: list = field(default_factory=list)
@dataclass
class EffectConfig:
enabled: bool = True
intensity: float = 1.0
params: dict[str, Any] = field(default_factory=dict)
class EffectPlugin:
name: str
config: EffectConfig
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
raise NotImplementedError
def configure(self, config: EffectConfig) -> None:
raise NotImplementedError
@dataclass
class PipelineConfig:
order: list[str] = field(default_factory=list)
effects: dict[str, EffectConfig] = field(default_factory=dict)

View File

@@ -10,8 +10,6 @@ from datetime import datetime
from engine import config
from engine.effects import (
EffectChain,
EffectContext,
fade_line,
firehose_line,
glitch_bar,
@@ -201,60 +199,3 @@ def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]:
fline = firehose_line(items, w)
buf.append(f"\033[{scr_row};1H{fline}\033[K")
return buf
_effect_chain = None
def init_effects() -> None:
"""Initialize effect plugins and chain."""
global _effect_chain
from engine.effects import EffectChain, get_registry
registry = get_registry()
import effects_plugins
effects_plugins.discover_plugins()
chain = EffectChain(registry)
chain.set_order(["noise", "fade", "glitch", "firehose"])
_effect_chain = chain
def process_effects(
buf: list[str],
w: int,
h: int,
scroll_cam: int,
ticker_h: int,
mic_excess: float,
grad_offset: float,
frame_number: int,
has_message: bool,
items: list,
) -> list[str]:
"""Process buffer through effect chain."""
if _effect_chain is None:
init_effects()
ctx = EffectContext(
terminal_width=w,
terminal_height=h,
scroll_cam=scroll_cam,
ticker_height=ticker_h,
mic_excess=mic_excess,
grad_offset=grad_offset,
frame_number=frame_number,
has_message=has_message,
items=items,
)
return _effect_chain.process(buf, ctx)
def get_effect_chain() -> EffectChain | None:
"""Get the effect chain instance."""
global _effect_chain
if _effect_chain is None:
init_effects()
return _effect_chain

View File

@@ -4,42 +4,33 @@ Orchestrates viewport, frame timing, and layers.
"""
import random
import sys
import time
from engine import config
from engine.display import (
Display,
TerminalDisplay,
)
from engine.display import (
get_monitor as _get_display_monitor,
)
from engine.frame import calculate_scroll_step
from engine.layers import (
apply_glitch,
process_effects,
render_firehose,
render_message_overlay,
render_ticker_zone,
)
from engine.terminal import CLR
from engine.viewport import th, tw
USE_EFFECT_CHAIN = True
def stream(items, ntfy_poller, mic_monitor, display: Display | None = None):
def stream(items, ntfy_poller, mic_monitor):
"""Main render loop with four layers: message, ticker, scroll motion, firehose."""
if display is None:
display = TerminalDisplay()
random.shuffle(items)
pool = list(items)
seen = set()
queued = 0
time.sleep(0.5)
sys.stdout.write(CLR)
sys.stdout.flush()
w, h = tw(), th()
display.init(w, h)
display.clear()
fh = config.FIREHOSE_H if config.FIREHOSE else 0
ticker_view_h = h - fh
GAP = 3
@@ -51,7 +42,6 @@ def stream(items, ntfy_poller, mic_monitor, display: Display | None = None):
noise_cache = {}
scroll_motion_accum = 0.0
msg_cache = (None, None)
frame_number = 0
while True:
if queued >= config.HEADLINE_LIMIT and not active:
@@ -103,39 +93,19 @@ def stream(items, ntfy_poller, mic_monitor, display: Display | None = None):
buf.extend(ticker_buf)
mic_excess = mic_monitor.excess
render_start = time.perf_counter()
buf = apply_glitch(buf, ticker_buf_start, mic_excess, w)
if USE_EFFECT_CHAIN:
buf = process_effects(
buf,
w,
h,
scroll_cam,
ticker_h,
mic_excess,
grad_offset,
frame_number,
msg is not None,
items,
)
else:
buf = apply_glitch(buf, ticker_buf_start, mic_excess, w)
firehose_buf = render_firehose(items, w, fh, h)
buf.extend(firehose_buf)
firehose_buf = render_firehose(items, w, fh, h)
buf.extend(firehose_buf)
if msg_overlay:
buf.extend(msg_overlay)
render_elapsed = (time.perf_counter() - render_start) * 1000
monitor = _get_display_monitor()
if monitor:
chars = sum(len(line) for line in buf)
monitor.record_effect("render", render_elapsed, chars, chars)
display.show(buf)
sys.stdout.buffer.write("".join(buf).encode())
sys.stdout.flush()
elapsed = time.monotonic() - t0
time.sleep(max(0, config.FRAME_DT - elapsed))
frame_number += 1
display.cleanup()
sys.stdout.write(CLR)
sys.stdout.flush()

View File

@@ -25,41 +25,24 @@ run = "uv run mainline.py"
run-poetry = "uv run mainline.py --poetry"
run-firehose = "uv run mainline.py --firehose"
daemon = "nohup uv run mainline.py > /dev/null 2>&1 &"
daemon-stop = "pkill -f 'uv run mainline.py' 2>/dev/null || true"
daemon-restart = "mise run daemon-stop && sleep 2 && mise run daemon"
# =====================
# Command & Control
# =====================
cmd = "uv run cmdline.py"
cmd-stats = "bash -c 'uv run cmdline.py -w \"/effects stats\"';:"
# Initialize ntfy topics (warm up before first use - also done automatically by mainline)
topics-init = "curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd > /dev/null && curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline_cc_resp > /dev/null && curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline > /dev/null"
# =====================
# Environment
# =====================
sync = "uv sync"
sync-all = "uv sync --all-extras"
install = "mise run sync"
install-dev = "mise run sync && uv sync --group dev"
install = "uv sync"
install-dev = "uv sync --group dev"
bootstrap = "uv sync && uv run mainline.py --help"
clean = "rm -rf .venv htmlcov .coverage tests/.pytest_cache .mainline_cache_*.json nohup.out"
# Aggressive cleanup - removes all generated files, caches, and venv
clobber = "git clean -fdx && rm -rf .venv htmlcov .coverage tests/.pytest_cache .mainline_cache_*.json nohup.out"
clean = "rm -rf .venv htmlcov .coverage tests/.pytest_cache"
# =====================
# CI/CD
# =====================
ci = "mise run topics-init && uv sync --group dev && uv run pytest --cov=engine --cov-report=term-missing --cov-report=xml"
ci = "uv sync --group dev && uv run pytest --cov=engine --cov-report=term-missing --cov-report=xml"
ci-lint = "uv run ruff check engine/ mainline.py"
# =====================

View File

@@ -83,35 +83,3 @@ class TestStreamControllerCleanup:
controller.cleanup()
mock_mic_instance.stop.assert_called_once()
class TestStreamControllerWarmup:
"""Tests for StreamController topic warmup."""
def test_warmup_topics_idempotent(self):
"""warmup_topics can be called multiple times."""
StreamController._topics_warmed = False
with patch("urllib.request.urlopen") as mock_urlopen:
StreamController.warmup_topics()
StreamController.warmup_topics()
assert mock_urlopen.call_count >= 3
def test_warmup_topics_sets_flag(self):
"""warmup_topics sets the warmed flag."""
StreamController._topics_warmed = False
with patch("urllib.request.urlopen"):
StreamController.warmup_topics()
assert StreamController._topics_warmed is True
def test_warmup_topics_skips_after_first(self):
"""warmup_topics skips after first call."""
StreamController._topics_warmed = True
with patch("urllib.request.urlopen") as mock_urlopen:
StreamController.warmup_topics()
mock_urlopen.assert_not_called()

View File

@@ -1,79 +0,0 @@
"""
Tests for engine.display module.
"""
from engine.display import NullDisplay, TerminalDisplay
class TestDisplayProtocol:
"""Test that display backends satisfy the Display protocol."""
def test_terminal_display_is_display(self):
"""TerminalDisplay satisfies Display protocol."""
display = TerminalDisplay()
assert hasattr(display, "init")
assert hasattr(display, "show")
assert hasattr(display, "clear")
assert hasattr(display, "cleanup")
def test_null_display_is_display(self):
"""NullDisplay satisfies Display protocol."""
display = NullDisplay()
assert hasattr(display, "init")
assert hasattr(display, "show")
assert hasattr(display, "clear")
assert hasattr(display, "cleanup")
class TestTerminalDisplay:
"""Tests for TerminalDisplay class."""
def test_init_sets_dimensions(self):
"""init stores terminal dimensions."""
display = TerminalDisplay()
display.init(80, 24)
assert display.width == 80
assert display.height == 24
def test_show_returns_none(self):
"""show returns None after writing to stdout."""
display = TerminalDisplay()
display.width = 80
display.height = 24
display.show(["line1", "line2"])
def test_clear_does_not_error(self):
"""clear works without error."""
display = TerminalDisplay()
display.clear()
def test_cleanup_does_not_error(self):
"""cleanup works without error."""
display = TerminalDisplay()
display.cleanup()
class TestNullDisplay:
"""Tests for NullDisplay class."""
def test_init_stores_dimensions(self):
"""init stores dimensions."""
display = NullDisplay()
display.init(100, 50)
assert display.width == 100
assert display.height == 50
def test_show_does_nothing(self):
"""show discards buffer without error."""
display = NullDisplay()
display.show(["line1", "line2", "line3"])
def test_clear_does_nothing(self):
"""clear does nothing."""
display = NullDisplay()
display.clear()
def test_cleanup_does_nothing(self):
"""cleanup does nothing."""
display = NullDisplay()
display.cleanup()

View File

@@ -1,427 +0,0 @@
"""
Tests for engine.effects module.
"""
from engine.effects import EffectChain, EffectConfig, EffectContext, EffectRegistry
class MockEffect:
name = "mock"
config = EffectConfig(enabled=True, intensity=1.0)
def __init__(self):
self.processed = False
self.last_ctx = None
def process(self, buf, ctx):
self.processed = True
self.last_ctx = ctx
return buf + ["processed"]
def configure(self, config):
self.config = config
class TestEffectConfig:
def test_defaults(self):
cfg = EffectConfig()
assert cfg.enabled is True
assert cfg.intensity == 1.0
assert cfg.params == {}
def test_custom_values(self):
cfg = EffectConfig(enabled=False, intensity=0.5, params={"key": "value"})
assert cfg.enabled is False
assert cfg.intensity == 0.5
assert cfg.params == {"key": "value"}
class TestEffectContext:
def test_defaults(self):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
assert ctx.terminal_width == 80
assert ctx.terminal_height == 24
assert ctx.ticker_height == 20
assert ctx.items == []
def test_with_items(self):
items = [("Title", "Source", "12:00")]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
items=items,
)
assert ctx.items == items
class TestEffectRegistry:
def test_init_empty(self):
registry = EffectRegistry()
assert len(registry.list_all()) == 0
def test_register(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
assert "mock" in registry.list_all()
def test_get(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
retrieved = registry.get("mock")
assert retrieved is effect
def test_get_nonexistent(self):
registry = EffectRegistry()
assert registry.get("nonexistent") is None
def test_enable(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = False
registry.register(effect)
registry.enable("mock")
assert effect.config.enabled is True
def test_disable(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = True
registry.register(effect)
registry.disable("mock")
assert effect.config.enabled is False
def test_list_enabled(self):
registry = EffectRegistry()
class EnabledEffect:
name = "enabled_effect"
config = EffectConfig(enabled=True, intensity=1.0)
class DisabledEffect:
name = "disabled_effect"
config = EffectConfig(enabled=False, intensity=1.0)
registry.register(EnabledEffect())
registry.register(DisabledEffect())
enabled = registry.list_enabled()
assert len(enabled) == 1
assert enabled[0].name == "enabled_effect"
def test_configure(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
new_config = EffectConfig(enabled=False, intensity=0.3)
registry.configure("mock", new_config)
assert effect.config.enabled is False
assert effect.config.intensity == 0.3
def test_is_enabled(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = True
registry.register(effect)
assert registry.is_enabled("mock") is True
assert registry.is_enabled("nonexistent") is False
class TestEffectChain:
def test_init(self):
registry = EffectRegistry()
chain = EffectChain(registry)
assert chain.get_order() == []
def test_set_order(self):
registry = EffectRegistry()
effect1 = MockEffect()
effect1.name = "effect1"
effect2 = MockEffect()
effect2.name = "effect2"
registry.register(effect1)
registry.register(effect2)
chain = EffectChain(registry)
chain.set_order(["effect1", "effect2"])
assert chain.get_order() == ["effect1", "effect2"]
def test_add_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.add_effect("test_effect")
assert "test_effect" in chain.get_order()
def test_add_effect_invalid(self):
registry = EffectRegistry()
chain = EffectChain(registry)
result = chain.add_effect("nonexistent")
assert result is False
def test_remove_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
chain.remove_effect("test_effect")
assert "test_effect" not in chain.get_order()
def test_reorder(self):
registry = EffectRegistry()
effect1 = MockEffect()
effect1.name = "effect1"
effect2 = MockEffect()
effect2.name = "effect2"
effect3 = MockEffect()
effect3.name = "effect3"
registry.register(effect1)
registry.register(effect2)
registry.register(effect3)
chain = EffectChain(registry)
chain.set_order(["effect1", "effect2", "effect3"])
result = chain.reorder(["effect3", "effect1", "effect2"])
assert result is True
assert chain.get_order() == ["effect3", "effect1", "effect2"]
def test_reorder_invalid(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "effect1"
registry.register(effect)
chain = EffectChain(registry)
result = chain.reorder(["effect1", "nonexistent"])
assert result is False
def test_process_empty_chain(self):
registry = EffectRegistry()
chain = EffectChain(registry)
buf = ["line1", "line2"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == buf
def test_process_with_effects(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
buf = ["line1", "line2"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == ["line1", "line2", "processed"]
assert effect.processed is True
assert effect.last_ctx is ctx
def test_process_disabled_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
effect.config.enabled = False
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
buf = ["line1"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == ["line1"]
assert effect.processed is False
class TestEffectsExports:
def test_all_exports_are_importable(self):
"""Verify all exports in __all__ can actually be imported."""
import engine.effects as effects_module
for name in effects_module.__all__:
getattr(effects_module, name)
class TestPerformanceMonitor:
def test_empty_stats(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
stats = monitor.get_stats()
assert "error" in stats
def test_record_and_retrieve(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
monitor.start_frame(1)
monitor.record_effect("test_effect", 1.5, 100, 150)
monitor.end_frame(1, 2.0)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["frame_count"] == 1
assert "test_effect" in stats["effects"]
def test_multiple_frames(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=3)
for i in range(5):
monitor.start_frame(i)
monitor.record_effect("effect1", 1.0, 100, 100)
monitor.record_effect("effect2", 0.5, 100, 100)
monitor.end_frame(i, 1.5)
stats = monitor.get_stats()
assert stats["frame_count"] == 3
assert "effect1" in stats["effects"]
assert "effect2" in stats["effects"]
def test_reset(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
monitor.start_frame(1)
monitor.record_effect("test", 1.0, 100, 100)
monitor.end_frame(1, 1.0)
monitor.reset()
stats = monitor.get_stats()
assert "error" in stats
class TestEffectPipelinePerformance:
def test_pipeline_stays_within_frame_budget(self):
"""Verify effect pipeline completes within frame budget (33ms for 30fps)."""
from engine.effects import (
EffectChain,
EffectConfig,
EffectContext,
EffectRegistry,
)
class DummyEffect:
name = "dummy"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf, ctx):
return [line * 2 for line in buf]
registry = EffectRegistry()
registry.register(DummyEffect())
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=10)
chain = EffectChain(registry, monitor)
chain.set_order(["dummy"])
buf = ["x" * 80] * 20
for i in range(10):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=i,
has_message=False,
)
chain.process(buf, ctx)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["pipeline"]["max_ms"] < 33.0
def test_individual_effects_performance(self):
"""Verify individual effects don't exceed 10ms per frame."""
from engine.effects import (
EffectChain,
EffectConfig,
EffectContext,
EffectRegistry,
)
class SlowEffect:
name = "slow"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf, ctx):
result = []
for line in buf:
result.append(line)
result.append(line + line)
return result
registry = EffectRegistry()
registry.register(SlowEffect())
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=5)
chain = EffectChain(registry, monitor)
chain.set_order(["slow"])
buf = ["x" * 80] * 10
for i in range(5):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=i,
has_message=False,
)
chain.process(buf, ctx)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["effects"]["slow"]["max_ms"] < 10.0

View File

@@ -1,117 +0,0 @@
"""
Tests for engine.effects.controller module.
"""
from unittest.mock import MagicMock, patch
from engine.effects.controller import (
handle_effects_command,
set_effect_chain_ref,
)
class TestHandleEffectsCommand:
"""Tests for handle_effects_command function."""
def test_list_effects(self):
"""list command returns formatted effects list."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_plugin.config.enabled = True
mock_plugin.config.intensity = 0.5
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
with patch("engine.effects.controller._get_effect_chain") as mock_chain:
mock_chain.return_value.get_order.return_value = ["noise"]
result = handle_effects_command("/effects list")
assert "noise: ON" in result
assert "intensity=0.5" in result
def test_enable_effect(self):
"""enable command calls registry.enable."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise on")
assert "Enabled: noise" in result
mock_registry.return_value.enable.assert_called_once_with("noise")
def test_disable_effect(self):
"""disable command calls registry.disable."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise off")
assert "Disabled: noise" in result
mock_registry.return_value.disable.assert_called_once_with("noise")
def test_set_intensity(self):
"""intensity command sets plugin intensity."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_plugin.config.intensity = 0.5
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise intensity 0.8")
assert "intensity to 0.8" in result
assert mock_plugin.config.intensity == 0.8
def test_invalid_intensity_range(self):
"""intensity outside 0.0-1.0 returns error."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise intensity 1.5")
assert "between 0.0 and 1.0" in result
def test_reorder_pipeline(self):
"""reorder command calls chain.reorder."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_registry.return_value.list_all.return_value = {}
with patch("engine.effects.controller._get_effect_chain") as mock_chain:
mock_chain_instance = MagicMock()
mock_chain_instance.reorder.return_value = True
mock_chain.return_value = mock_chain_instance
result = handle_effects_command("/effects reorder noise,fade")
assert "Reordered pipeline" in result
mock_chain_instance.reorder.assert_called_once_with(["noise", "fade"])
def test_unknown_command(self):
"""unknown command returns error."""
result = handle_effects_command("/unknown")
assert "Unknown command" in result
def test_non_effects_command(self):
"""non-effects command returns error."""
result = handle_effects_command("not a command")
assert "Unknown command" in result
class TestSetEffectChainRef:
"""Tests for set_effect_chain_ref function."""
def test_sets_global_ref(self):
"""set_effect_chain_ref updates global reference."""
mock_chain = MagicMock()
set_effect_chain_ref(mock_chain)
from engine.effects.controller import _get_effect_chain
result = _get_effect_chain()
assert result == mock_chain

View File

@@ -1,127 +0,0 @@
"""
Integration tests for ntfy topics.
"""
import json
import time
import urllib.request
class TestNtfyTopics:
def test_cc_cmd_topic_exists_and_writable(self):
"""Verify C&C CMD topic exists and accepts messages."""
from engine.config import NTFY_CC_CMD_TOPIC
topic_url = NTFY_CC_CMD_TOPIC.replace("/json", "")
test_message = f"test_{int(time.time())}"
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
assert resp.status == 200
except Exception as e:
raise AssertionError(f"Failed to write to C&C CMD topic: {e}") from e
def test_cc_resp_topic_exists_and_writable(self):
"""Verify C&C RESP topic exists and accepts messages."""
from engine.config import NTFY_CC_RESP_TOPIC
topic_url = NTFY_CC_RESP_TOPIC.replace("/json", "")
test_message = f"test_{int(time.time())}"
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
assert resp.status == 200
except Exception as e:
raise AssertionError(f"Failed to write to C&C RESP topic: {e}") from e
def test_message_topic_exists_and_writable(self):
"""Verify message topic exists and accepts messages."""
from engine.config import NTFY_TOPIC
topic_url = NTFY_TOPIC.replace("/json", "")
test_message = f"test_{int(time.time())}"
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
assert resp.status == 200
except Exception as e:
raise AssertionError(f"Failed to write to message topic: {e}") from e
def test_cc_cmd_topic_readable(self):
"""Verify we can read messages from C&C CMD topic."""
from engine.config import NTFY_CC_CMD_TOPIC
test_message = f"integration_test_{int(time.time())}"
topic_url = NTFY_CC_CMD_TOPIC.replace("/json", "")
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
urllib.request.urlopen(req, timeout=10)
except Exception as e:
raise AssertionError(f"Failed to write to C&C CMD topic: {e}") from e
time.sleep(1)
poll_url = f"{NTFY_CC_CMD_TOPIC}?poll=1&limit=1"
req = urllib.request.Request(
poll_url,
headers={"User-Agent": "mainline-test/0.1"},
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
body = resp.read().decode("utf-8")
if body.strip():
data = json.loads(body.split("\n")[0])
assert isinstance(data, dict)
except Exception as e:
raise AssertionError(f"Failed to read from C&C CMD topic: {e}") from e
def test_topics_are_different(self):
"""Verify C&C CMD/RESP and message topics are different."""
from engine.config import NTFY_CC_CMD_TOPIC, NTFY_CC_RESP_TOPIC, NTFY_TOPIC
assert NTFY_CC_CMD_TOPIC != NTFY_TOPIC
assert NTFY_CC_RESP_TOPIC != NTFY_TOPIC
assert NTFY_CC_CMD_TOPIC != NTFY_CC_RESP_TOPIC
assert "_cc_cmd" in NTFY_CC_CMD_TOPIC
assert "_cc_resp" in NTFY_CC_RESP_TOPIC