7 Commits

Author SHA1 Message Date
f6f177590b Merge pull request 'Modernize project with uv, pytest, ruff, and git hooks' (#21) from enhance_portability into main
Reviewed-on: #21
2026-03-15 23:21:35 +00:00
9ae4dc2b07 fix: update ntfy tests for SSE API (reconnect_delay) 2026-03-15 15:16:37 -07:00
1ac2dec3b0 fix: use native hk staging in pre-commit hook
fix: add explicit check command to pre-push hook
2026-03-15 15:16:37 -07:00
757c854584 fix: apply ruff auto-fixes and add hk git hooks
- Fix pre-existing lint errors in engine/ modules using ruff --unsafe-fixes
- Add hk.pkl with pre-commit and pre-push hooks using ruff builtin
- Configure hooks to use 'uv run' prefix for tool execution
- Update mise.toml to include hk and pkl tools
- All 73 tests pass

fix: apply ruff auto-fixes and add hk git hooks

- Fix pre-existing lint errors in engine/ modules using ruff --unsafe-fixes
- Add hk.pkl with pre-commit and pre-push hooks using ruff builtin
- Configure hooks to use 'uv run' prefix for tool execution
- Update mise.toml to include hk and pkl tools
- Use 'hk install --mise' for proper mise integration
- All 73 tests pass
2026-03-15 15:16:37 -07:00
4844a64203 style: apply ruff auto-fixes across codebase
- Fix import sorting (isort) across all engine modules
- Fix SIM105 try-except-pass patterns (contextlib.suppress)
- Fix nested with statements in tests
- Fix unused loop variables

Run 'uv run pytest' to verify tests still pass.
2026-03-15 15:16:37 -07:00
9201117096 feat: modernize project with uv, add pytest test suite
- Add pyproject.toml with modern Python packaging (PEP 517/518)
- Add uv-based dependency management replacing inline venv bootstrap
- Add requirements.txt and requirements-dev.txt for compatibility
- Add mise.toml with dev tasks (test, lint, run, sync, ci)
- Add .python-version pinned to Python 3.12
- Add comprehensive pytest test suite (73 tests) for:
  - engine/config, filter, terminal, sources, mic, ntfy modules
- Configure pytest with coverage reporting (16% total, 100% on tested modules)
- Configure ruff for linting with Python 3.10+ target
- Remove redundant venv bootstrap code from mainline.py
- Update .gitignore for uv/venv artifacts

Run 'uv sync' to install dependencies, 'uv run pytest' to test.
2026-03-15 15:16:37 -07:00
d758541156 Merge pull request 'feat: migrate Ntfy message retrieval from polling to SSE streaming, replacing poll_interval with reconnect_delay for continuous updates.' (#20) from feat/ntfy-sse into main
Reviewed-on: #20
2026-03-15 20:50:08 +00:00
28 changed files with 1110 additions and 243 deletions

7
.gitignore vendored
View File

@@ -1,4 +1,11 @@
__pycache__/ __pycache__/
*.pyc *.pyc
.mainline_venv/ .mainline_venv/
.venv/
uv.lock
.mainline_cache_*.json .mainline_cache_*.json
.DS_Store
htmlcov/
.coverage
.pytest_cache/
*.egg-info/

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.12

View File

@@ -155,3 +155,4 @@ msg = poller.get_active_message() # returns (title, body, timestamp) or None
--- ---
*macOS only (script/system font paths for translation are hardcoded). Primary display font is user-selectable via the bundled `fonts/` picker. Python 3.9+.* *macOS only (script/system font paths for translation are hardcoded). Primary display font is user-selectable via the bundled `fonts/` picker. Python 3.9+.*
# test

View File

@@ -2,23 +2,33 @@
Application orchestrator — boot sequence, signal handling, main loop wiring. Application orchestrator — boot sequence, signal handling, main loop wiring.
""" """
import sys
import os
import time
import signal
import atexit import atexit
import os
import signal
import sys
import termios import termios
import time
import tty import tty
from engine import config, render from engine import config, render
from engine.terminal import (
RST, G_HI, G_MID, G_DIM, W_DIM, W_GHOST, CLR, CURSOR_OFF, CURSOR_ON, tw,
slow_print, boot_ln,
)
from engine.fetch import fetch_all, fetch_poetry, load_cache, save_cache from engine.fetch import fetch_all, fetch_poetry, load_cache, save_cache
from engine.ntfy import NtfyPoller
from engine.mic import MicMonitor from engine.mic import MicMonitor
from engine.ntfy import NtfyPoller
from engine.scroll import stream from engine.scroll import stream
from engine.terminal import (
CLR,
CURSOR_OFF,
CURSOR_ON,
G_DIM,
G_HI,
G_MID,
RST,
W_DIM,
W_GHOST,
boot_ln,
slow_print,
tw,
)
TITLE = [ TITLE = [
" ███╗ ███╗ █████╗ ██╗███╗ ██╗██╗ ██╗███╗ ██╗███████╗", " ███╗ ███╗ █████╗ ██╗███╗ ██╗██╗ ██╗███╗ ██╗███████╗",
@@ -29,6 +39,7 @@ TITLE = [
" ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝╚═╝ ╚═══╝╚══════╝╚═╝╚═╝ ╚═══╝╚══════╝", " ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝╚═╝ ╚═══╝╚══════╝╚═╝╚═╝ ╚═══╝╚══════╝",
] ]
def _read_picker_key(): def _read_picker_key():
ch = sys.stdin.read(1) ch = sys.stdin.read(1)
if ch == "\x03": if ch == "\x03":
@@ -53,6 +64,7 @@ def _read_picker_key():
return "enter" return "enter"
return None return None
def _normalize_preview_rows(rows): def _normalize_preview_rows(rows):
"""Trim shared left padding and trailing spaces for stable on-screen previews.""" """Trim shared left padding and trailing spaces for stable on-screen previews."""
non_empty = [r for r in rows if r.strip()] non_empty = [r for r in rows if r.strip()]
@@ -99,7 +111,9 @@ def _draw_font_picker(faces, selected):
active = pos == selected active = pos == selected
pointer = "" if active else " " pointer = "" if active else " "
color = G_HI if active else W_DIM color = G_HI if active else W_DIM
print(f" {color}{pointer} {face['name']}{RST}{W_GHOST} · {face['file_name']}{RST}") print(
f" {color}{pointer} {face['name']}{RST}{W_GHOST} · {face['file_name']}{RST}"
)
if top > 0: if top > 0:
print(f" {W_GHOST}{top} above{RST}") print(f" {W_GHOST}{top} above{RST}")
@@ -116,6 +130,7 @@ def _draw_font_picker(faces, selected):
shown = row[:max_preview_w] shown = row[:max_preview_w]
print(f" {shown}") print(f" {shown}")
def pick_font_face(): def pick_font_face():
"""Interactive startup picker for selecting a face from repo OTF files.""" """Interactive startup picker for selecting a face from repo OTF files."""
if not config.FONT_PICKER: if not config.FONT_PICKER:
@@ -225,7 +240,9 @@ def pick_font_face():
font_index=selected_font["font_index"], font_index=selected_font["font_index"],
) )
render.clear_font_cache() render.clear_font_cache()
print(f" {G_DIM}> using {selected_font['name']} ({selected_font['file_name']}){RST}") print(
f" {G_DIM}> using {selected_font['name']} ({selected_font['file_name']}){RST}"
)
time.sleep(0.8) time.sleep(0.8)
print(CLR, end="") print(CLR, end="")
print(CURSOR_OFF, end="") print(CURSOR_OFF, end="")
@@ -255,23 +272,29 @@ def main():
time.sleep(0.07) time.sleep(0.07)
print() print()
_subtitle = "literary consciousness stream" if config.MODE == 'poetry' else "digital consciousness stream" _subtitle = (
"literary consciousness stream"
if config.MODE == "poetry"
else "digital consciousness stream"
)
print(f" {W_DIM}v0.1 · {_subtitle}{RST}") print(f" {W_DIM}v0.1 · {_subtitle}{RST}")
print(f" {W_GHOST}{'' * (w - 4)}{RST}") print(f" {W_GHOST}{'' * (w - 4)}{RST}")
print() print()
time.sleep(0.4) time.sleep(0.4)
cached = load_cache() if '--refresh' not in sys.argv else None cached = load_cache() if "--refresh" not in sys.argv else None
if cached: if cached:
items = cached items = cached
boot_ln("Cache", f"LOADED [{len(items)} SIGNALS]", True) boot_ln("Cache", f"LOADED [{len(items)} SIGNALS]", True)
elif config.MODE == 'poetry': elif config.MODE == "poetry":
slow_print(" > INITIALIZING LITERARY CORPUS...\n") slow_print(" > INITIALIZING LITERARY CORPUS...\n")
time.sleep(0.2) time.sleep(0.2)
print() print()
items, linked, failed = fetch_poetry() items, linked, failed = fetch_poetry()
print() print()
print(f" {G_DIM}>{RST} {G_MID}{linked} TEXTS LOADED{RST} {W_GHOST}· {failed} DARK{RST}") print(
f" {G_DIM}>{RST} {G_MID}{linked} TEXTS LOADED{RST} {W_GHOST}· {failed} DARK{RST}"
)
print(f" {G_DIM}>{RST} {G_MID}{len(items)} STANZAS ACQUIRED{RST}") print(f" {G_DIM}>{RST} {G_MID}{len(items)} STANZAS ACQUIRED{RST}")
save_cache(items) save_cache(items)
else: else:
@@ -280,7 +303,9 @@ def main():
print() print()
items, linked, failed = fetch_all() items, linked, failed = fetch_all()
print() print()
print(f" {G_DIM}>{RST} {G_MID}{linked} SOURCES LINKED{RST} {W_GHOST}· {failed} DARK{RST}") print(
f" {G_DIM}>{RST} {G_MID}{linked} SOURCES LINKED{RST} {W_GHOST}· {failed} DARK{RST}"
)
print(f" {G_DIM}>{RST} {G_MID}{len(items)} SIGNALS ACQUIRED{RST}") print(f" {G_DIM}>{RST} {G_MID}{len(items)} SIGNALS ACQUIRED{RST}")
save_cache(items) save_cache(items)
@@ -292,7 +317,13 @@ def main():
mic = MicMonitor(threshold_db=config.MIC_THRESHOLD_DB) mic = MicMonitor(threshold_db=config.MIC_THRESHOLD_DB)
mic_ok = mic.start() mic_ok = mic.start()
if mic.available: if mic.available:
boot_ln("Microphone", "ACTIVE" if mic_ok else "OFFLINE · check System Settings → Privacy → Microphone", bool(mic_ok)) boot_ln(
"Microphone",
"ACTIVE"
if mic_ok
else "OFFLINE · check System Settings → Privacy → Microphone",
bool(mic_ok),
)
ntfy = NtfyPoller( ntfy = NtfyPoller(
config.NTFY_TOPIC, config.NTFY_TOPIC,

View File

@@ -3,8 +3,8 @@ Configuration constants, CLI flags, and glyph tables.
""" """
import sys import sys
from pathlib import Path from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parent.parent _REPO_ROOT = Path(__file__).resolve().parent.parent
_FONT_EXTENSIONS = {".otf", ".ttf", ".ttc"} _FONT_EXTENSIONS = {".otf", ".ttf", ".ttc"}
@@ -51,40 +51,42 @@ def _list_font_files(font_dir):
def list_repo_font_files(): def list_repo_font_files():
"""Public helper for discovering repository font files.""" """Public helper for discovering repository font files."""
return _list_font_files(FONT_DIR) return _list_font_files(FONT_DIR)
# ─── RUNTIME ────────────────────────────────────────────── # ─── RUNTIME ──────────────────────────────────────────────
HEADLINE_LIMIT = 1000 HEADLINE_LIMIT = 1000
FEED_TIMEOUT = 10 FEED_TIMEOUT = 10
MIC_THRESHOLD_DB = 50 # dB above which glitches intensify MIC_THRESHOLD_DB = 50 # dB above which glitches intensify
MODE = 'poetry' if '--poetry' in sys.argv or '-p' in sys.argv else 'news' MODE = "poetry" if "--poetry" in sys.argv or "-p" in sys.argv else "news"
FIREHOSE = '--firehose' in sys.argv FIREHOSE = "--firehose" in sys.argv
# ─── NTFY MESSAGE QUEUE ────────────────────────────────── # ─── NTFY MESSAGE QUEUE ──────────────────────────────────
NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json" NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json"
NTFY_RECONNECT_DELAY = 5 # seconds before reconnecting after a dropped stream NTFY_RECONNECT_DELAY = 5 # seconds before reconnecting after a dropped stream
MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen
# ─── FONT RENDERING ────────────────────────────────────── # ─── FONT RENDERING ──────────────────────────────────────
FONT_DIR = _resolve_font_path(_arg_value('--font-dir') or "fonts") FONT_DIR = _resolve_font_path(_arg_value("--font-dir") or "fonts")
_FONT_FILE_ARG = _arg_value('--font-file') _FONT_FILE_ARG = _arg_value("--font-file")
_FONT_FILES = _list_font_files(FONT_DIR) _FONT_FILES = _list_font_files(FONT_DIR)
FONT_PATH = ( FONT_PATH = (
_resolve_font_path(_FONT_FILE_ARG) _resolve_font_path(_FONT_FILE_ARG)
if _FONT_FILE_ARG if _FONT_FILE_ARG
else (_FONT_FILES[0] if _FONT_FILES else "") else (_FONT_FILES[0] if _FONT_FILES else "")
) )
FONT_INDEX = max(0, _arg_int('--font-index', 0)) FONT_INDEX = max(0, _arg_int("--font-index", 0))
FONT_PICKER = '--no-font-picker' not in sys.argv FONT_PICKER = "--no-font-picker" not in sys.argv
FONT_SZ = 60 FONT_SZ = 60
RENDER_H = 8 # terminal rows per rendered text line RENDER_H = 8 # terminal rows per rendered text line
# ─── FONT RENDERING (ADVANCED) ──────────────────────────── # ─── FONT RENDERING (ADVANCED) ────────────────────────────
SSAA = 4 # super-sampling factor: render at SSAA× then downsample SSAA = 4 # super-sampling factor: render at SSAA× then downsample
# ─── SCROLL / FRAME ────────────────────────────────────── # ─── SCROLL / FRAME ──────────────────────────────────────
SCROLL_DUR = 5.625 # seconds per headline (2/3 original speed) SCROLL_DUR = 5.625 # seconds per headline (2/3 original speed)
FRAME_DT = 0.05 # 50ms base frame rate (20 FPS) FRAME_DT = 0.05 # 50ms base frame rate (20 FPS)
FIREHOSE_H = 12 # firehose zone height (terminal rows) FIREHOSE_H = 12 # firehose zone height (terminal rows)
GRAD_SPEED = 0.08 # gradient traversal speed (cycles/sec, ~12s full sweep) GRAD_SPEED = 0.08 # gradient traversal speed (cycles/sec, ~12s full sweep)
# ─── GLYPHS ─────────────────────────────────────────────── # ─── GLYPHS ───────────────────────────────────────────────
GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋" GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋"

View File

@@ -7,8 +7,8 @@ import random
from datetime import datetime from datetime import datetime
from engine import config from engine import config
from engine.terminal import RST, DIM, G_LO, G_DIM, W_GHOST, C_DIM
from engine.sources import FEEDS, POETRY_SOURCES from engine.sources import FEEDS, POETRY_SOURCES
from engine.terminal import C_DIM, DIM, G_DIM, G_LO, RST, W_GHOST
def noise(w): def noise(w):
@@ -34,23 +34,23 @@ def fade_line(s, fade):
if fade >= 1.0: if fade >= 1.0:
return s return s
if fade <= 0.0: if fade <= 0.0:
return '' return ""
result = [] result = []
i = 0 i = 0
while i < len(s): while i < len(s):
if s[i] == '\033' and i + 1 < len(s) and s[i + 1] == '[': if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[":
j = i + 2 j = i + 2
while j < len(s) and not s[j].isalpha(): while j < len(s) and not s[j].isalpha():
j += 1 j += 1
result.append(s[i:j + 1]) result.append(s[i : j + 1])
i = j + 1 i = j + 1
elif s[i] == ' ': elif s[i] == " ":
result.append(' ') result.append(" ")
i += 1 i += 1
else: else:
result.append(s[i] if random.random() < fade else ' ') result.append(s[i] if random.random() < fade else " ")
i += 1 i += 1
return ''.join(result) return "".join(result)
def vis_trunc(s, w): def vis_trunc(s, w):
@@ -61,17 +61,17 @@ def vis_trunc(s, w):
while i < len(s): while i < len(s):
if vw >= w: if vw >= w:
break break
if s[i] == '\033' and i + 1 < len(s) and s[i + 1] == '[': if s[i] == "\033" and i + 1 < len(s) and s[i + 1] == "[":
j = i + 2 j = i + 2
while j < len(s) and not s[j].isalpha(): while j < len(s) and not s[j].isalpha():
j += 1 j += 1
result.append(s[i:j + 1]) result.append(s[i : j + 1])
i = j + 1 i = j + 1
else: else:
result.append(s[i]) result.append(s[i])
vw += 1 vw += 1
i += 1 i += 1
return ''.join(result) return "".join(result)
def next_headline(pool, items, seen): def next_headline(pool, items, seen):
@@ -94,7 +94,7 @@ def firehose_line(items, w):
if r < 0.35: if r < 0.35:
# Raw headline text # Raw headline text
title, src, ts = random.choice(items) title, src, ts = random.choice(items)
text = title[:w - 1] text = title[: w - 1]
color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM]) color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM])
return f"{color}{text}{RST}" return f"{color}{text}{RST}"
elif r < 0.55: elif r < 0.55:
@@ -103,12 +103,13 @@ def firehose_line(items, w):
return "".join( return "".join(
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}" f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
f"{random.choice(config.GLITCH + config.KATA)}{RST}" f"{random.choice(config.GLITCH + config.KATA)}{RST}"
if random.random() < d else " " if random.random() < d
else " "
for _ in range(w) for _ in range(w)
) )
elif r < 0.78: elif r < 0.78:
# Status / program output # Status / program output
sources = FEEDS if config.MODE == 'news' else POETRY_SOURCES sources = FEEDS if config.MODE == "news" else POETRY_SOURCES
src = random.choice(list(sources.keys())) src = random.choice(list(sources.keys()))
msgs = [ msgs = [
f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}", f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}",
@@ -118,16 +119,16 @@ def firehose_line(items, w):
f" {''.join(random.choice(config.KATA) for _ in range(3))} STRM " f" {''.join(random.choice(config.KATA) for _ in range(3))} STRM "
f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}", f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}",
] ]
text = random.choice(msgs)[:w - 1] text = random.choice(msgs)[: w - 1]
color = random.choice([G_LO, G_DIM, W_GHOST]) color = random.choice([G_LO, G_DIM, W_GHOST])
return f"{color}{text}{RST}" return f"{color}{text}{RST}"
else: else:
# Headline fragment with glitch prefix # Headline fragment with glitch prefix
title, _, _ = random.choice(items) title, _, _ = random.choice(items)
start = random.randint(0, max(0, len(title) - 20)) start = random.randint(0, max(0, len(title) - 20))
frag = title[start:start + random.randint(10, 35)] frag = title[start : start + random.randint(10, 35)]
pad = random.randint(0, max(0, w - len(frag) - 8)) pad = random.randint(0, max(0, w - len(frag) - 8))
gp = ''.join(random.choice(config.GLITCH) for _ in range(random.randint(1, 3))) gp = "".join(random.choice(config.GLITCH) for _ in range(random.randint(1, 3)))
text = (' ' * pad + gp + ' ' + frag)[:w - 1] text = (" " * pad + gp + " " + frag)[: w - 1]
color = random.choice([G_LO, C_DIM, W_GHOST]) color = random.choice([G_LO, C_DIM, W_GHOST])
return f"{color}{text}{RST}" return f"{color}{text}{RST}"

