12 Commits

Author SHA1 Message Date
7eb3fca935 feat(benchmark): add hook mode with baseline cache for pre-push checks
- Fix lint errors and LSP issues in benchmark.py
- Add --hook mode to compare against saved baseline
- Add --baseline flag to save results as baseline
- Add --threshold to configure degradation threshold (default 20%)
- Add benchmark step to pre-push hook in hk.pkl
- Update AGENTS.md with hk documentation links and benchmark runner docs
2026-03-15 22:54:51 -07:00
829c4ab63d refactor: modularize display backends and add benchmark runner
- Create engine/display/ package with registry pattern
- Move displays to engine/display/backends/ (terminal, null, websocket, sixel)
- Add DisplayRegistry with auto-discovery
- Add benchmark.py for performance testing effects × displays matrix
- Add mise tasks: benchmark, benchmark-json, benchmark-report
- Update controller to use new display module
2026-03-15 22:25:28 -07:00
22dd063baa feat: add SixelDisplay backend for terminal graphics
- Implement pure Python Sixel encoder (no C dependency)
- Add SixelDisplay class to display.py with ANSI parsing
- Update controller._get_display() to handle sixel mode
- Add --display sixel CLI flag
- Add mise run-sixel task
- Update docs with display modes
2026-03-15 22:13:44 -07:00
0f7203e4e0 feat: enable C&C, compact mise tasks, update docs
- Cherry-pick C&C support (ntfy poller for commands, response handling)
- Compact mise.toml with native dependency chaining
- Update AGENTS.md with C&C documentation
- Update README.md with display modes and C&C usage
2026-03-15 21:55:26 -07:00
ba050ada24 feat(cmdline): C&C with separate topics and rich output 2026-03-15 21:47:53 -07:00
d7b044ceae feat(display): add configurable multi-backend display system 2026-03-15 21:17:16 -07:00
ac1306373d feat(websocket): add WebSocket display backend for browser client 2026-03-15 20:54:03 -07:00
2650f7245e merge: effects_plugins 2026-03-15 19:20:53 -07:00
b1f2b9d2be feat(daemon): add display abstraction and daemon mode with C&C 2026-03-15 19:20:47 -07:00
c08a7d3cb0 feat(cmdline): add command-line interface for mainline control 2026-03-15 19:20:47 -07:00
d5a3edba97 feat(effects): add plugin architecture with performance monitoring 2026-03-15 19:20:47 -07:00
fb35458718 merge: testability_modularization 2026-03-15 19:20:43 -07:00
42 changed files with 5001 additions and 178 deletions

1
.gitignore vendored
View File

@@ -9,3 +9,4 @@ htmlcov/
.coverage .coverage
.pytest_cache/ .pytest_cache/
*.egg-info/ *.egg-info/
coverage.xml

104
AGENTS.md
View File

