19 Commits

Author SHA1 Message Date
3551cc249f refactor: phase 4 - event-driven architecture foundation
- Add EventBus class with pub/sub messaging (thread-safe)
- Add emitter Protocol classes (EventEmitter, Startable, Stoppable)
- Add event emission to NtfyPoller (NtfyMessageEvent)
- Add event emission to MicMonitor (MicLevelEvent)
- Update StreamController to publish stream start/end events
- Add comprehensive tests for eventbus and emitters modules
2026-03-15 16:39:19 -07:00
fba183526a refactor: phase 3 - API efficiency improvements
Add typed dataclasses for tuple returns:
- types.py: HeadlineItem, FetchResult, Block dataclasses with legacy tuple converters
- fetch.py: Add type hints and HeadlineTuple type alias

Add pyright for static type checking:
- Add pyright to dependencies
- Verify type coverage with pyright (0 errors in core modules)

This enables:
- Named types instead of raw tuples (better IDE support, self-documenting)
- Type-safe APIs across modules
- Backward compatibility via to_tuple/from_tuple methods

Note: Lazy imports skipped for render.py - startup impact is minimal.
2026-03-15 16:39:19 -07:00
7193e7487b refactor: phase 2 - modularization of scroll engine
Split monolithic scroll.py into focused modules:
- viewport.py: terminal size (tw/th), ANSI positioning helpers
- frame.py: FrameTimer class, scroll step calculation
- layers.py: message overlay, ticker zone, firehose rendering
- scroll.py: simplified orchestrator, imports from new modules

Add stream controller and event types for future event-driven architecture:
- controller.py: StreamController for source initialization and stream lifecycle
- events.py: EventType enum and event dataclasses (HeadlineEvent, FrameTickEvent, etc.)

Added tests for new modules:
- test_viewport.py: 8 tests for viewport utilities
- test_frame.py: 10 tests for frame timing
- test_layers.py: 13 tests for layer compositing
- test_events.py: 11 tests for event types
- test_controller.py: 6 tests for stream controller

This enables:
- Testable chunks with clear responsibilities
- Reusable viewport utilities across modules
- Better separation of concerns in render pipeline
- Foundation for future event-driven architecture

Also includes Phase 1 documentation updates in code comments.
2026-03-15 16:39:19 -07:00
b5d6eeedc0 refactor: phase 1 - testability improvements
- Add Config dataclass with get_config()/set_config() for injection
- Add Config.from_args() for CLI argument parsing (testable)
- Add platform font path detection (Darwin/Linux)
- Bound translate cache with @lru_cache(maxsize=500)
- Add fixtures for external dependencies (network, feeds, config)
- Add 15 tests for Config class, from_args, and platform detection

This enables testability by:
- Allowing config injection instead of global mutable state
- Supporting custom argv in from_args() for testing
- Providing reusable fixtures for mocking network/filesystem
- Preventing unbounded memory growth in translation cache