View File

@@ -3,19 +3,20 @@ RSS feed fetching, Project Gutenberg parsing, and headline caching.
Depends on: config, sources, filter, terminal. Depends on: config, sources, filter, terminal.
""" """
import re
import json import json
import pathlib import pathlib
import re
import urllib.request import urllib.request
from datetime import datetime from datetime import datetime
import feedparser import feedparser
from engine import config from engine import config
from engine.filter import skip, strip_tags
from engine.sources import FEEDS, POETRY_SOURCES from engine.sources import FEEDS, POETRY_SOURCES
from engine.filter import strip_tags, skip
from engine.terminal import boot_ln from engine.terminal import boot_ln
# ─── SINGLE FEED ────────────────────────────────────────── # ─── SINGLE FEED ──────────────────────────────────────────
def fetch_feed(url): def fetch_feed(url):
try: try:
@@ -63,26 +64,31 @@ def _fetch_gutenberg(url, label):
try: try:
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"}) req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=15) resp = urllib.request.urlopen(req, timeout=15)
text = resp.read().decode('utf-8', errors='replace').replace('\r\n', '\n').replace('\r', '\n') text = (
resp.read()
.decode("utf-8", errors="replace")
.replace("\r\n", "\n")
.replace("\r", "\n")
)
# Strip PG boilerplate # Strip PG boilerplate
m = re.search(r'\*\*\*\s*START OF[^\n]*\n', text) m = re.search(r"\*\*\*\s*START OF[^\n]*\n", text)
if m: if m:
text = text[m.end():] text = text[m.end() :]
m = re.search(r'\*\*\*\s*END OF', text) m = re.search(r"\*\*\*\s*END OF", text)
if m: if m:
text = text[:m.start()] text = text[: m.start()]
# Split on blank lines into stanzas/passages # Split on blank lines into stanzas/passages
blocks = re.split(r'\n{2,}', text.strip()) blocks = re.split(r"\n{2,}", text.strip())
items = [] items = []
for blk in blocks: for blk in blocks:
blk = ' '.join(blk.split()) # flatten to one line blk = " ".join(blk.split()) # flatten to one line
if len(blk) < 20 or len(blk) > 280: if len(blk) < 20 or len(blk) > 280:
continue continue
if blk.isupper(): # skip all-caps headers if blk.isupper(): # skip all-caps headers
continue continue
if re.match(r'^[IVXLCDM]+\.?\s*$', blk): # roman numerals if re.match(r"^[IVXLCDM]+\.?\s*$", blk): # roman numerals
continue continue
items.append((blk, label, '')) items.append((blk, label, ""))
return items return items
except Exception: except Exception:
return [] return []