@@ -16,19 +16,33 @@ This project uses:
mise run install mise run install
# Or equivalently: # Or equivalently:
uv sync uv sync --all-extras # includes mic support
``` ```
### Available Commands ### Available Commands
```bash ```bash
mise run test # Run tests mise run test # Run tests
mise run test-v # Run tests verbose mise run test-v # Run tests verbose
mise run test-cov # Run tests with coverage report mise run test-cov # Run tests with coverage report
mise run lint # Run ruff linter mise run test-browser # Run e2e browser tests (requires playwright)
mise run lint-fix # Run ruff with auto-fix mise run lint # Run ruff linter
mise run format # Run ruff formatter mise run lint-fix # Run ruff with auto-fix
mise run ci # Full CI pipeline (sync + test + coverage) mise run format # Run ruff formatter
mise run ci # Full CI pipeline (topics-init + lint + test-cov)
```
### Runtime Commands
```bash
mise run run # Run mainline (terminal)
mise run run-poetry # Run with poetry feed
mise run run-firehose # Run in firehose mode
mise run run-websocket # Run with WebSocket display only
mise run run-sixel # Run with Sixel graphics display
mise run run-both # Run with both terminal and WebSocket
mise run run-client # Run both + open browser
mise run cmd # Run C&C command interface
``` ```
## Git Hooks ## Git Hooks
@@ -46,9 +60,52 @@ hk init --mise
mise run pre-commit mise run pre-commit
``` ```
**IMPORTANT**: Always review the hk documentation before modifying `hk.pkl`:
- [hk Configuration Guide](https://hk.jdx.dev/configuration.html)
- [hk Hooks Reference](https://hk.jdx.dev/hooks.html)
- [hk Builtins](https://hk.jdx.dev/builtins.html)
The project uses hk configured in `hk.pkl`: The project uses hk configured in `hk.pkl`:
- **pre-commit**: runs ruff-format and ruff (with auto-fix) - **pre-commit**: runs ruff-format and ruff (with auto-fix)
- **pre-push**: runs ruff check - **pre-push**: runs ruff check + benchmark hook
## Benchmark Runner
Run performance benchmarks:
```bash
mise run benchmark # Run all benchmarks (text output)
mise run benchmark-json # Run benchmarks (JSON output)
mise run benchmark-report # Run benchmarks (Markdown report)
```
### Benchmark Commands
```bash
# Run benchmarks
uv run python -m engine.benchmark
# Run with specific displays/effects
uv run python -m engine.benchmark --displays null,terminal --effects fade,glitch
# Save baseline for hook comparisons
uv run python -m engine.benchmark --baseline
# Run in hook mode (compares against baseline)
uv run python -m engine.benchmark --hook
# Hook mode with custom threshold (default: 20% degradation)
uv run python -m engine.benchmark --hook --threshold 0.3
# Custom baseline location
uv run python -m engine.benchmark --hook --cache /path/to/cache.json
```
### Hook Mode
The `--hook` mode compares current benchmarks against a saved baseline. If performance degrades beyond the threshold (default 20%), it exits with code 1. This is useful for preventing performance regressions in feature branches.
The pre-push hook runs benchmark in hook mode to catch performance regressions before pushing.
## Workflow Rules ## Workflow Rules
@@ -106,5 +163,32 @@ The project uses pytest with strict marker enforcement. Test configuration is in
- **ntfy.py** and **mic.py** are standalone modules with zero internal dependencies - **ntfy.py** and **mic.py** are standalone modules with zero internal dependencies
- **eventbus.py** provides thread-safe event publishing for decoupled communication - **eventbus.py** provides thread-safe event publishing for decoupled communication
- **controller.py** coordinates ntfy/mic monitoring - **controller.py** coordinates ntfy/mic monitoring and event publishing
- **effects/** - plugin architecture with performance monitoring
- The render pipeline: fetch → render → effects → scroll → terminal output - The render pipeline: fetch → render → effects → scroll → terminal output
### Display System
- **Display abstraction** (`engine/display.py`): swap display backends via the Display protocol
- `TerminalDisplay` - ANSI terminal output
- `WebSocketDisplay` - broadcasts to web clients via WebSocket
- `SixelDisplay` - renders to Sixel graphics (pure Python, no C dependency)
- `MultiDisplay` - forwards to multiple displays simultaneously
- **WebSocket display** (`engine/websocket_display.py`): real-time frame broadcasting to web browsers
- WebSocket server on port 8765
- HTTP server on port 8766 (serves HTML client)
- Client at `client/index.html` with ANSI color parsing and fullscreen support
- **Display modes** (`--display` flag):
- `terminal` - Default ANSI terminal output
- `websocket` - Web browser display (requires websockets package)
- `sixel` - Sixel graphics in supported terminals (iTerm2, mintty, etc.)
- `both` - Terminal + WebSocket simultaneously
### Command & Control
- C&C uses separate ntfy topics for commands and responses
- `NTFY_CC_CMD_TOPIC` - commands from cmdline.py
- `NTFY_CC_RESP_TOPIC` - responses back to cmdline.py
- Effects controller handles `/effects` commands (list, on/off, intensity, reorder, stats)

224
README.md
View File

@@ -15,7 +15,8 @@ python3 mainline.py # news stream
python3 mainline.py --poetry # literary consciousness mode python3 mainline.py --poetry # literary consciousness mode
python3 mainline.py -p # same python3 mainline.py -p # same
python3 mainline.py --firehose # dense rapid-fire headline mode python3 mainline.py --firehose # dense rapid-fire headline mode
python3 mainline.py --refresh # force re-fetch (bypass cache) python3 mainline.py --display websocket # web browser display only
python3 mainline.py --display both # terminal + web browser
python3 mainline.py --no-font-picker # skip interactive font picker python3 mainline.py --no-font-picker # skip interactive font picker
python3 mainline.py --font-file path.otf # use a specific font file python3 mainline.py --font-file path.otf # use a specific font file
python3 mainline.py --font-dir ~/fonts # scan a different font folder python3 mainline.py --font-dir ~/fonts # scan a different font folder
@@ -28,7 +29,20 @@ Or with uv:
uv run mainline.py uv run mainline.py
``` ```
First run bootstraps a local `.mainline_venv/` and installs deps (`feedparser`, `Pillow`, `sounddevice`, `numpy`). Subsequent runs start immediately, loading from cache. With uv, run `uv sync` or `uv sync --all-extras` (includes mic support) instead. First run bootstraps dependencies. Use `uv sync --all-extras` for mic support.
### Command & Control (C&C)
Control mainline remotely using `cmdline.py`:
```bash
uv run cmdline.py # Interactive TUI
uv run cmdline.py /effects list # List all effects
uv run cmdline.py /effects stats # Show performance stats
uv run cmdline.py -w /effects stats # Watch mode (auto-refresh)
```
Commands are sent via ntfy.sh topics - useful for controlling a daemonized mainline instance.
### Config ### Config
@@ -39,20 +53,31 @@ All constants live in `engine/config.py`:
| `HEADLINE_LIMIT` | `1000` | Total headlines per session | | `HEADLINE_LIMIT` | `1000` | Total headlines per session |
| `FEED_TIMEOUT` | `10` | Per-feed HTTP timeout (seconds) | | `FEED_TIMEOUT` | `10` | Per-feed HTTP timeout (seconds) |
| `MIC_THRESHOLD_DB` | `50` | dB floor above which glitches spike | | `MIC_THRESHOLD_DB` | `50` | dB floor above which glitches spike |
| `NTFY_TOPIC` | klubhaus URL | ntfy.sh JSON stream for messages |
| `NTFY_CC_CMD_TOPIC` | klubhaus URL | ntfy.sh topic for C&C commands |
| `NTFY_CC_RESP_TOPIC` | klubhaus URL | ntfy.sh topic for C&C responses |
| `NTFY_RECONNECT_DELAY` | `5` | Seconds before reconnecting after dropped SSE |
| `MESSAGE_DISPLAY_SECS` | `30` | How long an ntfy message holds the screen |
| `FONT_DIR` | `fonts/` | Folder scanned for `.otf`, `.ttf`, `.ttc` files | | `FONT_DIR` | `fonts/` | Folder scanned for `.otf`, `.ttf`, `.ttc` files |
| `FONT_PATH` | first file in `FONT_DIR` | Active display font (overridden by picker or `--font-file`) | | `FONT_PATH` | first file in `FONT_DIR` | Active display font |
| `FONT_INDEX` | `0` | Face index within a font collection file | | `FONT_PICKER` | `True` | Show interactive font picker at boot |
| `FONT_PICKER` | `True` | Show interactive font picker at boot (`--no-font-picker` to skip) |
| `FONT_SZ` | `60` | Font render size (affects block density) | | `FONT_SZ` | `60` | Font render size (affects block density) |
| `RENDER_H` | `8` | Terminal rows per headline line | | `RENDER_H` | `8` | Terminal rows per headline line |
| `SSAA` | `4` | Super-sampling factor (render at 4× then downsample) | | `SSAA` | `4` | Super-sampling factor |
| `SCROLL_DUR` | `5.625` | Seconds per headline | | `SCROLL_DUR` | `5.625` | Seconds per headline |
| `FRAME_DT` | `0.05` | Frame interval in seconds (20 FPS) | | `FRAME_DT` | `0.05` | Frame interval in seconds (20 FPS) |
| `GRAD_SPEED` | `0.08` | Gradient sweep speed (cycles/sec, ~12s full sweep) |
| `FIREHOSE_H` | `12` | Firehose zone height (terminal rows) | | `FIREHOSE_H` | `12` | Firehose zone height (terminal rows) |
| `NTFY_TOPIC` | klubhaus URL | ntfy.sh JSON stream endpoint | | `GRAD_SPEED` | `0.08` | Gradient sweep speed |
| `NTFY_RECONNECT_DELAY` | `5` | Seconds before reconnecting after a dropped SSE stream |
| `MESSAGE_DISPLAY_SECS` | `30` | How long an ntfy message holds the screen | ### Display Modes
Mainline supports multiple display backends:
- **Terminal** (`--display terminal`): ANSI terminal output (default)
- **WebSocket** (`--display websocket`): Stream to web browser clients
- **Both** (`--display both`): Terminal + WebSocket simultaneously
WebSocket mode serves a web client at http://localhost:8766 with ANSI color support and fullscreen mode.
### Feeds ### Feeds
@@ -62,15 +87,15 @@ All constants live in `engine/config.py`:
### Fonts ### Fonts
A `fonts/` directory is bundled with demo faces (AgorTechnoDemo, AlphatronDemo, CSBishopDrawn, CubaTechnologyDemo, CyberformDemo, KATA, Microbots, ModernSpaceDemo, Neoform, Pixel Sparta, RaceHugoDemo, Resond, Robocops, Synthetix, Xeonic, and others). On startup, an interactive picker lists all discovered faces with a live half-block preview rendered at your configured size. A `fonts/` directory is bundled with demo faces. On startup, an interactive picker lists all discovered faces with a live half-block preview.
Navigation: `↑`/`↓` or `j`/`k` to move, `Enter` or `q` to select. The selected face persists for that session. Navigation: `↑`/`↓` or `j`/`k` to move, `Enter` or `q` to select.
To add your own fonts, drop `.otf`, `.ttf`, or `.ttc` files into `fonts/` (or point `--font-dir` at any other folder). Font collections (`.ttc`, multi-face `.otf`) are enumerated face-by-face. To add your own fonts, drop `.otf`, `.ttf`, or `.ttc` files into `fonts/`.
### ntfy.sh ### ntfy.sh
Mainline polls a configurable ntfy.sh topic in the background. When a message arrives, the scroll pauses and the message renders full-screen for `MESSAGE_DISPLAY_SECS` seconds, then the stream resumes. Mainline polls a configurable ntfy.sh topic in the background. When a message arrives, the scroll pauses and the message renders full-screen.
To push a message: To push a message:
@@ -78,108 +103,54 @@ To push a message:
curl -d "Body text" -H "Title: Alert title" https://ntfy.sh/your_topic curl -d "Body text" -H "Title: Alert title" https://ntfy.sh/your_topic
``` ```
Update `NTFY_TOPIC` in `engine/config.py` to point at your own topic.
--- ---
## Internals ## Internals
### How it works ### How it works
- On launch, the font picker scans `fonts/` and presents a live-rendered TUI for face selection; `--no-font-picker` skips directly to stream - On launch, the font picker scans `fonts/` and presents a live-rendered TUI for face selection
- Feeds are fetched and filtered on startup (sports and vapid content stripped); results are cached to `.mainline_cache_news.json` / `.mainline_cache_poetry.json` for fast restarts - Feeds are fetched and filtered on startup; results are cached for fast restarts
- Headlines are rasterized via Pillow with 4× SSAA into half-block characters (`▀▄█ `) at the configured font size - Headlines are rasterized via Pillow with 4× SSAA into half-block characters
- The ticker uses a sweeping white-hot → deep green gradient; ntfy messages use a complementary white-hot → magenta/maroon gradient to distinguish them visually - The ticker uses a sweeping white-hot → deep green gradient
- Subject-region detection runs a regex pass on each headline; matches trigger a Google Translate call and font swap to the appropriate script (CJK, Arabic, Devanagari, etc.) using macOS system fonts - Subject-region detection triggers Google Translate and font swap for non-Latin scripts
- The mic stream runs in a background thread, feeding RMS dB into the glitch probability calculation each frame - The mic stream runs in a background thread, feeding RMS dB into glitch probability
- The viewport scrolls through a virtual canvas of pre-rendered blocks; fade zones at top and bottom dissolve characters probabilistically - The viewport scrolls through pre-rendered blocks with fade zones
- An ntfy.sh SSE stream runs in a background thread; incoming messages interrupt the scroll and render full-screen until dismissed or expired - An ntfy.sh SSE stream runs in a background thread for messages and C&C commands
### Architecture ### Architecture
`mainline.py` is a thin entrypoint (venv bootstrap → `engine.app.main()`). All logic lives in the `engine/` package:
``` ```
engine/ engine/
__init__.py package marker __init__.py package marker
app.py main(), font picker TUI, boot sequence, signal handler app.py main(), font picker TUI, boot sequence, C&C poller
config.py constants, CLI flags, glyph tables config.py constants, CLI flags, glyph tables
sources.py FEEDS, POETRY_SOURCES, language/script maps sources.py FEEDS, POETRY_SOURCES, language/script maps
terminal.py ANSI codes, tw/th, type_out, boot_ln terminal.py ANSI codes, tw/th, type_out, boot_ln
filter.py HTML stripping, content filter filter.py HTML stripping, content filter
translate.py Google Translate wrapper + region detection translate.py Google Translate wrapper + region detection
render.py OTF → half-block pipeline (SSAA, gradient) render.py OTF → half-block pipeline (SSAA, gradient)
effects.py noise, glitch_bar, fade, firehose effects/ plugin architecture for visual effects
fetch.py RSS/Gutenberg fetching + cache load/save controller.py handles /effects commands
ntfy.py NtfyPoller — standalone, zero internal deps chain.py effect pipeline chaining
mic.py MicMonitor — standalone, graceful fallback registry.py effect registration and lookup
scroll.py stream() frame loop + message rendering performance.py performance monitoring
viewport.py terminal dimension tracking (tw/th) fetch.py RSS/Gutenberg fetching + cache
frame.py scroll step calculation, timing ntfy.py NtfyPoller — standalone, zero internal deps
layers.py ticker zone, firehose, message overlay rendering mic.py MicMonitor — standalone, graceful fallback
eventbus.py thread-safe event publishing for decoupled communication scroll.py stream() frame loop + message rendering
events.py event types and definitions viewport.py terminal dimension tracking
controller.py coordinates ntfy/mic monitoring and event publishing frame.py scroll step calculation, timing
emitters.py background emitters for ntfy and mic layers.py ticker zone, firehose, message overlay
types.py type definitions and dataclasses eventbus.py thread-safe event publishing
events.py event types and definitions
controller.py coordinates ntfy/mic monitoring
emitters.py background emitters
types.py type definitions
display.py Display protocol (Terminal, WebSocket, Multi)
websocket_display.py WebSocket server for browser clients
``` ```
`ntfy.py` and `mic.py` have zero internal dependencies and can be imported by any other visualizer.
---
## Extending
`ntfy.py` and `mic.py` are fully standalone and designed to be reused by any terminal visualizer. `engine.render` is the importable rendering pipeline for non-terminal targets.
### NtfyPoller
```python
from engine.ntfy import NtfyPoller
poller = NtfyPoller("https://ntfy.sh/my_topic/json")
poller.start()
# in your render loop:
msg = poller.get_active_message() # → (title, body, timestamp) or None
if msg:
title, body, ts = msg
render_my_message(title, body) # visualizer-specific
```
Dependencies: `urllib.request`, `json`, `threading`, `time` — stdlib only. The `since=` parameter is managed automatically on reconnect.
### MicMonitor
```python
from engine.mic import MicMonitor
mic = MicMonitor(threshold_db=50)
result = mic.start() # None = sounddevice unavailable; False = stream failed; True = ok
if result:
excess = mic.excess # dB above threshold, clamped to 0
db = mic.db # raw RMS dB level
```
Dependencies: `sounddevice`, `numpy` — both optional; degrades gracefully if unavailable.
### Render pipeline
`engine.render` exposes the OTF → raster pipeline independently of the terminal scroll loop. The planned `serve.py` extension will import it directly to pre-render headlines as 1-bit bitmaps for an ESP32 thin client:
```python
# planned — serve.py does not yet exist
from engine.render import render_line, big_wrap
from engine.fetch import fetch_all
headlines = fetch_all()
for h in headlines:
rows = big_wrap(h.text, font, width=800) # list of half-block rows
# threshold to 1-bit, pack bytes, serve over HTTP
```
See `Mainline Renderer + ntfy Message Queue for ESP32.md` for the full server + thin client architecture.
--- ---
## Development ## Development
@@ -190,7 +161,7 @@ Requires Python 3.10+ and [uv](https://docs.astral.sh/uv/).
```bash ```bash
uv sync # minimal (no mic) uv sync # minimal (no mic)
uv sync --all-extras # with mic support (sounddevice + numpy) uv sync --all-extras # with mic support
uv sync --all-extras --group dev # full dev environment uv sync --all-extras --group dev # full dev environment
``` ```
@@ -204,15 +175,19 @@ mise run test-cov # run with coverage report
mise run lint # ruff check mise run lint # ruff check
mise run lint-fix # ruff check --fix mise run lint-fix # ruff check --fix
mise run format # ruff format mise run format # ruff format
mise run run # uv run mainline.py
mise run run-poetry # uv run mainline.py --poetry mise run run # terminal display
mise run run-firehose # uv run mainline.py --firehose mise run run-websocket # web display only
mise run run-both # terminal + web
mise run run-client # both + open browser
mise run cmd # C&C command interface
mise run cmd-stats # watch effects stats
mise run topics-init # initialize ntfy topics
``` ```
### Testing ### Testing
Tests live in `tests/` and cover `config`, `filter`, `mic`, `ntfy`, `sources`, and `terminal`.
```bash ```bash
uv run pytest uv run pytest
uv run pytest --cov=engine --cov-report=term-missing uv run pytest --cov=engine --cov-report=term-missing
@@ -232,28 +207,23 @@ Pre-commit hooks run lint automatically via `hk`.
## Roadmap ## Roadmap
### Performance ### Performance
- **Concurrent feed fetching** — startup currently blocks sequentially on ~25 HTTP requests; `concurrent.futures.ThreadPoolExecutor` would cut load time to the slowest single feed - Concurrent feed fetching with ThreadPoolExecutor
- **Background refresh** — re-fetch feeds in a daemon thread so a long session stays current without restart - Background feed refresh daemon
- **Translation pre-fetch** — run translate calls concurrently during the boot sequence rather than on first render - Translation pre-fetch during boot
### Graphics ### Graphics
- **Matrix rain underlay** — katakana column rain rendered at low opacity beneath the scrolling blocks as a background layer - Matrix rain katakana underlay
- **CRT simulation** — subtle dim scanlines every N rows, occasional brightness ripple across the full screen - CRT scanline simulation
- **Sixel / iTerm2 inline images** — bypass half-blocks entirely and stream actual bitmap frames for true resolution; would require a capable terminal - Sixel/iTerm2 inline images
- **Parallax secondary column** — a second, dimmer, faster-scrolling stream of ambient text at reduced opacity on one side - Parallax secondary column
### Cyberpunk Vibes ### Cyberpunk Vibes
- **Keyword watch list** — highlight or strobe any headline matching tracked terms (names, topics, tickers) - Keyword watch list with strobe effects
- **Breaking interrupt** — full-screen flash + synthesized blip when a high-priority keyword hits - Breaking interrupt with synthesized audio
- **Live data overlay** — secondary ticker strip at screen edge: BTC price, ISS position, geomagnetic index - Live data overlay (BTC, ISS position)
- **Theme switcher** — `--amber` (phosphor), `--ice` (electric cyan), `--red` (alert state) palette modes via CLI flag - Theme switcher (amber, ice, red)
- **Persona modes** — `--surveillance`, `--oracle`, `--underground` as feed presets with matching color themes and boot copy - Persona modes (surveillance, oracle, underground)
- **Synthesized audio** — short static bursts tied to glitch events, independent of mic input
### Extensibility
- **serve.py** — HTTP server that imports `engine.render` and `engine.fetch` directly to stream 1-bit bitmaps to an ESP32 display
- **Rust port** — `ntfy.py` and `render.py` are the natural first targets; clear module boundaries make incremental porting viable
--- ---
*macOS only (script/system font paths for translation are hardcoded). Primary display font is user-selectable via the bundled `fonts/` picker. Python 3.10+.* *Python 3.10+. Primary display font is user-selectable via bundled `fonts/` picker.*

366
client/index.html Normal file
View File

@@ -0,0 +1,366 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Mainline Terminal</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
background: #0a0a0a;
color: #ccc;
font-family: 'Fira Code', 'Cascadia Code', 'Consolas', monospace;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
min-height: 100vh;
padding: 20px;
}
body.fullscreen {
padding: 0;
}
body.fullscreen #controls {
display: none;
}
#container {
position: relative;
}
canvas {
background: #000;
border: 1px solid #333;
image-rendering: pixelated;
image-rendering: crisp-edges;
}
body.fullscreen canvas {
border: none;
width: 100vw;
height: 100vh;
max-width: 100vw;
max-height: 100vh;
}
#controls {
display: flex;
gap: 10px;
margin-top: 10px;
align-items: center;
}
#controls button {
background: #333;
color: #ccc;
border: 1px solid #555;
padding: 5px 12px;
cursor: pointer;
font-family: inherit;
font-size: 12px;
}
#controls button:hover {
background: #444;
}
#controls input {
width: 60px;
background: #222;
color: #ccc;
border: 1px solid #444;
padding: 4px 8px;
font-family: inherit;
text-align: center;
}
#status {
margin-top: 10px;
font-size: 12px;
color: #666;
}
#status.connected {
color: #4f4;
}
#status.disconnected {
color: #f44;
}
</style>
</head>
<body>
<div id="container">
<canvas id="terminal"></canvas>
</div>
<div id="controls">
<label>Cols: <input type="number" id="cols" value="80" min="20" max="200"></label>
<label>Rows: <input type="number" id="rows" value="24" min="10" max="60"></label>
<button id="apply">Apply</button>
<button id="fullscreen">Fullscreen</button>
</div>
<div id="status" class="disconnected">Connecting...</div>
<script>
const canvas = document.getElementById('terminal');
const ctx = canvas.getContext('2d');
const status = document.getElementById('status');
const colsInput = document.getElementById('cols');
const rowsInput = document.getElementById('rows');
const applyBtn = document.getElementById('apply');
const fullscreenBtn = document.getElementById('fullscreen');
const CHAR_WIDTH = 9;
const CHAR_HEIGHT = 16;
const ANSI_COLORS = {
0: '#000000', 1: '#cd3131', 2: '#0dbc79', 3: '#e5e510',
4: '#2472c8', 5: '#bc3fbc', 6: '#11a8cd', 7: '#e5e5e5',
8: '#666666', 9: '#f14c4c', 10: '#23d18b', 11: '#f5f543',
12: '#3b8eea', 13: '#d670d6', 14: '#29b8db', 15: '#ffffff',
};
let cols = 80;
let rows = 24;
let ws = null;
function resizeCanvas() {
canvas.width = cols * CHAR_WIDTH;
canvas.height = rows * CHAR_HEIGHT;
}
function parseAnsi(text) {
if (!text) return [];
const tokens = [];
let currentText = '';
let fg = '#cccccc';
let bg = '#000000';
let bold = false;
let i = 0;
let inEscape = false;
let escapeCode = '';
while (i < text.length) {
const char = text[i];
if (inEscape) {
if (char >= '0' && char <= '9' || char === ';' || char === '[') {
escapeCode += char;
}
if (char === 'm') {
const codes = escapeCode.replace('\x1b[', '').split(';');
for (const code of codes) {
const num = parseInt(code) || 0;
if (num === 0) {
fg = '#cccccc';
bg = '#000000';
bold = false;
} else if (num === 1) {
bold = true;
} else if (num === 22) {
bold = false;
} else if (num === 39) {
fg = '#cccccc';
} else if (num === 49) {
bg = '#000000';
} else if (num >= 30 && num <= 37) {
fg = ANSI_COLORS[num - 30 + (bold ? 8 : 0)] || '#cccccc';
} else if (num >= 40 && num <= 47) {
bg = ANSI_COLORS[num - 40] || '#000000';
} else if (num >= 90 && num <= 97) {
fg = ANSI_COLORS[num - 90 + 8] || '#cccccc';
} else if (num >= 100 && num <= 107) {
bg = ANSI_COLORS[num - 100 + 8] || '#000000';
} else if (num >= 1 && num <= 256) {
// 256 colors
if (num < 16) {
fg = ANSI_COLORS[num] || '#cccccc';
} else if (num < 232) {
const c = num - 16;
const r = Math.floor(c / 36) * 51;
const g = Math.floor((c % 36) / 6) * 51;
const b = (c % 6) * 51;
fg = `#${r.toString(16).padStart(2,'0')}${g.toString(16).padStart(2,'0')}${b.toString(16).padStart(2,'0')}`;
} else {
const gray = (num - 232) * 10 + 8;
fg = `#${gray.toString(16).repeat(2)}`;
}
}
}
if (currentText) {
tokens.push({ text: currentText, fg, bg, bold });
currentText = '';
}
inEscape = false;
escapeCode = '';
}
} else if (char === '\x1b' && text[i + 1] === '[') {
if (currentText) {
tokens.push({ text: currentText, fg, bg, bold });
currentText = '';
}
inEscape = true;
escapeCode = '';
i++;
} else {
currentText += char;
}
i++;
}
if (currentText) {
tokens.push({ text: currentText, fg, bg, bold });
}
return tokens;
}
function renderLine(text, x, y, lineHeight) {
const tokens = parseAnsi(text);
let xOffset = x;
for (const token of tokens) {
if (token.text) {
if (token.bold) {
ctx.font = 'bold 16px monospace';
} else {
ctx.font = '16px monospace';
}
const metrics = ctx.measureText(token.text);
if (token.bg !== '#000000') {
ctx.fillStyle = token.bg;
ctx.fillRect(xOffset, y - 2, metrics.width + 1, lineHeight);
}
ctx.fillStyle = token.fg;
ctx.fillText(token.text, xOffset, y);
xOffset += metrics.width;
}
}
}
function connect() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.hostname}:8765`;
ws = new WebSocket(wsUrl);
ws.onopen = () => {
status.textContent = 'Connected';
status.className = 'connected';
sendSize();
};
ws.onclose = () => {
status.textContent = 'Disconnected - Reconnecting...';
status.className = 'disconnected';
setTimeout(connect, 1000);
};
ws.onerror = () => {
status.textContent = 'Connection error';
status.className = 'disconnected';
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (data.type === 'frame') {
cols = data.width || 80;
rows = data.height || 24;
colsInput.value = cols;
rowsInput.value = rows;
resizeCanvas();
render(data.lines || []);
} else if (data.type === 'clear') {
ctx.fillStyle = '#000';
ctx.fillRect(0, 0, canvas.width, canvas.height);
}
} catch (e) {
console.error('Failed to parse message:', e);
}
};
}
function sendSize() {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: 'resize',
width: parseInt(colsInput.value),
height: parseInt(rowsInput.value)
}));
}
}
function render(lines) {
ctx.fillStyle = '#000';
ctx.fillRect(0, 0, canvas.width, canvas.height);
ctx.font = '16px monospace';
ctx.textBaseline = 'top';
const lineHeight = CHAR_HEIGHT;
const maxLines = Math.min(lines.length, rows);
for (let i = 0; i < maxLines; i++) {
const line = lines[i] || '';
renderLine(line, 0, i * lineHeight, lineHeight);
}
}
function calculateViewportSize() {
const isFullscreen = document.fullscreenElement !== null;
const padding = isFullscreen ? 0 : 40;
const controlsHeight = isFullscreen ? 0 : 60;
const availableWidth = window.innerWidth - padding;
const availableHeight = window.innerHeight - controlsHeight;
cols = Math.max(20, Math.floor(availableWidth / CHAR_WIDTH));
rows = Math.max(10, Math.floor(availableHeight / CHAR_HEIGHT));
colsInput.value = cols;
rowsInput.value = rows;
resizeCanvas();
console.log('Fullscreen:', isFullscreen, 'Size:', cols, 'x', rows);
sendSize();
}
applyBtn.addEventListener('click', () => {
cols = parseInt(colsInput.value);
rows = parseInt(rowsInput.value);
resizeCanvas();
sendSize();
});
fullscreenBtn.addEventListener('click', () => {
if (!document.fullscreenElement) {
document.body.classList.add('fullscreen');
document.documentElement.requestFullscreen().then(() => {
calculateViewportSize();
});
} else {
document.exitFullscreen().then(() => {
calculateViewportSize();
});
}
});
document.addEventListener('fullscreenchange', () => {
if (!document.fullscreenElement) {
document.body.classList.remove('fullscreen');
calculateViewportSize();
}
});
window.addEventListener('resize', () => {
if (document.fullscreenElement) {
calculateViewportSize();
}
});
// Initial setup
resizeCanvas();
connect();
</script>
</body>
</html>

256
cmdline.py Normal file
View File

@@ -0,0 +1,256 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Command-line utility for interacting with mainline via ntfy.
Usage:
python cmdline.py # Interactive TUI mode
python cmdline.py --help # Show help
python cmdline.py /effects list # Send single command via ntfy
python cmdline.py /effects stats # Get performance stats via ntfy
python cmdline.py -w /effects stats # Watch mode (polls for stats)
The TUI mode provides:
- Arrow keys to navigate command history
- Tab completion for commands
- Auto-refresh for performance stats
C&C works like a serial port:
1. Send command to ntfy_cc_topic
2. Mainline receives, processes, responds to same topic
3. Cmdline polls for response
"""
import os
os.environ["FORCE_COLOR"] = "1"
os.environ["TERM"] = "xterm-256color"
import argparse
import json
import sys
import time
import threading
import urllib.request
from pathlib import Path
from engine import config
from engine.terminal import CLR, CURSOR_OFF, CURSOR_ON, G_DIM, G_HI, RST, W_GHOST
try:
CC_CMD_TOPIC = config.NTFY_CC_CMD_TOPIC
CC_RESP_TOPIC = config.NTFY_CC_RESP_TOPIC
except AttributeError:
CC_CMD_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json"
CC_RESP_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json"
class NtfyResponsePoller:
"""Polls ntfy for command responses."""
def __init__(self, cmd_topic: str, resp_topic: str, timeout: float = 10.0):
self.cmd_topic = cmd_topic
self.resp_topic = resp_topic
self.timeout = timeout
self._last_id = None
self._lock = threading.Lock()
def _build_url(self) -> str:
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
parsed = urlparse(self.resp_topic)
params = parse_qs(parsed.query, keep_blank_values=True)
params["since"] = [self._last_id if self._last_id else "20s"]
new_query = urlencode({k: v[0] for k, v in params.items()})
return urlunparse(parsed._replace(query=new_query))
def send_and_wait(self, cmd: str) -> str:
"""Send command and wait for response."""
url = self.cmd_topic.replace("/json", "")
data = cmd.encode("utf-8")
req = urllib.request.Request(
url,
data=data,
headers={
"User-Agent": "mainline-cmdline/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
urllib.request.urlopen(req, timeout=5)
except Exception as e:
return f"Error sending command: {e}"
return self._wait_for_response(cmd)
def _wait_for_response(self, expected_cmd: str = "") -> str:
"""Poll for response message."""
start = time.time()
while time.time() - start < self.timeout:
try:
url = self._build_url()
req = urllib.request.Request(
url, headers={"User-Agent": "mainline-cmdline/0.1"}
)
with urllib.request.urlopen(req, timeout=10) as resp:
for line in resp:
try:
data = json.loads(line.decode("utf-8", errors="replace"))
except json.JSONDecodeError:
continue
if data.get("event") == "message":
self._last_id = data.get("id")
msg = data.get("message", "")
if msg:
return msg
except Exception:
pass
time.sleep(0.5)
return "Timeout waiting for response"
AVAILABLE_COMMANDS = """Available commands:
/effects list - List all effects and status
/effects <name> on - Enable an effect
/effects <name> off - Disable an effect
/effects <name> intensity <0.0-1.0> - Set effect intensity
/effects reorder <name1>,<name2>,... - Reorder pipeline
/effects stats - Show performance statistics
/help - Show this help
/quit - Exit
"""
def print_header():
w = 60
print(CLR, end="")
print(CURSOR_OFF, end="")
print(f"\033[1;1H", end="")
print(f" \033[1;38;5;231m╔{'' * (w - 6)}\033[0m")
print(
f" \033[1;38;5;231m║\033[0m \033[1;38;5;82mMAINLINE\033[0m \033[3;38;5;245mCommand Center\033[0m \033[1;38;5;231m ║\033[0m"
)
print(f" \033[1;38;5;231m╚{'' * (w - 6)}\033[0m")
print(f" \033[2;38;5;37mCMD: {CC_CMD_TOPIC.split('/')[-2]}\033[0m")
print(f" \033[2;38;5;37mRESP: {CC_RESP_TOPIC.split('/')[-2]}\033[0m")
print()
def print_response(response: str, is_error: bool = False) -> None:
"""Print response with nice formatting."""
print()
if is_error:
print(f" \033[1;38;5;196m✗ Error\033[0m")
print(f" \033[38;5;196m{'' * 40}\033[0m")
else:
print(f" \033[1;38;5;82m✓ Response\033[0m")
print(f" \033[38;5;37m{'' * 40}\033[0m")
for line in response.split("\n"):
print(f" {line}")
print()
def interactive_mode():
"""Interactive TUI for sending commands."""
import readline
print_header()
poller = NtfyResponsePoller(CC_CMD_TOPIC, CC_RESP_TOPIC)
print(f" \033[38;5;245mType /help for commands, /quit to exit\033[0m")
print()
while True:
try:
cmd = input(f" \033[1;38;5;82m\033[0m {G_HI}").strip()
except (EOFError, KeyboardInterrupt):
print()
break
if not cmd:
continue
if cmd.startswith("/"):
if cmd == "/quit" or cmd == "/exit":
print(f"\n \033[1;38;5;245mGoodbye!{RST}\n")
break
if cmd == "/help":
print(f"\n{AVAILABLE_COMMANDS}\n")
continue
print(f" \033[38;5;245m⟳ Sending to mainline...{RST}")
result = poller.send_and_wait(cmd)
print_response(result, is_error=result.startswith("Error"))
else:
print(f"\n \033[1;38;5;196m⚠ Commands must start with /{RST}\n")
print(CURSOR_ON, end="")
return 0
def main():
parser = argparse.ArgumentParser(
description="Mainline command-line interface",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=AVAILABLE_COMMANDS,
)
parser.add_argument(
"command",
nargs="?",
default=None,
help="Command to send (e.g., /effects list)",
)
parser.add_argument(
"--watch",
"-w",
action="store_true",
help="Watch mode: continuously poll for stats (Ctrl+C to exit)",
)
args = parser.parse_args()
if args.command is None:
return interactive_mode()
poller = NtfyResponsePoller(CC_CMD_TOPIC, CC_RESP_TOPIC)
if args.watch and "/effects stats" in args.command:
import signal
def handle_sigterm(*_):
print(f"\n \033[1;38;5;245mStopped watching{RST}")
print(CURSOR_ON, end="")
sys.exit(0)
signal.signal(signal.SIGTERM, handle_sigterm)
print_header()
print(f" \033[38;5;245mWatching /effects stats (Ctrl+C to exit)...{RST}\n")
try:
while True:
result = poller.send_and_wait(args.command)
print(f"\033[2J\033[1;1H", end="")
print(
f" \033[1;38;5;82m\033[0m Performance Stats - \033[1;38;5;245m{time.strftime('%H:%M:%S')}{RST}"
)
print(f" \033[38;5;37m{'' * 44}{RST}")
for line in result.split("\n"):
print(f" {line}")
time.sleep(2)
except KeyboardInterrupt:
print(f"\n \033[1;38;5;245mStopped watching{RST}")
return 0
return 0
result = poller.send_and_wait(args.command)
print(result)
return 0
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,35 @@
from pathlib import Path
PLUGIN_DIR = Path(__file__).parent
def discover_plugins():
from engine.effects.registry import get_registry
registry = get_registry()
imported = {}
for file_path in PLUGIN_DIR.glob("*.py"):
if file_path.name.startswith("_"):
continue
module_name = file_path.stem
if module_name in ("base", "types"):
continue
try:
module = __import__(f"effects_plugins.{module_name}", fromlist=[""])
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (
isinstance(attr, type)
and hasattr(attr, "name")
and hasattr(attr, "process")
and attr_name.endswith("Effect")
):
plugin = attr()
registry.register(plugin)
imported[plugin.name] = plugin
except Exception:
pass
return imported

58
effects_plugins/fade.py Normal file
View File

@@ -0,0 +1,58 @@
import random
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
class FadeEffect:
name = "fade"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
if not ctx.ticker_height:
return buf
result = list(buf)
intensity = self.config.intensity
top_zone = max(1, int(ctx.ticker_height * 0.25))
bot_zone = max(1, int(ctx.ticker_height * 0.10))
for r in range(len(result)):
if r >= ctx.ticker_height:
continue
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
bot_f = (
min(1.0, (ctx.ticker_height - 1 - r) / bot_zone)
if bot_zone > 0
else 1.0
)
row_fade = min(top_f, bot_f) * intensity
if row_fade < 1.0 and result[r].strip():
result[r] = self._fade_line(result[r], row_fade)
return result
def _fade_line(self, s: str, fade: float) -> str:
if fade >= 1.0:
return s
if fade <= 0.0:
return ""
result = []
i = 0
while i < len(s):
if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[":
j = i + 2
while j < len(s) and not s[j].isalpha():
j += 1
result.append(s[i : j + 1])
i = j + 1
elif s[i] == " ":
result.append(" ")
i += 1
else:
result.append(s[i] if random.random() < fade else " ")
i += 1
return "".join(result)
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

View File

@@ -0,0 +1,72 @@
import random
from datetime import datetime
from engine import config
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
from engine.sources import FEEDS, POETRY_SOURCES
from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST
class FirehoseEffect:
name = "firehose"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
firehose_h = config.FIREHOSE_H if config.FIREHOSE else 0
if firehose_h <= 0 or not ctx.items:
return buf
result = list(buf)
intensity = self.config.intensity
h = ctx.terminal_height
for fr in range(firehose_h):
scr_row = h - firehose_h + fr + 1
fline = self._firehose_line(ctx.items, ctx.terminal_width, intensity)
result.append(f"\033[{scr_row};1H{fline}\033[K")
return result
def _firehose_line(self, items: list, w: int, intensity: float) -> str:
r = random.random()
if r < 0.35 * intensity:
title, src, ts = random.choice(items)
text = title[: w - 1]
color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM])
return f"{color}{text}{RST}"
elif r < 0.55 * intensity:
d = random.choice([0.45, 0.55, 0.65, 0.75])
return "".join(
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
f"{random.choice(config.GLITCH + config.KATA)}{RST}"
if random.random() < d
else " "
for _ in range(w)
)
elif r < 0.78 * intensity:
sources = FEEDS if config.MODE == "news" else POETRY_SOURCES
src = random.choice(list(sources.keys()))
msgs = [
f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}",
f" ░░ FEED ACTIVE :: {src}",
f" >> DECODE 0x{random.randint(0x1000, 0xFFFF):04X} :: {src[:24]}",
f" ▒▒ ACQUIRE :: {random.choice(['TCP', 'UDP', 'RSS', 'ATOM', 'XML'])} :: {src}",
f" {''.join(random.choice(config.KATA) for _ in range(3))} STRM "
f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}",
]
text = random.choice(msgs)[: w - 1]
color = random.choice([G_LO, G_DIM, W_GHOST])
return f"{color}{text}{RST}"
else:
title, _, _ = random.choice(items)
start = random.randint(0, max(0, len(title) - 20))
frag = title[start : start + random.randint(10, 35)]
pad = random.randint(0, max(0, w - len(frag) - 8))
gp = "".join(
random.choice(config.GLITCH) for _ in range(random.randint(1, 3))
)
text = (" " * pad + gp + " " + frag)[: w - 1]
color = random.choice([G_LO, C_DIM, W_GHOST])
return f"{color}{text}{RST}"
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

37
effects_plugins/glitch.py Normal file
View File

@@ -0,0 +1,37 @@
import random
from engine import config
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
from engine.terminal import C_DIM, DIM, G_DIM, G_LO, RST
class GlitchEffect:
name = "glitch"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
if not buf:
return buf
result = list(buf)
intensity = self.config.intensity
glitch_prob = 0.32 + min(0.9, ctx.mic_excess * 0.16)
glitch_prob = glitch_prob * intensity
n_hits = 4 + int(ctx.mic_excess / 2)
n_hits = int(n_hits * intensity)
if random.random() < glitch_prob:
for _ in range(min(n_hits, len(result))):
gi = random.randint(0, len(result) - 1)
scr_row = gi + 1
result[gi] = f"\033[{scr_row};1H{self._glitch_bar(ctx.terminal_width)}"
return result
def _glitch_bar(self, w: int) -> str:
c = random.choice(["", "", "", "\xc2"])
n = random.randint(3, w // 2)
o = random.randint(0, w - n)
return " " * o + f"{G_LO}{DIM}" + c * n + RST
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

36
effects_plugins/noise.py Normal file
View File

@@ -0,0 +1,36 @@
import random
from engine import config
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST
class NoiseEffect:
name = "noise"
config = EffectConfig(enabled=True, intensity=0.15)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
if not ctx.ticker_height:
return buf
result = list(buf)
intensity = self.config.intensity
probability = intensity * 0.15
for r in range(len(result)):
cy = ctx.scroll_cam + r
if random.random() < probability:
result[r] = self._generate_noise(ctx.terminal_width, cy)
return result
def _generate_noise(self, w: int, cy: int) -> str:
d = random.choice([0.15, 0.25, 0.35, 0.12])
return "".join(
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
f"{random.choice(config.GLITCH + config.KATA)}{RST}"
if random.random() < d
else " "
for _ in range(w)
)
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

View File

@@ -11,10 +11,8 @@ import time
import tty import tty
from engine import config, render from engine import config, render
from engine.controller import StreamController
from engine.fetch import fetch_all, fetch_poetry, load_cache, save_cache from engine.fetch import fetch_all, fetch_poetry, load_cache, save_cache
from engine.mic import MicMonitor
from engine.ntfy import NtfyPoller
from engine.scroll import stream
from engine.terminal import ( from engine.terminal import (
CLR, CLR,
CURSOR_OFF, CURSOR_OFF,
@@ -249,6 +247,110 @@ def pick_font_face():
print() print()
def pick_effects_config():
"""Interactive picker for configuring effects pipeline."""
import effects_plugins
from engine.effects import get_effect_chain, get_registry
effects_plugins.discover_plugins()
registry = get_registry()
chain = get_effect_chain()
chain.set_order(["noise", "fade", "glitch", "firehose"])
effects = list(registry.list_all().values())
if not effects:
return
selected = 0
editing_intensity = False
intensity_value = 1.0
def _draw_effects_picker():
w = tw()
print(CLR, end="")
print("\033[1;1H", end="")
print(" \033[1;38;5;231mEFFECTS CONFIG\033[0m")
print(f" \033[2;38;5;37m{'' * (w - 4)}\033[0m")
print()
for i, effect in enumerate(effects):
prefix = " > " if i == selected else " "
marker = "[*]" if effect.config.enabled else "[ ]"
if editing_intensity and i == selected:
print(
f"{prefix}{marker} \033[1;38;5;82m{effect.name}\033[0m: intensity={intensity_value:.2f} (use +/- to adjust, Enter to confirm)"
)
else:
print(
f"{prefix}{marker} {effect.name}: intensity={effect.config.intensity:.2f}"
)
print()
print(f" \033[2;38;5;37m{'' * (w - 4)}\033[0m")
print(
" \033[38;5;245mControls: space=toggle on/off | +/-=adjust intensity | arrows=move | Enter=next effect | q=done\033[0m"
)
def _read_effects_key():
ch = sys.stdin.read(1)
if ch == "\x03":
return "interrupt"
if ch in ("\r", "\n"):
return "enter"
if ch == " ":
return "toggle"
if ch == "q":
return "quit"
if ch == "+" or ch == "=":
return "up"
if ch == "-" or ch == "_":
return "down"
if ch == "\x1b":
c1 = sys.stdin.read(1)
if c1 != "[":
return None
c2 = sys.stdin.read(1)
if c2 == "A":
return "up"
if c2 == "B":
return "down"
return None
return None
if not sys.stdin.isatty():
return
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setcbreak(fd)
while True:
_draw_effects_picker()
key = _read_effects_key()
if key == "quit" or key == "enter":
break
elif key == "up" and editing_intensity:
intensity_value = min(1.0, intensity_value + 0.1)
effects[selected].config.intensity = intensity_value
elif key == "down" and editing_intensity:
intensity_value = max(0.0, intensity_value - 0.1)
effects[selected].config.intensity = intensity_value
elif key == "up":
selected = max(0, selected - 1)
intensity_value = effects[selected].config.intensity
elif key == "down":
selected = min(len(effects) - 1, selected + 1)
intensity_value = effects[selected].config.intensity
elif key == "toggle":
effects[selected].config.enabled = not effects[selected].config.enabled
elif key == "interrupt":
raise KeyboardInterrupt
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
def main(): def main():
atexit.register(lambda: print(CURSOR_ON, end="", flush=True)) atexit.register(lambda: print(CURSOR_ON, end="", flush=True))
@@ -259,10 +361,13 @@ def main():
signal.signal(signal.SIGINT, handle_sigint) signal.signal(signal.SIGINT, handle_sigint)
StreamController.warmup_topics()
w = tw() w = tw()
print(CLR, end="") print(CLR, end="")
print(CURSOR_OFF, end="") print(CURSOR_OFF, end="")
pick_font_face() pick_font_face()
pick_effects_config()
w = tw() w = tw()
print() print()
time.sleep(0.4) time.sleep(0.4)
@@ -314,9 +419,10 @@ def main():
sys.exit(1) sys.exit(1)
print() print()
mic = MicMonitor(threshold_db=config.MIC_THRESHOLD_DB) controller = StreamController()
mic_ok = mic.start() mic_ok, ntfy_ok = controller.initialize_sources()
if mic.available:
if controller.mic and controller.mic.available:
boot_ln( boot_ln(
"Microphone", "Microphone",
"ACTIVE" "ACTIVE"
@@ -325,12 +431,6 @@ def main():
bool(mic_ok), bool(mic_ok),
) )
ntfy = NtfyPoller(
config.NTFY_TOPIC,
reconnect_delay=config.NTFY_RECONNECT_DELAY,
display_secs=config.MESSAGE_DISPLAY_SECS,
)
ntfy_ok = ntfy.start()
boot_ln("ntfy", "LISTENING" if ntfy_ok else "OFFLINE", ntfy_ok) boot_ln("ntfy", "LISTENING" if ntfy_ok else "OFFLINE", ntfy_ok)
if config.FIREHOSE: if config.FIREHOSE:
@@ -343,7 +443,7 @@ def main():
print() print()
time.sleep(0.4) time.sleep(0.4)
stream(items, ntfy, mic) controller.run(items)
print() print()
print(f" {W_GHOST}{'' * (tw() - 4)}{RST}") print(f" {W_GHOST}{'' * (tw() - 4)}{RST}")

659
engine/benchmark.py Normal file
View File

@@ -0,0 +1,659 @@
#!/usr/bin/env python3
"""
Benchmark runner for mainline - tests performance across effects and displays.
Usage:
python -m engine.benchmark
python -m engine.benchmark --output report.md
python -m engine.benchmark --displays terminal,websocket --effects glitch,fade
python -m engine.benchmark --format json --output benchmark.json
Headless mode (default): suppress all terminal output during benchmarks.
"""
import argparse
import json
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime
from io import StringIO
from pathlib import Path
from typing import Any
import numpy as np
@dataclass
class BenchmarkResult:
"""Result of a single benchmark run."""
name: str
display: str
effect: str | None
iterations: int
total_time_ms: float
avg_time_ms: float
std_dev_ms: float
min_ms: float
max_ms: float
fps: float
chars_processed: int
chars_per_sec: float
@dataclass
class BenchmarkReport:
"""Complete benchmark report."""
timestamp: str
python_version: str
results: list[BenchmarkResult] = field(default_factory=list)
summary: dict[str, Any] = field(default_factory=dict)
def get_sample_buffer(width: int = 80, height: int = 24) -> list[str]:
"""Generate a sample buffer for benchmarking."""
lines = []
for i in range(height):
line = f"\x1b[32mLine {i}\x1b[0m " + "A" * (width - 10)
lines.append(line)
return lines
def benchmark_display(
display_class, buffer: list[str], iterations: int = 100
) -> BenchmarkResult | None:
"""Benchmark a single display."""
old_stdout = sys.stdout
old_stderr = sys.stderr
try:
sys.stdout = StringIO()
sys.stderr = StringIO()
display = display_class()
display.init(80, 24)
times = []
chars = sum(len(line) for line in buffer)
for _ in range(iterations):
t0 = time.perf_counter()
display.show(buffer)
elapsed = (time.perf_counter() - t0) * 1000
times.append(elapsed)
display.cleanup()
except Exception:
return None
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
times_arr = np.array(times)
return BenchmarkResult(
name=f"display_{display_class.__name__}",
display=display_class.__name__,
effect=None,
iterations=iterations,
total_time_ms=sum(times),
avg_time_ms=float(np.mean(times_arr)),
std_dev_ms=float(np.std(times_arr)),
min_ms=float(np.min(times_arr)),
max_ms=float(np.max(times_arr)),
fps=float(1000.0 / np.mean(times_arr)) if np.mean(times_arr) > 0 else 0.0,
chars_processed=chars * iterations,
chars_per_sec=float((chars * iterations) / (sum(times) / 1000))
if sum(times) > 0
else 0.0,
)
def benchmark_effect_with_display(
effect_class, display, buffer: list[str], iterations: int = 100
) -> BenchmarkResult | None:
"""Benchmark an effect with a display."""
old_stdout = sys.stdout
old_stderr = sys.stderr
try:
from engine.effects.types import EffectConfig, EffectContext
sys.stdout = StringIO()
sys.stderr = StringIO()
effect = effect_class()
effect.configure(EffectConfig(enabled=True, intensity=1.0))
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=0,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
times = []
chars = sum(len(line) for line in buffer)
for _ in range(iterations):
processed = effect.process(buffer, ctx)
t0 = time.perf_counter()
display.show(processed)
elapsed = (time.perf_counter() - t0) * 1000
times.append(elapsed)
display.cleanup()
except Exception:
return None
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
times_arr = np.array(times)
return BenchmarkResult(
name=f"effect_{effect_class.__name__}_with_{display.__class__.__name__}",
display=display.__class__.__name__,
effect=effect_class.__name__,
iterations=iterations,
total_time_ms=sum(times),
avg_time_ms=float(np.mean(times_arr)),
std_dev_ms=float(np.std(times_arr)),
min_ms=float(np.min(times_arr)),
max_ms=float(np.max(times_arr)),
fps=float(1000.0 / np.mean(times_arr)) if np.mean(times_arr) > 0 else 0.0,
chars_processed=chars * iterations,
chars_per_sec=float((chars * iterations) / (sum(times) / 1000))
if sum(times) > 0
else 0.0,
)
def get_available_displays():
"""Get available display classes."""
from engine.display import (
DisplayRegistry,
NullDisplay,
TerminalDisplay,
)
DisplayRegistry.initialize()
displays = [
("null", NullDisplay),
("terminal", TerminalDisplay),
]
try:
from engine.display.backends.websocket import WebSocketDisplay
displays.append(("websocket", WebSocketDisplay))
except Exception:
pass
try:
from engine.display.backends.sixel import SixelDisplay
displays.append(("sixel", SixelDisplay))
except Exception:
pass
return displays
def get_available_effects():
"""Get available effect classes."""
try:
from engine.effects import get_registry
try:
from effects_plugins import discover_plugins
discover_plugins()
except Exception:
pass
except Exception:
return []
effects = []
registry = get_registry()
for name, effect in registry.list_all().items():
if effect:
effect_cls = type(effect)
effects.append((name, effect_cls))
return effects
def run_benchmarks(
displays: list[tuple[str, Any]] | None = None,
effects: list[tuple[str, Any]] | None = None,
iterations: int = 100,
verbose: bool = False,
) -> BenchmarkReport:
"""Run all benchmarks and return report."""
from datetime import datetime
if displays is None:
displays = get_available_displays()
if effects is None:
effects = get_available_effects()
buffer = get_sample_buffer(80, 24)
results = []
if verbose:
print(f"Running benchmarks ({iterations} iterations each)...")
for name, display_class in displays:
if verbose:
print(f"Benchmarking display: {name}")
result = benchmark_display(display_class, buffer, iterations)
if result:
results.append(result)
if verbose:
print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg")
if verbose:
print()
for effect_name, effect_class in effects:
for display_name, display_class in displays:
if display_name == "websocket":
continue
if verbose:
print(f"Benchmarking effect: {effect_name} with {display_name}")
display = display_class()
display.init(80, 24)
result = benchmark_effect_with_display(
effect_class, display, buffer, iterations
)
if result:
results.append(result)
if verbose:
print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg")
summary = generate_summary(results)
return BenchmarkReport(
timestamp=datetime.now().isoformat(),
python_version=sys.version,
results=results,
summary=summary,
)
def generate_summary(results: list[BenchmarkResult]) -> dict[str, Any]:
"""Generate summary statistics from results."""
by_display: dict[str, list[BenchmarkResult]] = {}
by_effect: dict[str, list[BenchmarkResult]] = {}
for r in results:
if r.display not in by_display:
by_display[r.display] = []
by_display[r.display].append(r)
if r.effect:
if r.effect not in by_effect:
by_effect[r.effect] = []
by_effect[r.effect].append(r)
summary = {
"by_display": {},
"by_effect": {},
"overall": {
"total_tests": len(results),
"displays_tested": len(by_display),
"effects_tested": len(by_effect),
},
}
for display, res in by_display.items():
fps_values = [r.fps for r in res]
summary["by_display"][display] = {
"avg_fps": float(np.mean(fps_values)),
"min_fps": float(np.min(fps_values)),
"max_fps": float(np.max(fps_values)),
"tests": len(res),
}
for effect, res in by_effect.items():
fps_values = [r.fps for r in res]
summary["by_effect"][effect] = {
"avg_fps": float(np.mean(fps_values)),
"min_fps": float(np.min(fps_values)),
"max_fps": float(np.max(fps_values)),
"tests": len(res),
}
return summary
DEFAULT_CACHE_PATH = Path.home() / ".mainline_benchmark_cache.json"
def load_baseline(cache_path: Path | None = None) -> dict[str, Any] | None:
"""Load baseline benchmark results from cache."""
path = cache_path or DEFAULT_CACHE_PATH
if not path.exists():
return None
try:
with open(path) as f:
return json.load(f)
except Exception:
return None
def save_baseline(
results: list[BenchmarkResult],
cache_path: Path | None = None,
) -> None:
"""Save benchmark results as baseline to cache."""
path = cache_path or DEFAULT_CACHE_PATH
baseline = {
"timestamp": datetime.now().isoformat(),
"results": {
r.name: {
"fps": r.fps,
"avg_time_ms": r.avg_time_ms,
"chars_per_sec": r.chars_per_sec,
}
for r in results
},
}
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
json.dump(baseline, f, indent=2)
def compare_with_baseline(
results: list[BenchmarkResult],
baseline: dict[str, Any],
threshold: float = 0.2,
verbose: bool = True,
) -> tuple[bool, list[str]]:
"""Compare current results with baseline. Returns (pass, messages)."""
baseline_results = baseline.get("results", {})
failures = []
warnings = []
for r in results:
if r.name not in baseline_results:
warnings.append(f"New test: {r.name} (no baseline)")
continue
b = baseline_results[r.name]
if b["fps"] == 0:
continue
degradation = (b["fps"] - r.fps) / b["fps"]
if degradation > threshold:
failures.append(
f"{r.name}: FPS degraded {degradation * 100:.1f}% "
f"(baseline: {b['fps']:.1f}, current: {r.fps:.1f})"
)
elif verbose:
print(f" {r.name}: {r.fps:.1f} FPS (baseline: {b['fps']:.1f})")
passed = len(failures) == 0
messages = []
if failures:
messages.extend(failures)
if warnings:
messages.extend(warnings)
return passed, messages
def run_hook_mode(
displays: list[tuple[str, Any]] | None = None,
effects: list[tuple[str, Any]] | None = None,
iterations: int = 20,
threshold: float = 0.2,
cache_path: Path | None = None,
verbose: bool = False,
) -> int:
"""Run in hook mode: compare against baseline, exit 0 on pass, 1 on fail."""
baseline = load_baseline(cache_path)
if baseline is None:
print("No baseline found. Run with --baseline to create one.")
return 1
report = run_benchmarks(displays, effects, iterations, verbose)
passed, messages = compare_with_baseline(
report.results, baseline, threshold, verbose
)
print("\n=== Benchmark Hook Results ===")
if passed:
print("PASSED - No significant performance degradation")
return 0
else:
print("FAILED - Performance degradation detected:")
for msg in messages:
print(f" - {msg}")
return 1
def format_report_text(report: BenchmarkReport) -> str:
"""Format report as human-readable text."""
lines = [
"# Mainline Performance Benchmark Report",
"",
f"Generated: {report.timestamp}",
f"Python: {report.python_version}",
"",
"## Summary",
"",
f"Total tests: {report.summary['overall']['total_tests']}",
f"Displays tested: {report.summary['overall']['displays_tested']}",
f"Effects tested: {report.summary['overall']['effects_tested']}",
"",
"## By Display",
"",
]
for display, stats in report.summary["by_display"].items():
lines.append(f"### {display}")
lines.append(f"- Avg FPS: {stats['avg_fps']:.1f}")
lines.append(f"- Min FPS: {stats['min_fps']:.1f}")
lines.append(f"- Max FPS: {stats['max_fps']:.1f}")
lines.append(f"- Tests: {stats['tests']}")
lines.append("")
if report.summary["by_effect"]:
lines.append("## By Effect")
lines.append("")
for effect, stats in report.summary["by_effect"].items():
lines.append(f"### {effect}")
lines.append(f"- Avg FPS: {stats['avg_fps']:.1f}")
lines.append(f"- Min FPS: {stats['min_fps']:.1f}")
lines.append(f"- Max FPS: {stats['max_fps']:.1f}")
lines.append(f"- Tests: {stats['tests']}")
lines.append("")
lines.append("## Detailed Results")
lines.append("")
lines.append("| Display | Effect | FPS | Avg ms | StdDev ms | Min ms | Max ms |")
lines.append("|---------|--------|-----|--------|-----------|--------|--------|")
for r in report.results:
effect_col = r.effect if r.effect else "-"
lines.append(
f"| {r.display} | {effect_col} | {r.fps:.1f} | {r.avg_time_ms:.2f} | "
f"{r.std_dev_ms:.2f} | {r.min_ms:.2f} | {r.max_ms:.2f} |"
)
return "\n".join(lines)
def format_report_json(report: BenchmarkReport) -> str:
"""Format report as JSON."""
data = {
"timestamp": report.timestamp,
"python_version": report.python_version,
"summary": report.summary,
"results": [
{
"name": r.name,
"display": r.display,
"effect": r.effect,
"iterations": r.iterations,
"total_time_ms": r.total_time_ms,
"avg_time_ms": r.avg_time_ms,
"std_dev_ms": r.std_dev_ms,
"min_ms": r.min_ms,
"max_ms": r.max_ms,
"fps": r.fps,
"chars_processed": r.chars_processed,
"chars_per_sec": r.chars_per_sec,
}
for r in report.results
],
}
return json.dumps(data, indent=2)
def main():
parser = argparse.ArgumentParser(description="Run mainline benchmarks")
parser.add_argument(
"--displays",
help="Comma-separated list of displays to test (default: all)",
)
parser.add_argument(
"--effects",
help="Comma-separated list of effects to test (default: all)",
)
parser.add_argument(
"--iterations",
type=int,
default=100,
help="Number of iterations per test (default: 100)",
)
parser.add_argument(
"--output",
help="Output file path (default: stdout)",
)
parser.add_argument(
"--format",
choices=["text", "json"],
default="text",
help="Output format (default: text)",
)
parser.add_argument(
"--verbose",
"-v",
action="store_true",
help="Show progress during benchmarking",
)
parser.add_argument(
"--hook",
action="store_true",
help="Run in hook mode: compare against baseline, exit 0 pass, 1 fail",
)
parser.add_argument(
"--baseline",
action="store_true",
help="Save current results as baseline for future hook comparisons",
)
parser.add_argument(
"--threshold",
type=float,
default=0.2,
help="Performance degradation threshold for hook mode (default: 0.2 = 20%%)",
)
parser.add_argument(
"--cache",
type=str,
default=None,
help="Path to baseline cache file (default: ~/.mainline_benchmark_cache.json)",
)
args = parser.parse_args()
cache_path = Path(args.cache) if args.cache else DEFAULT_CACHE_PATH
if args.hook:
displays = None
if args.displays:
display_map = dict(get_available_displays())
displays = [
(name, display_map[name])
for name in args.displays.split(",")
if name in display_map
]
effects = None
if args.effects:
effect_map = dict(get_available_effects())
effects = [
(name, effect_map[name])
for name in args.effects.split(",")
if name in effect_map
]
return run_hook_mode(
displays,
effects,
iterations=args.iterations,
threshold=args.threshold,
cache_path=cache_path,
verbose=args.verbose,
)
displays = None
if args.displays:
display_map = dict(get_available_displays())
displays = [
(name, display_map[name])
for name in args.displays.split(",")
if name in display_map
]
effects = None
if args.effects:
effect_map = dict(get_available_effects())
effects = [
(name, effect_map[name])
for name in args.effects.split(",")
if name in effect_map
]
report = run_benchmarks(displays, effects, args.iterations, args.verbose)
if args.baseline:
save_baseline(report.results, cache_path)
print(f"Baseline saved to {cache_path}")
return 0
if args.format == "json":
output = format_report_json(report)
else:
output = format_report_text(report)
if args.output:
with open(args.output, "w") as f:
f.write(output)
else:
print(output)
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -105,6 +105,8 @@ class Config:
firehose: bool = False firehose: bool = False
ntfy_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline/json" ntfy_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline/json"
ntfy_cc_cmd_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json"
ntfy_cc_resp_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json"
ntfy_reconnect_delay: int = 5 ntfy_reconnect_delay: int = 5
message_display_secs: int = 30 message_display_secs: int = 30
@@ -127,6 +129,10 @@ class Config:
script_fonts: dict[str, str] = field(default_factory=_get_platform_font_paths) script_fonts: dict[str, str] = field(default_factory=_get_platform_font_paths)
display: str = "terminal"
websocket: bool = False
websocket_port: int = 8765
@classmethod @classmethod
def from_args(cls, argv: list[str] | None = None) -> "Config": def from_args(cls, argv: list[str] | None = None) -> "Config":
"""Create Config from CLI arguments (or custom argv for testing).""" """Create Config from CLI arguments (or custom argv for testing)."""
@@ -148,6 +154,8 @@ class Config:
mode="poetry" if "--poetry" in argv or "-p" in argv else "news", mode="poetry" if "--poetry" in argv or "-p" in argv else "news",
firehose="--firehose" in argv, firehose="--firehose" in argv,
ntfy_topic="https://ntfy.sh/klubhaus_terminal_mainline/json", ntfy_topic="https://ntfy.sh/klubhaus_terminal_mainline/json",
ntfy_cc_cmd_topic="https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json",
ntfy_cc_resp_topic="https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json",
ntfy_reconnect_delay=5, ntfy_reconnect_delay=5,
message_display_secs=30, message_display_secs=30,
font_dir=font_dir, font_dir=font_dir,
@@ -164,6 +172,9 @@ class Config:
glitch_glyphs="░▒▓█▌▐╌╍╎╏┃┆┇┊┋", glitch_glyphs="░▒▓█▌▐╌╍╎╏┃┆┇┊┋",
kata_glyphs="ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ", kata_glyphs="ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ",
script_fonts=_get_platform_font_paths(), script_fonts=_get_platform_font_paths(),
display=_arg_value("--display", argv) or "terminal",
websocket="--websocket" in argv,
websocket_port=_arg_int("--websocket-port", 8765, argv),
) )
@@ -193,6 +204,8 @@ FIREHOSE = "--firehose" in sys.argv
# ─── NTFY MESSAGE QUEUE ────────────────────────────────── # ─── NTFY MESSAGE QUEUE ──────────────────────────────────
NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json" NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json"
NTFY_CC_CMD_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json"
NTFY_CC_RESP_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json"
NTFY_RECONNECT_DELAY = 5 # seconds before reconnecting after a dropped stream NTFY_RECONNECT_DELAY = 5 # seconds before reconnecting after a dropped stream
MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen
@@ -223,6 +236,11 @@ GRAD_SPEED = 0.08 # gradient traversal speed (cycles/sec, ~12s full sweep)
GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋" GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋"
KATA = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ" KATA = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ"
# ─── WEBSOCKET ─────────────────────────────────────────────
DISPLAY = _arg_value("--display", sys.argv) or "terminal"
WEBSOCKET = "--websocket" in sys.argv
WEBSOCKET_PORT = _arg_int("--websocket-port", 8765)
def set_font_selection(font_path=None, font_index=None): def set_font_selection(font_path=None, font_index=None):
"""Set runtime primary font selection.""" """Set runtime primary font selection."""

View File

@@ -3,6 +3,15 @@ Stream controller - manages input sources and orchestrates the render stream.
""" """
from engine.config import Config, get_config from engine.config import Config, get_config
from engine.display import (
DisplayRegistry,
MultiDisplay,
NullDisplay,
SixelDisplay,
TerminalDisplay,
WebSocketDisplay,
)
from engine.effects.controller import handle_effects_command
from engine.eventbus import EventBus from engine.eventbus import EventBus
from engine.events import EventType, StreamEvent from engine.events import EventType, StreamEvent
from engine.mic import MicMonitor from engine.mic import MicMonitor
@@ -10,14 +19,76 @@ from engine.ntfy import NtfyPoller
from engine.scroll import stream from engine.scroll import stream
def _get_display(config: Config):
"""Get the appropriate display based on config."""
DisplayRegistry.initialize()
display_mode = config.display.lower()
displays = []
if display_mode in ("terminal", "both"):
displays.append(TerminalDisplay())
if display_mode in ("websocket", "both"):
ws = WebSocketDisplay(host="0.0.0.0", port=config.websocket_port)
ws.start_server()
ws.start_http_server()
displays.append(ws)
if display_mode == "sixel":
displays.append(SixelDisplay())
if not displays:
return NullDisplay()
if len(displays) == 1:
return displays[0]
return MultiDisplay(displays)
class StreamController: class StreamController:
"""Controls the stream lifecycle - initializes sources and runs the stream.""" """Controls the stream lifecycle - initializes sources and runs the stream."""
_topics_warmed = False
def __init__(self, config: Config | None = None, event_bus: EventBus | None = None): def __init__(self, config: Config | None = None, event_bus: EventBus | None = None):
self.config = config or get_config() self.config = config or get_config()
self.event_bus = event_bus self.event_bus = event_bus
self.mic: MicMonitor | None = None self.mic: MicMonitor | None = None
self.ntfy: NtfyPoller | None = None self.ntfy: NtfyPoller | None = None
self.ntfy_cc: NtfyPoller | None = None
@classmethod
def warmup_topics(cls) -> None:
"""Warm up ntfy topics lazily (creates them if they don't exist)."""
if cls._topics_warmed:
return
import urllib.request
topics = [
"https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd",
"https://ntfy.sh/klubhaus_terminal_mainline_cc_resp",
"https://ntfy.sh/klubhaus_terminal_mainline",
]
for topic in topics:
try:
req = urllib.request.Request(
topic,
data=b"init",
headers={
"User-Agent": "mainline/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
urllib.request.urlopen(req, timeout=5)
except Exception:
pass
cls._topics_warmed = True
def initialize_sources(self) -> tuple[bool, bool]: def initialize_sources(self) -> tuple[bool, bool]:
"""Initialize microphone and ntfy sources. """Initialize microphone and ntfy sources.
@@ -35,7 +106,38 @@ class StreamController:
) )
ntfy_ok = self.ntfy.start() ntfy_ok = self.ntfy.start()
return bool(mic_ok), ntfy_ok self.ntfy_cc = NtfyPoller(
self.config.ntfy_cc_cmd_topic,
reconnect_delay=self.config.ntfy_reconnect_delay,
display_secs=5,
)
self.ntfy_cc.subscribe(self._handle_cc_message)
ntfy_cc_ok = self.ntfy_cc.start()
return bool(mic_ok), ntfy_ok and ntfy_cc_ok
def _handle_cc_message(self, event) -> None:
"""Handle incoming C&C message - like a serial port control interface."""
import urllib.request
cmd = event.body.strip() if hasattr(event, "body") else str(event).strip()
if not cmd.startswith("/"):
return
response = handle_effects_command(cmd)
topic_url = self.config.ntfy_cc_resp_topic.replace("/json", "")
data = response.encode("utf-8")
req = urllib.request.Request(
topic_url,
data=data,
headers={"User-Agent": "mainline/0.1", "Content-Type": "text/plain"},
method="POST",
)
try:
urllib.request.urlopen(req, timeout=5)
except Exception:
pass
def run(self, items: list) -> None: def run(self, items: list) -> None:
"""Run the stream with initialized sources.""" """Run the stream with initialized sources."""
@@ -51,7 +153,10 @@ class StreamController:
), ),
) )
stream(items, self.ntfy, self.mic) display = _get_display(self.config)
stream(items, self.ntfy, self.mic, display)
if display:
display.cleanup()
if self.event_bus: if self.event_bus:
self.event_bus.publish( self.event_bus.publish(

102
engine/display/__init__.py Normal file
View File

@@ -0,0 +1,102 @@
"""
Display backend system with registry pattern.
Allows swapping output backends via the Display protocol.
Supports auto-discovery of display backends.
"""
from typing import Protocol
from engine.display.backends.multi import MultiDisplay
from engine.display.backends.null import NullDisplay
from engine.display.backends.sixel import SixelDisplay
from engine.display.backends.terminal import TerminalDisplay
from engine.display.backends.websocket import WebSocketDisplay
class Display(Protocol):
"""Protocol for display backends."""
width: int
height: int
def init(self, width: int, height: int) -> None:
"""Initialize display with dimensions."""
...
def show(self, buffer: list[str]) -> None:
"""Show buffer on display."""
...
def clear(self) -> None:
"""Clear display."""
...
def cleanup(self) -> None:
"""Shutdown display."""
...
class DisplayRegistry:
"""Registry for display backends with auto-discovery."""
_backends: dict[str, type[Display]] = {}
_initialized = False
@classmethod
def register(cls, name: str, backend_class: type[Display]) -> None:
"""Register a display backend."""
cls._backends[name.lower()] = backend_class
@classmethod
def get(cls, name: str) -> type[Display] | None:
"""Get a display backend class by name."""
return cls._backends.get(name.lower())
@classmethod
def list_backends(cls) -> list[str]:
"""List all available display backend names."""
return list(cls._backends.keys())
@classmethod
def create(cls, name: str, **kwargs) -> Display | None:
"""Create a display instance by name."""
backend_class = cls.get(name)
if backend_class:
return backend_class(**kwargs)
return None
@classmethod
def initialize(cls) -> None:
"""Initialize and register all built-in backends."""
if cls._initialized:
return
cls.register("terminal", TerminalDisplay)
cls.register("null", NullDisplay)
cls.register("websocket", WebSocketDisplay)
cls.register("sixel", SixelDisplay)
cls._initialized = True
def get_monitor():
"""Get the performance monitor."""
try:
from engine.effects.performance import get_monitor as _get_monitor
return _get_monitor()
except Exception:
return None
__all__ = [
"Display",
"DisplayRegistry",
"get_monitor",
"TerminalDisplay",
"NullDisplay",
"WebSocketDisplay",
"SixelDisplay",
"MultiDisplay",
]

View File

@@ -0,0 +1,33 @@
"""
Multi display backend - forwards to multiple displays.
"""
class MultiDisplay:
"""Display that forwards to multiple displays."""
width: int = 80
height: int = 24
def __init__(self, displays: list):
self.displays = displays
self.width = 80
self.height = 24
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
for d in self.displays:
d.init(width, height)
def show(self, buffer: list[str]) -> None:
for d in self.displays:
d.show(buffer)
def clear(self) -> None:
for d in self.displays:
d.clear()
def cleanup(self) -> None:
for d in self.displays:
d.cleanup()

View File

@@ -0,0 +1,32 @@
"""
Null/headless display backend.
"""
import time
class NullDisplay:
"""Headless/null display - discards all output."""
width: int = 80
height: int = 24
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
def show(self, buffer: list[str]) -> None:
from engine.display import get_monitor
monitor = get_monitor()
if monitor:
t0 = time.perf_counter()
chars_in = sum(len(line) for line in buffer)
elapsed_ms = (time.perf_counter() - t0) * 1000
monitor.record_effect("null_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
pass
def cleanup(self) -> None:
pass

View File

@@ -0,0 +1,269 @@
"""
Sixel graphics display backend - renders to sixel graphics in terminal.
"""
import time
def _parse_ansi(
text: str,
) -> list[tuple[str, tuple[int, int, int], tuple[int, int, int], bool]]:
"""Parse ANSI text into tokens with fg/bg colors.
Returns list of (text, fg_rgb, bg_rgb, bold).
"""
tokens = []
current_text = ""
fg = (204, 204, 204)
bg = (0, 0, 0)
bold = False
i = 0
ANSI_COLORS = {
0: (0, 0, 0),
1: (205, 49, 49),
2: (13, 188, 121),
3: (229, 229, 16),
4: (36, 114, 200),
5: (188, 63, 188),
6: (17, 168, 205),
7: (229, 229, 229),
8: (102, 102, 102),
9: (241, 76, 76),
10: (35, 209, 139),
11: (245, 245, 67),
12: (59, 142, 234),
13: (214, 112, 214),
14: (41, 184, 219),
15: (255, 255, 255),
}
while i < len(text):
char = text[i]
if char == "\x1b" and i + 1 < len(text) and text[i + 1] == "[":
if current_text:
tokens.append((current_text, fg, bg, bold))
current_text = ""
i += 2
code = ""
while i < len(text):
c = text[i]
if c.isalpha():
break
code += c
i += 1
if code:
codes = code.split(";")
for c in codes:
try:
n = int(c) if c else 0
except ValueError:
continue
if n == 0:
fg = (204, 204, 204)
bg = (0, 0, 0)
bold = False
elif n == 1:
bold = True
elif n == 22:
bold = False
elif n == 39:
fg = (204, 204, 204)
elif n == 49:
bg = (0, 0, 0)
elif 30 <= n <= 37:
fg = ANSI_COLORS.get(n - 30 + (8 if bold else 0), fg)
elif 40 <= n <= 47:
bg = ANSI_COLORS.get(n - 40, bg)
elif 90 <= n <= 97:
fg = ANSI_COLORS.get(n - 90 + 8, fg)
elif 100 <= n <= 107:
bg = ANSI_COLORS.get(n - 100 + 8, bg)
elif 1 <= n <= 256:
if n < 16:
fg = ANSI_COLORS.get(n, fg)
elif n < 232:
c = n - 16
r = (c // 36) * 51
g = ((c % 36) // 6) * 51
b = (c % 6) * 51
fg = (r, g, b)
else:
gray = (n - 232) * 10 + 8
fg = (gray, gray, gray)
else:
current_text += char
i += 1
if current_text:
tokens.append((current_text, fg, bg, bold))
return tokens if tokens else [("", fg, bg, bold)]
def _encode_sixel(image) -> str:
"""Encode a PIL Image to sixel format (pure Python)."""
img = image.convert("RGBA")
width, height = img.size
pixels = img.load()
palette = []
pixel_palette_idx = {}
def get_color_idx(r, g, b, a):
if a < 128:
return -1
key = (r // 32, g // 32, b // 32)
if key not in pixel_palette_idx:
idx = len(palette)
if idx < 256:
palette.append((r, g, b))
pixel_palette_idx[key] = idx
return pixel_palette_idx.get(key, 0)
for y in range(height):
for x in range(width):
r, g, b, a = pixels[x, y]
get_color_idx(r, g, b, a)
if not palette:
return ""
if len(palette) == 1:
palette = [palette[0], (0, 0, 0)]
sixel_data = []
sixel_data.append(
f'"{"".join(f"#{i};2;{r};{g};{b}" for i, (r, g, b) in enumerate(palette))}'
)
for x in range(width):
col_data = []
for y in range(0, height, 6):
bits = 0
color_idx = -1
for dy in range(6):
if y + dy < height:
r, g, b, a = pixels[x, y + dy]
if a >= 128:
bits |= 1 << dy
idx = get_color_idx(r, g, b, a)
if color_idx == -1:
color_idx = idx
elif color_idx != idx:
color_idx = -2
if color_idx >= 0:
col_data.append(
chr(63 + color_idx) + chr(63 + bits)
if bits
else chr(63 + color_idx) + "?"
)
elif color_idx == -2:
pass
if col_data:
sixel_data.append("".join(col_data) + "$")
else:
sixel_data.append("-" if x < width - 1 else "$")
sixel_data.append("\x1b\\")
return "\x1bPq" + "".join(sixel_data)
class SixelDisplay:
"""Sixel graphics display backend - renders to sixel graphics in terminal."""
width: int = 80
height: int = 24
def __init__(self, cell_width: int = 9, cell_height: int = 16):
self.width = 80
self.height = 24
self.cell_width = cell_width
self.cell_height = cell_height
self._initialized = False
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
self._initialized = True
def show(self, buffer: list[str]) -> None:
import sys
t0 = time.perf_counter()
img_width = self.width * self.cell_width
img_height = self.height * self.cell_height
try:
from PIL import Image, ImageDraw, ImageFont
except ImportError:
return
img = Image.new("RGBA", (img_width, img_height), (0, 0, 0, 255))
draw = ImageDraw.Draw(img)
try:
font = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf",
self.cell_height - 2,
)
except Exception:
try:
font = ImageFont.load_default()
except Exception:
font = None
for row_idx, line in enumerate(buffer[: self.height]):
if row_idx >= self.height:
break
tokens = _parse_ansi(line)
x_pos = 0
y_pos = row_idx * self.cell_height
for text, fg, bg, bold in tokens:
if not text:
continue
if bg != (0, 0, 0):
bbox = draw.textbbox((x_pos, y_pos), text, font=font)
draw.rectangle(bbox, fill=(*bg, 255))
if bold and font:
draw.text((x_pos - 1, y_pos - 1), text, fill=(*fg, 255), font=font)
draw.text((x_pos, y_pos), text, fill=(*fg, 255), font=font)
if font:
x_pos += draw.textlength(text, font=font)
sixel = _encode_sixel(img)
sys.stdout.buffer.write(sixel.encode("utf-8"))
sys.stdout.flush()
elapsed_ms = (time.perf_counter() - t0) * 1000
from engine.display import get_monitor
monitor = get_monitor()
if monitor:
chars_in = sum(len(line) for line in buffer)
monitor.record_effect("sixel_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
import sys
sys.stdout.buffer.write(b"\x1b[2J\x1b[H")
sys.stdout.flush()
def cleanup(self) -> None:
pass

View File

@@ -0,0 +1,48 @@
"""
ANSI terminal display backend.
"""
import time
class TerminalDisplay:
"""ANSI terminal display backend."""
width: int = 80
height: int = 24
def __init__(self):
self.width = 80
self.height = 24
def init(self, width: int, height: int) -> None:
from engine.terminal import CURSOR_OFF
self.width = width
self.height = height
print(CURSOR_OFF, end="", flush=True)
def show(self, buffer: list[str]) -> None:
import sys
t0 = time.perf_counter()
sys.stdout.buffer.write("".join(buffer).encode())
sys.stdout.flush()
elapsed_ms = (time.perf_counter() - t0) * 1000
from engine.display import get_monitor
monitor = get_monitor()
if monitor:
chars_in = sum(len(line) for line in buffer)
monitor.record_effect("terminal_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
from engine.terminal import CLR
print(CLR, end="", flush=True)
def cleanup(self) -> None:
from engine.terminal import CURSOR_ON
print(CURSOR_ON, end="", flush=True)

View File

@@ -0,0 +1,266 @@
"""
WebSocket display backend - broadcasts frame buffer to connected web clients.
"""
import asyncio
import json
import threading
import time
from typing import Protocol
try:
import websockets
except ImportError:
websockets = None
class Display(Protocol):
"""Protocol for display backends."""
width: int
height: int
def init(self, width: int, height: int) -> None:
"""Initialize display with dimensions."""
...
def show(self, buffer: list[str]) -> None:
"""Show buffer on display."""
...
def clear(self) -> None:
"""Clear display."""
...
def cleanup(self) -> None:
"""Shutdown display."""
...
def get_monitor():
"""Get the performance monitor."""
try:
from engine.effects.performance import get_monitor as _get_monitor
return _get_monitor()
except Exception:
return None
class WebSocketDisplay:
"""WebSocket display backend - broadcasts to HTML Canvas clients."""
width: int = 80
height: int = 24
def __init__(
self,
host: str = "0.0.0.0",
port: int = 8765,
http_port: int = 8766,
):
self.host = host
self.port = port
self.http_port = http_port
self.width = 80
self.height = 24
self._clients: set = set()
self._server_running = False
self._http_running = False
self._server_thread: threading.Thread | None = None
self._http_thread: threading.Thread | None = None
self._available = True
self._max_clients = 10
self._client_connected_callback = None
self._client_disconnected_callback = None
self._frame_delay = 0.0
try:
import websockets as _ws
self._available = _ws is not None
except ImportError:
self._available = False
def is_available(self) -> bool:
"""Check if WebSocket support is available."""
return self._available
def init(self, width: int, height: int) -> None:
"""Initialize display with dimensions and start server."""
self.width = width
self.height = height
self.start_server()
self.start_http_server()
def show(self, buffer: list[str]) -> None:
"""Broadcast buffer to all connected clients."""
t0 = time.perf_counter()
if self._clients:
frame_data = {
"type": "frame",
"width": self.width,
"height": self.height,
"lines": buffer,
}
message = json.dumps(frame_data)
disconnected = set()
for client in list(self._clients):
try:
asyncio.run(client.send(message))
except Exception:
disconnected.add(client)
for client in disconnected:
self._clients.discard(client)
if self._client_disconnected_callback:
self._client_disconnected_callback(client)
elapsed_ms = (time.perf_counter() - t0) * 1000
monitor = get_monitor()
if monitor:
chars_in = sum(len(line) for line in buffer)
monitor.record_effect("websocket_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
"""Broadcast clear command to all clients."""
if self._clients:
clear_data = {"type": "clear"}
message = json.dumps(clear_data)
for client in list(self._clients):
try:
asyncio.run(client.send(message))
except Exception:
pass
def cleanup(self) -> None:
"""Stop the servers."""
self.stop_server()
self.stop_http_server()
async def _websocket_handler(self, websocket):
"""Handle WebSocket connections."""
if len(self._clients) >= self._max_clients:
await websocket.close()
return
self._clients.add(websocket)
if self._client_connected_callback:
self._client_connected_callback(websocket)
try:
async for message in websocket:
try:
data = json.loads(message)
if data.get("type") == "resize":
self.width = data.get("width", 80)
self.height = data.get("height", 24)
except json.JSONDecodeError:
pass
except Exception:
pass
finally:
self._clients.discard(websocket)
if self._client_disconnected_callback:
self._client_disconnected_callback(websocket)
async def _run_websocket_server(self):
"""Run the WebSocket server."""
async with websockets.serve(self._websocket_handler, self.host, self.port):
while self._server_running:
await asyncio.sleep(0.1)
async def _run_http_server(self):
"""Run simple HTTP server for the client."""
import os
from http.server import HTTPServer, SimpleHTTPRequestHandler
client_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "client"
)
class Handler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=client_dir, **kwargs)
def log_message(self, format, *args):
pass
httpd = HTTPServer((self.host, self.http_port), Handler)
while self._http_running:
httpd.handle_request()
def _run_async(self, coro):
"""Run coroutine in background."""
try:
asyncio.run(coro)
except Exception as e:
print(f"WebSocket async error: {e}")
def start_server(self):
"""Start the WebSocket server in a background thread."""
if not self._available:
return
if self._server_thread is not None:
return
self._server_running = True
self._server_thread = threading.Thread(
target=self._run_async, args=(self._run_websocket_server(),), daemon=True
)
self._server_thread.start()
def stop_server(self):
"""Stop the WebSocket server."""
self._server_running = False
self._server_thread = None
def start_http_server(self):
"""Start the HTTP server in a background thread."""
if not self._available:
return
if self._http_thread is not None:
return
self._http_running = True
self._http_running = True
self._http_thread = threading.Thread(
target=self._run_async, args=(self._run_http_server(),), daemon=True
)
self._http_thread.start()
def stop_http_server(self):
"""Stop the HTTP server."""
self._http_running = False
self._http_thread = None
def client_count(self) -> int:
"""Return number of connected clients."""
return len(self._clients)
def get_ws_port(self) -> int:
"""Return WebSocket port."""
return self.port
def get_http_port(self) -> int:
"""Return HTTP port."""
return self.http_port
def set_frame_delay(self, delay: float) -> None:
"""Set delay between frames in seconds."""
self._frame_delay = delay
def get_frame_delay(self) -> float:
"""Get delay between frames."""
return self._frame_delay
def set_client_connected_callback(self, callback) -> None:
"""Set callback for client connections."""
self._client_connected_callback = callback
def set_client_disconnected_callback(self, callback) -> None:
"""Set callback for client disconnections."""
self._client_disconnected_callback = callback

View File

@@ -0,0 +1,42 @@
from engine.effects.chain import EffectChain
from engine.effects.controller import handle_effects_command, show_effects_menu
from engine.effects.legacy import (
fade_line,
firehose_line,
glitch_bar,
next_headline,
noise,
vis_trunc,
)
from engine.effects.performance import PerformanceMonitor, get_monitor, set_monitor
from engine.effects.registry import EffectRegistry, get_registry, set_registry
from engine.effects.types import EffectConfig, EffectContext, PipelineConfig
def get_effect_chain():
from engine.layers import get_effect_chain as _chain
return _chain()
__all__ = [
"EffectChain",
"EffectRegistry",
"EffectConfig",
"EffectContext",
"PipelineConfig",
"get_registry",
"set_registry",
"get_effect_chain",
"get_monitor",
"set_monitor",
"PerformanceMonitor",
"handle_effects_command",
"show_effects_menu",
"fade_line",
"firehose_line",
"glitch_bar",
"noise",
"next_headline",
"vis_trunc",
]

71
engine/effects/chain.py Normal file
View File

@@ -0,0 +1,71 @@
import time
from engine.effects.performance import PerformanceMonitor, get_monitor
from engine.effects.registry import EffectRegistry
from engine.effects.types import EffectContext
class EffectChain:
def __init__(
self, registry: EffectRegistry, monitor: PerformanceMonitor | None = None
):
self._registry = registry
self._order: list[str] = []
self._monitor = monitor
def _get_monitor(self) -> PerformanceMonitor:
if self._monitor is not None:
return self._monitor
return get_monitor()
def set_order(self, names: list[str]) -> None:
self._order = list(names)
def get_order(self) -> list[str]:
return self._order.copy()
def add_effect(self, name: str, position: int | None = None) -> bool:
if name not in self._registry.list_all():
return False
if position is None:
self._order.append(name)
else:
self._order.insert(position, name)
return True
def remove_effect(self, name: str) -> bool:
if name in self._order:
self._order.remove(name)
return True
return False
def reorder(self, new_order: list[str]) -> bool:
all_plugins = set(self._registry.list_all().keys())
if not all(name in all_plugins for name in new_order):
return False
self._order = list(new_order)
return True
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
monitor = self._get_monitor()
frame_number = ctx.frame_number
monitor.start_frame(frame_number)
frame_start = time.perf_counter()
result = list(buf)
for name in self._order:
plugin = self._registry.get(name)
if plugin and plugin.config.enabled:
chars_in = sum(len(line) for line in result)
effect_start = time.perf_counter()
try:
result = plugin.process(result, ctx)
except Exception:
plugin.config.enabled = False
elapsed = time.perf_counter() - effect_start
chars_out = sum(len(line) for line in result)
monitor.record_effect(name, elapsed * 1000, chars_in, chars_out)
total_elapsed = time.perf_counter() - frame_start
monitor.end_frame(frame_number, total_elapsed * 1000)
return result

View File

@@ -0,0 +1,144 @@
from engine.effects.performance import get_monitor
from engine.effects.registry import get_registry
_effect_chain_ref = None
def _get_effect_chain():
global _effect_chain_ref
if _effect_chain_ref is not None:
return _effect_chain_ref
try:
from engine.layers import get_effect_chain as _chain
return _chain()
except Exception:
return None
def set_effect_chain_ref(chain) -> None:
global _effect_chain_ref
_effect_chain_ref = chain
def handle_effects_command(cmd: str) -> str:
"""Handle /effects command from NTFY message.
Commands:
/effects list - list all effects and their status
/effects <name> on - enable an effect
/effects <name> off - disable an effect
/effects <name> intensity <0.0-1.0> - set intensity
/effects reorder <name1>,<name2>,... - reorder pipeline
/effects stats - show performance statistics
"""
parts = cmd.strip().split()
if not parts or parts[0] != "/effects":
return "Unknown command"
registry = get_registry()
chain = _get_effect_chain()
if len(parts) == 1 or parts[1] == "list":
result = ["Effects:"]
for name, plugin in registry.list_all().items():
status = "ON" if plugin.config.enabled else "OFF"
intensity = plugin.config.intensity
result.append(f" {name}: {status} (intensity={intensity})")
if chain:
result.append(f"Order: {chain.get_order()}")
return "\n".join(result)
if parts[1] == "stats":
return _format_stats()
if parts[1] == "reorder" and len(parts) >= 3:
new_order = parts[2].split(",")
if chain and chain.reorder(new_order):
return f"Reordered pipeline: {new_order}"
return "Failed to reorder pipeline"
if len(parts) < 3:
return "Usage: /effects <name> on|off|intensity <value>"
effect_name = parts[1]
action = parts[2]
if effect_name not in registry.list_all():
return f"Unknown effect: {effect_name}"
if action == "on":
registry.enable(effect_name)
return f"Enabled: {effect_name}"
if action == "off":
registry.disable(effect_name)
return f"Disabled: {effect_name}"
if action == "intensity" and len(parts) >= 4:
try:
value = float(parts[3])
if not 0.0 <= value <= 1.0:
return "Intensity must be between 0.0 and 1.0"
plugin = registry.get(effect_name)
if plugin:
plugin.config.intensity = value
return f"Set {effect_name} intensity to {value}"
except ValueError:
return "Invalid intensity value"
return f"Unknown action: {action}"
def _format_stats() -> str:
monitor = get_monitor()
stats = monitor.get_stats()
if "error" in stats:
return stats["error"]
lines = ["Performance Stats:"]
pipeline = stats["pipeline"]
lines.append(
f" Pipeline: avg={pipeline['avg_ms']:.2f}ms min={pipeline['min_ms']:.2f}ms max={pipeline['max_ms']:.2f}ms (over {stats['frame_count']} frames)"
)
if stats["effects"]:
lines.append(" Per-effect (avg ms):")
for name, effect_stats in stats["effects"].items():
lines.append(
f" {name}: avg={effect_stats['avg_ms']:.2f}ms min={effect_stats['min_ms']:.2f}ms max={effect_stats['max_ms']:.2f}ms"
)
return "\n".join(lines)
def show_effects_menu() -> str:
"""Generate effects menu text for display."""
registry = get_registry()
chain = _get_effect_chain()
lines = [
"\033[1;38;5;231m=== EFFECTS MENU ===\033[0m",
"",
"Effects:",
]
for name, plugin in registry.list_all().items():
status = "ON" if plugin.config.enabled else "OFF"
intensity = plugin.config.intensity
lines.append(f" [{status:3}] {name}: intensity={intensity:.2f}")
if chain:
lines.append("")
lines.append(f"Pipeline order: {' -> '.join(chain.get_order())}")
lines.append("")
lines.append("Controls:")
lines.append(" /effects <name> on|off")
lines.append(" /effects <name> intensity <0.0-1.0>")
lines.append(" /effects reorder name1,name2,...")
lines.append("")
return "\n".join(lines)

View File

@@ -0,0 +1,103 @@
from collections import deque
from dataclasses import dataclass
@dataclass
class EffectTiming:
name: str
duration_ms: float
buffer_chars_in: int
buffer_chars_out: int
@dataclass
class FrameTiming:
frame_number: int
total_ms: float
effects: list[EffectTiming]
class PerformanceMonitor:
"""Collects and stores performance metrics for effect pipeline."""
def __init__(self, max_frames: int = 60):
self._max_frames = max_frames
self._frames: deque[FrameTiming] = deque(maxlen=max_frames)
self._current_frame: list[EffectTiming] = []
def start_frame(self, frame_number: int) -> None:
self._current_frame = []
def record_effect(
self, name: str, duration_ms: float, chars_in: int, chars_out: int
) -> None:
self._current_frame.append(
EffectTiming(
name=name,
duration_ms=duration_ms,
buffer_chars_in=chars_in,
buffer_chars_out=chars_out,
)
)
def end_frame(self, frame_number: int, total_ms: float) -> None:
self._frames.append(
FrameTiming(
frame_number=frame_number,
total_ms=total_ms,
effects=self._current_frame,
)
)
def get_stats(self) -> dict:
if not self._frames:
return {"error": "No timing data available"}
total_times = [f.total_ms for f in self._frames]
avg_total = sum(total_times) / len(total_times)
min_total = min(total_times)
max_total = max(total_times)
effect_stats: dict[str, dict] = {}
for frame in self._frames:
for effect in frame.effects:
if effect.name not in effect_stats:
effect_stats[effect.name] = {"times": [], "total_chars": 0}
effect_stats[effect.name]["times"].append(effect.duration_ms)
effect_stats[effect.name]["total_chars"] += effect.buffer_chars_out
for name, stats in effect_stats.items():
times = stats["times"]
stats["avg_ms"] = sum(times) / len(times)
stats["min_ms"] = min(times)
stats["max_ms"] = max(times)
del stats["times"]
return {
"frame_count": len(self._frames),
"pipeline": {
"avg_ms": avg_total,
"min_ms": min_total,
"max_ms": max_total,
},
"effects": effect_stats,
}
def reset(self) -> None:
self._frames.clear()
self._current_frame = []
_monitor: PerformanceMonitor | None = None
def get_monitor() -> PerformanceMonitor:
global _monitor
if _monitor is None:
_monitor = PerformanceMonitor()
return _monitor
def set_monitor(monitor: PerformanceMonitor) -> None:
global _monitor
_monitor = monitor

View File

@@ -0,0 +1,59 @@
from engine.effects.types import EffectConfig, EffectPlugin
class EffectRegistry:
def __init__(self):
self._plugins: dict[str, EffectPlugin] = {}
self._discovered: bool = False
def register(self, plugin: EffectPlugin) -> None:
self._plugins[plugin.name] = plugin
def get(self, name: str) -> EffectPlugin | None:
return self._plugins.get(name)
def list_all(self) -> dict[str, EffectPlugin]:
return self._plugins.copy()
def list_enabled(self) -> list[EffectPlugin]:
return [p for p in self._plugins.values() if p.config.enabled]
def enable(self, name: str) -> bool:
plugin = self._plugins.get(name)
if plugin:
plugin.config.enabled = True
return True
return False
def disable(self, name: str) -> bool:
plugin = self._plugins.get(name)
if plugin:
plugin.config.enabled = False
return True
return False
def configure(self, name: str, config: EffectConfig) -> bool:
plugin = self._plugins.get(name)
if plugin:
plugin.configure(config)
return True
return False
def is_enabled(self, name: str) -> bool:
plugin = self._plugins.get(name)
return plugin.config.enabled if plugin else False
_registry: EffectRegistry | None = None
def get_registry() -> EffectRegistry:
global _registry
if _registry is None:
_registry = EffectRegistry()
return _registry
def set_registry(registry: EffectRegistry) -> None:
global _registry
_registry = registry

39
engine/effects/types.py Normal file
View File

@@ -0,0 +1,39 @@
from dataclasses import dataclass, field
from typing import Any
@dataclass
class EffectContext:
terminal_width: int
terminal_height: int
scroll_cam: int
ticker_height: int
mic_excess: float
grad_offset: float
frame_number: int
has_message: bool
items: list = field(default_factory=list)
@dataclass
class EffectConfig:
enabled: bool = True
intensity: float = 1.0
params: dict[str, Any] = field(default_factory=dict)
class EffectPlugin:
name: str
config: EffectConfig
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
raise NotImplementedError
def configure(self, config: EffectConfig) -> None:
raise NotImplementedError
@dataclass
class PipelineConfig:
order: list[str] = field(default_factory=list)
effects: dict[str, EffectConfig] = field(default_factory=dict)

View File

@@ -10,6 +10,8 @@ from datetime import datetime
from engine import config from engine import config
from engine.effects import ( from engine.effects import (
EffectChain,
EffectContext,
fade_line, fade_line,
firehose_line, firehose_line,
glitch_bar, glitch_bar,
@@ -199,3 +201,60 @@ def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]:
fline = firehose_line(items, w) fline = firehose_line(items, w)
buf.append(f"\033[{scr_row};1H{fline}\033[K") buf.append(f"\033[{scr_row};1H{fline}\033[K")
return buf return buf
_effect_chain = None
def init_effects() -> None:
"""Initialize effect plugins and chain."""
global _effect_chain
from engine.effects import EffectChain, get_registry
registry = get_registry()
import effects_plugins
effects_plugins.discover_plugins()
chain = EffectChain(registry)
chain.set_order(["noise", "fade", "glitch", "firehose"])
_effect_chain = chain
def process_effects(
buf: list[str],
w: int,
h: int,
scroll_cam: int,
ticker_h: int,
mic_excess: float,
grad_offset: float,
frame_number: int,
has_message: bool,
items: list,
) -> list[str]:
"""Process buffer through effect chain."""
if _effect_chain is None:
init_effects()
ctx = EffectContext(
terminal_width=w,
terminal_height=h,
scroll_cam=scroll_cam,
ticker_height=ticker_h,
mic_excess=mic_excess,
grad_offset=grad_offset,
frame_number=frame_number,
has_message=has_message,
items=items,
)
return _effect_chain.process(buf, ctx)
def get_effect_chain() -> EffectChain | None:
"""Get the effect chain instance."""
global _effect_chain
if _effect_chain is None:
init_effects()
return _effect_chain

View File

@@ -4,33 +4,42 @@ Orchestrates viewport, frame timing, and layers.
""" """
import random import random
import sys
import time import time
from engine import config from engine import config
from engine.display import (
Display,
TerminalDisplay,
)
from engine.display import (
get_monitor as _get_display_monitor,
)
from engine.frame import calculate_scroll_step from engine.frame import calculate_scroll_step
from engine.layers import ( from engine.layers import (
apply_glitch, apply_glitch,
process_effects,
render_firehose, render_firehose,
render_message_overlay, render_message_overlay,
render_ticker_zone, render_ticker_zone,
) )
from engine.terminal import CLR
from engine.viewport import th, tw from engine.viewport import th, tw
USE_EFFECT_CHAIN = True
def stream(items, ntfy_poller, mic_monitor):
def stream(items, ntfy_poller, mic_monitor, display: Display | None = None):
"""Main render loop with four layers: message, ticker, scroll motion, firehose.""" """Main render loop with four layers: message, ticker, scroll motion, firehose."""
if display is None:
display = TerminalDisplay()
random.shuffle(items) random.shuffle(items)
pool = list(items) pool = list(items)
seen = set() seen = set()
queued = 0 queued = 0
time.sleep(0.5) time.sleep(0.5)
sys.stdout.write(CLR)
sys.stdout.flush()
w, h = tw(), th() w, h = tw(), th()
display.init(w, h)
display.clear()
fh = config.FIREHOSE_H if config.FIREHOSE else 0 fh = config.FIREHOSE_H if config.FIREHOSE else 0
ticker_view_h = h - fh ticker_view_h = h - fh
GAP = 3 GAP = 3
@@ -42,6 +51,7 @@ def stream(items, ntfy_poller, mic_monitor):
noise_cache = {} noise_cache = {}
scroll_motion_accum = 0.0 scroll_motion_accum = 0.0
msg_cache = (None, None) msg_cache = (None, None)
frame_number = 0
while True: while True:
if queued >= config.HEADLINE_LIMIT and not active: if queued >= config.HEADLINE_LIMIT and not active:
@@ -93,19 +103,39 @@ def stream(items, ntfy_poller, mic_monitor):
buf.extend(ticker_buf) buf.extend(ticker_buf)
mic_excess = mic_monitor.excess mic_excess = mic_monitor.excess
buf = apply_glitch(buf, ticker_buf_start, mic_excess, w) render_start = time.perf_counter()
firehose_buf = render_firehose(items, w, fh, h) if USE_EFFECT_CHAIN:
buf.extend(firehose_buf) buf = process_effects(
buf,
w,
h,
scroll_cam,
ticker_h,
mic_excess,
grad_offset,
frame_number,
msg is not None,
items,
)
else:
buf = apply_glitch(buf, ticker_buf_start, mic_excess, w)
firehose_buf = render_firehose(items, w, fh, h)
buf.extend(firehose_buf)
if msg_overlay: if msg_overlay:
buf.extend(msg_overlay) buf.extend(msg_overlay)
sys.stdout.buffer.write("".join(buf).encode()) render_elapsed = (time.perf_counter() - render_start) * 1000
sys.stdout.flush() monitor = _get_display_monitor()
if monitor:
chars = sum(len(line) for line in buf)
monitor.record_effect("render", render_elapsed, chars, chars)
display.show(buf)
elapsed = time.monotonic() - t0 elapsed = time.monotonic() - t0
time.sleep(max(0, config.FRAME_DT - elapsed)) time.sleep(max(0, config.FRAME_DT - elapsed))
frame_number += 1
sys.stdout.write(CLR) display.cleanup()
sys.stdout.flush()

3
hk.pkl
View File

@@ -22,6 +22,9 @@ hooks {
prefix = "uv run" prefix = "uv run"
check = "ruff check engine/ tests/" check = "ruff check engine/ tests/"
} }
["benchmark"] {
check = "uv run python -m engine.benchmark --hook --displays null --iterations 20"
}
} }
} }
} }

View File

@@ -5,45 +5,82 @@ pkl = "latest"
[tasks] [tasks]
# ===================== # =====================
# Development # Testing
# ===================== # =====================
test = "uv run pytest" test = "uv run pytest"
test-v = "uv run pytest -v" test-v = { run = "uv run pytest -v", depends = ["sync-all"] }
test-cov = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html" test-cov = { run = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html", depends = ["sync-all"] }
test-cov-open = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html && open htmlcov/index.html" test-cov-open = { run = "mise run test-cov && open htmlcov/index.html", depends = ["sync-all"] }
test-browser-install = { run = "uv run playwright install chromium", depends = ["sync-all"] }
test-browser = { run = "uv run pytest tests/e2e/", depends = ["test-browser-install"] }
# =====================
# Linting & Formatting
# =====================
lint = "uv run ruff check engine/ mainline.py" lint = "uv run ruff check engine/ mainline.py"
lint-fix = "uv run ruff check --fix engine/ mainline.py" lint-fix = "uv run ruff check --fix engine/ mainline.py"
format = "uv run ruff format engine/ mainline.py" format = "uv run ruff format engine/ mainline.py"
# ===================== # =====================
# Runtime # Runtime Modes
# ===================== # =====================
run = "uv run mainline.py" run = "uv run mainline.py"
run-poetry = "uv run mainline.py --poetry" run-poetry = "uv run mainline.py --poetry"
run-firehose = "uv run mainline.py --firehose" run-firehose = "uv run mainline.py --firehose"
run-websocket = { run = "uv run mainline.py --display websocket", depends = ["sync-all"] }
run-sixel = { run = "uv run mainline.py --display sixel", depends = ["sync-all"] }
run-both = { run = "uv run mainline.py --display both", depends = ["sync-all"] }
run-client = { run = "mise run run-both & sleep 2 && $(open http://localhost:8766 2>/dev/null || xdg-open http://localhost:8766 2>/dev/null || echo 'Open http://localhost:8766 manually'); wait", depends = ["sync-all"] }
# =====================
# Command & Control
# =====================
cmd = "uv run cmdline.py"
cmd-stats = { run = "uv run cmdline.py -w \"/effects stats\"", depends = ["sync-all"] }
# =====================
# Benchmark
# =====================
benchmark = { run = "uv run python -m engine.benchmark", depends = ["sync-all"] }
benchmark-json = { run = "uv run python -m engine.benchmark --format json --output benchmark.json", depends = ["sync-all"] }
benchmark-report = { run = "uv run python -m engine.benchmark --output BENCHMARK.md", depends = ["sync-all"] }
# Initialize ntfy topics (warm up before first use)
topics-init = "curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd > /dev/null && curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline_cc_resp > /dev/null && curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline > /dev/null"
# =====================
# Daemon
# =====================
daemon = "nohup uv run mainline.py > nohup.out 2>&1 &"
daemon-stop = "pkill -f 'uv run mainline.py' 2>/dev/null || true"
daemon-restart = "mise run daemon-stop && sleep 2 && mise run daemon"
# ===================== # =====================
# Environment # Environment
# ===================== # =====================
sync = "uv sync" sync = "uv sync"
sync-all = "uv sync --all-extras" sync-all = "uv sync --all-extras"
install = "uv sync" install = "mise run sync"
install-dev = "uv sync --group dev" install-dev = { run = "mise run sync-all && uv sync --group dev", depends = ["sync-all"] }
bootstrap = { run = "mise run sync-all && uv run mainline.py --help", depends = ["sync-all"] }
bootstrap = "uv sync && uv run mainline.py --help" clean = "rm -rf .venv htmlcov .coverage tests/.pytest_cache .mainline_cache_*.json nohup.out"
clobber = "git clean -fdx && rm -rf .venv htmlcov .coverage tests/.pytest_cache .mainline_cache_*.json nohup.out"
clean = "rm -rf .venv htmlcov .coverage tests/.pytest_cache"
# ===================== # =====================
# CI/CD # CI/CD
# ===================== # =====================
ci = "uv sync --group dev && uv run pytest --cov=engine --cov-report=term-missing --cov-report=xml" ci = { run = "mise run topics-init && mise run lint && mise run test-cov", depends = ["topics-init", "lint", "test-cov"] }
ci-lint = "uv run ruff check engine/ mainline.py"
# ===================== # =====================
# Git Hooks (via hk) # Git Hooks (via hk)

View File

@@ -30,6 +30,15 @@ mic = [
"sounddevice>=0.4.0", "sounddevice>=0.4.0",
"numpy>=1.24.0", "numpy>=1.24.0",
] ]
websocket = [
"websockets>=12.0",
]
sixel = [
"pysixel>=0.1.0",
]
browser = [
"playwright>=1.40.0",
]
dev = [ dev = [
"pytest>=8.0.0", "pytest>=8.0.0",
"pytest-cov>=4.1.0", "pytest-cov>=4.1.0",

View File

@@ -0,0 +1,133 @@
"""
End-to-end tests for web client with headless browser.
"""
import os
import socketserver
import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
import pytest
CLIENT_DIR = Path(__file__).parent.parent.parent / "client"
class ThreadedHTTPServer(socketserver.ThreadingMixIn, HTTPServer):
"""Threaded HTTP server for handling concurrent requests."""
daemon_threads = True
@pytest.fixture(scope="module")
def http_server():
"""Start a local HTTP server for the client."""
os.chdir(CLIENT_DIR)
handler = SimpleHTTPRequestHandler
server = ThreadedHTTPServer(("127.0.0.1", 0), handler)
port = server.server_address[1]
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
yield f"http://127.0.0.1:{port}"
server.shutdown()
class TestWebClient:
"""Tests for the web client using Playwright."""
@pytest.fixture(autouse=True)
def setup_browser(self):
"""Set up browser for tests."""
pytest.importorskip("playwright")
from playwright.sync_api import sync_playwright
self.playwright = sync_playwright().start()
self.browser = self.playwright.chromium.launch(headless=True)
self.context = self.browser.new_context()
self.page = self.context.new_page()
yield
self.page.close()
self.context.close()
self.browser.close()
self.playwright.stop()
def test_client_loads(self, http_server):
"""Web client loads without errors."""
response = self.page.goto(http_server)
assert response.status == 200, f"Page load failed with status {response.status}"
self.page.wait_for_load_state("domcontentloaded")
content = self.page.content()
assert "<canvas" in content, "Canvas element not found in page"
canvas = self.page.locator("#terminal")
assert canvas.count() > 0, "Canvas not found"
def test_status_shows_connecting(self, http_server):
"""Status shows connecting initially."""
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
status = self.page.locator("#status")
assert status.count() > 0, "Status element not found"
def test_canvas_has_dimensions(self, http_server):
"""Canvas has correct dimensions after load."""
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
canvas = self.page.locator("#terminal")
assert canvas.count() > 0, "Canvas not found"
def test_no_console_errors_on_load(self, http_server):
"""No JavaScript errors on page load (websocket errors are expected without server)."""
js_errors = []
def handle_console(msg):
if msg.type == "error":
text = msg.text
if "WebSocket" not in text:
js_errors.append(text)
self.page.on("console", handle_console)
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
assert len(js_errors) == 0, f"JavaScript errors: {js_errors}"
class TestWebClientProtocol:
"""Tests for WebSocket protocol handling in client."""
@pytest.fixture(autouse=True)
def setup_browser(self):
"""Set up browser for tests."""
pytest.importorskip("playwright")
from playwright.sync_api import sync_playwright
self.playwright = sync_playwright().start()
self.browser = self.playwright.chromium.launch(headless=True)
self.context = self.browser.new_context()
self.page = self.context.new_page()
yield
self.page.close()
self.context.close()
self.browser.close()
self.playwright.stop()
def test_websocket_reconnection(self, http_server):
"""Client attempts reconnection on disconnect."""
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
status = self.page.locator("#status")
assert status.count() > 0, "Status element not found"

55
tests/test_app.py Normal file
View File

@@ -0,0 +1,55 @@
"""
Tests for engine.app module.
"""
from engine.app import _normalize_preview_rows
class TestNormalizePreviewRows:
"""Tests for _normalize_preview_rows function."""
def test_empty_rows(self):
"""Empty input returns empty list."""
result = _normalize_preview_rows([])
assert result == [""]
def test_strips_left_padding(self):
"""Left padding is stripped."""
result = _normalize_preview_rows([" content", " more"])
assert all(not r.startswith(" ") for r in result)
def test_preserves_content(self):
"""Content is preserved."""
result = _normalize_preview_rows([" hello world "])
assert "hello world" in result[0]
def test_handles_all_empty_rows(self):
"""All empty rows returns single empty string."""
result = _normalize_preview_rows(["", " ", ""])
assert result == [""]
class TestAppConstants:
"""Tests for app module constants."""
def test_title_defined(self):
"""TITLE is defined."""
from engine.app import TITLE
assert len(TITLE) > 0
def test_title_lines_are_strings(self):
"""TITLE contains string lines."""
from engine.app import TITLE
assert all(isinstance(line, str) for line in TITLE)
class TestAppImports:
"""Tests for app module imports."""
def test_app_imports_without_error(self):
"""Module imports without error."""
from engine import app
assert app is not None

79
tests/test_display.py Normal file
View File

@@ -0,0 +1,79 @@
"""
Tests for engine.display module.
"""
from engine.display import NullDisplay, TerminalDisplay
class TestDisplayProtocol:
"""Test that display backends satisfy the Display protocol."""
def test_terminal_display_is_display(self):
"""TerminalDisplay satisfies Display protocol."""
display = TerminalDisplay()
assert hasattr(display, "init")
assert hasattr(display, "show")
assert hasattr(display, "clear")
assert hasattr(display, "cleanup")
def test_null_display_is_display(self):
"""NullDisplay satisfies Display protocol."""
display = NullDisplay()
assert hasattr(display, "init")
assert hasattr(display, "show")
assert hasattr(display, "clear")
assert hasattr(display, "cleanup")
class TestTerminalDisplay:
"""Tests for TerminalDisplay class."""
def test_init_sets_dimensions(self):
"""init stores terminal dimensions."""
display = TerminalDisplay()
display.init(80, 24)
assert display.width == 80
assert display.height == 24
def test_show_returns_none(self):
"""show returns None after writing to stdout."""
display = TerminalDisplay()
display.width = 80
display.height = 24
display.show(["line1", "line2"])
def test_clear_does_not_error(self):
"""clear works without error."""
display = TerminalDisplay()
display.clear()
def test_cleanup_does_not_error(self):
"""cleanup works without error."""
display = TerminalDisplay()
display.cleanup()
class TestNullDisplay:
"""Tests for NullDisplay class."""
def test_init_stores_dimensions(self):
"""init stores dimensions."""
display = NullDisplay()
display.init(100, 50)
assert display.width == 100
assert display.height == 50
def test_show_does_nothing(self):
"""show discards buffer without error."""
display = NullDisplay()
display.show(["line1", "line2", "line3"])
def test_clear_does_nothing(self):
"""clear does nothing."""
display = NullDisplay()
display.clear()
def test_cleanup_does_nothing(self):
"""cleanup does nothing."""
display = NullDisplay()
display.cleanup()

427
tests/test_effects.py Normal file
View File

@@ -0,0 +1,427 @@
"""
Tests for engine.effects module.
"""
from engine.effects import EffectChain, EffectConfig, EffectContext, EffectRegistry
class MockEffect:
name = "mock"
config = EffectConfig(enabled=True, intensity=1.0)
def __init__(self):
self.processed = False
self.last_ctx = None
def process(self, buf, ctx):
self.processed = True
self.last_ctx = ctx
return buf + ["processed"]
def configure(self, config):
self.config = config
class TestEffectConfig:
def test_defaults(self):
cfg = EffectConfig()
assert cfg.enabled is True
assert cfg.intensity == 1.0
assert cfg.params == {}
def test_custom_values(self):
cfg = EffectConfig(enabled=False, intensity=0.5, params={"key": "value"})
assert cfg.enabled is False
assert cfg.intensity == 0.5
assert cfg.params == {"key": "value"}
class TestEffectContext:
def test_defaults(self):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
assert ctx.terminal_width == 80
assert ctx.terminal_height == 24
assert ctx.ticker_height == 20
assert ctx.items == []
def test_with_items(self):
items = [("Title", "Source", "12:00")]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
items=items,
)
assert ctx.items == items
class TestEffectRegistry:
def test_init_empty(self):
registry = EffectRegistry()
assert len(registry.list_all()) == 0
def test_register(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
assert "mock" in registry.list_all()
def test_get(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
retrieved = registry.get("mock")
assert retrieved is effect
def test_get_nonexistent(self):
registry = EffectRegistry()
assert registry.get("nonexistent") is None
def test_enable(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = False
registry.register(effect)
registry.enable("mock")
assert effect.config.enabled is True
def test_disable(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = True
registry.register(effect)
registry.disable("mock")
assert effect.config.enabled is False
def test_list_enabled(self):
registry = EffectRegistry()
class EnabledEffect:
name = "enabled_effect"
config = EffectConfig(enabled=True, intensity=1.0)
class DisabledEffect:
name = "disabled_effect"
config = EffectConfig(enabled=False, intensity=1.0)
registry.register(EnabledEffect())
registry.register(DisabledEffect())
enabled = registry.list_enabled()
assert len(enabled) == 1
assert enabled[0].name == "enabled_effect"
def test_configure(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
new_config = EffectConfig(enabled=False, intensity=0.3)
registry.configure("mock", new_config)
assert effect.config.enabled is False
assert effect.config.intensity == 0.3
def test_is_enabled(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = True
registry.register(effect)
assert registry.is_enabled("mock") is True
assert registry.is_enabled("nonexistent") is False
class TestEffectChain:
def test_init(self):
registry = EffectRegistry()
chain = EffectChain(registry)
assert chain.get_order() == []
def test_set_order(self):
registry = EffectRegistry()
effect1 = MockEffect()
effect1.name = "effect1"
effect2 = MockEffect()
effect2.name = "effect2"
registry.register(effect1)
registry.register(effect2)
chain = EffectChain(registry)
chain.set_order(["effect1", "effect2"])
assert chain.get_order() == ["effect1", "effect2"]
def test_add_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.add_effect("test_effect")
assert "test_effect" in chain.get_order()
def test_add_effect_invalid(self):
registry = EffectRegistry()
chain = EffectChain(registry)
result = chain.add_effect("nonexistent")
assert result is False
def test_remove_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
chain.remove_effect("test_effect")
assert "test_effect" not in chain.get_order()
def test_reorder(self):
registry = EffectRegistry()
effect1 = MockEffect()
effect1.name = "effect1"
effect2 = MockEffect()
effect2.name = "effect2"
effect3 = MockEffect()
effect3.name = "effect3"
registry.register(effect1)
registry.register(effect2)
registry.register(effect3)
chain = EffectChain(registry)
chain.set_order(["effect1", "effect2", "effect3"])
result = chain.reorder(["effect3", "effect1", "effect2"])
assert result is True
assert chain.get_order() == ["effect3", "effect1", "effect2"]
def test_reorder_invalid(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "effect1"
registry.register(effect)
chain = EffectChain(registry)
result = chain.reorder(["effect1", "nonexistent"])
assert result is False
def test_process_empty_chain(self):
registry = EffectRegistry()
chain = EffectChain(registry)
buf = ["line1", "line2"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == buf
def test_process_with_effects(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
buf = ["line1", "line2"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == ["line1", "line2", "processed"]
assert effect.processed is True
assert effect.last_ctx is ctx
def test_process_disabled_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
effect.config.enabled = False
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
buf = ["line1"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == ["line1"]
assert effect.processed is False
class TestEffectsExports:
def test_all_exports_are_importable(self):
"""Verify all exports in __all__ can actually be imported."""
import engine.effects as effects_module
for name in effects_module.__all__:
getattr(effects_module, name)
class TestPerformanceMonitor:
def test_empty_stats(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
stats = monitor.get_stats()
assert "error" in stats
def test_record_and_retrieve(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
monitor.start_frame(1)
monitor.record_effect("test_effect", 1.5, 100, 150)
monitor.end_frame(1, 2.0)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["frame_count"] == 1
assert "test_effect" in stats["effects"]
def test_multiple_frames(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=3)
for i in range(5):
monitor.start_frame(i)
monitor.record_effect("effect1", 1.0, 100, 100)
monitor.record_effect("effect2", 0.5, 100, 100)
monitor.end_frame(i, 1.5)
stats = monitor.get_stats()
assert stats["frame_count"] == 3
assert "effect1" in stats["effects"]
assert "effect2" in stats["effects"]
def test_reset(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
monitor.start_frame(1)
monitor.record_effect("test", 1.0, 100, 100)
monitor.end_frame(1, 1.0)
monitor.reset()
stats = monitor.get_stats()
assert "error" in stats
class TestEffectPipelinePerformance:
def test_pipeline_stays_within_frame_budget(self):
"""Verify effect pipeline completes within frame budget (33ms for 30fps)."""
from engine.effects import (
EffectChain,
EffectConfig,
EffectContext,
EffectRegistry,
)
class DummyEffect:
name = "dummy"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf, ctx):
return [line * 2 for line in buf]
registry = EffectRegistry()
registry.register(DummyEffect())
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=10)
chain = EffectChain(registry, monitor)
chain.set_order(["dummy"])
buf = ["x" * 80] * 20
for i in range(10):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=i,
has_message=False,
)
chain.process(buf, ctx)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["pipeline"]["max_ms"] < 33.0
def test_individual_effects_performance(self):
"""Verify individual effects don't exceed 10ms per frame."""
from engine.effects import (
EffectChain,
EffectConfig,
EffectContext,
EffectRegistry,
)
class SlowEffect:
name = "slow"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf, ctx):
result = []
for line in buf:
result.append(line)
result.append(line + line)
return result
registry = EffectRegistry()
registry.register(SlowEffect())
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=5)
chain = EffectChain(registry, monitor)
chain.set_order(["slow"])
buf = ["x" * 80] * 10
for i in range(5):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=i,
has_message=False,
)
chain.process(buf, ctx)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["effects"]["slow"]["max_ms"] < 10.0

View File

@@ -0,0 +1,117 @@
"""
Tests for engine.effects.controller module.
"""
from unittest.mock import MagicMock, patch
from engine.effects.controller import (
handle_effects_command,
set_effect_chain_ref,
)
class TestHandleEffectsCommand:
"""Tests for handle_effects_command function."""
def test_list_effects(self):
"""list command returns formatted effects list."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_plugin.config.enabled = True
mock_plugin.config.intensity = 0.5
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
with patch("engine.effects.controller._get_effect_chain") as mock_chain:
mock_chain.return_value.get_order.return_value = ["noise"]
result = handle_effects_command("/effects list")
assert "noise: ON" in result
assert "intensity=0.5" in result
def test_enable_effect(self):
"""enable command calls registry.enable."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise on")
assert "Enabled: noise" in result
mock_registry.return_value.enable.assert_called_once_with("noise")
def test_disable_effect(self):
"""disable command calls registry.disable."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise off")
assert "Disabled: noise" in result
mock_registry.return_value.disable.assert_called_once_with("noise")
def test_set_intensity(self):
"""intensity command sets plugin intensity."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_plugin.config.intensity = 0.5
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise intensity 0.8")
assert "intensity to 0.8" in result
assert mock_plugin.config.intensity == 0.8
def test_invalid_intensity_range(self):
"""intensity outside 0.0-1.0 returns error."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise intensity 1.5")
assert "between 0.0 and 1.0" in result
def test_reorder_pipeline(self):
"""reorder command calls chain.reorder."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_registry.return_value.list_all.return_value = {}
with patch("engine.effects.controller._get_effect_chain") as mock_chain:
mock_chain_instance = MagicMock()
mock_chain_instance.reorder.return_value = True
mock_chain.return_value = mock_chain_instance
result = handle_effects_command("/effects reorder noise,fade")
assert "Reordered pipeline" in result
mock_chain_instance.reorder.assert_called_once_with(["noise", "fade"])
def test_unknown_command(self):
"""unknown command returns error."""
result = handle_effects_command("/unknown")
assert "Unknown command" in result
def test_non_effects_command(self):
"""non-effects command returns error."""
result = handle_effects_command("not a command")
assert "Unknown command" in result
class TestSetEffectChainRef:
"""Tests for set_effect_chain_ref function."""
def test_sets_global_ref(self):
"""set_effect_chain_ref updates global reference."""
mock_chain = MagicMock()
set_effect_chain_ref(mock_chain)
from engine.effects.controller import _get_effect_chain
result = _get_effect_chain()
assert result == mock_chain

234
tests/test_fetch.py Normal file
View File

@@ -0,0 +1,234 @@
"""
Tests for engine.fetch module.
"""
import json
from unittest.mock import MagicMock, patch
from engine.fetch import (
_fetch_gutenberg,
fetch_all,
fetch_feed,
fetch_poetry,
load_cache,
save_cache,
)
class TestFetchFeed:
"""Tests for fetch_feed function."""
@patch("engine.fetch.urllib.request.urlopen")
def test_fetch_success(self, mock_urlopen):
"""Successful feed fetch returns parsed feed."""
mock_response = MagicMock()
mock_response.read.return_value = b"<rss>test</rss>"
mock_urlopen.return_value = mock_response
result = fetch_feed("http://example.com/feed")
assert result is not None
@patch("engine.fetch.urllib.request.urlopen")
def test_fetch_network_error(self, mock_urlopen):
"""Network error returns None."""
mock_urlopen.side_effect = Exception("Network error")
result = fetch_feed("http://example.com/feed")
assert result is None
class TestFetchAll:
"""Tests for fetch_all function."""
@patch("engine.fetch.fetch_feed")
@patch("engine.fetch.strip_tags")
@patch("engine.fetch.skip")
@patch("engine.fetch.boot_ln")
def test_fetch_all_success(self, mock_boot, mock_skip, mock_strip, mock_fetch_feed):
"""Successful fetch returns items."""
mock_feed = MagicMock()
mock_feed.bozo = False
mock_feed.entries = [
{"title": "Headline 1", "published_parsed": (2024, 1, 1, 12, 0, 0)},
{"title": "Headline 2", "updated_parsed": (2024, 1, 2, 12, 0, 0)},
]
mock_fetch_feed.return_value = mock_feed
mock_skip.return_value = False
mock_strip.side_effect = lambda x: x
items, linked, failed = fetch_all()
assert linked > 0
assert failed == 0
@patch("engine.fetch.fetch_feed")
@patch("engine.fetch.boot_ln")
def test_fetch_all_feed_error(self, mock_boot, mock_fetch_feed):
"""Feed error increments failed count."""
mock_fetch_feed.return_value = None
items, linked, failed = fetch_all()
assert failed > 0
@patch("engine.fetch.fetch_feed")
@patch("engine.fetch.strip_tags")
@patch("engine.fetch.skip")
@patch("engine.fetch.boot_ln")
def test_fetch_all_skips_filtered(
self, mock_boot, mock_skip, mock_strip, mock_fetch_feed
):
"""Filtered headlines are skipped."""
mock_feed = MagicMock()
mock_feed.bozo = False
mock_feed.entries = [
{"title": "Sports scores"},
{"title": "Valid headline"},
]
mock_fetch_feed.return_value = mock_feed
mock_skip.side_effect = lambda x: x == "Sports scores"
mock_strip.side_effect = lambda x: x
items, linked, failed = fetch_all()
assert any("Valid headline" in item[0] for item in items)
class TestFetchGutenberg:
"""Tests for _fetch_gutenberg function."""
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_success(self, mock_urlopen):
"""Successful gutenberg fetch returns items."""
text = """Project Gutenberg
*** START OF THE PROJECT GUTENBERG ***
This is a test poem with multiple lines
that should be parsed as a block.
Another stanza with more content here.
*** END OF THE PROJECT GUTENBERG ***
"""
mock_response = MagicMock()
mock_response.read.return_value = text.encode("utf-8")
mock_urlopen.return_value = mock_response
result = _fetch_gutenberg("http://example.com/test", "Test")
assert len(result) > 0
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_network_error(self, mock_urlopen):
"""Network error returns empty list."""
mock_urlopen.side_effect = Exception("Network error")
result = _fetch_gutenberg("http://example.com/test", "Test")
assert result == []
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_skips_short_blocks(self, mock_urlopen):
"""Blocks shorter than 20 chars are skipped."""
text = """*** START OF THE ***
Short
*** END OF THE ***
"""
mock_response = MagicMock()
mock_response.read.return_value = text.encode("utf-8")
mock_urlopen.return_value = mock_response
result = _fetch_gutenberg("http://example.com/test", "Test")
assert result == []
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_skips_all_caps_headers(self, mock_urlopen):
"""All-caps lines are skipped as headers."""
text = """*** START OF THE ***
THIS IS ALL CAPS HEADER
more content here
*** END OF THE ***
"""
mock_response = MagicMock()
mock_response.read.return_value = text.encode("utf-8")
mock_urlopen.return_value = mock_response
result = _fetch_gutenberg("http://example.com/test", "Test")
assert len(result) > 0
class TestFetchPoetry:
"""Tests for fetch_poetry function."""
@patch("engine.fetch._fetch_gutenberg")
@patch("engine.fetch.boot_ln")
def test_fetch_poetry_success(self, mock_boot, mock_fetch):
"""Successful poetry fetch returns items."""
mock_fetch.return_value = [
("Stanza 1 content here", "Test", ""),
("Stanza 2 content here", "Test", ""),
]
items, linked, failed = fetch_poetry()
assert linked > 0
assert failed == 0
@patch("engine.fetch._fetch_gutenberg")
@patch("engine.fetch.boot_ln")
def test_fetch_poetry_failure(self, mock_boot, mock_fetch):
"""Failed fetch increments failed count."""
mock_fetch.return_value = []
items, linked, failed = fetch_poetry()
assert failed > 0
class TestCache:
"""Tests for cache functions."""
@patch("engine.fetch._cache_path")
def test_load_cache_success(self, mock_path):
"""Successful cache load returns items."""
mock_path.return_value.__str__ = MagicMock(return_value="/tmp/cache")
mock_path.return_value.exists.return_value = True
mock_path.return_value.read_text.return_value = json.dumps(
{"items": [("title", "source", "time")]}
)
result = load_cache()
assert result is not None
@patch("engine.fetch._cache_path")
def test_load_cache_missing_file(self, mock_path):
"""Missing cache file returns None."""
mock_path.return_value.exists.return_value = False
result = load_cache()
assert result is None
@patch("engine.fetch._cache_path")
def test_load_cache_invalid_json(self, mock_path):
"""Invalid JSON returns None."""
mock_path.return_value.exists.return_value = True
mock_path.return_value.read_text.side_effect = json.JSONDecodeError("", "", 0)
result = load_cache()
assert result is None
@patch("engine.fetch._cache_path")
def test_save_cache_success(self, mock_path):
"""Save cache writes to file."""
mock_path.return_value.__truediv__ = MagicMock(
return_value=mock_path.return_value
)
save_cache([("title", "source", "time")])

View File

@@ -0,0 +1,127 @@
"""
Integration tests for ntfy topics.
"""
import json
import time
import urllib.request
class TestNtfyTopics:
def test_cc_cmd_topic_exists_and_writable(self):
"""Verify C&C CMD topic exists and accepts messages."""
from engine.config import NTFY_CC_CMD_TOPIC
topic_url = NTFY_CC_CMD_TOPIC.replace("/json", "")
test_message = f"test_{int(time.time())}"
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
assert resp.status == 200
except Exception as e:
raise AssertionError(f"Failed to write to C&C CMD topic: {e}") from e
def test_cc_resp_topic_exists_and_writable(self):
"""Verify C&C RESP topic exists and accepts messages."""
from engine.config import NTFY_CC_RESP_TOPIC
topic_url = NTFY_CC_RESP_TOPIC.replace("/json", "")
test_message = f"test_{int(time.time())}"
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
assert resp.status == 200
except Exception as e:
raise AssertionError(f"Failed to write to C&C RESP topic: {e}") from e
def test_message_topic_exists_and_writable(self):
"""Verify message topic exists and accepts messages."""
from engine.config import NTFY_TOPIC
topic_url = NTFY_TOPIC.replace("/json", "")
test_message = f"test_{int(time.time())}"
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
assert resp.status == 200
except Exception as e:
raise AssertionError(f"Failed to write to message topic: {e}") from e
def test_cc_cmd_topic_readable(self):
"""Verify we can read messages from C&C CMD topic."""
from engine.config import NTFY_CC_CMD_TOPIC
test_message = f"integration_test_{int(time.time())}"
topic_url = NTFY_CC_CMD_TOPIC.replace("/json", "")
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
urllib.request.urlopen(req, timeout=10)
except Exception as e:
raise AssertionError(f"Failed to write to C&C CMD topic: {e}") from e
time.sleep(1)
poll_url = f"{NTFY_CC_CMD_TOPIC}?poll=1&limit=1"
req = urllib.request.Request(
poll_url,
headers={"User-Agent": "mainline-test/0.1"},
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
body = resp.read().decode("utf-8")
if body.strip():
data = json.loads(body.split("\n")[0])
assert isinstance(data, dict)
except Exception as e:
raise AssertionError(f"Failed to read from C&C CMD topic: {e}") from e
def test_topics_are_different(self):
"""Verify C&C CMD/RESP and message topics are different."""
from engine.config import NTFY_CC_CMD_TOPIC, NTFY_CC_RESP_TOPIC, NTFY_TOPIC
assert NTFY_CC_CMD_TOPIC != NTFY_TOPIC
assert NTFY_CC_RESP_TOPIC != NTFY_TOPIC
assert NTFY_CC_CMD_TOPIC != NTFY_CC_RESP_TOPIC
assert "_cc_cmd" in NTFY_CC_CMD_TOPIC
assert "_cc_resp" in NTFY_CC_RESP_TOPIC

232
tests/test_render.py Normal file
View File

@@ -0,0 +1,232 @@
"""
Tests for engine.render module.
"""
from unittest.mock import MagicMock, patch
import pytest
from engine.render import (
GRAD_COLS,
MSG_GRAD_COLS,
clear_font_cache,
font_for_lang,
lr_gradient,
lr_gradient_opposite,
make_block,
)
class TestGradientConstants:
"""Tests for gradient color constants."""
def test_grad_cols_defined(self):
"""GRAD_COLS is defined with expected length."""
assert len(GRAD_COLS) > 0
assert all(isinstance(c, str) for c in GRAD_COLS)
def test_msg_grad_cols_defined(self):
"""MSG_GRAD_COLS is defined with expected length."""
assert len(MSG_GRAD_COLS) > 0
assert all(isinstance(c, str) for c in MSG_GRAD_COLS)
def test_grad_cols_start_with_white(self):
"""GRAD_COLS starts with white."""
assert "231" in GRAD_COLS[0]
def test_msg_grad_cols_different_from_grad_cols(self):
"""MSG_GRAD_COLS is different from GRAD_COLS."""
assert MSG_GRAD_COLS != GRAD_COLS
class TestLrGradient:
"""Tests for lr_gradient function."""
def test_empty_rows(self):
"""Empty input returns empty output."""
result = lr_gradient([], 0.0)
assert result == []
def test_preserves_empty_rows(self):
"""Empty rows are preserved."""
result = lr_gradient([""], 0.0)
assert result == [""]
def test_adds_gradient_to_content(self):
"""Non-empty rows get gradient coloring."""
result = lr_gradient(["hello"], 0.0)
assert len(result) == 1
assert "\033[" in result[0]
def test_preserves_spaces(self):
"""Spaces are preserved without coloring."""
result = lr_gradient(["hello world"], 0.0)
assert " " in result[0]
def test_offset_wraps_around(self):
"""Offset wraps around at 1.0."""
result1 = lr_gradient(["hello"], 0.0)
result2 = lr_gradient(["hello"], 1.0)
assert result1 != result2 or result1 == result2
class TestLrGradientOpposite:
"""Tests for lr_gradient_opposite function."""
def test_uses_msg_grad_cols(self):
"""Uses MSG_GRAD_COLS instead of GRAD_COLS."""
result = lr_gradient_opposite(["test"])
assert "\033[" in result[0]
class TestClearFontCache:
"""Tests for clear_font_cache function."""
def test_clears_without_error(self):
"""Function runs without error."""
clear_font_cache()
class TestFontForLang:
"""Tests for font_for_lang function."""
@patch("engine.render.font")
def test_returns_default_for_none(self, mock_font):
"""Returns default font when lang is None."""
result = font_for_lang(None)
assert result is not None
@patch("engine.render.font")
def test_returns_default_for_unknown_lang(self, mock_font):
"""Returns default font for unknown language."""
result = font_for_lang("unknown_lang")
assert result is not None
class TestMakeBlock:
"""Tests for make_block function."""
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_basic(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Basic make_block returns content, color, meta index."""
mock_wrap.return_value = ["Headline content", ""]
mock_random.choice.return_value = "\033[38;5;46m"
content, color, meta_idx = make_block(
"Test headline", "TestSource", "12:00", 80
)
assert len(content) > 0
assert color is not None
assert meta_idx >= 0
@pytest.mark.skip(reason="Requires full PIL/font environment")
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_translation(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Translation is applied when mode is news."""
mock_wrap.return_value = ["Translated"]
mock_random.choice.return_value = "\033[38;5;46m"
mock_detect.return_value = "de"
with patch("engine.config.MODE", "news"):
content, _, _ = make_block("Test", "Source", "12:00", 80)
mock_translate.assert_called_once()
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_no_translation_poetry(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""No translation when mode is poetry."""
mock_wrap.return_value = ["Poem content"]
mock_random.choice.return_value = "\033[38;5;46m"
with patch("engine.config.MODE", "poetry"):
make_block("Test", "Source", "12:00", 80)
mock_translate.assert_not_called()
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_meta_format(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Meta line includes source and timestamp."""
mock_wrap.return_value = ["Content"]
mock_random.choice.return_value = "\033[38;5;46m"
content, _, meta_idx = make_block("Test", "MySource", "14:30", 80)
meta_line = content[meta_idx]
assert "MySource" in meta_line
assert "14:30" in meta_line
class TestRenderLine:
"""Tests for render_line function."""
def test_empty_string(self):
"""Empty string returns empty list."""
from engine.render import render_line
result = render_line("")
assert result == [""]
@pytest.mark.skip(reason="Requires real font/PIL setup")
def test_uses_default_font(self):
"""Uses default font when none provided."""
from engine.render import render_line
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = (0, 0, 10, 10)
render_line("test")
def test_getbbox_returns_none(self):
"""Handles None bbox gracefully."""
from engine.render import render_line
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = None
result = render_line("test")
assert result == [""]
class TestBigWrap:
"""Tests for big_wrap function."""
def test_empty_string(self):
"""Empty string returns empty list."""
from engine.render import big_wrap
result = big_wrap("", 80)
assert result == []
@pytest.mark.skip(reason="Requires real font/PIL setup")
def test_single_word_fits(self):
"""Single short word returns rendered."""
from engine.render import big_wrap
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = (0, 0, 10, 10)
result = big_wrap("test", 80)
assert len(result) > 0

115
tests/test_translate.py Normal file
View File

@@ -0,0 +1,115 @@
"""
Tests for engine.translate module.
"""
import json
from unittest.mock import MagicMock, patch
from engine.translate import (
_translate_cached,
detect_location_language,
translate_headline,
)
def clear_translate_cache():
"""Clear the LRU cache between tests."""
_translate_cached.cache_clear()
class TestDetectLocationLanguage:
"""Tests for detect_location_language function."""
def test_returns_none_for_unknown_location(self):
"""Returns None when no location pattern matches."""
result = detect_location_language("Breaking news about technology")
assert result is None
def test_detects_berlin(self):
"""Detects Berlin location."""
result = detect_location_language("Berlin police arrest protesters")
assert result == "de"
def test_detects_paris(self):
"""Detects Paris location."""
result = detect_location_language("Paris fashion week begins")
assert result == "fr"
def test_detects_tokyo(self):
"""Detects Tokyo location."""
result = detect_location_language("Tokyo stocks rise")
assert result == "ja"
def test_detects_berlin_again(self):
"""Detects Berlin location again."""
result = detect_location_language("Berlin marathon set to begin")
assert result == "de"
def test_case_insensitive(self):
"""Detection is case insensitive."""
result = detect_location_language("BERLIN SUMMER FESTIVAL")
assert result == "de"
def test_returns_first_match(self):
"""Returns first matching pattern."""
result = detect_location_language("Berlin in Paris for the event")
assert result == "de"
class TestTranslateHeadline:
"""Tests for translate_headline function."""
def test_returns_translated_text(self):
"""Returns translated text from cache."""
clear_translate_cache()
with patch("engine.translate.translate_headline") as mock_fn:
mock_fn.return_value = "Translated title"
from engine.translate import translate_headline as th
result = th("Original title", "de")
assert result == "Translated title"
def test_uses_cached_result(self):
"""Translation uses LRU cache."""
clear_translate_cache()
result1 = translate_headline("Test unique", "es")
result2 = translate_headline("Test unique", "es")
assert result1 == result2
class TestTranslateCached:
"""Tests for _translate_cached function."""
def test_translation_network_error(self):
"""Network error returns original text."""
clear_translate_cache()
with patch("engine.translate.urllib.request.urlopen") as mock_urlopen:
mock_urlopen.side_effect = Exception("Network error")
result = _translate_cached("Hello world", "de")
assert result == "Hello world"
def test_translation_invalid_json(self):
"""Invalid JSON returns original text."""
clear_translate_cache()
with patch("engine.translate.urllib.request.urlopen") as mock_urlopen:
mock_response = MagicMock()
mock_response.read.return_value = b"invalid json"
mock_urlopen.return_value = mock_response
result = _translate_cached("Hello", "de")
assert result == "Hello"
def test_translation_empty_response(self):
"""Empty translation response returns original text."""
clear_translate_cache()
with patch("engine.translate.urllib.request.urlopen") as mock_urlopen:
mock_response = MagicMock()
mock_response.read.return_value = json.dumps([[[""], None, "de"], None])
mock_urlopen.return_value = mock_response
result = _translate_cached("Hello", "de")
assert result == "Hello"

161
tests/test_websocket.py Normal file
View File

@@ -0,0 +1,161 @@
"""
Tests for engine.display.backends.websocket module.
"""
from unittest.mock import MagicMock, patch
import pytest
from engine.display.backends.websocket import WebSocketDisplay
class TestWebSocketDisplayImport:
"""Test that websocket module can be imported."""
def test_import_does_not_error(self):
"""Module imports without error."""
from engine.display import backends
assert backends is not None
class TestWebSocketDisplayInit:
"""Tests for WebSocketDisplay initialization."""
def test_default_init(self):
"""Default initialization sets correct defaults."""
with patch("engine.display.backends.websocket.websockets", None):
display = WebSocketDisplay()
assert display.host == "0.0.0.0"
assert display.port == 8765
assert display.http_port == 8766
assert display.width == 80
assert display.height == 24
def test_custom_init(self):
"""Custom initialization uses provided values."""
with patch("engine.display.backends.websocket.websockets", None):
display = WebSocketDisplay(host="localhost", port=9000, http_port=9001)
assert display.host == "localhost"
assert display.port == 9000
assert display.http_port == 9001
def test_is_available_when_websockets_present(self):
"""is_available returns True when websockets is available."""
pytest.importorskip("websockets")
display = WebSocketDisplay()
assert display.is_available() is True
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_is_available_when_websockets_missing(self):
"""is_available returns False when websockets is not available."""
display = WebSocketDisplay()
assert display.is_available() is False
class TestWebSocketDisplayProtocol:
"""Test that WebSocketDisplay satisfies Display protocol."""
def test_websocket_display_is_display(self):
"""WebSocketDisplay satisfies Display protocol."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
assert hasattr(display, "init")
assert hasattr(display, "show")
assert hasattr(display, "clear")
assert hasattr(display, "cleanup")
class TestWebSocketDisplayMethods:
"""Tests for WebSocketDisplay methods."""
def test_init_stores_dimensions(self):
"""init stores terminal dimensions."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
display.init(100, 40)
assert display.width == 100
assert display.height == 40
def test_client_count_initially_zero(self):
"""client_count returns 0 when no clients connected."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
assert display.client_count() == 0
def test_get_ws_port(self):
"""get_ws_port returns configured port."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay(port=9000)
assert display.get_ws_port() == 9000
def test_get_http_port(self):
"""get_http_port returns configured port."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay(http_port=9001)
assert display.get_http_port() == 9001
def test_frame_delay_defaults_to_zero(self):
"""get_frame_delay returns 0 by default."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
assert display.get_frame_delay() == 0.0
def test_set_frame_delay(self):
"""set_frame_delay stores the value."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
display.set_frame_delay(0.05)
assert display.get_frame_delay() == 0.05
class TestWebSocketDisplayCallbacks:
"""Tests for WebSocketDisplay callback methods."""
def test_set_client_connected_callback(self):
"""set_client_connected_callback stores callback."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
callback = MagicMock()
display.set_client_connected_callback(callback)
assert display._client_connected_callback is callback
def test_set_client_disconnected_callback(self):
"""set_client_disconnected_callback stores callback."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
callback = MagicMock()
display.set_client_disconnected_callback(callback)
assert display._client_disconnected_callback is callback
class TestWebSocketDisplayUnavailable:
"""Tests when WebSocket support is unavailable."""
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_start_server_noop_when_unavailable(self):
"""start_server does nothing when websockets unavailable."""
display = WebSocketDisplay()
display.start_server()
assert display._server_thread is None
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_start_http_server_noop_when_unavailable(self):
"""start_http_server does nothing when websockets unavailable."""
display = WebSocketDisplay()
display.start_http_server()
assert display._http_thread is None
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_show_noops_when_unavailable(self):
"""show does nothing when websockets unavailable."""
display = WebSocketDisplay()
display.show(["line1", "line2"])