34 Commits

Author SHA1 Message Date
ab3e1766b1 feat(benchmark): add hook mode with baseline cache for pre-push checks
- Fix lint errors and LSP issues in benchmark.py
- Add --hook mode to compare against saved baseline
- Add --baseline flag to save results as baseline
- Add --threshold to configure degradation threshold (default 20%)
- Add benchmark step to pre-push hook in hk.pkl
- Update AGENTS.md with hk documentation links and benchmark runner docs
2026-03-15 22:41:13 -07:00
829c4ab63d refactor: modularize display backends and add benchmark runner
- Create engine/display/ package with registry pattern
- Move displays to engine/display/backends/ (terminal, null, websocket, sixel)
- Add DisplayRegistry with auto-discovery
- Add benchmark.py for performance testing effects × displays matrix
- Add mise tasks: benchmark, benchmark-json, benchmark-report
- Update controller to use new display module
2026-03-15 22:25:28 -07:00
22dd063baa feat: add SixelDisplay backend for terminal graphics
- Implement pure Python Sixel encoder (no C dependency)
- Add SixelDisplay class to display.py with ANSI parsing
- Update controller._get_display() to handle sixel mode
- Add --display sixel CLI flag
- Add mise run-sixel task
- Update docs with display modes
2026-03-15 22:13:44 -07:00
0f7203e4e0 feat: enable C&C, compact mise tasks, update docs
- Cherry-pick C&C support (ntfy poller for commands, response handling)
- Compact mise.toml with native dependency chaining
- Update AGENTS.md with C&C documentation
- Update README.md with display modes and C&C usage
2026-03-15 21:55:26 -07:00
ba050ada24 feat(cmdline): C&C with separate topics and rich output 2026-03-15 21:47:53 -07:00
d7b044ceae feat(display): add configurable multi-backend display system 2026-03-15 21:17:16 -07:00
ac1306373d feat(websocket): add WebSocket display backend for browser client 2026-03-15 20:54:03 -07:00
2650f7245e merge: effects_plugins 2026-03-15 19:20:53 -07:00
b1f2b9d2be feat(daemon): add display abstraction and daemon mode with C&C 2026-03-15 19:20:47 -07:00
c08a7d3cb0 feat(cmdline): add command-line interface for mainline control 2026-03-15 19:20:47 -07:00
d5a3edba97 feat(effects): add plugin architecture with performance monitoring 2026-03-15 19:20:47 -07:00
fb35458718 merge: testability_modularization 2026-03-15 19:20:43 -07:00
15de46722a refactor: phase 4 - event-driven architecture foundation
- Add EventBus class with pub/sub messaging (thread-safe)
- Add emitter Protocol classes (EventEmitter, Startable, Stoppable)
- Add event emission to NtfyPoller (NtfyMessageEvent)
- Add event emission to MicMonitor (MicLevelEvent)
- Update StreamController to publish stream start/end events
- Add comprehensive tests for eventbus and emitters modules
2026-03-15 19:15:08 -07:00
35e5c8d38b refactor: phase 3 - API efficiency improvements
Add typed dataclasses for tuple returns:
- types.py: HeadlineItem, FetchResult, Block dataclasses with legacy tuple converters
- fetch.py: Add type hints and HeadlineTuple type alias

Add pyright for static type checking:
- Add pyright to dependencies
- Verify type coverage with pyright (0 errors in core modules)

This enables:
- Named types instead of raw tuples (better IDE support, self-documenting)
- Type-safe APIs across modules
- Backward compatibility via to_tuple/from_tuple methods

Note: Lazy imports skipped for render.py - startup impact is minimal.
2026-03-15 19:13:32 -07:00
cdc8094de2 refactor: phase 2 - modularization of scroll engine
Split monolithic scroll.py into focused modules:
- viewport.py: terminal size (tw/th), ANSI positioning helpers
- frame.py: FrameTimer class, scroll step calculation
- layers.py: message overlay, ticker zone, firehose rendering
- scroll.py: simplified orchestrator, imports from new modules

Add stream controller and event types for future event-driven architecture:
- controller.py: StreamController for source initialization and stream lifecycle
- events.py: EventType enum and event dataclasses (HeadlineEvent, FrameTickEvent, etc.)

Added tests for new modules:
- test_viewport.py: 8 tests for viewport utilities
- test_frame.py: 10 tests for frame timing
- test_layers.py: 13 tests for layer compositing
- test_events.py: 11 tests for event types
- test_controller.py: 6 tests for stream controller

This enables:
- Testable chunks with clear responsibilities
- Reusable viewport utilities across modules
- Better separation of concerns in render pipeline
- Foundation for future event-driven architecture

Also includes Phase 1 documentation updates in code comments.
2026-03-15 19:13:32 -07:00
f170143939 refactor: phase 1 - testability improvements
- Add Config dataclass with get_config()/set_config() for injection
- Add Config.from_args() for CLI argument parsing (testable)
- Add platform font path detection (Darwin/Linux)
- Bound translate cache with @lru_cache(maxsize=500)
- Add fixtures for external dependencies (network, feeds, config)
- Add 15 tests for Config class, from_args, and platform detection

This enables testability by:
- Allowing config injection instead of global mutable state
- Supporting custom argv in from_args() for testing
- Providing reusable fixtures for mocking network/filesystem
- Preventing unbounded memory growth in translation cache