Fixes: _arg_value/_arg_int not accepting custom argv
2026-03-15 16:39:19 -07:00
f6f177590b Merge pull request 'Modernize project with uv, pytest, ruff, and git hooks' (#21) from enhance_portability into main
Reviewed-on: #21
2026-03-15 23:21:35 +00:00
9ae4dc2b07 fix: update ntfy tests for SSE API (reconnect_delay) 2026-03-15 15:16:37 -07:00
1ac2dec3b0 fix: use native hk staging in pre-commit hook
fix: add explicit check command to pre-push hook
2026-03-15 15:16:37 -07:00
757c854584 fix: apply ruff auto-fixes and add hk git hooks
- Fix pre-existing lint errors in engine/ modules using ruff --unsafe-fixes
- Add hk.pkl with pre-commit and pre-push hooks using ruff builtin
- Configure hooks to use 'uv run' prefix for tool execution
- Update mise.toml to include hk and pkl tools
- All 73 tests pass

fix: apply ruff auto-fixes and add hk git hooks

- Fix pre-existing lint errors in engine/ modules using ruff --unsafe-fixes
- Add hk.pkl with pre-commit and pre-push hooks using ruff builtin
- Configure hooks to use 'uv run' prefix for tool execution
- Update mise.toml to include hk and pkl tools
- Use 'hk install --mise' for proper mise integration
- All 73 tests pass
2026-03-15 15:16:37 -07:00
4844a64203 style: apply ruff auto-fixes across codebase
- Fix import sorting (isort) across all engine modules
- Fix SIM105 try-except-pass patterns (contextlib.suppress)
- Fix nested with statements in tests
- Fix unused loop variables

Run 'uv run pytest' to verify tests still pass.
2026-03-15 15:16:37 -07:00
9201117096 feat: modernize project with uv, add pytest test suite
- Add pyproject.toml with modern Python packaging (PEP 517/518)
- Add uv-based dependency management replacing inline venv bootstrap
- Add requirements.txt and requirements-dev.txt for compatibility
- Add mise.toml with dev tasks (test, lint, run, sync, ci)
- Add .python-version pinned to Python 3.12
- Add comprehensive pytest test suite (73 tests) for:
  - engine/config, filter, terminal, sources, mic, ntfy modules
- Configure pytest with coverage reporting (16% total, 100% on tested modules)
- Configure ruff for linting with Python 3.10+ target
- Remove redundant venv bootstrap code from mainline.py
- Update .gitignore for uv/venv artifacts

Run 'uv sync' to install dependencies, 'uv run pytest' to test.
2026-03-15 15:16:37 -07:00
d758541156 Merge pull request 'feat: migrate Ntfy message retrieval from polling to SSE streaming, replacing poll_interval with reconnect_delay for continuous updates.' (#20) from feat/ntfy-sse into main
Reviewed-on: #20
2026-03-15 20:50:08 +00:00
b979621dd4 Merge branch 'main' into feat/ntfy-sse 2026-03-15 20:50:02 +00:00
f91cc9844e Merge pull request 'feat: add new font files to the fonts directory' (#19) from feat/display into main
Reviewed-on: #19
2026-03-15 20:47:16 +00:00
bddbd69371 Merge branch 'main' into feat/display 2026-03-15 20:45:54 +00:00
6e39a2dad2 feat: migrate Ntfy message retrieval from polling to SSE streaming, replacing poll_interval with reconnect_delay for continuous updates. 2026-03-15 13:44:26 -07:00
1ba3848bed feat: add new font files to the fonts directory 2026-03-15 13:30:08 -07:00
a986df344a Merge pull request 'doc: Document new font selection command-line arguments, environment variables, and a dedicated font management section.' (#18) from docs/update-readme into main
Reviewed-on: #18
2026-03-15 11:08:25 +00:00
c84bd5c05a doc: Document new font selection command-line arguments, environment variables, and a dedicated font management section. 2026-03-15 04:07:24 -07:00
7b0f886e53 Merge pull request 'feat: add new font assets including CSBishopDrawn, CyberformDemo, and KATA.' (#17) from feat/font-picker into main
Reviewed-on: #17
2026-03-15 11:01:39 +00:00
52 changed files with 3383 additions and 408 deletions

7
.gitignore vendored
View File

@@ -1,4 +1,11 @@
__pycache__/
*.pyc
.mainline_venv/
.venv/
uv.lock
.mainline_cache_*.json
.DS_Store
htmlcov/
.coverage
.pytest_cache/
*.egg-info/

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.12

110
AGENTS.md Normal file
View File

@@ -0,0 +1,110 @@
# Agent Development Guide
## Development Environment
This project uses:
- **mise** (mise.jdx.dev) - tool version manager and task runner
- **hk** (hk.jdx.dev) - git hook manager
- **uv** - fast Python package installer
- **ruff** - linter and formatter
- **pytest** - test runner
### Setup
```bash
# Install dependencies
mise run install
# Or equivalently:
uv sync
```
### Available Commands
```bash
mise run test # Run tests
mise run test-v # Run tests verbose
mise run test-cov # Run tests with coverage report
mise run lint # Run ruff linter
mise run lint-fix # Run ruff with auto-fix
mise run format # Run ruff formatter
mise run ci # Full CI pipeline (sync + test + coverage)
```
## Git Hooks
**At the start of every agent session**, verify hooks are installed:
```bash
ls -la .git/hooks/pre-commit
```
If hooks are not installed, install them with:
```bash
hk init --mise
mise run pre-commit
```
The project uses hk configured in `hk.pkl`:
- **pre-commit**: runs ruff-format and ruff (with auto-fix)
- **pre-push**: runs ruff check
## Workflow Rules
### Before Committing
1. **Always run the test suite** - never commit code that fails tests:
```bash
mise run test
```
2. **Always run the linter**:
```bash
mise run lint
```
3. **Fix any lint errors** before committing (or let the pre-commit hook handle it).
4. **Review your changes** using `git diff` to understand what will be committed.
### On Failing Tests
When tests fail, **determine whether it's an out-of-date test or a correctly failing test**:
- **Out-of-date test**: The test was written for old behavior that has legitimately changed. Update the test to match the new expected behavior.
- **Correctly failing test**: The test correctly identifies a broken contract. Fix the implementation, not the test.
**Never** modify a test to make it pass without understanding why it failed.
### Code Review
Before committing significant changes:
- Run `git diff` to review all changes
- Ensure new code follows existing patterns in the codebase
- Check that type hints are added for new functions
- Verify that tests exist for new functionality
## Testing
Tests live in `tests/` and follow the pattern `test_*.py`.
Run all tests:
```bash
mise run test
```
Run with coverage:
```bash
mise run test-cov
```
The project uses pytest with strict marker enforcement. Test configuration is in `pyproject.toml` under `[tool.pytest.ini_options]`.
## Architecture Notes
- **ntfy.py** and **mic.py** are standalone modules with zero internal dependencies
- **eventbus.py** provides thread-safe event publishing for decoupled communication
- **controller.py** coordinates ntfy/mic monitoring
- The render pipeline: fetch → render → effects → scroll → terminal output

View File

@@ -14,6 +14,10 @@ python3 mainline.py --poetry # literary consciousness mode
python3 mainline.py -p # same
python3 mainline.py --firehose # dense rapid-fire headline mode
python3 mainline.py --refresh # force re-fetch (bypass cache)
python3 mainline.py --no-font-picker # skip interactive font picker
python3 mainline.py --font-file path.otf # use a specific font file
python3 mainline.py --font-dir ~/fonts # scan a different font folder
python3 mainline.py --font-index 1 # select face index within a collection
```
First run bootstraps a local `.mainline_venv/` and installs deps (`feedparser`, `Pillow`, `sounddevice`, `numpy`). Subsequent runs start immediately, loading from cache.
@@ -29,8 +33,10 @@ All constants live in `engine/config.py`:
| `HEADLINE_LIMIT` | `1000` | Total headlines per session |
| `FEED_TIMEOUT` | `10` | Per-feed HTTP timeout (seconds) |
| `MIC_THRESHOLD_DB` | `50` | dB floor above which glitches spike |
| `FONT_DIR` | `fonts/` | Folder scanned for `.otf`, `.ttf`, `.ttc` files used by the font picker |
| `FONT_PATH` | first supported font in `fonts/` | Active display font file selected at startup |
| `FONT_DIR` | `fonts/` | Folder scanned for `.otf`, `.ttf`, `.ttc` files |
| `FONT_PATH` | first file in `FONT_DIR` | Active display font (overridden by picker or `--font-file`) |
| `FONT_INDEX` | `0` | Face index within a font collection file |
| `FONT_PICKER` | `True` | Show interactive font picker at boot (`--no-font-picker` to skip) |
| `FONT_SZ` | `60` | Font render size (affects block density) |
| `RENDER_H` | `8` | Terminal rows per headline line |
| `SSAA` | `4` | Super-sampling factor (render at 4× then downsample) |
@@ -42,15 +48,24 @@ All constants live in `engine/config.py`:
| `NTFY_POLL_INTERVAL` | `15` | Seconds between ntfy polls |
| `MESSAGE_DISPLAY_SECS` | `30` | How long an ntfy message holds the screen |
**Font:** Put your `.otf`, `.ttf`, or `.ttc` files in `fonts/`. Startup opens the font picker from that folder and applies your selected font before streaming.
---
## Fonts
A `fonts/` directory is bundled with demo faces (AlphatronDemo, CSBishopDrawn, CyberformDemo, KATA, Microbots, Neoform, Pixel Sparta, Robocops, Xeonic, and others). On startup, an interactive picker lists all discovered faces with a live half-block preview rendered at your configured size.
Navigation: `↑`/`↓` or `j`/`k` to move, `Enter` or `q` to select. The selected face persists for that session.
To add your own fonts, drop `.otf`, `.ttf`, or `.ttc` files into `fonts/` (or point `--font-dir` at any other folder). Font collections (`.ttc`, multi-face `.otf`) are enumerated face-by-face.
---
## How it works
- On launch, the font picker scans `fonts/` and presents a live-rendered TUI for face selection; `--no-font-picker` skips directly to stream
- Feeds are fetched and filtered on startup (sports and vapid content stripped); results are cached to `.mainline_cache_news.json` / `.mainline_cache_poetry.json` for fast restarts
- Headlines are rasterized via Pillow with 4× SSAA into half-block characters (`▀▄█ `) at the configured font size
- A left-to-right ANSI gradient colors each character: white-hot leading edge trails off to near-black; the gradient sweeps continuously across the full scroll canvas
- The ticker uses a sweeping white-hot → deep green gradient; ntfy messages use a complementary white-hot → magenta/maroon gradient to distinguish them visually
- Subject-region detection runs a regex pass on each headline; matches trigger a Google Translate call and font swap to the appropriate script (CJK, Arabic, Devanagari, etc.) using macOS system fonts
- The mic stream runs in a background thread, feeding RMS dB into the glitch probability calculation each frame
- The viewport scrolls through a virtual canvas of pre-rendered blocks; fade zones at top and bottom dissolve characters probabilistically
@@ -64,6 +79,8 @@ All constants live in `engine/config.py`:
```
engine/
__init__.py package marker
app.py main(), font picker TUI, boot sequence, signal handler
config.py constants, CLI flags, glyph tables
sources.py FEEDS, POETRY_SOURCES, language/script maps
terminal.py ANSI codes, tw/th, type_out, boot_ln
@@ -75,7 +92,14 @@ engine/
ntfy.py NtfyPoller — standalone, zero internal deps
mic.py MicMonitor — standalone, graceful fallback
scroll.py stream() frame loop + message rendering
app.py main(), boot sequence, signal handler
viewport.py terminal dimension tracking (tw/th)
frame.py scroll step calculation, timing
layers.py ticker zone, firehose, message overlay rendering
eventbus.py thread-safe event publishing for decoupled communication
events.py event types and definitions
controller.py coordinates ntfy/mic monitoring and event publishing
emitters.py background emitters for ntfy and mic
types.py type definitions and dataclasses
```
`ntfy.py` and `mic.py` have zero internal dependencies and can be imported by any other visualizer.
@@ -139,4 +163,5 @@ msg = poller.get_active_message() # returns (title, body, timestamp) or None
---
*macOS only (system font paths hardcoded). Python 3.9+.*
*macOS only (script/system font paths for translation are hardcoded). Primary display font is user-selectable via the bundled `fonts/` picker. Python 3.9+.*
# test

View File

@@ -2,23 +2,33 @@
Application orchestrator — boot sequence, signal handling, main loop wiring.
"""
import sys
import os
import time
import signal
import atexit
import os
import signal
import sys
import termios
import time
import tty
from engine import config, render
from engine.terminal import (
RST, G_HI, G_MID, G_DIM, W_DIM, W_GHOST, CLR, CURSOR_OFF, CURSOR_ON, tw,
slow_print, boot_ln,
)
from engine.fetch import fetch_all, fetch_poetry, load_cache, save_cache
from engine.ntfy import NtfyPoller
from engine.mic import MicMonitor
from engine.ntfy import NtfyPoller
from engine.scroll import stream
from engine.terminal import (
CLR,
CURSOR_OFF,
CURSOR_ON,
G_DIM,
G_HI,
G_MID,
RST,
W_DIM,
W_GHOST,
boot_ln,
slow_print,
tw,
)
TITLE = [
" ███╗ ███╗ █████╗ ██╗███╗ ██╗██╗ ██╗███╗ ██╗███████╗",
@@ -29,6 +39,7 @@ TITLE = [
" ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝╚═╝ ╚═══╝╚══════╝╚═╝╚═╝ ╚═══╝╚══════╝",
]
def _read_picker_key():
ch = sys.stdin.read(1)
if ch == "\x03":
@@ -53,6 +64,7 @@ def _read_picker_key():
return "enter"
return None
def _normalize_preview_rows(rows):
"""Trim shared left padding and trailing spaces for stable on-screen previews."""
non_empty = [r for r in rows if r.strip()]
@@ -99,7 +111,9 @@ def _draw_font_picker(faces, selected):
active = pos == selected
pointer = "" if active else " "
color = G_HI if active else W_DIM
print(f" {color}{pointer} {face['name']}{RST}{W_GHOST} · {face['file_name']}{RST}")
print(
f" {color}{pointer} {face['name']}{RST}{W_GHOST} · {face['file_name']}{RST}"
)
if top > 0:
print(f" {W_GHOST}{top} above{RST}")
@@ -116,6 +130,7 @@ def _draw_font_picker(faces, selected):
shown = row[:max_preview_w]
print(f" {shown}")
def pick_font_face():
"""Interactive startup picker for selecting a face from repo OTF files."""
if not config.FONT_PICKER:
@@ -225,7 +240,9 @@ def pick_font_face():
font_index=selected_font["font_index"],
)
render.clear_font_cache()
print(f" {G_DIM}> using {selected_font['name']} ({selected_font['file_name']}){RST}")
print(
f" {G_DIM}> using {selected_font['name']} ({selected_font['file_name']}){RST}"
)
time.sleep(0.8)
print(CLR, end="")
print(CURSOR_OFF, end="")
@@ -255,23 +272,29 @@ def main():
time.sleep(0.07)
print()
_subtitle = "literary consciousness stream" if config.MODE == 'poetry' else "digital consciousness stream"
_subtitle = (
"literary consciousness stream"
if config.MODE == "poetry"
else "digital consciousness stream"
)
print(f" {W_DIM}v0.1 · {_subtitle}{RST}")
print(f" {W_GHOST}{'' * (w - 4)}{RST}")
print()
time.sleep(0.4)
cached = load_cache() if '--refresh' not in sys.argv else None
cached = load_cache() if "--refresh" not in sys.argv else None
if cached:
items = cached
boot_ln("Cache", f"LOADED [{len(items)} SIGNALS]", True)
elif config.MODE == 'poetry':
elif config.MODE == "poetry":
slow_print(" > INITIALIZING LITERARY CORPUS...\n")
time.sleep(0.2)
print()
items, linked, failed = fetch_poetry()
print()
print(f" {G_DIM}>{RST} {G_MID}{linked} TEXTS LOADED{RST} {W_GHOST}· {failed} DARK{RST}")
print(
f" {G_DIM}>{RST} {G_MID}{linked} TEXTS LOADED{RST} {W_GHOST}· {failed} DARK{RST}"
)
print(f" {G_DIM}>{RST} {G_MID}{len(items)} STANZAS ACQUIRED{RST}")
save_cache(items)
else:
@@ -280,7 +303,9 @@ def main():
print()
items, linked, failed = fetch_all()
print()
print(f" {G_DIM}>{RST} {G_MID}{linked} SOURCES LINKED{RST} {W_GHOST}· {failed} DARK{RST}")
print(
f" {G_DIM}>{RST} {G_MID}{linked} SOURCES LINKED{RST} {W_GHOST}· {failed} DARK{RST}"
)
print(f" {G_DIM}>{RST} {G_MID}{len(items)} SIGNALS ACQUIRED{RST}")
save_cache(items)
@@ -292,11 +317,17 @@ def main():
mic = MicMonitor(threshold_db=config.MIC_THRESHOLD_DB)
mic_ok = mic.start()
if mic.available:
boot_ln("Microphone", "ACTIVE" if mic_ok else "OFFLINE · check System Settings → Privacy → Microphone", bool(mic_ok))
boot_ln(
"Microphone",
"ACTIVE"
if mic_ok
else "OFFLINE · check System Settings → Privacy → Microphone",
bool(mic_ok),
)
ntfy = NtfyPoller(
config.NTFY_TOPIC,
poll_interval=config.NTFY_POLL_INTERVAL,
reconnect_delay=config.NTFY_RECONNECT_DELAY,
display_secs=config.MESSAGE_DISPLAY_SECS,
)
ntfy_ok = ntfy.start()

View File

@@ -1,25 +1,28 @@
"""
Configuration constants, CLI flags, and glyph tables.
Supports both global constants (backward compatible) and injected config for testing.
"""
import sys
from dataclasses import dataclass, field
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parent.parent
_FONT_EXTENSIONS = {".otf", ".ttf", ".ttc"}
def _arg_value(flag):
def _arg_value(flag, argv: list[str] | None = None):
"""Get value following a CLI flag, if present."""
if flag not in sys.argv:
argv = argv or sys.argv
if flag not in argv:
return None
i = sys.argv.index(flag)
return sys.argv[i + 1] if i + 1 < len(sys.argv) else None
i = argv.index(flag)
return argv[i + 1] if i + 1 < len(argv) else None
def _arg_int(flag, default):
def _arg_int(flag, default, argv: list[str] | None = None):
"""Get int CLI argument with safe fallback."""
raw = _arg_value(flag)
raw = _arg_value(flag, argv)
if raw is None:
return default
try:
@@ -51,29 +54,159 @@ def _list_font_files(font_dir):
def list_repo_font_files():
"""Public helper for discovering repository font files."""
return _list_font_files(FONT_DIR)
def _get_platform_font_paths() -> dict[str, str]:
"""Get platform-appropriate font paths for non-Latin scripts."""
import platform
system = platform.system()
if system == "Darwin":
return {
"zh-cn": "/System/Library/Fonts/STHeiti Medium.ttc",
"ja": "/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc",
"ko": "/System/Library/Fonts/AppleSDGothicNeo.ttc",
"ru": "/System/Library/Fonts/Supplemental/Arial.ttf",
"uk": "/System/Library/Fonts/Supplemental/Arial.ttf",
"el": "/System/Library/Fonts/Supplemental/Arial.ttf",
"he": "/System/Library/Fonts/Supplemental/Arial.ttf",
"ar": "/System/Library/Fonts/GeezaPro.ttc",
"fa": "/System/Library/Fonts/GeezaPro.ttc",
"hi": "/System/Library/Fonts/Kohinoor.ttc",
"th": "/System/Library/Fonts/ThonburiUI.ttc",
}
elif system == "Linux":
return {
"zh-cn": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
"ja": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
"ko": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
"ru": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"uk": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"el": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"he": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"ar": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"fa": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"hi": "/usr/share/fonts/truetype/noto/NotoSansDevanagari-Regular.ttf",
"th": "/usr/share/fonts/truetype/noto/NotoSansThai-Regular.ttf",
}
else:
return {}
@dataclass(frozen=True)
class Config:
"""Immutable configuration container for injected config."""
headline_limit: int = 1000
feed_timeout: int = 10
mic_threshold_db: int = 50
mode: str = "news"
firehose: bool = False
ntfy_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline/json"
ntfy_reconnect_delay: int = 5
message_display_secs: int = 30
font_dir: str = "fonts"
font_path: str = ""
font_index: int = 0
font_picker: bool = True
font_sz: int = 60
render_h: int = 8
ssaa: int = 4
scroll_dur: float = 5.625
frame_dt: float = 0.05
firehose_h: int = 12
grad_speed: float = 0.08
glitch_glyphs: str = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋"
kata_glyphs: str = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ"
script_fonts: dict[str, str] = field(default_factory=_get_platform_font_paths)
@classmethod
def from_args(cls, argv: list[str] | None = None) -> "Config":
"""Create Config from CLI arguments (or custom argv for testing)."""
argv = argv or sys.argv
font_dir = _resolve_font_path(_arg_value("--font-dir", argv) or "fonts")
font_file_arg = _arg_value("--font-file", argv)
font_files = _list_font_files(font_dir)
font_path = (
_resolve_font_path(font_file_arg)
if font_file_arg
else (font_files[0] if font_files else "")
)
return cls(
headline_limit=1000,
feed_timeout=10,
mic_threshold_db=50,
mode="poetry" if "--poetry" in argv or "-p" in argv else "news",
firehose="--firehose" in argv,
ntfy_topic="https://ntfy.sh/klubhaus_terminal_mainline/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir=font_dir,
font_path=font_path,
font_index=max(0, _arg_int("--font-index", 0, argv)),
font_picker="--no-font-picker" not in argv,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐╌╍╎╏┃┆┇┊┋",
kata_glyphs="ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ",
script_fonts=_get_platform_font_paths(),
)
_config: Config | None = None
def get_config() -> Config:
"""Get the global config instance (lazy-loaded)."""
global _config
if _config is None:
_config = Config.from_args()
return _config
def set_config(config: Config) -> None:
"""Set the global config instance (for testing)."""
global _config
_config = config
# ─── RUNTIME ──────────────────────────────────────────────
HEADLINE_LIMIT = 1000
FEED_TIMEOUT = 10
MIC_THRESHOLD_DB = 50 # dB above which glitches intensify
MODE = 'poetry' if '--poetry' in sys.argv or '-p' in sys.argv else 'news'
FIREHOSE = '--firehose' in sys.argv
MODE = "poetry" if "--poetry" in sys.argv or "-p" in sys.argv else "news"
FIREHOSE = "--firehose" in sys.argv
# ─── NTFY MESSAGE QUEUE ──────────────────────────────────
NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json?since=20s&poll=1"
NTFY_POLL_INTERVAL = 15 # seconds between polls
NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json"
NTFY_RECONNECT_DELAY = 5 # seconds before reconnecting after a dropped stream
MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen
# ─── FONT RENDERING ──────────────────────────────────────
FONT_DIR = _resolve_font_path(_arg_value('--font-dir') or "fonts")
_FONT_FILE_ARG = _arg_value('--font-file')
FONT_DIR = _resolve_font_path(_arg_value("--font-dir") or "fonts")
_FONT_FILE_ARG = _arg_value("--font-file")
_FONT_FILES = _list_font_files(FONT_DIR)
FONT_PATH = (
_resolve_font_path(_FONT_FILE_ARG)
if _FONT_FILE_ARG
else (_FONT_FILES[0] if _FONT_FILES else "")
)
FONT_INDEX = max(0, _arg_int('--font-index', 0))
FONT_PICKER = '--no-font-picker' not in sys.argv
FONT_INDEX = max(0, _arg_int("--font-index", 0))
FONT_PICKER = "--no-font-picker" not in sys.argv
FONT_SZ = 60
RENDER_H = 8 # terminal rows per rendered text line

68
engine/controller.py Normal file
View File

@@ -0,0 +1,68 @@
"""
Stream controller - manages input sources and orchestrates the render stream.
"""
from engine.config import Config, get_config
from engine.eventbus import EventBus
from engine.events import EventType, StreamEvent
from engine.mic import MicMonitor
from engine.ntfy import NtfyPoller
from engine.scroll import stream
class StreamController:
"""Controls the stream lifecycle - initializes sources and runs the stream."""
def __init__(self, config: Config | None = None, event_bus: EventBus | None = None):
self.config = config or get_config()
self.event_bus = event_bus
self.mic: MicMonitor | None = None
self.ntfy: NtfyPoller | None = None
def initialize_sources(self) -> tuple[bool, bool]:
"""Initialize microphone and ntfy sources.
Returns:
(mic_ok, ntfy_ok) - success status for each source
"""
self.mic = MicMonitor(threshold_db=self.config.mic_threshold_db)
mic_ok = self.mic.start() if self.mic.available else False
self.ntfy = NtfyPoller(
self.config.ntfy_topic,
reconnect_delay=self.config.ntfy_reconnect_delay,
display_secs=self.config.message_display_secs,
)
ntfy_ok = self.ntfy.start()
return bool(mic_ok), ntfy_ok
def run(self, items: list) -> None:
"""Run the stream with initialized sources."""
if self.mic is None or self.ntfy is None:
self.initialize_sources()
if self.event_bus:
self.event_bus.publish(
EventType.STREAM_START,
StreamEvent(
event_type=EventType.STREAM_START,
headline_count=len(items),
),
)
stream(items, self.ntfy, self.mic)
if self.event_bus:
self.event_bus.publish(
EventType.STREAM_END,
StreamEvent(
event_type=EventType.STREAM_END,
headline_count=len(items),
),
)
def cleanup(self) -> None:
"""Clean up resources."""
if self.mic:
self.mic.stop()

View File

@@ -7,8 +7,8 @@ import random
from datetime import datetime
from engine import config
from engine.terminal import RST, DIM, G_LO, G_DIM, W_GHOST, C_DIM
from engine.sources import FEEDS, POETRY_SOURCES
from engine.terminal import C_DIM, DIM, G_DIM, G_LO, RST, W_GHOST
def noise(w):
@@ -34,23 +34,23 @@ def fade_line(s, fade):
if fade >= 1.0:
return s
if fade <= 0.0:
return ''
return ""
result = []
i = 0
while i < len(s):
if s[i] == '\033' and i + 1 < len(s) and s[i + 1] == '[':
if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[":
j = i + 2
while j < len(s) and not s[j].isalpha():
j += 1
result.append(s[i:j + 1])
result.append(s[i : j + 1])
i = j + 1
elif s[i] == ' ':
result.append(' ')
elif s[i] == " ":
result.append(" ")
i += 1
else:
result.append(s[i] if random.random() < fade else ' ')
result.append(s[i] if random.random() < fade else " ")
i += 1
return ''.join(result)
return "".join(result)
def vis_trunc(s, w):
@@ -61,17 +61,17 @@ def vis_trunc(s, w):
while i < len(s):
if vw >= w:
break
if s[i] == '\033' and i + 1 < len(s) and s[i + 1] == '[':
if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[":
j = i + 2
while j < len(s) and not s[j].isalpha():
j += 1
result.append(s[i:j + 1])
result.append(s[i : j + 1])
i = j + 1
else:
result.append(s[i])
vw += 1
i += 1
return ''.join(result)
return "".join(result)
def next_headline(pool, items, seen):
@@ -94,7 +94,7 @@ def firehose_line(items, w):
if r < 0.35:
# Raw headline text
title, src, ts = random.choice(items)
text = title[:w - 1]
text = title[: w - 1]
color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM])
return f"{color}{text}{RST}"
elif r < 0.55:
@@ -103,12 +103,13 @@ def firehose_line(items, w):
return "".join(
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
f"{random.choice(config.GLITCH + config.KATA)}{RST}"
if random.random() < d else " "
if random.random() < d
else " "
for _ in range(w)
)
elif r < 0.78:
# Status / program output
sources = FEEDS if config.MODE == 'news' else POETRY_SOURCES
sources = FEEDS if config.MODE == "news" else POETRY_SOURCES
src = random.choice(list(sources.keys()))
msgs = [
f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}",
@@ -118,16 +119,16 @@ def firehose_line(items, w):
f" {''.join(random.choice(config.KATA) for _ in range(3))} STRM "
f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}",
]
text = random.choice(msgs)[:w - 1]
text = random.choice(msgs)[: w - 1]
color = random.choice([G_LO, G_DIM, W_GHOST])
return f"{color}{text}{RST}"
else:
# Headline fragment with glitch prefix
title, _, _ = random.choice(items)
start = random.randint(0, max(0, len(title) - 20))
frag = title[start:start + random.randint(10, 35)]
frag = title[start : start + random.randint(10, 35)]
pad = random.randint(0, max(0, w - len(frag) - 8))
gp = ''.join(random.choice(config.GLITCH) for _ in range(random.randint(1, 3)))
text = (' ' * pad + gp + ' ' + frag)[:w - 1]
gp = "".join(random.choice(config.GLITCH) for _ in range(random.randint(1, 3)))
text = (" " * pad + gp + " " + frag)[: w - 1]
color = random.choice([G_LO, C_DIM, W_GHOST])
return f"{color}{text}{RST}"

25
engine/emitters.py Normal file
View File

@@ -0,0 +1,25 @@
"""
Event emitter protocols - abstract interfaces for event-producing components.
"""
from collections.abc import Callable
from typing import Any, Protocol
class EventEmitter(Protocol):
"""Protocol for components that emit events."""
def subscribe(self, callback: Callable[[Any], None]) -> None: ...
def unsubscribe(self, callback: Callable[[Any], None]) -> None: ...
class Startable(Protocol):
"""Protocol for components that can be started."""
def start(self) -> Any: ...
class Stoppable(Protocol):
"""Protocol for components that can be stopped."""
def stop(self) -> None: ...

72
engine/eventbus.py Normal file
View File

@@ -0,0 +1,72 @@
"""
Event bus - pub/sub messaging for decoupled component communication.
"""
import threading
from collections import defaultdict
from collections.abc import Callable
from typing import Any
from engine.events import EventType
class EventBus:
"""Thread-safe event bus for publish-subscribe messaging."""
def __init__(self):
self._subscribers: dict[EventType, list[Callable[[Any], None]]] = defaultdict(
list
)
self._lock = threading.Lock()
def subscribe(self, event_type: EventType, callback: Callable[[Any], None]) -> None:
"""Register a callback for a specific event type."""
with self._lock:
self._subscribers[event_type].append(callback)
def unsubscribe(
self, event_type: EventType, callback: Callable[[Any], None]
) -> None:
"""Remove a callback for a specific event type."""
with self._lock:
if callback in self._subscribers[event_type]:
self._subscribers[event_type].remove(callback)
def publish(self, event_type: EventType, event: Any = None) -> None:
"""Publish an event to all subscribers."""
with self._lock:
callbacks = list(self._subscribers.get(event_type, []))
for callback in callbacks:
try:
callback(event)
except Exception:
pass
def clear(self) -> None:
"""Remove all subscribers."""
with self._lock:
self._subscribers.clear()
def subscriber_count(self, event_type: EventType | None = None) -> int:
"""Get subscriber count for an event type, or total if None."""
with self._lock:
if event_type is None:
return sum(len(cb) for cb in self._subscribers.values())
return len(self._subscribers.get(event_type, []))
_event_bus: EventBus | None = None
def get_event_bus() -> EventBus:
"""Get the global event bus instance."""
global _event_bus
if _event_bus is None:
_event_bus = EventBus()
return _event_bus
def set_event_bus(bus: EventBus) -> None:
"""Set the global event bus instance (for testing)."""
global _event_bus
_event_bus = bus

67
engine/events.py Normal file
View File

@@ -0,0 +1,67 @@
"""
Event types for the mainline application.
Defines the core events that flow through the system.
These types support a future migration to an event-driven architecture.
"""
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, auto
class EventType(Enum):
"""Core event types in the mainline application."""
NEW_HEADLINE = auto()
FRAME_TICK = auto()
MIC_LEVEL = auto()
NTFY_MESSAGE = auto()
STREAM_START = auto()
STREAM_END = auto()
@dataclass
class HeadlineEvent:
"""Event emitted when a new headline is ready for display."""
title: str
source: str
timestamp: str
language: str | None = None
@dataclass
class FrameTickEvent:
"""Event emitted on each render frame."""
frame_number: int
timestamp: datetime
delta_seconds: float
@dataclass
class MicLevelEvent:
"""Event emitted when microphone level changes significantly."""
db_level: float
excess_above_threshold: float
timestamp: datetime
@dataclass
class NtfyMessageEvent:
"""Event emitted when an ntfy message is received."""
title: str
body: str
message_id: str | None = None
timestamp: datetime | None = None
@dataclass
class StreamEvent:
"""Event emitted when stream starts or ends."""
event_type: EventType
headline_count: int = 0
timestamp: datetime | None = None

View File

@@ -3,21 +3,27 @@ RSS feed fetching, Project Gutenberg parsing, and headline caching.
Depends on: config, sources, filter, terminal.
"""
import re
import json
import pathlib
import re
import urllib.request
from datetime import datetime
from typing import Any
import feedparser
from engine import config
from engine.filter import skip, strip_tags
from engine.sources import FEEDS, POETRY_SOURCES
from engine.filter import strip_tags, skip
from engine.terminal import boot_ln
# Type alias for headline items
HeadlineTuple = tuple[str, str, str]
# ─── SINGLE FEED ──────────────────────────────────────────
def fetch_feed(url):
def fetch_feed(url: str) -> Any | None:
"""Fetch and parse a single RSS feed URL."""
try:
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=config.FEED_TIMEOUT)
@@ -27,8 +33,9 @@ def fetch_feed(url):
# ─── ALL RSS FEEDS ────────────────────────────────────────
def fetch_all():
items = []
def fetch_all() -> tuple[list[HeadlineTuple], int, int]:
"""Fetch all RSS feeds and return items, linked count, failed count."""
items: list[HeadlineTuple] = []
linked = failed = 0
for src, url in FEEDS.items():
feed = fetch_feed(url)
@@ -58,31 +65,36 @@ def fetch_all():
# ─── PROJECT GUTENBERG ────────────────────────────────────
def _fetch_gutenberg(url, label):
def _fetch_gutenberg(url: str, label: str) -> list[HeadlineTuple]:
"""Download and parse stanzas/passages from a Project Gutenberg text."""
try:
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=15)
text = resp.read().decode('utf-8', errors='replace').replace('\r\n', '\n').replace('\r', '\n')
text = (
resp.read()
.decode("utf-8", errors="replace")
.replace("\r\n", "\n")
.replace("\r", "\n")
)
# Strip PG boilerplate
m = re.search(r'\*\*\*\s*START OF[^\n]*\n', text)
m = re.search(r"\*\*\*\s*START OF[^\n]*\n", text)
if m:
text = text[m.end():]
m = re.search(r'\*\*\*\s*END OF', text)
text = text[m.end() :]
m = re.search(r"\*\*\*\s*END OF", text)
if m:
text = text[:m.start()]
text = text[: m.start()]
# Split on blank lines into stanzas/passages
blocks = re.split(r'\n{2,}', text.strip())
blocks = re.split(r"\n{2,}", text.strip())
items = []
for blk in blocks:
blk = ' '.join(blk.split()) # flatten to one line
blk = " ".join(blk.split()) # flatten to one line
if len(blk) < 20 or len(blk) > 280:
continue
if blk.isupper(): # skip all-caps headers
continue
if re.match(r'^[IVXLCDM]+\.?\s*$', blk): # roman numerals
if re.match(r"^[IVXLCDM]+\.?\s*$", blk): # roman numerals
continue
items.append((blk, label, ''))
items.append((blk, label, ""))
return items
except Exception:
return []

View File

@@ -29,29 +29,29 @@ def strip_tags(html):
# ─── CONTENT FILTER ───────────────────────────────────────
_SKIP_RE = re.compile(
r'\b(?:'
r"\b(?:"
# ── sports ──
r'football|soccer|basketball|baseball|softball|tennis|golf|cricket|rugby|'
r'hockey|lacrosse|volleyball|badminton|'
r'nba|nfl|nhl|mlb|mls|fifa|uefa|'
r'premier league|champions league|la liga|serie a|bundesliga|'
r'world cup|super bowl|world series|stanley cup|'
r'playoff|playoffs|touchdown|goalkeeper|striker|quarterback|'
r'slam dunk|home run|grand slam|offside|halftime|'
r'batting|wicket|innings|'
r'formula 1|nascar|motogp|'
r'boxing|ufc|mma|'
r'marathon|tour de france|'
r'transfer window|draft pick|relegation|'
r"football|soccer|basketball|baseball|softball|tennis|golf|cricket|rugby|"
r"hockey|lacrosse|volleyball|badminton|"
r"nba|nfl|nhl|mlb|mls|fifa|uefa|"
r"premier league|champions league|la liga|serie a|bundesliga|"
r"world cup|super bowl|world series|stanley cup|"
r"playoff|playoffs|touchdown|goalkeeper|striker|quarterback|"
r"slam dunk|home run|grand slam|offside|halftime|"
r"batting|wicket|innings|"
r"formula 1|nascar|motogp|"
r"boxing|ufc|mma|"
r"marathon|tour de france|"
r"transfer window|draft pick|relegation|"
# ── vapid / insipid ──
r'kardashian|jenner|reality tv|reality show|'
r'influencer|viral video|tiktok|instagram|'
r'best dressed|worst dressed|red carpet|'
r'horoscope|zodiac|gossip|bikini|selfie|'
r'you won.t believe|what happened next|'
r'celebrity couple|celebrity feud|baby bump'
r')\b',
re.IGNORECASE
r"kardashian|jenner|reality tv|reality show|"
r"influencer|viral video|tiktok|instagram|"
r"best dressed|worst dressed|red carpet|"
r"horoscope|zodiac|gossip|bikini|selfie|"
r"you won.t believe|what happened next|"
r"celebrity couple|celebrity feud|baby bump"
r")\b",
re.IGNORECASE,
)

57
engine/frame.py Normal file
View File

@@ -0,0 +1,57 @@
"""
Frame timing utilities — FPS control and precise timing.
"""
import time
class FrameTimer:
"""Frame timer for consistent render loop timing."""
def __init__(self, target_frame_dt: float = 0.05):
self.target_frame_dt = target_frame_dt
self._frame_count = 0
self._start_time = time.monotonic()
self._last_frame_time = self._start_time
@property
def fps(self) -> float:
"""Current FPS based on elapsed frames."""
elapsed = time.monotonic() - self._start_time
if elapsed > 0:
return self._frame_count / elapsed
return 0.0
def sleep_until_next_frame(self) -> float:
"""Sleep to maintain target frame rate. Returns actual elapsed time."""
now = time.monotonic()
elapsed = now - self._last_frame_time
self._last_frame_time = now
self._frame_count += 1
sleep_time = max(0, self.target_frame_dt - elapsed)
if sleep_time > 0:
time.sleep(sleep_time)
return elapsed
def reset(self) -> None:
"""Reset frame counter and start time."""
self._frame_count = 0
self._start_time = time.monotonic()
self._last_frame_time = self._start_time
def calculate_scroll_step(
scroll_dur: float, view_height: int, padding: int = 15
) -> float:
"""Calculate scroll step interval for smooth scrolling.
Args:
scroll_dur: Duration in seconds for one headline to scroll through view
view_height: Terminal height in rows
padding: Extra rows for off-screen content
Returns:
Time in seconds between scroll steps
"""
return scroll_dur / (view_height + padding) * 2

201
engine/layers.py Normal file
View File

@@ -0,0 +1,201 @@
"""
Layer compositing — message overlay, ticker zone, firehose, noise.
Depends on: config, render, effects.
"""
import random
import re
import time
from datetime import datetime
from engine import config
from engine.effects import (
fade_line,
firehose_line,
glitch_bar,
noise,
vis_trunc,
)
from engine.render import big_wrap, lr_gradient, lr_gradient_opposite
from engine.terminal import RST, W_COOL
MSG_META = "\033[38;5;245m"
MSG_BORDER = "\033[2;38;5;37m"
def render_message_overlay(
msg: tuple[str, str, float] | None,
w: int,
h: int,
msg_cache: tuple,
) -> tuple[list[str], tuple]:
"""Render ntfy message overlay.
Args:
msg: (title, body, timestamp) or None
w: terminal width
h: terminal height
msg_cache: (cache_key, rendered_rows) for caching
Returns:
(list of ANSI strings, updated cache)
"""
overlay = []
if msg is None:
return overlay, msg_cache
m_title, m_body, m_ts = msg
display_text = m_body or m_title or "(empty)"
display_text = re.sub(r"\s+", " ", display_text.upper())
cache_key = (display_text, w)
if msg_cache[0] != cache_key:
msg_rows = big_wrap(display_text, w - 4)
msg_cache = (cache_key, msg_rows)
else:
msg_rows = msg_cache[1]
msg_rows = lr_gradient_opposite(
msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0
)
elapsed_s = int(time.monotonic() - m_ts)
remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s)
ts_str = datetime.now().strftime("%H:%M:%S")
panel_h = len(msg_rows) + 2
panel_top = max(0, (h - panel_h) // 2)
row_idx = 0
for mr in msg_rows:
ln = vis_trunc(mr, w)
overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K")
row_idx += 1
meta_parts = []
if m_title and m_title != m_body:
meta_parts.append(m_title)
meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
meta = (
" " + " \u00b7 ".join(meta_parts)
if len(meta_parts) > 1
else " " + meta_parts[0]
)
overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}\033[0m\033[K")
row_idx += 1
bar = "\u2500" * (w - 4)
overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}\033[0m\033[K")
return overlay, msg_cache
def render_ticker_zone(
active: list,
scroll_cam: int,
ticker_h: int,
w: int,
noise_cache: dict,
grad_offset: float,
) -> tuple[list[str], dict]:
"""Render the ticker scroll zone.
Args:
active: list of (content_rows, color, canvas_y, meta_idx)
scroll_cam: camera position (viewport top)
ticker_h: height of ticker zone
w: terminal width
noise_cache: dict of cy -> noise string
grad_offset: gradient animation offset
Returns:
(list of ANSI strings, updated noise_cache)
"""
buf = []
top_zone = max(1, int(ticker_h * 0.25))
bot_zone = max(1, int(ticker_h * 0.10))
def noise_at(cy):
if cy not in noise_cache:
noise_cache[cy] = noise(w) if random.random() < 0.15 else None
return noise_cache[cy]
for r in range(ticker_h):
scr_row = r + 1
cy = scroll_cam + r
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0
row_fade = min(top_f, bot_f)
drawn = False
for content, hc, by, midx in active:
cr = cy - by
if 0 <= cr < len(content):
raw = content[cr]
if cr != midx:
colored = lr_gradient([raw], grad_offset)[0]
else:
colored = raw
ln = vis_trunc(colored, w)
if row_fade < 1.0:
ln = fade_line(ln, row_fade)
if cr == midx:
buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K")
elif ln.strip():
buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K")
else:
buf.append(f"\033[{scr_row};1H\033[K")
drawn = True
break
if not drawn:
n = noise_at(cy)
if row_fade < 1.0 and n:
n = fade_line(n, row_fade)
if n:
buf.append(f"\033[{scr_row};1H{n}")
else:
buf.append(f"\033[{scr_row};1H\033[K")
return buf, noise_cache
def apply_glitch(
buf: list[str],
ticker_buf_start: int,
mic_excess: float,
w: int,
) -> list[str]:
"""Apply glitch effect to ticker buffer.
Args:
buf: current buffer
ticker_buf_start: index where ticker starts in buffer
mic_excess: mic level above threshold
w: terminal width
Returns:
Updated buffer with glitches applied
"""
glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
n_hits = 4 + int(mic_excess / 2)
ticker_buf_len = len(buf) - ticker_buf_start
if random.random() < glitch_prob and ticker_buf_len > 0:
for _ in range(min(n_hits, ticker_buf_len)):
gi = random.randint(0, ticker_buf_len - 1)
scr_row = gi + 1
buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}"
return buf
def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]:
"""Render firehose strip at bottom of screen."""
buf = []
if fh > 0:
for fr in range(fh):
scr_row = h - fh + fr + 1
fline = firehose_line(items, w)
buf.append(f"\033[{scr_row};1H{fline}\033[K")
return buf

View File

@@ -4,15 +4,21 @@ Gracefully degrades if sounddevice/numpy are unavailable.
"""
import atexit
from collections.abc import Callable
from datetime import datetime
try:
import sounddevice as _sd
import numpy as _np
import sounddevice as _sd
_HAS_MIC = True
except Exception:
_HAS_MIC = False
from engine.events import MicLevelEvent
class MicMonitor:
"""Background mic stream that exposes current RMS dB level."""
@@ -20,6 +26,7 @@ class MicMonitor:
self.threshold_db = threshold_db
self._db = -99.0
self._stream = None
self._subscribers: list[Callable[[MicLevelEvent], None]] = []
@property
def available(self):
@@ -36,16 +43,43 @@ class MicMonitor:
"""dB above threshold (clamped to 0)."""
return max(0.0, self._db - self.threshold_db)
def subscribe(self, callback: Callable[[MicLevelEvent], None]) -> None:
"""Register a callback to be called when mic level changes."""
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable[[MicLevelEvent], None]) -> None:
"""Remove a registered callback."""
if callback in self._subscribers:
self._subscribers.remove(callback)
def _emit(self, event: MicLevelEvent) -> None:
"""Emit an event to all subscribers."""
for cb in self._subscribers:
try:
cb(event)
except Exception:
pass
def start(self):
"""Start background mic stream. Returns True on success, False/None otherwise."""
if not _HAS_MIC:
return None
def _cb(indata, frames, t, status):
rms = float(_np.sqrt(_np.mean(indata ** 2)))
rms = float(_np.sqrt(_np.mean(indata**2)))
self._db = 20 * _np.log10(rms) if rms > 0 else -99.0
if self._subscribers:
event = MicLevelEvent(
db_level=self._db,
excess_above_threshold=max(0.0, self._db - self.threshold_db),
timestamp=datetime.now(),
)
self._emit(event)
try:
self._stream = _sd.InputStream(
callback=_cb, channels=1, samplerate=44100, blocksize=2048)
callback=_cb, channels=1, samplerate=44100, blocksize=2048
)
self._stream.start()
atexit.register(self.stop)
return True

View File

@@ -1,9 +1,9 @@
"""
ntfy.sh message poller — standalone, zero internal dependencies.
ntfy.sh SSE stream listener — standalone, zero internal dependencies.
Reusable by any visualizer:
from engine.ntfy import NtfyPoller
poller = NtfyPoller("https://ntfy.sh/my_topic/json?since=20s&poll=1")
poller = NtfyPoller("https://ntfy.sh/my_topic/json")
poller.start()
# in render loop:
msg = poller.get_active_message()
@@ -13,24 +13,47 @@ Reusable by any visualizer:
"""
import json
import time
import threading
import time
import urllib.request
from collections.abc import Callable
from datetime import datetime
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
from engine.events import NtfyMessageEvent
class NtfyPoller:
"""Background poller for ntfy.sh topics."""
"""SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT)."""
def __init__(self, topic_url, poll_interval=15, display_secs=30):
def __init__(self, topic_url, reconnect_delay=5, display_secs=30):
self.topic_url = topic_url
self.poll_interval = poll_interval
self.reconnect_delay = reconnect_delay
self.display_secs = display_secs
self._message = None # (title, body, monotonic_timestamp) or None
self._lock = threading.Lock()
self._subscribers: list[Callable[[NtfyMessageEvent], None]] = []
def subscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
"""Register a callback to be called when a message is received."""
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
"""Remove a registered callback."""
if callback in self._subscribers:
self._subscribers.remove(callback)
def _emit(self, event: NtfyMessageEvent) -> None:
"""Emit an event to all subscribers."""
for cb in self._subscribers:
try:
cb(event)
except Exception:
pass
def start(self):
"""Start background polling thread. Returns True."""
t = threading.Thread(target=self._poll_loop, daemon=True)
"""Start background stream thread. Returns True."""
t = threading.Thread(target=self._stream_loop, daemon=True)
t.start()
return True
@@ -50,19 +73,36 @@ class NtfyPoller:
with self._lock:
self._message = None
def _poll_loop(self):
def _build_url(self, last_id=None):
"""Build the stream URL, substituting since= to avoid message replays on reconnect."""
parsed = urlparse(self.topic_url)
params = parse_qs(parsed.query, keep_blank_values=True)
params["since"] = [last_id if last_id else "20s"]
new_query = urlencode({k: v[0] for k, v in params.items()})
return urlunparse(parsed._replace(query=new_query))
def _stream_loop(self):
last_id = None
while True:
try:
url = self._build_url(last_id)
req = urllib.request.Request(
self.topic_url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=10)
for line in resp.read().decode('utf-8', errors='replace').strip().split('\n'):
if not line.strip():
continue
url, headers={"User-Agent": "mainline/0.1"}
)
# timeout=90 keeps the socket alive through ntfy.sh keepalive heartbeats
resp = urllib.request.urlopen(req, timeout=90)
while True:
line = resp.readline()
if not line:
break # server closed connection — reconnect
try:
data = json.loads(line)
data = json.loads(line.decode("utf-8", errors="replace"))
except json.JSONDecodeError:
continue
# Advance cursor on every event (message + keepalive) to
# avoid replaying already-seen events after a reconnect.
if "id" in data:
last_id = data["id"]
if data.get("event") == "message":
with self._lock:
self._message = (
@@ -70,6 +110,13 @@ class NtfyPoller:
data.get("message", ""),
time.monotonic(),
)
event = NtfyMessageEvent(
title=data.get("title", ""),
body=data.get("message", ""),
message_id=data.get("id"),
timestamp=datetime.now(),
)
self._emit(event)
except Exception:
pass
time.sleep(self.poll_interval)
time.sleep(self.reconnect_delay)

View File

@@ -4,15 +4,15 @@ Font loading, text rasterization, word-wrap, gradient coloring, headline block a
Depends on: config, terminal, sources, translate.
"""
import re
import random
import re
from pathlib import Path
from PIL import Image, ImageDraw, ImageFont
from engine import config
from engine.sources import NO_UPPER, SCRIPT_FONTS, SOURCE_LANGS
from engine.terminal import RST
from engine.sources import SCRIPT_FONTS, SOURCE_LANGS, NO_UPPER
from engine.translate import detect_location_language, translate_headline
# ─── GRADIENT ─────────────────────────────────────────────
@@ -62,13 +62,14 @@ def font():
f"No primary font selected. Add .otf/.ttf/.ttc files to {config.FONT_DIR}."
)
key = (config.FONT_PATH, config.FONT_INDEX, config.FONT_SZ)
if _FONT_OBJ is None or _FONT_OBJ_KEY != key:
if _FONT_OBJ is None or key != _FONT_OBJ_KEY:
_FONT_OBJ = ImageFont.truetype(
config.FONT_PATH, config.FONT_SZ, index=config.FONT_INDEX
)
_FONT_OBJ_KEY = key
return _FONT_OBJ
def clear_font_cache():
"""Reset cached font objects after changing primary font selection."""
global _FONT_OBJ, _FONT_OBJ_KEY
@@ -123,7 +124,7 @@ def render_line(text, fnt=None):
pad = 4
img_w = bbox[2] - bbox[0] + pad * 2
img_h = bbox[3] - bbox[1] + pad * 2
img = Image.new('L', (img_w, img_h), 0)
img = Image.new("L", (img_w, img_h), 0)
draw = ImageDraw.Draw(img)
draw.text((-bbox[0] + pad, -bbox[1] + pad), text, fill=255, font=fnt)
pix_h = config.RENDER_H * 2
@@ -200,8 +201,8 @@ def lr_gradient(rows, offset=0.0, grad_cols=None):
continue
buf = []
for x, ch in enumerate(row):
if ch == ' ':
buf.append(' ')
if ch == " ":
buf.append(" ")
else:
shifted = (x / max(max_x - 1, 1) + offset) % 1.0
idx = min(round(shifted * (n - 1)), n - 1)
@@ -218,7 +219,11 @@ def lr_gradient_opposite(rows, offset=0.0):
# ─── HEADLINE BLOCK ASSEMBLY ─────────────────────────────
def make_block(title, src, ts, w):
"""Render a headline into a content block with color."""
target_lang = (SOURCE_LANGS.get(src) or detect_location_language(title)) if config.MODE == 'news' else None
target_lang = (
(SOURCE_LANGS.get(src) or detect_location_language(title))
if config.MODE == "news"
else None
)
lang_font = font_for_lang(target_lang)
if target_lang:
title = translate_headline(title, target_lang)
@@ -227,11 +232,18 @@ def make_block(title, src, ts, w):
title_up = re.sub(r"\s+", " ", title)
else:
title_up = re.sub(r"\s+", " ", title.upper())
for old, new in [("\u2019","'"), ("\u2018","'"), ("\u201c",'"'),
("\u201d",'"'), ("\u2013","-"), ("\u2014","-")]:
for old, new in [
("\u2019", "'"),
("\u2018", "'"),
("\u201c", '"'),
("\u201d", '"'),
("\u2013", "-"),
("\u2014", "-"),
]:
title_up = title_up.replace(old, new)
big_rows = big_wrap(title_up, w - 4, lang_font)
hc = random.choice([
hc = random.choice(
[
"\033[38;5;46m", # matrix green
"\033[38;5;34m", # dark green
"\033[38;5;82m", # lime
@@ -248,7 +260,8 @@ def make_block(title, src, ts, w):
"\033[38;5;115m", # sage
"\033[1;38;5;46m", # bold green
"\033[1;38;5;250m", # bold white
])
]
)
content = [" " + r for r in big_rows]
content.append("")
meta = f"\u2591 {src} \u00b7 {ts}"

View File

@@ -1,18 +1,22 @@
"""
Render engine — ticker content, scroll motion, message panel, and firehose overlay.
Depends on: config, terminal, render, effects, ntfy, mic.
Orchestrates viewport, frame timing, and layers.
"""
import re
import random
import sys
import time
import random
from datetime import datetime
from engine import config
from engine.terminal import RST, W_COOL, CLR, tw, th
from engine.render import big_wrap, lr_gradient, lr_gradient_opposite, make_block
from engine.effects import noise, glitch_bar, fade_line, vis_trunc, next_headline, firehose_line
from engine.frame import calculate_scroll_step
from engine.layers import (
apply_glitch,
render_firehose,
render_message_overlay,
render_ticker_zone,
)
from engine.terminal import CLR
from engine.viewport import th, tw
def stream(items, ntfy_poller, mic_monitor):
@@ -28,166 +32,78 @@ def stream(items, ntfy_poller, mic_monitor):
w, h = tw(), th()
fh = config.FIREHOSE_H if config.FIREHOSE else 0
ticker_view_h = h - fh # reserve fixed firehose strip at bottom
GAP = 3 # blank rows between headlines
scroll_step_interval = config.SCROLL_DUR / (ticker_view_h + 15) * 2
ticker_view_h = h - fh
GAP = 3
scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h)
# Taxonomy:
# - message: centered ntfy overlay panel
# - ticker: large headline text content
# - scroll: upward camera motion applied to ticker content
# - firehose: fixed carriage-return style strip pinned at bottom
# Active ticker blocks: (content_rows, color, canvas_y, meta_idx)
active = []
scroll_cam = 0 # viewport top in virtual canvas coords
ticker_next_y = ticker_view_h # canvas-y where next block starts (off-screen bottom)
scroll_cam = 0
ticker_next_y = ticker_view_h
noise_cache = {}
scroll_motion_accum = 0.0
msg_cache = (None, None)
def _noise_at(cy):
if cy not in noise_cache:
noise_cache[cy] = noise(w) if random.random() < 0.15 else None
return noise_cache[cy]
while True:
if queued >= config.HEADLINE_LIMIT and not active:
break
# Message color: bright cyan/white — distinct from headline greens
MSG_META = "\033[38;5;245m" # cool grey
MSG_BORDER = "\033[2;38;5;37m" # dim teal
_msg_cache = (None, None) # (cache_key, rendered_rows)
while queued < config.HEADLINE_LIMIT or active:
t0 = time.monotonic()
w, h = tw(), th()
fh = config.FIREHOSE_H if config.FIREHOSE else 0
ticker_view_h = h - fh
scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h)
# ── Check for ntfy message ────────────────────────
msg_h = 0
msg_overlay = []
msg = ntfy_poller.get_active_message()
msg_overlay, msg_cache = render_message_overlay(msg, w, h, msg_cache)
buf = []
if msg is not None:
m_title, m_body, m_ts = msg
# ── Message overlay: centered in the viewport ──
display_text = m_body or m_title or "(empty)"
display_text = re.sub(r"\s+", " ", display_text.upper())
cache_key = (display_text, w)
if _msg_cache[0] != cache_key:
msg_rows = big_wrap(display_text, w - 4)
_msg_cache = (cache_key, msg_rows)
else:
msg_rows = _msg_cache[1]
msg_rows = lr_gradient_opposite(msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0)
# Layout: rendered text + meta + border
elapsed_s = int(time.monotonic() - m_ts)
remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s)
ts_str = datetime.now().strftime("%H:%M:%S")
panel_h = len(msg_rows) + 2 # meta + border
panel_top = max(0, (h - panel_h) // 2)
row_idx = 0
for mr in msg_rows:
ln = vis_trunc(mr, w)
msg_overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}{RST}\033[K")
row_idx += 1
# Meta line: title (if distinct) + source + countdown
meta_parts = []
if m_title and m_title != m_body:
meta_parts.append(m_title)
meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
meta = " " + " \u00b7 ".join(meta_parts) if len(meta_parts) > 1 else " " + meta_parts[0]
msg_overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}{RST}\033[K")
row_idx += 1
# Border — constant boundary under message panel
bar = "\u2500" * (w - 4)
msg_overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}{RST}\033[K")
ticker_h = ticker_view_h
# Ticker draws above the fixed firehose strip; message is a centered overlay.
ticker_h = ticker_view_h - msg_h
# ── Ticker content + scroll motion (always runs) ──
scroll_motion_accum += config.FRAME_DT
while scroll_motion_accum >= scroll_step_interval:
scroll_motion_accum -= scroll_step_interval
scroll_cam += 1
# Enqueue new headlines when room at the bottom
while ticker_next_y < scroll_cam + ticker_view_h + 10 and queued < config.HEADLINE_LIMIT:
while (
ticker_next_y < scroll_cam + ticker_view_h + 10
and queued < config.HEADLINE_LIMIT
):
from engine.effects import next_headline
from engine.render import make_block
t, src, ts = next_headline(pool, items, seen)
ticker_content, hc, midx = make_block(t, src, ts, w)
active.append((ticker_content, hc, ticker_next_y, midx))
ticker_next_y += len(ticker_content) + GAP
queued += 1
# Prune off-screen blocks and stale noise
active = [(c, hc, by, mi) for c, hc, by, mi in active
if by + len(c) > scroll_cam]
active = [
(c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > scroll_cam
]
for k in list(noise_cache):
if k < scroll_cam:
del noise_cache[k]
# Draw ticker zone (above fixed firehose strip)
top_zone = max(1, int(ticker_h * 0.25))
bot_zone = max(1, int(ticker_h * 0.10))
grad_offset = (time.monotonic() * config.GRAD_SPEED) % 1.0
ticker_buf_start = len(buf) # track where ticker rows start in buf
for r in range(ticker_h):
scr_row = r + 1 # 1-indexed ANSI screen row
cy = scroll_cam + r
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0
row_fade = min(top_f, bot_f)
drawn = False
for content, hc, by, midx in active:
cr = cy - by
if 0 <= cr < len(content):
raw = content[cr]
if cr != midx:
colored = lr_gradient([raw], grad_offset)[0]
else:
colored = raw
ln = vis_trunc(colored, w)
if row_fade < 1.0:
ln = fade_line(ln, row_fade)
if cr == midx:
buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K")
elif ln.strip():
buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K")
else:
buf.append(f"\033[{scr_row};1H\033[K")
drawn = True
break
if not drawn:
n = _noise_at(cy)
if row_fade < 1.0 and n:
n = fade_line(n, row_fade)
if n:
buf.append(f"\033[{scr_row};1H{n}")
else:
buf.append(f"\033[{scr_row};1H\033[K")
ticker_buf_start = len(buf)
ticker_buf, noise_cache = render_ticker_zone(
active, scroll_cam, ticker_h, w, noise_cache, grad_offset
)
buf.extend(ticker_buf)
# Glitch — base rate + mic-reactive spikes (ticker zone only)
mic_excess = mic_monitor.excess
glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
n_hits = 4 + int(mic_excess / 2)
ticker_buf_len = len(buf) - ticker_buf_start
if random.random() < glitch_prob and ticker_buf_len > 0:
for _ in range(min(n_hits, ticker_buf_len)):
gi = random.randint(0, ticker_buf_len - 1)
scr_row = gi + 1
buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}"
buf = apply_glitch(buf, ticker_buf_start, mic_excess, w)
firehose_buf = render_firehose(items, w, fh, h)
buf.extend(firehose_buf)
if config.FIREHOSE and fh > 0:
for fr in range(fh):
scr_row = h - fh + fr + 1
fline = firehose_line(items, w)
buf.append(f"\033[{scr_row};1H{fline}\033[K")
if msg_overlay:
buf.extend(msg_overlay)
sys.stdout.buffer.write("".join(buf).encode())
sys.stdout.flush()
# Precise frame timing
elapsed = time.monotonic() - t0
time.sleep(max(0, config.FRAME_DT - elapsed))

View File

@@ -75,41 +75,41 @@ SOURCE_LANGS = {
# ─── LOCATION → LANGUAGE ─────────────────────────────────
LOCATION_LANGS = {
r'\b(?:china|chinese|beijing|shanghai|hong kong|xi jinping)\b': 'zh-cn',
r'\b(?:japan|japanese|tokyo|osaka|kishida)\b': 'ja',
r'\b(?:korea|korean|seoul|pyongyang)\b': 'ko',
r'\b(?:russia|russian|moscow|kremlin|putin)\b': 'ru',
r'\b(?:saudi|dubai|qatar|egypt|cairo|arabic)\b': 'ar',
r'\b(?:india|indian|delhi|mumbai|modi)\b': 'hi',
r'\b(?:germany|german|berlin|munich|scholz)\b': 'de',
r'\b(?:france|french|paris|lyon|macron)\b': 'fr',
r'\b(?:spain|spanish|madrid)\b': 'es',
r'\b(?:italy|italian|rome|milan|meloni)\b': 'it',
r'\b(?:portugal|portuguese|lisbon)\b': 'pt',
r'\b(?:brazil|brazilian|são paulo|lula)\b': 'pt',
r'\b(?:greece|greek|athens)\b': 'el',
r'\b(?:turkey|turkish|istanbul|ankara|erdogan)\b': 'tr',
r'\b(?:iran|iranian|tehran)\b': 'fa',
r'\b(?:thailand|thai|bangkok)\b': 'th',
r'\b(?:vietnam|vietnamese|hanoi)\b': 'vi',
r'\b(?:ukraine|ukrainian|kyiv|kiev|zelensky)\b': 'uk',
r'\b(?:israel|israeli|jerusalem|tel aviv|netanyahu)\b': 'he',
r"\b(?:china|chinese|beijing|shanghai|hong kong|xi jinping)\b": "zh-cn",
r"\b(?:japan|japanese|tokyo|osaka|kishida)\b": "ja",
r"\b(?:korea|korean|seoul|pyongyang)\b": "ko",
r"\b(?:russia|russian|moscow|kremlin|putin)\b": "ru",
r"\b(?:saudi|dubai|qatar|egypt|cairo|arabic)\b": "ar",
r"\b(?:india|indian|delhi|mumbai|modi)\b": "hi",
r"\b(?:germany|german|berlin|munich|scholz)\b": "de",
r"\b(?:france|french|paris|lyon|macron)\b": "fr",
r"\b(?:spain|spanish|madrid)\b": "es",
r"\b(?:italy|italian|rome|milan|meloni)\b": "it",
r"\b(?:portugal|portuguese|lisbon)\b": "pt",
r"\b(?:brazil|brazilian|são paulo|lula)\b": "pt",
r"\b(?:greece|greek|athens)\b": "el",
r"\b(?:turkey|turkish|istanbul|ankara|erdogan)\b": "tr",
r"\b(?:iran|iranian|tehran)\b": "fa",
r"\b(?:thailand|thai|bangkok)\b": "th",
r"\b(?:vietnam|vietnamese|hanoi)\b": "vi",
r"\b(?:ukraine|ukrainian|kyiv|kiev|zelensky)\b": "uk",
r"\b(?:israel|israeli|jerusalem|tel aviv|netanyahu)\b": "he",
}
# ─── NON-LATIN SCRIPT FONTS (macOS) ──────────────────────
SCRIPT_FONTS = {
'zh-cn': '/System/Library/Fonts/STHeiti Medium.ttc',
'ja': '/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc',
'ko': '/System/Library/Fonts/AppleSDGothicNeo.ttc',
'ru': '/System/Library/Fonts/Supplemental/Arial.ttf',
'uk': '/System/Library/Fonts/Supplemental/Arial.ttf',
'el': '/System/Library/Fonts/Supplemental/Arial.ttf',
'he': '/System/Library/Fonts/Supplemental/Arial.ttf',
'ar': '/System/Library/Fonts/GeezaPro.ttc',
'fa': '/System/Library/Fonts/GeezaPro.ttc',
'hi': '/System/Library/Fonts/Kohinoor.ttc',
'th': '/System/Library/Fonts/ThonburiUI.ttc',
"zh-cn": "/System/Library/Fonts/STHeiti Medium.ttc",
"ja": "/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc",
"ko": "/System/Library/Fonts/AppleSDGothicNeo.ttc",
"ru": "/System/Library/Fonts/Supplemental/Arial.ttf",
"uk": "/System/Library/Fonts/Supplemental/Arial.ttf",
"el": "/System/Library/Fonts/Supplemental/Arial.ttf",
"he": "/System/Library/Fonts/Supplemental/Arial.ttf",
"ar": "/System/Library/Fonts/GeezaPro.ttc",
"fa": "/System/Library/Fonts/GeezaPro.ttc",
"hi": "/System/Library/Fonts/Kohinoor.ttc",
"th": "/System/Library/Fonts/ThonburiUI.ttc",
}
# Scripts that have no uppercase
NO_UPPER = {'zh-cn', 'ja', 'ko', 'ar', 'fa', 'hi', 'th', 'he'}
NO_UPPER = {"zh-cn", "ja", "ko", "ar", "fa", "hi", "th", "he"}

View File

@@ -4,8 +4,8 @@ No internal dependencies.
"""
import os
import sys
import random
import sys
import time
# ─── ANSI ─────────────────────────────────────────────────
@@ -49,7 +49,7 @@ def type_out(text, color=G_HI):
while i < len(text):
if random.random() < 0.3:
b = random.randint(2, 5)
sys.stdout.write(f"{color}{text[i:i+b]}{RST}")
sys.stdout.write(f"{color}{text[i : i + b]}{RST}")
i += b
else:
sys.stdout.write(f"{color}{text[i]}{RST}")

View File

@@ -3,14 +3,33 @@ Google Translate wrapper and location→language detection.
Depends on: sources (for LOCATION_LANGS).
"""
import re
import json
import urllib.request
import re
import urllib.parse
import urllib.request
from functools import lru_cache
from engine.sources import LOCATION_LANGS
_TRANSLATE_CACHE = {}
TRANSLATE_CACHE_SIZE = 500
@lru_cache(maxsize=TRANSLATE_CACHE_SIZE)
def _translate_cached(title: str, target_lang: str) -> str:
"""Cached translation implementation."""
try:
q = urllib.parse.quote(title)
url = (
"https://translate.googleapis.com/translate_a/single"
f"?client=gtx&sl=en&tl={target_lang}&dt=t&q={q}"
)
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=5)
data = json.loads(resp.read())
result = "".join(p[0] for p in data[0] if p[0]) or title
except Exception:
result = title
return result
def detect_location_language(title):
@@ -22,20 +41,6 @@ def detect_location_language(title):
return None
def translate_headline(title, target_lang):
def translate_headline(title: str, target_lang: str) -> str:
"""Translate headline via Google Translate API (zero dependencies)."""
key = (title, target_lang)
if key in _TRANSLATE_CACHE:
return _TRANSLATE_CACHE[key]
try:
q = urllib.parse.quote(title)
url = ("https://translate.googleapis.com/translate_a/single"
f"?client=gtx&sl=en&tl={target_lang}&dt=t&q={q}")
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=5)
data = json.loads(resp.read())
result = "".join(p[0] for p in data[0] if p[0]) or title
except Exception:
result = title
_TRANSLATE_CACHE[key] = result
return result
return _translate_cached(title, target_lang)

60
engine/types.py Normal file
View File

@@ -0,0 +1,60 @@
"""
Shared dataclasses for the mainline application.
Provides named types for tuple returns across modules.
"""
from dataclasses import dataclass
@dataclass
class HeadlineItem:
"""A single headline item: title, source, and timestamp."""
title: str
source: str
timestamp: str
def to_tuple(self) -> tuple[str, str, str]:
"""Convert to tuple for backward compatibility."""
return (self.title, self.source, self.timestamp)
@classmethod
def from_tuple(cls, t: tuple[str, str, str]) -> "HeadlineItem":
"""Create from tuple for backward compatibility."""
return cls(title=t[0], source=t[1], timestamp=t[2])
def items_to_tuples(items: list[HeadlineItem]) -> list[tuple[str, str, str]]:
"""Convert list of HeadlineItem to list of tuples."""
return [item.to_tuple() for item in items]
def tuples_to_items(tuples: list[tuple[str, str, str]]) -> list[HeadlineItem]:
"""Convert list of tuples to list of HeadlineItem."""
return [HeadlineItem.from_tuple(t) for t in tuples]
@dataclass
class FetchResult:
"""Result from fetch_all() or fetch_poetry()."""
items: list[HeadlineItem]
linked: int
failed: int
def to_legacy_tuple(self) -> tuple[list[tuple], int, int]:
"""Convert to legacy tuple format for backward compatibility."""
return ([item.to_tuple() for item in self.items], self.linked, self.failed)
@dataclass
class Block:
"""Rendered headline block from make_block()."""
content: list[str]
color: str
meta_row_index: int
def to_legacy_tuple(self) -> tuple[list[str], str, int]:
"""Convert to legacy tuple format for backward compatibility."""
return (self.content, self.color, self.meta_row_index)

37
engine/viewport.py Normal file
View File

@@ -0,0 +1,37 @@
"""
Viewport utilities — terminal dimensions and ANSI positioning helpers.
No internal dependencies.
"""
import os
def tw() -> int:
"""Get terminal width (columns)."""
try:
return os.get_terminal_size().columns
except Exception:
return 80
def th() -> int:
"""Get terminal height (lines)."""
try:
return os.get_terminal_size().lines
except Exception:
return 24
def move_to(row: int, col: int = 1) -> str:
"""Generate ANSI escape to move cursor to row, col (1-indexed)."""
return f"\033[{row};{col}H"
def clear_screen() -> str:
"""Clear screen and move cursor to home."""
return "\033[2J\033[H"
def clear_line() -> str:
"""Clear current line."""
return "\033[K"

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

BIN
fonts/Resond-Regular.otf Normal file

Binary file not shown.

BIN
fonts/Synthetix.otf Normal file

Binary file not shown.

27
hk.pkl Normal file
View File

@@ -0,0 +1,27 @@
amends "package://github.com/jdx/hk/releases/download/v1.38.0/hk@1.38.0#/Config.pkl"
import "package://github.com/jdx/hk/releases/download/v1.38.0/hk@1.38.0#/Builtins.pkl"
hooks {
["pre-commit"] {
fix = true
stash = "git"
steps {
["ruff-format"] = (Builtins.ruff_format) {
prefix = "uv run"
}
["ruff"] = (Builtins.ruff) {
prefix = "uv run"
check = "ruff check engine/ tests/"
fix = "ruff check --fix --unsafe-fixes engine/ tests/"
}
}
}
["pre-push"] {
steps {
["ruff"] = (Builtins.ruff) {
prefix = "uv run"
check = "ruff check engine/ tests/"
}
}
}
}

View File

@@ -5,40 +5,7 @@ Digital news consciousness stream.
Matrix aesthetic · THX-1138 hue.
"""
import subprocess, sys, pathlib
# ─── BOOTSTRAP VENV ───────────────────────────────────────
_VENV = pathlib.Path(__file__).resolve().parent / ".mainline_venv"
_MARKER = _VENV / ".installed_v3"
def _ensure_venv():
"""Create a local venv and install deps if needed."""
if _MARKER.exists():
return
import venv
print("\033[2;38;5;34m > first run — creating environment...\033[0m")
venv.create(str(_VENV), with_pip=True, clear=True)
pip = str(_VENV / "bin" / "pip")
subprocess.check_call(
[pip, "install", "feedparser", "Pillow", "-q"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
_MARKER.touch()
_ensure_venv()
# Install sounddevice on first run after v3
_MARKER_SD = _VENV / ".installed_sd"
if not _MARKER_SD.exists():
_pip = str(_VENV / "bin" / "pip")
subprocess.check_call([_pip, "install", "sounddevice", "numpy", "-q"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
_MARKER_SD.touch()
sys.path.insert(0, str(next((_VENV / "lib").glob("python*/site-packages"))))
# ─── DELEGATE TO ENGINE ───────────────────────────────────
from engine.app import main # noqa: E402
from engine.app import main
if __name__ == "__main__":
main()

52
mise.toml Normal file
View File

@@ -0,0 +1,52 @@
[tools]
python = "3.12"
hk = "latest"
pkl = "latest"
[tasks]
# =====================
# Development
# =====================
test = "uv run pytest"
test-v = "uv run pytest -v"
test-cov = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html"
test-cov-open = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html && open htmlcov/index.html"
lint = "uv run ruff check engine/ mainline.py"
lint-fix = "uv run ruff check --fix engine/ mainline.py"
format = "uv run ruff format engine/ mainline.py"
# =====================
# Runtime
# =====================
run = "uv run mainline.py"
run-poetry = "uv run mainline.py --poetry"
run-firehose = "uv run mainline.py --firehose"
# =====================
# Environment
# =====================
sync = "uv sync"
sync-all = "uv sync --all-extras"
install = "uv sync"
install-dev = "uv sync --group dev"
bootstrap = "uv sync && uv run mainline.py --help"
clean = "rm -rf .venv htmlcov .coverage tests/.pytest_cache"
# =====================
# CI/CD
# =====================
ci = "uv sync --group dev && uv run pytest --cov=engine --cov-report=term-missing --cov-report=xml"
ci-lint = "uv run ruff check engine/ mainline.py"
# =====================
# Git Hooks (via hk)
# =====================
pre-commit = "hk run pre-commit"

89
pyproject.toml Normal file
View File

@@ -0,0 +1,89 @@
[project]
name = "mainline"
version = "0.1.0"
description = "Terminal news ticker with Matrix aesthetic"
readme = "README.md"
requires-python = ">=3.10"
authors = [
{ name = "Mainline", email = "mainline@example.com" }
]
license = { text = "MIT" }
classifiers = [
"Development Status :: 4 - Beta",
"Environment :: Console",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Terminals",
]
dependencies = [
"feedparser>=6.0.0",
"Pillow>=10.0.0",
"pyright>=1.1.408",
]
[project.optional-dependencies]
mic = [
"sounddevice>=0.4.0",
"numpy>=1.24.0",
]
dev = [
"pytest>=8.0.0",
"pytest-cov>=4.1.0",
"pytest-mock>=3.12.0",
"ruff>=0.1.0",
]
[project.scripts]
mainline = "engine.app:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[dependency-groups]
dev = [
"pytest>=8.0.0",
"pytest-cov>=4.1.0",
"pytest-mock>=3.12.0",
"ruff>=0.1.0",
]
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_functions = ["test_*"]
addopts = [
"--strict-markers",
"--tb=short",
"-v",
]
filterwarnings = [
"ignore::DeprecationWarning",
]
[tool.coverage.run]
source = ["engine"]
branch = true
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"raise AssertionError",
"raise NotImplementedError",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:",
"@abstractmethod",
]
[tool.ruff]
line-length = 88
target-version = "py310"
[tool.ruff.lint]
select = ["E", "F", "W", "I", "N", "UP", "B", "C4", "SIM"]
ignore = ["E501", "SIM105", "N806", "B007", "SIM108"]

4
requirements-dev.txt Normal file
View File

@@ -0,0 +1,4 @@
pytest>=8.0.0
pytest-cov>=4.1.0
pytest-mock>=3.12.0
ruff>=0.1.0

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
feedparser>=6.0.0
Pillow>=10.0.0
sounddevice>=0.4.0
numpy>=1.24.0

0
tests/__init__.py Normal file
View File

236
tests/fixtures/__init__.py vendored Normal file
View File

@@ -0,0 +1,236 @@
"""
Pytest fixtures for mocking external dependencies (network, filesystem).
"""
import json
from unittest.mock import MagicMock
import pytest
@pytest.fixture
def mock_feed_response():
"""Mock RSS feed response data."""
return b"""<?xml version="1.0" encoding="UTF-8" ?>
<rss version="2.0">
<channel>
<title>Test Feed</title>
<link>https://example.com</link>
<item>
<title>Test Headline One</title>
<pubDate>Sat, 15 Mar 2025 12:00:00 GMT</pubDate>
</item>
<item>
<title>Test Headline Two</title>
<pubDate>Sat, 15 Mar 2025 11:00:00 GMT</pubDate>
</item>
<item>
<title>Sports: Team Wins Championship</title>
<pubDate>Sat, 15 Mar 2025 10:00:00 GMT</pubDate>
</item>
</channel>
</rss>"""
@pytest.fixture
def mock_gutenberg_response():
"""Mock Project Gutenberg text response."""
return """Project Gutenberg's Collection, by Various
*** START OF SOME TEXT ***
This is a test poem with multiple lines
that should be parsed as stanzas.
Another stanza here with different content
and more lines to test the parsing logic.
Yet another stanza for variety
in the test data.
*** END OF SOME TEXT ***"""
@pytest.fixture
def mock_gutenberg_empty():
"""Mock Gutenberg response with no valid stanzas."""
return """Project Gutenberg's Collection
*** START OF TEXT ***
THIS IS ALL CAPS AND SHOULD BE SKIPPED
I.
*** END OF TEXT ***"""
@pytest.fixture
def mock_ntfy_message():
"""Mock ntfy.sh SSE message."""
return json.dumps(
{
"id": "test123",
"event": "message",
"title": "Test Title",
"message": "Test message body",
"time": 1234567890,
}
).encode()
@pytest.fixture
def mock_ntfy_keepalive():
"""Mock ntfy.sh keepalive message."""
return b'data: {"event":"keepalive"}\n\n'
@pytest.fixture
def mock_google_translate_response():
"""Mock Google Translate API response."""
return json.dumps(
[
[["Translated text", "Original text", None, 0.8], None, "en"],
None,
None,
[],
[],
[],
[],
]
)
@pytest.fixture
def mock_feedparser():
"""Create a mock feedparser.parse function."""
def _mock(data):
mock_result = MagicMock()
mock_result.bozo = False
mock_result.entries = [
{
"title": "Test Headline",
"published_parsed": (2025, 3, 15, 12, 0, 0, 0, 0, 0),
},
{
"title": "Another Headline",
"updated_parsed": (2025, 3, 15, 11, 0, 0, 0, 0, 0),
},
]
return mock_result
return _mock
@pytest.fixture
def mock_urllib_open(mock_feed_response):
"""Create a mock urllib.request.urlopen that returns feed data."""
def _mock(url):
mock_response = MagicMock()
mock_response.read.return_value = mock_feed_response
return mock_response
return _mock
@pytest.fixture
def sample_items():
"""Sample items as returned by fetch module (title, source, timestamp)."""
return [
("Headline One", "Test Source", "12:00"),
("Headline Two", "Another Source", "11:30"),
("Headline Three", "Third Source", "10:45"),
]
@pytest.fixture
def sample_config():
"""Sample config for testing."""
from engine.config import Config
return Config(
headline_limit=100,
feed_timeout=10,
mic_threshold_db=50,
mode="news",
firehose=False,
ntfy_topic="https://ntfy.sh/test/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir="fonts",
font_path="",
font_index=0,
font_picker=False,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐",
kata_glyphs="ハミヒーウ",
script_fonts={},
)
@pytest.fixture
def poetry_config():
"""Sample config for poetry mode."""
from engine.config import Config
return Config(
headline_limit=100,
feed_timeout=10,
mic_threshold_db=50,
mode="poetry",
firehose=False,
ntfy_topic="https://ntfy.sh/test/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir="fonts",
font_path="",
font_index=0,
font_picker=False,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐",
kata_glyphs="ハミヒーウ",
script_fonts={},
)
@pytest.fixture
def firehose_config():
"""Sample config with firehose enabled."""
from engine.config import Config
return Config(
headline_limit=100,
feed_timeout=10,
mic_threshold_db=50,
mode="news",
firehose=True,
ntfy_topic="https://ntfy.sh/test/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir="fonts",
font_path="",
font_index=0,
font_picker=False,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐",
kata_glyphs="ハミヒーウ",
script_fonts={},
)

301
tests/test_config.py Normal file
View File

@@ -0,0 +1,301 @@
"""
Tests for engine.config module.
"""
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from engine import config
class TestArgValue:
"""Tests for _arg_value helper."""
def test_returns_value_when_flag_present(self):
"""Returns the value following the flag."""
with patch.object(sys, "argv", ["prog", "--font-file", "test.otf"]):
result = config._arg_value("--font-file")
assert result == "test.otf"
def test_returns_none_when_flag_missing(self):
"""Returns None when flag is not present."""
with patch.object(sys, "argv", ["prog"]):
result = config._arg_value("--font-file")
assert result is None
def test_returns_none_when_no_value(self):
"""Returns None when flag is last."""
with patch.object(sys, "argv", ["prog", "--font-file"]):
result = config._arg_value("--font-file")
assert result is None
class TestArgInt:
"""Tests for _arg_int helper."""
def test_parses_valid_int(self):
"""Parses valid integer."""
with patch.object(sys, "argv", ["prog", "--font-index", "5"]):
result = config._arg_int("--font-index", 0)
assert result == 5
def test_returns_default_on_invalid(self):
"""Returns default on invalid input."""
with patch.object(sys, "argv", ["prog", "--font-index", "abc"]):
result = config._arg_int("--font-index", 0)
assert result == 0
def test_returns_default_when_missing(self):
"""Returns default when flag missing."""
with patch.object(sys, "argv", ["prog"]):
result = config._arg_int("--font-index", 10)
assert result == 10
class TestResolveFontPath:
"""Tests for _resolve_font_path helper."""
def test_returns_absolute_paths(self):
"""Absolute paths are returned as-is."""
result = config._resolve_font_path("/absolute/path.otf")
assert result == "/absolute/path.otf"
def test_resolves_relative_paths(self):
"""Relative paths are resolved to repo root."""
result = config._resolve_font_path("fonts/test.otf")
assert str(config._REPO_ROOT) in result
def test_expands_user_home(self):
"""Tilde paths are expanded."""
with patch("pathlib.Path.expanduser", return_value=Path("/home/user/fonts")):
result = config._resolve_font_path("~/fonts/test.otf")
assert isinstance(result, str)
class TestListFontFiles:
"""Tests for _list_font_files helper."""
def test_returns_empty_for_missing_dir(self):
"""Returns empty list for missing directory."""
result = config._list_font_files("/nonexistent/directory")
assert result == []
def test_filters_by_extension(self):
"""Only returns valid font extensions."""
with tempfile.TemporaryDirectory() as tmpdir:
Path(tmpdir, "valid.otf").touch()
Path(tmpdir, "valid.ttf").touch()
Path(tmpdir, "invalid.txt").touch()
Path(tmpdir, "image.png").touch()
result = config._list_font_files(tmpdir)
assert len(result) == 2
assert all(f.endswith((".otf", ".ttf")) for f in result)
def test_sorts_alphabetically(self):
"""Results are sorted alphabetically."""
with tempfile.TemporaryDirectory() as tmpdir:
Path(tmpdir, "zfont.otf").touch()
Path(tmpdir, "afont.otf").touch()
result = config._list_font_files(tmpdir)
filenames = [Path(f).name for f in result]
assert filenames == ["afont.otf", "zfont.otf"]
class TestDefaults:
"""Tests for default configuration values."""
def test_headline_limit(self):
"""HEADLINE_LIMIT has sensible default."""
assert config.HEADLINE_LIMIT > 0
def test_feed_timeout(self):
"""FEED_TIMEOUT has sensible default."""
assert config.FEED_TIMEOUT > 0
def test_font_extensions(self):
"""Font extensions are defined."""
assert ".otf" in config._FONT_EXTENSIONS
assert ".ttf" in config._FONT_EXTENSIONS
assert ".ttc" in config._FONT_EXTENSIONS
class TestGlyphs:
"""Tests for glyph constants."""
def test_glitch_glyphs_defined(self):
"""GLITCH glyphs are defined."""
assert len(config.GLITCH) > 0
def test_kata_glyphs_defined(self):
"""KATA glyphs are defined."""
assert len(config.KATA) > 0
class TestSetFontSelection:
"""Tests for set_font_selection function."""
def test_updates_font_path(self):
"""Updates FONT_PATH globally."""
original = config.FONT_PATH
config.set_font_selection(font_path="/new/path.otf")
assert config.FONT_PATH == "/new/path.otf"
config.FONT_PATH = original
def test_updates_font_index(self):
"""Updates FONT_INDEX globally."""
original = config.FONT_INDEX
config.set_font_selection(font_index=5)
assert config.FONT_INDEX == 5
config.FONT_INDEX = original
def test_handles_none_values(self):
"""Handles None values gracefully."""
original_path = config.FONT_PATH
original_index = config.FONT_INDEX
config.set_font_selection(font_path=None, font_index=None)
assert original_path == config.FONT_PATH
assert original_index == config.FONT_INDEX
class TestConfigDataclass:
"""Tests for Config dataclass."""
def test_config_has_required_fields(self):
"""Config has all required fields."""
c = config.Config()
assert hasattr(c, "headline_limit")
assert hasattr(c, "feed_timeout")
assert hasattr(c, "mic_threshold_db")
assert hasattr(c, "mode")
assert hasattr(c, "firehose")
assert hasattr(c, "ntfy_topic")
assert hasattr(c, "ntfy_reconnect_delay")
assert hasattr(c, "message_display_secs")
assert hasattr(c, "font_dir")
assert hasattr(c, "font_path")
assert hasattr(c, "font_index")
assert hasattr(c, "font_picker")
assert hasattr(c, "font_sz")
assert hasattr(c, "render_h")
assert hasattr(c, "ssaa")
assert hasattr(c, "scroll_dur")
assert hasattr(c, "frame_dt")
assert hasattr(c, "firehose_h")
assert hasattr(c, "grad_speed")
assert hasattr(c, "glitch_glyphs")
assert hasattr(c, "kata_glyphs")
assert hasattr(c, "script_fonts")
def test_config_defaults(self):
"""Config has sensible defaults."""
c = config.Config()
assert c.headline_limit == 1000
assert c.feed_timeout == 10
assert c.mic_threshold_db == 50
assert c.mode == "news"
assert c.firehose is False
assert c.ntfy_reconnect_delay == 5
assert c.message_display_secs == 30
def test_config_is_immutable(self):
"""Config is frozen (immutable)."""
c = config.Config()
with pytest.raises(AttributeError):
c.headline_limit = 500 # type: ignore
def test_config_custom_values(self):
"""Config accepts custom values."""
c = config.Config(
headline_limit=500,
mode="poetry",
firehose=True,
ntfy_topic="https://ntfy.sh/test",
)
assert c.headline_limit == 500
assert c.mode == "poetry"
assert c.firehose is True
assert c.ntfy_topic == "https://ntfy.sh/test"
class TestConfigFromArgs:
"""Tests for Config.from_args method."""
def test_from_args_defaults(self):
"""from_args creates config with defaults from empty argv."""
c = config.Config.from_args(["prog"])
assert c.mode == "news"
assert c.firehose is False
assert c.font_picker is True
def test_from_args_poetry_mode(self):
"""from_args detects --poetry flag."""
c = config.Config.from_args(["prog", "--poetry"])
assert c.mode == "poetry"
def test_from_args_poetry_short_flag(self):
"""from_args detects -p short flag."""
c = config.Config.from_args(["prog", "-p"])
assert c.mode == "poetry"
def test_from_args_firehose(self):
"""from_args detects --firehose flag."""
c = config.Config.from_args(["prog", "--firehose"])
assert c.firehose is True
def test_from_args_no_font_picker(self):
"""from_args detects --no-font-picker flag."""
c = config.Config.from_args(["prog", "--no-font-picker"])
assert c.font_picker is False
def test_from_args_font_index(self):
"""from_args parses --font-index."""
c = config.Config.from_args(["prog", "--font-index", "3"])
assert c.font_index == 3
class TestGetSetConfig:
"""Tests for get_config and set_config functions."""
def test_get_config_returns_config(self):
"""get_config returns a Config instance."""
c = config.get_config()
assert isinstance(c, config.Config)
def test_set_config_allows_injection(self):
"""set_config allows injecting a custom config."""
custom = config.Config(mode="poetry", headline_limit=100)
config.set_config(custom)
assert config.get_config().mode == "poetry"
assert config.get_config().headline_limit == 100
def test_set_config_then_get_config(self):
"""set_config followed by get_config returns the set config."""
original = config.get_config()
test_config = config.Config(headline_limit=42)
config.set_config(test_config)
result = config.get_config()
assert result.headline_limit == 42
config.set_config(original)
class TestPlatformFontPaths:
"""Tests for platform font path detection."""
def test_get_platform_font_paths_returns_dict(self):
"""_get_platform_font_paths returns a dictionary."""
fonts = config._get_platform_font_paths()
assert isinstance(fonts, dict)
def test_platform_font_paths_common_languages(self):
"""Common language font mappings exist."""
fonts = config._get_platform_font_paths()
common = {"ja", "zh-cn", "ko", "ru", "ar", "hi"}
found = set(fonts.keys()) & common
assert len(found) > 0

85
tests/test_controller.py Normal file
View File

@@ -0,0 +1,85 @@
"""
Tests for engine.controller module.
"""
from unittest.mock import MagicMock, patch
from engine import config
from engine.controller import StreamController
class TestStreamController:
"""Tests for StreamController class."""
def test_init_default_config(self):
"""StreamController initializes with default config."""
controller = StreamController()
assert controller.config is not None
assert isinstance(controller.config, config.Config)
def test_init_custom_config(self):
"""StreamController accepts custom config."""
custom_config = config.Config(headline_limit=500)
controller = StreamController(config=custom_config)
assert controller.config.headline_limit == 500
def test_init_sources_none_by_default(self):
"""Sources are None until initialized."""
controller = StreamController()
assert controller.mic is None
assert controller.ntfy is None
@patch("engine.controller.MicMonitor")
@patch("engine.controller.NtfyPoller")
def test_initialize_sources(self, mock_ntfy, mock_mic):
"""initialize_sources creates mic and ntfy instances."""
mock_mic_instance = MagicMock()
mock_mic_instance.available = True
mock_mic_instance.start.return_value = True
mock_mic.return_value = mock_mic_instance
mock_ntfy_instance = MagicMock()
mock_ntfy_instance.start.return_value = True
mock_ntfy.return_value = mock_ntfy_instance
controller = StreamController()
mic_ok, ntfy_ok = controller.initialize_sources()
assert mic_ok is True
assert ntfy_ok is True
assert controller.mic is not None
assert controller.ntfy is not None
@patch("engine.controller.MicMonitor")
@patch("engine.controller.NtfyPoller")
def test_initialize_sources_mic_unavailable(self, mock_ntfy, mock_mic):
"""initialize_sources handles unavailable mic."""
mock_mic_instance = MagicMock()
mock_mic_instance.available = False
mock_mic.return_value = mock_mic_instance
mock_ntfy_instance = MagicMock()
mock_ntfy_instance.start.return_value = True
mock_ntfy.return_value = mock_ntfy_instance
controller = StreamController()
mic_ok, ntfy_ok = controller.initialize_sources()
assert mic_ok is False
assert ntfy_ok is True
class TestStreamControllerCleanup:
"""Tests for StreamController cleanup."""
@patch("engine.controller.MicMonitor")
def test_cleanup_stops_mic(self, mock_mic):
"""cleanup stops the microphone if running."""
mock_mic_instance = MagicMock()
mock_mic.return_value = mock_mic_instance
controller = StreamController()
controller.mic = mock_mic_instance
controller.cleanup()
mock_mic_instance.stop.assert_called_once()

69
tests/test_emitters.py Normal file
View File

@@ -0,0 +1,69 @@
"""
Tests for engine.emitters module.
"""
from engine.emitters import EventEmitter, Startable, Stoppable
class TestEventEmitterProtocol:
"""Tests for EventEmitter protocol."""
def test_protocol_exists(self):
"""EventEmitter protocol is defined."""
assert EventEmitter is not None
def test_protocol_has_subscribe_method(self):
"""EventEmitter has subscribe method in protocol."""
assert hasattr(EventEmitter, "subscribe")
def test_protocol_has_unsubscribe_method(self):
"""EventEmitter has unsubscribe method in protocol."""
assert hasattr(EventEmitter, "unsubscribe")
class TestStartableProtocol:
"""Tests for Startable protocol."""
def test_protocol_exists(self):
"""Startable protocol is defined."""
assert Startable is not None
def test_protocol_has_start_method(self):
"""Startable has start method in protocol."""
assert hasattr(Startable, "start")
class TestStoppableProtocol:
"""Tests for Stoppable protocol."""
def test_protocol_exists(self):
"""Stoppable protocol is defined."""
assert Stoppable is not None
def test_protocol_has_stop_method(self):
"""Stoppable has stop method in protocol."""
assert hasattr(Stoppable, "stop")
class TestProtocolCompliance:
"""Tests that existing classes comply with protocols."""
def test_ntfy_poller_complies_with_protocol(self):
"""NtfyPoller implements EventEmitter protocol."""
from engine.ntfy import NtfyPoller
poller = NtfyPoller("http://example.com/topic")
assert hasattr(poller, "subscribe")
assert hasattr(poller, "unsubscribe")
assert callable(poller.subscribe)
assert callable(poller.unsubscribe)
def test_mic_monitor_complies_with_protocol(self):
"""MicMonitor implements EventEmitter and Startable protocols."""
from engine.mic import MicMonitor
monitor = MicMonitor()
assert hasattr(monitor, "subscribe")
assert hasattr(monitor, "unsubscribe")
assert hasattr(monitor, "start")
assert hasattr(monitor, "stop")

202
tests/test_eventbus.py Normal file
View File

@@ -0,0 +1,202 @@
"""
Tests for engine.eventbus module.
"""
from engine.eventbus import EventBus, get_event_bus, set_event_bus
from engine.events import EventType, NtfyMessageEvent
class TestEventBusInit:
"""Tests for EventBus initialization."""
def test_init_creates_empty_subscribers(self):
"""EventBus starts with no subscribers."""
bus = EventBus()
assert bus.subscriber_count() == 0
class TestEventBusSubscribe:
"""Tests for EventBus.subscribe method."""
def test_subscribe_adds_callback(self):
"""subscribe() adds a callback for an event type."""
bus = EventBus()
def callback(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, callback)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1
def test_subscribe_multiple_callbacks_same_event(self):
"""Multiple callbacks can be subscribed to the same event type."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.NTFY_MESSAGE, cb2)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 2
def test_subscribe_different_event_types(self):
"""Callbacks can be subscribed to different event types."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.MIC_LEVEL, cb2)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1
assert bus.subscriber_count(EventType.MIC_LEVEL) == 1
class TestEventBusUnsubscribe:
"""Tests for EventBus.unsubscribe method."""
def test_unsubscribe_removes_callback(self):
"""unsubscribe() removes a callback."""
bus = EventBus()
def callback(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, callback)
bus.unsubscribe(EventType.NTFY_MESSAGE, callback)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 0
def test_unsubscribe_nonexistent_callback_no_error(self):
"""unsubscribe() handles non-existent callback gracefully."""
bus = EventBus()
def callback(e):
return None
bus.unsubscribe(EventType.NTFY_MESSAGE, callback)
class TestEventBusPublish:
"""Tests for EventBus.publish method."""
def test_publish_calls_subscriber(self):
"""publish() calls the subscriber callback."""
bus = EventBus()
received = []
def callback(event):
received.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, callback)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(received) == 1
assert received[0].title == "Test"
def test_publish_multiple_subscribers(self):
"""publish() calls all subscribers for an event type."""
bus = EventBus()
received1 = []
received2 = []
def callback1(event):
received1.append(event)
def callback2(event):
received2.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, callback1)
bus.subscribe(EventType.NTFY_MESSAGE, callback2)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(received1) == 1
assert len(received2) == 1
def test_publish_different_event_types(self):
"""publish() only calls subscribers for the specific event type."""
bus = EventBus()
ntfy_received = []
mic_received = []
def ntfy_callback(event):
ntfy_received.append(event)
def mic_callback(event):
mic_received.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, ntfy_callback)
bus.subscribe(EventType.MIC_LEVEL, mic_callback)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(ntfy_received) == 1
assert len(mic_received) == 0
class TestEventBusClear:
"""Tests for EventBus.clear method."""
def test_clear_removes_all_subscribers(self):
"""clear() removes all subscribers."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.MIC_LEVEL, cb2)
bus.clear()
assert bus.subscriber_count() == 0
class TestEventBusThreadSafety:
"""Tests for EventBus thread safety."""
def test_concurrent_subscribe_unsubscribe(self):
"""subscribe and unsubscribe can be called concurrently."""
import threading
bus = EventBus()
callbacks = [lambda e: None for _ in range(10)]
def subscribe():
for cb in callbacks:
bus.subscribe(EventType.NTFY_MESSAGE, cb)
def unsubscribe():
for cb in callbacks:
bus.unsubscribe(EventType.NTFY_MESSAGE, cb)
t1 = threading.Thread(target=subscribe)
t2 = threading.Thread(target=unsubscribe)
t1.start()
t2.start()
t1.join()
t2.join()
class TestGlobalEventBus:
"""Tests for global event bus functions."""
def test_get_event_bus_returns_singleton(self):
"""get_event_bus() returns the same instance."""
bus1 = get_event_bus()
bus2 = get_event_bus()
assert bus1 is bus2
def test_set_event_bus_replaces_singleton(self):
"""set_event_bus() replaces the global event bus."""
new_bus = EventBus()
set_event_bus(new_bus)
try:
assert get_event_bus() is new_bus
finally:
set_event_bus(None)

112
tests/test_events.py Normal file
View File

@@ -0,0 +1,112 @@
"""
Tests for engine.events module.
"""
from datetime import datetime
from engine import events
class TestEventType:
"""Tests for EventType enum."""
def test_event_types_exist(self):
"""All expected event types exist."""
assert hasattr(events.EventType, "NEW_HEADLINE")
assert hasattr(events.EventType, "FRAME_TICK")
assert hasattr(events.EventType, "MIC_LEVEL")
assert hasattr(events.EventType, "NTFY_MESSAGE")
assert hasattr(events.EventType, "STREAM_START")
assert hasattr(events.EventType, "STREAM_END")
class TestHeadlineEvent:
"""Tests for HeadlineEvent dataclass."""
def test_create_headline_event(self):
"""HeadlineEvent can be created with required fields."""
e = events.HeadlineEvent(
title="Test Headline",
source="Test Source",
timestamp="12:00",
)
assert e.title == "Test Headline"
assert e.source == "Test Source"
assert e.timestamp == "12:00"
def test_headline_event_optional_language(self):
"""HeadlineEvent supports optional language field."""
e = events.HeadlineEvent(
title="Test",
source="Test",
timestamp="12:00",
language="ja",
)
assert e.language == "ja"
class TestFrameTickEvent:
"""Tests for FrameTickEvent dataclass."""
def test_create_frame_tick_event(self):
"""FrameTickEvent can be created."""
now = datetime.now()
e = events.FrameTickEvent(
frame_number=100,
timestamp=now,
delta_seconds=0.05,
)
assert e.frame_number == 100
assert e.timestamp == now
assert e.delta_seconds == 0.05
class TestMicLevelEvent:
"""Tests for MicLevelEvent dataclass."""
def test_create_mic_level_event(self):
"""MicLevelEvent can be created."""
now = datetime.now()
e = events.MicLevelEvent(
db_level=60.0,
excess_above_threshold=10.0,
timestamp=now,
)
assert e.db_level == 60.0
assert e.excess_above_threshold == 10.0
class TestNtfyMessageEvent:
"""Tests for NtfyMessageEvent dataclass."""
def test_create_ntfy_message_event(self):
"""NtfyMessageEvent can be created with required fields."""
e = events.NtfyMessageEvent(
title="Test Title",
body="Test Body",
)
assert e.title == "Test Title"
assert e.body == "Test Body"
assert e.message_id is None
def test_ntfy_message_event_with_id(self):
"""NtfyMessageEvent supports optional message_id."""
e = events.NtfyMessageEvent(
title="Test",
body="Test",
message_id="abc123",
)
assert e.message_id == "abc123"
class TestStreamEvent:
"""Tests for StreamEvent dataclass."""
def test_create_stream_event(self):
"""StreamEvent can be created."""
e = events.StreamEvent(
event_type=events.EventType.STREAM_START,
headline_count=100,
)
assert e.event_type == events.EventType.STREAM_START
assert e.headline_count == 100

93
tests/test_filter.py Normal file
View File

@@ -0,0 +1,93 @@
"""
Tests for engine.filter module.
"""
from engine.filter import skip, strip_tags
class TestStripTags:
"""Tests for strip_tags function."""
def test_strips_simple_html(self):
"""Basic HTML tags are removed."""
assert strip_tags("<p>Hello</p>") == "Hello"
assert strip_tags("<b>Bold</b>") == "Bold"
assert strip_tags("<em>Italic</em>") == "Italic"
def test_strips_nested_html(self):
"""Nested HTML tags are handled."""
assert strip_tags("<div><p>Nested</p></div>") == "Nested"
assert strip_tags("<span><strong>Deep</strong></span>") == "Deep"
def test_strips_html_with_attributes(self):
"""HTML with attributes is handled."""
assert strip_tags('<a href="http://example.com">Link</a>') == "Link"
assert strip_tags('<img src="test.jpg" alt="test">') == ""
def test_handles_empty_string(self):
"""Empty string returns empty string."""
assert strip_tags("") == ""
assert strip_tags(None) == ""
def test_handles_plain_text(self):
"""Plain text without tags passes through."""
assert strip_tags("Plain text") == "Plain text"
def test_unescapes_html_entities(self):
"""HTML entities are decoded and tags are stripped."""
assert strip_tags("&nbsp;test") == "test"
assert strip_tags("Hello &amp; World") == "Hello & World"
def test_handles_malformed_html(self):
"""Malformed HTML is handled gracefully."""
assert strip_tags("<p>Unclosed") == "Unclosed"
assert strip_tags("</p>No start") == "No start"
class TestSkip:
"""Tests for skip function - content filtering."""
def test_skips_sports_content(self):
"""Sports-related headlines are skipped."""
assert skip("Football: Team wins championship") is True
assert skip("NBA Finals Game 7 results") is True
assert skip("Soccer match ends in draw") is True
assert skip("Premier League transfer news") is True
assert skip("Super Bowl halftime show") is True
def test_skips_vapid_content(self):
"""Vapid/celebrity content is skipped."""
assert skip("Kim Kardashian's new look") is True
assert skip("Influencer goes viral") is True
assert skip("Red carpet best dressed") is True
assert skip("Celebrity couple splits") is True
def test_allows_real_news(self):
"""Legitimate news headlines are allowed."""
assert skip("Scientists discover new planet") is False
assert skip("Economy grows by 3%") is False
assert skip("World leaders meet for summit") is False
assert skip("New technology breakthrough") is False
def test_case_insensitive(self):
"""Filter is case insensitive."""
assert skip("FOOTBALL scores") is True
assert skip("Football SCORES") is True
assert skip("Kardashian") is True
def test_word_boundary_matching(self):
"""Word boundary matching works correctly."""
assert skip("The football stadium") is True
assert skip("Footballer scores") is False
assert skip("Footballs on sale") is False
class TestIntegration:
"""Integration tests combining filter functions."""
def test_full_pipeline(self):
"""Test strip_tags followed by skip."""
html = '<p><a href="#">Breaking: Football championship final</a></p>'
text = strip_tags(html)
assert text == "Breaking: Football championship final"
assert skip(text) is True

63
tests/test_frame.py Normal file
View File

@@ -0,0 +1,63 @@
"""
Tests for engine.frame module.
"""
import time
from engine.frame import FrameTimer, calculate_scroll_step
class TestFrameTimer:
"""Tests for FrameTimer class."""
def test_init_default(self):
"""FrameTimer initializes with default values."""
timer = FrameTimer()
assert timer.target_frame_dt == 0.05
assert timer.fps >= 0
def test_init_custom(self):
"""FrameTimer accepts custom frame duration."""
timer = FrameTimer(target_frame_dt=0.1)
assert timer.target_frame_dt == 0.1
def test_fps_calculation(self):
"""FrameTimer calculates FPS correctly."""
timer = FrameTimer()
timer._frame_count = 10
timer._start_time = time.monotonic() - 1.0
assert timer.fps >= 9.0
def test_reset(self):
"""FrameTimer.reset() clears frame count."""
timer = FrameTimer()
timer._frame_count = 100
timer.reset()
assert timer._frame_count == 0
class TestCalculateScrollStep:
"""Tests for calculate_scroll_step function."""
def test_basic_calculation(self):
"""calculate_scroll_step returns positive value."""
result = calculate_scroll_step(5.0, 24)
assert result > 0
def test_with_padding(self):
"""calculate_scroll_step respects padding parameter."""
without_padding = calculate_scroll_step(5.0, 24, padding=0)
with_padding = calculate_scroll_step(5.0, 24, padding=15)
assert with_padding < without_padding
def test_larger_view_slower_scroll(self):
"""Larger view height results in slower scroll steps."""
small = calculate_scroll_step(5.0, 10)
large = calculate_scroll_step(5.0, 50)
assert large < small
def test_longer_duration_slower_scroll(self):
"""Longer scroll duration results in slower scroll steps."""
fast = calculate_scroll_step(2.0, 24)
slow = calculate_scroll_step(10.0, 24)
assert slow > fast

96
tests/test_layers.py Normal file
View File

@@ -0,0 +1,96 @@
"""
Tests for engine.layers module.
"""
import time
from engine import layers
class TestRenderMessageOverlay:
"""Tests for render_message_overlay function."""
def test_no_message_returns_empty(self):
"""Returns empty list when msg is None."""
result, cache = layers.render_message_overlay(None, 80, 24, (None, None))
assert result == []
assert cache[0] is None
def test_message_returns_overlay_lines(self):
"""Returns non-empty list when message is present."""
msg = ("Test Title", "Test Body", time.monotonic())
result, cache = layers.render_message_overlay(msg, 80, 24, (None, None))
assert len(result) > 0
assert cache[0] is not None
def test_cache_key_changes_with_text(self):
"""Cache key changes when message text changes."""
msg1 = ("Title1", "Body1", time.monotonic())
msg2 = ("Title2", "Body2", time.monotonic())
_, cache1 = layers.render_message_overlay(msg1, 80, 24, (None, None))
_, cache2 = layers.render_message_overlay(msg2, 80, 24, cache1)
assert cache1[0] != cache2[0]
def test_cache_reuse_avoids_recomputation(self):
"""Cache is returned when same message is passed (interface test)."""
msg = ("Same Title", "Same Body", time.monotonic())
result1, cache1 = layers.render_message_overlay(msg, 80, 24, (None, None))
result2, cache2 = layers.render_message_overlay(msg, 80, 24, cache1)
assert len(result1) > 0
assert len(result2) > 0
assert cache1[0] == cache2[0]
class TestRenderFirehose:
"""Tests for render_firehose function."""
def test_no_firehose_returns_empty(self):
"""Returns empty list when firehose height is 0."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 0, 24)
assert result == []
def test_firehose_returns_lines(self):
"""Returns lines when firehose height > 0."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 4, 24)
assert len(result) == 4
def test_firehose_includes_ansi_escapes(self):
"""Returns lines containing ANSI escape sequences."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 1, 24)
assert "\033[" in result[0]
class TestApplyGlitch:
"""Tests for apply_glitch function."""
def test_empty_buffer_unchanged(self):
"""Empty buffer is returned unchanged."""
result = layers.apply_glitch([], 0, 0.0, 80)
assert result == []
def test_buffer_length_preserved(self):
"""Buffer length is preserved after glitch application."""
buf = [f"\033[{i + 1};1Htest\033[K" for i in range(10)]
result = layers.apply_glitch(buf, 0, 0.5, 80)
assert len(result) == len(buf)
class TestRenderTickerZone:
"""Tests for render_ticker_zone function - focusing on interface."""
def test_returns_list(self):
"""Returns a list of strings."""
result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0)
assert isinstance(result, list)
def test_returns_dict_for_cache(self):
"""Returns a dict for the noise cache."""
result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0)
assert isinstance(cache, dict)

149
tests/test_mic.py Normal file
View File

@@ -0,0 +1,149 @@
"""
Tests for engine.mic module.
"""
from datetime import datetime
from unittest.mock import patch
from engine.events import MicLevelEvent
class TestMicMonitorImport:
"""Tests for module import behavior."""
def test_mic_monitor_imports_without_error(self):
"""MicMonitor can be imported even without sounddevice."""
from engine.mic import MicMonitor
assert MicMonitor is not None
class TestMicMonitorInit:
"""Tests for MicMonitor initialization."""
def test_init_sets_threshold(self):
"""Threshold is set correctly."""
from engine.mic import MicMonitor
monitor = MicMonitor(threshold_db=60)
assert monitor.threshold_db == 60
def test_init_defaults(self):
"""Default values are set correctly."""
from engine.mic import MicMonitor
monitor = MicMonitor()
assert monitor.threshold_db == 50
def test_init_db_starts_at_negative(self):
"""_db starts at negative value."""
from engine.mic import MicMonitor
monitor = MicMonitor()
assert monitor.db == -99.0
class TestMicMonitorProperties:
"""Tests for MicMonitor properties."""
def test_excess_returns_positive_when_above_threshold(self):
"""excess returns positive value when above threshold."""
from engine.mic import MicMonitor
monitor = MicMonitor(threshold_db=50)
with patch.object(monitor, "_db", 60.0):
assert monitor.excess == 10.0
def test_excess_returns_zero_when_below_threshold(self):
"""excess returns zero when below threshold."""
from engine.mic import MicMonitor
monitor = MicMonitor(threshold_db=50)
with patch.object(monitor, "_db", 40.0):
assert monitor.excess == 0.0
class TestMicMonitorAvailable:
"""Tests for MicMonitor.available property."""
def test_available_is_bool(self):
"""available returns a boolean."""
from engine.mic import MicMonitor
monitor = MicMonitor()
assert isinstance(monitor.available, bool)
class TestMicMonitorStop:
"""Tests for MicMonitor.stop method."""
def test_stop_does_nothing_when_no_stream(self):
"""stop() does nothing if no stream exists."""
from engine.mic import MicMonitor
monitor = MicMonitor()
monitor.stop()
assert monitor._stream is None
class TestMicMonitorEventEmission:
"""Tests for MicMonitor event emission."""
def test_subscribe_adds_callback(self):
"""subscribe() adds a callback."""
from engine.mic import MicMonitor
monitor = MicMonitor()
def callback(e):
return None
monitor.subscribe(callback)
assert callback in monitor._subscribers
def test_unsubscribe_removes_callback(self):
"""unsubscribe() removes a callback."""
from engine.mic import MicMonitor
monitor = MicMonitor()
def callback(e):
return None
monitor.subscribe(callback)
monitor.unsubscribe(callback)
assert callback not in monitor._subscribers
def test_emit_calls_subscribers(self):
"""_emit() calls all subscribers."""
from engine.mic import MicMonitor
monitor = MicMonitor()
received = []
def callback(event):
received.append(event)
monitor.subscribe(callback)
event = MicLevelEvent(
db_level=60.0, excess_above_threshold=10.0, timestamp=datetime.now()
)
monitor._emit(event)
assert len(received) == 1
assert received[0].db_level == 60.0
def test_emit_handles_subscriber_exception(self):
"""_emit() handles exceptions in subscribers gracefully."""
from engine.mic import MicMonitor
monitor = MicMonitor()
def bad_callback(event):
raise RuntimeError("test")
monitor.subscribe(bad_callback)
event = MicLevelEvent(
db_level=60.0, excess_above_threshold=10.0, timestamp=datetime.now()
)
monitor._emit(event)

122
tests/test_ntfy.py Normal file
View File

@@ -0,0 +1,122 @@
"""
Tests for engine.ntfy module.
"""
import time
from unittest.mock import MagicMock, patch
from engine.events import NtfyMessageEvent
from engine.ntfy import NtfyPoller
class TestNtfyPollerInit:
"""Tests for NtfyPoller initialization."""
def test_init_sets_defaults(self):
"""Default values are set correctly."""
poller = NtfyPoller("http://example.com/topic")
assert poller.topic_url == "http://example.com/topic"
assert poller.reconnect_delay == 5
assert poller.display_secs == 30
def test_init_custom_values(self):
"""Custom values are set correctly."""
poller = NtfyPoller(
"http://example.com/topic", reconnect_delay=10, display_secs=60
)
assert poller.reconnect_delay == 10
assert poller.display_secs == 60
class TestNtfyPollerStart:
"""Tests for NtfyPoller.start method."""
@patch("engine.ntfy.threading.Thread")
def test_start_creates_daemon_thread(self, mock_thread):
"""start() creates and starts a daemon thread."""
mock_thread_instance = MagicMock()
mock_thread.return_value = mock_thread_instance
poller = NtfyPoller("http://example.com/topic")
result = poller.start()
assert result is True
mock_thread.assert_called_once()
args, kwargs = mock_thread.call_args
assert kwargs.get("daemon") is True
mock_thread_instance.start.assert_called_once()
class TestNtfyPollerGetActiveMessage:
"""Tests for NtfyPoller.get_active_message method."""
def test_returns_none_when_no_message(self):
"""Returns None when no message has been received."""
poller = NtfyPoller("http://example.com/topic")
result = poller.get_active_message()
assert result is None
class TestNtfyPollerDismiss:
"""Tests for NtfyPoller.dismiss method."""
def test_dismiss_clears_message(self):
"""dismiss() clears the current message."""
poller = NtfyPoller("http://example.com/topic")
with patch.object(poller, "_lock"):
poller._message = ("Title", "Body", time.monotonic())
poller.dismiss()
assert poller._message is None
class TestNtfyPollerEventEmission:
"""Tests for NtfyPoller event emission."""
def test_subscribe_adds_callback(self):
"""subscribe() adds a callback."""
poller = NtfyPoller("http://example.com/topic")
def callback(e):
return None
poller.subscribe(callback)
assert callback in poller._subscribers
def test_unsubscribe_removes_callback(self):
"""unsubscribe() removes a callback."""
poller = NtfyPoller("http://example.com/topic")
def callback(e):
return None
poller.subscribe(callback)
poller.unsubscribe(callback)
assert callback not in poller._subscribers
def test_emit_calls_subscribers(self):
"""_emit() calls all subscribers."""
poller = NtfyPoller("http://example.com/topic")
received = []
def callback(event):
received.append(event)
poller.subscribe(callback)
event = NtfyMessageEvent(title="Test", body="Body")
poller._emit(event)
assert len(received) == 1
assert received[0].title == "Test"
def test_emit_handles_subscriber_exception(self):
"""_emit() handles exceptions in subscribers gracefully."""
poller = NtfyPoller("http://example.com/topic")
def bad_callback(event):
raise RuntimeError("test")
poller.subscribe(bad_callback)
event = NtfyMessageEvent(title="Test", body="Body")
poller._emit(event)

93
tests/test_sources.py Normal file
View File

@@ -0,0 +1,93 @@
"""
Tests for engine.sources module - data validation.
"""
from engine import sources
class TestFeeds:
"""Tests for FEEDS data."""
def test_feeds_is_dict(self):
"""FEEDS is a dictionary."""
assert isinstance(sources.FEEDS, dict)
def test_feeds_has_entries(self):
"""FEEDS has feed entries."""
assert len(sources.FEEDS) > 0
def test_feeds_have_valid_urls(self):
"""All feeds have valid URL format."""
for name, url in sources.FEEDS.items():
assert name
assert url.startswith("http://") or url.startswith("https://")
class TestPoetrySources:
"""Tests for POETRY_SOURCES data."""
def test_poetry_is_dict(self):
"""POETRY_SOURCES is a dictionary."""
assert isinstance(sources.POETRY_SOURCES, dict)
def test_poetry_has_entries(self):
"""POETRY_SOURCES has entries."""
assert len(sources.POETRY_SOURCES) > 0
def test_poetry_have_gutenberg_urls(self):
"""All poetry sources are from Gutenberg."""
for _name, url in sources.POETRY_SOURCES.items():
assert "gutenberg.org" in url
class TestSourceLangs:
"""Tests for SOURCE_LANGS mapping."""
def test_source_langs_is_dict(self):
"""SOURCE_LANGS is a dictionary."""
assert isinstance(sources.SOURCE_LANGS, dict)
def test_source_langs_valid_language_codes(self):
"""Language codes are valid ISO codes."""
valid_codes = {"de", "fr", "ja", "zh-cn", "ar", "hi"}
for code in sources.SOURCE_LANGS.values():
assert code in valid_codes
class TestLocationLangs:
"""Tests for LOCATION_LANGS mapping."""
def test_location_langs_is_dict(self):
"""LOCATION_LANGS is a dictionary."""
assert isinstance(sources.LOCATION_LANGS, dict)
def test_location_langs_has_patterns(self):
"""LOCATION_LANGS has regex patterns."""
assert len(sources.LOCATION_LANGS) > 0
class TestScriptFonts:
"""Tests for SCRIPT_FONTS mapping."""
def test_script_fonts_is_dict(self):
"""SCRIPT_FONTS is a dictionary."""
assert isinstance(sources.SCRIPT_FONTS, dict)
def test_script_fonts_has_paths(self):
"""All script fonts have paths."""
for _lang, path in sources.SCRIPT_FONTS.items():
assert path
class TestNoUpper:
"""Tests for NO_UPPER set."""
def test_no_upper_is_set(self):
"""NO_UPPER is a set."""
assert isinstance(sources.NO_UPPER, set)
def test_no_upper_contains_scripts(self):
"""NO_UPPER contains non-Latin scripts."""
assert "zh-cn" in sources.NO_UPPER
assert "ja" in sources.NO_UPPER
assert "ar" in sources.NO_UPPER

130
tests/test_terminal.py Normal file
View File

@@ -0,0 +1,130 @@
"""
Tests for engine.terminal module.
"""
import io
import sys
from unittest.mock import patch
from engine import terminal
class TestTerminalDimensions:
"""Tests for terminal width/height functions."""
def test_tw_returns_columns(self):
"""tw() returns terminal width."""
with (
patch.object(sys.stdout, "isatty", return_value=True),
patch("os.get_terminal_size") as mock_size,
):
mock_size.return_value = io.StringIO("columns=120")
mock_size.columns = 120
result = terminal.tw()
assert isinstance(result, int)
def test_th_returns_lines(self):
"""th() returns terminal height."""
with (
patch.object(sys.stdout, "isatty", return_value=True),
patch("os.get_terminal_size") as mock_size,
):
mock_size.return_value = io.StringIO("lines=30")
mock_size.lines = 30
result = terminal.th()
assert isinstance(result, int)
def test_tw_fallback_on_error(self):
"""tw() falls back to 80 on error."""
with patch("os.get_terminal_size", side_effect=OSError):
result = terminal.tw()
assert result == 80
def test_th_fallback_on_error(self):
"""th() falls back to 24 on error."""
with patch("os.get_terminal_size", side_effect=OSError):
result = terminal.th()
assert result == 24
class TestANSICodes:
"""Tests for ANSI escape code constants."""
def test_ansi_constants_exist(self):
"""All ANSI constants are defined."""
assert terminal.RST == "\033[0m"
assert terminal.BOLD == "\033[1m"
assert terminal.DIM == "\033[2m"
def test_green_shades_defined(self):
"""Green gradient colors are defined."""
assert terminal.G_HI == "\033[38;5;46m"
assert terminal.G_MID == "\033[38;5;34m"
assert terminal.G_LO == "\033[38;5;22m"
def test_white_shades_defined(self):
"""White/gray tones are defined."""
assert terminal.W_COOL == "\033[38;5;250m"
assert terminal.W_DIM == "\033[2;38;5;245m"
def test_cursor_controls_defined(self):
"""Cursor control codes are defined."""
assert "?" in terminal.CURSOR_OFF
assert "?" in terminal.CURSOR_ON
class TestTypeOut:
"""Tests for type_out function."""
@patch("sys.stdout", new_callable=io.StringIO)
@patch("time.sleep")
def test_type_out_writes_text(self, mock_sleep, mock_stdout):
"""type_out writes text to stdout."""
with patch("random.random", return_value=0.5):
terminal.type_out("Hi", color=terminal.G_HI)
output = mock_stdout.getvalue()
assert len(output) > 0
@patch("time.sleep")
def test_type_out_uses_color(self, mock_sleep):
"""type_out applies color codes."""
with (
patch("sys.stdout", new_callable=io.StringIO),
patch("random.random", return_value=0.5),
):
terminal.type_out("Test", color=terminal.G_HI)
class TestSlowPrint:
"""Tests for slow_print function."""
@patch("sys.stdout", new_callable=io.StringIO)
@patch("time.sleep")
def test_slow_print_writes_text(self, mock_sleep, mock_stdout):
"""slow_print writes text to stdout."""
terminal.slow_print("Hi", color=terminal.G_DIM, delay=0)
output = mock_stdout.getvalue()
assert len(output) > 0
class TestBootLn:
"""Tests for boot_ln function."""
@patch("sys.stdout", new_callable=io.StringIO)
@patch("time.sleep")
def test_boot_ln_writes_label_and_status(self, mock_sleep, mock_stdout):
"""boot_ln shows label and status."""
with patch("random.uniform", return_value=0):
terminal.boot_ln("Loading", "OK", ok=True)
output = mock_stdout.getvalue()
assert "Loading" in output
assert "OK" in output
@patch("sys.stdout", new_callable=io.StringIO)
@patch("time.sleep")
def test_boot_ln_error_status(self, mock_sleep, mock_stdout):
"""boot_ln shows red for error status."""
with patch("random.uniform", return_value=0):
terminal.boot_ln("Loading", "FAIL", ok=False)
output = mock_stdout.getvalue()
assert "FAIL" in output

95
tests/test_types.py Normal file
View File

@@ -0,0 +1,95 @@
"""
Tests for engine.types module.
"""
from engine.types import (
Block,
FetchResult,
HeadlineItem,
items_to_tuples,
tuples_to_items,
)
class TestHeadlineItem:
"""Tests for HeadlineItem dataclass."""
def test_create_headline_item(self):
"""Can create HeadlineItem with required fields."""
item = HeadlineItem(title="Test", source="Source", timestamp="12:00")
assert item.title == "Test"
assert item.source == "Source"
assert item.timestamp == "12:00"
def test_to_tuple(self):
"""to_tuple returns correct tuple."""
item = HeadlineItem(title="Test", source="Source", timestamp="12:00")
assert item.to_tuple() == ("Test", "Source", "12:00")
def test_from_tuple(self):
"""from_tuple creates HeadlineItem from tuple."""
item = HeadlineItem.from_tuple(("Test", "Source", "12:00"))
assert item.title == "Test"
assert item.source == "Source"
assert item.timestamp == "12:00"
class TestItemsConversion:
"""Tests for list conversion functions."""
def test_items_to_tuples(self):
"""Converts list of HeadlineItem to list of tuples."""
items = [
HeadlineItem(title="A", source="S", timestamp="10:00"),
HeadlineItem(title="B", source="T", timestamp="11:00"),
]
result = items_to_tuples(items)
assert result == [("A", "S", "10:00"), ("B", "T", "11:00")]
def test_tuples_to_items(self):
"""Converts list of tuples to list of HeadlineItem."""
tuples = [("A", "S", "10:00"), ("B", "T", "11:00")]
result = tuples_to_items(tuples)
assert len(result) == 2
assert result[0].title == "A"
assert result[1].title == "B"
class TestFetchResult:
"""Tests for FetchResult dataclass."""
def test_create_fetch_result(self):
"""Can create FetchResult."""
items = [HeadlineItem(title="Test", source="Source", timestamp="12:00")]
result = FetchResult(items=items, linked=1, failed=0)
assert len(result.items) == 1
assert result.linked == 1
assert result.failed == 0
def test_to_legacy_tuple(self):
"""to_legacy_tuple returns correct format."""
items = [HeadlineItem(title="Test", source="Source", timestamp="12:00")]
result = FetchResult(items=items, linked=1, failed=0)
legacy = result.to_legacy_tuple()
assert legacy[0] == [("Test", "Source", "12:00")]
assert legacy[1] == 1
assert legacy[2] == 0
class TestBlock:
"""Tests for Block dataclass."""
def test_create_block(self):
"""Can create Block."""
block = Block(
content=["line1", "line2"], color="\033[38;5;46m", meta_row_index=1
)
assert len(block.content) == 2
assert block.color == "\033[38;5;46m"
assert block.meta_row_index == 1
def test_to_legacy_tuple(self):
"""to_legacy_tuple returns correct format."""
block = Block(content=["line1"], color="green", meta_row_index=0)
legacy = block.to_legacy_tuple()
assert legacy == (["line1"], "green", 0)

64
tests/test_viewport.py Normal file
View File

@@ -0,0 +1,64 @@
"""
Tests for engine.viewport module.
"""
from engine import viewport
class TestViewportTw:
"""Tests for tw() function."""
def test_tw_returns_int(self):
"""tw() returns an integer."""
result = viewport.tw()
assert isinstance(result, int)
def test_tw_positive(self):
"""tw() returns a positive value."""
assert viewport.tw() > 0
class TestViewportTh:
"""Tests for th() function."""
def test_th_returns_int(self):
"""th() returns an integer."""
result = viewport.th()
assert isinstance(result, int)
def test_th_positive(self):
"""th() returns a positive value."""
assert viewport.th() > 0
class TestViewportMoveTo:
"""Tests for move_to() function."""
def test_move_to_format(self):
"""move_to() returns correctly formatted ANSI escape."""
result = viewport.move_to(5, 10)
assert result == "\033[5;10H"
def test_move_to_default_col(self):
"""move_to() defaults to column 1."""
result = viewport.move_to(5)
assert result == "\033[5;1H"
class TestViewportClearScreen:
"""Tests for clear_screen() function."""
def test_clear_screen_format(self):
"""clear_screen() returns clear screen ANSI escape."""
result = viewport.clear_screen()
assert "\033[2J" in result
assert "\033[H" in result
class TestViewportClearLine:
"""Tests for clear_line() function."""
def test_clear_line_format(self):
"""clear_line() returns clear line ANSI escape."""
result = viewport.clear_line()
assert result == "\033[K"