View File

@@ -29,29 +29,29 @@ def strip_tags(html):
# ─── CONTENT FILTER ─────────────────────────────────────── # ─── CONTENT FILTER ───────────────────────────────────────
_SKIP_RE = re.compile( _SKIP_RE = re.compile(
r'\b(?:' r"\b(?:"
# ── sports ── # ── sports ──
r'football|soccer|basketball|baseball|softball|tennis|golf|cricket|rugby|' r"football|soccer|basketball|baseball|softball|tennis|golf|cricket|rugby|"
r'hockey|lacrosse|volleyball|badminton|' r"hockey|lacrosse|volleyball|badminton|"
r'nba|nfl|nhl|mlb|mls|fifa|uefa|' r"nba|nfl|nhl|mlb|mls|fifa|uefa|"
r'premier league|champions league|la liga|serie a|bundesliga|' r"premier league|champions league|la liga|serie a|bundesliga|"
r'world cup|super bowl|world series|stanley cup|' r"world cup|super bowl|world series|stanley cup|"
r'playoff|playoffs|touchdown|goalkeeper|striker|quarterback|' r"playoff|playoffs|touchdown|goalkeeper|striker|quarterback|"
r'slam dunk|home run|grand slam|offside|halftime|' r"slam dunk|home run|grand slam|offside|halftime|"
r'batting|wicket|innings|' r"batting|wicket|innings|"
r'formula 1|nascar|motogp|' r"formula 1|nascar|motogp|"
r'boxing|ufc|mma|' r"boxing|ufc|mma|"
r'marathon|tour de france|' r"marathon|tour de france|"
r'transfer window|draft pick|relegation|' r"transfer window|draft pick|relegation|"
# ── vapid / insipid ── # ── vapid / insipid ──
r'kardashian|jenner|reality tv|reality show|' r"kardashian|jenner|reality tv|reality show|"
r'influencer|viral video|tiktok|instagram|' r"influencer|viral video|tiktok|instagram|"
r'best dressed|worst dressed|red carpet|' r"best dressed|worst dressed|red carpet|"
r'horoscope|zodiac|gossip|bikini|selfie|' r"horoscope|zodiac|gossip|bikini|selfie|"
r'you won.t believe|what happened next|' r"you won.t believe|what happened next|"
r'celebrity couple|celebrity feud|baby bump' r"celebrity couple|celebrity feud|baby bump"
r')\b', r")\b",
re.IGNORECASE re.IGNORECASE,
) )

View File

@@ -6,8 +6,9 @@ Gracefully degrades if sounddevice/numpy are unavailable.
import atexit import atexit
try: try:
import sounddevice as _sd
import numpy as _np import numpy as _np
import sounddevice as _sd
_HAS_MIC = True _HAS_MIC = True
except Exception: except Exception:
_HAS_MIC = False _HAS_MIC = False
@@ -40,12 +41,15 @@ class MicMonitor:
"""Start background mic stream. Returns True on success, False/None otherwise.""" """Start background mic stream. Returns True on success, False/None otherwise."""
if not _HAS_MIC: if not _HAS_MIC:
return None return None
def _cb(indata, frames, t, status): def _cb(indata, frames, t, status):
rms = float(_np.sqrt(_np.mean(indata ** 2))) rms = float(_np.sqrt(_np.mean(indata**2)))
self._db = 20 * _np.log10(rms) if rms > 0 else -99.0 self._db = 20 * _np.log10(rms) if rms > 0 else -99.0
try: try:
self._stream = _sd.InputStream( self._stream = _sd.InputStream(
callback=_cb, channels=1, samplerate=44100, blocksize=2048) callback=_cb, channels=1, samplerate=44100, blocksize=2048
)
self._stream.start() self._stream.start()
atexit.register(self.stop) atexit.register(self.stop)
return True return True

View File

@@ -13,10 +13,10 @@ Reusable by any visualizer:
""" """
import json import json
import time
import threading import threading
import time
import urllib.request import urllib.request
from urllib.parse import urlparse, parse_qs, urlencode, urlunparse from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
class NtfyPoller: class NtfyPoller:
@@ -26,7 +26,7 @@ class NtfyPoller:
self.topic_url = topic_url self.topic_url = topic_url
self.reconnect_delay = reconnect_delay self.reconnect_delay = reconnect_delay
self.display_secs = display_secs self.display_secs = display_secs
self._message = None # (title, body, monotonic_timestamp) or None self._message = None # (title, body, monotonic_timestamp) or None
self._lock = threading.Lock() self._lock = threading.Lock()
def start(self): def start(self):
@@ -55,7 +55,7 @@ class NtfyPoller:
"""Build the stream URL, substituting since= to avoid message replays on reconnect.""" """Build the stream URL, substituting since= to avoid message replays on reconnect."""
parsed = urlparse(self.topic_url) parsed = urlparse(self.topic_url)
params = parse_qs(parsed.query, keep_blank_values=True) params = parse_qs(parsed.query, keep_blank_values=True)
params['since'] = [last_id if last_id else '20s'] params["since"] = [last_id if last_id else "20s"]
new_query = urlencode({k: v[0] for k, v in params.items()}) new_query = urlencode({k: v[0] for k, v in params.items()})
return urlunparse(parsed._replace(query=new_query)) return urlunparse(parsed._replace(query=new_query))
@@ -65,7 +65,8 @@ class NtfyPoller:
try: try:
url = self._build_url(last_id) url = self._build_url(last_id)
req = urllib.request.Request( req = urllib.request.Request(
url, headers={"User-Agent": "mainline/0.1"}) url, headers={"User-Agent": "mainline/0.1"}
)
# timeout=90 keeps the socket alive through ntfy.sh keepalive heartbeats # timeout=90 keeps the socket alive through ntfy.sh keepalive heartbeats
resp = urllib.request.urlopen(req, timeout=90) resp = urllib.request.urlopen(req, timeout=90)
while True: while True:
@@ -73,7 +74,7 @@ class NtfyPoller:
if not line: if not line:
break # server closed connection — reconnect break # server closed connection — reconnect
try: try:
data = json.loads(line.decode('utf-8', errors='replace')) data = json.loads(line.decode("utf-8", errors="replace"))
except json.JSONDecodeError: except json.JSONDecodeError:
continue continue
# Advance cursor on every event (message + keepalive) to # Advance cursor on every event (message + keepalive) to

View File