Fixes: _arg_value/_arg_int not accepting custom argv
2026-03-15 19:13:32 -07:00
19fb4bc4fe Merge pull request 'docs/update-readme' (#23) from docs/update-readme into main
Reviewed-on: #23
2026-03-16 00:09:10 +00:00
ae10fd78ca refactor: Restructure README, add uv and mise commands, and detail component extension and development workflows. 2026-03-15 17:08:32 -07:00
4afab642f7 docs: add README update design spec
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-03-15 16:56:58 -07:00
f6f177590b Merge pull request 'Modernize project with uv, pytest, ruff, and git hooks' (#21) from enhance_portability into main
Reviewed-on: #21
2026-03-15 23:21:35 +00:00
9ae4dc2b07 fix: update ntfy tests for SSE API (reconnect_delay) 2026-03-15 15:16:37 -07:00
1ac2dec3b0 fix: use native hk staging in pre-commit hook
fix: add explicit check command to pre-push hook
2026-03-15 15:16:37 -07:00
757c854584 fix: apply ruff auto-fixes and add hk git hooks
- Fix pre-existing lint errors in engine/ modules using ruff --unsafe-fixes
- Add hk.pkl with pre-commit and pre-push hooks using ruff builtin
- Configure hooks to use 'uv run' prefix for tool execution
- Update mise.toml to include hk and pkl tools
- All 73 tests pass

fix: apply ruff auto-fixes and add hk git hooks

- Fix pre-existing lint errors in engine/ modules using ruff --unsafe-fixes
- Add hk.pkl with pre-commit and pre-push hooks using ruff builtin
- Configure hooks to use 'uv run' prefix for tool execution
- Update mise.toml to include hk and pkl tools
- Use 'hk install --mise' for proper mise integration
- All 73 tests pass
2026-03-15 15:16:37 -07:00
4844a64203 style: apply ruff auto-fixes across codebase
- Fix import sorting (isort) across all engine modules
- Fix SIM105 try-except-pass patterns (contextlib.suppress)
- Fix nested with statements in tests
- Fix unused loop variables

Run 'uv run pytest' to verify tests still pass.
2026-03-15 15:16:37 -07:00
9201117096 feat: modernize project with uv, add pytest test suite
- Add pyproject.toml with modern Python packaging (PEP 517/518)
- Add uv-based dependency management replacing inline venv bootstrap
- Add requirements.txt and requirements-dev.txt for compatibility
- Add mise.toml with dev tasks (test, lint, run, sync, ci)
- Add .python-version pinned to Python 3.12
- Add comprehensive pytest test suite (73 tests) for:
  - engine/config, filter, terminal, sources, mic, ntfy modules
- Configure pytest with coverage reporting (16% total, 100% on tested modules)
- Configure ruff for linting with Python 3.10+ target
- Remove redundant venv bootstrap code from mainline.py
- Update .gitignore for uv/venv artifacts

Run 'uv sync' to install dependencies, 'uv run pytest' to test.
2026-03-15 15:16:37 -07:00
d758541156 Merge pull request 'feat: migrate Ntfy message retrieval from polling to SSE streaming, replacing poll_interval with reconnect_delay for continuous updates.' (#20) from feat/ntfy-sse into main
Reviewed-on: #20
2026-03-15 20:50:08 +00:00
b979621dd4 Merge branch 'main' into feat/ntfy-sse 2026-03-15 20:50:02 +00:00
f91cc9844e Merge pull request 'feat: add new font files to the fonts directory' (#19) from feat/display into main
Reviewed-on: #19
2026-03-15 20:47:16 +00:00
bddbd69371 Merge branch 'main' into feat/display 2026-03-15 20:45:54 +00:00
6e39a2dad2 feat: migrate Ntfy message retrieval from polling to SSE streaming, replacing poll_interval with reconnect_delay for continuous updates. 2026-03-15 13:44:26 -07:00
1ba3848bed feat: add new font files to the fonts directory 2026-03-15 13:30:08 -07:00
a986df344a Merge pull request 'doc: Document new font selection command-line arguments, environment variables, and a dedicated font management section.' (#18) from docs/update-readme into main
Reviewed-on: #18
2026-03-15 11:08:25 +00:00
c84bd5c05a doc: Document new font selection command-line arguments, environment variables, and a dedicated font management section. 2026-03-15 04:07:24 -07:00
7b0f886e53 Merge pull request 'feat: add new font assets including CSBishopDrawn, CyberformDemo, and KATA.' (#17) from feat/font-picker into main
Reviewed-on: #17
2026-03-15 11:01:39 +00:00
83 changed files with 8505 additions and 491 deletions

8
.gitignore vendored
View File

@@ -1,4 +1,12 @@
__pycache__/
*.pyc
.mainline_venv/
.venv/
uv.lock
.mainline_cache_*.json
.DS_Store
htmlcov/
.coverage
.pytest_cache/
*.egg-info/
coverage.xml

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.12

194
AGENTS.md Normal file
View File

@@ -0,0 +1,194 @@
# Agent Development Guide
## Development Environment
This project uses:
- **mise** (mise.jdx.dev) - tool version manager and task runner
- **hk** (hk.jdx.dev) - git hook manager
- **uv** - fast Python package installer
- **ruff** - linter and formatter
- **pytest** - test runner
### Setup
```bash
# Install dependencies
mise run install
# Or equivalently:
uv sync --all-extras # includes mic support
```
### Available Commands
```bash
mise run test # Run tests
mise run test-v # Run tests verbose
mise run test-cov # Run tests with coverage report
mise run test-browser # Run e2e browser tests (requires playwright)
mise run lint # Run ruff linter
mise run lint-fix # Run ruff with auto-fix
mise run format # Run ruff formatter
mise run ci # Full CI pipeline (topics-init + lint + test-cov)
```
### Runtime Commands
```bash
mise run run # Run mainline (terminal)
mise run run-poetry # Run with poetry feed
mise run run-firehose # Run in firehose mode
mise run run-websocket # Run with WebSocket display only
mise run run-sixel # Run with Sixel graphics display
mise run run-both # Run with both terminal and WebSocket
mise run run-client # Run both + open browser
mise run cmd # Run C&C command interface
```
## Git Hooks
**At the start of every agent session**, verify hooks are installed:
```bash
ls -la .git/hooks/pre-commit
```
If hooks are not installed, install them with:
```bash
hk init --mise
mise run pre-commit
```
**IMPORTANT**: Always review the hk documentation before modifying `hk.pkl`:
- [hk Configuration Guide](https://hk.jdx.dev/configuration.html)
- [hk Hooks Reference](https://hk.jdx.dev/hooks.html)
- [hk Builtins](https://hk.jdx.dev/builtins.html)
The project uses hk configured in `hk.pkl`:
- **pre-commit**: runs ruff-format and ruff (with auto-fix)
- **pre-push**: runs ruff check + benchmark hook
## Benchmark Runner
Run performance benchmarks:
```bash
mise run benchmark # Run all benchmarks (text output)
mise run benchmark-json # Run benchmarks (JSON output)
mise run benchmark-report # Run benchmarks (Markdown report)
```
### Benchmark Commands
```bash
# Run benchmarks
uv run python -m engine.benchmark
# Run with specific displays/effects
uv run python -m engine.benchmark --displays null,terminal --effects fade,glitch
# Save baseline for hook comparisons
uv run python -m engine.benchmark --baseline
# Run in hook mode (compares against baseline)
uv run python -m engine.benchmark --hook
# Hook mode with custom threshold (default: 20% degradation)
uv run python -m engine.benchmark --hook --threshold 0.3
# Custom baseline location
uv run python -m engine.benchmark --hook --cache /path/to/cache.json
```
### Hook Mode
The `--hook` mode compares current benchmarks against a saved baseline. If performance degrades beyond the threshold (default 20%), it exits with code 1. This is useful for preventing performance regressions in feature branches.
The pre-push hook runs benchmark in hook mode to catch performance regressions before pushing.
## Workflow Rules
### Before Committing
1. **Always run the test suite** - never commit code that fails tests:
```bash
mise run test
```
2. **Always run the linter**:
```bash
mise run lint
```
3. **Fix any lint errors** before committing (or let the pre-commit hook handle it).
4. **Review your changes** using `git diff` to understand what will be committed.
### On Failing Tests
When tests fail, **determine whether it's an out-of-date test or a correctly failing test**:
- **Out-of-date test**: The test was written for old behavior that has legitimately changed. Update the test to match the new expected behavior.
- **Correctly failing test**: The test correctly identifies a broken contract. Fix the implementation, not the test.
**Never** modify a test to make it pass without understanding why it failed.
### Code Review
Before committing significant changes:
- Run `git diff` to review all changes
- Ensure new code follows existing patterns in the codebase
- Check that type hints are added for new functions
- Verify that tests exist for new functionality
## Testing
Tests live in `tests/` and follow the pattern `test_*.py`.
Run all tests:
```bash
mise run test
```
Run with coverage:
```bash
mise run test-cov
```
The project uses pytest with strict marker enforcement. Test configuration is in `pyproject.toml` under `[tool.pytest.ini_options]`.
## Architecture Notes
- **ntfy.py** and **mic.py** are standalone modules with zero internal dependencies
- **eventbus.py** provides thread-safe event publishing for decoupled communication
- **controller.py** coordinates ntfy/mic monitoring and event publishing
- **effects/** - plugin architecture with performance monitoring
- The render pipeline: fetch → render → effects → scroll → terminal output
### Display System
- **Display abstraction** (`engine/display.py`): swap display backends via the Display protocol
- `TerminalDisplay` - ANSI terminal output
- `WebSocketDisplay` - broadcasts to web clients via WebSocket
- `SixelDisplay` - renders to Sixel graphics (pure Python, no C dependency)
- `MultiDisplay` - forwards to multiple displays simultaneously
- **WebSocket display** (`engine/websocket_display.py`): real-time frame broadcasting to web browsers
- WebSocket server on port 8765
- HTTP server on port 8766 (serves HTML client)
- Client at `client/index.html` with ANSI color parsing and fullscreen support
- **Display modes** (`--display` flag):
- `terminal` - Default ANSI terminal output
- `websocket` - Web browser display (requires websockets package)
- `sixel` - Sixel graphics in supported terminals (iTerm2, mintty, etc.)
- `both` - Terminal + WebSocket simultaneously
### Command & Control
- C&C uses separate ntfy topics for commands and responses
- `NTFY_CC_CMD_TOPIC` - commands from cmdline.py
- `NTFY_CC_RESP_TOPIC` - responses back to cmdline.py
- Effects controller handles `/effects` commands (list, on/off, intensity, reorder, stats)

247
README.md
View File

@@ -6,21 +6,45 @@ A full-screen terminal news ticker that renders live global headlines in large O
---
## Run
## Using
### Run
```bash
python3 mainline.py # news stream
python3 mainline.py --poetry # literary consciousness mode
python3 mainline.py -p # same
python3 mainline.py --firehose # dense rapid-fire headline mode
python3 mainline.py --refresh # force re-fetch (bypass cache)
python3 mainline.py --display websocket # web browser display only
python3 mainline.py --display both # terminal + web browser
python3 mainline.py --no-font-picker # skip interactive font picker
python3 mainline.py --font-file path.otf # use a specific font file
python3 mainline.py --font-dir ~/fonts # scan a different font folder
python3 mainline.py --font-index 1 # select face index within a collection
```
First run bootstraps a local `.mainline_venv/` and installs deps (`feedparser`, `Pillow`, `sounddevice`, `numpy`). Subsequent runs start immediately, loading from cache.
Or with uv:
---
```bash
uv run mainline.py
```
## Config
First run bootstraps dependencies. Use `uv sync --all-extras` for mic support.
### Command & Control (C&C)
Control mainline remotely using `cmdline.py`:
```bash
uv run cmdline.py # Interactive TUI
uv run cmdline.py /effects list # List all effects
uv run cmdline.py /effects stats # Show performance stats
uv run cmdline.py -w /effects stats # Watch mode (auto-refresh)
```
Commands are sent via ntfy.sh topics - useful for controlling a daemonized mainline instance.
### Config
All constants live in `engine/config.py`:
@@ -29,70 +53,49 @@ All constants live in `engine/config.py`:
| `HEADLINE_LIMIT` | `1000` | Total headlines per session |
| `FEED_TIMEOUT` | `10` | Per-feed HTTP timeout (seconds) |
| `MIC_THRESHOLD_DB` | `50` | dB floor above which glitches spike |
| `FONT_DIR` | `fonts/` | Folder scanned for `.otf`, `.ttf`, `.ttc` files used by the font picker |
| `FONT_PATH` | first supported font in `fonts/` | Active display font file selected at startup |
| `NTFY_TOPIC` | klubhaus URL | ntfy.sh JSON stream for messages |
| `NTFY_CC_CMD_TOPIC` | klubhaus URL | ntfy.sh topic for C&C commands |
| `NTFY_CC_RESP_TOPIC` | klubhaus URL | ntfy.sh topic for C&C responses |
| `NTFY_RECONNECT_DELAY` | `5` | Seconds before reconnecting after dropped SSE |
| `MESSAGE_DISPLAY_SECS` | `30` | How long an ntfy message holds the screen |
| `FONT_DIR` | `fonts/` | Folder scanned for `.otf`, `.ttf`, `.ttc` files |
| `FONT_PATH` | first file in `FONT_DIR` | Active display font |
| `FONT_PICKER` | `True` | Show interactive font picker at boot |
| `FONT_SZ` | `60` | Font render size (affects block density) |
| `RENDER_H` | `8` | Terminal rows per headline line |
| `SSAA` | `4` | Super-sampling factor (render at 4× then downsample) |
| `SSAA` | `4` | Super-sampling factor |
| `SCROLL_DUR` | `5.625` | Seconds per headline |
| `FRAME_DT` | `0.05` | Frame interval in seconds (20 FPS) |
| `GRAD_SPEED` | `0.08` | Gradient sweep speed (cycles/sec, ~12s full sweep) |
| `FIREHOSE_H` | `12` | Firehose zone height (terminal rows) |
| `NTFY_TOPIC` | klubhaus URL | ntfy.sh JSON endpoint to poll |
| `NTFY_POLL_INTERVAL` | `15` | Seconds between ntfy polls |
| `MESSAGE_DISPLAY_SECS` | `30` | How long an ntfy message holds the screen |
| `GRAD_SPEED` | `0.08` | Gradient sweep speed |
**Font:** Put your `.otf`, `.ttf`, or `.ttc` files in `fonts/`. Startup opens the font picker from that folder and applies your selected font before streaming.
### Display Modes
---
Mainline supports multiple display backends:
## How it works
- **Terminal** (`--display terminal`): ANSI terminal output (default)
- **WebSocket** (`--display websocket`): Stream to web browser clients
- **Both** (`--display both`): Terminal + WebSocket simultaneously
- Feeds are fetched and filtered on startup (sports and vapid content stripped); results are cached to `.mainline_cache_news.json` / `.mainline_cache_poetry.json` for fast restarts
- Headlines are rasterized via Pillow with 4× SSAA into half-block characters (`▀▄█ `) at the configured font size
- A left-to-right ANSI gradient colors each character: white-hot leading edge trails off to near-black; the gradient sweeps continuously across the full scroll canvas
- Subject-region detection runs a regex pass on each headline; matches trigger a Google Translate call and font swap to the appropriate script (CJK, Arabic, Devanagari, etc.) using macOS system fonts
- The mic stream runs in a background thread, feeding RMS dB into the glitch probability calculation each frame
- The viewport scrolls through a virtual canvas of pre-rendered blocks; fade zones at top and bottom dissolve characters probabilistically
- An ntfy.sh poller runs in a background thread; incoming messages interrupt the scroll and render full-screen until dismissed or expired
WebSocket mode serves a web client at http://localhost:8766 with ANSI color support and fullscreen mode.
---
## Architecture
`mainline.py` is a thin entrypoint (venv bootstrap → `engine.app.main()`). All logic lives in the `engine/` package:
```
engine/
config.py constants, CLI flags, glyph tables
sources.py FEEDS, POETRY_SOURCES, language/script maps
terminal.py ANSI codes, tw/th, type_out, boot_ln
filter.py HTML stripping, content filter
translate.py Google Translate wrapper + region detection
render.py OTF → half-block pipeline (SSAA, gradient)
effects.py noise, glitch_bar, fade, firehose
fetch.py RSS/Gutenberg fetching + cache load/save
ntfy.py NtfyPoller — standalone, zero internal deps
mic.py MicMonitor — standalone, graceful fallback
scroll.py stream() frame loop + message rendering
app.py main(), boot sequence, signal handler
```
`ntfy.py` and `mic.py` have zero internal dependencies and can be imported by any other visualizer.
---
## Feeds
### Feeds
~25 sources across four categories: **Science & Technology**, **Economics & Business**, **World & Politics**, **Culture & Ideas**. Add or swap feeds in `engine/sources.py``FEEDS`.
**Poetry mode** pulls from Project Gutenberg: Whitman, Dickinson, Thoreau, Emerson. Sources are in `engine/sources.py``POETRY_SOURCES`.
---
### Fonts
## ntfy.sh Integration
A `fonts/` directory is bundled with demo faces. On startup, an interactive picker lists all discovered faces with a live half-block preview.
Mainline polls a configurable ntfy.sh topic in the background. When a message arrives, the scroll pauses and the message renders full-screen for `MESSAGE_DISPLAY_SECS` seconds, then the stream resumes.
Navigation: `↑`/`↓` or `j`/`k` to move, `Enter` or `q` to select.
To add your own fonts, drop `.otf`, `.ttf`, or `.ttc` files into `fonts/`.
### ntfy.sh
Mainline polls a configurable ntfy.sh topic in the background. When a message arrives, the scroll pauses and the message renders full-screen.
To push a message:
@@ -100,43 +103,127 @@ To push a message:
curl -d "Body text" -H "Title: Alert title" https://ntfy.sh/your_topic
```
Update `NTFY_TOPIC` in `engine/config.py` to point at your own topic. The `NtfyPoller` class is fully standalone and can be reused by other visualizers:
---
```python
from engine.ntfy import NtfyPoller
poller = NtfyPoller("https://ntfy.sh/my_topic/json?since=20s&poll=1")
poller.start()
# in render loop:
msg = poller.get_active_message() # returns (title, body, timestamp) or None
## Internals
### How it works
- On launch, the font picker scans `fonts/` and presents a live-rendered TUI for face selection
- Feeds are fetched and filtered on startup; results are cached for fast restarts
- Headlines are rasterized via Pillow with 4× SSAA into half-block characters
- The ticker uses a sweeping white-hot → deep green gradient
- Subject-region detection triggers Google Translate and font swap for non-Latin scripts
- The mic stream runs in a background thread, feeding RMS dB into glitch probability
- The viewport scrolls through pre-rendered blocks with fade zones
- An ntfy.sh SSE stream runs in a background thread for messages and C&C commands
### Architecture
```
engine/
__init__.py package marker
app.py main(), font picker TUI, boot sequence, C&C poller
config.py constants, CLI flags, glyph tables
sources.py FEEDS, POETRY_SOURCES, language/script maps
terminal.py ANSI codes, tw/th, type_out, boot_ln
filter.py HTML stripping, content filter
translate.py Google Translate wrapper + region detection
render.py OTF → half-block pipeline (SSAA, gradient)
effects/ plugin architecture for visual effects
controller.py handles /effects commands
chain.py effect pipeline chaining
registry.py effect registration and lookup
performance.py performance monitoring
fetch.py RSS/Gutenberg fetching + cache
ntfy.py NtfyPoller — standalone, zero internal deps
mic.py MicMonitor — standalone, graceful fallback
scroll.py stream() frame loop + message rendering
viewport.py terminal dimension tracking
frame.py scroll step calculation, timing
layers.py ticker zone, firehose, message overlay
eventbus.py thread-safe event publishing
events.py event types and definitions
controller.py coordinates ntfy/mic monitoring
emitters.py background emitters
types.py type definitions
display.py Display protocol (Terminal, WebSocket, Multi)
websocket_display.py WebSocket server for browser clients
```
---
## Ideas / Future
## Development
### Performance
- **Concurrent feed fetching** — startup currently blocks sequentially on ~25 HTTP requests; `concurrent.futures.ThreadPoolExecutor` would cut load time to the slowest single feed
- **Background refresh** — re-fetch feeds in a daemon thread so a long session stays current without restart
- **Translation pre-fetch** — run translate calls concurrently during the boot sequence rather than on first render
### Setup
### Graphics
- **Matrix rain underlay** — katakana column rain rendered at low opacity beneath the scrolling blocks as a background layer
- **CRT simulation** — subtle dim scanlines every N rows, occasional brightness ripple across the full screen
- **Sixel / iTerm2 inline images** — bypass half-blocks entirely and stream actual bitmap frames for true resolution; would require a capable terminal
- **Parallax secondary column** — a second, dimmer, faster-scrolling stream of ambient text at reduced opacity on one side
Requires Python 3.10+ and [uv](https://docs.astral.sh/uv/).
### Cyberpunk Vibes
- **Keyword watch list** — highlight or strobe any headline matching tracked terms (names, topics, tickers)
- **Breaking interrupt** — full-screen flash + synthesized blip when a high-priority keyword hits
- **Live data overlay** — secondary ticker strip at screen edge: BTC price, ISS position, geomagnetic index
- **Theme switcher** — `--amber` (phosphor), `--ice` (electric cyan), `--red` (alert state) palette modes via CLI flag
- **Persona modes** — `--surveillance`, `--oracle`, `--underground` as feed presets with matching color themes and boot copy
- **Synthesized audio** — short static bursts tied to glitch events, independent of mic input
```bash
uv sync # minimal (no mic)
uv sync --all-extras # with mic support
uv sync --all-extras --group dev # full dev environment
```
### Extensibility
- **serve.py** — HTTP server that imports `engine.render` and `engine.fetch` directly to stream 1-bit bitmaps to an ESP32 display
- **Rust port** — `ntfy.py` and `render.py` are the natural first targets; clear module boundaries make incremental porting viable
### Tasks
With [mise](https://mise.jdx.dev/):
```bash
mise run test # run test suite
mise run test-cov # run with coverage report
mise run lint # ruff check
mise run lint-fix # ruff check --fix
mise run format # ruff format
mise run run # terminal display
mise run run-websocket # web display only
mise run run-both # terminal + web
mise run run-client # both + open browser
mise run cmd # C&C command interface
mise run cmd-stats # watch effects stats
mise run topics-init # initialize ntfy topics
```
### Testing
```bash
uv run pytest
uv run pytest --cov=engine --cov-report=term-missing
```
### Linting
```bash
uv run ruff check engine/ mainline.py
uv run ruff format engine/ mainline.py
```
Pre-commit hooks run lint automatically via `hk`.
---
*macOS only (system font paths hardcoded). Python 3.9+.*
## Roadmap
### Performance
- Concurrent feed fetching with ThreadPoolExecutor
- Background feed refresh daemon
- Translation pre-fetch during boot
### Graphics
- Matrix rain katakana underlay
- CRT scanline simulation
- Sixel/iTerm2 inline images
- Parallax secondary column
### Cyberpunk Vibes
- Keyword watch list with strobe effects
- Breaking interrupt with synthesized audio
- Live data overlay (BTC, ISS position)
- Theme switcher (amber, ice, red)
- Persona modes (surveillance, oracle, underground)
---
*Python 3.10+. Primary display font is user-selectable via bundled `fonts/` picker.*

366
client/index.html Normal file
View File

@@ -0,0 +1,366 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Mainline Terminal</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
background: #0a0a0a;
color: #ccc;
font-family: 'Fira Code', 'Cascadia Code', 'Consolas', monospace;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
min-height: 100vh;
padding: 20px;
}
body.fullscreen {
padding: 0;
}
body.fullscreen #controls {
display: none;
}
#container {
position: relative;
}
canvas {
background: #000;
border: 1px solid #333;
image-rendering: pixelated;
image-rendering: crisp-edges;
}
body.fullscreen canvas {
border: none;
width: 100vw;
height: 100vh;
max-width: 100vw;
max-height: 100vh;
}
#controls {
display: flex;
gap: 10px;
margin-top: 10px;
align-items: center;
}
#controls button {
background: #333;
color: #ccc;
border: 1px solid #555;
padding: 5px 12px;
cursor: pointer;
font-family: inherit;
font-size: 12px;
}
#controls button:hover {
background: #444;
}
#controls input {
width: 60px;
background: #222;
color: #ccc;
border: 1px solid #444;
padding: 4px 8px;
font-family: inherit;
text-align: center;
}
#status {
margin-top: 10px;
font-size: 12px;
color: #666;
}
#status.connected {
color: #4f4;
}
#status.disconnected {
color: #f44;
}
</style>
</head>
<body>
<div id="container">
<canvas id="terminal"></canvas>
</div>
<div id="controls">
<label>Cols: <input type="number" id="cols" value="80" min="20" max="200"></label>
<label>Rows: <input type="number" id="rows" value="24" min="10" max="60"></label>
<button id="apply">Apply</button>
<button id="fullscreen">Fullscreen</button>
</div>
<div id="status" class="disconnected">Connecting...</div>
<script>
const canvas = document.getElementById('terminal');
const ctx = canvas.getContext('2d');
const status = document.getElementById('status');
const colsInput = document.getElementById('cols');
const rowsInput = document.getElementById('rows');
const applyBtn = document.getElementById('apply');
const fullscreenBtn = document.getElementById('fullscreen');
const CHAR_WIDTH = 9;
const CHAR_HEIGHT = 16;
const ANSI_COLORS = {
0: '#000000', 1: '#cd3131', 2: '#0dbc79', 3: '#e5e510',
4: '#2472c8', 5: '#bc3fbc', 6: '#11a8cd', 7: '#e5e5e5',
8: '#666666', 9: '#f14c4c', 10: '#23d18b', 11: '#f5f543',
12: '#3b8eea', 13: '#d670d6', 14: '#29b8db', 15: '#ffffff',
};
let cols = 80;
let rows = 24;
let ws = null;
function resizeCanvas() {
canvas.width = cols * CHAR_WIDTH;
canvas.height = rows * CHAR_HEIGHT;
}
function parseAnsi(text) {
if (!text) return [];
const tokens = [];
let currentText = '';
let fg = '#cccccc';
let bg = '#000000';
let bold = false;
let i = 0;
let inEscape = false;
let escapeCode = '';
while (i < text.length) {
const char = text[i];
if (inEscape) {
if (char >= '0' && char <= '9' || char === ';' || char === '[') {
escapeCode += char;
}
if (char === 'm') {
const codes = escapeCode.replace('\x1b[', '').split(';');
for (const code of codes) {
const num = parseInt(code) || 0;
if (num === 0) {
fg = '#cccccc';
bg = '#000000';
bold = false;
} else if (num === 1) {
bold = true;
} else if (num === 22) {
bold = false;
} else if (num === 39) {
fg = '#cccccc';
} else if (num === 49) {
bg = '#000000';
} else if (num >= 30 && num <= 37) {
fg = ANSI_COLORS[num - 30 + (bold ? 8 : 0)] || '#cccccc';
} else if (num >= 40 && num <= 47) {
bg = ANSI_COLORS[num - 40] || '#000000';
} else if (num >= 90 && num <= 97) {
fg = ANSI_COLORS[num - 90 + 8] || '#cccccc';
} else if (num >= 100 && num <= 107) {
bg = ANSI_COLORS[num - 100 + 8] || '#000000';
} else if (num >= 1 && num <= 256) {
// 256 colors
if (num < 16) {
fg = ANSI_COLORS[num] || '#cccccc';
} else if (num < 232) {
const c = num - 16;
const r = Math.floor(c / 36) * 51;
const g = Math.floor((c % 36) / 6) * 51;
const b = (c % 6) * 51;
fg = `#${r.toString(16).padStart(2,'0')}${g.toString(16).padStart(2,'0')}${b.toString(16).padStart(2,'0')}`;
} else {
const gray = (num - 232) * 10 + 8;
fg = `#${gray.toString(16).repeat(2)}`;
}
}
}
if (currentText) {
tokens.push({ text: currentText, fg, bg, bold });
currentText = '';
}
inEscape = false;
escapeCode = '';
}
} else if (char === '\x1b' && text[i + 1] === '[') {
if (currentText) {
tokens.push({ text: currentText, fg, bg, bold });
currentText = '';
}
inEscape = true;
escapeCode = '';
i++;
} else {
currentText += char;
}
i++;
}
if (currentText) {
tokens.push({ text: currentText, fg, bg, bold });
}
return tokens;
}
function renderLine(text, x, y, lineHeight) {
const tokens = parseAnsi(text);
let xOffset = x;
for (const token of tokens) {
if (token.text) {
if (token.bold) {
ctx.font = 'bold 16px monospace';
} else {
ctx.font = '16px monospace';
}
const metrics = ctx.measureText(token.text);
if (token.bg !== '#000000') {
ctx.fillStyle = token.bg;
ctx.fillRect(xOffset, y - 2, metrics.width + 1, lineHeight);
}
ctx.fillStyle = token.fg;
ctx.fillText(token.text, xOffset, y);
xOffset += metrics.width;
}
}
}
function connect() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.hostname}:8765`;
ws = new WebSocket(wsUrl);
ws.onopen = () => {
status.textContent = 'Connected';
status.className = 'connected';
sendSize();
};
ws.onclose = () => {
status.textContent = 'Disconnected - Reconnecting...';
status.className = 'disconnected';
setTimeout(connect, 1000);
};
ws.onerror = () => {
status.textContent = 'Connection error';
status.className = 'disconnected';
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (data.type === 'frame') {
cols = data.width || 80;
rows = data.height || 24;
colsInput.value = cols;
rowsInput.value = rows;
resizeCanvas();
render(data.lines || []);
} else if (data.type === 'clear') {
ctx.fillStyle = '#000';
ctx.fillRect(0, 0, canvas.width, canvas.height);
}
} catch (e) {
console.error('Failed to parse message:', e);
}
};
}
function sendSize() {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: 'resize',
width: parseInt(colsInput.value),
height: parseInt(rowsInput.value)
}));
}
}
function render(lines) {
ctx.fillStyle = '#000';
ctx.fillRect(0, 0, canvas.width, canvas.height);
ctx.font = '16px monospace';
ctx.textBaseline = 'top';
const lineHeight = CHAR_HEIGHT;
const maxLines = Math.min(lines.length, rows);
for (let i = 0; i < maxLines; i++) {
const line = lines[i] || '';
renderLine(line, 0, i * lineHeight, lineHeight);
}
}
function calculateViewportSize() {
const isFullscreen = document.fullscreenElement !== null;
const padding = isFullscreen ? 0 : 40;
const controlsHeight = isFullscreen ? 0 : 60;
const availableWidth = window.innerWidth - padding;
const availableHeight = window.innerHeight - controlsHeight;
cols = Math.max(20, Math.floor(availableWidth / CHAR_WIDTH));
rows = Math.max(10, Math.floor(availableHeight / CHAR_HEIGHT));
colsInput.value = cols;
rowsInput.value = rows;
resizeCanvas();
console.log('Fullscreen:', isFullscreen, 'Size:', cols, 'x', rows);
sendSize();
}
applyBtn.addEventListener('click', () => {
cols = parseInt(colsInput.value);
rows = parseInt(rowsInput.value);
resizeCanvas();
sendSize();
});
fullscreenBtn.addEventListener('click', () => {
if (!document.fullscreenElement) {
document.body.classList.add('fullscreen');
document.documentElement.requestFullscreen().then(() => {
calculateViewportSize();
});
} else {
document.exitFullscreen().then(() => {
calculateViewportSize();
});
}
});
document.addEventListener('fullscreenchange', () => {
if (!document.fullscreenElement) {
document.body.classList.remove('fullscreen');
calculateViewportSize();
}
});
window.addEventListener('resize', () => {
if (document.fullscreenElement) {
calculateViewportSize();
}
});
// Initial setup
resizeCanvas();
connect();
</script>
</body>
</html>

256
cmdline.py Normal file
View File

@@ -0,0 +1,256 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Command-line utility for interacting with mainline via ntfy.
Usage:
python cmdline.py # Interactive TUI mode
python cmdline.py --help # Show help
python cmdline.py /effects list # Send single command via ntfy
python cmdline.py /effects stats # Get performance stats via ntfy
python cmdline.py -w /effects stats # Watch mode (polls for stats)
The TUI mode provides:
- Arrow keys to navigate command history
- Tab completion for commands
- Auto-refresh for performance stats
C&C works like a serial port:
1. Send command to ntfy_cc_topic
2. Mainline receives, processes, responds to same topic
3. Cmdline polls for response
"""
import os
os.environ["FORCE_COLOR"] = "1"
os.environ["TERM"] = "xterm-256color"
import argparse
import json
import sys
import time
import threading
import urllib.request
from pathlib import Path
from engine import config
from engine.terminal import CLR, CURSOR_OFF, CURSOR_ON, G_DIM, G_HI, RST, W_GHOST
try:
CC_CMD_TOPIC = config.NTFY_CC_CMD_TOPIC
CC_RESP_TOPIC = config.NTFY_CC_RESP_TOPIC
except AttributeError:
CC_CMD_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json"
CC_RESP_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json"
class NtfyResponsePoller:
"""Polls ntfy for command responses."""
def __init__(self, cmd_topic: str, resp_topic: str, timeout: float = 10.0):
self.cmd_topic = cmd_topic
self.resp_topic = resp_topic
self.timeout = timeout
self._last_id = None
self._lock = threading.Lock()
def _build_url(self) -> str:
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
parsed = urlparse(self.resp_topic)
params = parse_qs(parsed.query, keep_blank_values=True)
params["since"] = [self._last_id if self._last_id else "20s"]
new_query = urlencode({k: v[0] for k, v in params.items()})
return urlunparse(parsed._replace(query=new_query))
def send_and_wait(self, cmd: str) -> str:
"""Send command and wait for response."""
url = self.cmd_topic.replace("/json", "")
data = cmd.encode("utf-8")
req = urllib.request.Request(
url,
data=data,
headers={
"User-Agent": "mainline-cmdline/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
urllib.request.urlopen(req, timeout=5)
except Exception as e:
return f"Error sending command: {e}"
return self._wait_for_response(cmd)
def _wait_for_response(self, expected_cmd: str = "") -> str:
"""Poll for response message."""
start = time.time()
while time.time() - start < self.timeout:
try:
url = self._build_url()
req = urllib.request.Request(
url, headers={"User-Agent": "mainline-cmdline/0.1"}
)
with urllib.request.urlopen(req, timeout=10) as resp:
for line in resp:
try:
data = json.loads(line.decode("utf-8", errors="replace"))
except json.JSONDecodeError:
continue
if data.get("event") == "message":
self._last_id = data.get("id")
msg = data.get("message", "")
if msg:
return msg
except Exception:
pass
time.sleep(0.5)
return "Timeout waiting for response"
AVAILABLE_COMMANDS = """Available commands:
/effects list - List all effects and status
/effects <name> on - Enable an effect
/effects <name> off - Disable an effect
/effects <name> intensity <0.0-1.0> - Set effect intensity
/effects reorder <name1>,<name2>,... - Reorder pipeline
/effects stats - Show performance statistics
/help - Show this help
/quit - Exit
"""
def print_header():
w = 60
print(CLR, end="")
print(CURSOR_OFF, end="")
print(f"\033[1;1H", end="")
print(f" \033[1;38;5;231m╔{'' * (w - 6)}\033[0m")
print(
f" \033[1;38;5;231m║\033[0m \033[1;38;5;82mMAINLINE\033[0m \033[3;38;5;245mCommand Center\033[0m \033[1;38;5;231m ║\033[0m"
)
print(f" \033[1;38;5;231m╚{'' * (w - 6)}\033[0m")
print(f" \033[2;38;5;37mCMD: {CC_CMD_TOPIC.split('/')[-2]}\033[0m")
print(f" \033[2;38;5;37mRESP: {CC_RESP_TOPIC.split('/')[-2]}\033[0m")
print()
def print_response(response: str, is_error: bool = False) -> None:
"""Print response with nice formatting."""
print()
if is_error:
print(f" \033[1;38;5;196m✗ Error\033[0m")
print(f" \033[38;5;196m{'' * 40}\033[0m")
else:
print(f" \033[1;38;5;82m✓ Response\033[0m")
print(f" \033[38;5;37m{'' * 40}\033[0m")
for line in response.split("\n"):
print(f" {line}")
print()
def interactive_mode():
"""Interactive TUI for sending commands."""
import readline
print_header()
poller = NtfyResponsePoller(CC_CMD_TOPIC, CC_RESP_TOPIC)
print(f" \033[38;5;245mType /help for commands, /quit to exit\033[0m")
print()
while True:
try:
cmd = input(f" \033[1;38;5;82m\033[0m {G_HI}").strip()
except (EOFError, KeyboardInterrupt):
print()
break
if not cmd:
continue
if cmd.startswith("/"):
if cmd == "/quit" or cmd == "/exit":
print(f"\n \033[1;38;5;245mGoodbye!{RST}\n")
break
if cmd == "/help":
print(f"\n{AVAILABLE_COMMANDS}\n")
continue
print(f" \033[38;5;245m⟳ Sending to mainline...{RST}")
result = poller.send_and_wait(cmd)
print_response(result, is_error=result.startswith("Error"))
else:
print(f"\n \033[1;38;5;196m⚠ Commands must start with /{RST}\n")
print(CURSOR_ON, end="")
return 0
def main():
parser = argparse.ArgumentParser(
description="Mainline command-line interface",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=AVAILABLE_COMMANDS,
)
parser.add_argument(
"command",
nargs="?",
default=None,
help="Command to send (e.g., /effects list)",
)
parser.add_argument(
"--watch",
"-w",
action="store_true",
help="Watch mode: continuously poll for stats (Ctrl+C to exit)",
)
args = parser.parse_args()
if args.command is None:
return interactive_mode()
poller = NtfyResponsePoller(CC_CMD_TOPIC, CC_RESP_TOPIC)
if args.watch and "/effects stats" in args.command:
import signal
def handle_sigterm(*_):
print(f"\n \033[1;38;5;245mStopped watching{RST}")
print(CURSOR_ON, end="")
sys.exit(0)
signal.signal(signal.SIGTERM, handle_sigterm)
print_header()
print(f" \033[38;5;245mWatching /effects stats (Ctrl+C to exit)...{RST}\n")
try:
while True:
result = poller.send_and_wait(args.command)
print(f"\033[2J\033[1;1H", end="")
print(
f" \033[1;38;5;82m\033[0m Performance Stats - \033[1;38;5;245m{time.strftime('%H:%M:%S')}{RST}"
)
print(f" \033[38;5;37m{'' * 44}{RST}")
for line in result.split("\n"):
print(f" {line}")
time.sleep(2)
except KeyboardInterrupt:
print(f"\n \033[1;38;5;245mStopped watching{RST}")
return 0
return 0
result = poller.send_and_wait(args.command)
print(result)
return 0
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,145 @@
# README Update Design — 2026-03-15
## Goal
Restructure and expand `README.md` to:
1. Align with the current codebase (Python 3.10+, uv/mise/pytest/ruff toolchain, 6 new fonts)
2. Add extensibility-focused content (`Extending` section)
3. Add developer workflow coverage (`Development` section)
4. Improve navigability via top-level grouping (Approach C)
---
## Proposed Structure
```
# MAINLINE
> tagline + description
## Using
### Run
### Config
### Feeds
### Fonts
### ntfy.sh
## Internals
### How it works
### Architecture
## Extending
### NtfyPoller
### MicMonitor
### Render pipeline
## Development
### Setup
### Tasks
### Testing
### Linting
## Roadmap
---
*footer*
```
---
## Section-by-section design
### Using
All existing content preserved verbatim. Two changes:
- **Run**: add `uv run mainline.py` as an alternative invocation; expand bootstrap note to mention `uv sync` / `uv sync --all-extras`
- **ntfy.sh**: remove `NtfyPoller` reuse code example (moves to Extending); keep push instructions and topic config
Subsections moved into Using (currently standalone):
- `Feeds` — it's configuration, not a concept
- `ntfy.sh` (usage half)
### Internals
All existing content preserved verbatim. One change:
- **Architecture**: append `tests/` directory listing to the module tree
### Extending
Entirely new section. Three subsections:
**NtfyPoller**
- Minimal working import + usage example
- Note: stdlib only dependencies
```python
from engine.ntfy import NtfyPoller
poller = NtfyPoller("https://ntfy.sh/my_topic/json?since=20s&poll=1")
poller.start()
# in your render loop:
msg = poller.get_active_message() # → (title, body, timestamp) or None
if msg:
title, body, ts = msg
render_my_message(title, body) # visualizer-specific
```
**MicMonitor**
- Minimal working import + usage example
- Note: sounddevice/numpy optional, degrades gracefully
```python
from engine.mic import MicMonitor
mic = MicMonitor(threshold_db=50)
if mic.start(): # returns False if sounddevice unavailable
excess = mic.excess # dB above threshold, clamped to 0
db = mic.db # raw RMS dB level
```
**Render pipeline**
- Brief prose about `engine.render` as importable pipeline
- Minimal sketch of serve.py / ESP32 usage pattern
- Reference to `Mainline Renderer + ntfy Message Queue for ESP32.md`
### Development
Entirely new section. Four subsections:
**Setup**
- Hard requirements: Python 3.10+, uv
- `uv sync` / `uv sync --all-extras` / `uv sync --group dev`
**Tasks** (via mise)
- `mise run test`, `test-cov`, `lint`, `lint-fix`, `format`, `run`, `run-poetry`, `run-firehose`
**Testing**
- Tests in `tests/` covering config, filter, mic, ntfy, sources, terminal
- `uv run pytest` and `uv run pytest --cov=engine --cov-report=term-missing`
**Linting**
- `uv run ruff check` and `uv run ruff format`
- Note: pre-commit hooks run lint via `hk`
### Roadmap
Existing `## Ideas / Future` content preserved verbatim. Only change: rename heading to `## Roadmap`.
### Footer
Update `Python 3.9+``Python 3.10+`.
---
## Files changed
- `README.md` — restructured and expanded as above
- No other files
---
## What is not changing
- All existing prose, examples, and config table values — preserved verbatim where retained
- The Ideas/Future content — kept intact under the new Roadmap heading
- The cyberpunk voice and terse style of the existing README

View File

@@ -0,0 +1,35 @@
from pathlib import Path
PLUGIN_DIR = Path(__file__).parent
def discover_plugins():
from engine.effects.registry import get_registry
registry = get_registry()
imported = {}
for file_path in PLUGIN_DIR.glob("*.py"):
if file_path.name.startswith("_"):
continue
module_name = file_path.stem
if module_name in ("base", "types"):
continue
try:
module = __import__(f"effects_plugins.{module_name}", fromlist=[""])
for attr_name in dir(module):
attr = getattr(module, attr_name)
if (
isinstance(attr, type)
and hasattr(attr, "name")
and hasattr(attr, "process")
and attr_name.endswith("Effect")
):
plugin = attr()
registry.register(plugin)
imported[plugin.name] = plugin
except Exception:
pass
return imported

58
effects_plugins/fade.py Normal file
View File

@@ -0,0 +1,58 @@
import random
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
class FadeEffect:
name = "fade"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
if not ctx.ticker_height:
return buf
result = list(buf)
intensity = self.config.intensity
top_zone = max(1, int(ctx.ticker_height * 0.25))
bot_zone = max(1, int(ctx.ticker_height * 0.10))
for r in range(len(result)):
if r >= ctx.ticker_height:
continue
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
bot_f = (
min(1.0, (ctx.ticker_height - 1 - r) / bot_zone)
if bot_zone > 0
else 1.0
)
row_fade = min(top_f, bot_f) * intensity
if row_fade < 1.0 and result[r].strip():
result[r] = self._fade_line(result[r], row_fade)
return result
def _fade_line(self, s: str, fade: float) -> str:
if fade >= 1.0:
return s
if fade <= 0.0:
return ""
result = []
i = 0
while i < len(s):
if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[":
j = i + 2
while j < len(s) and not s[j].isalpha():
j += 1
result.append(s[i : j + 1])
i = j + 1
elif s[i] == " ":
result.append(" ")
i += 1
else:
result.append(s[i] if random.random() < fade else " ")
i += 1
return "".join(result)
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

View File

@@ -0,0 +1,72 @@
import random
from datetime import datetime
from engine import config
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
from engine.sources import FEEDS, POETRY_SOURCES
from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST
class FirehoseEffect:
name = "firehose"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
firehose_h = config.FIREHOSE_H if config.FIREHOSE else 0
if firehose_h <= 0 or not ctx.items:
return buf
result = list(buf)
intensity = self.config.intensity
h = ctx.terminal_height
for fr in range(firehose_h):
scr_row = h - firehose_h + fr + 1
fline = self._firehose_line(ctx.items, ctx.terminal_width, intensity)
result.append(f"\033[{scr_row};1H{fline}\033[K")
return result
def _firehose_line(self, items: list, w: int, intensity: float) -> str:
r = random.random()
if r < 0.35 * intensity:
title, src, ts = random.choice(items)
text = title[: w - 1]
color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM])
return f"{color}{text}{RST}"
elif r < 0.55 * intensity:
d = random.choice([0.45, 0.55, 0.65, 0.75])
return "".join(
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
f"{random.choice(config.GLITCH + config.KATA)}{RST}"
if random.random() < d
else " "
for _ in range(w)
)
elif r < 0.78 * intensity:
sources = FEEDS if config.MODE == "news" else POETRY_SOURCES
src = random.choice(list(sources.keys()))
msgs = [
f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}",
f" ░░ FEED ACTIVE :: {src}",
f" >> DECODE 0x{random.randint(0x1000, 0xFFFF):04X} :: {src[:24]}",
f" ▒▒ ACQUIRE :: {random.choice(['TCP', 'UDP', 'RSS', 'ATOM', 'XML'])} :: {src}",
f" {''.join(random.choice(config.KATA) for _ in range(3))} STRM "
f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}",
]
text = random.choice(msgs)[: w - 1]
color = random.choice([G_LO, G_DIM, W_GHOST])
return f"{color}{text}{RST}"
else:
title, _, _ = random.choice(items)
start = random.randint(0, max(0, len(title) - 20))
frag = title[start : start + random.randint(10, 35)]
pad = random.randint(0, max(0, w - len(frag) - 8))
gp = "".join(
random.choice(config.GLITCH) for _ in range(random.randint(1, 3))
)
text = (" " * pad + gp + " " + frag)[: w - 1]
color = random.choice([G_LO, C_DIM, W_GHOST])
return f"{color}{text}{RST}"
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

37
effects_plugins/glitch.py Normal file
View File

@@ -0,0 +1,37 @@
import random
from engine import config
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
from engine.terminal import C_DIM, DIM, G_DIM, G_LO, RST
class GlitchEffect:
name = "glitch"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
if not buf:
return buf
result = list(buf)
intensity = self.config.intensity
glitch_prob = 0.32 + min(0.9, ctx.mic_excess * 0.16)
glitch_prob = glitch_prob * intensity
n_hits = 4 + int(ctx.mic_excess / 2)
n_hits = int(n_hits * intensity)
if random.random() < glitch_prob:
for _ in range(min(n_hits, len(result))):
gi = random.randint(0, len(result) - 1)
scr_row = gi + 1
result[gi] = f"\033[{scr_row};1H{self._glitch_bar(ctx.terminal_width)}"
return result
def _glitch_bar(self, w: int) -> str:
c = random.choice(["", "", "", "\xc2"])
n = random.randint(3, w // 2)
o = random.randint(0, w - n)
return " " * o + f"{G_LO}{DIM}" + c * n + RST
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

36
effects_plugins/noise.py Normal file
View File

@@ -0,0 +1,36 @@
import random
from engine import config
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
from engine.terminal import C_DIM, G_DIM, G_LO, RST, W_GHOST
class NoiseEffect:
name = "noise"
config = EffectConfig(enabled=True, intensity=0.15)
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
if not ctx.ticker_height:
return buf
result = list(buf)
intensity = self.config.intensity
probability = intensity * 0.15
for r in range(len(result)):
cy = ctx.scroll_cam + r
if random.random() < probability:
result[r] = self._generate_noise(ctx.terminal_width, cy)
return result
def _generate_noise(self, w: int, cy: int) -> str:
d = random.choice([0.15, 0.25, 0.35, 0.12])
return "".join(
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
f"{random.choice(config.GLITCH + config.KATA)}{RST}"
if random.random() < d
else " "
for _ in range(w)
)
def configure(self, cfg: EffectConfig) -> None:
self.config = cfg

View File

@@ -2,23 +2,31 @@
Application orchestrator — boot sequence, signal handling, main loop wiring.
"""
import sys
import os
import time
import signal
import atexit
import os
import signal
import sys
import termios
import time
import tty
from engine import config, render
from engine.terminal import (
RST, G_HI, G_MID, G_DIM, W_DIM, W_GHOST, CLR, CURSOR_OFF, CURSOR_ON, tw,
slow_print, boot_ln,
)
from engine.controller import StreamController
from engine.fetch import fetch_all, fetch_poetry, load_cache, save_cache
from engine.ntfy import NtfyPoller
from engine.mic import MicMonitor
from engine.scroll import stream
from engine.terminal import (
CLR,
CURSOR_OFF,
CURSOR_ON,
G_DIM,
G_HI,
G_MID,
RST,
W_DIM,
W_GHOST,
boot_ln,
slow_print,
tw,
)
TITLE = [
" ███╗ ███╗ █████╗ ██╗███╗ ██╗██╗ ██╗███╗ ██╗███████╗",
@@ -29,6 +37,7 @@ TITLE = [
" ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝╚═╝ ╚═══╝╚══════╝╚═╝╚═╝ ╚═══╝╚══════╝",
]
def _read_picker_key():
ch = sys.stdin.read(1)
if ch == "\x03":
@@ -53,6 +62,7 @@ def _read_picker_key():
return "enter"
return None
def _normalize_preview_rows(rows):
"""Trim shared left padding and trailing spaces for stable on-screen previews."""
non_empty = [r for r in rows if r.strip()]
@@ -99,7 +109,9 @@ def _draw_font_picker(faces, selected):
active = pos == selected
pointer = "" if active else " "
color = G_HI if active else W_DIM
print(f" {color}{pointer} {face['name']}{RST}{W_GHOST} · {face['file_name']}{RST}")
print(
f" {color}{pointer} {face['name']}{RST}{W_GHOST} · {face['file_name']}{RST}"
)
if top > 0:
print(f" {W_GHOST}{top} above{RST}")
@@ -116,6 +128,7 @@ def _draw_font_picker(faces, selected):
shown = row[:max_preview_w]
print(f" {shown}")
def pick_font_face():
"""Interactive startup picker for selecting a face from repo OTF files."""
if not config.FONT_PICKER:
@@ -225,13 +238,119 @@ def pick_font_face():
font_index=selected_font["font_index"],
)
render.clear_font_cache()
print(f" {G_DIM}> using {selected_font['name']} ({selected_font['file_name']}){RST}")
print(
f" {G_DIM}> using {selected_font['name']} ({selected_font['file_name']}){RST}"
)
time.sleep(0.8)
print(CLR, end="")
print(CURSOR_OFF, end="")
print()
def pick_effects_config():
"""Interactive picker for configuring effects pipeline."""
import effects_plugins
from engine.effects import get_effect_chain, get_registry
effects_plugins.discover_plugins()
registry = get_registry()
chain = get_effect_chain()
chain.set_order(["noise", "fade", "glitch", "firehose"])
effects = list(registry.list_all().values())
if not effects:
return
selected = 0
editing_intensity = False
intensity_value = 1.0
def _draw_effects_picker():
w = tw()
print(CLR, end="")
print("\033[1;1H", end="")
print(" \033[1;38;5;231mEFFECTS CONFIG\033[0m")
print(f" \033[2;38;5;37m{'' * (w - 4)}\033[0m")
print()
for i, effect in enumerate(effects):
prefix = " > " if i == selected else " "
marker = "[*]" if effect.config.enabled else "[ ]"
if editing_intensity and i == selected:
print(
f"{prefix}{marker} \033[1;38;5;82m{effect.name}\033[0m: intensity={intensity_value:.2f} (use +/- to adjust, Enter to confirm)"
)
else:
print(
f"{prefix}{marker} {effect.name}: intensity={effect.config.intensity:.2f}"
)
print()
print(f" \033[2;38;5;37m{'' * (w - 4)}\033[0m")
print(
" \033[38;5;245mControls: space=toggle on/off | +/-=adjust intensity | arrows=move | Enter=next effect | q=done\033[0m"
)
def _read_effects_key():
ch = sys.stdin.read(1)
if ch == "\x03":
return "interrupt"
if ch in ("\r", "\n"):
return "enter"
if ch == " ":
return "toggle"
if ch == "q":
return "quit"
if ch == "+" or ch == "=":
return "up"
if ch == "-" or ch == "_":
return "down"
if ch == "\x1b":
c1 = sys.stdin.read(1)
if c1 != "[":
return None
c2 = sys.stdin.read(1)
if c2 == "A":
return "up"
if c2 == "B":
return "down"
return None
return None
if not sys.stdin.isatty():
return
fd = sys.stdin.fileno()
old_settings = termios.tcgetattr(fd)
try:
tty.setcbreak(fd)
while True:
_draw_effects_picker()
key = _read_effects_key()
if key == "quit" or key == "enter":
break
elif key == "up" and editing_intensity:
intensity_value = min(1.0, intensity_value + 0.1)
effects[selected].config.intensity = intensity_value
elif key == "down" and editing_intensity:
intensity_value = max(0.0, intensity_value - 0.1)
effects[selected].config.intensity = intensity_value
elif key == "up":
selected = max(0, selected - 1)
intensity_value = effects[selected].config.intensity
elif key == "down":
selected = min(len(effects) - 1, selected + 1)
intensity_value = effects[selected].config.intensity
elif key == "toggle":
effects[selected].config.enabled = not effects[selected].config.enabled
elif key == "interrupt":
raise KeyboardInterrupt
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
def main():
atexit.register(lambda: print(CURSOR_ON, end="", flush=True))
@@ -242,10 +361,13 @@ def main():
signal.signal(signal.SIGINT, handle_sigint)
StreamController.warmup_topics()
w = tw()
print(CLR, end="")
print(CURSOR_OFF, end="")
pick_font_face()
pick_effects_config()
w = tw()
print()
time.sleep(0.4)
@@ -255,23 +377,29 @@ def main():
time.sleep(0.07)
print()
_subtitle = "literary consciousness stream" if config.MODE == 'poetry' else "digital consciousness stream"
_subtitle = (
"literary consciousness stream"
if config.MODE == "poetry"
else "digital consciousness stream"
)
print(f" {W_DIM}v0.1 · {_subtitle}{RST}")
print(f" {W_GHOST}{'' * (w - 4)}{RST}")
print()
time.sleep(0.4)
cached = load_cache() if '--refresh' not in sys.argv else None
cached = load_cache() if "--refresh" not in sys.argv else None
if cached:
items = cached
boot_ln("Cache", f"LOADED [{len(items)} SIGNALS]", True)
elif config.MODE == 'poetry':
elif config.MODE == "poetry":
slow_print(" > INITIALIZING LITERARY CORPUS...\n")
time.sleep(0.2)
print()
items, linked, failed = fetch_poetry()
print()
print(f" {G_DIM}>{RST} {G_MID}{linked} TEXTS LOADED{RST} {W_GHOST}· {failed} DARK{RST}")
print(
f" {G_DIM}>{RST} {G_MID}{linked} TEXTS LOADED{RST} {W_GHOST}· {failed} DARK{RST}"
)
print(f" {G_DIM}>{RST} {G_MID}{len(items)} STANZAS ACQUIRED{RST}")
save_cache(items)
else:
@@ -280,7 +408,9 @@ def main():
print()
items, linked, failed = fetch_all()
print()
print(f" {G_DIM}>{RST} {G_MID}{linked} SOURCES LINKED{RST} {W_GHOST}· {failed} DARK{RST}")
print(
f" {G_DIM}>{RST} {G_MID}{linked} SOURCES LINKED{RST} {W_GHOST}· {failed} DARK{RST}"
)
print(f" {G_DIM}>{RST} {G_MID}{len(items)} SIGNALS ACQUIRED{RST}")
save_cache(items)
@@ -289,17 +419,18 @@ def main():
sys.exit(1)
print()
mic = MicMonitor(threshold_db=config.MIC_THRESHOLD_DB)
mic_ok = mic.start()
if mic.available:
boot_ln("Microphone", "ACTIVE" if mic_ok else "OFFLINE · check System Settings → Privacy → Microphone", bool(mic_ok))
controller = StreamController()
mic_ok, ntfy_ok = controller.initialize_sources()
if controller.mic and controller.mic.available:
boot_ln(
"Microphone",
"ACTIVE"
if mic_ok
else "OFFLINE · check System Settings → Privacy → Microphone",
bool(mic_ok),
)
ntfy = NtfyPoller(
config.NTFY_TOPIC,
poll_interval=config.NTFY_POLL_INTERVAL,
display_secs=config.MESSAGE_DISPLAY_SECS,
)
ntfy_ok = ntfy.start()
boot_ln("ntfy", "LISTENING" if ntfy_ok else "OFFLINE", ntfy_ok)
if config.FIREHOSE:
@@ -312,7 +443,7 @@ def main():
print()
time.sleep(0.4)
stream(items, ntfy, mic)
controller.run(items)
print()
print(f" {W_GHOST}{'' * (tw() - 4)}{RST}")

638
engine/benchmark.py Normal file
View File

@@ -0,0 +1,638 @@
#!/usr/bin/env python3
"""
Benchmark runner for mainline - tests performance across effects and displays.
Usage:
python -m engine.benchmark
python -m engine.benchmark --output report.md
python -m engine.benchmark --displays terminal,websocket --effects glitch,fade
python -m engine.benchmark --format json --output benchmark.json
Headless mode (default): suppress all terminal output during benchmarks.
"""
import argparse
import json
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime
from io import StringIO
from pathlib import Path
from typing import Any
import numpy as np
@dataclass
class BenchmarkResult:
"""Result of a single benchmark run."""
name: str
display: str
effect: str | None
iterations: int
total_time_ms: float
avg_time_ms: float
std_dev_ms: float
min_ms: float
max_ms: float
fps: float
chars_processed: int
chars_per_sec: float
@dataclass
class BenchmarkReport:
"""Complete benchmark report."""
timestamp: str
python_version: str
results: list[BenchmarkResult] = field(default_factory=list)
summary: dict[str, Any] = field(default_factory=dict)
def get_sample_buffer(width: int = 80, height: int = 24) -> list[str]:
"""Generate a sample buffer for benchmarking."""
lines = []
for i in range(height):
line = f"\x1b[32mLine {i}\x1b[0m " + "A" * (width - 10)
lines.append(line)
return lines
def benchmark_display(
display_class, buffer: list[str], iterations: int = 100
) -> BenchmarkResult | None:
"""Benchmark a single display."""
old_stdout = sys.stdout
old_stderr = sys.stderr
try:
sys.stdout = StringIO()
sys.stderr = StringIO()
display = display_class()
display.init(80, 24)
times = []
chars = sum(len(line) for line in buffer)
for _ in range(iterations):
t0 = time.perf_counter()
display.show(buffer)
elapsed = (time.perf_counter() - t0) * 1000
times.append(elapsed)
display.cleanup()
except Exception:
return None
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
times_arr = np.array(times)
return BenchmarkResult(
name=f"display_{display_class.__name__}",
display=display_class.__name__,
effect=None,
iterations=iterations,
total_time_ms=sum(times),
avg_time_ms=float(np.mean(times_arr)),
std_dev_ms=float(np.std(times_arr)),
min_ms=float(np.min(times_arr)),
max_ms=float(np.max(times_arr)),
fps=float(1000.0 / np.mean(times_arr)) if np.mean(times_arr) > 0 else 0.0,
chars_processed=chars * iterations,
chars_per_sec=float((chars * iterations) / (sum(times) / 1000))
if sum(times) > 0
else 0.0,
)
def benchmark_effect_with_display(
effect_class, display, buffer: list[str], iterations: int = 100
) -> BenchmarkResult | None:
"""Benchmark an effect with a display."""
old_stdout = sys.stdout
old_stderr = sys.stderr
try:
sys.stdout = StringIO()
sys.stderr = StringIO()
effect = effect_class()
effect.configure(enabled=True, intensity=1.0)
times = []
chars = sum(len(line) for line in buffer)
for _ in range(iterations):
processed = effect.process(buffer)
t0 = time.perf_counter()
display.show(processed)
elapsed = (time.perf_counter() - t0) * 1000
times.append(elapsed)
display.cleanup()
except Exception:
return None
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
times_arr = np.array(times)
return BenchmarkResult(
name=f"effect_{effect_class.__name__}_with_{display.__class__.__name__}",
display=display.__class__.__name__,
effect=effect_class.__name__,
iterations=iterations,
total_time_ms=sum(times),
avg_time_ms=float(np.mean(times_arr)),
std_dev_ms=float(np.std(times_arr)),
min_ms=float(np.min(times_arr)),
max_ms=float(np.max(times_arr)),
fps=float(1000.0 / np.mean(times_arr)) if np.mean(times_arr) > 0 else 0.0,
chars_processed=chars * iterations,
chars_per_sec=float((chars * iterations) / (sum(times) / 1000))
if sum(times) > 0
else 0.0,
)
def get_available_displays():
"""Get available display classes."""
from engine.display import (
DisplayRegistry,
NullDisplay,
TerminalDisplay,
)
DisplayRegistry.initialize()
displays = [
("null", NullDisplay),
("terminal", TerminalDisplay),
]
try:
from engine.display.backends.websocket import WebSocketDisplay
displays.append(("websocket", WebSocketDisplay))
except Exception:
pass
try:
from engine.display.backends.sixel import SixelDisplay
displays.append(("sixel", SixelDisplay))
except Exception:
pass
return displays
def get_available_effects():
"""Get available effect classes."""
try:
from engine.effects import get_registry
except Exception:
return []
effects = []
registry = get_registry()
for name, effect in registry.list_all().items():
if effect:
effects.append((name, effect))
return effects
def run_benchmarks(
displays: list[tuple[str, Any]] | None = None,
effects: list[tuple[str, Any]] | None = None,
iterations: int = 100,
verbose: bool = False,
) -> BenchmarkReport:
"""Run all benchmarks and return report."""
from datetime import datetime
if displays is None:
displays = get_available_displays()
if effects is None:
effects = get_available_effects()
buffer = get_sample_buffer(80, 24)
results = []
if verbose:
print(f"Running benchmarks ({iterations} iterations each)...")
for name, display_class in displays:
if verbose:
print(f"Benchmarking display: {name}")
result = benchmark_display(display_class, buffer, iterations)
if result:
results.append(result)
if verbose:
print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg")
if verbose:
print()
for effect_name, effect_class in effects:
for display_name, display_class in displays:
if display_name == "websocket":
continue
if verbose:
print(f"Benchmarking effect: {effect_name} with {display_name}")
display = display_class()
display.init(80, 24)
result = benchmark_effect_with_display(
effect_class, display, buffer, iterations
)
if result:
results.append(result)
if verbose:
print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg")
summary = generate_summary(results)
return BenchmarkReport(
timestamp=datetime.now().isoformat(),
python_version=sys.version,
results=results,
summary=summary,
)
def generate_summary(results: list[BenchmarkResult]) -> dict[str, Any]:
"""Generate summary statistics from results."""
by_display: dict[str, list[BenchmarkResult]] = {}
by_effect: dict[str, list[BenchmarkResult]] = {}
for r in results:
if r.display not in by_display:
by_display[r.display] = []
by_display[r.display].append(r)
if r.effect:
if r.effect not in by_effect:
by_effect[r.effect] = []
by_effect[r.effect].append(r)
summary = {
"by_display": {},
"by_effect": {},
"overall": {
"total_tests": len(results),
"displays_tested": len(by_display),
"effects_tested": len(by_effect),
},
}
for display, res in by_display.items():
fps_values = [r.fps for r in res]
summary["by_display"][display] = {
"avg_fps": float(np.mean(fps_values)),
"min_fps": float(np.min(fps_values)),
"max_fps": float(np.max(fps_values)),
"tests": len(res),
}
for effect, res in by_effect.items():
fps_values = [r.fps for r in res]
summary["by_effect"][effect] = {
"avg_fps": float(np.mean(fps_values)),
"min_fps": float(np.min(fps_values)),
"max_fps": float(np.max(fps_values)),
"tests": len(res),
}
return summary
DEFAULT_CACHE_PATH = Path.home() / ".mainline_benchmark_cache.json"
def load_baseline(cache_path: Path | None = None) -> dict[str, Any] | None:
"""Load baseline benchmark results from cache."""
path = cache_path or DEFAULT_CACHE_PATH
if not path.exists():
return None
try:
with open(path) as f:
return json.load(f)
except Exception:
return None
def save_baseline(
results: list[BenchmarkResult],
cache_path: Path | None = None,
) -> None:
"""Save benchmark results as baseline to cache."""
path = cache_path or DEFAULT_CACHE_PATH
baseline = {
"timestamp": datetime.now().isoformat(),
"results": {
r.name: {
"fps": r.fps,
"avg_time_ms": r.avg_time_ms,
"chars_per_sec": r.chars_per_sec,
}
for r in results
},
}
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
json.dump(baseline, f, indent=2)
def compare_with_baseline(
results: list[BenchmarkResult],
baseline: dict[str, Any],
threshold: float = 0.2,
verbose: bool = True,
) -> tuple[bool, list[str]]:
"""Compare current results with baseline. Returns (pass, messages)."""
baseline_results = baseline.get("results", {})
failures = []
warnings = []
for r in results:
if r.name not in baseline_results:
warnings.append(f"New test: {r.name} (no baseline)")
continue
b = baseline_results[r.name]
if b["fps"] == 0:
continue
degradation = (b["fps"] - r.fps) / b["fps"]
if degradation > threshold:
failures.append(
f"{r.name}: FPS degraded {degradation * 100:.1f}% "
f"(baseline: {b['fps']:.1f}, current: {r.fps:.1f})"
)
elif verbose:
print(f" {r.name}: {r.fps:.1f} FPS (baseline: {b['fps']:.1f})")
passed = len(failures) == 0
messages = []
if failures:
messages.extend(failures)
if warnings:
messages.extend(warnings)
return passed, messages
def run_hook_mode(
displays: list[tuple[str, Any]] | None = None,
effects: list[tuple[str, Any]] | None = None,
iterations: int = 20,
threshold: float = 0.2,
cache_path: Path | None = None,
verbose: bool = False,
) -> int:
"""Run in hook mode: compare against baseline, exit 0 on pass, 1 on fail."""
baseline = load_baseline(cache_path)
if baseline is None:
print("No baseline found. Run with --baseline to create one.")
return 1
report = run_benchmarks(displays, effects, iterations, verbose)
passed, messages = compare_with_baseline(
report.results, baseline, threshold, verbose
)
print("\n=== Benchmark Hook Results ===")
if passed:
print("PASSED - No significant performance degradation")
return 0
else:
print("FAILED - Performance degradation detected:")
for msg in messages:
print(f" - {msg}")
return 1
def format_report_text(report: BenchmarkReport) -> str:
"""Format report as human-readable text."""
lines = [
"# Mainline Performance Benchmark Report",
"",
f"Generated: {report.timestamp}",
f"Python: {report.python_version}",
"",
"## Summary",
"",
f"Total tests: {report.summary['overall']['total_tests']}",
f"Displays tested: {report.summary['overall']['displays_tested']}",
f"Effects tested: {report.summary['overall']['effects_tested']}",
"",
"## By Display",
"",
]
for display, stats in report.summary["by_display"].items():
lines.append(f"### {display}")
lines.append(f"- Avg FPS: {stats['avg_fps']:.1f}")
lines.append(f"- Min FPS: {stats['min_fps']:.1f}")
lines.append(f"- Max FPS: {stats['max_fps']:.1f}")
lines.append(f"- Tests: {stats['tests']}")
lines.append("")
if report.summary["by_effect"]:
lines.append("## By Effect")
lines.append("")
for effect, stats in report.summary["by_effect"].items():
lines.append(f"### {effect}")
lines.append(f"- Avg FPS: {stats['avg_fps']:.1f}")
lines.append(f"- Min FPS: {stats['min_fps']:.1f}")
lines.append(f"- Max FPS: {stats['max_fps']:.1f}")
lines.append(f"- Tests: {stats['tests']}")
lines.append("")
lines.append("## Detailed Results")
lines.append("")
lines.append("| Display | Effect | FPS | Avg ms | StdDev ms | Min ms | Max ms |")
lines.append("|---------|--------|-----|--------|-----------|--------|--------|")
for r in report.results:
effect_col = r.effect if r.effect else "-"
lines.append(
f"| {r.display} | {effect_col} | {r.fps:.1f} | {r.avg_time_ms:.2f} | "
f"{r.std_dev_ms:.2f} | {r.min_ms:.2f} | {r.max_ms:.2f} |"
)
return "\n".join(lines)
def format_report_json(report: BenchmarkReport) -> str:
"""Format report as JSON."""
data = {
"timestamp": report.timestamp,
"python_version": report.python_version,
"summary": report.summary,
"results": [
{
"name": r.name,
"display": r.display,
"effect": r.effect,
"iterations": r.iterations,
"total_time_ms": r.total_time_ms,
"avg_time_ms": r.avg_time_ms,
"std_dev_ms": r.std_dev_ms,
"min_ms": r.min_ms,
"max_ms": r.max_ms,
"fps": r.fps,
"chars_processed": r.chars_processed,
"chars_per_sec": r.chars_per_sec,
}
for r in report.results
],
}
return json.dumps(data, indent=2)
def main():
parser = argparse.ArgumentParser(description="Run mainline benchmarks")
parser.add_argument(
"--displays",
help="Comma-separated list of displays to test (default: all)",
)
parser.add_argument(
"--effects",
help="Comma-separated list of effects to test (default: all)",
)
parser.add_argument(
"--iterations",
type=int,
default=100,
help="Number of iterations per test (default: 100)",
)
parser.add_argument(
"--output",
help="Output file path (default: stdout)",
)
parser.add_argument(
"--format",
choices=["text", "json"],
default="text",
help="Output format (default: text)",
)
parser.add_argument(
"--verbose",
"-v",
action="store_true",
help="Show progress during benchmarking",
)
parser.add_argument(
"--hook",
action="store_true",
help="Run in hook mode: compare against baseline, exit 0 pass, 1 fail",
)
parser.add_argument(
"--baseline",
action="store_true",
help="Save current results as baseline for future hook comparisons",
)
parser.add_argument(
"--threshold",
type=float,
default=0.2,
help="Performance degradation threshold for hook mode (default: 0.2 = 20%%)",
)
parser.add_argument(
"--cache",
type=str,
default=None,
help="Path to baseline cache file (default: ~/.mainline_benchmark_cache.json)",
)
args = parser.parse_args()
cache_path = Path(args.cache) if args.cache else DEFAULT_CACHE_PATH
if args.hook:
displays = None
if args.displays:
display_map = dict(get_available_displays())
displays = [
(name, display_map[name])
for name in args.displays.split(",")
if name in display_map
]
effects = None
if args.effects:
effect_map = dict(get_available_effects())
effects = [
(name, effect_map[name])
for name in args.effects.split(",")
if name in effect_map
]
return run_hook_mode(
displays,
effects,
iterations=args.iterations,
threshold=args.threshold,
cache_path=cache_path,
verbose=args.verbose,
)
displays = None
if args.displays:
display_map = dict(get_available_displays())
displays = [
(name, display_map[name])
for name in args.displays.split(",")
if name in display_map
]
effects = None
if args.effects:
effect_map = dict(get_available_effects())
effects = [
(name, effect_map[name])
for name in args.effects.split(",")
if name in effect_map
]
report = run_benchmarks(displays, effects, args.iterations, args.verbose)
if args.baseline:
save_baseline(report.results, cache_path)
print(f"Baseline saved to {cache_path}")
return 0
if args.format == "json":
output = format_report_json(report)
else:
output = format_report_text(report)
if args.output:
with open(args.output, "w") as f:
f.write(output)
else:
print(output)
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -1,25 +1,28 @@
"""
Configuration constants, CLI flags, and glyph tables.
Supports both global constants (backward compatible) and injected config for testing.
"""
import sys
from dataclasses import dataclass, field
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parent.parent
_FONT_EXTENSIONS = {".otf", ".ttf", ".ttc"}
def _arg_value(flag):
def _arg_value(flag, argv: list[str] | None = None):
"""Get value following a CLI flag, if present."""
if flag not in sys.argv:
argv = argv or sys.argv
if flag not in argv:
return None
i = sys.argv.index(flag)
return sys.argv[i + 1] if i + 1 < len(sys.argv) else None
i = argv.index(flag)
return argv[i + 1] if i + 1 < len(argv) else None
def _arg_int(flag, default):
def _arg_int(flag, default, argv: list[str] | None = None):
"""Get int CLI argument with safe fallback."""
raw = _arg_value(flag)
raw = _arg_value(flag, argv)
if raw is None:
return default
try:
@@ -51,45 +54,193 @@ def _list_font_files(font_dir):
def list_repo_font_files():
"""Public helper for discovering repository font files."""
return _list_font_files(FONT_DIR)
def _get_platform_font_paths() -> dict[str, str]:
"""Get platform-appropriate font paths for non-Latin scripts."""
import platform
system = platform.system()
if system == "Darwin":
return {
"zh-cn": "/System/Library/Fonts/STHeiti Medium.ttc",
"ja": "/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc",
"ko": "/System/Library/Fonts/AppleSDGothicNeo.ttc",
"ru": "/System/Library/Fonts/Supplemental/Arial.ttf",
"uk": "/System/Library/Fonts/Supplemental/Arial.ttf",
"el": "/System/Library/Fonts/Supplemental/Arial.ttf",
"he": "/System/Library/Fonts/Supplemental/Arial.ttf",
"ar": "/System/Library/Fonts/GeezaPro.ttc",
"fa": "/System/Library/Fonts/GeezaPro.ttc",
"hi": "/System/Library/Fonts/Kohinoor.ttc",
"th": "/System/Library/Fonts/ThonburiUI.ttc",
}
elif system == "Linux":
return {
"zh-cn": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
"ja": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
"ko": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
"ru": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"uk": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"el": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"he": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"ar": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"fa": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"hi": "/usr/share/fonts/truetype/noto/NotoSansDevanagari-Regular.ttf",
"th": "/usr/share/fonts/truetype/noto/NotoSansThai-Regular.ttf",
}
else:
return {}
@dataclass(frozen=True)
class Config:
"""Immutable configuration container for injected config."""
headline_limit: int = 1000
feed_timeout: int = 10
mic_threshold_db: int = 50
mode: str = "news"
firehose: bool = False
ntfy_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline/json"
ntfy_cc_cmd_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json"
ntfy_cc_resp_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json"
ntfy_reconnect_delay: int = 5
message_display_secs: int = 30
font_dir: str = "fonts"
font_path: str = ""
font_index: int = 0
font_picker: bool = True
font_sz: int = 60
render_h: int = 8
ssaa: int = 4
scroll_dur: float = 5.625
frame_dt: float = 0.05
firehose_h: int = 12
grad_speed: float = 0.08
glitch_glyphs: str = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋"
kata_glyphs: str = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ"
script_fonts: dict[str, str] = field(default_factory=_get_platform_font_paths)
display: str = "terminal"
websocket: bool = False
websocket_port: int = 8765
@classmethod
def from_args(cls, argv: list[str] | None = None) -> "Config":
"""Create Config from CLI arguments (or custom argv for testing)."""
argv = argv or sys.argv
font_dir = _resolve_font_path(_arg_value("--font-dir", argv) or "fonts")
font_file_arg = _arg_value("--font-file", argv)
font_files = _list_font_files(font_dir)
font_path = (
_resolve_font_path(font_file_arg)
if font_file_arg
else (font_files[0] if font_files else "")
)
return cls(
headline_limit=1000,
feed_timeout=10,
mic_threshold_db=50,
mode="poetry" if "--poetry" in argv or "-p" in argv else "news",
firehose="--firehose" in argv,
ntfy_topic="https://ntfy.sh/klubhaus_terminal_mainline/json",
ntfy_cc_cmd_topic="https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json",
ntfy_cc_resp_topic="https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir=font_dir,
font_path=font_path,
font_index=max(0, _arg_int("--font-index", 0, argv)),
font_picker="--no-font-picker" not in argv,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐╌╍╎╏┃┆┇┊┋",
kata_glyphs="ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ",
script_fonts=_get_platform_font_paths(),
display=_arg_value("--display", argv) or "terminal",
websocket="--websocket" in argv,
websocket_port=_arg_int("--websocket-port", 8765, argv),
)
_config: Config | None = None
def get_config() -> Config:
"""Get the global config instance (lazy-loaded)."""
global _config
if _config is None:
_config = Config.from_args()
return _config
def set_config(config: Config) -> None:
"""Set the global config instance (for testing)."""
global _config
_config = config
# ─── RUNTIME ──────────────────────────────────────────────
HEADLINE_LIMIT = 1000
FEED_TIMEOUT = 10
MIC_THRESHOLD_DB = 50 # dB above which glitches intensify
MODE = 'poetry' if '--poetry' in sys.argv or '-p' in sys.argv else 'news'
FIREHOSE = '--firehose' in sys.argv
MIC_THRESHOLD_DB = 50 # dB above which glitches intensify
MODE = "poetry" if "--poetry" in sys.argv or "-p" in sys.argv else "news"
FIREHOSE = "--firehose" in sys.argv
# ─── NTFY MESSAGE QUEUE ──────────────────────────────────
NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json?since=20s&poll=1"
NTFY_POLL_INTERVAL = 15 # seconds between polls
MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen
NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json"
NTFY_CC_CMD_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd/json"
NTFY_CC_RESP_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline_cc_resp/json"
NTFY_RECONNECT_DELAY = 5 # seconds before reconnecting after a dropped stream
MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen
# ─── FONT RENDERING ──────────────────────────────────────
FONT_DIR = _resolve_font_path(_arg_value('--font-dir') or "fonts")
_FONT_FILE_ARG = _arg_value('--font-file')
FONT_DIR = _resolve_font_path(_arg_value("--font-dir") or "fonts")
_FONT_FILE_ARG = _arg_value("--font-file")
_FONT_FILES = _list_font_files(FONT_DIR)
FONT_PATH = (
_resolve_font_path(_FONT_FILE_ARG)
if _FONT_FILE_ARG
else (_FONT_FILES[0] if _FONT_FILES else "")
)
FONT_INDEX = max(0, _arg_int('--font-index', 0))
FONT_PICKER = '--no-font-picker' not in sys.argv
FONT_INDEX = max(0, _arg_int("--font-index", 0))
FONT_PICKER = "--no-font-picker" not in sys.argv
FONT_SZ = 60
RENDER_H = 8 # terminal rows per rendered text line
RENDER_H = 8 # terminal rows per rendered text line
# ─── FONT RENDERING (ADVANCED) ────────────────────────────
SSAA = 4 # super-sampling factor: render at SSAA× then downsample
SSAA = 4 # super-sampling factor: render at SSAA× then downsample
# ─── SCROLL / FRAME ──────────────────────────────────────
SCROLL_DUR = 5.625 # seconds per headline (2/3 original speed)
FRAME_DT = 0.05 # 50ms base frame rate (20 FPS)
FIREHOSE_H = 12 # firehose zone height (terminal rows)
GRAD_SPEED = 0.08 # gradient traversal speed (cycles/sec, ~12s full sweep)
SCROLL_DUR = 5.625 # seconds per headline (2/3 original speed)
FRAME_DT = 0.05 # 50ms base frame rate (20 FPS)
FIREHOSE_H = 12 # firehose zone height (terminal rows)
GRAD_SPEED = 0.08 # gradient traversal speed (cycles/sec, ~12s full sweep)
# ─── GLYPHS ───────────────────────────────────────────────
GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋"
KATA = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ"
# ─── WEBSOCKET ─────────────────────────────────────────────
DISPLAY = _arg_value("--display", sys.argv) or "terminal"
WEBSOCKET = "--websocket" in sys.argv
WEBSOCKET_PORT = _arg_int("--websocket-port", 8765)
def set_font_selection(font_path=None, font_index=None):
"""Set runtime primary font selection."""

173
engine/controller.py Normal file
View File

@@ -0,0 +1,173 @@
"""
Stream controller - manages input sources and orchestrates the render stream.
"""
from engine.config import Config, get_config
from engine.display import (
DisplayRegistry,
MultiDisplay,
NullDisplay,
SixelDisplay,
TerminalDisplay,
WebSocketDisplay,
)
from engine.effects.controller import handle_effects_command
from engine.eventbus import EventBus
from engine.events import EventType, StreamEvent
from engine.mic import MicMonitor
from engine.ntfy import NtfyPoller
from engine.scroll import stream
def _get_display(config: Config):
"""Get the appropriate display based on config."""
DisplayRegistry.initialize()
display_mode = config.display.lower()
displays = []
if display_mode in ("terminal", "both"):
displays.append(TerminalDisplay())
if display_mode in ("websocket", "both"):
ws = WebSocketDisplay(host="0.0.0.0", port=config.websocket_port)
ws.start_server()
ws.start_http_server()
displays.append(ws)
if display_mode == "sixel":
displays.append(SixelDisplay())
if not displays:
return NullDisplay()
if len(displays) == 1:
return displays[0]
return MultiDisplay(displays)
class StreamController:
"""Controls the stream lifecycle - initializes sources and runs the stream."""
_topics_warmed = False
def __init__(self, config: Config | None = None, event_bus: EventBus | None = None):
self.config = config or get_config()
self.event_bus = event_bus
self.mic: MicMonitor | None = None
self.ntfy: NtfyPoller | None = None
self.ntfy_cc: NtfyPoller | None = None
@classmethod
def warmup_topics(cls) -> None:
"""Warm up ntfy topics lazily (creates them if they don't exist)."""
if cls._topics_warmed:
return
import urllib.request
topics = [
"https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd",
"https://ntfy.sh/klubhaus_terminal_mainline_cc_resp",
"https://ntfy.sh/klubhaus_terminal_mainline",
]
for topic in topics:
try:
req = urllib.request.Request(
topic,
data=b"init",
headers={
"User-Agent": "mainline/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
urllib.request.urlopen(req, timeout=5)
except Exception:
pass
cls._topics_warmed = True
def initialize_sources(self) -> tuple[bool, bool]:
"""Initialize microphone and ntfy sources.
Returns:
(mic_ok, ntfy_ok) - success status for each source
"""
self.mic = MicMonitor(threshold_db=self.config.mic_threshold_db)
mic_ok = self.mic.start() if self.mic.available else False
self.ntfy = NtfyPoller(
self.config.ntfy_topic,
reconnect_delay=self.config.ntfy_reconnect_delay,
display_secs=self.config.message_display_secs,
)
ntfy_ok = self.ntfy.start()
self.ntfy_cc = NtfyPoller(
self.config.ntfy_cc_cmd_topic,
reconnect_delay=self.config.ntfy_reconnect_delay,
display_secs=5,
)
self.ntfy_cc.subscribe(self._handle_cc_message)
ntfy_cc_ok = self.ntfy_cc.start()
return bool(mic_ok), ntfy_ok and ntfy_cc_ok
def _handle_cc_message(self, event) -> None:
"""Handle incoming C&C message - like a serial port control interface."""
import urllib.request
cmd = event.body.strip() if hasattr(event, "body") else str(event).strip()
if not cmd.startswith("/"):
return
response = handle_effects_command(cmd)
topic_url = self.config.ntfy_cc_resp_topic.replace("/json", "")
data = response.encode("utf-8")
req = urllib.request.Request(
topic_url,
data=data,
headers={"User-Agent": "mainline/0.1", "Content-Type": "text/plain"},
method="POST",
)
try:
urllib.request.urlopen(req, timeout=5)
except Exception:
pass
def run(self, items: list) -> None:
"""Run the stream with initialized sources."""
if self.mic is None or self.ntfy is None:
self.initialize_sources()
if self.event_bus:
self.event_bus.publish(
EventType.STREAM_START,
StreamEvent(
event_type=EventType.STREAM_START,
headline_count=len(items),
),
)
display = _get_display(self.config)
stream(items, self.ntfy, self.mic, display)
if display:
display.cleanup()
if self.event_bus:
self.event_bus.publish(
EventType.STREAM_END,
StreamEvent(
event_type=EventType.STREAM_END,
headline_count=len(items),
),
)
def cleanup(self) -> None:
"""Clean up resources."""
if self.mic:
self.mic.stop()

102
engine/display/__init__.py Normal file
View File

@@ -0,0 +1,102 @@
"""
Display backend system with registry pattern.
Allows swapping output backends via the Display protocol.
Supports auto-discovery of display backends.
"""
from typing import Protocol
from engine.display.backends.multi import MultiDisplay
from engine.display.backends.null import NullDisplay
from engine.display.backends.sixel import SixelDisplay
from engine.display.backends.terminal import TerminalDisplay
from engine.display.backends.websocket import WebSocketDisplay
class Display(Protocol):
"""Protocol for display backends."""
width: int
height: int
def init(self, width: int, height: int) -> None:
"""Initialize display with dimensions."""
...
def show(self, buffer: list[str]) -> None:
"""Show buffer on display."""
...
def clear(self) -> None:
"""Clear display."""
...
def cleanup(self) -> None:
"""Shutdown display."""
...
class DisplayRegistry:
"""Registry for display backends with auto-discovery."""
_backends: dict[str, type[Display]] = {}
_initialized = False
@classmethod
def register(cls, name: str, backend_class: type[Display]) -> None:
"""Register a display backend."""
cls._backends[name.lower()] = backend_class
@classmethod
def get(cls, name: str) -> type[Display] | None:
"""Get a display backend class by name."""
return cls._backends.get(name.lower())
@classmethod
def list_backends(cls) -> list[str]:
"""List all available display backend names."""
return list(cls._backends.keys())
@classmethod
def create(cls, name: str, **kwargs) -> Display | None:
"""Create a display instance by name."""
backend_class = cls.get(name)
if backend_class:
return backend_class(**kwargs)
return None
@classmethod
def initialize(cls) -> None:
"""Initialize and register all built-in backends."""
if cls._initialized:
return
cls.register("terminal", TerminalDisplay)
cls.register("null", NullDisplay)
cls.register("websocket", WebSocketDisplay)
cls.register("sixel", SixelDisplay)
cls._initialized = True
def get_monitor():
"""Get the performance monitor."""
try:
from engine.effects.performance import get_monitor as _get_monitor
return _get_monitor()
except Exception:
return None
__all__ = [
"Display",
"DisplayRegistry",
"get_monitor",
"TerminalDisplay",
"NullDisplay",
"WebSocketDisplay",
"SixelDisplay",
"MultiDisplay",
]

View File

@@ -0,0 +1,33 @@
"""
Multi display backend - forwards to multiple displays.
"""
class MultiDisplay:
"""Display that forwards to multiple displays."""
width: int = 80
height: int = 24
def __init__(self, displays: list):
self.displays = displays
self.width = 80
self.height = 24
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
for d in self.displays:
d.init(width, height)
def show(self, buffer: list[str]) -> None:
for d in self.displays:
d.show(buffer)
def clear(self) -> None:
for d in self.displays:
d.clear()
def cleanup(self) -> None:
for d in self.displays:
d.cleanup()

View File

@@ -0,0 +1,32 @@
"""
Null/headless display backend.
"""
import time
class NullDisplay:
"""Headless/null display - discards all output."""
width: int = 80
height: int = 24
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
def show(self, buffer: list[str]) -> None:
from engine.display import get_monitor
monitor = get_monitor()
if monitor:
t0 = time.perf_counter()
chars_in = sum(len(line) for line in buffer)
elapsed_ms = (time.perf_counter() - t0) * 1000
monitor.record_effect("null_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
pass
def cleanup(self) -> None:
pass

View File

@@ -0,0 +1,269 @@
"""
Sixel graphics display backend - renders to sixel graphics in terminal.
"""
import time
def _parse_ansi(
text: str,
) -> list[tuple[str, tuple[int, int, int], tuple[int, int, int], bool]]:
"""Parse ANSI text into tokens with fg/bg colors.
Returns list of (text, fg_rgb, bg_rgb, bold).
"""
tokens = []
current_text = ""
fg = (204, 204, 204)
bg = (0, 0, 0)
bold = False
i = 0
ANSI_COLORS = {
0: (0, 0, 0),
1: (205, 49, 49),
2: (13, 188, 121),
3: (229, 229, 16),
4: (36, 114, 200),
5: (188, 63, 188),
6: (17, 168, 205),
7: (229, 229, 229),
8: (102, 102, 102),
9: (241, 76, 76),
10: (35, 209, 139),
11: (245, 245, 67),
12: (59, 142, 234),
13: (214, 112, 214),
14: (41, 184, 219),
15: (255, 255, 255),
}
while i < len(text):
char = text[i]
if char == "\x1b" and i + 1 < len(text) and text[i + 1] == "[":
if current_text:
tokens.append((current_text, fg, bg, bold))
current_text = ""
i += 2
code = ""
while i < len(text):
c = text[i]
if c.isalpha():
break
code += c
i += 1
if code:
codes = code.split(";")
for c in codes:
try:
n = int(c) if c else 0
except ValueError:
continue
if n == 0:
fg = (204, 204, 204)
bg = (0, 0, 0)
bold = False
elif n == 1:
bold = True
elif n == 22:
bold = False
elif n == 39:
fg = (204, 204, 204)
elif n == 49:
bg = (0, 0, 0)
elif 30 <= n <= 37:
fg = ANSI_COLORS.get(n - 30 + (8 if bold else 0), fg)
elif 40 <= n <= 47:
bg = ANSI_COLORS.get(n - 40, bg)
elif 90 <= n <= 97:
fg = ANSI_COLORS.get(n - 90 + 8, fg)
elif 100 <= n <= 107:
bg = ANSI_COLORS.get(n - 100 + 8, bg)
elif 1 <= n <= 256:
if n < 16:
fg = ANSI_COLORS.get(n, fg)
elif n < 232:
c = n - 16
r = (c // 36) * 51
g = ((c % 36) // 6) * 51
b = (c % 6) * 51
fg = (r, g, b)
else:
gray = (n - 232) * 10 + 8
fg = (gray, gray, gray)
else:
current_text += char
i += 1
if current_text:
tokens.append((current_text, fg, bg, bold))
return tokens if tokens else [("", fg, bg, bold)]
def _encode_sixel(image) -> str:
"""Encode a PIL Image to sixel format (pure Python)."""
img = image.convert("RGBA")
width, height = img.size
pixels = img.load()
palette = []
pixel_palette_idx = {}
def get_color_idx(r, g, b, a):
if a < 128:
return -1
key = (r // 32, g // 32, b // 32)
if key not in pixel_palette_idx:
idx = len(palette)
if idx < 256:
palette.append((r, g, b))
pixel_palette_idx[key] = idx
return pixel_palette_idx.get(key, 0)
for y in range(height):
for x in range(width):
r, g, b, a = pixels[x, y]
get_color_idx(r, g, b, a)
if not palette:
return ""
if len(palette) == 1:
palette = [palette[0], (0, 0, 0)]
sixel_data = []
sixel_data.append(
f'"{"".join(f"#{i};2;{r};{g};{b}" for i, (r, g, b) in enumerate(palette))}'
)
for x in range(width):
col_data = []
for y in range(0, height, 6):
bits = 0
color_idx = -1
for dy in range(6):
if y + dy < height:
r, g, b, a = pixels[x, y + dy]
if a >= 128:
bits |= 1 << dy
idx = get_color_idx(r, g, b, a)
if color_idx == -1:
color_idx = idx
elif color_idx != idx:
color_idx = -2
if color_idx >= 0:
col_data.append(
chr(63 + color_idx) + chr(63 + bits)
if bits
else chr(63 + color_idx) + "?"
)
elif color_idx == -2:
pass
if col_data:
sixel_data.append("".join(col_data) + "$")
else:
sixel_data.append("-" if x < width - 1 else "$")
sixel_data.append("\x1b\\")
return "\x1bPq" + "".join(sixel_data)
class SixelDisplay:
"""Sixel graphics display backend - renders to sixel graphics in terminal."""
width: int = 80
height: int = 24
def __init__(self, cell_width: int = 9, cell_height: int = 16):
self.width = 80
self.height = 24
self.cell_width = cell_width
self.cell_height = cell_height
self._initialized = False
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
self._initialized = True
def show(self, buffer: list[str]) -> None:
import sys
t0 = time.perf_counter()
img_width = self.width * self.cell_width
img_height = self.height * self.cell_height
try:
from PIL import Image, ImageDraw, ImageFont
except ImportError:
return
img = Image.new("RGBA", (img_width, img_height), (0, 0, 0, 255))
draw = ImageDraw.Draw(img)
try:
font = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf",
self.cell_height - 2,
)
except Exception:
try:
font = ImageFont.load_default()
except Exception:
font = None
for row_idx, line in enumerate(buffer[: self.height]):
if row_idx >= self.height:
break
tokens = _parse_ansi(line)
x_pos = 0
y_pos = row_idx * self.cell_height
for text, fg, bg, bold in tokens:
if not text:
continue
if bg != (0, 0, 0):
bbox = draw.textbbox((x_pos, y_pos), text, font=font)
draw.rectangle(bbox, fill=(*bg, 255))
if bold and font:
draw.text((x_pos - 1, y_pos - 1), text, fill=(*fg, 255), font=font)
draw.text((x_pos, y_pos), text, fill=(*fg, 255), font=font)
if font:
x_pos += draw.textlength(text, font=font)
sixel = _encode_sixel(img)
sys.stdout.buffer.write(sixel.encode("utf-8"))
sys.stdout.flush()
elapsed_ms = (time.perf_counter() - t0) * 1000
from engine.display import get_monitor
monitor = get_monitor()
if monitor:
chars_in = sum(len(line) for line in buffer)
monitor.record_effect("sixel_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
import sys
sys.stdout.buffer.write(b"\x1b[2J\x1b[H")
sys.stdout.flush()
def cleanup(self) -> None:
pass

View File

@@ -0,0 +1,48 @@
"""
ANSI terminal display backend.
"""
import time
class TerminalDisplay:
"""ANSI terminal display backend."""
width: int = 80
height: int = 24
def __init__(self):
self.width = 80
self.height = 24
def init(self, width: int, height: int) -> None:
from engine.terminal import CURSOR_OFF
self.width = width
self.height = height
print(CURSOR_OFF, end="", flush=True)
def show(self, buffer: list[str]) -> None:
import sys
t0 = time.perf_counter()
sys.stdout.buffer.write("".join(buffer).encode())
sys.stdout.flush()
elapsed_ms = (time.perf_counter() - t0) * 1000
from engine.display import get_monitor
monitor = get_monitor()
if monitor:
chars_in = sum(len(line) for line in buffer)
monitor.record_effect("terminal_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
from engine.terminal import CLR
print(CLR, end="", flush=True)
def cleanup(self) -> None:
from engine.terminal import CURSOR_ON
print(CURSOR_ON, end="", flush=True)

View File

@@ -0,0 +1,266 @@
"""
WebSocket display backend - broadcasts frame buffer to connected web clients.
"""
import asyncio
import json
import threading
import time
from typing import Protocol
try:
import websockets
except ImportError:
websockets = None
class Display(Protocol):
"""Protocol for display backends."""
width: int
height: int
def init(self, width: int, height: int) -> None:
"""Initialize display with dimensions."""
...
def show(self, buffer: list[str]) -> None:
"""Show buffer on display."""
...
def clear(self) -> None:
"""Clear display."""
...
def cleanup(self) -> None:
"""Shutdown display."""
...
def get_monitor():
"""Get the performance monitor."""
try:
from engine.effects.performance import get_monitor as _get_monitor
return _get_monitor()
except Exception:
return None
class WebSocketDisplay:
"""WebSocket display backend - broadcasts to HTML Canvas clients."""
width: int = 80
height: int = 24
def __init__(
self,
host: str = "0.0.0.0",
port: int = 8765,
http_port: int = 8766,
):
self.host = host
self.port = port
self.http_port = http_port
self.width = 80
self.height = 24
self._clients: set = set()
self._server_running = False
self._http_running = False
self._server_thread: threading.Thread | None = None
self._http_thread: threading.Thread | None = None
self._available = True
self._max_clients = 10
self._client_connected_callback = None
self._client_disconnected_callback = None
self._frame_delay = 0.0
try:
import websockets as _ws
self._available = _ws is not None
except ImportError:
self._available = False
def is_available(self) -> bool:
"""Check if WebSocket support is available."""
return self._available
def init(self, width: int, height: int) -> None:
"""Initialize display with dimensions and start server."""
self.width = width
self.height = height
self.start_server()
self.start_http_server()
def show(self, buffer: list[str]) -> None:
"""Broadcast buffer to all connected clients."""
t0 = time.perf_counter()
if self._clients:
frame_data = {
"type": "frame",
"width": self.width,
"height": self.height,
"lines": buffer,
}
message = json.dumps(frame_data)
disconnected = set()
for client in list(self._clients):
try:
asyncio.run(client.send(message))
except Exception:
disconnected.add(client)
for client in disconnected:
self._clients.discard(client)
if self._client_disconnected_callback:
self._client_disconnected_callback(client)
elapsed_ms = (time.perf_counter() - t0) * 1000
monitor = get_monitor()
if monitor:
chars_in = sum(len(line) for line in buffer)
monitor.record_effect("websocket_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
"""Broadcast clear command to all clients."""
if self._clients:
clear_data = {"type": "clear"}
message = json.dumps(clear_data)
for client in list(self._clients):
try:
asyncio.run(client.send(message))
except Exception:
pass
def cleanup(self) -> None:
"""Stop the servers."""
self.stop_server()
self.stop_http_server()
async def _websocket_handler(self, websocket):
"""Handle WebSocket connections."""
if len(self._clients) >= self._max_clients:
await websocket.close()
return
self._clients.add(websocket)
if self._client_connected_callback:
self._client_connected_callback(websocket)
try:
async for message in websocket:
try:
data = json.loads(message)
if data.get("type") == "resize":
self.width = data.get("width", 80)
self.height = data.get("height", 24)
except json.JSONDecodeError:
pass
except Exception:
pass
finally:
self._clients.discard(websocket)
if self._client_disconnected_callback:
self._client_disconnected_callback(websocket)
async def _run_websocket_server(self):
"""Run the WebSocket server."""
async with websockets.serve(self._websocket_handler, self.host, self.port):
while self._server_running:
await asyncio.sleep(0.1)
async def _run_http_server(self):
"""Run simple HTTP server for the client."""
import os
from http.server import HTTPServer, SimpleHTTPRequestHandler
client_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "client"
)
class Handler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=client_dir, **kwargs)
def log_message(self, format, *args):
pass
httpd = HTTPServer((self.host, self.http_port), Handler)
while self._http_running:
httpd.handle_request()
def _run_async(self, coro):
"""Run coroutine in background."""
try:
asyncio.run(coro)
except Exception as e:
print(f"WebSocket async error: {e}")
def start_server(self):
"""Start the WebSocket server in a background thread."""
if not self._available:
return
if self._server_thread is not None:
return
self._server_running = True
self._server_thread = threading.Thread(
target=self._run_async, args=(self._run_websocket_server(),), daemon=True
)
self._server_thread.start()
def stop_server(self):
"""Stop the WebSocket server."""
self._server_running = False
self._server_thread = None
def start_http_server(self):
"""Start the HTTP server in a background thread."""
if not self._available:
return
if self._http_thread is not None:
return
self._http_running = True
self._http_running = True
self._http_thread = threading.Thread(
target=self._run_async, args=(self._run_http_server(),), daemon=True
)
self._http_thread.start()
def stop_http_server(self):
"""Stop the HTTP server."""
self._http_running = False
self._http_thread = None
def client_count(self) -> int:
"""Return number of connected clients."""
return len(self._clients)
def get_ws_port(self) -> int:
"""Return WebSocket port."""
return self.port
def get_http_port(self) -> int:
"""Return HTTP port."""
return self.http_port
def set_frame_delay(self, delay: float) -> None:
"""Set delay between frames in seconds."""
self._frame_delay = delay
def get_frame_delay(self) -> float:
"""Get delay between frames."""
return self._frame_delay
def set_client_connected_callback(self, callback) -> None:
"""Set callback for client connections."""
self._client_connected_callback = callback
def set_client_disconnected_callback(self, callback) -> None:
"""Set callback for client disconnections."""
self._client_disconnected_callback = callback

View File

@@ -0,0 +1,42 @@
from engine.effects.chain import EffectChain
from engine.effects.controller import handle_effects_command, show_effects_menu
from engine.effects.legacy import (
fade_line,
firehose_line,
glitch_bar,
next_headline,
noise,
vis_trunc,
)
from engine.effects.performance import PerformanceMonitor, get_monitor, set_monitor
from engine.effects.registry import EffectRegistry, get_registry, set_registry
from engine.effects.types import EffectConfig, EffectContext, PipelineConfig
def get_effect_chain():
from engine.layers import get_effect_chain as _chain
return _chain()
__all__ = [
"EffectChain",
"EffectRegistry",
"EffectConfig",
"EffectContext",
"PipelineConfig",
"get_registry",
"set_registry",
"get_effect_chain",
"get_monitor",
"set_monitor",
"PerformanceMonitor",
"handle_effects_command",
"show_effects_menu",
"fade_line",
"firehose_line",
"glitch_bar",
"noise",
"next_headline",
"vis_trunc",
]

71
engine/effects/chain.py Normal file
View File

@@ -0,0 +1,71 @@
import time
from engine.effects.performance import PerformanceMonitor, get_monitor
from engine.effects.registry import EffectRegistry
from engine.effects.types import EffectContext
class EffectChain:
def __init__(
self, registry: EffectRegistry, monitor: PerformanceMonitor | None = None
):
self._registry = registry
self._order: list[str] = []
self._monitor = monitor
def _get_monitor(self) -> PerformanceMonitor:
if self._monitor is not None:
return self._monitor
return get_monitor()
def set_order(self, names: list[str]) -> None:
self._order = list(names)
def get_order(self) -> list[str]:
return self._order.copy()
def add_effect(self, name: str, position: int | None = None) -> bool:
if name not in self._registry.list_all():
return False
if position is None:
self._order.append(name)
else:
self._order.insert(position, name)
return True
def remove_effect(self, name: str) -> bool:
if name in self._order:
self._order.remove(name)
return True
return False
def reorder(self, new_order: list[str]) -> bool:
all_plugins = set(self._registry.list_all().keys())
if not all(name in all_plugins for name in new_order):
return False
self._order = list(new_order)
return True
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
monitor = self._get_monitor()
frame_number = ctx.frame_number
monitor.start_frame(frame_number)
frame_start = time.perf_counter()
result = list(buf)
for name in self._order:
plugin = self._registry.get(name)
if plugin and plugin.config.enabled:
chars_in = sum(len(line) for line in result)
effect_start = time.perf_counter()
try:
result = plugin.process(result, ctx)
except Exception:
plugin.config.enabled = False
elapsed = time.perf_counter() - effect_start
chars_out = sum(len(line) for line in result)
monitor.record_effect(name, elapsed * 1000, chars_in, chars_out)
total_elapsed = time.perf_counter() - frame_start
monitor.end_frame(frame_number, total_elapsed * 1000)
return result

View File

@@ -0,0 +1,144 @@
from engine.effects.performance import get_monitor
from engine.effects.registry import get_registry
_effect_chain_ref = None
def _get_effect_chain():
global _effect_chain_ref
if _effect_chain_ref is not None:
return _effect_chain_ref
try:
from engine.layers import get_effect_chain as _chain
return _chain()
except Exception:
return None
def set_effect_chain_ref(chain) -> None:
global _effect_chain_ref
_effect_chain_ref = chain
def handle_effects_command(cmd: str) -> str:
"""Handle /effects command from NTFY message.
Commands:
/effects list - list all effects and their status
/effects <name> on - enable an effect
/effects <name> off - disable an effect
/effects <name> intensity <0.0-1.0> - set intensity
/effects reorder <name1>,<name2>,... - reorder pipeline
/effects stats - show performance statistics
"""
parts = cmd.strip().split()
if not parts or parts[0] != "/effects":
return "Unknown command"
registry = get_registry()
chain = _get_effect_chain()
if len(parts) == 1 or parts[1] == "list":
result = ["Effects:"]
for name, plugin in registry.list_all().items():
status = "ON" if plugin.config.enabled else "OFF"
intensity = plugin.config.intensity
result.append(f" {name}: {status} (intensity={intensity})")
if chain:
result.append(f"Order: {chain.get_order()}")
return "\n".join(result)
if parts[1] == "stats":
return _format_stats()
if parts[1] == "reorder" and len(parts) >= 3:
new_order = parts[2].split(",")
if chain and chain.reorder(new_order):
return f"Reordered pipeline: {new_order}"
return "Failed to reorder pipeline"
if len(parts) < 3:
return "Usage: /effects <name> on|off|intensity <value>"
effect_name = parts[1]
action = parts[2]
if effect_name not in registry.list_all():
return f"Unknown effect: {effect_name}"
if action == "on":
registry.enable(effect_name)
return f"Enabled: {effect_name}"
if action == "off":
registry.disable(effect_name)
return f"Disabled: {effect_name}"
if action == "intensity" and len(parts) >= 4:
try:
value = float(parts[3])
if not 0.0 <= value <= 1.0:
return "Intensity must be between 0.0 and 1.0"
plugin = registry.get(effect_name)
if plugin:
plugin.config.intensity = value
return f"Set {effect_name} intensity to {value}"
except ValueError:
return "Invalid intensity value"
return f"Unknown action: {action}"
def _format_stats() -> str:
monitor = get_monitor()
stats = monitor.get_stats()
if "error" in stats:
return stats["error"]
lines = ["Performance Stats:"]
pipeline = stats["pipeline"]
lines.append(
f" Pipeline: avg={pipeline['avg_ms']:.2f}ms min={pipeline['min_ms']:.2f}ms max={pipeline['max_ms']:.2f}ms (over {stats['frame_count']} frames)"
)
if stats["effects"]:
lines.append(" Per-effect (avg ms):")
for name, effect_stats in stats["effects"].items():
lines.append(
f" {name}: avg={effect_stats['avg_ms']:.2f}ms min={effect_stats['min_ms']:.2f}ms max={effect_stats['max_ms']:.2f}ms"
)
return "\n".join(lines)
def show_effects_menu() -> str:
"""Generate effects menu text for display."""
registry = get_registry()
chain = _get_effect_chain()
lines = [
"\033[1;38;5;231m=== EFFECTS MENU ===\033[0m",
"",
"Effects:",
]
for name, plugin in registry.list_all().items():
status = "ON" if plugin.config.enabled else "OFF"
intensity = plugin.config.intensity
lines.append(f" [{status:3}] {name}: intensity={intensity:.2f}")
if chain:
lines.append("")
lines.append(f"Pipeline order: {' -> '.join(chain.get_order())}")
lines.append("")
lines.append("Controls:")
lines.append(" /effects <name> on|off")
lines.append(" /effects <name> intensity <0.0-1.0>")
lines.append(" /effects reorder name1,name2,...")
lines.append("")
return "\n".join(lines)

View File

@@ -7,8 +7,8 @@ import random
from datetime import datetime
from engine import config
from engine.terminal import RST, DIM, G_LO, G_DIM, W_GHOST, C_DIM
from engine.sources import FEEDS, POETRY_SOURCES
from engine.terminal import C_DIM, DIM, G_DIM, G_LO, RST, W_GHOST
def noise(w):
@@ -34,23 +34,23 @@ def fade_line(s, fade):
if fade >= 1.0:
return s
if fade <= 0.0:
return ''
return ""
result = []
i = 0
while i < len(s):
if s[i] == '\033' and i + 1 < len(s) and s[i + 1] == '[':
if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[":
j = i + 2
while j < len(s) and not s[j].isalpha():
j += 1
result.append(s[i:j + 1])
result.append(s[i : j + 1])
i = j + 1
elif s[i] == ' ':
result.append(' ')
elif s[i] == " ":
result.append(" ")
i += 1
else:
result.append(s[i] if random.random() < fade else ' ')
result.append(s[i] if random.random() < fade else " ")
i += 1
return ''.join(result)
return "".join(result)
def vis_trunc(s, w):
@@ -61,17 +61,17 @@ def vis_trunc(s, w):
while i < len(s):
if vw >= w:
break
if s[i] == '\033' and i + 1 < len(s) and s[i + 1] == '[':
if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[":
j = i + 2
while j < len(s) and not s[j].isalpha():
j += 1
result.append(s[i:j + 1])
result.append(s[i : j + 1])
i = j + 1
else:
result.append(s[i])
vw += 1
i += 1
return ''.join(result)
return "".join(result)
def next_headline(pool, items, seen):
@@ -94,7 +94,7 @@ def firehose_line(items, w):
if r < 0.35:
# Raw headline text
title, src, ts = random.choice(items)
text = title[:w - 1]
text = title[: w - 1]
color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM])
return f"{color}{text}{RST}"
elif r < 0.55:
@@ -103,12 +103,13 @@ def firehose_line(items, w):
return "".join(
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
f"{random.choice(config.GLITCH + config.KATA)}{RST}"
if random.random() < d else " "
if random.random() < d
else " "
for _ in range(w)
)
elif r < 0.78:
# Status / program output
sources = FEEDS if config.MODE == 'news' else POETRY_SOURCES
sources = FEEDS if config.MODE == "news" else POETRY_SOURCES
src = random.choice(list(sources.keys()))
msgs = [
f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}",
@@ -118,16 +119,16 @@ def firehose_line(items, w):
f" {''.join(random.choice(config.KATA) for _ in range(3))} STRM "
f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}",
]
text = random.choice(msgs)[:w - 1]
text = random.choice(msgs)[: w - 1]
color = random.choice([G_LO, G_DIM, W_GHOST])
return f"{color}{text}{RST}"
else:
# Headline fragment with glitch prefix
title, _, _ = random.choice(items)
start = random.randint(0, max(0, len(title) - 20))
frag = title[start:start + random.randint(10, 35)]
frag = title[start : start + random.randint(10, 35)]
pad = random.randint(0, max(0, w - len(frag) - 8))
gp = ''.join(random.choice(config.GLITCH) for _ in range(random.randint(1, 3)))
text = (' ' * pad + gp + ' ' + frag)[:w - 1]
gp = "".join(random.choice(config.GLITCH) for _ in range(random.randint(1, 3)))
text = (" " * pad + gp + " " + frag)[: w - 1]
color = random.choice([G_LO, C_DIM, W_GHOST])
return f"{color}{text}{RST}"

View File

@@ -0,0 +1,103 @@
from collections import deque
from dataclasses import dataclass
@dataclass
class EffectTiming:
name: str
duration_ms: float
buffer_chars_in: int
buffer_chars_out: int
@dataclass
class FrameTiming:
frame_number: int
total_ms: float
effects: list[EffectTiming]
class PerformanceMonitor:
"""Collects and stores performance metrics for effect pipeline."""
def __init__(self, max_frames: int = 60):
self._max_frames = max_frames
self._frames: deque[FrameTiming] = deque(maxlen=max_frames)
self._current_frame: list[EffectTiming] = []
def start_frame(self, frame_number: int) -> None:
self._current_frame = []
def record_effect(
self, name: str, duration_ms: float, chars_in: int, chars_out: int
) -> None:
self._current_frame.append(
EffectTiming(
name=name,
duration_ms=duration_ms,
buffer_chars_in=chars_in,
buffer_chars_out=chars_out,
)
)
def end_frame(self, frame_number: int, total_ms: float) -> None:
self._frames.append(
FrameTiming(
frame_number=frame_number,
total_ms=total_ms,
effects=self._current_frame,
)
)
def get_stats(self) -> dict:
if not self._frames:
return {"error": "No timing data available"}
total_times = [f.total_ms for f in self._frames]
avg_total = sum(total_times) / len(total_times)
min_total = min(total_times)
max_total = max(total_times)
effect_stats: dict[str, dict] = {}
for frame in self._frames:
for effect in frame.effects:
if effect.name not in effect_stats:
effect_stats[effect.name] = {"times": [], "total_chars": 0}
effect_stats[effect.name]["times"].append(effect.duration_ms)
effect_stats[effect.name]["total_chars"] += effect.buffer_chars_out
for name, stats in effect_stats.items():
times = stats["times"]
stats["avg_ms"] = sum(times) / len(times)
stats["min_ms"] = min(times)
stats["max_ms"] = max(times)
del stats["times"]
return {
"frame_count": len(self._frames),
"pipeline": {
"avg_ms": avg_total,
"min_ms": min_total,
"max_ms": max_total,
},
"effects": effect_stats,
}
def reset(self) -> None:
self._frames.clear()
self._current_frame = []
_monitor: PerformanceMonitor | None = None
def get_monitor() -> PerformanceMonitor:
global _monitor
if _monitor is None:
_monitor = PerformanceMonitor()
return _monitor
def set_monitor(monitor: PerformanceMonitor) -> None:
global _monitor
_monitor = monitor

View File

@@ -0,0 +1,59 @@
from engine.effects.types import EffectConfig, EffectPlugin
class EffectRegistry:
def __init__(self):
self._plugins: dict[str, EffectPlugin] = {}
self._discovered: bool = False
def register(self, plugin: EffectPlugin) -> None:
self._plugins[plugin.name] = plugin
def get(self, name: str) -> EffectPlugin | None:
return self._plugins.get(name)
def list_all(self) -> dict[str, EffectPlugin]:
return self._plugins.copy()
def list_enabled(self) -> list[EffectPlugin]:
return [p for p in self._plugins.values() if p.config.enabled]
def enable(self, name: str) -> bool:
plugin = self._plugins.get(name)
if plugin:
plugin.config.enabled = True
return True
return False
def disable(self, name: str) -> bool:
plugin = self._plugins.get(name)
if plugin:
plugin.config.enabled = False
return True
return False
def configure(self, name: str, config: EffectConfig) -> bool:
plugin = self._plugins.get(name)
if plugin:
plugin.configure(config)
return True
return False
def is_enabled(self, name: str) -> bool:
plugin = self._plugins.get(name)
return plugin.config.enabled if plugin else False
_registry: EffectRegistry | None = None
def get_registry() -> EffectRegistry:
global _registry
if _registry is None:
_registry = EffectRegistry()
return _registry
def set_registry(registry: EffectRegistry) -> None:
global _registry
_registry = registry

39
engine/effects/types.py Normal file
View File

@@ -0,0 +1,39 @@
from dataclasses import dataclass, field
from typing import Any
@dataclass
class EffectContext:
terminal_width: int
terminal_height: int
scroll_cam: int
ticker_height: int
mic_excess: float
grad_offset: float
frame_number: int
has_message: bool
items: list = field(default_factory=list)
@dataclass
class EffectConfig:
enabled: bool = True
intensity: float = 1.0
params: dict[str, Any] = field(default_factory=dict)
class EffectPlugin:
name: str
config: EffectConfig
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
raise NotImplementedError
def configure(self, config: EffectConfig) -> None:
raise NotImplementedError
@dataclass
class PipelineConfig:
order: list[str] = field(default_factory=list)
effects: dict[str, EffectConfig] = field(default_factory=dict)

25
engine/emitters.py Normal file
View File

@@ -0,0 +1,25 @@
"""
Event emitter protocols - abstract interfaces for event-producing components.
"""
from collections.abc import Callable
from typing import Any, Protocol
class EventEmitter(Protocol):
"""Protocol for components that emit events."""
def subscribe(self, callback: Callable[[Any], None]) -> None: ...
def unsubscribe(self, callback: Callable[[Any], None]) -> None: ...
class Startable(Protocol):
"""Protocol for components that can be started."""
def start(self) -> Any: ...
class Stoppable(Protocol):
"""Protocol for components that can be stopped."""
def stop(self) -> None: ...

72
engine/eventbus.py Normal file
View File

@@ -0,0 +1,72 @@
"""
Event bus - pub/sub messaging for decoupled component communication.
"""
import threading
from collections import defaultdict
from collections.abc import Callable
from typing import Any
from engine.events import EventType
class EventBus:
"""Thread-safe event bus for publish-subscribe messaging."""
def __init__(self):
self._subscribers: dict[EventType, list[Callable[[Any], None]]] = defaultdict(
list
)
self._lock = threading.Lock()
def subscribe(self, event_type: EventType, callback: Callable[[Any], None]) -> None:
"""Register a callback for a specific event type."""
with self._lock:
self._subscribers[event_type].append(callback)
def unsubscribe(
self, event_type: EventType, callback: Callable[[Any], None]
) -> None:
"""Remove a callback for a specific event type."""
with self._lock:
if callback in self._subscribers[event_type]:
self._subscribers[event_type].remove(callback)
def publish(self, event_type: EventType, event: Any = None) -> None:
"""Publish an event to all subscribers."""
with self._lock:
callbacks = list(self._subscribers.get(event_type, []))
for callback in callbacks:
try:
callback(event)
except Exception:
pass
def clear(self) -> None:
"""Remove all subscribers."""
with self._lock:
self._subscribers.clear()
def subscriber_count(self, event_type: EventType | None = None) -> int:
"""Get subscriber count for an event type, or total if None."""
with self._lock:
if event_type is None:
return sum(len(cb) for cb in self._subscribers.values())
return len(self._subscribers.get(event_type, []))
_event_bus: EventBus | None = None
def get_event_bus() -> EventBus:
"""Get the global event bus instance."""
global _event_bus
if _event_bus is None:
_event_bus = EventBus()
return _event_bus
def set_event_bus(bus: EventBus) -> None:
"""Set the global event bus instance (for testing)."""
global _event_bus
_event_bus = bus

67
engine/events.py Normal file
View File

@@ -0,0 +1,67 @@
"""
Event types for the mainline application.
Defines the core events that flow through the system.
These types support a future migration to an event-driven architecture.
"""
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, auto
class EventType(Enum):
"""Core event types in the mainline application."""
NEW_HEADLINE = auto()
FRAME_TICK = auto()
MIC_LEVEL = auto()
NTFY_MESSAGE = auto()
STREAM_START = auto()
STREAM_END = auto()
@dataclass
class HeadlineEvent:
"""Event emitted when a new headline is ready for display."""
title: str
source: str
timestamp: str
language: str | None = None
@dataclass
class FrameTickEvent:
"""Event emitted on each render frame."""
frame_number: int
timestamp: datetime
delta_seconds: float
@dataclass
class MicLevelEvent:
"""Event emitted when microphone level changes significantly."""
db_level: float
excess_above_threshold: float
timestamp: datetime
@dataclass
class NtfyMessageEvent:
"""Event emitted when an ntfy message is received."""
title: str
body: str
message_id: str | None = None
timestamp: datetime | None = None
@dataclass
class StreamEvent:
"""Event emitted when stream starts or ends."""
event_type: EventType
headline_count: int = 0
timestamp: datetime | None = None

View File

@@ -3,21 +3,27 @@ RSS feed fetching, Project Gutenberg parsing, and headline caching.
Depends on: config, sources, filter, terminal.
"""
import re
import json
import pathlib
import re
import urllib.request
from datetime import datetime
from typing import Any
import feedparser
from engine import config
from engine.filter import skip, strip_tags
from engine.sources import FEEDS, POETRY_SOURCES
from engine.filter import strip_tags, skip
from engine.terminal import boot_ln
# Type alias for headline items
HeadlineTuple = tuple[str, str, str]
# ─── SINGLE FEED ──────────────────────────────────────────
def fetch_feed(url):
def fetch_feed(url: str) -> Any | None:
"""Fetch and parse a single RSS feed URL."""
try:
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=config.FEED_TIMEOUT)
@@ -27,8 +33,9 @@ def fetch_feed(url):
# ─── ALL RSS FEEDS ────────────────────────────────────────
def fetch_all():
items = []
def fetch_all() -> tuple[list[HeadlineTuple], int, int]:
"""Fetch all RSS feeds and return items, linked count, failed count."""
items: list[HeadlineTuple] = []
linked = failed = 0
for src, url in FEEDS.items():
feed = fetch_feed(url)
@@ -58,31 +65,36 @@ def fetch_all():
# ─── PROJECT GUTENBERG ────────────────────────────────────
def _fetch_gutenberg(url, label):
def _fetch_gutenberg(url: str, label: str) -> list[HeadlineTuple]:
"""Download and parse stanzas/passages from a Project Gutenberg text."""
try:
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=15)
text = resp.read().decode('utf-8', errors='replace').replace('\r\n', '\n').replace('\r', '\n')
text = (
resp.read()
.decode("utf-8", errors="replace")
.replace("\r\n", "\n")
.replace("\r", "\n")
)
# Strip PG boilerplate
m = re.search(r'\*\*\*\s*START OF[^\n]*\n', text)
m = re.search(r"\*\*\*\s*START OF[^\n]*\n", text)
if m:
text = text[m.end():]
m = re.search(r'\*\*\*\s*END OF', text)
text = text[m.end() :]
m = re.search(r"\*\*\*\s*END OF", text)
if m:
text = text[:m.start()]
text = text[: m.start()]
# Split on blank lines into stanzas/passages
blocks = re.split(r'\n{2,}', text.strip())
blocks = re.split(r"\n{2,}", text.strip())
items = []
for blk in blocks:
blk = ' '.join(blk.split()) # flatten to one line
blk = " ".join(blk.split()) # flatten to one line
if len(blk) < 20 or len(blk) > 280:
continue
if blk.isupper(): # skip all-caps headers
if blk.isupper(): # skip all-caps headers
continue
if re.match(r'^[IVXLCDM]+\.?\s*$', blk): # roman numerals
if re.match(r"^[IVXLCDM]+\.?\s*$", blk): # roman numerals
continue
items.append((blk, label, ''))
items.append((blk, label, ""))
return items
except Exception:
return []

View File

@@ -29,29 +29,29 @@ def strip_tags(html):
# ─── CONTENT FILTER ───────────────────────────────────────
_SKIP_RE = re.compile(
r'\b(?:'
r"\b(?:"
# ── sports ──
r'football|soccer|basketball|baseball|softball|tennis|golf|cricket|rugby|'
r'hockey|lacrosse|volleyball|badminton|'
r'nba|nfl|nhl|mlb|mls|fifa|uefa|'
r'premier league|champions league|la liga|serie a|bundesliga|'
r'world cup|super bowl|world series|stanley cup|'
r'playoff|playoffs|touchdown|goalkeeper|striker|quarterback|'
r'slam dunk|home run|grand slam|offside|halftime|'
r'batting|wicket|innings|'
r'formula 1|nascar|motogp|'
r'boxing|ufc|mma|'
r'marathon|tour de france|'
r'transfer window|draft pick|relegation|'
r"football|soccer|basketball|baseball|softball|tennis|golf|cricket|rugby|"
r"hockey|lacrosse|volleyball|badminton|"
r"nba|nfl|nhl|mlb|mls|fifa|uefa|"
r"premier league|champions league|la liga|serie a|bundesliga|"
r"world cup|super bowl|world series|stanley cup|"
r"playoff|playoffs|touchdown|goalkeeper|striker|quarterback|"
r"slam dunk|home run|grand slam|offside|halftime|"
r"batting|wicket|innings|"
r"formula 1|nascar|motogp|"
r"boxing|ufc|mma|"
r"marathon|tour de france|"
r"transfer window|draft pick|relegation|"
# ── vapid / insipid ──
r'kardashian|jenner|reality tv|reality show|'
r'influencer|viral video|tiktok|instagram|'
r'best dressed|worst dressed|red carpet|'
r'horoscope|zodiac|gossip|bikini|selfie|'
r'you won.t believe|what happened next|'
r'celebrity couple|celebrity feud|baby bump'
r')\b',
re.IGNORECASE
r"kardashian|jenner|reality tv|reality show|"
r"influencer|viral video|tiktok|instagram|"
r"best dressed|worst dressed|red carpet|"
r"horoscope|zodiac|gossip|bikini|selfie|"
r"you won.t believe|what happened next|"
r"celebrity couple|celebrity feud|baby bump"
r")\b",
re.IGNORECASE,
)

57
engine/frame.py Normal file
View File

@@ -0,0 +1,57 @@
"""
Frame timing utilities — FPS control and precise timing.
"""
import time
class FrameTimer:
"""Frame timer for consistent render loop timing."""
def __init__(self, target_frame_dt: float = 0.05):
self.target_frame_dt = target_frame_dt
self._frame_count = 0
self._start_time = time.monotonic()
self._last_frame_time = self._start_time
@property
def fps(self) -> float:
"""Current FPS based on elapsed frames."""
elapsed = time.monotonic() - self._start_time
if elapsed > 0:
return self._frame_count / elapsed
return 0.0
def sleep_until_next_frame(self) -> float:
"""Sleep to maintain target frame rate. Returns actual elapsed time."""
now = time.monotonic()
elapsed = now - self._last_frame_time
self._last_frame_time = now
self._frame_count += 1
sleep_time = max(0, self.target_frame_dt - elapsed)
if sleep_time > 0:
time.sleep(sleep_time)
return elapsed
def reset(self) -> None:
"""Reset frame counter and start time."""
self._frame_count = 0
self._start_time = time.monotonic()
self._last_frame_time = self._start_time
def calculate_scroll_step(
scroll_dur: float, view_height: int, padding: int = 15
) -> float:
"""Calculate scroll step interval for smooth scrolling.
Args:
scroll_dur: Duration in seconds for one headline to scroll through view
view_height: Terminal height in rows
padding: Extra rows for off-screen content
Returns:
Time in seconds between scroll steps
"""
return scroll_dur / (view_height + padding) * 2

260
engine/layers.py Normal file
View File

@@ -0,0 +1,260 @@
"""
Layer compositing — message overlay, ticker zone, firehose, noise.
Depends on: config, render, effects.
"""
import random
import re
import time
from datetime import datetime
from engine import config
from engine.effects import (
EffectChain,
EffectContext,
fade_line,
firehose_line,
glitch_bar,
noise,
vis_trunc,
)
from engine.render import big_wrap, lr_gradient, lr_gradient_opposite
from engine.terminal import RST, W_COOL
MSG_META = "\033[38;5;245m"
MSG_BORDER = "\033[2;38;5;37m"
def render_message_overlay(
msg: tuple[str, str, float] | None,
w: int,
h: int,
msg_cache: tuple,
) -> tuple[list[str], tuple]:
"""Render ntfy message overlay.
Args:
msg: (title, body, timestamp) or None
w: terminal width
h: terminal height
msg_cache: (cache_key, rendered_rows) for caching
Returns:
(list of ANSI strings, updated cache)
"""
overlay = []
if msg is None:
return overlay, msg_cache
m_title, m_body, m_ts = msg
display_text = m_body or m_title or "(empty)"
display_text = re.sub(r"\s+", " ", display_text.upper())
cache_key = (display_text, w)
if msg_cache[0] != cache_key:
msg_rows = big_wrap(display_text, w - 4)
msg_cache = (cache_key, msg_rows)
else:
msg_rows = msg_cache[1]
msg_rows = lr_gradient_opposite(
msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0
)
elapsed_s = int(time.monotonic() - m_ts)
remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s)
ts_str = datetime.now().strftime("%H:%M:%S")
panel_h = len(msg_rows) + 2
panel_top = max(0, (h - panel_h) // 2)
row_idx = 0
for mr in msg_rows:
ln = vis_trunc(mr, w)
overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K")
row_idx += 1
meta_parts = []
if m_title and m_title != m_body:
meta_parts.append(m_title)
meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
meta = (
" " + " \u00b7 ".join(meta_parts)
if len(meta_parts) > 1
else " " + meta_parts[0]
)
overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}\033[0m\033[K")
row_idx += 1
bar = "\u2500" * (w - 4)
overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}\033[0m\033[K")
return overlay, msg_cache
def render_ticker_zone(
active: list,
scroll_cam: int,
ticker_h: int,
w: int,
noise_cache: dict,
grad_offset: float,
) -> tuple[list[str], dict]:
"""Render the ticker scroll zone.
Args:
active: list of (content_rows, color, canvas_y, meta_idx)
scroll_cam: camera position (viewport top)
ticker_h: height of ticker zone
w: terminal width
noise_cache: dict of cy -> noise string
grad_offset: gradient animation offset
Returns:
(list of ANSI strings, updated noise_cache)
"""
buf = []
top_zone = max(1, int(ticker_h * 0.25))
bot_zone = max(1, int(ticker_h * 0.10))
def noise_at(cy):
if cy not in noise_cache:
noise_cache[cy] = noise(w) if random.random() < 0.15 else None
return noise_cache[cy]
for r in range(ticker_h):
scr_row = r + 1
cy = scroll_cam + r
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0
row_fade = min(top_f, bot_f)
drawn = False
for content, hc, by, midx in active:
cr = cy - by
if 0 <= cr < len(content):
raw = content[cr]
if cr != midx:
colored = lr_gradient([raw], grad_offset)[0]
else:
colored = raw
ln = vis_trunc(colored, w)
if row_fade < 1.0:
ln = fade_line(ln, row_fade)
if cr == midx:
buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K")
elif ln.strip():
buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K")
else:
buf.append(f"\033[{scr_row};1H\033[K")
drawn = True
break
if not drawn:
n = noise_at(cy)
if row_fade < 1.0 and n:
n = fade_line(n, row_fade)
if n:
buf.append(f"\033[{scr_row};1H{n}")
else:
buf.append(f"\033[{scr_row};1H\033[K")
return buf, noise_cache
def apply_glitch(
buf: list[str],
ticker_buf_start: int,
mic_excess: float,
w: int,
) -> list[str]:
"""Apply glitch effect to ticker buffer.
Args:
buf: current buffer
ticker_buf_start: index where ticker starts in buffer
mic_excess: mic level above threshold
w: terminal width
Returns:
Updated buffer with glitches applied
"""
glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
n_hits = 4 + int(mic_excess / 2)
ticker_buf_len = len(buf) - ticker_buf_start
if random.random() < glitch_prob and ticker_buf_len > 0:
for _ in range(min(n_hits, ticker_buf_len)):
gi = random.randint(0, ticker_buf_len - 1)
scr_row = gi + 1
buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}"
return buf
def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]:
"""Render firehose strip at bottom of screen."""
buf = []
if fh > 0:
for fr in range(fh):
scr_row = h - fh + fr + 1
fline = firehose_line(items, w)
buf.append(f"\033[{scr_row};1H{fline}\033[K")
return buf
_effect_chain = None
def init_effects() -> None:
"""Initialize effect plugins and chain."""
global _effect_chain
from engine.effects import EffectChain, get_registry
registry = get_registry()
import effects_plugins
effects_plugins.discover_plugins()
chain = EffectChain(registry)
chain.set_order(["noise", "fade", "glitch", "firehose"])
_effect_chain = chain
def process_effects(
buf: list[str],
w: int,
h: int,
scroll_cam: int,
ticker_h: int,
mic_excess: float,
grad_offset: float,
frame_number: int,
has_message: bool,
items: list,
) -> list[str]:
"""Process buffer through effect chain."""
if _effect_chain is None:
init_effects()
ctx = EffectContext(
terminal_width=w,
terminal_height=h,
scroll_cam=scroll_cam,
ticker_height=ticker_h,
mic_excess=mic_excess,
grad_offset=grad_offset,
frame_number=frame_number,
has_message=has_message,
items=items,
)
return _effect_chain.process(buf, ctx)
def get_effect_chain() -> EffectChain | None:
"""Get the effect chain instance."""
global _effect_chain
if _effect_chain is None:
init_effects()
return _effect_chain

View File

@@ -4,15 +4,21 @@ Gracefully degrades if sounddevice/numpy are unavailable.
"""
import atexit
from collections.abc import Callable
from datetime import datetime
try:
import sounddevice as _sd
import numpy as _np
import sounddevice as _sd
_HAS_MIC = True
except Exception:
_HAS_MIC = False
from engine.events import MicLevelEvent
class MicMonitor:
"""Background mic stream that exposes current RMS dB level."""
@@ -20,6 +26,7 @@ class MicMonitor:
self.threshold_db = threshold_db
self._db = -99.0
self._stream = None
self._subscribers: list[Callable[[MicLevelEvent], None]] = []
@property
def available(self):
@@ -36,16 +43,43 @@ class MicMonitor:
"""dB above threshold (clamped to 0)."""
return max(0.0, self._db - self.threshold_db)
def subscribe(self, callback: Callable[[MicLevelEvent], None]) -> None:
"""Register a callback to be called when mic level changes."""
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable[[MicLevelEvent], None]) -> None:
"""Remove a registered callback."""
if callback in self._subscribers:
self._subscribers.remove(callback)
def _emit(self, event: MicLevelEvent) -> None:
"""Emit an event to all subscribers."""
for cb in self._subscribers:
try:
cb(event)
except Exception:
pass
def start(self):
"""Start background mic stream. Returns True on success, False/None otherwise."""
if not _HAS_MIC:
return None
def _cb(indata, frames, t, status):
rms = float(_np.sqrt(_np.mean(indata ** 2)))
rms = float(_np.sqrt(_np.mean(indata**2)))
self._db = 20 * _np.log10(rms) if rms > 0 else -99.0
if self._subscribers:
event = MicLevelEvent(
db_level=self._db,
excess_above_threshold=max(0.0, self._db - self.threshold_db),
timestamp=datetime.now(),
)
self._emit(event)
try:
self._stream = _sd.InputStream(
callback=_cb, channels=1, samplerate=44100, blocksize=2048)
callback=_cb, channels=1, samplerate=44100, blocksize=2048
)
self._stream.start()
atexit.register(self.stop)
return True

View File

@@ -1,9 +1,9 @@
"""
ntfy.sh message poller — standalone, zero internal dependencies.
ntfy.sh SSE stream listener — standalone, zero internal dependencies.
Reusable by any visualizer:
from engine.ntfy import NtfyPoller
poller = NtfyPoller("https://ntfy.sh/my_topic/json?since=20s&poll=1")
poller = NtfyPoller("https://ntfy.sh/my_topic/json")
poller.start()
# in render loop:
msg = poller.get_active_message()
@@ -13,24 +13,47 @@ Reusable by any visualizer:
"""
import json
import time
import threading
import time
import urllib.request
from collections.abc import Callable
from datetime import datetime
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
from engine.events import NtfyMessageEvent
class NtfyPoller:
"""Background poller for ntfy.sh topics."""
"""SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT)."""
def __init__(self, topic_url, poll_interval=15, display_secs=30):
def __init__(self, topic_url, reconnect_delay=5, display_secs=30):
self.topic_url = topic_url
self.poll_interval = poll_interval
self.reconnect_delay = reconnect_delay
self.display_secs = display_secs
self._message = None # (title, body, monotonic_timestamp) or None
self._message = None # (title, body, monotonic_timestamp) or None
self._lock = threading.Lock()
self._subscribers: list[Callable[[NtfyMessageEvent], None]] = []
def subscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
"""Register a callback to be called when a message is received."""
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
"""Remove a registered callback."""
if callback in self._subscribers:
self._subscribers.remove(callback)
def _emit(self, event: NtfyMessageEvent) -> None:
"""Emit an event to all subscribers."""
for cb in self._subscribers:
try:
cb(event)
except Exception:
pass
def start(self):
"""Start background polling thread. Returns True."""
t = threading.Thread(target=self._poll_loop, daemon=True)
"""Start background stream thread. Returns True."""
t = threading.Thread(target=self._stream_loop, daemon=True)
t.start()
return True
@@ -50,19 +73,36 @@ class NtfyPoller:
with self._lock:
self._message = None
def _poll_loop(self):
def _build_url(self, last_id=None):
"""Build the stream URL, substituting since= to avoid message replays on reconnect."""
parsed = urlparse(self.topic_url)
params = parse_qs(parsed.query, keep_blank_values=True)
params["since"] = [last_id if last_id else "20s"]
new_query = urlencode({k: v[0] for k, v in params.items()})
return urlunparse(parsed._replace(query=new_query))
def _stream_loop(self):
last_id = None
while True:
try:
url = self._build_url(last_id)
req = urllib.request.Request(
self.topic_url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=10)
for line in resp.read().decode('utf-8', errors='replace').strip().split('\n'):
if not line.strip():
continue
url, headers={"User-Agent": "mainline/0.1"}
)
# timeout=90 keeps the socket alive through ntfy.sh keepalive heartbeats
resp = urllib.request.urlopen(req, timeout=90)
while True:
line = resp.readline()
if not line:
break # server closed connection — reconnect
try:
data = json.loads(line)
data = json.loads(line.decode("utf-8", errors="replace"))
except json.JSONDecodeError:
continue
# Advance cursor on every event (message + keepalive) to
# avoid replaying already-seen events after a reconnect.
if "id" in data:
last_id = data["id"]
if data.get("event") == "message":
with self._lock:
self._message = (
@@ -70,6 +110,13 @@ class NtfyPoller:
data.get("message", ""),
time.monotonic(),
)
event = NtfyMessageEvent(
title=data.get("title", ""),
body=data.get("message", ""),
message_id=data.get("id"),
timestamp=datetime.now(),
)
self._emit(event)
except Exception:
pass
time.sleep(self.poll_interval)
time.sleep(self.reconnect_delay)

View File

@@ -4,15 +4,15 @@ Font loading, text rasterization, word-wrap, gradient coloring, headline block a
Depends on: config, terminal, sources, translate.
"""
import re
import random
import re
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont
from engine import config
from engine.sources import NO_UPPER, SCRIPT_FONTS, SOURCE_LANGS
from engine.terminal import RST
from engine.sources import SCRIPT_FONTS, SOURCE_LANGS, NO_UPPER
from engine.translate import detect_location_language, translate_headline
# ─── GRADIENT ─────────────────────────────────────────────
@@ -20,15 +20,15 @@ from engine.translate import detect_location_language, translate_headline
GRAD_COLS = [
"\033[1;38;5;231m", # white
"\033[1;38;5;195m", # pale cyan-white
"\033[38;5;123m", # bright cyan
"\033[38;5;118m", # bright lime
"\033[38;5;82m", # lime
"\033[38;5;46m", # bright green
"\033[38;5;40m", # green
"\033[38;5;34m", # medium green
"\033[38;5;28m", # dark green
"\033[38;5;22m", # deep green
"\033[2;38;5;22m", # dim deep green
"\033[38;5;123m", # bright cyan
"\033[38;5;118m", # bright lime
"\033[38;5;82m", # lime
"\033[38;5;46m", # bright green
"\033[38;5;40m", # green
"\033[38;5;34m", # medium green
"\033[38;5;28m", # dark green
"\033[38;5;22m", # deep green
"\033[2;38;5;22m", # dim deep green
"\033[2;38;5;235m", # near black
]
@@ -36,15 +36,15 @@ GRAD_COLS = [
MSG_GRAD_COLS = [
"\033[1;38;5;231m", # white
"\033[1;38;5;225m", # pale pink-white
"\033[38;5;219m", # bright pink
"\033[38;5;213m", # hot pink
"\033[38;5;207m", # magenta
"\033[38;5;201m", # bright magenta
"\033[38;5;165m", # orchid-red
"\033[38;5;161m", # ruby-magenta
"\033[38;5;125m", # dark magenta
"\033[38;5;89m", # deep maroon-magenta
"\033[2;38;5;89m", # dim deep maroon-magenta
"\033[38;5;219m", # bright pink
"\033[38;5;213m", # hot pink
"\033[38;5;207m", # magenta
"\033[38;5;201m", # bright magenta
"\033[38;5;165m", # orchid-red
"\033[38;5;161m", # ruby-magenta
"\033[38;5;125m", # dark magenta
"\033[38;5;89m", # deep maroon-magenta
"\033[2;38;5;89m", # dim deep maroon-magenta
"\033[2;38;5;235m", # near black
]
@@ -62,13 +62,14 @@ def font():
f"No primary font selected. Add .otf/.ttf/.ttc files to {config.FONT_DIR}."
)
key = (config.FONT_PATH, config.FONT_INDEX, config.FONT_SZ)
if _FONT_OBJ is None or _FONT_OBJ_KEY != key:
if _FONT_OBJ is None or key != _FONT_OBJ_KEY:
_FONT_OBJ = ImageFont.truetype(
config.FONT_PATH, config.FONT_SZ, index=config.FONT_INDEX
)
_FONT_OBJ_KEY = key
return _FONT_OBJ
def clear_font_cache():
"""Reset cached font objects after changing primary font selection."""
global _FONT_OBJ, _FONT_OBJ_KEY
@@ -123,7 +124,7 @@ def render_line(text, fnt=None):
pad = 4
img_w = bbox[2] - bbox[0] + pad * 2
img_h = bbox[3] - bbox[1] + pad * 2
img = Image.new('L', (img_w, img_h), 0)
img = Image.new("L", (img_w, img_h), 0)
draw = ImageDraw.Draw(img)
draw.text((-bbox[0] + pad, -bbox[1] + pad), text, fill=255, font=fnt)
pix_h = config.RENDER_H * 2
@@ -200,8 +201,8 @@ def lr_gradient(rows, offset=0.0, grad_cols=None):
continue
buf = []
for x, ch in enumerate(row):
if ch == ' ':
buf.append(' ')
if ch == " ":
buf.append(" ")
else:
shifted = (x / max(max_x - 1, 1) + offset) % 1.0
idx = min(round(shifted * (n - 1)), n - 1)
@@ -218,7 +219,11 @@ def lr_gradient_opposite(rows, offset=0.0):
# ─── HEADLINE BLOCK ASSEMBLY ─────────────────────────────
def make_block(title, src, ts, w):
"""Render a headline into a content block with color."""
target_lang = (SOURCE_LANGS.get(src) or detect_location_language(title)) if config.MODE == 'news' else None
target_lang = (
(SOURCE_LANGS.get(src) or detect_location_language(title))
if config.MODE == "news"
else None
)
lang_font = font_for_lang(target_lang)
if target_lang:
title = translate_headline(title, target_lang)
@@ -227,28 +232,36 @@ def make_block(title, src, ts, w):
title_up = re.sub(r"\s+", " ", title)
else:
title_up = re.sub(r"\s+", " ", title.upper())
for old, new in [("\u2019","'"), ("\u2018","'"), ("\u201c",'"'),
("\u201d",'"'), ("\u2013","-"), ("\u2014","-")]:
for old, new in [
("\u2019", "'"),
("\u2018", "'"),
("\u201c", '"'),
("\u201d", '"'),
("\u2013", "-"),
("\u2014", "-"),
]:
title_up = title_up.replace(old, new)
big_rows = big_wrap(title_up, w - 4, lang_font)
hc = random.choice([
"\033[38;5;46m", # matrix green
"\033[38;5;34m", # dark green
"\033[38;5;82m", # lime
"\033[38;5;48m", # sea green
"\033[38;5;37m", # teal
"\033[38;5;44m", # cyan
"\033[38;5;87m", # sky
"\033[38;5;117m", # ice blue
"\033[38;5;250m", # cool white
"\033[38;5;156m", # pale green
"\033[38;5;120m", # mint
"\033[38;5;80m", # dark cyan
"\033[38;5;108m", # grey-green
"\033[38;5;115m", # sage
"\033[1;38;5;46m", # bold green
"\033[1;38;5;250m", # bold white
])
hc = random.choice(
[
"\033[38;5;46m", # matrix green
"\033[38;5;34m", # dark green
"\033[38;5;82m", # lime
"\033[38;5;48m", # sea green
"\033[38;5;37m", # teal
"\033[38;5;44m", # cyan
"\033[38;5;87m", # sky
"\033[38;5;117m", # ice blue
"\033[38;5;250m", # cool white
"\033[38;5;156m", # pale green
"\033[38;5;120m", # mint
"\033[38;5;80m", # dark cyan
"\033[38;5;108m", # grey-green
"\033[38;5;115m", # sage
"\033[1;38;5;46m", # bold green
"\033[1;38;5;250m", # bold white
]
)
content = [" " + r for r in big_rows]
content.append("")
meta = f"\u2591 {src} \u00b7 {ts}"

View File

@@ -1,195 +1,141 @@
"""
Render engine — ticker content, scroll motion, message panel, and firehose overlay.
Depends on: config, terminal, render, effects, ntfy, mic.
Orchestrates viewport, frame timing, and layers.
"""
import re
import sys
import time
import random
from datetime import datetime
import time
from engine import config
from engine.terminal import RST, W_COOL, CLR, tw, th
from engine.render import big_wrap, lr_gradient, lr_gradient_opposite, make_block
from engine.effects import noise, glitch_bar, fade_line, vis_trunc, next_headline, firehose_line
from engine.display import (
Display,
TerminalDisplay,
)
from engine.display import (
get_monitor as _get_display_monitor,
)
from engine.frame import calculate_scroll_step
from engine.layers import (
apply_glitch,
process_effects,
render_firehose,
render_message_overlay,
render_ticker_zone,
)
from engine.viewport import th, tw
USE_EFFECT_CHAIN = True
def stream(items, ntfy_poller, mic_monitor):
def stream(items, ntfy_poller, mic_monitor, display: Display | None = None):
"""Main render loop with four layers: message, ticker, scroll motion, firehose."""
if display is None:
display = TerminalDisplay()
random.shuffle(items)
pool = list(items)
seen = set()
queued = 0
time.sleep(0.5)
sys.stdout.write(CLR)
sys.stdout.flush()
w, h = tw(), th()
display.init(w, h)
display.clear()
fh = config.FIREHOSE_H if config.FIREHOSE else 0
ticker_view_h = h - fh # reserve fixed firehose strip at bottom
GAP = 3 # blank rows between headlines
scroll_step_interval = config.SCROLL_DUR / (ticker_view_h + 15) * 2
ticker_view_h = h - fh
GAP = 3
scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h)
# Taxonomy:
# - message: centered ntfy overlay panel
# - ticker: large headline text content
# - scroll: upward camera motion applied to ticker content
# - firehose: fixed carriage-return style strip pinned at bottom
# Active ticker blocks: (content_rows, color, canvas_y, meta_idx)
active = []
scroll_cam = 0 # viewport top in virtual canvas coords
ticker_next_y = ticker_view_h # canvas-y where next block starts (off-screen bottom)
scroll_cam = 0
ticker_next_y = ticker_view_h
noise_cache = {}
scroll_motion_accum = 0.0
msg_cache = (None, None)
frame_number = 0
def _noise_at(cy):
if cy not in noise_cache:
noise_cache[cy] = noise(w) if random.random() < 0.15 else None
return noise_cache[cy]
while True:
if queued >= config.HEADLINE_LIMIT and not active:
break
# Message color: bright cyan/white — distinct from headline greens
MSG_META = "\033[38;5;245m" # cool grey
MSG_BORDER = "\033[2;38;5;37m" # dim teal
_msg_cache = (None, None) # (cache_key, rendered_rows)
while queued < config.HEADLINE_LIMIT or active:
t0 = time.monotonic()
w, h = tw(), th()
fh = config.FIREHOSE_H if config.FIREHOSE else 0
ticker_view_h = h - fh
scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h)
# ── Check for ntfy message ────────────────────────
msg_h = 0
msg_overlay = []
msg = ntfy_poller.get_active_message()
msg_overlay, msg_cache = render_message_overlay(msg, w, h, msg_cache)
buf = []
if msg is not None:
m_title, m_body, m_ts = msg
# ── Message overlay: centered in the viewport ──
display_text = m_body or m_title or "(empty)"
display_text = re.sub(r"\s+", " ", display_text.upper())
cache_key = (display_text, w)
if _msg_cache[0] != cache_key:
msg_rows = big_wrap(display_text, w - 4)
_msg_cache = (cache_key, msg_rows)
else:
msg_rows = _msg_cache[1]
msg_rows = lr_gradient_opposite(msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0)
# Layout: rendered text + meta + border
elapsed_s = int(time.monotonic() - m_ts)
remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s)
ts_str = datetime.now().strftime("%H:%M:%S")
panel_h = len(msg_rows) + 2 # meta + border
panel_top = max(0, (h - panel_h) // 2)
row_idx = 0
for mr in msg_rows:
ln = vis_trunc(mr, w)
msg_overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}{RST}\033[K")
row_idx += 1
# Meta line: title (if distinct) + source + countdown
meta_parts = []
if m_title and m_title != m_body:
meta_parts.append(m_title)
meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
meta = " " + " \u00b7 ".join(meta_parts) if len(meta_parts) > 1 else " " + meta_parts[0]
msg_overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}{RST}\033[K")
row_idx += 1
# Border — constant boundary under message panel
bar = "\u2500" * (w - 4)
msg_overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}{RST}\033[K")
ticker_h = ticker_view_h
# Ticker draws above the fixed firehose strip; message is a centered overlay.
ticker_h = ticker_view_h - msg_h
# ── Ticker content + scroll motion (always runs) ──
scroll_motion_accum += config.FRAME_DT
while scroll_motion_accum >= scroll_step_interval:
scroll_motion_accum -= scroll_step_interval
scroll_cam += 1
# Enqueue new headlines when room at the bottom
while ticker_next_y < scroll_cam + ticker_view_h + 10 and queued < config.HEADLINE_LIMIT:
while (
ticker_next_y < scroll_cam + ticker_view_h + 10
and queued < config.HEADLINE_LIMIT
):
from engine.effects import next_headline
from engine.render import make_block
t, src, ts = next_headline(pool, items, seen)
ticker_content, hc, midx = make_block(t, src, ts, w)
active.append((ticker_content, hc, ticker_next_y, midx))
ticker_next_y += len(ticker_content) + GAP
queued += 1
# Prune off-screen blocks and stale noise
active = [(c, hc, by, mi) for c, hc, by, mi in active
if by + len(c) > scroll_cam]
active = [
(c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > scroll_cam
]
for k in list(noise_cache):
if k < scroll_cam:
del noise_cache[k]
# Draw ticker zone (above fixed firehose strip)
top_zone = max(1, int(ticker_h * 0.25))
bot_zone = max(1, int(ticker_h * 0.10))
grad_offset = (time.monotonic() * config.GRAD_SPEED) % 1.0
ticker_buf_start = len(buf) # track where ticker rows start in buf
for r in range(ticker_h):
scr_row = r + 1 # 1-indexed ANSI screen row
cy = scroll_cam + r
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0
row_fade = min(top_f, bot_f)
drawn = False
for content, hc, by, midx in active:
cr = cy - by
if 0 <= cr < len(content):
raw = content[cr]
if cr != midx:
colored = lr_gradient([raw], grad_offset)[0]
else:
colored = raw
ln = vis_trunc(colored, w)
if row_fade < 1.0:
ln = fade_line(ln, row_fade)
if cr == midx:
buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K")
elif ln.strip():
buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K")
else:
buf.append(f"\033[{scr_row};1H\033[K")
drawn = True
break
if not drawn:
n = _noise_at(cy)
if row_fade < 1.0 and n:
n = fade_line(n, row_fade)
if n:
buf.append(f"\033[{scr_row};1H{n}")
else:
buf.append(f"\033[{scr_row};1H\033[K")
ticker_buf_start = len(buf)
ticker_buf, noise_cache = render_ticker_zone(
active, scroll_cam, ticker_h, w, noise_cache, grad_offset
)
buf.extend(ticker_buf)
# Glitch — base rate + mic-reactive spikes (ticker zone only)
mic_excess = mic_monitor.excess
glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
n_hits = 4 + int(mic_excess / 2)
ticker_buf_len = len(buf) - ticker_buf_start
if random.random() < glitch_prob and ticker_buf_len > 0:
for _ in range(min(n_hits, ticker_buf_len)):
gi = random.randint(0, ticker_buf_len - 1)
scr_row = gi + 1
buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}"
render_start = time.perf_counter()
if USE_EFFECT_CHAIN:
buf = process_effects(
buf,
w,
h,
scroll_cam,
ticker_h,
mic_excess,
grad_offset,
frame_number,
msg is not None,
items,
)
else:
buf = apply_glitch(buf, ticker_buf_start, mic_excess, w)
firehose_buf = render_firehose(items, w, fh, h)
buf.extend(firehose_buf)
if config.FIREHOSE and fh > 0:
for fr in range(fh):
scr_row = h - fh + fr + 1
fline = firehose_line(items, w)
buf.append(f"\033[{scr_row};1H{fline}\033[K")
if msg_overlay:
buf.extend(msg_overlay)
sys.stdout.buffer.write("".join(buf).encode())
sys.stdout.flush()
render_elapsed = (time.perf_counter() - render_start) * 1000
monitor = _get_display_monitor()
if monitor:
chars = sum(len(line) for line in buf)
monitor.record_effect("render", render_elapsed, chars, chars)
display.show(buf)
# Precise frame timing
elapsed = time.monotonic() - t0
time.sleep(max(0, config.FRAME_DT - elapsed))
frame_number += 1
sys.stdout.write(CLR)
sys.stdout.flush()
display.cleanup()

View File

@@ -47,69 +47,69 @@ FEEDS = {
# ─── POETRY / LITERATURE ─────────────────────────────────
# Public domain via Project Gutenberg
POETRY_SOURCES = {
"Whitman": "https://www.gutenberg.org/cache/epub/1322/pg1322.txt",
"Dickinson": "https://www.gutenberg.org/cache/epub/12242/pg12242.txt",
"Whitman": "https://www.gutenberg.org/cache/epub/1322/pg1322.txt",
"Dickinson": "https://www.gutenberg.org/cache/epub/12242/pg12242.txt",
"Whitman II": "https://www.gutenberg.org/cache/epub/8388/pg8388.txt",
"Rilke": "https://www.gutenberg.org/cache/epub/38594/pg38594.txt",
"Pound": "https://www.gutenberg.org/cache/epub/41162/pg41162.txt",
"Pound II": "https://www.gutenberg.org/cache/epub/51992/pg51992.txt",
"Eliot": "https://www.gutenberg.org/cache/epub/1567/pg1567.txt",
"Yeats": "https://www.gutenberg.org/cache/epub/38877/pg38877.txt",
"Masters": "https://www.gutenberg.org/cache/epub/1280/pg1280.txt",
"Rilke": "https://www.gutenberg.org/cache/epub/38594/pg38594.txt",
"Pound": "https://www.gutenberg.org/cache/epub/41162/pg41162.txt",
"Pound II": "https://www.gutenberg.org/cache/epub/51992/pg51992.txt",
"Eliot": "https://www.gutenberg.org/cache/epub/1567/pg1567.txt",
"Yeats": "https://www.gutenberg.org/cache/epub/38877/pg38877.txt",
"Masters": "https://www.gutenberg.org/cache/epub/1280/pg1280.txt",
"Baudelaire": "https://www.gutenberg.org/cache/epub/36098/pg36098.txt",
"Crane": "https://www.gutenberg.org/cache/epub/40786/pg40786.txt",
"Poe": "https://www.gutenberg.org/cache/epub/10031/pg10031.txt",
"Crane": "https://www.gutenberg.org/cache/epub/40786/pg40786.txt",
"Poe": "https://www.gutenberg.org/cache/epub/10031/pg10031.txt",
}
# ─── SOURCE → LANGUAGE MAPPING ───────────────────────────
# Headlines from these outlets render in their cultural home language
SOURCE_LANGS = {
"Der Spiegel": "de",
"DW": "de",
"France24": "fr",
"Japan Times": "ja",
"The Hindu": "hi",
"SCMP": "zh-cn",
"Al Jazeera": "ar",
"Der Spiegel": "de",
"DW": "de",
"France24": "fr",
"Japan Times": "ja",
"The Hindu": "hi",
"SCMP": "zh-cn",
"Al Jazeera": "ar",
}
# ─── LOCATION → LANGUAGE ─────────────────────────────────
LOCATION_LANGS = {
r'\b(?:china|chinese|beijing|shanghai|hong kong|xi jinping)\b': 'zh-cn',
r'\b(?:japan|japanese|tokyo|osaka|kishida)\b': 'ja',
r'\b(?:korea|korean|seoul|pyongyang)\b': 'ko',
r'\b(?:russia|russian|moscow|kremlin|putin)\b': 'ru',
r'\b(?:saudi|dubai|qatar|egypt|cairo|arabic)\b': 'ar',
r'\b(?:india|indian|delhi|mumbai|modi)\b': 'hi',
r'\b(?:germany|german|berlin|munich|scholz)\b': 'de',
r'\b(?:france|french|paris|lyon|macron)\b': 'fr',
r'\b(?:spain|spanish|madrid)\b': 'es',
r'\b(?:italy|italian|rome|milan|meloni)\b': 'it',
r'\b(?:portugal|portuguese|lisbon)\b': 'pt',
r'\b(?:brazil|brazilian|são paulo|lula)\b': 'pt',
r'\b(?:greece|greek|athens)\b': 'el',
r'\b(?:turkey|turkish|istanbul|ankara|erdogan)\b': 'tr',
r'\b(?:iran|iranian|tehran)\b': 'fa',
r'\b(?:thailand|thai|bangkok)\b': 'th',
r'\b(?:vietnam|vietnamese|hanoi)\b': 'vi',
r'\b(?:ukraine|ukrainian|kyiv|kiev|zelensky)\b': 'uk',
r'\b(?:israel|israeli|jerusalem|tel aviv|netanyahu)\b': 'he',
r"\b(?:china|chinese|beijing|shanghai|hong kong|xi jinping)\b": "zh-cn",
r"\b(?:japan|japanese|tokyo|osaka|kishida)\b": "ja",
r"\b(?:korea|korean|seoul|pyongyang)\b": "ko",
r"\b(?:russia|russian|moscow|kremlin|putin)\b": "ru",
r"\b(?:saudi|dubai|qatar|egypt|cairo|arabic)\b": "ar",
r"\b(?:india|indian|delhi|mumbai|modi)\b": "hi",
r"\b(?:germany|german|berlin|munich|scholz)\b": "de",
r"\b(?:france|french|paris|lyon|macron)\b": "fr",
r"\b(?:spain|spanish|madrid)\b": "es",
r"\b(?:italy|italian|rome|milan|meloni)\b": "it",
r"\b(?:portugal|portuguese|lisbon)\b": "pt",
r"\b(?:brazil|brazilian|são paulo|lula)\b": "pt",
r"\b(?:greece|greek|athens)\b": "el",
r"\b(?:turkey|turkish|istanbul|ankara|erdogan)\b": "tr",
r"\b(?:iran|iranian|tehran)\b": "fa",
r"\b(?:thailand|thai|bangkok)\b": "th",
r"\b(?:vietnam|vietnamese|hanoi)\b": "vi",
r"\b(?:ukraine|ukrainian|kyiv|kiev|zelensky)\b": "uk",
r"\b(?:israel|israeli|jerusalem|tel aviv|netanyahu)\b": "he",
}
# ─── NON-LATIN SCRIPT FONTS (macOS) ──────────────────────
SCRIPT_FONTS = {
'zh-cn': '/System/Library/Fonts/STHeiti Medium.ttc',
'ja': '/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc',
'ko': '/System/Library/Fonts/AppleSDGothicNeo.ttc',
'ru': '/System/Library/Fonts/Supplemental/Arial.ttf',
'uk': '/System/Library/Fonts/Supplemental/Arial.ttf',
'el': '/System/Library/Fonts/Supplemental/Arial.ttf',
'he': '/System/Library/Fonts/Supplemental/Arial.ttf',
'ar': '/System/Library/Fonts/GeezaPro.ttc',
'fa': '/System/Library/Fonts/GeezaPro.ttc',
'hi': '/System/Library/Fonts/Kohinoor.ttc',
'th': '/System/Library/Fonts/ThonburiUI.ttc',
"zh-cn": "/System/Library/Fonts/STHeiti Medium.ttc",
"ja": "/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc",
"ko": "/System/Library/Fonts/AppleSDGothicNeo.ttc",
"ru": "/System/Library/Fonts/Supplemental/Arial.ttf",
"uk": "/System/Library/Fonts/Supplemental/Arial.ttf",
"el": "/System/Library/Fonts/Supplemental/Arial.ttf",
"he": "/System/Library/Fonts/Supplemental/Arial.ttf",
"ar": "/System/Library/Fonts/GeezaPro.ttc",
"fa": "/System/Library/Fonts/GeezaPro.ttc",
"hi": "/System/Library/Fonts/Kohinoor.ttc",
"th": "/System/Library/Fonts/ThonburiUI.ttc",
}
# Scripts that have no uppercase
NO_UPPER = {'zh-cn', 'ja', 'ko', 'ar', 'fa', 'hi', 'th', 'he'}
NO_UPPER = {"zh-cn", "ja", "ko", "ar", "fa", "hi", "th", "he"}

View File

@@ -4,8 +4,8 @@ No internal dependencies.
"""
import os
import sys
import random
import sys
import time
# ─── ANSI ─────────────────────────────────────────────────
@@ -49,7 +49,7 @@ def type_out(text, color=G_HI):
while i < len(text):
if random.random() < 0.3:
b = random.randint(2, 5)
sys.stdout.write(f"{color}{text[i:i+b]}{RST}")
sys.stdout.write(f"{color}{text[i : i + b]}{RST}")
i += b
else:
sys.stdout.write(f"{color}{text[i]}{RST}")

View File

@@ -3,14 +3,33 @@ Google Translate wrapper and location→language detection.
Depends on: sources (for LOCATION_LANGS).
"""
import re
import json
import urllib.request
import re
import urllib.parse
import urllib.request
from functools import lru_cache
from engine.sources import LOCATION_LANGS
_TRANSLATE_CACHE = {}
TRANSLATE_CACHE_SIZE = 500
@lru_cache(maxsize=TRANSLATE_CACHE_SIZE)
def _translate_cached(title: str, target_lang: str) -> str:
"""Cached translation implementation."""
try:
q = urllib.parse.quote(title)
url = (
"https://translate.googleapis.com/translate_a/single"
f"?client=gtx&sl=en&tl={target_lang}&dt=t&q={q}"
)
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=5)
data = json.loads(resp.read())
result = "".join(p[0] for p in data[0] if p[0]) or title
except Exception:
result = title
return result
def detect_location_language(title):
@@ -22,20 +41,6 @@ def detect_location_language(title):
return None
def translate_headline(title, target_lang):
def translate_headline(title: str, target_lang: str) -> str:
"""Translate headline via Google Translate API (zero dependencies)."""
key = (title, target_lang)
if key in _TRANSLATE_CACHE:
return _TRANSLATE_CACHE[key]
try:
q = urllib.parse.quote(title)
url = ("https://translate.googleapis.com/translate_a/single"
f"?client=gtx&sl=en&tl={target_lang}&dt=t&q={q}")
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=5)
data = json.loads(resp.read())
result = "".join(p[0] for p in data[0] if p[0]) or title
except Exception:
result = title
_TRANSLATE_CACHE[key] = result
return result
return _translate_cached(title, target_lang)

60
engine/types.py Normal file
View File

@@ -0,0 +1,60 @@
"""
Shared dataclasses for the mainline application.
Provides named types for tuple returns across modules.
"""
from dataclasses import dataclass
@dataclass
class HeadlineItem:
"""A single headline item: title, source, and timestamp."""
title: str
source: str
timestamp: str
def to_tuple(self) -> tuple[str, str, str]:
"""Convert to tuple for backward compatibility."""
return (self.title, self.source, self.timestamp)
@classmethod
def from_tuple(cls, t: tuple[str, str, str]) -> "HeadlineItem":
"""Create from tuple for backward compatibility."""
return cls(title=t[0], source=t[1], timestamp=t[2])
def items_to_tuples(items: list[HeadlineItem]) -> list[tuple[str, str, str]]:
"""Convert list of HeadlineItem to list of tuples."""
return [item.to_tuple() for item in items]
def tuples_to_items(tuples: list[tuple[str, str, str]]) -> list[HeadlineItem]:
"""Convert list of tuples to list of HeadlineItem."""
return [HeadlineItem.from_tuple(t) for t in tuples]
@dataclass
class FetchResult:
"""Result from fetch_all() or fetch_poetry()."""
items: list[HeadlineItem]
linked: int
failed: int
def to_legacy_tuple(self) -> tuple[list[tuple], int, int]:
"""Convert to legacy tuple format for backward compatibility."""
return ([item.to_tuple() for item in self.items], self.linked, self.failed)
@dataclass
class Block:
"""Rendered headline block from make_block()."""
content: list[str]
color: str
meta_row_index: int
def to_legacy_tuple(self) -> tuple[list[str], str, int]:
"""Convert to legacy tuple format for backward compatibility."""
return (self.content, self.color, self.meta_row_index)

37
engine/viewport.py Normal file
View File

@@ -0,0 +1,37 @@
"""
Viewport utilities — terminal dimensions and ANSI positioning helpers.
No internal dependencies.
"""
import os
def tw() -> int:
"""Get terminal width (columns)."""
try:
return os.get_terminal_size().columns
except Exception:
return 80
def th() -> int:
"""Get terminal height (lines)."""
try:
return os.get_terminal_size().lines
except Exception:
return 24
def move_to(row: int, col: int = 1) -> str:
"""Generate ANSI escape to move cursor to row, col (1-indexed)."""
return f"\033[{row};{col}H"
def clear_screen() -> str:
"""Clear screen and move cursor to home."""
return "\033[2J\033[H"
def clear_line() -> str:
"""Clear current line."""
return "\033[K"

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
fonts/Resond-Regular.otf Normal file

Binary file not shown.

BIN
fonts/Synthetix.otf Normal file

Binary file not shown.

30
hk.pkl Normal file
View File

@@ -0,0 +1,30 @@
amends "package://github.com/jdx/hk/releases/download/v1.38.0/hk@1.38.0#/Config.pkl"
import "package://github.com/jdx/hk/releases/download/v1.38.0/hk@1.38.0#/Builtins.pkl"
hooks {
["pre-commit"] {
fix = true
stash = "git"
steps {
["ruff-format"] = (Builtins.ruff_format) {
prefix = "uv run"
}
["ruff"] = (Builtins.ruff) {
prefix = "uv run"
check = "ruff check engine/ tests/"
fix = "ruff check --fix --unsafe-fixes engine/ tests/"
}
}
}
["pre-push"] {
steps {
["ruff"] = (Builtins.ruff) {
prefix = "uv run"
check = "ruff check engine/ tests/"
}
["benchmark"] {
check = "uv run python -m engine.benchmark --hook --displays null --iterations 20"
}
}
}
}

View File

@@ -5,40 +5,7 @@ Digital news consciousness stream.
Matrix aesthetic · THX-1138 hue.
"""
import subprocess, sys, pathlib
# ─── BOOTSTRAP VENV ───────────────────────────────────────
_VENV = pathlib.Path(__file__).resolve().parent / ".mainline_venv"
_MARKER = _VENV / ".installed_v3"
def _ensure_venv():
"""Create a local venv and install deps if needed."""
if _MARKER.exists():
return
import venv
print("\033[2;38;5;34m > first run — creating environment...\033[0m")
venv.create(str(_VENV), with_pip=True, clear=True)
pip = str(_VENV / "bin" / "pip")
subprocess.check_call(
[pip, "install", "feedparser", "Pillow", "-q"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
_MARKER.touch()
_ensure_venv()
# Install sounddevice on first run after v3
_MARKER_SD = _VENV / ".installed_sd"
if not _MARKER_SD.exists():
_pip = str(_VENV / "bin" / "pip")
subprocess.check_call([_pip, "install", "sounddevice", "numpy", "-q"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
_MARKER_SD.touch()
sys.path.insert(0, str(next((_VENV / "lib").glob("python*/site-packages"))))
# ─── DELEGATE TO ENGINE ───────────────────────────────────
from engine.app import main # noqa: E402
from engine.app import main
if __name__ == "__main__":
main()

89
mise.toml Normal file
View File

@@ -0,0 +1,89 @@
[tools]
python = "3.12"
hk = "latest"
pkl = "latest"
[tasks]
# =====================
# Testing
# =====================
test = "uv run pytest"
test-v = { run = "uv run pytest -v", depends = ["sync-all"] }
test-cov = { run = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html", depends = ["sync-all"] }
test-cov-open = { run = "mise run test-cov && open htmlcov/index.html", depends = ["sync-all"] }
test-browser-install = { run = "uv run playwright install chromium", depends = ["sync-all"] }
test-browser = { run = "uv run pytest tests/e2e/", depends = ["test-browser-install"] }
# =====================
# Linting & Formatting
# =====================
lint = "uv run ruff check engine/ mainline.py"
lint-fix = "uv run ruff check --fix engine/ mainline.py"
format = "uv run ruff format engine/ mainline.py"
# =====================
# Runtime Modes
# =====================
run = "uv run mainline.py"
run-poetry = "uv run mainline.py --poetry"
run-firehose = "uv run mainline.py --firehose"
run-websocket = { run = "uv run mainline.py --display websocket", depends = ["sync-all"] }
run-sixel = { run = "uv run mainline.py --display sixel", depends = ["sync-all"] }
run-both = { run = "uv run mainline.py --display both", depends = ["sync-all"] }
run-client = { run = "mise run run-both & sleep 2 && $(open http://localhost:8766 2>/dev/null || xdg-open http://localhost:8766 2>/dev/null || echo 'Open http://localhost:8766 manually'); wait", depends = ["sync-all"] }
# =====================
# Command & Control
# =====================
cmd = "uv run cmdline.py"
cmd-stats = { run = "uv run cmdline.py -w \"/effects stats\"", depends = ["sync-all"] }
# =====================
# Benchmark
# =====================
benchmark = { run = "uv run python -m engine.benchmark", depends = ["sync-all"] }
benchmark-json = { run = "uv run python -m engine.benchmark --format json --output benchmark.json", depends = ["sync-all"] }
benchmark-report = { run = "uv run python -m engine.benchmark --output BENCHMARK.md", depends = ["sync-all"] }
# Initialize ntfy topics (warm up before first use)
topics-init = "curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd > /dev/null && curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline_cc_resp > /dev/null && curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline > /dev/null"
# =====================
# Daemon
# =====================
daemon = "nohup uv run mainline.py > nohup.out 2>&1 &"
daemon-stop = "pkill -f 'uv run mainline.py' 2>/dev/null || true"
daemon-restart = "mise run daemon-stop && sleep 2 && mise run daemon"
# =====================
# Environment
# =====================
sync = "uv sync"
sync-all = "uv sync --all-extras"
install = "mise run sync"
install-dev = { run = "mise run sync-all && uv sync --group dev", depends = ["sync-all"] }
bootstrap = { run = "mise run sync-all && uv run mainline.py --help", depends = ["sync-all"] }
clean = "rm -rf .venv htmlcov .coverage tests/.pytest_cache .mainline_cache_*.json nohup.out"
clobber = "git clean -fdx && rm -rf .venv htmlcov .coverage tests/.pytest_cache .mainline_cache_*.json nohup.out"
# =====================
# CI/CD
# =====================
ci = { run = "mise run topics-init && mise run lint && mise run test-cov", depends = ["topics-init", "lint", "test-cov"] }
# =====================
# Git Hooks (via hk)
# =====================
pre-commit = "hk run pre-commit"

98
pyproject.toml Normal file
View File

@@ -0,0 +1,98 @@
[project]
name = "mainline"
version = "0.1.0"
description = "Terminal news ticker with Matrix aesthetic"
readme = "README.md"
requires-python = ">=3.10"
authors = [
{ name = "Mainline", email = "mainline@example.com" }
]
license = { text = "MIT" }
classifiers = [
"Development Status :: 4 - Beta",
"Environment :: Console",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Terminals",
]
dependencies = [
"feedparser>=6.0.0",
"Pillow>=10.0.0",
"pyright>=1.1.408",
]
[project.optional-dependencies]
mic = [
"sounddevice>=0.4.0",
"numpy>=1.24.0",
]
websocket = [
"websockets>=12.0",
]
sixel = [
"pysixel>=0.1.0",
]
browser = [
"playwright>=1.40.0",
]
dev = [
"pytest>=8.0.0",
"pytest-cov>=4.1.0",
"pytest-mock>=3.12.0",
"ruff>=0.1.0",
]
[project.scripts]
mainline = "engine.app:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[dependency-groups]
dev = [
"pytest>=8.0.0",
"pytest-cov>=4.1.0",
"pytest-mock>=3.12.0",
"ruff>=0.1.0",
]
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_functions = ["test_*"]
addopts = [
"--strict-markers",
"--tb=short",
"-v",
]
filterwarnings = [
"ignore::DeprecationWarning",
]
[tool.coverage.run]
source = ["engine"]
branch = true
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"raise AssertionError",
"raise NotImplementedError",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:",
"@abstractmethod",
]
[tool.ruff]
line-length = 88
target-version = "py310"
[tool.ruff.lint]
select = ["E", "F", "W", "I", "N", "UP", "B", "C4", "SIM"]
ignore = ["E501", "SIM105", "N806", "B007", "SIM108"]

4
requirements-dev.txt Normal file
View File

@@ -0,0 +1,4 @@
pytest>=8.0.0
pytest-cov>=4.1.0
pytest-mock>=3.12.0
ruff>=0.1.0

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
feedparser>=6.0.0
Pillow>=10.0.0
sounddevice>=0.4.0
numpy>=1.24.0

0
tests/__init__.py Normal file
View File

View File

@@ -0,0 +1,133 @@
"""
End-to-end tests for web client with headless browser.
"""
import os
import socketserver
import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
import pytest
CLIENT_DIR = Path(__file__).parent.parent.parent / "client"
class ThreadedHTTPServer(socketserver.ThreadingMixIn, HTTPServer):
"""Threaded HTTP server for handling concurrent requests."""
daemon_threads = True
@pytest.fixture(scope="module")
def http_server():
"""Start a local HTTP server for the client."""
os.chdir(CLIENT_DIR)
handler = SimpleHTTPRequestHandler
server = ThreadedHTTPServer(("127.0.0.1", 0), handler)
port = server.server_address[1]
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
yield f"http://127.0.0.1:{port}"
server.shutdown()
class TestWebClient:
"""Tests for the web client using Playwright."""
@pytest.fixture(autouse=True)
def setup_browser(self):
"""Set up browser for tests."""
pytest.importorskip("playwright")
from playwright.sync_api import sync_playwright
self.playwright = sync_playwright().start()
self.browser = self.playwright.chromium.launch(headless=True)
self.context = self.browser.new_context()
self.page = self.context.new_page()
yield
self.page.close()
self.context.close()
self.browser.close()
self.playwright.stop()
def test_client_loads(self, http_server):
"""Web client loads without errors."""
response = self.page.goto(http_server)
assert response.status == 200, f"Page load failed with status {response.status}"
self.page.wait_for_load_state("domcontentloaded")
content = self.page.content()
assert "<canvas" in content, "Canvas element not found in page"
canvas = self.page.locator("#terminal")
assert canvas.count() > 0, "Canvas not found"
def test_status_shows_connecting(self, http_server):
"""Status shows connecting initially."""
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
status = self.page.locator("#status")
assert status.count() > 0, "Status element not found"
def test_canvas_has_dimensions(self, http_server):
"""Canvas has correct dimensions after load."""
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
canvas = self.page.locator("#terminal")
assert canvas.count() > 0, "Canvas not found"
def test_no_console_errors_on_load(self, http_server):
"""No JavaScript errors on page load (websocket errors are expected without server)."""
js_errors = []
def handle_console(msg):
if msg.type == "error":
text = msg.text
if "WebSocket" not in text:
js_errors.append(text)
self.page.on("console", handle_console)
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
assert len(js_errors) == 0, f"JavaScript errors: {js_errors}"
class TestWebClientProtocol:
"""Tests for WebSocket protocol handling in client."""
@pytest.fixture(autouse=True)
def setup_browser(self):
"""Set up browser for tests."""
pytest.importorskip("playwright")
from playwright.sync_api import sync_playwright
self.playwright = sync_playwright().start()
self.browser = self.playwright.chromium.launch(headless=True)
self.context = self.browser.new_context()
self.page = self.context.new_page()
yield
self.page.close()
self.context.close()
self.browser.close()
self.playwright.stop()
def test_websocket_reconnection(self, http_server):
"""Client attempts reconnection on disconnect."""
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
status = self.page.locator("#status")
assert status.count() > 0, "Status element not found"

236
tests/fixtures/__init__.py vendored Normal file
View File

@@ -0,0 +1,236 @@
"""
Pytest fixtures for mocking external dependencies (network, filesystem).
"""
import json
from unittest.mock import MagicMock
import pytest
@pytest.fixture
def mock_feed_response():
"""Mock RSS feed response data."""
return b"""<?xml version="1.0" encoding="UTF-8" ?>
<rss version="2.0">
<channel>
<title>Test Feed</title>
<link>https://example.com</link>
<item>
<title>Test Headline One</title>
<pubDate>Sat, 15 Mar 2025 12:00:00 GMT</pubDate>
</item>
<item>
<title>Test Headline Two</title>
<pubDate>Sat, 15 Mar 2025 11:00:00 GMT</pubDate>
</item>
<item>
<title>Sports: Team Wins Championship</title>
<pubDate>Sat, 15 Mar 2025 10:00:00 GMT</pubDate>
</item>
</channel>
</rss>"""
@pytest.fixture
def mock_gutenberg_response():
"""Mock Project Gutenberg text response."""
return """Project Gutenberg's Collection, by Various
*** START OF SOME TEXT ***
This is a test poem with multiple lines
that should be parsed as stanzas.
Another stanza here with different content
and more lines to test the parsing logic.
Yet another stanza for variety
in the test data.
*** END OF SOME TEXT ***"""
@pytest.fixture
def mock_gutenberg_empty():
"""Mock Gutenberg response with no valid stanzas."""
return """Project Gutenberg's Collection
*** START OF TEXT ***
THIS IS ALL CAPS AND SHOULD BE SKIPPED
I.
*** END OF TEXT ***"""
@pytest.fixture
def mock_ntfy_message():
"""Mock ntfy.sh SSE message."""
return json.dumps(
{
"id": "test123",
"event": "message",
"title": "Test Title",
"message": "Test message body",
"time": 1234567890,
}
).encode()
@pytest.fixture
def mock_ntfy_keepalive():
"""Mock ntfy.sh keepalive message."""
return b'data: {"event":"keepalive"}\n\n'
@pytest.fixture
def mock_google_translate_response():
"""Mock Google Translate API response."""
return json.dumps(
[
[["Translated text", "Original text", None, 0.8], None, "en"],
None,
None,
[],
[],
[],
[],
]
)
@pytest.fixture
def mock_feedparser():
"""Create a mock feedparser.parse function."""
def _mock(data):
mock_result = MagicMock()
mock_result.bozo = False
mock_result.entries = [
{
"title": "Test Headline",
"published_parsed": (2025, 3, 15, 12, 0, 0, 0, 0, 0),
},
{
"title": "Another Headline",
"updated_parsed": (2025, 3, 15, 11, 0, 0, 0, 0, 0),
},
]
return mock_result
return _mock
@pytest.fixture
def mock_urllib_open(mock_feed_response):
"""Create a mock urllib.request.urlopen that returns feed data."""
def _mock(url):
mock_response = MagicMock()
mock_response.read.return_value = mock_feed_response
return mock_response
return _mock
@pytest.fixture
def sample_items():
"""Sample items as returned by fetch module (title, source, timestamp)."""
return [
("Headline One", "Test Source", "12:00"),
("Headline Two", "Another Source", "11:30"),
("Headline Three", "Third Source", "10:45"),
]
@pytest.fixture
def sample_config():
"""Sample config for testing."""
from engine.config import Config
return Config(
headline_limit=100,
feed_timeout=10,
mic_threshold_db=50,
mode="news",
firehose=False,
ntfy_topic="https://ntfy.sh/test/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir="fonts",
font_path="",
font_index=0,
font_picker=False,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐",
kata_glyphs="ハミヒーウ",
script_fonts={},
)
@pytest.fixture
def poetry_config():
"""Sample config for poetry mode."""
from engine.config import Config
return Config(
headline_limit=100,
feed_timeout=10,
mic_threshold_db=50,
mode="poetry",
firehose=False,
ntfy_topic="https://ntfy.sh/test/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir="fonts",
font_path="",
font_index=0,
font_picker=False,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐",
kata_glyphs="ハミヒーウ",
script_fonts={},
)
@pytest.fixture
def firehose_config():
"""Sample config with firehose enabled."""
from engine.config import Config
return Config(
headline_limit=100,
feed_timeout=10,
mic_threshold_db=50,
mode="news",
firehose=True,
ntfy_topic="https://ntfy.sh/test/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir="fonts",
font_path="",
font_index=0,
font_picker=False,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐",
kata_glyphs="ハミヒーウ",
script_fonts={},
)

55
tests/test_app.py Normal file
View File

@@ -0,0 +1,55 @@
"""
Tests for engine.app module.
"""
from engine.app import _normalize_preview_rows
class TestNormalizePreviewRows:
"""Tests for _normalize_preview_rows function."""
def test_empty_rows(self):
"""Empty input returns empty list."""
result = _normalize_preview_rows([])
assert result == [""]
def test_strips_left_padding(self):
"""Left padding is stripped."""
result = _normalize_preview_rows([" content", " more"])
assert all(not r.startswith(" ") for r in result)
def test_preserves_content(self):
"""Content is preserved."""
result = _normalize_preview_rows([" hello world "])
assert "hello world" in result[0]
def test_handles_all_empty_rows(self):
"""All empty rows returns single empty string."""
result = _normalize_preview_rows(["", " ", ""])
assert result == [""]
class TestAppConstants:
"""Tests for app module constants."""
def test_title_defined(self):
"""TITLE is defined."""
from engine.app import TITLE
assert len(TITLE) > 0
def test_title_lines_are_strings(self):
"""TITLE contains string lines."""
from engine.app import TITLE
assert all(isinstance(line, str) for line in TITLE)
class TestAppImports:
"""Tests for app module imports."""
def test_app_imports_without_error(self):
"""Module imports without error."""
from engine import app
assert app is not None

301
tests/test_config.py Normal file
View File

@@ -0,0 +1,301 @@
"""
Tests for engine.config module.
"""
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from engine import config
class TestArgValue:
"""Tests for _arg_value helper."""
def test_returns_value_when_flag_present(self):
"""Returns the value following the flag."""
with patch.object(sys, "argv", ["prog", "--font-file", "test.otf"]):
result = config._arg_value("--font-file")
assert result == "test.otf"
def test_returns_none_when_flag_missing(self):
"""Returns None when flag is not present."""
with patch.object(sys, "argv", ["prog"]):
result = config._arg_value("--font-file")
assert result is None
def test_returns_none_when_no_value(self):
"""Returns None when flag is last."""
with patch.object(sys, "argv", ["prog", "--font-file"]):
result = config._arg_value("--font-file")
assert result is None
class TestArgInt:
"""Tests for _arg_int helper."""
def test_parses_valid_int(self):
"""Parses valid integer."""
with patch.object(sys, "argv", ["prog", "--font-index", "5"]):
result = config._arg_int("--font-index", 0)
assert result == 5
def test_returns_default_on_invalid(self):
"""Returns default on invalid input."""
with patch.object(sys, "argv", ["prog", "--font-index", "abc"]):
result = config._arg_int("--font-index", 0)
assert result == 0
def test_returns_default_when_missing(self):
"""Returns default when flag missing."""
with patch.object(sys, "argv", ["prog"]):
result = config._arg_int("--font-index", 10)
assert result == 10
class TestResolveFontPath:
"""Tests for _resolve_font_path helper."""
def test_returns_absolute_paths(self):
"""Absolute paths are returned as-is."""
result = config._resolve_font_path("/absolute/path.otf")
assert result == "/absolute/path.otf"
def test_resolves_relative_paths(self):
"""Relative paths are resolved to repo root."""
result = config._resolve_font_path("fonts/test.otf")
assert str(config._REPO_ROOT) in result
def test_expands_user_home(self):
"""Tilde paths are expanded."""
with patch("pathlib.Path.expanduser", return_value=Path("/home/user/fonts")):
result = config._resolve_font_path("~/fonts/test.otf")
assert isinstance(result, str)
class TestListFontFiles:
"""Tests for _list_font_files helper."""
def test_returns_empty_for_missing_dir(self):
"""Returns empty list for missing directory."""
result = config._list_font_files("/nonexistent/directory")
assert result == []
def test_filters_by_extension(self):
"""Only returns valid font extensions."""
with tempfile.TemporaryDirectory() as tmpdir:
Path(tmpdir, "valid.otf").touch()
Path(tmpdir, "valid.ttf").touch()
Path(tmpdir, "invalid.txt").touch()
Path(tmpdir, "image.png").touch()
result = config._list_font_files(tmpdir)
assert len(result) == 2
assert all(f.endswith((".otf", ".ttf")) for f in result)
def test_sorts_alphabetically(self):
"""Results are sorted alphabetically."""
with tempfile.TemporaryDirectory() as tmpdir:
Path(tmpdir, "zfont.otf").touch()
Path(tmpdir, "afont.otf").touch()
result = config._list_font_files(tmpdir)
filenames = [Path(f).name for f in result]
assert filenames == ["afont.otf", "zfont.otf"]
class TestDefaults:
"""Tests for default configuration values."""
def test_headline_limit(self):
"""HEADLINE_LIMIT has sensible default."""
assert config.HEADLINE_LIMIT > 0
def test_feed_timeout(self):
"""FEED_TIMEOUT has sensible default."""
assert config.FEED_TIMEOUT > 0
def test_font_extensions(self):
"""Font extensions are defined."""
assert ".otf" in config._FONT_EXTENSIONS
assert ".ttf" in config._FONT_EXTENSIONS
assert ".ttc" in config._FONT_EXTENSIONS
class TestGlyphs:
"""Tests for glyph constants."""
def test_glitch_glyphs_defined(self):
"""GLITCH glyphs are defined."""
assert len(config.GLITCH) > 0
def test_kata_glyphs_defined(self):
"""KATA glyphs are defined."""
assert len(config.KATA) > 0
class TestSetFontSelection:
"""Tests for set_font_selection function."""
def test_updates_font_path(self):
"""Updates FONT_PATH globally."""
original = config.FONT_PATH
config.set_font_selection(font_path="/new/path.otf")
assert config.FONT_PATH == "/new/path.otf"
config.FONT_PATH = original
def test_updates_font_index(self):
"""Updates FONT_INDEX globally."""
original = config.FONT_INDEX
config.set_font_selection(font_index=5)
assert config.FONT_INDEX == 5
config.FONT_INDEX = original
def test_handles_none_values(self):
"""Handles None values gracefully."""
original_path = config.FONT_PATH
original_index = config.FONT_INDEX
config.set_font_selection(font_path=None, font_index=None)
assert original_path == config.FONT_PATH
assert original_index == config.FONT_INDEX
class TestConfigDataclass:
"""Tests for Config dataclass."""
def test_config_has_required_fields(self):
"""Config has all required fields."""
c = config.Config()
assert hasattr(c, "headline_limit")
assert hasattr(c, "feed_timeout")
assert hasattr(c, "mic_threshold_db")
assert hasattr(c, "mode")
assert hasattr(c, "firehose")
assert hasattr(c, "ntfy_topic")
assert hasattr(c, "ntfy_reconnect_delay")
assert hasattr(c, "message_display_secs")
assert hasattr(c, "font_dir")
assert hasattr(c, "font_path")
assert hasattr(c, "font_index")
assert hasattr(c, "font_picker")
assert hasattr(c, "font_sz")
assert hasattr(c, "render_h")
assert hasattr(c, "ssaa")
assert hasattr(c, "scroll_dur")
assert hasattr(c, "frame_dt")
assert hasattr(c, "firehose_h")
assert hasattr(c, "grad_speed")
assert hasattr(c, "glitch_glyphs")
assert hasattr(c, "kata_glyphs")
assert hasattr(c, "script_fonts")
def test_config_defaults(self):
"""Config has sensible defaults."""
c = config.Config()
assert c.headline_limit == 1000
assert c.feed_timeout == 10
assert c.mic_threshold_db == 50
assert c.mode == "news"
assert c.firehose is False
assert c.ntfy_reconnect_delay == 5
assert c.message_display_secs == 30
def test_config_is_immutable(self):
"""Config is frozen (immutable)."""
c = config.Config()
with pytest.raises(AttributeError):
c.headline_limit = 500 # type: ignore
def test_config_custom_values(self):
"""Config accepts custom values."""
c = config.Config(
headline_limit=500,
mode="poetry",
firehose=True,
ntfy_topic="https://ntfy.sh/test",
)
assert c.headline_limit == 500
assert c.mode == "poetry"
assert c.firehose is True
assert c.ntfy_topic == "https://ntfy.sh/test"
class TestConfigFromArgs:
"""Tests for Config.from_args method."""
def test_from_args_defaults(self):
"""from_args creates config with defaults from empty argv."""
c = config.Config.from_args(["prog"])
assert c.mode == "news"
assert c.firehose is False
assert c.font_picker is True
def test_from_args_poetry_mode(self):
"""from_args detects --poetry flag."""
c = config.Config.from_args(["prog", "--poetry"])
assert c.mode == "poetry"
def test_from_args_poetry_short_flag(self):
"""from_args detects -p short flag."""
c = config.Config.from_args(["prog", "-p"])
assert c.mode == "poetry"
def test_from_args_firehose(self):
"""from_args detects --firehose flag."""
c = config.Config.from_args(["prog", "--firehose"])
assert c.firehose is True
def test_from_args_no_font_picker(self):
"""from_args detects --no-font-picker flag."""
c = config.Config.from_args(["prog", "--no-font-picker"])
assert c.font_picker is False
def test_from_args_font_index(self):
"""from_args parses --font-index."""
c = config.Config.from_args(["prog", "--font-index", "3"])
assert c.font_index == 3
class TestGetSetConfig:
"""Tests for get_config and set_config functions."""
def test_get_config_returns_config(self):
"""get_config returns a Config instance."""
c = config.get_config()
assert isinstance(c, config.Config)
def test_set_config_allows_injection(self):
"""set_config allows injecting a custom config."""
custom = config.Config(mode="poetry", headline_limit=100)
config.set_config(custom)
assert config.get_config().mode == "poetry"
assert config.get_config().headline_limit == 100
def test_set_config_then_get_config(self):
"""set_config followed by get_config returns the set config."""
original = config.get_config()
test_config = config.Config(headline_limit=42)
config.set_config(test_config)
result = config.get_config()
assert result.headline_limit == 42
config.set_config(original)
class TestPlatformFontPaths:
"""Tests for platform font path detection."""
def test_get_platform_font_paths_returns_dict(self):
"""_get_platform_font_paths returns a dictionary."""
fonts = config._get_platform_font_paths()
assert isinstance(fonts, dict)
def test_platform_font_paths_common_languages(self):
"""Common language font mappings exist."""
fonts = config._get_platform_font_paths()
common = {"ja", "zh-cn", "ko", "ru", "ar", "hi"}
found = set(fonts.keys()) & common
assert len(found) > 0

85
tests/test_controller.py Normal file
View File

@@ -0,0 +1,85 @@
"""
Tests for engine.controller module.
"""
from unittest.mock import MagicMock, patch
from engine import config
from engine.controller import StreamController
class TestStreamController:
"""Tests for StreamController class."""
def test_init_default_config(self):
"""StreamController initializes with default config."""
controller = StreamController()
assert controller.config is not None
assert isinstance(controller.config, config.Config)
def test_init_custom_config(self):
"""StreamController accepts custom config."""
custom_config = config.Config(headline_limit=500)
controller = StreamController(config=custom_config)
assert controller.config.headline_limit == 500
def test_init_sources_none_by_default(self):
"""Sources are None until initialized."""
controller = StreamController()
assert controller.mic is None
assert controller.ntfy is None
@patch("engine.controller.MicMonitor")
@patch("engine.controller.NtfyPoller")
def test_initialize_sources(self, mock_ntfy, mock_mic):
"""initialize_sources creates mic and ntfy instances."""
mock_mic_instance = MagicMock()
mock_mic_instance.available = True
mock_mic_instance.start.return_value = True
mock_mic.return_value = mock_mic_instance
mock_ntfy_instance = MagicMock()
mock_ntfy_instance.start.return_value = True
mock_ntfy.return_value = mock_ntfy_instance
controller = StreamController()
mic_ok, ntfy_ok = controller.initialize_sources()
assert mic_ok is True
assert ntfy_ok is True
assert controller.mic is not None
assert controller.ntfy is not None
@patch("engine.controller.MicMonitor")
@patch("engine.controller.NtfyPoller")
def test_initialize_sources_mic_unavailable(self, mock_ntfy, mock_mic):
"""initialize_sources handles unavailable mic."""
mock_mic_instance = MagicMock()
mock_mic_instance.available = False
mock_mic.return_value = mock_mic_instance
mock_ntfy_instance = MagicMock()
mock_ntfy_instance.start.return_value = True
mock_ntfy.return_value = mock_ntfy_instance
controller = StreamController()
mic_ok, ntfy_ok = controller.initialize_sources()
assert mic_ok is False
assert ntfy_ok is True
class TestStreamControllerCleanup:
"""Tests for StreamController cleanup."""
@patch("engine.controller.MicMonitor")
def test_cleanup_stops_mic(self, mock_mic):
"""cleanup stops the microphone if running."""
mock_mic_instance = MagicMock()
mock_mic.return_value = mock_mic_instance
controller = StreamController()
controller.mic = mock_mic_instance
controller.cleanup()
mock_mic_instance.stop.assert_called_once()

79
tests/test_display.py Normal file
View File

@@ -0,0 +1,79 @@
"""
Tests for engine.display module.
"""
from engine.display import NullDisplay, TerminalDisplay
class TestDisplayProtocol:
"""Test that display backends satisfy the Display protocol."""
def test_terminal_display_is_display(self):
"""TerminalDisplay satisfies Display protocol."""
display = TerminalDisplay()
assert hasattr(display, "init")
assert hasattr(display, "show")
assert hasattr(display, "clear")
assert hasattr(display, "cleanup")
def test_null_display_is_display(self):
"""NullDisplay satisfies Display protocol."""
display = NullDisplay()
assert hasattr(display, "init")
assert hasattr(display, "show")
assert hasattr(display, "clear")
assert hasattr(display, "cleanup")
class TestTerminalDisplay:
"""Tests for TerminalDisplay class."""
def test_init_sets_dimensions(self):
"""init stores terminal dimensions."""
display = TerminalDisplay()
display.init(80, 24)
assert display.width == 80
assert display.height == 24
def test_show_returns_none(self):
"""show returns None after writing to stdout."""
display = TerminalDisplay()
display.width = 80
display.height = 24
display.show(["line1", "line2"])
def test_clear_does_not_error(self):
"""clear works without error."""
display = TerminalDisplay()
display.clear()
def test_cleanup_does_not_error(self):
"""cleanup works without error."""
display = TerminalDisplay()
display.cleanup()
class TestNullDisplay:
"""Tests for NullDisplay class."""
def test_init_stores_dimensions(self):
"""init stores dimensions."""
display = NullDisplay()
display.init(100, 50)
assert display.width == 100
assert display.height == 50
def test_show_does_nothing(self):
"""show discards buffer without error."""
display = NullDisplay()
display.show(["line1", "line2", "line3"])
def test_clear_does_nothing(self):
"""clear does nothing."""
display = NullDisplay()
display.clear()
def test_cleanup_does_nothing(self):
"""cleanup does nothing."""
display = NullDisplay()
display.cleanup()

427
tests/test_effects.py Normal file
View File

@@ -0,0 +1,427 @@
"""
Tests for engine.effects module.
"""
from engine.effects import EffectChain, EffectConfig, EffectContext, EffectRegistry
class MockEffect:
name = "mock"
config = EffectConfig(enabled=True, intensity=1.0)
def __init__(self):
self.processed = False
self.last_ctx = None
def process(self, buf, ctx):
self.processed = True
self.last_ctx = ctx
return buf + ["processed"]
def configure(self, config):
self.config = config
class TestEffectConfig:
def test_defaults(self):
cfg = EffectConfig()
assert cfg.enabled is True
assert cfg.intensity == 1.0
assert cfg.params == {}
def test_custom_values(self):
cfg = EffectConfig(enabled=False, intensity=0.5, params={"key": "value"})
assert cfg.enabled is False
assert cfg.intensity == 0.5
assert cfg.params == {"key": "value"}
class TestEffectContext:
def test_defaults(self):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
assert ctx.terminal_width == 80
assert ctx.terminal_height == 24
assert ctx.ticker_height == 20
assert ctx.items == []
def test_with_items(self):
items = [("Title", "Source", "12:00")]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
items=items,
)
assert ctx.items == items
class TestEffectRegistry:
def test_init_empty(self):
registry = EffectRegistry()
assert len(registry.list_all()) == 0
def test_register(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
assert "mock" in registry.list_all()
def test_get(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
retrieved = registry.get("mock")
assert retrieved is effect
def test_get_nonexistent(self):
registry = EffectRegistry()
assert registry.get("nonexistent") is None
def test_enable(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = False
registry.register(effect)
registry.enable("mock")
assert effect.config.enabled is True
def test_disable(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = True
registry.register(effect)
registry.disable("mock")
assert effect.config.enabled is False
def test_list_enabled(self):
registry = EffectRegistry()
class EnabledEffect:
name = "enabled_effect"
config = EffectConfig(enabled=True, intensity=1.0)
class DisabledEffect:
name = "disabled_effect"
config = EffectConfig(enabled=False, intensity=1.0)
registry.register(EnabledEffect())
registry.register(DisabledEffect())
enabled = registry.list_enabled()
assert len(enabled) == 1
assert enabled[0].name == "enabled_effect"
def test_configure(self):
registry = EffectRegistry()
effect = MockEffect()
registry.register(effect)
new_config = EffectConfig(enabled=False, intensity=0.3)
registry.configure("mock", new_config)
assert effect.config.enabled is False
assert effect.config.intensity == 0.3
def test_is_enabled(self):
registry = EffectRegistry()
effect = MockEffect()
effect.config.enabled = True
registry.register(effect)
assert registry.is_enabled("mock") is True
assert registry.is_enabled("nonexistent") is False
class TestEffectChain:
def test_init(self):
registry = EffectRegistry()
chain = EffectChain(registry)
assert chain.get_order() == []
def test_set_order(self):
registry = EffectRegistry()
effect1 = MockEffect()
effect1.name = "effect1"
effect2 = MockEffect()
effect2.name = "effect2"
registry.register(effect1)
registry.register(effect2)
chain = EffectChain(registry)
chain.set_order(["effect1", "effect2"])
assert chain.get_order() == ["effect1", "effect2"]
def test_add_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.add_effect("test_effect")
assert "test_effect" in chain.get_order()
def test_add_effect_invalid(self):
registry = EffectRegistry()
chain = EffectChain(registry)
result = chain.add_effect("nonexistent")
assert result is False
def test_remove_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
chain.remove_effect("test_effect")
assert "test_effect" not in chain.get_order()
def test_reorder(self):
registry = EffectRegistry()
effect1 = MockEffect()
effect1.name = "effect1"
effect2 = MockEffect()
effect2.name = "effect2"
effect3 = MockEffect()
effect3.name = "effect3"
registry.register(effect1)
registry.register(effect2)
registry.register(effect3)
chain = EffectChain(registry)
chain.set_order(["effect1", "effect2", "effect3"])
result = chain.reorder(["effect3", "effect1", "effect2"])
assert result is True
assert chain.get_order() == ["effect3", "effect1", "effect2"]
def test_reorder_invalid(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "effect1"
registry.register(effect)
chain = EffectChain(registry)
result = chain.reorder(["effect1", "nonexistent"])
assert result is False
def test_process_empty_chain(self):
registry = EffectRegistry()
chain = EffectChain(registry)
buf = ["line1", "line2"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == buf
def test_process_with_effects(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
buf = ["line1", "line2"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == ["line1", "line2", "processed"]
assert effect.processed is True
assert effect.last_ctx is ctx
def test_process_disabled_effect(self):
registry = EffectRegistry()
effect = MockEffect()
effect.name = "test_effect"
effect.config.enabled = False
registry.register(effect)
chain = EffectChain(registry)
chain.set_order(["test_effect"])
buf = ["line1"]
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=0,
has_message=False,
)
result = chain.process(buf, ctx)
assert result == ["line1"]
assert effect.processed is False
class TestEffectsExports:
def test_all_exports_are_importable(self):
"""Verify all exports in __all__ can actually be imported."""
import engine.effects as effects_module
for name in effects_module.__all__:
getattr(effects_module, name)
class TestPerformanceMonitor:
def test_empty_stats(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
stats = monitor.get_stats()
assert "error" in stats
def test_record_and_retrieve(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
monitor.start_frame(1)
monitor.record_effect("test_effect", 1.5, 100, 150)
monitor.end_frame(1, 2.0)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["frame_count"] == 1
assert "test_effect" in stats["effects"]
def test_multiple_frames(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=3)
for i in range(5):
monitor.start_frame(i)
monitor.record_effect("effect1", 1.0, 100, 100)
monitor.record_effect("effect2", 0.5, 100, 100)
monitor.end_frame(i, 1.5)
stats = monitor.get_stats()
assert stats["frame_count"] == 3
assert "effect1" in stats["effects"]
assert "effect2" in stats["effects"]
def test_reset(self):
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor()
monitor.start_frame(1)
monitor.record_effect("test", 1.0, 100, 100)
monitor.end_frame(1, 1.0)
monitor.reset()
stats = monitor.get_stats()
assert "error" in stats
class TestEffectPipelinePerformance:
def test_pipeline_stays_within_frame_budget(self):
"""Verify effect pipeline completes within frame budget (33ms for 30fps)."""
from engine.effects import (
EffectChain,
EffectConfig,
EffectContext,
EffectRegistry,
)
class DummyEffect:
name = "dummy"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf, ctx):
return [line * 2 for line in buf]
registry = EffectRegistry()
registry.register(DummyEffect())
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=10)
chain = EffectChain(registry, monitor)
chain.set_order(["dummy"])
buf = ["x" * 80] * 20
for i in range(10):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=i,
has_message=False,
)
chain.process(buf, ctx)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["pipeline"]["max_ms"] < 33.0
def test_individual_effects_performance(self):
"""Verify individual effects don't exceed 10ms per frame."""
from engine.effects import (
EffectChain,
EffectConfig,
EffectContext,
EffectRegistry,
)
class SlowEffect:
name = "slow"
config = EffectConfig(enabled=True, intensity=1.0)
def process(self, buf, ctx):
result = []
for line in buf:
result.append(line)
result.append(line + line)
return result
registry = EffectRegistry()
registry.register(SlowEffect())
from engine.effects.performance import PerformanceMonitor
monitor = PerformanceMonitor(max_frames=5)
chain = EffectChain(registry, monitor)
chain.set_order(["slow"])
buf = ["x" * 80] * 10
for i in range(5):
ctx = EffectContext(
terminal_width=80,
terminal_height=24,
scroll_cam=0,
ticker_height=20,
mic_excess=0.0,
grad_offset=0.0,
frame_number=i,
has_message=False,
)
chain.process(buf, ctx)
stats = monitor.get_stats()
assert "error" not in stats
assert stats["effects"]["slow"]["max_ms"] < 10.0

View File

@@ -0,0 +1,117 @@
"""
Tests for engine.effects.controller module.
"""
from unittest.mock import MagicMock, patch
from engine.effects.controller import (
handle_effects_command,
set_effect_chain_ref,
)
class TestHandleEffectsCommand:
"""Tests for handle_effects_command function."""
def test_list_effects(self):
"""list command returns formatted effects list."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_plugin.config.enabled = True
mock_plugin.config.intensity = 0.5
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
with patch("engine.effects.controller._get_effect_chain") as mock_chain:
mock_chain.return_value.get_order.return_value = ["noise"]
result = handle_effects_command("/effects list")
assert "noise: ON" in result
assert "intensity=0.5" in result
def test_enable_effect(self):
"""enable command calls registry.enable."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise on")
assert "Enabled: noise" in result
mock_registry.return_value.enable.assert_called_once_with("noise")
def test_disable_effect(self):
"""disable command calls registry.disable."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise off")
assert "Disabled: noise" in result
mock_registry.return_value.disable.assert_called_once_with("noise")
def test_set_intensity(self):
"""intensity command sets plugin intensity."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_plugin.config.intensity = 0.5
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise intensity 0.8")
assert "intensity to 0.8" in result
assert mock_plugin.config.intensity == 0.8
def test_invalid_intensity_range(self):
"""intensity outside 0.0-1.0 returns error."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_plugin = MagicMock()
mock_registry.return_value.get.return_value = mock_plugin
mock_registry.return_value.list_all.return_value = {"noise": mock_plugin}
result = handle_effects_command("/effects noise intensity 1.5")
assert "between 0.0 and 1.0" in result
def test_reorder_pipeline(self):
"""reorder command calls chain.reorder."""
with patch("engine.effects.controller.get_registry") as mock_registry:
mock_registry.return_value.list_all.return_value = {}
with patch("engine.effects.controller._get_effect_chain") as mock_chain:
mock_chain_instance = MagicMock()
mock_chain_instance.reorder.return_value = True
mock_chain.return_value = mock_chain_instance
result = handle_effects_command("/effects reorder noise,fade")
assert "Reordered pipeline" in result
mock_chain_instance.reorder.assert_called_once_with(["noise", "fade"])
def test_unknown_command(self):
"""unknown command returns error."""
result = handle_effects_command("/unknown")
assert "Unknown command" in result
def test_non_effects_command(self):
"""non-effects command returns error."""
result = handle_effects_command("not a command")
assert "Unknown command" in result
class TestSetEffectChainRef:
"""Tests for set_effect_chain_ref function."""
def test_sets_global_ref(self):
"""set_effect_chain_ref updates global reference."""
mock_chain = MagicMock()
set_effect_chain_ref(mock_chain)
from engine.effects.controller import _get_effect_chain
result = _get_effect_chain()
assert result == mock_chain

69
tests/test_emitters.py Normal file
View File

@@ -0,0 +1,69 @@
"""
Tests for engine.emitters module.
"""
from engine.emitters import EventEmitter, Startable, Stoppable
class TestEventEmitterProtocol:
"""Tests for EventEmitter protocol."""
def test_protocol_exists(self):
"""EventEmitter protocol is defined."""
assert EventEmitter is not None
def test_protocol_has_subscribe_method(self):
"""EventEmitter has subscribe method in protocol."""
assert hasattr(EventEmitter, "subscribe")
def test_protocol_has_unsubscribe_method(self):
"""EventEmitter has unsubscribe method in protocol."""
assert hasattr(EventEmitter, "unsubscribe")
class TestStartableProtocol:
"""Tests for Startable protocol."""
def test_protocol_exists(self):
"""Startable protocol is defined."""
assert Startable is not None
def test_protocol_has_start_method(self):
"""Startable has start method in protocol."""
assert hasattr(Startable, "start")
class TestStoppableProtocol:
"""Tests for Stoppable protocol."""
def test_protocol_exists(self):
"""Stoppable protocol is defined."""
assert Stoppable is not None
def test_protocol_has_stop_method(self):
"""Stoppable has stop method in protocol."""
assert hasattr(Stoppable, "stop")
class TestProtocolCompliance:
"""Tests that existing classes comply with protocols."""
def test_ntfy_poller_complies_with_protocol(self):
"""NtfyPoller implements EventEmitter protocol."""
from engine.ntfy import NtfyPoller
poller = NtfyPoller("http://example.com/topic")
assert hasattr(poller, "subscribe")
assert hasattr(poller, "unsubscribe")
assert callable(poller.subscribe)
assert callable(poller.unsubscribe)
def test_mic_monitor_complies_with_protocol(self):
"""MicMonitor implements EventEmitter and Startable protocols."""
from engine.mic import MicMonitor
monitor = MicMonitor()
assert hasattr(monitor, "subscribe")
assert hasattr(monitor, "unsubscribe")
assert hasattr(monitor, "start")
assert hasattr(monitor, "stop")

202
tests/test_eventbus.py Normal file
View File

@@ -0,0 +1,202 @@
"""
Tests for engine.eventbus module.
"""
from engine.eventbus import EventBus, get_event_bus, set_event_bus
from engine.events import EventType, NtfyMessageEvent
class TestEventBusInit:
"""Tests for EventBus initialization."""
def test_init_creates_empty_subscribers(self):
"""EventBus starts with no subscribers."""
bus = EventBus()
assert bus.subscriber_count() == 0
class TestEventBusSubscribe:
"""Tests for EventBus.subscribe method."""
def test_subscribe_adds_callback(self):
"""subscribe() adds a callback for an event type."""
bus = EventBus()
def callback(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, callback)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1
def test_subscribe_multiple_callbacks_same_event(self):
"""Multiple callbacks can be subscribed to the same event type."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.NTFY_MESSAGE, cb2)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 2
def test_subscribe_different_event_types(self):
"""Callbacks can be subscribed to different event types."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.MIC_LEVEL, cb2)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1
assert bus.subscriber_count(EventType.MIC_LEVEL) == 1
class TestEventBusUnsubscribe:
"""Tests for EventBus.unsubscribe method."""
def test_unsubscribe_removes_callback(self):
"""unsubscribe() removes a callback."""
bus = EventBus()
def callback(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, callback)
bus.unsubscribe(EventType.NTFY_MESSAGE, callback)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 0
def test_unsubscribe_nonexistent_callback_no_error(self):
"""unsubscribe() handles non-existent callback gracefully."""
bus = EventBus()
def callback(e):
return None
bus.unsubscribe(EventType.NTFY_MESSAGE, callback)
class TestEventBusPublish:
"""Tests for EventBus.publish method."""
def test_publish_calls_subscriber(self):
"""publish() calls the subscriber callback."""
bus = EventBus()
received = []
def callback(event):
received.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, callback)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(received) == 1
assert received[0].title == "Test"
def test_publish_multiple_subscribers(self):
"""publish() calls all subscribers for an event type."""
bus = EventBus()
received1 = []
received2 = []
def callback1(event):
received1.append(event)
def callback2(event):
received2.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, callback1)
bus.subscribe(EventType.NTFY_MESSAGE, callback2)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(received1) == 1
assert len(received2) == 1
def test_publish_different_event_types(self):
"""publish() only calls subscribers for the specific event type."""
bus = EventBus()
ntfy_received = []
mic_received = []
def ntfy_callback(event):
ntfy_received.append(event)
def mic_callback(event):
mic_received.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, ntfy_callback)
bus.subscribe(EventType.MIC_LEVEL, mic_callback)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(ntfy_received) == 1
assert len(mic_received) == 0
class TestEventBusClear:
"""Tests for EventBus.clear method."""
def test_clear_removes_all_subscribers(self):
"""clear() removes all subscribers."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.MIC_LEVEL, cb2)
bus.clear()
assert bus.subscriber_count() == 0
class TestEventBusThreadSafety:
"""Tests for EventBus thread safety."""
def test_concurrent_subscribe_unsubscribe(self):
"""subscribe and unsubscribe can be called concurrently."""
import threading
bus = EventBus()
callbacks = [lambda e: None for _ in range(10)]
def subscribe():
for cb in callbacks:
bus.subscribe(EventType.NTFY_MESSAGE, cb)
def unsubscribe():
for cb in callbacks:
bus.unsubscribe(EventType.NTFY_MESSAGE, cb)
t1 = threading.Thread(target=subscribe)
t2 = threading.Thread(target=unsubscribe)
t1.start()
t2.start()
t1.join()
t2.join()
class TestGlobalEventBus:
"""Tests for global event bus functions."""
def test_get_event_bus_returns_singleton(self):
"""get_event_bus() returns the same instance."""
bus1 = get_event_bus()
bus2 = get_event_bus()
assert bus1 is bus2
def test_set_event_bus_replaces_singleton(self):
"""set_event_bus() replaces the global event bus."""
new_bus = EventBus()
set_event_bus(new_bus)
try:
assert get_event_bus() is new_bus
finally:
set_event_bus(None)

112
tests/test_events.py Normal file
View File

@@ -0,0 +1,112 @@
"""
Tests for engine.events module.
"""
from datetime import datetime
from engine import events
class TestEventType:
"""Tests for EventType enum."""
def test_event_types_exist(self):
"""All expected event types exist."""
assert hasattr(events.EventType, "NEW_HEADLINE")
assert hasattr(events.EventType, "FRAME_TICK")
assert hasattr(events.EventType, "MIC_LEVEL")
assert hasattr(events.EventType, "NTFY_MESSAGE")
assert hasattr(events.EventType, "STREAM_START")
assert hasattr(events.EventType, "STREAM_END")
class TestHeadlineEvent:
"""Tests for HeadlineEvent dataclass."""
def test_create_headline_event(self):
"""HeadlineEvent can be created with required fields."""
e = events.HeadlineEvent(
title="Test Headline",
source="Test Source",
timestamp="12:00",
)
assert e.title == "Test Headline"
assert e.source == "Test Source"
assert e.timestamp == "12:00"
def test_headline_event_optional_language(self):
"""HeadlineEvent supports optional language field."""
e = events.HeadlineEvent(
title="Test",
source="Test",
timestamp="12:00",
language="ja",
)
assert e.language == "ja"
class TestFrameTickEvent:
"""Tests for FrameTickEvent dataclass."""
def test_create_frame_tick_event(self):
"""FrameTickEvent can be created."""
now = datetime.now()
e = events.FrameTickEvent(
frame_number=100,
timestamp=now,
delta_seconds=0.05,
)
assert e.frame_number == 100
assert e.timestamp == now
assert e.delta_seconds == 0.05
class TestMicLevelEvent:
"""Tests for MicLevelEvent dataclass."""
def test_create_mic_level_event(self):
"""MicLevelEvent can be created."""
now = datetime.now()
e = events.MicLevelEvent(
db_level=60.0,
excess_above_threshold=10.0,
timestamp=now,
)
assert e.db_level == 60.0
assert e.excess_above_threshold == 10.0
class TestNtfyMessageEvent:
"""Tests for NtfyMessageEvent dataclass."""
def test_create_ntfy_message_event(self):
"""NtfyMessageEvent can be created with required fields."""
e = events.NtfyMessageEvent(
title="Test Title",
body="Test Body",
)
assert e.title == "Test Title"
assert e.body == "Test Body"
assert e.message_id is None
def test_ntfy_message_event_with_id(self):
"""NtfyMessageEvent supports optional message_id."""
e = events.NtfyMessageEvent(
title="Test",
body="Test",
message_id="abc123",
)
assert e.message_id == "abc123"
class TestStreamEvent:
"""Tests for StreamEvent dataclass."""
def test_create_stream_event(self):
"""StreamEvent can be created."""
e = events.StreamEvent(
event_type=events.EventType.STREAM_START,
headline_count=100,
)
assert e.event_type == events.EventType.STREAM_START
assert e.headline_count == 100

234
tests/test_fetch.py Normal file
View File

@@ -0,0 +1,234 @@
"""
Tests for engine.fetch module.
"""
import json
from unittest.mock import MagicMock, patch
from engine.fetch import (
_fetch_gutenberg,
fetch_all,
fetch_feed,
fetch_poetry,
load_cache,
save_cache,
)
class TestFetchFeed:
"""Tests for fetch_feed function."""
@patch("engine.fetch.urllib.request.urlopen")
def test_fetch_success(self, mock_urlopen):
"""Successful feed fetch returns parsed feed."""
mock_response = MagicMock()
mock_response.read.return_value = b"<rss>test</rss>"
mock_urlopen.return_value = mock_response
result = fetch_feed("http://example.com/feed")
assert result is not None
@patch("engine.fetch.urllib.request.urlopen")
def test_fetch_network_error(self, mock_urlopen):
"""Network error returns None."""
mock_urlopen.side_effect = Exception("Network error")
result = fetch_feed("http://example.com/feed")
assert result is None
class TestFetchAll:
"""Tests for fetch_all function."""
@patch("engine.fetch.fetch_feed")
@patch("engine.fetch.strip_tags")
@patch("engine.fetch.skip")
@patch("engine.fetch.boot_ln")
def test_fetch_all_success(self, mock_boot, mock_skip, mock_strip, mock_fetch_feed):
"""Successful fetch returns items."""
mock_feed = MagicMock()
mock_feed.bozo = False
mock_feed.entries = [
{"title": "Headline 1", "published_parsed": (2024, 1, 1, 12, 0, 0)},
{"title": "Headline 2", "updated_parsed": (2024, 1, 2, 12, 0, 0)},
]
mock_fetch_feed.return_value = mock_feed
mock_skip.return_value = False
mock_strip.side_effect = lambda x: x
items, linked, failed = fetch_all()
assert linked > 0
assert failed == 0
@patch("engine.fetch.fetch_feed")
@patch("engine.fetch.boot_ln")
def test_fetch_all_feed_error(self, mock_boot, mock_fetch_feed):
"""Feed error increments failed count."""
mock_fetch_feed.return_value = None
items, linked, failed = fetch_all()
assert failed > 0
@patch("engine.fetch.fetch_feed")
@patch("engine.fetch.strip_tags")
@patch("engine.fetch.skip")
@patch("engine.fetch.boot_ln")
def test_fetch_all_skips_filtered(
self, mock_boot, mock_skip, mock_strip, mock_fetch_feed
):
"""Filtered headlines are skipped."""
mock_feed = MagicMock()
mock_feed.bozo = False
mock_feed.entries = [
{"title": "Sports scores"},
{"title": "Valid headline"},
]
mock_fetch_feed.return_value = mock_feed
mock_skip.side_effect = lambda x: x == "Sports scores"
mock_strip.side_effect = lambda x: x
items, linked, failed = fetch_all()
assert any("Valid headline" in item[0] for item in items)
class TestFetchGutenberg:
"""Tests for _fetch_gutenberg function."""
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_success(self, mock_urlopen):
"""Successful gutenberg fetch returns items."""
text = """Project Gutenberg
*** START OF THE PROJECT GUTENBERG ***
This is a test poem with multiple lines
that should be parsed as a block.
Another stanza with more content here.
*** END OF THE PROJECT GUTENBERG ***
"""
mock_response = MagicMock()
mock_response.read.return_value = text.encode("utf-8")
mock_urlopen.return_value = mock_response
result = _fetch_gutenberg("http://example.com/test", "Test")
assert len(result) > 0
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_network_error(self, mock_urlopen):
"""Network error returns empty list."""
mock_urlopen.side_effect = Exception("Network error")
result = _fetch_gutenberg("http://example.com/test", "Test")
assert result == []
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_skips_short_blocks(self, mock_urlopen):
"""Blocks shorter than 20 chars are skipped."""
text = """*** START OF THE ***
Short
*** END OF THE ***
"""
mock_response = MagicMock()
mock_response.read.return_value = text.encode("utf-8")
mock_urlopen.return_value = mock_response
result = _fetch_gutenberg("http://example.com/test", "Test")
assert result == []
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_skips_all_caps_headers(self, mock_urlopen):
"""All-caps lines are skipped as headers."""
text = """*** START OF THE ***
THIS IS ALL CAPS HEADER
more content here
*** END OF THE ***
"""
mock_response = MagicMock()
mock_response.read.return_value = text.encode("utf-8")
mock_urlopen.return_value = mock_response
result = _fetch_gutenberg("http://example.com/test", "Test")
assert len(result) > 0
class TestFetchPoetry:
"""Tests for fetch_poetry function."""
@patch("engine.fetch._fetch_gutenberg")
@patch("engine.fetch.boot_ln")
def test_fetch_poetry_success(self, mock_boot, mock_fetch):
"""Successful poetry fetch returns items."""
mock_fetch.return_value = [
("Stanza 1 content here", "Test", ""),
("Stanza 2 content here", "Test", ""),
]
items, linked, failed = fetch_poetry()
assert linked > 0
assert failed == 0
@patch("engine.fetch._fetch_gutenberg")
@patch("engine.fetch.boot_ln")
def test_fetch_poetry_failure(self, mock_boot, mock_fetch):
"""Failed fetch increments failed count."""
mock_fetch.return_value = []
items, linked, failed = fetch_poetry()
assert failed > 0
class TestCache:
"""Tests for cache functions."""
@patch("engine.fetch._cache_path")
def test_load_cache_success(self, mock_path):
"""Successful cache load returns items."""
mock_path.return_value.__str__ = MagicMock(return_value="/tmp/cache")
mock_path.return_value.exists.return_value = True
mock_path.return_value.read_text.return_value = json.dumps(
{"items": [("title", "source", "time")]}
)
result = load_cache()
assert result is not None
@patch("engine.fetch._cache_path")
def test_load_cache_missing_file(self, mock_path):
"""Missing cache file returns None."""
mock_path.return_value.exists.return_value = False
result = load_cache()
assert result is None
@patch("engine.fetch._cache_path")
def test_load_cache_invalid_json(self, mock_path):
"""Invalid JSON returns None."""
mock_path.return_value.exists.return_value = True
mock_path.return_value.read_text.side_effect = json.JSONDecodeError("", "", 0)
result = load_cache()
assert result is None
@patch("engine.fetch._cache_path")
def test_save_cache_success(self, mock_path):
"""Save cache writes to file."""
mock_path.return_value.__truediv__ = MagicMock(
return_value=mock_path.return_value
)
save_cache([("title", "source", "time")])

93
tests/test_filter.py Normal file
View File

@@ -0,0 +1,93 @@
"""
Tests for engine.filter module.
"""
from engine.filter import skip, strip_tags
class TestStripTags:
"""Tests for strip_tags function."""
def test_strips_simple_html(self):
"""Basic HTML tags are removed."""
assert strip_tags("<p>Hello</p>") == "Hello"
assert strip_tags("<b>Bold</b>") == "Bold"
assert strip_tags("<em>Italic</em>") == "Italic"
def test_strips_nested_html(self):
"""Nested HTML tags are handled."""
assert strip_tags("<div><p>Nested</p></div>") == "Nested"
assert strip_tags("<span><strong>Deep</strong></span>") == "Deep"
def test_strips_html_with_attributes(self):
"""HTML with attributes is handled."""
assert strip_tags('<a href="http://example.com">Link</a>') == "Link"
assert strip_tags('<img src="test.jpg" alt="test">') == ""
def test_handles_empty_string(self):
"""Empty string returns empty string."""
assert strip_tags("") == ""
assert strip_tags(None) == ""
def test_handles_plain_text(self):
"""Plain text without tags passes through."""
assert strip_tags("Plain text") == "Plain text"
def test_unescapes_html_entities(self):
"""HTML entities are decoded and tags are stripped."""
assert strip_tags("&nbsp;test") == "test"
assert strip_tags("Hello &amp; World") == "Hello & World"
def test_handles_malformed_html(self):
"""Malformed HTML is handled gracefully."""
assert strip_tags("<p>Unclosed") == "Unclosed"
assert strip_tags("</p>No start") == "No start"
class TestSkip:
"""Tests for skip function - content filtering."""
def test_skips_sports_content(self):
"""Sports-related headlines are skipped."""
assert skip("Football: Team wins championship") is True
assert skip("NBA Finals Game 7 results") is True
assert skip("Soccer match ends in draw") is True
assert skip("Premier League transfer news") is True
assert skip("Super Bowl halftime show") is True
def test_skips_vapid_content(self):
"""Vapid/celebrity content is skipped."""
assert skip("Kim Kardashian's new look") is True
assert skip("Influencer goes viral") is True
assert skip("Red carpet best dressed") is True
assert skip("Celebrity couple splits") is True
def test_allows_real_news(self):
"""Legitimate news headlines are allowed."""
assert skip("Scientists discover new planet") is False
assert skip("Economy grows by 3%") is False
assert skip("World leaders meet for summit") is False
assert skip("New technology breakthrough") is False
def test_case_insensitive(self):
"""Filter is case insensitive."""
assert skip("FOOTBALL scores") is True
assert skip("Football SCORES") is True
assert skip("Kardashian") is True
def test_word_boundary_matching(self):
"""Word boundary matching works correctly."""
assert skip("The football stadium") is True
assert skip("Footballer scores") is False
assert skip("Footballs on sale") is False
class TestIntegration:
"""Integration tests combining filter functions."""
def test_full_pipeline(self):
"""Test strip_tags followed by skip."""
html = '<p><a href="#">Breaking: Football championship final</a></p>'
text = strip_tags(html)
assert text == "Breaking: Football championship final"
assert skip(text) is True

63
tests/test_frame.py Normal file
View File

@@ -0,0 +1,63 @@
"""
Tests for engine.frame module.
"""
import time
from engine.frame import FrameTimer, calculate_scroll_step
class TestFrameTimer:
"""Tests for FrameTimer class."""
def test_init_default(self):
"""FrameTimer initializes with default values."""
timer = FrameTimer()
assert timer.target_frame_dt == 0.05
assert timer.fps >= 0
def test_init_custom(self):
"""FrameTimer accepts custom frame duration."""
timer = FrameTimer(target_frame_dt=0.1)
assert timer.target_frame_dt == 0.1
def test_fps_calculation(self):
"""FrameTimer calculates FPS correctly."""
timer = FrameTimer()
timer._frame_count = 10
timer._start_time = time.monotonic() - 1.0
assert timer.fps >= 9.0
def test_reset(self):
"""FrameTimer.reset() clears frame count."""
timer = FrameTimer()
timer._frame_count = 100
timer.reset()
assert timer._frame_count == 0
class TestCalculateScrollStep:
"""Tests for calculate_scroll_step function."""
def test_basic_calculation(self):
"""calculate_scroll_step returns positive value."""
result = calculate_scroll_step(5.0, 24)
assert result > 0
def test_with_padding(self):
"""calculate_scroll_step respects padding parameter."""
without_padding = calculate_scroll_step(5.0, 24, padding=0)
with_padding = calculate_scroll_step(5.0, 24, padding=15)
assert with_padding < without_padding
def test_larger_view_slower_scroll(self):
"""Larger view height results in slower scroll steps."""
small = calculate_scroll_step(5.0, 10)
large = calculate_scroll_step(5.0, 50)
assert large < small
def test_longer_duration_slower_scroll(self):
"""Longer scroll duration results in slower scroll steps."""
fast = calculate_scroll_step(2.0, 24)
slow = calculate_scroll_step(10.0, 24)
assert slow > fast

96
tests/test_layers.py Normal file
View File

@@ -0,0 +1,96 @@
"""
Tests for engine.layers module.
"""
import time
from engine import layers
class TestRenderMessageOverlay:
"""Tests for render_message_overlay function."""
def test_no_message_returns_empty(self):
"""Returns empty list when msg is None."""
result, cache = layers.render_message_overlay(None, 80, 24, (None, None))
assert result == []
assert cache[0] is None
def test_message_returns_overlay_lines(self):
"""Returns non-empty list when message is present."""
msg = ("Test Title", "Test Body", time.monotonic())
result, cache = layers.render_message_overlay(msg, 80, 24, (None, None))
assert len(result) > 0
assert cache[0] is not None
def test_cache_key_changes_with_text(self):
"""Cache key changes when message text changes."""
msg1 = ("Title1", "Body1", time.monotonic())
msg2 = ("Title2", "Body2", time.monotonic())
_, cache1 = layers.render_message_overlay(msg1, 80, 24, (None, None))
_, cache2 = layers.render_message_overlay(msg2, 80, 24, cache1)
assert cache1[0] != cache2[0]
def test_cache_reuse_avoids_recomputation(self):
"""Cache is returned when same message is passed (interface test)."""
msg = ("Same Title", "Same Body", time.monotonic())
result1, cache1 = layers.render_message_overlay(msg, 80, 24, (None, None))
result2, cache2 = layers.render_message_overlay(msg, 80, 24, cache1)
assert len(result1) > 0
assert len(result2) > 0
assert cache1[0] == cache2[0]
class TestRenderFirehose:
"""Tests for render_firehose function."""
def test_no_firehose_returns_empty(self):
"""Returns empty list when firehose height is 0."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 0, 24)
assert result == []
def test_firehose_returns_lines(self):
"""Returns lines when firehose height > 0."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 4, 24)
assert len(result) == 4
def test_firehose_includes_ansi_escapes(self):
"""Returns lines containing ANSI escape sequences."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 1, 24)
assert "\033[" in result[0]
class TestApplyGlitch:
"""Tests for apply_glitch function."""
def test_empty_buffer_unchanged(self):
"""Empty buffer is returned unchanged."""
result = layers.apply_glitch([], 0, 0.0, 80)
assert result == []
def test_buffer_length_preserved(self):
"""Buffer length is preserved after glitch application."""
buf = [f"\033[{i + 1};1Htest\033[K" for i in range(10)]
result = layers.apply_glitch(buf, 0, 0.5, 80)
assert len(result) == len(buf)
class TestRenderTickerZone:
"""Tests for render_ticker_zone function - focusing on interface."""
def test_returns_list(self):
"""Returns a list of strings."""
result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0)
assert isinstance(result, list)
def test_returns_dict_for_cache(self):
"""Returns a dict for the noise cache."""
result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0)
assert isinstance(cache, dict)

149
tests/test_mic.py Normal file
View File

@@ -0,0 +1,149 @@
"""
Tests for engine.mic module.
"""
from datetime import datetime
from unittest.mock import patch
from engine.events import MicLevelEvent
class TestMicMonitorImport:
"""Tests for module import behavior."""
def test_mic_monitor_imports_without_error(self):
"""MicMonitor can be imported even without sounddevice."""
from engine.mic import MicMonitor
assert MicMonitor is not None
class TestMicMonitorInit:
"""Tests for MicMonitor initialization."""
def test_init_sets_threshold(self):
"""Threshold is set correctly."""
from engine.mic import MicMonitor
monitor = MicMonitor(threshold_db=60)
assert monitor.threshold_db == 60
def test_init_defaults(self):
"""Default values are set correctly."""
from engine.mic import MicMonitor
monitor = MicMonitor()
assert monitor.threshold_db == 50
def test_init_db_starts_at_negative(self):
"""_db starts at negative value."""
from engine.mic import MicMonitor
monitor = MicMonitor()
assert monitor.db == -99.0
class TestMicMonitorProperties:
"""Tests for MicMonitor properties."""
def test_excess_returns_positive_when_above_threshold(self):
"""excess returns positive value when above threshold."""
from engine.mic import MicMonitor
monitor = MicMonitor(threshold_db=50)
with patch.object(monitor, "_db", 60.0):
assert monitor.excess == 10.0
def test_excess_returns_zero_when_below_threshold(self):
"""excess returns zero when below threshold."""
from engine.mic import MicMonitor
monitor = MicMonitor(threshold_db=50)
with patch.object(monitor, "_db", 40.0):
assert monitor.excess == 0.0
class TestMicMonitorAvailable:
"""Tests for MicMonitor.available property."""
def test_available_is_bool(self):
"""available returns a boolean."""
from engine.mic import MicMonitor
monitor = MicMonitor()
assert isinstance(monitor.available, bool)
class TestMicMonitorStop:
"""Tests for MicMonitor.stop method."""
def test_stop_does_nothing_when_no_stream(self):
"""stop() does nothing if no stream exists."""
from engine.mic import MicMonitor
monitor = MicMonitor()
monitor.stop()
assert monitor._stream is None
class TestMicMonitorEventEmission:
"""Tests for MicMonitor event emission."""
def test_subscribe_adds_callback(self):
"""subscribe() adds a callback."""
from engine.mic import MicMonitor
monitor = MicMonitor()
def callback(e):
return None
monitor.subscribe(callback)
assert callback in monitor._subscribers
def test_unsubscribe_removes_callback(self):
"""unsubscribe() removes a callback."""
from engine.mic import MicMonitor
monitor = MicMonitor()
def callback(e):
return None
monitor.subscribe(callback)
monitor.unsubscribe(callback)
assert callback not in monitor._subscribers
def test_emit_calls_subscribers(self):
"""_emit() calls all subscribers."""
from engine.mic import MicMonitor
monitor = MicMonitor()
received = []
def callback(event):
received.append(event)
monitor.subscribe(callback)
event = MicLevelEvent(
db_level=60.0, excess_above_threshold=10.0, timestamp=datetime.now()
)
monitor._emit(event)
assert len(received) == 1
assert received[0].db_level == 60.0
def test_emit_handles_subscriber_exception(self):
"""_emit() handles exceptions in subscribers gracefully."""
from engine.mic import MicMonitor
monitor = MicMonitor()
def bad_callback(event):
raise RuntimeError("test")
monitor.subscribe(bad_callback)
event = MicLevelEvent(
db_level=60.0, excess_above_threshold=10.0, timestamp=datetime.now()
)
monitor._emit(event)

122
tests/test_ntfy.py Normal file
View File

@@ -0,0 +1,122 @@
"""
Tests for engine.ntfy module.
"""
import time
from unittest.mock import MagicMock, patch
from engine.events import NtfyMessageEvent
from engine.ntfy import NtfyPoller
class TestNtfyPollerInit:
"""Tests for NtfyPoller initialization."""
def test_init_sets_defaults(self):
"""Default values are set correctly."""
poller = NtfyPoller("http://example.com/topic")
assert poller.topic_url == "http://example.com/topic"
assert poller.reconnect_delay == 5
assert poller.display_secs == 30
def test_init_custom_values(self):
"""Custom values are set correctly."""
poller = NtfyPoller(
"http://example.com/topic", reconnect_delay=10, display_secs=60
)
assert poller.reconnect_delay == 10
assert poller.display_secs == 60
class TestNtfyPollerStart:
"""Tests for NtfyPoller.start method."""
@patch("engine.ntfy.threading.Thread")
def test_start_creates_daemon_thread(self, mock_thread):
"""start() creates and starts a daemon thread."""
mock_thread_instance = MagicMock()
mock_thread.return_value = mock_thread_instance
poller = NtfyPoller("http://example.com/topic")
result = poller.start()
assert result is True
mock_thread.assert_called_once()
args, kwargs = mock_thread.call_args
assert kwargs.get("daemon") is True
mock_thread_instance.start.assert_called_once()
class TestNtfyPollerGetActiveMessage:
"""Tests for NtfyPoller.get_active_message method."""
def test_returns_none_when_no_message(self):
"""Returns None when no message has been received."""
poller = NtfyPoller("http://example.com/topic")
result = poller.get_active_message()
assert result is None
class TestNtfyPollerDismiss:
"""Tests for NtfyPoller.dismiss method."""
def test_dismiss_clears_message(self):
"""dismiss() clears the current message."""
poller = NtfyPoller("http://example.com/topic")
with patch.object(poller, "_lock"):
poller._message = ("Title", "Body", time.monotonic())
poller.dismiss()
assert poller._message is None
class TestNtfyPollerEventEmission:
"""Tests for NtfyPoller event emission."""
def test_subscribe_adds_callback(self):
"""subscribe() adds a callback."""
poller = NtfyPoller("http://example.com/topic")
def callback(e):
return None
poller.subscribe(callback)
assert callback in poller._subscribers
def test_unsubscribe_removes_callback(self):
"""unsubscribe() removes a callback."""
poller = NtfyPoller("http://example.com/topic")
def callback(e):
return None
poller.subscribe(callback)
poller.unsubscribe(callback)
assert callback not in poller._subscribers
def test_emit_calls_subscribers(self):
"""_emit() calls all subscribers."""
poller = NtfyPoller("http://example.com/topic")
received = []
def callback(event):
received.append(event)
poller.subscribe(callback)
event = NtfyMessageEvent(title="Test", body="Body")
poller._emit(event)
assert len(received) == 1
assert received[0].title == "Test"
def test_emit_handles_subscriber_exception(self):
"""_emit() handles exceptions in subscribers gracefully."""
poller = NtfyPoller("http://example.com/topic")
def bad_callback(event):
raise RuntimeError("test")
poller.subscribe(bad_callback)
event = NtfyMessageEvent(title="Test", body="Body")
poller._emit(event)

View File

@@ -0,0 +1,127 @@
"""
Integration tests for ntfy topics.
"""
import json
import time
import urllib.request
class TestNtfyTopics:
def test_cc_cmd_topic_exists_and_writable(self):
"""Verify C&C CMD topic exists and accepts messages."""
from engine.config import NTFY_CC_CMD_TOPIC
topic_url = NTFY_CC_CMD_TOPIC.replace("/json", "")
test_message = f"test_{int(time.time())}"
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
assert resp.status == 200
except Exception as e:
raise AssertionError(f"Failed to write to C&C CMD topic: {e}") from e
def test_cc_resp_topic_exists_and_writable(self):
"""Verify C&C RESP topic exists and accepts messages."""
from engine.config import NTFY_CC_RESP_TOPIC
topic_url = NTFY_CC_RESP_TOPIC.replace("/json", "")
test_message = f"test_{int(time.time())}"
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
assert resp.status == 200
except Exception as e:
raise AssertionError(f"Failed to write to C&C RESP topic: {e}") from e
def test_message_topic_exists_and_writable(self):
"""Verify message topic exists and accepts messages."""
from engine.config import NTFY_TOPIC
topic_url = NTFY_TOPIC.replace("/json", "")
test_message = f"test_{int(time.time())}"
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
assert resp.status == 200
except Exception as e:
raise AssertionError(f"Failed to write to message topic: {e}") from e
def test_cc_cmd_topic_readable(self):
"""Verify we can read messages from C&C CMD topic."""
from engine.config import NTFY_CC_CMD_TOPIC
test_message = f"integration_test_{int(time.time())}"
topic_url = NTFY_CC_CMD_TOPIC.replace("/json", "")
req = urllib.request.Request(
topic_url,
data=test_message.encode("utf-8"),
headers={
"User-Agent": "mainline-test/0.1",
"Content-Type": "text/plain",
},
method="POST",
)
try:
urllib.request.urlopen(req, timeout=10)
except Exception as e:
raise AssertionError(f"Failed to write to C&C CMD topic: {e}") from e
time.sleep(1)
poll_url = f"{NTFY_CC_CMD_TOPIC}?poll=1&limit=1"
req = urllib.request.Request(
poll_url,
headers={"User-Agent": "mainline-test/0.1"},
)
try:
with urllib.request.urlopen(req, timeout=10) as resp:
body = resp.read().decode("utf-8")
if body.strip():
data = json.loads(body.split("\n")[0])
assert isinstance(data, dict)
except Exception as e:
raise AssertionError(f"Failed to read from C&C CMD topic: {e}") from e
def test_topics_are_different(self):
"""Verify C&C CMD/RESP and message topics are different."""
from engine.config import NTFY_CC_CMD_TOPIC, NTFY_CC_RESP_TOPIC, NTFY_TOPIC
assert NTFY_CC_CMD_TOPIC != NTFY_TOPIC
assert NTFY_CC_RESP_TOPIC != NTFY_TOPIC
assert NTFY_CC_CMD_TOPIC != NTFY_CC_RESP_TOPIC
assert "_cc_cmd" in NTFY_CC_CMD_TOPIC
assert "_cc_resp" in NTFY_CC_RESP_TOPIC

232
tests/test_render.py Normal file
View File

@@ -0,0 +1,232 @@
"""
Tests for engine.render module.
"""
from unittest.mock import MagicMock, patch
import pytest
from engine.render import (
GRAD_COLS,
MSG_GRAD_COLS,
clear_font_cache,
font_for_lang,
lr_gradient,
lr_gradient_opposite,
make_block,
)
class TestGradientConstants:
"""Tests for gradient color constants."""
def test_grad_cols_defined(self):
"""GRAD_COLS is defined with expected length."""
assert len(GRAD_COLS) > 0
assert all(isinstance(c, str) for c in GRAD_COLS)
def test_msg_grad_cols_defined(self):
"""MSG_GRAD_COLS is defined with expected length."""
assert len(MSG_GRAD_COLS) > 0
assert all(isinstance(c, str) for c in MSG_GRAD_COLS)
def test_grad_cols_start_with_white(self):
"""GRAD_COLS starts with white."""
assert "231" in GRAD_COLS[0]
def test_msg_grad_cols_different_from_grad_cols(self):
"""MSG_GRAD_COLS is different from GRAD_COLS."""
assert MSG_GRAD_COLS != GRAD_COLS
class TestLrGradient:
"""Tests for lr_gradient function."""
def test_empty_rows(self):
"""Empty input returns empty output."""
result = lr_gradient([], 0.0)
assert result == []
def test_preserves_empty_rows(self):
"""Empty rows are preserved."""
result = lr_gradient([""], 0.0)
assert result == [""]
def test_adds_gradient_to_content(self):
"""Non-empty rows get gradient coloring."""
result = lr_gradient(["hello"], 0.0)
assert len(result) == 1
assert "\033[" in result[0]
def test_preserves_spaces(self):
"""Spaces are preserved without coloring."""
result = lr_gradient(["hello world"], 0.0)
assert " " in result[0]
def test_offset_wraps_around(self):
"""Offset wraps around at 1.0."""
result1 = lr_gradient(["hello"], 0.0)
result2 = lr_gradient(["hello"], 1.0)
assert result1 != result2 or result1 == result2
class TestLrGradientOpposite:
"""Tests for lr_gradient_opposite function."""
def test_uses_msg_grad_cols(self):
"""Uses MSG_GRAD_COLS instead of GRAD_COLS."""
result = lr_gradient_opposite(["test"])
assert "\033[" in result[0]
class TestClearFontCache:
"""Tests for clear_font_cache function."""
def test_clears_without_error(self):
"""Function runs without error."""
clear_font_cache()
class TestFontForLang:
"""Tests for font_for_lang function."""
@patch("engine.render.font")
def test_returns_default_for_none(self, mock_font):
"""Returns default font when lang is None."""
result = font_for_lang(None)
assert result is not None
@patch("engine.render.font")
def test_returns_default_for_unknown_lang(self, mock_font):
"""Returns default font for unknown language."""
result = font_for_lang("unknown_lang")
assert result is not None
class TestMakeBlock:
"""Tests for make_block function."""
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_basic(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Basic make_block returns content, color, meta index."""
mock_wrap.return_value = ["Headline content", ""]
mock_random.choice.return_value = "\033[38;5;46m"
content, color, meta_idx = make_block(
"Test headline", "TestSource", "12:00", 80
)
assert len(content) > 0
assert color is not None
assert meta_idx >= 0
@pytest.mark.skip(reason="Requires full PIL/font environment")
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_translation(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Translation is applied when mode is news."""
mock_wrap.return_value = ["Translated"]
mock_random.choice.return_value = "\033[38;5;46m"
mock_detect.return_value = "de"
with patch("engine.config.MODE", "news"):
content, _, _ = make_block("Test", "Source", "12:00", 80)
mock_translate.assert_called_once()
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_no_translation_poetry(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""No translation when mode is poetry."""
mock_wrap.return_value = ["Poem content"]
mock_random.choice.return_value = "\033[38;5;46m"
with patch("engine.config.MODE", "poetry"):
make_block("Test", "Source", "12:00", 80)
mock_translate.assert_not_called()
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_meta_format(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Meta line includes source and timestamp."""
mock_wrap.return_value = ["Content"]
mock_random.choice.return_value = "\033[38;5;46m"
content, _, meta_idx = make_block("Test", "MySource", "14:30", 80)
meta_line = content[meta_idx]
assert "MySource" in meta_line
assert "14:30" in meta_line
class TestRenderLine:
"""Tests for render_line function."""
def test_empty_string(self):
"""Empty string returns empty list."""
from engine.render import render_line
result = render_line("")
assert result == [""]
@pytest.mark.skip(reason="Requires real font/PIL setup")
def test_uses_default_font(self):
"""Uses default font when none provided."""
from engine.render import render_line
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = (0, 0, 10, 10)
render_line("test")
def test_getbbox_returns_none(self):
"""Handles None bbox gracefully."""
from engine.render import render_line
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = None
result = render_line("test")
assert result == [""]
class TestBigWrap:
"""Tests for big_wrap function."""
def test_empty_string(self):
"""Empty string returns empty list."""
from engine.render import big_wrap
result = big_wrap("", 80)
assert result == []
@pytest.mark.skip(reason="Requires real font/PIL setup")
def test_single_word_fits(self):
"""Single short word returns rendered."""
from engine.render import big_wrap
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = (0, 0, 10, 10)
result = big_wrap("test", 80)
assert len(result) > 0

93
tests/test_sources.py Normal file
View File

@@ -0,0 +1,93 @@
"""
Tests for engine.sources module - data validation.
"""
from engine import sources
class TestFeeds:
"""Tests for FEEDS data."""
def test_feeds_is_dict(self):
"""FEEDS is a dictionary."""
assert isinstance(sources.FEEDS, dict)
def test_feeds_has_entries(self):
"""FEEDS has feed entries."""
assert len(sources.FEEDS) > 0
def test_feeds_have_valid_urls(self):
"""All feeds have valid URL format."""
for name, url in sources.FEEDS.items():
assert name
assert url.startswith("http://") or url.startswith("https://")
class TestPoetrySources:
"""Tests for POETRY_SOURCES data."""
def test_poetry_is_dict(self):
"""POETRY_SOURCES is a dictionary."""
assert isinstance(sources.POETRY_SOURCES, dict)
def test_poetry_has_entries(self):
"""POETRY_SOURCES has entries."""
assert len(sources.POETRY_SOURCES) > 0
def test_poetry_have_gutenberg_urls(self):
"""All poetry sources are from Gutenberg."""
for _name, url in sources.POETRY_SOURCES.items():
assert "gutenberg.org" in url
class TestSourceLangs:
"""Tests for SOURCE_LANGS mapping."""
def test_source_langs_is_dict(self):
"""SOURCE_LANGS is a dictionary."""
assert isinstance(sources.SOURCE_LANGS, dict)
def test_source_langs_valid_language_codes(self):
"""Language codes are valid ISO codes."""
valid_codes = {"de", "fr", "ja", "zh-cn", "ar", "hi"}
for code in sources.SOURCE_LANGS.values():
assert code in valid_codes
class TestLocationLangs:
"""Tests for LOCATION_LANGS mapping."""
def test_location_langs_is_dict(self):
"""LOCATION_LANGS is a dictionary."""
assert isinstance(sources.LOCATION_LANGS, dict)
def test_location_langs_has_patterns(self):
"""LOCATION_LANGS has regex patterns."""
assert len(sources.LOCATION_LANGS) > 0
class TestScriptFonts:
"""Tests for SCRIPT_FONTS mapping."""
def test_script_fonts_is_dict(self):
"""SCRIPT_FONTS is a dictionary."""
assert isinstance(sources.SCRIPT_FONTS, dict)
def test_script_fonts_has_paths(self):
"""All script fonts have paths."""
for _lang, path in sources.SCRIPT_FONTS.items():
assert path
class TestNoUpper:
"""Tests for NO_UPPER set."""
def test_no_upper_is_set(self):
"""NO_UPPER is a set."""
assert isinstance(sources.NO_UPPER, set)
def test_no_upper_contains_scripts(self):
"""NO_UPPER contains non-Latin scripts."""
assert "zh-cn" in sources.NO_UPPER
assert "ja" in sources.NO_UPPER
assert "ar" in sources.NO_UPPER

130
tests/test_terminal.py Normal file
View File

@@ -0,0 +1,130 @@
"""
Tests for engine.terminal module.
"""
import io
import sys
from unittest.mock import patch
from engine import terminal
class TestTerminalDimensions:
"""Tests for terminal width/height functions."""
def test_tw_returns_columns(self):
"""tw() returns terminal width."""
with (
patch.object(sys.stdout, "isatty", return_value=True),
patch("os.get_terminal_size") as mock_size,
):
mock_size.return_value = io.StringIO("columns=120")
mock_size.columns = 120
result = terminal.tw()
assert isinstance(result, int)
def test_th_returns_lines(self):
"""th() returns terminal height."""
with (
patch.object(sys.stdout, "isatty", return_value=True),
patch("os.get_terminal_size") as mock_size,
):
mock_size.return_value = io.StringIO("lines=30")
mock_size.lines = 30
result = terminal.th()
assert isinstance(result, int)
def test_tw_fallback_on_error(self):
"""tw() falls back to 80 on error."""
with patch("os.get_terminal_size", side_effect=OSError):
result = terminal.tw()
assert result == 80
def test_th_fallback_on_error(self):
"""th() falls back to 24 on error."""
with patch("os.get_terminal_size", side_effect=OSError):
result = terminal.th()
assert result == 24
class TestANSICodes:
"""Tests for ANSI escape code constants."""
def test_ansi_constants_exist(self):
"""All ANSI constants are defined."""
assert terminal.RST == "\033[0m"
assert terminal.BOLD == "\033[1m"
assert terminal.DIM == "\033[2m"
def test_green_shades_defined(self):
"""Green gradient colors are defined."""
assert terminal.G_HI == "\033[38;5;46m"
assert terminal.G_MID == "\033[38;5;34m"
assert terminal.G_LO == "\033[38;5;22m"
def test_white_shades_defined(self):
"""White/gray tones are defined."""
assert terminal.W_COOL == "\033[38;5;250m"
assert terminal.W_DIM == "\033[2;38;5;245m"
def test_cursor_controls_defined(self):
"""Cursor control codes are defined."""
assert "?" in terminal.CURSOR_OFF
assert "?" in terminal.CURSOR_ON
class TestTypeOut:
"""Tests for type_out function."""
@patch("sys.stdout", new_callable=io.StringIO)
@patch("time.sleep")
def test_type_out_writes_text(self, mock_sleep, mock_stdout):
"""type_out writes text to stdout."""
with patch("random.random", return_value=0.5):
terminal.type_out("Hi", color=terminal.G_HI)
output = mock_stdout.getvalue()
assert len(output) > 0
@patch("time.sleep")
def test_type_out_uses_color(self, mock_sleep):
"""type_out applies color codes."""
with (
patch("sys.stdout", new_callable=io.StringIO),
patch("random.random", return_value=0.5),
):
terminal.type_out("Test", color=terminal.G_HI)
class TestSlowPrint:
"""Tests for slow_print function."""
@patch("sys.stdout", new_callable=io.StringIO)
@patch("time.sleep")
def test_slow_print_writes_text(self, mock_sleep, mock_stdout):
"""slow_print writes text to stdout."""
terminal.slow_print("Hi", color=terminal.G_DIM, delay=0)
output = mock_stdout.getvalue()
assert len(output) > 0
class TestBootLn:
"""Tests for boot_ln function."""
@patch("sys.stdout", new_callable=io.StringIO)
@patch("time.sleep")
def test_boot_ln_writes_label_and_status(self, mock_sleep, mock_stdout):
"""boot_ln shows label and status."""
with patch("random.uniform", return_value=0):
terminal.boot_ln("Loading", "OK", ok=True)
output = mock_stdout.getvalue()
assert "Loading" in output
assert "OK" in output
@patch("sys.stdout", new_callable=io.StringIO)
@patch("time.sleep")
def test_boot_ln_error_status(self, mock_sleep, mock_stdout):
"""boot_ln shows red for error status."""
with patch("random.uniform", return_value=0):
terminal.boot_ln("Loading", "FAIL", ok=False)
output = mock_stdout.getvalue()
assert "FAIL" in output

115
tests/test_translate.py Normal file
View File

@@ -0,0 +1,115 @@
"""
Tests for engine.translate module.
"""
import json
from unittest.mock import MagicMock, patch
from engine.translate import (
_translate_cached,
detect_location_language,
translate_headline,
)
def clear_translate_cache():
"""Clear the LRU cache between tests."""
_translate_cached.cache_clear()
class TestDetectLocationLanguage:
"""Tests for detect_location_language function."""
def test_returns_none_for_unknown_location(self):
"""Returns None when no location pattern matches."""
result = detect_location_language("Breaking news about technology")
assert result is None
def test_detects_berlin(self):
"""Detects Berlin location."""
result = detect_location_language("Berlin police arrest protesters")
assert result == "de"
def test_detects_paris(self):
"""Detects Paris location."""
result = detect_location_language("Paris fashion week begins")
assert result == "fr"
def test_detects_tokyo(self):
"""Detects Tokyo location."""
result = detect_location_language("Tokyo stocks rise")
assert result == "ja"
def test_detects_berlin_again(self):
"""Detects Berlin location again."""
result = detect_location_language("Berlin marathon set to begin")
assert result == "de"
def test_case_insensitive(self):
"""Detection is case insensitive."""
result = detect_location_language("BERLIN SUMMER FESTIVAL")
assert result == "de"
def test_returns_first_match(self):
"""Returns first matching pattern."""
result = detect_location_language("Berlin in Paris for the event")
assert result == "de"
class TestTranslateHeadline:
"""Tests for translate_headline function."""
def test_returns_translated_text(self):
"""Returns translated text from cache."""
clear_translate_cache()
with patch("engine.translate.translate_headline") as mock_fn:
mock_fn.return_value = "Translated title"
from engine.translate import translate_headline as th
result = th("Original title", "de")
assert result == "Translated title"
def test_uses_cached_result(self):
"""Translation uses LRU cache."""
clear_translate_cache()
result1 = translate_headline("Test unique", "es")
result2 = translate_headline("Test unique", "es")
assert result1 == result2
class TestTranslateCached:
"""Tests for _translate_cached function."""
def test_translation_network_error(self):
"""Network error returns original text."""
clear_translate_cache()
with patch("engine.translate.urllib.request.urlopen") as mock_urlopen:
mock_urlopen.side_effect = Exception("Network error")
result = _translate_cached("Hello world", "de")
assert result == "Hello world"
def test_translation_invalid_json(self):
"""Invalid JSON returns original text."""
clear_translate_cache()
with patch("engine.translate.urllib.request.urlopen") as mock_urlopen:
mock_response = MagicMock()
mock_response.read.return_value = b"invalid json"
mock_urlopen.return_value = mock_response
result = _translate_cached("Hello", "de")
assert result == "Hello"
def test_translation_empty_response(self):
"""Empty translation response returns original text."""
clear_translate_cache()
with patch("engine.translate.urllib.request.urlopen") as mock_urlopen:
mock_response = MagicMock()
mock_response.read.return_value = json.dumps([[[""], None, "de"], None])
mock_urlopen.return_value = mock_response
result = _translate_cached("Hello", "de")
assert result == "Hello"

95
tests/test_types.py Normal file
View File

@@ -0,0 +1,95 @@
"""
Tests for engine.types module.
"""
from engine.types import (
Block,
FetchResult,
HeadlineItem,
items_to_tuples,
tuples_to_items,
)
class TestHeadlineItem:
"""Tests for HeadlineItem dataclass."""
def test_create_headline_item(self):
"""Can create HeadlineItem with required fields."""
item = HeadlineItem(title="Test", source="Source", timestamp="12:00")
assert item.title == "Test"
assert item.source == "Source"
assert item.timestamp == "12:00"
def test_to_tuple(self):
"""to_tuple returns correct tuple."""
item = HeadlineItem(title="Test", source="Source", timestamp="12:00")
assert item.to_tuple() == ("Test", "Source", "12:00")
def test_from_tuple(self):
"""from_tuple creates HeadlineItem from tuple."""
item = HeadlineItem.from_tuple(("Test", "Source", "12:00"))
assert item.title == "Test"
assert item.source == "Source"
assert item.timestamp == "12:00"
class TestItemsConversion:
"""Tests for list conversion functions."""
def test_items_to_tuples(self):
"""Converts list of HeadlineItem to list of tuples."""
items = [
HeadlineItem(title="A", source="S", timestamp="10:00"),
HeadlineItem(title="B", source="T", timestamp="11:00"),
]
result = items_to_tuples(items)
assert result == [("A", "S", "10:00"), ("B", "T", "11:00")]
def test_tuples_to_items(self):
"""Converts list of tuples to list of HeadlineItem."""
tuples = [("A", "S", "10:00"), ("B", "T", "11:00")]
result = tuples_to_items(tuples)
assert len(result) == 2
assert result[0].title == "A"
assert result[1].title == "B"
class TestFetchResult:
"""Tests for FetchResult dataclass."""
def test_create_fetch_result(self):
"""Can create FetchResult."""
items = [HeadlineItem(title="Test", source="Source", timestamp="12:00")]
result = FetchResult(items=items, linked=1, failed=0)
assert len(result.items) == 1
assert result.linked == 1
assert result.failed == 0
def test_to_legacy_tuple(self):
"""to_legacy_tuple returns correct format."""
items = [HeadlineItem(title="Test", source="Source", timestamp="12:00")]
result = FetchResult(items=items, linked=1, failed=0)
legacy = result.to_legacy_tuple()
assert legacy[0] == [("Test", "Source", "12:00")]
assert legacy[1] == 1
assert legacy[2] == 0
class TestBlock:
"""Tests for Block dataclass."""
def test_create_block(self):
"""Can create Block."""
block = Block(
content=["line1", "line2"], color="\033[38;5;46m", meta_row_index=1
)
assert len(block.content) == 2
assert block.color == "\033[38;5;46m"
assert block.meta_row_index == 1
def test_to_legacy_tuple(self):
"""to_legacy_tuple returns correct format."""
block = Block(content=["line1"], color="green", meta_row_index=0)
legacy = block.to_legacy_tuple()
assert legacy == (["line1"], "green", 0)

64
tests/test_viewport.py Normal file
View File

@@ -0,0 +1,64 @@
"""
Tests for engine.viewport module.
"""
from engine import viewport
class TestViewportTw:
"""Tests for tw() function."""
def test_tw_returns_int(self):
"""tw() returns an integer."""
result = viewport.tw()
assert isinstance(result, int)
def test_tw_positive(self):
"""tw() returns a positive value."""
assert viewport.tw() > 0
class TestViewportTh:
"""Tests for th() function."""
def test_th_returns_int(self):
"""th() returns an integer."""
result = viewport.th()
assert isinstance(result, int)
def test_th_positive(self):
"""th() returns a positive value."""
assert viewport.th() > 0
class TestViewportMoveTo:
"""Tests for move_to() function."""
def test_move_to_format(self):
"""move_to() returns correctly formatted ANSI escape."""
result = viewport.move_to(5, 10)
assert result == "\033[5;10H"
def test_move_to_default_col(self):
"""move_to() defaults to column 1."""
result = viewport.move_to(5)
assert result == "\033[5;1H"
class TestViewportClearScreen:
"""Tests for clear_screen() function."""
def test_clear_screen_format(self):
"""clear_screen() returns clear screen ANSI escape."""
result = viewport.clear_screen()
assert "\033[2J" in result
assert "\033[H" in result
class TestViewportClearLine:
"""Tests for clear_line() function."""
def test_clear_line_format(self):
"""clear_line() returns clear line ANSI escape."""
result = viewport.clear_line()
assert result == "\033[K"

161
tests/test_websocket.py Normal file
View File

@@ -0,0 +1,161 @@
"""
Tests for engine.display.backends.websocket module.
"""
from unittest.mock import MagicMock, patch
import pytest
from engine.display.backends.websocket import WebSocketDisplay
class TestWebSocketDisplayImport:
"""Test that websocket module can be imported."""
def test_import_does_not_error(self):
"""Module imports without error."""
from engine.display import backends
assert backends is not None
class TestWebSocketDisplayInit:
"""Tests for WebSocketDisplay initialization."""
def test_default_init(self):
"""Default initialization sets correct defaults."""
with patch("engine.display.backends.websocket.websockets", None):
display = WebSocketDisplay()
assert display.host == "0.0.0.0"
assert display.port == 8765
assert display.http_port == 8766
assert display.width == 80
assert display.height == 24
def test_custom_init(self):
"""Custom initialization uses provided values."""
with patch("engine.display.backends.websocket.websockets", None):
display = WebSocketDisplay(host="localhost", port=9000, http_port=9001)
assert display.host == "localhost"
assert display.port == 9000
assert display.http_port == 9001
def test_is_available_when_websockets_present(self):
"""is_available returns True when websockets is available."""
pytest.importorskip("websockets")
display = WebSocketDisplay()
assert display.is_available() is True
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_is_available_when_websockets_missing(self):
"""is_available returns False when websockets is not available."""
display = WebSocketDisplay()
assert display.is_available() is False
class TestWebSocketDisplayProtocol:
"""Test that WebSocketDisplay satisfies Display protocol."""
def test_websocket_display_is_display(self):
"""WebSocketDisplay satisfies Display protocol."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
assert hasattr(display, "init")
assert hasattr(display, "show")
assert hasattr(display, "clear")
assert hasattr(display, "cleanup")
class TestWebSocketDisplayMethods:
"""Tests for WebSocketDisplay methods."""
def test_init_stores_dimensions(self):
"""init stores terminal dimensions."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
display.init(100, 40)
assert display.width == 100
assert display.height == 40
def test_client_count_initially_zero(self):
"""client_count returns 0 when no clients connected."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
assert display.client_count() == 0
def test_get_ws_port(self):
"""get_ws_port returns configured port."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay(port=9000)
assert display.get_ws_port() == 9000
def test_get_http_port(self):
"""get_http_port returns configured port."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay(http_port=9001)
assert display.get_http_port() == 9001
def test_frame_delay_defaults_to_zero(self):
"""get_frame_delay returns 0 by default."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
assert display.get_frame_delay() == 0.0
def test_set_frame_delay(self):
"""set_frame_delay stores the value."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
display.set_frame_delay(0.05)
assert display.get_frame_delay() == 0.05
class TestWebSocketDisplayCallbacks:
"""Tests for WebSocketDisplay callback methods."""
def test_set_client_connected_callback(self):
"""set_client_connected_callback stores callback."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
callback = MagicMock()
display.set_client_connected_callback(callback)
assert display._client_connected_callback is callback
def test_set_client_disconnected_callback(self):
"""set_client_disconnected_callback stores callback."""
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
callback = MagicMock()
display.set_client_disconnected_callback(callback)
assert display._client_disconnected_callback is callback
class TestWebSocketDisplayUnavailable:
"""Tests when WebSocket support is unavailable."""
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_start_server_noop_when_unavailable(self):
"""start_server does nothing when websockets unavailable."""
display = WebSocketDisplay()
display.start_server()
assert display._server_thread is None
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_start_http_server_noop_when_unavailable(self):
"""start_http_server does nothing when websockets unavailable."""
display = WebSocketDisplay()
display.start_http_server()
assert display._http_thread is None
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_show_noops_when_unavailable(self):
"""show does nothing when websockets unavailable."""
display = WebSocketDisplay()
display.show(["line1", "line2"])