@@ -4,15 +4,15 @@ Font loading, text rasterization, word-wrap, gradient coloring, headline block a
Depends on: config, terminal, sources, translate. Depends on: config, terminal, sources, translate.
""" """
import re
import random import random
import re
from pathlib import Path from pathlib import Path
from PIL import Image, ImageDraw, ImageFont from PIL import Image, ImageDraw, ImageFont
from engine import config from engine import config
from engine.sources import NO_UPPER, SCRIPT_FONTS, SOURCE_LANGS
from engine.terminal import RST from engine.terminal import RST
from engine.sources import SCRIPT_FONTS, SOURCE_LANGS, NO_UPPER
from engine.translate import detect_location_language, translate_headline from engine.translate import detect_location_language, translate_headline
# ─── GRADIENT ───────────────────────────────────────────── # ─── GRADIENT ─────────────────────────────────────────────
@@ -20,15 +20,15 @@ from engine.translate import detect_location_language, translate_headline
GRAD_COLS = [ GRAD_COLS = [
"\033[1;38;5;231m", # white "\033[1;38;5;231m", # white
"\033[1;38;5;195m", # pale cyan-white "\033[1;38;5;195m", # pale cyan-white
"\033[38;5;123m", # bright cyan "\033[38;5;123m", # bright cyan
"\033[38;5;118m", # bright lime "\033[38;5;118m", # bright lime
"\033[38;5;82m", # lime "\033[38;5;82m", # lime
"\033[38;5;46m", # bright green "\033[38;5;46m", # bright green
"\033[38;5;40m", # green "\033[38;5;40m", # green
"\033[38;5;34m", # medium green "\033[38;5;34m", # medium green
"\033[38;5;28m", # dark green "\033[38;5;28m", # dark green
"\033[38;5;22m", # deep green "\033[38;5;22m", # deep green
"\033[2;38;5;22m", # dim deep green "\033[2;38;5;22m", # dim deep green
"\033[2;38;5;235m", # near black "\033[2;38;5;235m", # near black
] ]
@@ -36,15 +36,15 @@ GRAD_COLS = [
MSG_GRAD_COLS = [ MSG_GRAD_COLS = [
"\033[1;38;5;231m", # white "\033[1;38;5;231m", # white
"\033[1;38;5;225m", # pale pink-white "\033[1;38;5;225m", # pale pink-white
"\033[38;5;219m", # bright pink "\033[38;5;219m", # bright pink
"\033[38;5;213m", # hot pink "\033[38;5;213m", # hot pink
"\033[38;5;207m", # magenta "\033[38;5;207m", # magenta
"\033[38;5;201m", # bright magenta "\033[38;5;201m", # bright magenta
"\033[38;5;165m", # orchid-red "\033[38;5;165m", # orchid-red
"\033[38;5;161m", # ruby-magenta "\033[38;5;161m", # ruby-magenta
"\033[38;5;125m", # dark magenta "\033[38;5;125m", # dark magenta
"\033[38;5;89m", # deep maroon-magenta "\033[38;5;89m", # deep maroon-magenta
"\033[2;38;5;89m", # dim deep maroon-magenta "\033[2;38;5;89m", # dim deep maroon-magenta
"\033[2;38;5;235m", # near black "\033[2;38;5;235m", # near black
] ]
@@ -62,13 +62,14 @@ def font():
f"No primary font selected. Add .otf/.ttf/.ttc files to {config.FONT_DIR}." f"No primary font selected. Add .otf/.ttf/.ttc files to {config.FONT_DIR}."
) )
key = (config.FONT_PATH, config.FONT_INDEX, config.FONT_SZ) key = (config.FONT_PATH, config.FONT_INDEX, config.FONT_SZ)
if _FONT_OBJ is None or _FONT_OBJ_KEY != key: if _FONT_OBJ is None or key != _FONT_OBJ_KEY:
_FONT_OBJ = ImageFont.truetype( _FONT_OBJ = ImageFont.truetype(
config.FONT_PATH, config.FONT_SZ, index=config.FONT_INDEX config.FONT_PATH, config.FONT_SZ, index=config.FONT_INDEX
) )
_FONT_OBJ_KEY = key _FONT_OBJ_KEY = key
return _FONT_OBJ return _FONT_OBJ
def clear_font_cache(): def clear_font_cache():
"""Reset cached font objects after changing primary font selection.""" """Reset cached font objects after changing primary font selection."""
global _FONT_OBJ, _FONT_OBJ_KEY global _FONT_OBJ, _FONT_OBJ_KEY
@@ -123,7 +124,7 @@ def render_line(text, fnt=None):
pad = 4 pad = 4
img_w = bbox[2] - bbox[0] + pad * 2 img_w = bbox[2] - bbox[0] + pad * 2
img_h = bbox[3] - bbox[1] + pad * 2 img_h = bbox[3] - bbox[1] + pad * 2
img = Image.new('L', (img_w, img_h), 0) img = Image.new("L", (img_w, img_h), 0)
draw = ImageDraw.Draw(img) draw = ImageDraw.Draw(img)
draw.text((-bbox[0] + pad, -bbox[1] + pad), text, fill=255, font=fnt) draw.text((-bbox[0] + pad, -bbox[1] + pad), text, fill=255, font=fnt)
pix_h = config.RENDER_H * 2 pix_h = config.RENDER_H * 2
@@ -200,8 +201,8 @@ def lr_gradient(rows, offset=0.0, grad_cols=None):
continue continue
buf = [] buf = []
for x, ch in enumerate(row): for x, ch in enumerate(row):
if ch == ' ': if ch == " ":
buf.append(' ') buf.append(" ")
else: else:
shifted = (x / max(max_x - 1, 1) + offset) % 1.0 shifted = (x / max(max_x - 1, 1) + offset) % 1.0
idx = min(round(shifted * (n - 1)), n - 1) idx = min(round(shifted * (n - 1)), n - 1)
@@ -218,7 +219,11 @@ def lr_gradient_opposite(rows, offset=0.0):
# ─── HEADLINE BLOCK ASSEMBLY ───────────────────────────── # ─── HEADLINE BLOCK ASSEMBLY ─────────────────────────────
def make_block(title, src, ts, w): def make_block(title, src, ts, w):
"""Render a headline into a content block with color.""" """Render a headline into a content block with color."""
target_lang = (SOURCE_LANGS.get(src) or detect_location_language(title)) if config.MODE == 'news' else None target_lang = (
(SOURCE_LANGS.get(src) or detect_location_language(title))
if config.MODE == "news"
else None
)
lang_font = font_for_lang(target_lang) lang_font = font_for_lang(target_lang)
if target_lang: if target_lang:
title = translate_headline(title, target_lang) title = translate_headline(title, target_lang)
@@ -227,28 +232,36 @@ def make_block(title, src, ts, w):
title_up = re.sub(r"\s+", " ", title) title_up = re.sub(r"\s+", " ", title)
else: else:
title_up = re.sub(r"\s+", " ", title.upper()) title_up = re.sub(r"\s+", " ", title.upper())
for old, new in [("\u2019","'"), ("\u2018","'"), ("\u201c",'"'), for old, new in [
("\u201d",'"'), ("\u2013","-"), ("\u2014","-")]: ("\u2019", "'"),
("\u2018", "'"),
("\u201c", '"'),
("\u201d", '"'),
("\u2013", "-"),
("\u2014", "-"),
]:
title_up = title_up.replace(old, new) title_up = title_up.replace(old, new)
big_rows = big_wrap(title_up, w - 4, lang_font) big_rows = big_wrap(title_up, w - 4, lang_font)
hc = random.choice([ hc = random.choice(
"\033[38;5;46m", # matrix green [
"\033[38;5;34m", # dark green "\033[38;5;46m", # matrix green
"\033[38;5;82m", # lime "\033[38;5;34m", # dark green
"\033[38;5;48m", # sea green "\033[38;5;82m", # lime
"\033[38;5;37m", # teal "\033[38;5;48m", # sea green
"\033[38;5;44m", # cyan "\033[38;5;37m", # teal
"\033[38;5;87m", # sky "\033[38;5;44m", # cyan
"\033[38;5;117m", # ice blue "\033[38;5;87m", # sky
"\033[38;5;250m", # cool white "\033[38;5;117m", # ice blue
"\033[38;5;156m", # pale green "\033[38;5;250m", # cool white
"\033[38;5;120m", # mint "\033[38;5;156m", # pale green
"\033[38;5;80m", # dark cyan "\033[38;5;120m", # mint
"\033[38;5;108m", # grey-green "\033[38;5;80m", # dark cyan
"\033[38;5;115m", # sage "\033[38;5;108m", # grey-green
"\033[1;38;5;46m", # bold green "\033[38;5;115m", # sage
"\033[1;38;5;250m", # bold white "\033[1;38;5;46m", # bold green
]) "\033[1;38;5;250m", # bold white
]
)
content = [" " + r for r in big_rows] content = [" " + r for r in big_rows]
content.append("") content.append("")
meta = f"\u2591 {src} \u00b7 {ts}" meta = f"\u2591 {src} \u00b7 {ts}"

View File

@@ -3,16 +3,23 @@ Render engine — ticker content, scroll motion, message panel, and firehose ove
Depends on: config, terminal, render, effects, ntfy, mic. Depends on: config, terminal, render, effects, ntfy, mic.
""" """
import random
import re import re
import sys import sys
import time import time
import random
from datetime import datetime from datetime import datetime
from engine import config from engine import config
from engine.terminal import RST, W_COOL, CLR, tw, th from engine.effects import (
fade_line,
firehose_line,
glitch_bar,
next_headline,
noise,
vis_trunc,
)
from engine.render import big_wrap, lr_gradient, lr_gradient_opposite, make_block from engine.render import big_wrap, lr_gradient, lr_gradient_opposite, make_block
from engine.effects import noise, glitch_bar, fade_line, vis_trunc, next_headline, firehose_line from engine.terminal import CLR, RST, W_COOL, th, tw
def stream(items, ntfy_poller, mic_monitor): def stream(items, ntfy_poller, mic_monitor):
@@ -28,8 +35,8 @@ def stream(items, ntfy_poller, mic_monitor):
w, h = tw(), th() w, h = tw(), th()
fh = config.FIREHOSE_H if config.FIREHOSE else 0 fh = config.FIREHOSE_H if config.FIREHOSE else 0
ticker_view_h = h - fh # reserve fixed firehose strip at bottom ticker_view_h = h - fh # reserve fixed firehose strip at bottom
GAP = 3 # blank rows between headlines GAP = 3 # blank rows between headlines
scroll_step_interval = config.SCROLL_DUR / (ticker_view_h + 15) * 2 scroll_step_interval = config.SCROLL_DUR / (ticker_view_h + 15) * 2
# Taxonomy: # Taxonomy:
@@ -39,8 +46,10 @@ def stream(items, ntfy_poller, mic_monitor):
# - firehose: fixed carriage-return style strip pinned at bottom # - firehose: fixed carriage-return style strip pinned at bottom
# Active ticker blocks: (content_rows, color, canvas_y, meta_idx) # Active ticker blocks: (content_rows, color, canvas_y, meta_idx)
active = [] active = []
scroll_cam = 0 # viewport top in virtual canvas coords scroll_cam = 0 # viewport top in virtual canvas coords
ticker_next_y = ticker_view_h # canvas-y where next block starts (off-screen bottom) ticker_next_y = (
ticker_view_h # canvas-y where next block starts (off-screen bottom)
)
noise_cache = {} noise_cache = {}
scroll_motion_accum = 0.0 scroll_motion_accum = 0.0
@@ -50,9 +59,9 @@ def stream(items, ntfy_poller, mic_monitor):
return noise_cache[cy] return noise_cache[cy]
# Message color: bright cyan/white — distinct from headline greens # Message color: bright cyan/white — distinct from headline greens
MSG_META = "\033[38;5;245m" # cool grey MSG_META = "\033[38;5;245m" # cool grey
MSG_BORDER = "\033[2;38;5;37m" # dim teal MSG_BORDER = "\033[2;38;5;37m" # dim teal
_msg_cache = (None, None) # (cache_key, rendered_rows) _msg_cache = (None, None) # (cache_key, rendered_rows)
while queued < config.HEADLINE_LIMIT or active: while queued < config.HEADLINE_LIMIT or active:
t0 = time.monotonic() t0 = time.monotonic()
@@ -77,7 +86,9 @@ def stream(items, ntfy_poller, mic_monitor):
_msg_cache = (cache_key, msg_rows) _msg_cache = (cache_key, msg_rows)
else: else:
msg_rows = _msg_cache[1] msg_rows = _msg_cache[1]
msg_rows = lr_gradient_opposite(msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0) msg_rows = lr_gradient_opposite(
msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0
)
# Layout: rendered text + meta + border # Layout: rendered text + meta + border
elapsed_s = int(time.monotonic() - m_ts) elapsed_s = int(time.monotonic() - m_ts)
remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s) remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s)
@@ -87,19 +98,29 @@ def stream(items, ntfy_poller, mic_monitor):
row_idx = 0 row_idx = 0
for mr in msg_rows: for mr in msg_rows:
ln = vis_trunc(mr, w) ln = vis_trunc(mr, w)
msg_overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}{RST}\033[K") msg_overlay.append(
f"\033[{panel_top + row_idx + 1};1H {ln}{RST}\033[K"
)
row_idx += 1 row_idx += 1
# Meta line: title (if distinct) + source + countdown # Meta line: title (if distinct) + source + countdown
meta_parts = [] meta_parts = []
if m_title and m_title != m_body: if m_title and m_title != m_body:
meta_parts.append(m_title) meta_parts.append(m_title)
meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s") meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
meta = " " + " \u00b7 ".join(meta_parts) if len(meta_parts) > 1 else " " + meta_parts[0] meta = (
msg_overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}{RST}\033[K") " " + " \u00b7 ".join(meta_parts)
if len(meta_parts) > 1
else " " + meta_parts[0]
)
msg_overlay.append(
f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}{RST}\033[K"
)
row_idx += 1 row_idx += 1
# Border — constant boundary under message panel # Border — constant boundary under message panel
bar = "\u2500" * (w - 4) bar = "\u2500" * (w - 4)
msg_overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}{RST}\033[K") msg_overlay.append(
f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}{RST}\033[K"
)
# Ticker draws above the fixed firehose strip; message is a centered overlay. # Ticker draws above the fixed firehose strip; message is a centered overlay.
ticker_h = ticker_view_h - msg_h ticker_h = ticker_view_h - msg_h
@@ -111,7 +132,10 @@ def stream(items, ntfy_poller, mic_monitor):
scroll_cam += 1 scroll_cam += 1
# Enqueue new headlines when room at the bottom # Enqueue new headlines when room at the bottom
while ticker_next_y < scroll_cam + ticker_view_h + 10 and queued < config.HEADLINE_LIMIT: while (
ticker_next_y < scroll_cam + ticker_view_h + 10
and queued < config.HEADLINE_LIMIT
):
t, src, ts = next_headline(pool, items, seen) t, src, ts = next_headline(pool, items, seen)
ticker_content, hc, midx = make_block(t, src, ts, w) ticker_content, hc, midx = make_block(t, src, ts, w)
active.append((ticker_content, hc, ticker_next_y, midx)) active.append((ticker_content, hc, ticker_next_y, midx))
@@ -119,8 +143,9 @@ def stream(items, ntfy_poller, mic_monitor):
queued += 1 queued += 1
# Prune off-screen blocks and stale noise # Prune off-screen blocks and stale noise
active = [(c, hc, by, mi) for c, hc, by, mi in active active = [
if by + len(c) > scroll_cam] (c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > scroll_cam
]
for k in list(noise_cache): for k in list(noise_cache):
if k < scroll_cam: if k < scroll_cam:
del noise_cache[k] del noise_cache[k]

View File

@@ -47,69 +47,69 @@ FEEDS = {
# ─── POETRY / LITERATURE ───────────────────────────────── # ─── POETRY / LITERATURE ─────────────────────────────────
# Public domain via Project Gutenberg # Public domain via Project Gutenberg
POETRY_SOURCES = { POETRY_SOURCES = {
"Whitman": "https://www.gutenberg.org/cache/epub/1322/pg1322.txt", "Whitman": "https://www.gutenberg.org/cache/epub/1322/pg1322.txt",
"Dickinson": "https://www.gutenberg.org/cache/epub/12242/pg12242.txt", "Dickinson": "https://www.gutenberg.org/cache/epub/12242/pg12242.txt",
"Whitman II": "https://www.gutenberg.org/cache/epub/8388/pg8388.txt", "Whitman II": "https://www.gutenberg.org/cache/epub/8388/pg8388.txt",
"Rilke": "https://www.gutenberg.org/cache/epub/38594/pg38594.txt", "Rilke": "https://www.gutenberg.org/cache/epub/38594/pg38594.txt",
"Pound": "https://www.gutenberg.org/cache/epub/41162/pg41162.txt", "Pound": "https://www.gutenberg.org/cache/epub/41162/pg41162.txt",
"Pound II": "https://www.gutenberg.org/cache/epub/51992/pg51992.txt", "Pound II": "https://www.gutenberg.org/cache/epub/51992/pg51992.txt",
"Eliot": "https://www.gutenberg.org/cache/epub/1567/pg1567.txt", "Eliot": "https://www.gutenberg.org/cache/epub/1567/pg1567.txt",
"Yeats": "https://www.gutenberg.org/cache/epub/38877/pg38877.txt", "Yeats": "https://www.gutenberg.org/cache/epub/38877/pg38877.txt",
"Masters": "https://www.gutenberg.org/cache/epub/1280/pg1280.txt", "Masters": "https://www.gutenberg.org/cache/epub/1280/pg1280.txt",
"Baudelaire": "https://www.gutenberg.org/cache/epub/36098/pg36098.txt", "Baudelaire": "https://www.gutenberg.org/cache/epub/36098/pg36098.txt",
"Crane": "https://www.gutenberg.org/cache/epub/40786/pg40786.txt", "Crane": "https://www.gutenberg.org/cache/epub/40786/pg40786.txt",
"Poe": "https://www.gutenberg.org/cache/epub/10031/pg10031.txt", "Poe": "https://www.gutenberg.org/cache/epub/10031/pg10031.txt",
} }
# ─── SOURCE → LANGUAGE MAPPING ─────────────────────────── # ─── SOURCE → LANGUAGE MAPPING ───────────────────────────
# Headlines from these outlets render in their cultural home language # Headlines from these outlets render in their cultural home language
SOURCE_LANGS = { SOURCE_LANGS = {
"Der Spiegel": "de", "Der Spiegel": "de",
"DW": "de", "DW": "de",
"France24": "fr", "France24": "fr",
"Japan Times": "ja", "Japan Times": "ja",
"The Hindu": "hi", "The Hindu": "hi",
"SCMP": "zh-cn", "SCMP": "zh-cn",
"Al Jazeera": "ar", "Al Jazeera": "ar",
} }
# ─── LOCATION → LANGUAGE ───────────────────────────────── # ─── LOCATION → LANGUAGE ─────────────────────────────────
LOCATION_LANGS = { LOCATION_LANGS = {
r'\b(?:china|chinese|beijing|shanghai|hong kong|xi jinping)\b': 'zh-cn', r"\b(?:china|chinese|beijing|shanghai|hong kong|xi jinping)\b": "zh-cn",
r'\b(?:japan|japanese|tokyo|osaka|kishida)\b': 'ja', r"\b(?:japan|japanese|tokyo|osaka|kishida)\b": "ja",
r'\b(?:korea|korean|seoul|pyongyang)\b': 'ko', r"\b(?:korea|korean|seoul|pyongyang)\b": "ko",
r'\b(?:russia|russian|moscow|kremlin|putin)\b': 'ru', r"\b(?:russia|russian|moscow|kremlin|putin)\b": "ru",
r'\b(?:saudi|dubai|qatar|egypt|cairo|arabic)\b': 'ar', r"\b(?:saudi|dubai|qatar|egypt|cairo|arabic)\b": "ar",
r'\b(?:india|indian|delhi|mumbai|modi)\b': 'hi', r"\b(?:india|indian|delhi|mumbai|modi)\b": "hi",
r'\b(?:germany|german|berlin|munich|scholz)\b': 'de', r"\b(?:germany|german|berlin|munich|scholz)\b": "de",
r'\b(?:france|french|paris|lyon|macron)\b': 'fr', r"\b(?:france|french|paris|lyon|macron)\b": "fr",
r'\b(?:spain|spanish|madrid)\b': 'es', r"\b(?:spain|spanish|madrid)\b": "es",
r'\b(?:italy|italian|rome|milan|meloni)\b': 'it', r"\b(?:italy|italian|rome|milan|meloni)\b": "it",
r'\b(?:portugal|portuguese|lisbon)\b': 'pt', r"\b(?:portugal|portuguese|lisbon)\b": "pt",
r'\b(?:brazil|brazilian|são paulo|lula)\b': 'pt', r"\b(?:brazil|brazilian|são paulo|lula)\b": "pt",
r'\b(?:greece|greek|athens)\b': 'el', r"\b(?:greece|greek|athens)\b": "el",
r'\b(?:turkey|turkish|istanbul|ankara|erdogan)\b': 'tr', r"\b(?:turkey|turkish|istanbul|ankara|erdogan)\b": "tr",
r'\b(?:iran|iranian|tehran)\b': 'fa', r"\b(?:iran|iranian|tehran)\b": "fa",
r'\b(?:thailand|thai|bangkok)\b': 'th', r"\b(?:thailand|thai|bangkok)\b": "th",
r'\b(?:vietnam|vietnamese|hanoi)\b': 'vi', r"\b(?:vietnam|vietnamese|hanoi)\b": "vi",
r'\b(?:ukraine|ukrainian|kyiv|kiev|zelensky)\b': 'uk', r"\b(?:ukraine|ukrainian|kyiv|kiev|zelensky)\b": "uk",
r'\b(?:israel|israeli|jerusalem|tel aviv|netanyahu)\b': 'he', r"\b(?:israel|israeli|jerusalem|tel aviv|netanyahu)\b": "he",
} }
# ─── NON-LATIN SCRIPT FONTS (macOS) ────────────────────── # ─── NON-LATIN SCRIPT FONTS (macOS) ──────────────────────
SCRIPT_FONTS = { SCRIPT_FONTS = {
'zh-cn': '/System/Library/Fonts/STHeiti Medium.ttc', "zh-cn": "/System/Library/Fonts/STHeiti Medium.ttc",
'ja': '/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc', "ja": "/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc",
'ko': '/System/Library/Fonts/AppleSDGothicNeo.ttc', "ko": "/System/Library/Fonts/AppleSDGothicNeo.ttc",
'ru': '/System/Library/Fonts/Supplemental/Arial.ttf', "ru": "/System/Library/Fonts/Supplemental/Arial.ttf",
'uk': '/System/Library/Fonts/Supplemental/Arial.ttf', "uk": "/System/Library/Fonts/Supplemental/Arial.ttf",
'el': '/System/Library/Fonts/Supplemental/Arial.ttf', "el": "/System/Library/Fonts/Supplemental/Arial.ttf",
'he': '/System/Library/Fonts/Supplemental/Arial.ttf', "he": "/System/Library/Fonts/Supplemental/Arial.ttf",
'ar': '/System/Library/Fonts/GeezaPro.ttc', "ar": "/System/Library/Fonts/GeezaPro.ttc",
'fa': '/System/Library/Fonts/GeezaPro.ttc', "fa": "/System/Library/Fonts/GeezaPro.ttc",
'hi': '/System/Library/Fonts/Kohinoor.ttc', "hi": "/System/Library/Fonts/Kohinoor.ttc",
'th': '/System/Library/Fonts/ThonburiUI.ttc', "th": "/System/Library/Fonts/ThonburiUI.ttc",
} }
# Scripts that have no uppercase # Scripts that have no uppercase
NO_UPPER = {'zh-cn', 'ja', 'ko', 'ar', 'fa', 'hi', 'th', 'he'} NO_UPPER = {"zh-cn", "ja", "ko", "ar", "fa", "hi", "th", "he"}

View File

@@ -4,8 +4,8 @@ No internal dependencies.
""" """
import os import os
import sys
import random import random
import sys
import time import time
# ─── ANSI ───────────────────────────────────────────────── # ─── ANSI ─────────────────────────────────────────────────
@@ -49,7 +49,7 @@ def type_out(text, color=G_HI):
while i < len(text): while i < len(text):
if random.random() < 0.3: if random.random() < 0.3:
b = random.randint(2, 5) b = random.randint(2, 5)
sys.stdout.write(f"{color}{text[i:i+b]}{RST}") sys.stdout.write(f"{color}{text[i : i + b]}{RST}")
i += b i += b
else: else:
sys.stdout.write(f"{color}{text[i]}{RST}") sys.stdout.write(f"{color}{text[i]}{RST}")

View File

@@ -3,10 +3,10 @@ Google Translate wrapper and location→language detection.
Depends on: sources (for LOCATION_LANGS). Depends on: sources (for LOCATION_LANGS).
""" """
import re
import json import json
import urllib.request import re
import urllib.parse import urllib.parse
import urllib.request
from engine.sources import LOCATION_LANGS from engine.sources import LOCATION_LANGS
@@ -29,8 +29,10 @@ def translate_headline(title, target_lang):
return _TRANSLATE_CACHE[key] return _TRANSLATE_CACHE[key]
try: try:
q = urllib.parse.quote(title) q = urllib.parse.quote(title)
url = ("https://translate.googleapis.com/translate_a/single" url = (
f"?client=gtx&sl=en&tl={target_lang}&dt=t&q={q}") "https://translate.googleapis.com/translate_a/single"
f"?client=gtx&sl=en&tl={target_lang}&dt=t&q={q}"
)
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"}) req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=5) resp = urllib.request.urlopen(req, timeout=5)
data = json.loads(resp.read()) data = json.loads(resp.read())

27
hk.pkl Normal file
View File

@@ -0,0 +1,27 @@
amends "package://github.com/jdx/hk/releases/download/v1.38.0/hk@1.38.0#/Config.pkl"
import "package://github.com/jdx/hk/releases/download/v1.38.0/hk@1.38.0#/Builtins.pkl"
hooks {
["pre-commit"] {
fix = true
stash = "git"
steps {
["ruff-format"] = (Builtins.ruff_format) {
prefix = "uv run"
}
["ruff"] = (Builtins.ruff) {
prefix = "uv run"
check = "ruff check engine/ tests/"
fix = "ruff check --fix --unsafe-fixes engine/ tests/"
}
}
}
["pre-push"] {
steps {
["ruff"] = (Builtins.ruff) {
prefix = "uv run"
check = "ruff check engine/ tests/"
}
}
}
}

View File

@@ -5,40 +5,7 @@ Digital news consciousness stream.
Matrix aesthetic · THX-1138 hue. Matrix aesthetic · THX-1138 hue.
""" """
import subprocess, sys, pathlib from engine.app import main
# ─── BOOTSTRAP VENV ───────────────────────────────────────
_VENV = pathlib.Path(__file__).resolve().parent / ".mainline_venv"
_MARKER = _VENV / ".installed_v3"
def _ensure_venv():
"""Create a local venv and install deps if needed."""
if _MARKER.exists():
return
import venv
print("\033[2;38;5;34m > first run — creating environment...\033[0m")
venv.create(str(_VENV), with_pip=True, clear=True)
pip = str(_VENV / "bin" / "pip")
subprocess.check_call(
[pip, "install", "feedparser", "Pillow", "-q"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
)
_MARKER.touch()
_ensure_venv()
# Install sounddevice on first run after v3
_MARKER_SD = _VENV / ".installed_sd"
if not _MARKER_SD.exists():
_pip = str(_VENV / "bin" / "pip")
subprocess.check_call([_pip, "install", "sounddevice", "numpy", "-q"],
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
_MARKER_SD.touch()
sys.path.insert(0, str(next((_VENV / "lib").glob("python*/site-packages"))))
# ─── DELEGATE TO ENGINE ───────────────────────────────────
from engine.app import main # noqa: E402
if __name__ == "__main__": if __name__ == "__main__":
main() main()

52
mise.toml Normal file
View File

@@ -0,0 +1,52 @@
[tools]
python = "3.12"
hk = "latest"
pkl = "latest"
[tasks]
# =====================
# Development
# =====================
test = "uv run pytest"
test-v = "uv run pytest -v"
test-cov = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html"
test-cov-open = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html && open htmlcov/index.html"
lint = "uv run ruff check engine/ mainline.py"
lint-fix = "uv run ruff check --fix engine/ mainline.py"
format = "uv run ruff format engine/ mainline.py"
# =====================
# Runtime
# =====================
run = "uv run mainline.py"
run-poetry = "uv run mainline.py --poetry"
run-firehose = "uv run mainline.py --firehose"
# =====================
# Environment
# =====================
sync = "uv sync"
sync-all = "uv sync --all-extras"
install = "uv sync"
install-dev = "uv sync --group dev"
bootstrap = "uv sync && uv run mainline.py --help"
clean = "rm -rf .venv htmlcov .coverage tests/.pytest_cache"
# =====================
# CI/CD
# =====================
ci = "uv sync --group dev && uv run pytest --cov=engine --cov-report=term-missing --cov-report=xml"
ci-lint = "uv run ruff check engine/ mainline.py"
# =====================
# Git Hooks (via hk)
# =====================
pre-commit = "hk run pre-commit"

88
pyproject.toml Normal file
View File

@@ -0,0 +1,88 @@
[project]
name = "mainline"
version = "0.1.0"
description = "Terminal news ticker with Matrix aesthetic"
readme = "README.md"
requires-python = ">=3.10"
authors = [
{ name = "Mainline", email = "mainline@example.com" }
]
license = { text = "MIT" }
classifiers = [
"Development Status :: 4 - Beta",
"Environment :: Console",
"License :: OSI Approved :: MIT License",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Topic :: Terminals",
]
dependencies = [
"feedparser>=6.0.0",
"Pillow>=10.0.0",
]
[project.optional-dependencies]
mic = [
"sounddevice>=0.4.0",
"numpy>=1.24.0",
]
dev = [
"pytest>=8.0.0",
"pytest-cov>=4.1.0",
"pytest-mock>=3.12.0",
"ruff>=0.1.0",
]
[project.scripts]
mainline = "engine.app:main"
[build-system]
requires = ["hatchling"]
build-backend = "hatchling.build"
[dependency-groups]
dev = [
"pytest>=8.0.0",
"pytest-cov>=4.1.0",
"pytest-mock>=3.12.0",
"ruff>=0.1.0",
]
[tool.pytest.ini_options]
testpaths = ["tests"]
python_files = ["test_*.py"]
python_functions = ["test_*"]
addopts = [
"--strict-markers",
"--tb=short",
"-v",
]
filterwarnings = [
"ignore::DeprecationWarning",
]
[tool.coverage.run]
source = ["engine"]
branch = true
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"raise AssertionError",
"raise NotImplementedError",
"if __name__ == .__main__.:",
"if TYPE_CHECKING:",
"@abstractmethod",
]
[tool.ruff]
line-length = 88
target-version = "py310"
[tool.ruff.lint]
select = ["E", "F", "W", "I", "N", "UP", "B", "C4", "SIM"]
ignore = ["E501", "SIM105", "N806", "B007", "SIM108"]

4
requirements-dev.txt Normal file
View File

@@ -0,0 +1,4 @@
pytest>=8.0.0
pytest-cov>=4.1.0
pytest-mock>=3.12.0
ruff>=0.1.0

4
requirements.txt Normal file
View File

@@ -0,0 +1,4 @@
feedparser>=6.0.0
Pillow>=10.0.0
sounddevice>=0.4.0
numpy>=1.24.0

0
tests/__init__.py Normal file
View File

162
tests/test_config.py Normal file
View File

@@ -0,0 +1,162 @@
"""
Tests for engine.config module.
"""
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch
from engine import config
class TestArgValue:
"""Tests for _arg_value helper."""
def test_returns_value_when_flag_present(self):
"""Returns the value following the flag."""
with patch.object(sys, "argv", ["prog", "--font-file", "test.otf"]):
result = config._arg_value("--font-file")
assert result == "test.otf"
def test_returns_none_when_flag_missing(self):
"""Returns None when flag is not present."""
with patch.object(sys, "argv", ["prog"]):
result = config._arg_value("--font-file")
assert result is None
def test_returns_none_when_no_value(self):
"""Returns None when flag is last."""
with patch.object(sys, "argv", ["prog", "--font-file"]):
result = config._arg_value("--font-file")
assert result is None
class TestArgInt:
"""Tests for _arg_int helper."""
def test_parses_valid_int(self):
"""Parses valid integer."""
with patch.object(sys, "argv", ["prog", "--font-index", "5"]):
result = config._arg_int("--font-index", 0)
assert result == 5
def test_returns_default_on_invalid(self):
"""Returns default on invalid input."""
with patch.object(sys, "argv", ["prog", "--font-index", "abc"]):
result = config._arg_int("--font-index", 0)
assert result == 0
def test_returns_default_when_missing(self):
"""Returns default when flag missing."""
with patch.object(sys, "argv", ["prog"]):
result = config._arg_int("--font-index", 10)
assert result == 10
class TestResolveFontPath:
"""Tests for _resolve_font_path helper."""
def test_returns_absolute_paths(self):
"""Absolute paths are returned as-is."""
result = config._resolve_font_path("/absolute/path.otf")
assert result == "/absolute/path.otf"
def test_resolves_relative_paths(self):
"""Relative paths are resolved to repo root."""
result = config._resolve_font_path("fonts/test.otf")
assert str(config._REPO_ROOT) in result
def test_expands_user_home(self):
"""Tilde paths are expanded."""
with patch("pathlib.Path.expanduser", return_value=Path("/home/user/fonts")):
result = config._resolve_font_path("~/fonts/test.otf")
assert isinstance(result, str)
class TestListFontFiles:
"""Tests for _list_font_files helper."""
def test_returns_empty_for_missing_dir(self):
"""Returns empty list for missing directory."""
result = config._list_font_files("/nonexistent/directory")
assert result == []
def test_filters_by_extension(self):
"""Only returns valid font extensions."""
with tempfile.TemporaryDirectory() as tmpdir:
Path(tmpdir, "valid.otf").touch()
Path(tmpdir, "valid.ttf").touch()
Path(tmpdir, "invalid.txt").touch()
Path(tmpdir, "image.png").touch()
result = config._list_font_files(tmpdir)
assert len(result) == 2
assert all(f.endswith((".otf", ".ttf")) for f in result)
def test_sorts_alphabetically(self):
"""Results are sorted alphabetically."""
with tempfile.TemporaryDirectory() as tmpdir:
Path(tmpdir, "zfont.otf").touch()
Path(tmpdir, "afont.otf").touch()
result = config._list_font_files(tmpdir)
filenames = [Path(f).name for f in result]
assert filenames == ["afont.otf", "zfont.otf"]
class TestDefaults:
"""Tests for default configuration values."""
def test_headline_limit(self):
"""HEADLINE_LIMIT has sensible default."""
assert config.HEADLINE_LIMIT > 0
def test_feed_timeout(self):
"""FEED_TIMEOUT has sensible default."""
assert config.FEED_TIMEOUT > 0
def test_font_extensions(self):
"""Font extensions are defined."""
assert ".otf" in config._FONT_EXTENSIONS
assert ".ttf" in config._FONT_EXTENSIONS
assert ".ttc" in config._FONT_EXTENSIONS
class TestGlyphs:
"""Tests for glyph constants."""
def test_glitch_glyphs_defined(self):
"""GLITCH glyphs are defined."""
assert len(config.GLITCH) > 0
def test_kata_glyphs_defined(self):
"""KATA glyphs are defined."""
assert len(config.KATA) > 0
class TestSetFontSelection:
"""Tests for set_font_selection function."""
def test_updates_font_path(self):
"""Updates FONT_PATH globally."""
original = config.FONT_PATH
config.set_font_selection(font_path="/new/path.otf")
assert config.FONT_PATH == "/new/path.otf"
config.FONT_PATH = original
def test_updates_font_index(self):
"""Updates FONT_INDEX globally."""
original = config.FONT_INDEX
config.set_font_selection(font_index=5)
assert config.FONT_INDEX == 5
config.FONT_INDEX = original
def test_handles_none_values(self):
"""Handles None values gracefully."""
original_path = config.FONT_PATH
original_index = config.FONT_INDEX
config.set_font_selection(font_path=None, font_index=None)
assert original_path == config.FONT_PATH
assert original_index == config.FONT_INDEX

93
tests/test_filter.py Normal file
View File

@@ -0,0 +1,93 @@
"""
Tests for engine.filter module.
"""
from engine.filter import skip, strip_tags
class TestStripTags:
"""Tests for strip_tags function."""
def test_strips_simple_html(self):
"""Basic HTML tags are removed."""
assert strip_tags("<p>Hello</p>") == "Hello"
assert strip_tags("<b>Bold</b>") == "Bold"
assert strip_tags("<em>Italic</em>") == "Italic"
def test_strips_nested_html(self):
"""Nested HTML tags are handled."""
assert strip_tags("<div><p>Nested</p></div>") == "Nested"
assert strip_tags("<span><strong>Deep</strong></span>") == "Deep"
def test_strips_html_with_attributes(self):
"""HTML with attributes is handled."""
assert strip_tags('<a href="http://example.com">Link</a>') == "Link"
assert strip_tags('<img src="test.jpg" alt="test">') == ""
def test_handles_empty_string(self):
"""Empty string returns empty string."""
assert strip_tags("") == ""
assert strip_tags(None) == ""
def test_handles_plain_text(self):
"""Plain text without tags passes through."""
assert strip_tags("Plain text") == "Plain text"
def test_unescapes_html_entities(self):
"""HTML entities are decoded and tags are stripped."""
assert strip_tags("&nbsp;test") == "test"
assert strip_tags("Hello &amp; World") == "Hello & World"
def test_handles_malformed_html(self):
"""Malformed HTML is handled gracefully."""
assert strip_tags("<p>Unclosed") == "Unclosed"
assert strip_tags("</p>No start") == "No start"
class TestSkip:
"""Tests for skip function - content filtering."""
def test_skips_sports_content(self):
"""Sports-related headlines are skipped."""
assert skip("Football: Team wins championship") is True
assert skip("NBA Finals Game 7 results") is True
assert skip("Soccer match ends in draw") is True
assert skip("Premier League transfer news") is True
assert skip("Super Bowl halftime show") is True
def test_skips_vapid_content(self):
"""Vapid/celebrity content is skipped."""
assert skip("Kim Kardashian's new look") is True
assert skip("Influencer goes viral") is True
assert skip("Red carpet best dressed") is True
assert skip("Celebrity couple splits") is True
def test_allows_real_news(self):
"""Legitimate news headlines are allowed."""
assert skip("Scientists discover new planet") is False
assert skip("Economy grows by 3%") is False
assert skip("World leaders meet for summit") is False
assert skip("New technology breakthrough") is False
def test_case_insensitive(self):
"""Filter is case insensitive."""
assert skip("FOOTBALL scores") is True
assert skip("Football SCORES") is True
assert skip("Kardashian") is True
def test_word_boundary_matching(self):
"""Word boundary matching works correctly."""
assert skip("The football stadium") is True
assert skip("Footballer scores") is False
assert skip("Footballs on sale") is False
class TestIntegration:
"""Integration tests combining filter functions."""
def test_full_pipeline(self):
"""Test strip_tags followed by skip."""
html = '<p><a href="#">Breaking: Football championship final</a></p>'
text = strip_tags(html)
assert text == "Breaking: Football championship final"
assert skip(text) is True

83
tests/test_mic.py Normal file
View File

@@ -0,0 +1,83 @@
"""
Tests for engine.mic module.
"""
from unittest.mock import patch
class TestMicMonitorImport:
"""Tests for module import behavior."""
def test_mic_monitor_imports_without_error(self):
"""MicMonitor can be imported even without sounddevice."""
from engine.mic import MicMonitor
assert MicMonitor is not None
class TestMicMonitorInit:
"""Tests for MicMonitor initialization."""
def test_init_sets_threshold(self):
"""Threshold is set correctly."""
from engine.mic import MicMonitor
monitor = MicMonitor(threshold_db=60)
assert monitor.threshold_db == 60
def test_init_defaults(self):
"""Default values are set correctly."""
from engine.mic import MicMonitor
monitor = MicMonitor()
assert monitor.threshold_db == 50
def test_init_db_starts_at_negative(self):
"""_db starts at negative value."""
from engine.mic import MicMonitor
monitor = MicMonitor()
assert monitor.db == -99.0
class TestMicMonitorProperties:
"""Tests for MicMonitor properties."""
def test_excess_returns_positive_when_above_threshold(self):
"""excess returns positive value when above threshold."""
from engine.mic import MicMonitor
monitor = MicMonitor(threshold_db=50)
with patch.object(monitor, "_db", 60.0):
assert monitor.excess == 10.0
def test_excess_returns_zero_when_below_threshold(self):
"""excess returns zero when below threshold."""
from engine.mic import MicMonitor
monitor = MicMonitor(threshold_db=50)
with patch.object(monitor, "_db", 40.0):
assert monitor.excess == 0.0
class TestMicMonitorAvailable:
"""Tests for MicMonitor.available property."""
def test_available_is_bool(self):
"""available returns a boolean."""
from engine.mic import MicMonitor
monitor = MicMonitor()
assert isinstance(monitor.available, bool)
class TestMicMonitorStop:
"""Tests for MicMonitor.stop method."""
def test_stop_does_nothing_when_no_stream(self):
"""stop() does nothing if no stream exists."""
from engine.mic import MicMonitor
monitor = MicMonitor()
monitor.stop()
assert monitor._stream is None

70
tests/test_ntfy.py Normal file
View File

@@ -0,0 +1,70 @@
"""
Tests for engine.ntfy module.
"""
import time
from unittest.mock import MagicMock, patch
from engine.ntfy import NtfyPoller
class TestNtfyPollerInit:
"""Tests for NtfyPoller initialization."""
def test_init_sets_defaults(self):
"""Default values are set correctly."""
poller = NtfyPoller("http://example.com/topic")
assert poller.topic_url == "http://example.com/topic"
assert poller.reconnect_delay == 5
assert poller.display_secs == 30
def test_init_custom_values(self):
"""Custom values are set correctly."""
poller = NtfyPoller(
"http://example.com/topic", reconnect_delay=10, display_secs=60
)
assert poller.reconnect_delay == 10
assert poller.display_secs == 60
class TestNtfyPollerStart:
"""Tests for NtfyPoller.start method."""
@patch("engine.ntfy.threading.Thread")
def test_start_creates_daemon_thread(self, mock_thread):
"""start() creates and starts a daemon thread."""
mock_thread_instance = MagicMock()
mock_thread.return_value = mock_thread_instance
poller = NtfyPoller("http://example.com/topic")
result = poller.start()
assert result is True
mock_thread.assert_called_once()
args, kwargs = mock_thread.call_args
assert kwargs.get("daemon") is True
mock_thread_instance.start.assert_called_once()
class TestNtfyPollerGetActiveMessage:
"""Tests for NtfyPoller.get_active_message method."""
def test_returns_none_when_no_message(self):
"""Returns None when no message has been received."""
poller = NtfyPoller("http://example.com/topic")
result = poller.get_active_message()
assert result is None
class TestNtfyPollerDismiss:
"""Tests for NtfyPoller.dismiss method."""
def test_dismiss_clears_message(self):
"""dismiss() clears the current message."""
poller = NtfyPoller("http://example.com/topic")
with patch.object(poller, "_lock"):
poller._message = ("Title", "Body", time.monotonic())
poller.dismiss()
assert poller._message is None

93
tests/test_sources.py Normal file
View File

@@ -0,0 +1,93 @@
"""
Tests for engine.sources module - data validation.
"""
from engine import sources
class TestFeeds:
"""Tests for FEEDS data."""
def test_feeds_is_dict(self):
"""FEEDS is a dictionary."""
assert isinstance(sources.FEEDS, dict)
def test_feeds_has_entries(self):
"""FEEDS has feed entries."""
assert len(sources.FEEDS) > 0
def test_feeds_have_valid_urls(self):
"""All feeds have valid URL format."""
for name, url in sources.FEEDS.items():
assert name
assert url.startswith("http://") or url.startswith("https://")
class TestPoetrySources:
"""Tests for POETRY_SOURCES data."""
def test_poetry_is_dict(self):
"""POETRY_SOURCES is a dictionary."""
assert isinstance(sources.POETRY_SOURCES, dict)
def test_poetry_has_entries(self):
"""POETRY_SOURCES has entries."""
assert len(sources.POETRY_SOURCES) > 0
def test_poetry_have_gutenberg_urls(self):
"""All poetry sources are from Gutenberg."""
for _name, url in sources.POETRY_SOURCES.items():
assert "gutenberg.org" in url
class TestSourceLangs:
"""Tests for SOURCE_LANGS mapping."""
def test_source_langs_is_dict(self):
"""SOURCE_LANGS is a dictionary."""
assert isinstance(sources.SOURCE_LANGS, dict)
def test_source_langs_valid_language_codes(self):
"""Language codes are valid ISO codes."""
valid_codes = {"de", "fr", "ja", "zh-cn", "ar", "hi"}
for code in sources.SOURCE_LANGS.values():
assert code in valid_codes
class TestLocationLangs:
"""Tests for LOCATION_LANGS mapping."""
def test_location_langs_is_dict(self):
"""LOCATION_LANGS is a dictionary."""
assert isinstance(sources.LOCATION_LANGS, dict)
def test_location_langs_has_patterns(self):
"""LOCATION_LANGS has regex patterns."""
assert len(sources.LOCATION_LANGS) > 0
class TestScriptFonts:
"""Tests for SCRIPT_FONTS mapping."""
def test_script_fonts_is_dict(self):
"""SCRIPT_FONTS is a dictionary."""
assert isinstance(sources.SCRIPT_FONTS, dict)
def test_script_fonts_has_paths(self):
"""All script fonts have paths."""
for _lang, path in sources.SCRIPT_FONTS.items():
assert path
class TestNoUpper:
"""Tests for NO_UPPER set."""
def test_no_upper_is_set(self):
"""NO_UPPER is a set."""
assert isinstance(sources.NO_UPPER, set)
def test_no_upper_contains_scripts(self):
"""NO_UPPER contains non-Latin scripts."""
assert "zh-cn" in sources.NO_UPPER
assert "ja" in sources.NO_UPPER
assert "ar" in sources.NO_UPPER

130
tests/test_terminal.py Normal file
View File

@@ -0,0 +1,130 @@
"""
Tests for engine.terminal module.
"""
import io
import sys
from unittest.mock import patch
from engine import terminal
class TestTerminalDimensions:
"""Tests for terminal width/height functions."""
def test_tw_returns_columns(self):
"""tw() returns terminal width."""
with (
patch.object(sys.stdout, "isatty", return_value=True),
patch("os.get_terminal_size") as mock_size,
):
mock_size.return_value = io.StringIO("columns=120")
mock_size.columns = 120
result = terminal.tw()
assert isinstance(result, int)
def test_th_returns_lines(self):
"""th() returns terminal height."""
with (
patch.object(sys.stdout, "isatty", return_value=True),
patch("os.get_terminal_size") as mock_size,
):
mock_size.return_value = io.StringIO("lines=30")
mock_size.lines = 30
result = terminal.th()
assert isinstance(result, int)
def test_tw_fallback_on_error(self):
"""tw() falls back to 80 on error."""
with patch("os.get_terminal_size", side_effect=OSError):
result = terminal.tw()
assert result == 80
def test_th_fallback_on_error(self):
"""th() falls back to 24 on error."""
with patch("os.get_terminal_size", side_effect=OSError):
result = terminal.th()
assert result == 24
class TestANSICodes:
"""Tests for ANSI escape code constants."""
def test_ansi_constants_exist(self):
"""All ANSI constants are defined."""
assert terminal.RST == "\033[0m"
assert terminal.BOLD == "\033[1m"
assert terminal.DIM == "\033[2m"
def test_green_shades_defined(self):
"""Green gradient colors are defined."""
assert terminal.G_HI == "\033[38;5;46m"
assert terminal.G_MID == "\033[38;5;34m"
assert terminal.G_LO == "\033[38;5;22m"
def test_white_shades_defined(self):
"""White/gray tones are defined."""
assert terminal.W_COOL == "\033[38;5;250m"
assert terminal.W_DIM == "\033[2;38;5;245m"
def test_cursor_controls_defined(self):
"""Cursor control codes are defined."""
assert "?" in terminal.CURSOR_OFF
assert "?" in terminal.CURSOR_ON
class TestTypeOut:
"""Tests for type_out function."""
@patch("sys.stdout", new_callable=io.StringIO)
@patch("time.sleep")
def test_type_out_writes_text(self, mock_sleep, mock_stdout):
"""type_out writes text to stdout."""
with patch("random.random", return_value=0.5):
terminal.type_out("Hi", color=terminal.G_HI)
output = mock_stdout.getvalue()
assert len(output) > 0
@patch("time.sleep")
def test_type_out_uses_color(self, mock_sleep):
"""type_out applies color codes."""
with (
patch("sys.stdout", new_callable=io.StringIO),
patch("random.random", return_value=0.5),
):
terminal.type_out("Test", color=terminal.G_HI)
class TestSlowPrint:
"""Tests for slow_print function."""
@patch("sys.stdout", new_callable=io.StringIO)
@patch("time.sleep")
def test_slow_print_writes_text(self, mock_sleep, mock_stdout):
"""slow_print writes text to stdout."""
terminal.slow_print("Hi", color=terminal.G_DIM, delay=0)
output = mock_stdout.getvalue()
assert len(output) > 0
class TestBootLn:
"""Tests for boot_ln function."""
@patch("sys.stdout", new_callable=io.StringIO)
@patch("time.sleep")
def test_boot_ln_writes_label_and_status(self, mock_sleep, mock_stdout):
"""boot_ln shows label and status."""
with patch("random.uniform", return_value=0):
terminal.boot_ln("Loading", "OK", ok=True)
output = mock_stdout.getvalue()
assert "Loading" in output
assert "OK" in output
@patch("sys.stdout", new_callable=io.StringIO)
@patch("time.sleep")
def test_boot_ln_error_status(self, mock_sleep, mock_stdout):
"""boot_ln shows red for error status."""
with patch("random.uniform", return_value=0):
terminal.boot_ln("Loading", "FAIL", ok=False)
output = mock_stdout.getvalue()
assert "FAIL" in output