5 Commits

Author SHA1 Message Date
ab6bbac02b fix: update ntfy tests for SSE API (reconnect_delay) 2026-03-15 15:12:09 -07:00
14cb0bd7d4 fix: use native hk staging in pre-commit hook
fix: add explicit check command to pre-push hook
2026-03-15 15:12:08 -07:00
c2b609e769 fix: apply ruff auto-fixes and add hk git hooks
- Fix pre-existing lint errors in engine/ modules using ruff --unsafe-fixes
- Add hk.pkl with pre-commit and pre-push hooks using ruff builtin
- Configure hooks to use 'uv run' prefix for tool execution
- Update mise.toml to include hk and pkl tools
- All 73 tests pass

fix: apply ruff auto-fixes and add hk git hooks

- Fix pre-existing lint errors in engine/ modules using ruff --unsafe-fixes
- Add hk.pkl with pre-commit and pre-push hooks using ruff builtin
- Configure hooks to use 'uv run' prefix for tool execution
- Update mise.toml to include hk and pkl tools
- Use 'hk install --mise' for proper mise integration
- All 73 tests pass
2026-03-15 15:12:06 -07:00
94ffce1cbd style: apply ruff auto-fixes across codebase
- Fix import sorting (isort) across all engine modules
- Fix SIM105 try-except-pass patterns (contextlib.suppress)
- Fix nested with statements in tests
- Fix unused loop variables

Run 'uv run pytest' to verify tests still pass.
2026-03-15 14:59:29 -07:00
e0d01bd617 feat: modernize project with uv, add pytest test suite
- Add pyproject.toml with modern Python packaging (PEP 517/518)
- Add uv-based dependency management replacing inline venv bootstrap
- Add requirements.txt and requirements-dev.txt for compatibility
- Add mise.toml with dev tasks (test, lint, run, sync, ci)
- Add .python-version pinned to Python 3.12
- Add comprehensive pytest test suite (73 tests) for:
  - engine/config, filter, terminal, sources, mic, ntfy modules
- Configure pytest with coverage reporting (16% total, 100% on tested modules)
- Configure ruff for linting with Python 3.10+ target
- Remove redundant venv bootstrap code from mainline.py
- Update .gitignore for uv/venv artifacts

Run 'uv sync' to install dependencies, 'uv run pytest' to test.
2026-03-15 14:59:29 -07:00
29 changed files with 181 additions and 2257 deletions

110
AGENTS.md
View File

@@ -1,110 +0,0 @@
# Agent Development Guide
## Development Environment
This project uses:
- **mise** (mise.jdx.dev) - tool version manager and task runner
- **hk** (hk.jdx.dev) - git hook manager
- **uv** - fast Python package installer
- **ruff** - linter and formatter
- **pytest** - test runner
### Setup
```bash
# Install dependencies
mise run install
# Or equivalently:
uv sync
```
### Available Commands
```bash
mise run test # Run tests
mise run test-v # Run tests verbose
mise run test-cov # Run tests with coverage report
mise run lint # Run ruff linter
mise run lint-fix # Run ruff with auto-fix
mise run format # Run ruff formatter
mise run ci # Full CI pipeline (sync + test + coverage)
```
## Git Hooks
**At the start of every agent session**, verify hooks are installed:
```bash
ls -la .git/hooks/pre-commit
```
If hooks are not installed, install them with:
```bash
hk init --mise
mise run pre-commit
```
The project uses hk configured in `hk.pkl`:
- **pre-commit**: runs ruff-format and ruff (with auto-fix)
- **pre-push**: runs ruff check
## Workflow Rules
### Before Committing
1. **Always run the test suite** - never commit code that fails tests:
```bash
mise run test
```
2. **Always run the linter**:
```bash
mise run lint
```
3. **Fix any lint errors** before committing (or let the pre-commit hook handle it).
4. **Review your changes** using `git diff` to understand what will be committed.
### On Failing Tests
When tests fail, **determine whether it's an out-of-date test or a correctly failing test**:
- **Out-of-date test**: The test was written for old behavior that has legitimately changed. Update the test to match the new expected behavior.
- **Correctly failing test**: The test correctly identifies a broken contract. Fix the implementation, not the test.
**Never** modify a test to make it pass without understanding why it failed.
### Code Review
Before committing significant changes:
- Run `git diff` to review all changes
- Ensure new code follows existing patterns in the codebase
- Check that type hints are added for new functions
- Verify that tests exist for new functionality
## Testing
Tests live in `tests/` and follow the pattern `test_*.py`.
Run all tests:
```bash
mise run test
```
Run with coverage:
```bash
mise run test-cov
```
The project uses pytest with strict marker enforcement. Test configuration is in `pyproject.toml` under `[tool.pytest.ini_options]`.
## Architecture Notes
- **ntfy.py** and **mic.py** are standalone modules with zero internal dependencies
- **eventbus.py** provides thread-safe event publishing for decoupled communication
- **controller.py** coordinates ntfy/mic monitoring
- The render pipeline: fetch → render → effects → scroll → terminal output

View File

@@ -79,27 +79,18 @@ To add your own fonts, drop `.otf`, `.ttf`, or `.ttc` files into `fonts/` (or po
```
engine/
__init__.py package marker
app.py main(), font picker TUI, boot sequence, signal handler
config.py constants, CLI flags, glyph tables
sources.py FEEDS, POETRY_SOURCES, language/script maps
terminal.py ANSI codes, tw/th, type_out, boot_ln
filter.py HTML stripping, content filter
translate.py Google Translate wrapper + region detection
render.py OTF → half-block pipeline (SSAA, gradient)
effects.py noise, glitch_bar, fade, firehose
fetch.py RSS/Gutenberg fetching + cache load/save
ntfy.py NtfyPoller — standalone, zero internal deps
mic.py MicMonitor — standalone, graceful fallback
scroll.py stream() frame loop + message rendering
viewport.py terminal dimension tracking (tw/th)
frame.py scroll step calculation, timing
layers.py ticker zone, firehose, message overlay rendering
eventbus.py thread-safe event publishing for decoupled communication
events.py event types and definitions
controller.py coordinates ntfy/mic monitoring and event publishing
emitters.py background emitters for ntfy and mic
types.py type definitions and dataclasses
config.py constants, CLI flags, glyph tables
sources.py FEEDS, POETRY_SOURCES, language/script maps
terminal.py ANSI codes, tw/th, type_out, boot_ln
filter.py HTML stripping, content filter
translate.py Google Translate wrapper + region detection
render.py OTF → half-block pipeline (SSAA, gradient)
effects.py noise, glitch_bar, fade, firehose
fetch.py RSS/Gutenberg fetching + cache load/save
ntfy.py NtfyPoller — standalone, zero internal deps
mic.py MicMonitor — standalone, graceful fallback
scroll.py stream() frame loop + message rendering
app.py main(), font picker TUI, boot sequence, signal handler
```
`ntfy.py` and `mic.py` have zero internal dependencies and can be imported by any other visualizer.

View File

@@ -1,28 +1,25 @@
"""
Configuration constants, CLI flags, and glyph tables.
Supports both global constants (backward compatible) and injected config for testing.
"""
import sys
from dataclasses import dataclass, field
from pathlib import Path
_REPO_ROOT = Path(__file__).resolve().parent.parent
_FONT_EXTENSIONS = {".otf", ".ttf", ".ttc"}
def _arg_value(flag, argv: list[str] | None = None):
def _arg_value(flag):
"""Get value following a CLI flag, if present."""
argv = argv or sys.argv
if flag not in argv:
if flag not in sys.argv:
return None
i = argv.index(flag)
return argv[i + 1] if i + 1 < len(argv) else None
i = sys.argv.index(flag)
return sys.argv[i + 1] if i + 1 < len(sys.argv) else None
def _arg_int(flag, default, argv: list[str] | None = None):
def _arg_int(flag, default):
"""Get int CLI argument with safe fallback."""
raw = _arg_value(flag, argv)
raw = _arg_value(flag)
if raw is None:
return default
try:
@@ -56,134 +53,6 @@ def list_repo_font_files():
return _list_font_files(FONT_DIR)
def _get_platform_font_paths() -> dict[str, str]:
"""Get platform-appropriate font paths for non-Latin scripts."""
import platform
system = platform.system()
if system == "Darwin":
return {
"zh-cn": "/System/Library/Fonts/STHeiti Medium.ttc",
"ja": "/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc",
"ko": "/System/Library/Fonts/AppleSDGothicNeo.ttc",
"ru": "/System/Library/Fonts/Supplemental/Arial.ttf",
"uk": "/System/Library/Fonts/Supplemental/Arial.ttf",
"el": "/System/Library/Fonts/Supplemental/Arial.ttf",
"he": "/System/Library/Fonts/Supplemental/Arial.ttf",
"ar": "/System/Library/Fonts/GeezaPro.ttc",
"fa": "/System/Library/Fonts/GeezaPro.ttc",
"hi": "/System/Library/Fonts/Kohinoor.ttc",
"th": "/System/Library/Fonts/ThonburiUI.ttc",
}
elif system == "Linux":
return {
"zh-cn": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
"ja": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
"ko": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
"ru": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"uk": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"el": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"he": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"ar": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"fa": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
"hi": "/usr/share/fonts/truetype/noto/NotoSansDevanagari-Regular.ttf",
"th": "/usr/share/fonts/truetype/noto/NotoSansThai-Regular.ttf",
}
else:
return {}
@dataclass(frozen=True)
class Config:
"""Immutable configuration container for injected config."""
headline_limit: int = 1000
feed_timeout: int = 10
mic_threshold_db: int = 50
mode: str = "news"
firehose: bool = False
ntfy_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline/json"
ntfy_reconnect_delay: int = 5
message_display_secs: int = 30
font_dir: str = "fonts"
font_path: str = ""
font_index: int = 0
font_picker: bool = True
font_sz: int = 60
render_h: int = 8
ssaa: int = 4
scroll_dur: float = 5.625
frame_dt: float = 0.05
firehose_h: int = 12
grad_speed: float = 0.08
glitch_glyphs: str = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋"
kata_glyphs: str = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ"
script_fonts: dict[str, str] = field(default_factory=_get_platform_font_paths)
@classmethod
def from_args(cls, argv: list[str] | None = None) -> "Config":
"""Create Config from CLI arguments (or custom argv for testing)."""
argv = argv or sys.argv
font_dir = _resolve_font_path(_arg_value("--font-dir", argv) or "fonts")
font_file_arg = _arg_value("--font-file", argv)
font_files = _list_font_files(font_dir)
font_path = (
_resolve_font_path(font_file_arg)
if font_file_arg
else (font_files[0] if font_files else "")
)
return cls(
headline_limit=1000,
feed_timeout=10,
mic_threshold_db=50,
mode="poetry" if "--poetry" in argv or "-p" in argv else "news",
firehose="--firehose" in argv,
ntfy_topic="https://ntfy.sh/klubhaus_terminal_mainline/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir=font_dir,
font_path=font_path,
font_index=max(0, _arg_int("--font-index", 0, argv)),
font_picker="--no-font-picker" not in argv,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐╌╍╎╏┃┆┇┊┋",
kata_glyphs="ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ",
script_fonts=_get_platform_font_paths(),
)
_config: Config | None = None
def get_config() -> Config:
"""Get the global config instance (lazy-loaded)."""
global _config
if _config is None:
_config = Config.from_args()
return _config
def set_config(config: Config) -> None:
"""Set the global config instance (for testing)."""
global _config
_config = config
# ─── RUNTIME ──────────────────────────────────────────────
HEADLINE_LIMIT = 1000
FEED_TIMEOUT = 10

View File

@@ -1,68 +0,0 @@
"""
Stream controller - manages input sources and orchestrates the render stream.
"""
from engine.config import Config, get_config
from engine.eventbus import EventBus
from engine.events import EventType, StreamEvent
from engine.mic import MicMonitor
from engine.ntfy import NtfyPoller
from engine.scroll import stream
class StreamController:
"""Controls the stream lifecycle - initializes sources and runs the stream."""
def __init__(self, config: Config | None = None, event_bus: EventBus | None = None):
self.config = config or get_config()
self.event_bus = event_bus
self.mic: MicMonitor | None = None
self.ntfy: NtfyPoller | None = None
def initialize_sources(self) -> tuple[bool, bool]:
"""Initialize microphone and ntfy sources.
Returns:
(mic_ok, ntfy_ok) - success status for each source
"""
self.mic = MicMonitor(threshold_db=self.config.mic_threshold_db)
mic_ok = self.mic.start() if self.mic.available else False
self.ntfy = NtfyPoller(
self.config.ntfy_topic,
reconnect_delay=self.config.ntfy_reconnect_delay,
display_secs=self.config.message_display_secs,
)
ntfy_ok = self.ntfy.start()
return bool(mic_ok), ntfy_ok
def run(self, items: list) -> None:
"""Run the stream with initialized sources."""
if self.mic is None or self.ntfy is None:
self.initialize_sources()
if self.event_bus:
self.event_bus.publish(
EventType.STREAM_START,
StreamEvent(
event_type=EventType.STREAM_START,
headline_count=len(items),
),
)
stream(items, self.ntfy, self.mic)
if self.event_bus:
self.event_bus.publish(
EventType.STREAM_END,
StreamEvent(
event_type=EventType.STREAM_END,
headline_count=len(items),
),
)
def cleanup(self) -> None:
"""Clean up resources."""
if self.mic:
self.mic.stop()

View File

@@ -1,25 +0,0 @@
"""
Event emitter protocols - abstract interfaces for event-producing components.
"""
from collections.abc import Callable
from typing import Any, Protocol
class EventEmitter(Protocol):
"""Protocol for components that emit events."""
def subscribe(self, callback: Callable[[Any], None]) -> None: ...
def unsubscribe(self, callback: Callable[[Any], None]) -> None: ...
class Startable(Protocol):
"""Protocol for components that can be started."""
def start(self) -> Any: ...
class Stoppable(Protocol):
"""Protocol for components that can be stopped."""
def stop(self) -> None: ...

View File

@@ -1,72 +0,0 @@
"""
Event bus - pub/sub messaging for decoupled component communication.
"""
import threading
from collections import defaultdict
from collections.abc import Callable
from typing import Any
from engine.events import EventType
class EventBus:
"""Thread-safe event bus for publish-subscribe messaging."""
def __init__(self):
self._subscribers: dict[EventType, list[Callable[[Any], None]]] = defaultdict(
list
)
self._lock = threading.Lock()
def subscribe(self, event_type: EventType, callback: Callable[[Any], None]) -> None:
"""Register a callback for a specific event type."""
with self._lock:
self._subscribers[event_type].append(callback)
def unsubscribe(
self, event_type: EventType, callback: Callable[[Any], None]
) -> None:
"""Remove a callback for a specific event type."""
with self._lock:
if callback in self._subscribers[event_type]:
self._subscribers[event_type].remove(callback)
def publish(self, event_type: EventType, event: Any = None) -> None:
"""Publish an event to all subscribers."""
with self._lock:
callbacks = list(self._subscribers.get(event_type, []))
for callback in callbacks:
try:
callback(event)
except Exception:
pass
def clear(self) -> None:
"""Remove all subscribers."""
with self._lock:
self._subscribers.clear()
def subscriber_count(self, event_type: EventType | None = None) -> int:
"""Get subscriber count for an event type, or total if None."""
with self._lock:
if event_type is None:
return sum(len(cb) for cb in self._subscribers.values())
return len(self._subscribers.get(event_type, []))
_event_bus: EventBus | None = None
def get_event_bus() -> EventBus:
"""Get the global event bus instance."""
global _event_bus
if _event_bus is None:
_event_bus = EventBus()
return _event_bus
def set_event_bus(bus: EventBus) -> None:
"""Set the global event bus instance (for testing)."""
global _event_bus
_event_bus = bus

View File

@@ -1,67 +0,0 @@
"""
Event types for the mainline application.
Defines the core events that flow through the system.
These types support a future migration to an event-driven architecture.
"""
from dataclasses import dataclass
from datetime import datetime
from enum import Enum, auto
class EventType(Enum):
"""Core event types in the mainline application."""
NEW_HEADLINE = auto()
FRAME_TICK = auto()
MIC_LEVEL = auto()
NTFY_MESSAGE = auto()
STREAM_START = auto()
STREAM_END = auto()
@dataclass
class HeadlineEvent:
"""Event emitted when a new headline is ready for display."""
title: str
source: str
timestamp: str
language: str | None = None
@dataclass
class FrameTickEvent:
"""Event emitted on each render frame."""
frame_number: int
timestamp: datetime
delta_seconds: float
@dataclass
class MicLevelEvent:
"""Event emitted when microphone level changes significantly."""
db_level: float
excess_above_threshold: float
timestamp: datetime
@dataclass
class NtfyMessageEvent:
"""Event emitted when an ntfy message is received."""
title: str
body: str
message_id: str | None = None
timestamp: datetime | None = None
@dataclass
class StreamEvent:
"""Event emitted when stream starts or ends."""
event_type: EventType
headline_count: int = 0
timestamp: datetime | None = None

View File

@@ -8,7 +8,6 @@ import pathlib
import re
import urllib.request
from datetime import datetime
from typing import Any
import feedparser
@@ -17,13 +16,9 @@ from engine.filter import skip, strip_tags
from engine.sources import FEEDS, POETRY_SOURCES
from engine.terminal import boot_ln
# Type alias for headline items
HeadlineTuple = tuple[str, str, str]
# ─── SINGLE FEED ──────────────────────────────────────────
def fetch_feed(url: str) -> Any | None:
"""Fetch and parse a single RSS feed URL."""
def fetch_feed(url):
try:
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
resp = urllib.request.urlopen(req, timeout=config.FEED_TIMEOUT)
@@ -33,9 +28,8 @@ def fetch_feed(url: str) -> Any | None:
# ─── ALL RSS FEEDS ────────────────────────────────────────
def fetch_all() -> tuple[list[HeadlineTuple], int, int]:
"""Fetch all RSS feeds and return items, linked count, failed count."""
items: list[HeadlineTuple] = []
def fetch_all():
items = []
linked = failed = 0
for src, url in FEEDS.items():
feed = fetch_feed(url)
@@ -65,7 +59,7 @@ def fetch_all() -> tuple[list[HeadlineTuple], int, int]:
# ─── PROJECT GUTENBERG ────────────────────────────────────
def _fetch_gutenberg(url: str, label: str) -> list[HeadlineTuple]:
def _fetch_gutenberg(url, label):
"""Download and parse stanzas/passages from a Project Gutenberg text."""
try:
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})

View File

@@ -1,57 +0,0 @@
"""
Frame timing utilities — FPS control and precise timing.
"""
import time
class FrameTimer:
"""Frame timer for consistent render loop timing."""
def __init__(self, target_frame_dt: float = 0.05):
self.target_frame_dt = target_frame_dt
self._frame_count = 0
self._start_time = time.monotonic()
self._last_frame_time = self._start_time
@property
def fps(self) -> float:
"""Current FPS based on elapsed frames."""
elapsed = time.monotonic() - self._start_time
if elapsed > 0:
return self._frame_count / elapsed
return 0.0
def sleep_until_next_frame(self) -> float:
"""Sleep to maintain target frame rate. Returns actual elapsed time."""
now = time.monotonic()
elapsed = now - self._last_frame_time
self._last_frame_time = now
self._frame_count += 1
sleep_time = max(0, self.target_frame_dt - elapsed)
if sleep_time > 0:
time.sleep(sleep_time)
return elapsed
def reset(self) -> None:
"""Reset frame counter and start time."""
self._frame_count = 0
self._start_time = time.monotonic()
self._last_frame_time = self._start_time
def calculate_scroll_step(
scroll_dur: float, view_height: int, padding: int = 15
) -> float:
"""Calculate scroll step interval for smooth scrolling.
Args:
scroll_dur: Duration in seconds for one headline to scroll through view
view_height: Terminal height in rows
padding: Extra rows for off-screen content
Returns:
Time in seconds between scroll steps
"""
return scroll_dur / (view_height + padding) * 2

View File

@@ -1,201 +0,0 @@
"""
Layer compositing — message overlay, ticker zone, firehose, noise.
Depends on: config, render, effects.
"""
import random
import re
import time
from datetime import datetime
from engine import config
from engine.effects import (
fade_line,
firehose_line,
glitch_bar,
noise,
vis_trunc,
)
from engine.render import big_wrap, lr_gradient, lr_gradient_opposite
from engine.terminal import RST, W_COOL
MSG_META = "\033[38;5;245m"
MSG_BORDER = "\033[2;38;5;37m"
def render_message_overlay(
msg: tuple[str, str, float] | None,
w: int,
h: int,
msg_cache: tuple,
) -> tuple[list[str], tuple]:
"""Render ntfy message overlay.
Args:
msg: (title, body, timestamp) or None
w: terminal width
h: terminal height
msg_cache: (cache_key, rendered_rows) for caching
Returns:
(list of ANSI strings, updated cache)
"""
overlay = []
if msg is None:
return overlay, msg_cache
m_title, m_body, m_ts = msg
display_text = m_body or m_title or "(empty)"
display_text = re.sub(r"\s+", " ", display_text.upper())
cache_key = (display_text, w)
if msg_cache[0] != cache_key:
msg_rows = big_wrap(display_text, w - 4)
msg_cache = (cache_key, msg_rows)
else:
msg_rows = msg_cache[1]
msg_rows = lr_gradient_opposite(
msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0
)
elapsed_s = int(time.monotonic() - m_ts)
remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s)
ts_str = datetime.now().strftime("%H:%M:%S")
panel_h = len(msg_rows) + 2
panel_top = max(0, (h - panel_h) // 2)
row_idx = 0
for mr in msg_rows:
ln = vis_trunc(mr, w)
overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K")
row_idx += 1
meta_parts = []
if m_title and m_title != m_body:
meta_parts.append(m_title)
meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
meta = (
" " + " \u00b7 ".join(meta_parts)
if len(meta_parts) > 1
else " " + meta_parts[0]
)
overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}\033[0m\033[K")
row_idx += 1
bar = "\u2500" * (w - 4)
overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}\033[0m\033[K")
return overlay, msg_cache
def render_ticker_zone(
active: list,
scroll_cam: int,
ticker_h: int,
w: int,
noise_cache: dict,
grad_offset: float,
) -> tuple[list[str], dict]:
"""Render the ticker scroll zone.
Args:
active: list of (content_rows, color, canvas_y, meta_idx)
scroll_cam: camera position (viewport top)
ticker_h: height of ticker zone
w: terminal width
noise_cache: dict of cy -> noise string
grad_offset: gradient animation offset
Returns:
(list of ANSI strings, updated noise_cache)
"""
buf = []
top_zone = max(1, int(ticker_h * 0.25))
bot_zone = max(1, int(ticker_h * 0.10))
def noise_at(cy):
if cy not in noise_cache:
noise_cache[cy] = noise(w) if random.random() < 0.15 else None
return noise_cache[cy]
for r in range(ticker_h):
scr_row = r + 1
cy = scroll_cam + r
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0
row_fade = min(top_f, bot_f)
drawn = False
for content, hc, by, midx in active:
cr = cy - by
if 0 <= cr < len(content):
raw = content[cr]
if cr != midx:
colored = lr_gradient([raw], grad_offset)[0]
else:
colored = raw
ln = vis_trunc(colored, w)
if row_fade < 1.0:
ln = fade_line(ln, row_fade)
if cr == midx:
buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K")
elif ln.strip():
buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K")
else:
buf.append(f"\033[{scr_row};1H\033[K")
drawn = True
break
if not drawn:
n = noise_at(cy)
if row_fade < 1.0 and n:
n = fade_line(n, row_fade)
if n:
buf.append(f"\033[{scr_row};1H{n}")
else:
buf.append(f"\033[{scr_row};1H\033[K")
return buf, noise_cache
def apply_glitch(
buf: list[str],
ticker_buf_start: int,
mic_excess: float,
w: int,
) -> list[str]:
"""Apply glitch effect to ticker buffer.
Args:
buf: current buffer
ticker_buf_start: index where ticker starts in buffer
mic_excess: mic level above threshold
w: terminal width
Returns:
Updated buffer with glitches applied
"""
glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
n_hits = 4 + int(mic_excess / 2)
ticker_buf_len = len(buf) - ticker_buf_start
if random.random() < glitch_prob and ticker_buf_len > 0:
for _ in range(min(n_hits, ticker_buf_len)):
gi = random.randint(0, ticker_buf_len - 1)
scr_row = gi + 1
buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}"
return buf
def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]:
"""Render firehose strip at bottom of screen."""
buf = []
if fh > 0:
for fr in range(fh):
scr_row = h - fh + fr + 1
fline = firehose_line(items, w)
buf.append(f"\033[{scr_row};1H{fline}\033[K")
return buf

View File

@@ -4,8 +4,6 @@ Gracefully degrades if sounddevice/numpy are unavailable.
"""
import atexit
from collections.abc import Callable
from datetime import datetime
try:
import numpy as _np
@@ -16,9 +14,6 @@ except Exception:
_HAS_MIC = False
from engine.events import MicLevelEvent
class MicMonitor:
"""Background mic stream that exposes current RMS dB level."""
@@ -26,7 +21,6 @@ class MicMonitor:
self.threshold_db = threshold_db
self._db = -99.0
self._stream = None
self._subscribers: list[Callable[[MicLevelEvent], None]] = []
@property
def available(self):
@@ -43,23 +37,6 @@ class MicMonitor:
"""dB above threshold (clamped to 0)."""
return max(0.0, self._db - self.threshold_db)
def subscribe(self, callback: Callable[[MicLevelEvent], None]) -> None:
"""Register a callback to be called when mic level changes."""
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable[[MicLevelEvent], None]) -> None:
"""Remove a registered callback."""
if callback in self._subscribers:
self._subscribers.remove(callback)
def _emit(self, event: MicLevelEvent) -> None:
"""Emit an event to all subscribers."""
for cb in self._subscribers:
try:
cb(event)
except Exception:
pass
def start(self):
"""Start background mic stream. Returns True on success, False/None otherwise."""
if not _HAS_MIC:
@@ -68,13 +45,6 @@ class MicMonitor:
def _cb(indata, frames, t, status):
rms = float(_np.sqrt(_np.mean(indata**2)))
self._db = 20 * _np.log10(rms) if rms > 0 else -99.0
if self._subscribers:
event = MicLevelEvent(
db_level=self._db,
excess_above_threshold=max(0.0, self._db - self.threshold_db),
timestamp=datetime.now(),
)
self._emit(event)
try:
self._stream = _sd.InputStream(

View File

@@ -16,12 +16,8 @@ import json
import threading
import time
import urllib.request
from collections.abc import Callable
from datetime import datetime
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
from engine.events import NtfyMessageEvent
class NtfyPoller:
"""SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT)."""
@@ -32,24 +28,6 @@ class NtfyPoller:
self.display_secs = display_secs
self._message = None # (title, body, monotonic_timestamp) or None
self._lock = threading.Lock()
self._subscribers: list[Callable[[NtfyMessageEvent], None]] = []
def subscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
"""Register a callback to be called when a message is received."""
self._subscribers.append(callback)
def unsubscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
"""Remove a registered callback."""
if callback in self._subscribers:
self._subscribers.remove(callback)
def _emit(self, event: NtfyMessageEvent) -> None:
"""Emit an event to all subscribers."""
for cb in self._subscribers:
try:
cb(event)
except Exception:
pass
def start(self):
"""Start background stream thread. Returns True."""
@@ -110,13 +88,6 @@ class NtfyPoller:
data.get("message", ""),
time.monotonic(),
)
event = NtfyMessageEvent(
title=data.get("title", ""),
body=data.get("message", ""),
message_id=data.get("id"),
timestamp=datetime.now(),
)
self._emit(event)
except Exception:
pass
time.sleep(self.reconnect_delay)

View File

@@ -1,22 +1,25 @@
"""
Render engine — ticker content, scroll motion, message panel, and firehose overlay.
Orchestrates viewport, frame timing, and layers.
Depends on: config, terminal, render, effects, ntfy, mic.
"""
import random
import re
import sys
import time
from datetime import datetime
from engine import config
from engine.frame import calculate_scroll_step
from engine.layers import (
apply_glitch,
render_firehose,
render_message_overlay,
render_ticker_zone,
from engine.effects import (
fade_line,
firehose_line,
glitch_bar,
next_headline,
noise,
vis_trunc,
)
from engine.terminal import CLR
from engine.viewport import th, tw
from engine.render import big_wrap, lr_gradient, lr_gradient_opposite, make_block
from engine.terminal import CLR, RST, W_COOL, th, tw
def stream(items, ntfy_poller, mic_monitor):
@@ -32,51 +35,114 @@ def stream(items, ntfy_poller, mic_monitor):
w, h = tw(), th()
fh = config.FIREHOSE_H if config.FIREHOSE else 0
ticker_view_h = h - fh
GAP = 3
scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h)
ticker_view_h = h - fh # reserve fixed firehose strip at bottom
GAP = 3 # blank rows between headlines
scroll_step_interval = config.SCROLL_DUR / (ticker_view_h + 15) * 2
# Taxonomy:
# - message: centered ntfy overlay panel
# - ticker: large headline text content
# - scroll: upward camera motion applied to ticker content
# - firehose: fixed carriage-return style strip pinned at bottom
# Active ticker blocks: (content_rows, color, canvas_y, meta_idx)
active = []
scroll_cam = 0
ticker_next_y = ticker_view_h
scroll_cam = 0 # viewport top in virtual canvas coords
ticker_next_y = (
ticker_view_h # canvas-y where next block starts (off-screen bottom)
)
noise_cache = {}
scroll_motion_accum = 0.0
msg_cache = (None, None)
while True:
if queued >= config.HEADLINE_LIMIT and not active:
break
def _noise_at(cy):
if cy not in noise_cache:
noise_cache[cy] = noise(w) if random.random() < 0.15 else None
return noise_cache[cy]
# Message color: bright cyan/white — distinct from headline greens
MSG_META = "\033[38;5;245m" # cool grey
MSG_BORDER = "\033[2;38;5;37m" # dim teal
_msg_cache = (None, None) # (cache_key, rendered_rows)
while queued < config.HEADLINE_LIMIT or active:
t0 = time.monotonic()
w, h = tw(), th()
fh = config.FIREHOSE_H if config.FIREHOSE else 0
ticker_view_h = h - fh
scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h)
# ── Check for ntfy message ────────────────────────
msg_h = 0
msg_overlay = []
msg = ntfy_poller.get_active_message()
msg_overlay, msg_cache = render_message_overlay(msg, w, h, msg_cache)
buf = []
ticker_h = ticker_view_h
if msg is not None:
m_title, m_body, m_ts = msg
# ── Message overlay: centered in the viewport ──
display_text = m_body or m_title or "(empty)"
display_text = re.sub(r"\s+", " ", display_text.upper())
cache_key = (display_text, w)
if _msg_cache[0] != cache_key:
msg_rows = big_wrap(display_text, w - 4)
_msg_cache = (cache_key, msg_rows)
else:
msg_rows = _msg_cache[1]
msg_rows = lr_gradient_opposite(
msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0
)
# Layout: rendered text + meta + border
elapsed_s = int(time.monotonic() - m_ts)
remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s)
ts_str = datetime.now().strftime("%H:%M:%S")
panel_h = len(msg_rows) + 2 # meta + border
panel_top = max(0, (h - panel_h) // 2)
row_idx = 0
for mr in msg_rows:
ln = vis_trunc(mr, w)
msg_overlay.append(
f"\033[{panel_top + row_idx + 1};1H {ln}{RST}\033[K"
)
row_idx += 1
# Meta line: title (if distinct) + source + countdown
meta_parts = []
if m_title and m_title != m_body:
meta_parts.append(m_title)
meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
meta = (
" " + " \u00b7 ".join(meta_parts)
if len(meta_parts) > 1
else " " + meta_parts[0]
)
msg_overlay.append(
f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}{RST}\033[K"
)
row_idx += 1
# Border — constant boundary under message panel
bar = "\u2500" * (w - 4)
msg_overlay.append(
f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}{RST}\033[K"
)
# Ticker draws above the fixed firehose strip; message is a centered overlay.
ticker_h = ticker_view_h - msg_h
# ── Ticker content + scroll motion (always runs) ──
scroll_motion_accum += config.FRAME_DT
while scroll_motion_accum >= scroll_step_interval:
scroll_motion_accum -= scroll_step_interval
scroll_cam += 1
# Enqueue new headlines when room at the bottom
while (
ticker_next_y < scroll_cam + ticker_view_h + 10
and queued < config.HEADLINE_LIMIT
):
from engine.effects import next_headline
from engine.render import make_block
t, src, ts = next_headline(pool, items, seen)
ticker_content, hc, midx = make_block(t, src, ts, w)
active.append((ticker_content, hc, ticker_next_y, midx))
ticker_next_y += len(ticker_content) + GAP
queued += 1
# Prune off-screen blocks and stale noise
active = [
(c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > scroll_cam
]
@@ -84,26 +150,69 @@ def stream(items, ntfy_poller, mic_monitor):
if k < scroll_cam:
del noise_cache[k]
# Draw ticker zone (above fixed firehose strip)
top_zone = max(1, int(ticker_h * 0.25))
bot_zone = max(1, int(ticker_h * 0.10))
grad_offset = (time.monotonic() * config.GRAD_SPEED) % 1.0
ticker_buf_start = len(buf)
ticker_buf, noise_cache = render_ticker_zone(
active, scroll_cam, ticker_h, w, noise_cache, grad_offset
)
buf.extend(ticker_buf)
ticker_buf_start = len(buf) # track where ticker rows start in buf
for r in range(ticker_h):
scr_row = r + 1 # 1-indexed ANSI screen row
cy = scroll_cam + r
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0
row_fade = min(top_f, bot_f)
drawn = False
for content, hc, by, midx in active:
cr = cy - by
if 0 <= cr < len(content):
raw = content[cr]
if cr != midx:
colored = lr_gradient([raw], grad_offset)[0]
else:
colored = raw
ln = vis_trunc(colored, w)
if row_fade < 1.0:
ln = fade_line(ln, row_fade)
if cr == midx:
buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K")
elif ln.strip():
buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K")
else:
buf.append(f"\033[{scr_row};1H\033[K")
drawn = True
break
if not drawn:
n = _noise_at(cy)
if row_fade < 1.0 and n:
n = fade_line(n, row_fade)
if n:
buf.append(f"\033[{scr_row};1H{n}")
else:
buf.append(f"\033[{scr_row};1H\033[K")
# Glitch — base rate + mic-reactive spikes (ticker zone only)
mic_excess = mic_monitor.excess
buf = apply_glitch(buf, ticker_buf_start, mic_excess, w)
firehose_buf = render_firehose(items, w, fh, h)
buf.extend(firehose_buf)
glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
n_hits = 4 + int(mic_excess / 2)
ticker_buf_len = len(buf) - ticker_buf_start
if random.random() < glitch_prob and ticker_buf_len > 0:
for _ in range(min(n_hits, ticker_buf_len)):
gi = random.randint(0, ticker_buf_len - 1)
scr_row = gi + 1
buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}"
if config.FIREHOSE and fh > 0:
for fr in range(fh):
scr_row = h - fh + fr + 1
fline = firehose_line(items, w)
buf.append(f"\033[{scr_row};1H{fline}\033[K")
if msg_overlay:
buf.extend(msg_overlay)
sys.stdout.buffer.write("".join(buf).encode())
sys.stdout.flush()
# Precise frame timing
elapsed = time.monotonic() - t0
time.sleep(max(0, config.FRAME_DT - elapsed))

View File

@@ -7,16 +7,26 @@ import json
import re
import urllib.parse
import urllib.request
from functools import lru_cache
from engine.sources import LOCATION_LANGS
TRANSLATE_CACHE_SIZE = 500
_TRANSLATE_CACHE = {}
@lru_cache(maxsize=TRANSLATE_CACHE_SIZE)
def _translate_cached(title: str, target_lang: str) -> str:
"""Cached translation implementation."""
def detect_location_language(title):
"""Detect if headline mentions a location, return target language."""
title_lower = title.lower()
for pattern, lang in LOCATION_LANGS.items():
if re.search(pattern, title_lower):
return lang
return None
def translate_headline(title, target_lang):
"""Translate headline via Google Translate API (zero dependencies)."""
key = (title, target_lang)
if key in _TRANSLATE_CACHE:
return _TRANSLATE_CACHE[key]
try:
q = urllib.parse.quote(title)
url = (
@@ -29,18 +39,5 @@ def _translate_cached(title: str, target_lang: str) -> str:
result = "".join(p[0] for p in data[0] if p[0]) or title
except Exception:
result = title
_TRANSLATE_CACHE[key] = result
return result
def detect_location_language(title):
"""Detect if headline mentions a location, return target language."""
title_lower = title.lower()
for pattern, lang in LOCATION_LANGS.items():
if re.search(pattern, title_lower):
return lang
return None
def translate_headline(title: str, target_lang: str) -> str:
"""Translate headline via Google Translate API (zero dependencies)."""
return _translate_cached(title, target_lang)

View File

@@ -1,60 +0,0 @@
"""
Shared dataclasses for the mainline application.
Provides named types for tuple returns across modules.
"""
from dataclasses import dataclass
@dataclass
class HeadlineItem:
"""A single headline item: title, source, and timestamp."""
title: str
source: str
timestamp: str
def to_tuple(self) -> tuple[str, str, str]:
"""Convert to tuple for backward compatibility."""
return (self.title, self.source, self.timestamp)
@classmethod
def from_tuple(cls, t: tuple[str, str, str]) -> "HeadlineItem":
"""Create from tuple for backward compatibility."""
return cls(title=t[0], source=t[1], timestamp=t[2])
def items_to_tuples(items: list[HeadlineItem]) -> list[tuple[str, str, str]]:
"""Convert list of HeadlineItem to list of tuples."""
return [item.to_tuple() for item in items]
def tuples_to_items(tuples: list[tuple[str, str, str]]) -> list[HeadlineItem]:
"""Convert list of tuples to list of HeadlineItem."""
return [HeadlineItem.from_tuple(t) for t in tuples]
@dataclass
class FetchResult:
"""Result from fetch_all() or fetch_poetry()."""
items: list[HeadlineItem]
linked: int
failed: int
def to_legacy_tuple(self) -> tuple[list[tuple], int, int]:
"""Convert to legacy tuple format for backward compatibility."""
return ([item.to_tuple() for item in self.items], self.linked, self.failed)
@dataclass
class Block:
"""Rendered headline block from make_block()."""
content: list[str]
color: str
meta_row_index: int
def to_legacy_tuple(self) -> tuple[list[str], str, int]:
"""Convert to legacy tuple format for backward compatibility."""
return (self.content, self.color, self.meta_row_index)

View File

@@ -1,37 +0,0 @@
"""
Viewport utilities — terminal dimensions and ANSI positioning helpers.
No internal dependencies.
"""
import os
def tw() -> int:
"""Get terminal width (columns)."""
try:
return os.get_terminal_size().columns
except Exception:
return 80
def th() -> int:
"""Get terminal height (lines)."""
try:
return os.get_terminal_size().lines
except Exception:
return 24
def move_to(row: int, col: int = 1) -> str:
"""Generate ANSI escape to move cursor to row, col (1-indexed)."""
return f"\033[{row};{col}H"
def clear_screen() -> str:
"""Clear screen and move cursor to home."""
return "\033[2J\033[H"
def clear_line() -> str:
"""Clear current line."""
return "\033[K"

View File

@@ -22,7 +22,6 @@ classifiers = [
dependencies = [
"feedparser>=6.0.0",
"Pillow>=10.0.0",
"pyright>=1.1.408",
]
[project.optional-dependencies]

View File

@@ -1,236 +0,0 @@
"""
Pytest fixtures for mocking external dependencies (network, filesystem).
"""
import json
from unittest.mock import MagicMock
import pytest
@pytest.fixture
def mock_feed_response():
"""Mock RSS feed response data."""
return b"""<?xml version="1.0" encoding="UTF-8" ?>
<rss version="2.0">
<channel>
<title>Test Feed</title>
<link>https://example.com</link>
<item>
<title>Test Headline One</title>
<pubDate>Sat, 15 Mar 2025 12:00:00 GMT</pubDate>
</item>
<item>
<title>Test Headline Two</title>
<pubDate>Sat, 15 Mar 2025 11:00:00 GMT</pubDate>
</item>
<item>
<title>Sports: Team Wins Championship</title>
<pubDate>Sat, 15 Mar 2025 10:00:00 GMT</pubDate>
</item>
</channel>
</rss>"""
@pytest.fixture
def mock_gutenberg_response():
"""Mock Project Gutenberg text response."""
return """Project Gutenberg's Collection, by Various
*** START OF SOME TEXT ***
This is a test poem with multiple lines
that should be parsed as stanzas.
Another stanza here with different content
and more lines to test the parsing logic.
Yet another stanza for variety
in the test data.
*** END OF SOME TEXT ***"""
@pytest.fixture
def mock_gutenberg_empty():
"""Mock Gutenberg response with no valid stanzas."""
return """Project Gutenberg's Collection
*** START OF TEXT ***
THIS IS ALL CAPS AND SHOULD BE SKIPPED
I.
*** END OF TEXT ***"""
@pytest.fixture
def mock_ntfy_message():
"""Mock ntfy.sh SSE message."""
return json.dumps(
{
"id": "test123",
"event": "message",
"title": "Test Title",
"message": "Test message body",
"time": 1234567890,
}
).encode()
@pytest.fixture
def mock_ntfy_keepalive():
"""Mock ntfy.sh keepalive message."""
return b'data: {"event":"keepalive"}\n\n'
@pytest.fixture
def mock_google_translate_response():
"""Mock Google Translate API response."""
return json.dumps(
[
[["Translated text", "Original text", None, 0.8], None, "en"],
None,
None,
[],
[],
[],
[],
]
)
@pytest.fixture
def mock_feedparser():
"""Create a mock feedparser.parse function."""
def _mock(data):
mock_result = MagicMock()
mock_result.bozo = False
mock_result.entries = [
{
"title": "Test Headline",
"published_parsed": (2025, 3, 15, 12, 0, 0, 0, 0, 0),
},
{
"title": "Another Headline",
"updated_parsed": (2025, 3, 15, 11, 0, 0, 0, 0, 0),
},
]
return mock_result
return _mock
@pytest.fixture
def mock_urllib_open(mock_feed_response):
"""Create a mock urllib.request.urlopen that returns feed data."""
def _mock(url):
mock_response = MagicMock()
mock_response.read.return_value = mock_feed_response
return mock_response
return _mock
@pytest.fixture
def sample_items():
"""Sample items as returned by fetch module (title, source, timestamp)."""
return [
("Headline One", "Test Source", "12:00"),
("Headline Two", "Another Source", "11:30"),
("Headline Three", "Third Source", "10:45"),
]
@pytest.fixture
def sample_config():
"""Sample config for testing."""
from engine.config import Config
return Config(
headline_limit=100,
feed_timeout=10,
mic_threshold_db=50,
mode="news",
firehose=False,
ntfy_topic="https://ntfy.sh/test/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir="fonts",
font_path="",
font_index=0,
font_picker=False,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐",
kata_glyphs="ハミヒーウ",
script_fonts={},
)
@pytest.fixture
def poetry_config():
"""Sample config for poetry mode."""
from engine.config import Config
return Config(
headline_limit=100,
feed_timeout=10,
mic_threshold_db=50,
mode="poetry",
firehose=False,
ntfy_topic="https://ntfy.sh/test/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir="fonts",
font_path="",
font_index=0,
font_picker=False,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐",
kata_glyphs="ハミヒーウ",
script_fonts={},
)
@pytest.fixture
def firehose_config():
"""Sample config with firehose enabled."""
from engine.config import Config
return Config(
headline_limit=100,
feed_timeout=10,
mic_threshold_db=50,
mode="news",
firehose=True,
ntfy_topic="https://ntfy.sh/test/json",
ntfy_reconnect_delay=5,
message_display_secs=30,
font_dir="fonts",
font_path="",
font_index=0,
font_picker=False,
font_sz=60,
render_h=8,
ssaa=4,
scroll_dur=5.625,
frame_dt=0.05,
firehose_h=12,
grad_speed=0.08,
glitch_glyphs="░▒▓█▌▐",
kata_glyphs="ハミヒーウ",
script_fonts={},
)

View File

@@ -7,8 +7,6 @@ import tempfile
from pathlib import Path
from unittest.mock import patch
import pytest
from engine import config
@@ -162,140 +160,3 @@ class TestSetFontSelection:
config.set_font_selection(font_path=None, font_index=None)
assert original_path == config.FONT_PATH
assert original_index == config.FONT_INDEX
class TestConfigDataclass:
"""Tests for Config dataclass."""
def test_config_has_required_fields(self):
"""Config has all required fields."""
c = config.Config()
assert hasattr(c, "headline_limit")
assert hasattr(c, "feed_timeout")
assert hasattr(c, "mic_threshold_db")
assert hasattr(c, "mode")
assert hasattr(c, "firehose")
assert hasattr(c, "ntfy_topic")
assert hasattr(c, "ntfy_reconnect_delay")
assert hasattr(c, "message_display_secs")
assert hasattr(c, "font_dir")
assert hasattr(c, "font_path")
assert hasattr(c, "font_index")
assert hasattr(c, "font_picker")
assert hasattr(c, "font_sz")
assert hasattr(c, "render_h")
assert hasattr(c, "ssaa")
assert hasattr(c, "scroll_dur")
assert hasattr(c, "frame_dt")
assert hasattr(c, "firehose_h")
assert hasattr(c, "grad_speed")
assert hasattr(c, "glitch_glyphs")
assert hasattr(c, "kata_glyphs")
assert hasattr(c, "script_fonts")
def test_config_defaults(self):
"""Config has sensible defaults."""
c = config.Config()
assert c.headline_limit == 1000
assert c.feed_timeout == 10
assert c.mic_threshold_db == 50
assert c.mode == "news"
assert c.firehose is False
assert c.ntfy_reconnect_delay == 5
assert c.message_display_secs == 30
def test_config_is_immutable(self):
"""Config is frozen (immutable)."""
c = config.Config()
with pytest.raises(AttributeError):
c.headline_limit = 500 # type: ignore
def test_config_custom_values(self):
"""Config accepts custom values."""
c = config.Config(
headline_limit=500,
mode="poetry",
firehose=True,
ntfy_topic="https://ntfy.sh/test",
)
assert c.headline_limit == 500
assert c.mode == "poetry"
assert c.firehose is True
assert c.ntfy_topic == "https://ntfy.sh/test"
class TestConfigFromArgs:
"""Tests for Config.from_args method."""
def test_from_args_defaults(self):
"""from_args creates config with defaults from empty argv."""
c = config.Config.from_args(["prog"])
assert c.mode == "news"
assert c.firehose is False
assert c.font_picker is True
def test_from_args_poetry_mode(self):
"""from_args detects --poetry flag."""
c = config.Config.from_args(["prog", "--poetry"])
assert c.mode == "poetry"
def test_from_args_poetry_short_flag(self):
"""from_args detects -p short flag."""
c = config.Config.from_args(["prog", "-p"])
assert c.mode == "poetry"
def test_from_args_firehose(self):
"""from_args detects --firehose flag."""
c = config.Config.from_args(["prog", "--firehose"])
assert c.firehose is True
def test_from_args_no_font_picker(self):
"""from_args detects --no-font-picker flag."""
c = config.Config.from_args(["prog", "--no-font-picker"])
assert c.font_picker is False
def test_from_args_font_index(self):
"""from_args parses --font-index."""
c = config.Config.from_args(["prog", "--font-index", "3"])
assert c.font_index == 3
class TestGetSetConfig:
"""Tests for get_config and set_config functions."""
def test_get_config_returns_config(self):
"""get_config returns a Config instance."""
c = config.get_config()
assert isinstance(c, config.Config)
def test_set_config_allows_injection(self):
"""set_config allows injecting a custom config."""
custom = config.Config(mode="poetry", headline_limit=100)
config.set_config(custom)
assert config.get_config().mode == "poetry"
assert config.get_config().headline_limit == 100
def test_set_config_then_get_config(self):
"""set_config followed by get_config returns the set config."""
original = config.get_config()
test_config = config.Config(headline_limit=42)
config.set_config(test_config)
result = config.get_config()
assert result.headline_limit == 42
config.set_config(original)
class TestPlatformFontPaths:
"""Tests for platform font path detection."""
def test_get_platform_font_paths_returns_dict(self):
"""_get_platform_font_paths returns a dictionary."""
fonts = config._get_platform_font_paths()
assert isinstance(fonts, dict)
def test_platform_font_paths_common_languages(self):
"""Common language font mappings exist."""
fonts = config._get_platform_font_paths()
common = {"ja", "zh-cn", "ko", "ru", "ar", "hi"}
found = set(fonts.keys()) & common
assert len(found) > 0

View File

@@ -1,85 +0,0 @@
"""
Tests for engine.controller module.
"""
from unittest.mock import MagicMock, patch
from engine import config
from engine.controller import StreamController
class TestStreamController:
"""Tests for StreamController class."""
def test_init_default_config(self):
"""StreamController initializes with default config."""
controller = StreamController()
assert controller.config is not None
assert isinstance(controller.config, config.Config)
def test_init_custom_config(self):
"""StreamController accepts custom config."""
custom_config = config.Config(headline_limit=500)
controller = StreamController(config=custom_config)
assert controller.config.headline_limit == 500
def test_init_sources_none_by_default(self):
"""Sources are None until initialized."""
controller = StreamController()
assert controller.mic is None
assert controller.ntfy is None
@patch("engine.controller.MicMonitor")
@patch("engine.controller.NtfyPoller")
def test_initialize_sources(self, mock_ntfy, mock_mic):
"""initialize_sources creates mic and ntfy instances."""
mock_mic_instance = MagicMock()
mock_mic_instance.available = True
mock_mic_instance.start.return_value = True
mock_mic.return_value = mock_mic_instance
mock_ntfy_instance = MagicMock()
mock_ntfy_instance.start.return_value = True
mock_ntfy.return_value = mock_ntfy_instance
controller = StreamController()
mic_ok, ntfy_ok = controller.initialize_sources()
assert mic_ok is True
assert ntfy_ok is True
assert controller.mic is not None
assert controller.ntfy is not None
@patch("engine.controller.MicMonitor")
@patch("engine.controller.NtfyPoller")
def test_initialize_sources_mic_unavailable(self, mock_ntfy, mock_mic):
"""initialize_sources handles unavailable mic."""
mock_mic_instance = MagicMock()
mock_mic_instance.available = False
mock_mic.return_value = mock_mic_instance
mock_ntfy_instance = MagicMock()
mock_ntfy_instance.start.return_value = True
mock_ntfy.return_value = mock_ntfy_instance
controller = StreamController()
mic_ok, ntfy_ok = controller.initialize_sources()
assert mic_ok is False
assert ntfy_ok is True
class TestStreamControllerCleanup:
"""Tests for StreamController cleanup."""
@patch("engine.controller.MicMonitor")
def test_cleanup_stops_mic(self, mock_mic):
"""cleanup stops the microphone if running."""
mock_mic_instance = MagicMock()
mock_mic.return_value = mock_mic_instance
controller = StreamController()
controller.mic = mock_mic_instance
controller.cleanup()
mock_mic_instance.stop.assert_called_once()

View File

@@ -1,69 +0,0 @@
"""
Tests for engine.emitters module.
"""
from engine.emitters import EventEmitter, Startable, Stoppable
class TestEventEmitterProtocol:
"""Tests for EventEmitter protocol."""
def test_protocol_exists(self):
"""EventEmitter protocol is defined."""
assert EventEmitter is not None
def test_protocol_has_subscribe_method(self):
"""EventEmitter has subscribe method in protocol."""
assert hasattr(EventEmitter, "subscribe")
def test_protocol_has_unsubscribe_method(self):
"""EventEmitter has unsubscribe method in protocol."""
assert hasattr(EventEmitter, "unsubscribe")
class TestStartableProtocol:
"""Tests for Startable protocol."""
def test_protocol_exists(self):
"""Startable protocol is defined."""
assert Startable is not None
def test_protocol_has_start_method(self):
"""Startable has start method in protocol."""
assert hasattr(Startable, "start")
class TestStoppableProtocol:
"""Tests for Stoppable protocol."""
def test_protocol_exists(self):
"""Stoppable protocol is defined."""
assert Stoppable is not None
def test_protocol_has_stop_method(self):
"""Stoppable has stop method in protocol."""
assert hasattr(Stoppable, "stop")
class TestProtocolCompliance:
"""Tests that existing classes comply with protocols."""
def test_ntfy_poller_complies_with_protocol(self):
"""NtfyPoller implements EventEmitter protocol."""
from engine.ntfy import NtfyPoller
poller = NtfyPoller("http://example.com/topic")
assert hasattr(poller, "subscribe")
assert hasattr(poller, "unsubscribe")
assert callable(poller.subscribe)
assert callable(poller.unsubscribe)
def test_mic_monitor_complies_with_protocol(self):
"""MicMonitor implements EventEmitter and Startable protocols."""
from engine.mic import MicMonitor
monitor = MicMonitor()
assert hasattr(monitor, "subscribe")
assert hasattr(monitor, "unsubscribe")
assert hasattr(monitor, "start")
assert hasattr(monitor, "stop")

View File

@@ -1,202 +0,0 @@
"""
Tests for engine.eventbus module.
"""
from engine.eventbus import EventBus, get_event_bus, set_event_bus
from engine.events import EventType, NtfyMessageEvent
class TestEventBusInit:
"""Tests for EventBus initialization."""
def test_init_creates_empty_subscribers(self):
"""EventBus starts with no subscribers."""
bus = EventBus()
assert bus.subscriber_count() == 0
class TestEventBusSubscribe:
"""Tests for EventBus.subscribe method."""
def test_subscribe_adds_callback(self):
"""subscribe() adds a callback for an event type."""
bus = EventBus()
def callback(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, callback)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1
def test_subscribe_multiple_callbacks_same_event(self):
"""Multiple callbacks can be subscribed to the same event type."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.NTFY_MESSAGE, cb2)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 2
def test_subscribe_different_event_types(self):
"""Callbacks can be subscribed to different event types."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.MIC_LEVEL, cb2)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1
assert bus.subscriber_count(EventType.MIC_LEVEL) == 1
class TestEventBusUnsubscribe:
"""Tests for EventBus.unsubscribe method."""
def test_unsubscribe_removes_callback(self):
"""unsubscribe() removes a callback."""
bus = EventBus()
def callback(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, callback)
bus.unsubscribe(EventType.NTFY_MESSAGE, callback)
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 0
def test_unsubscribe_nonexistent_callback_no_error(self):
"""unsubscribe() handles non-existent callback gracefully."""
bus = EventBus()
def callback(e):
return None
bus.unsubscribe(EventType.NTFY_MESSAGE, callback)
class TestEventBusPublish:
"""Tests for EventBus.publish method."""
def test_publish_calls_subscriber(self):
"""publish() calls the subscriber callback."""
bus = EventBus()
received = []
def callback(event):
received.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, callback)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(received) == 1
assert received[0].title == "Test"
def test_publish_multiple_subscribers(self):
"""publish() calls all subscribers for an event type."""
bus = EventBus()
received1 = []
received2 = []
def callback1(event):
received1.append(event)
def callback2(event):
received2.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, callback1)
bus.subscribe(EventType.NTFY_MESSAGE, callback2)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(received1) == 1
assert len(received2) == 1
def test_publish_different_event_types(self):
"""publish() only calls subscribers for the specific event type."""
bus = EventBus()
ntfy_received = []
mic_received = []
def ntfy_callback(event):
ntfy_received.append(event)
def mic_callback(event):
mic_received.append(event)
bus.subscribe(EventType.NTFY_MESSAGE, ntfy_callback)
bus.subscribe(EventType.MIC_LEVEL, mic_callback)
event = NtfyMessageEvent(title="Test", body="Body")
bus.publish(EventType.NTFY_MESSAGE, event)
assert len(ntfy_received) == 1
assert len(mic_received) == 0
class TestEventBusClear:
"""Tests for EventBus.clear method."""
def test_clear_removes_all_subscribers(self):
"""clear() removes all subscribers."""
bus = EventBus()
def cb1(e):
return None
def cb2(e):
return None
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
bus.subscribe(EventType.MIC_LEVEL, cb2)
bus.clear()
assert bus.subscriber_count() == 0
class TestEventBusThreadSafety:
"""Tests for EventBus thread safety."""
def test_concurrent_subscribe_unsubscribe(self):
"""subscribe and unsubscribe can be called concurrently."""
import threading
bus = EventBus()
callbacks = [lambda e: None for _ in range(10)]
def subscribe():
for cb in callbacks:
bus.subscribe(EventType.NTFY_MESSAGE, cb)
def unsubscribe():
for cb in callbacks:
bus.unsubscribe(EventType.NTFY_MESSAGE, cb)
t1 = threading.Thread(target=subscribe)
t2 = threading.Thread(target=unsubscribe)
t1.start()
t2.start()
t1.join()
t2.join()
class TestGlobalEventBus:
"""Tests for global event bus functions."""
def test_get_event_bus_returns_singleton(self):
"""get_event_bus() returns the same instance."""
bus1 = get_event_bus()
bus2 = get_event_bus()
assert bus1 is bus2
def test_set_event_bus_replaces_singleton(self):
"""set_event_bus() replaces the global event bus."""
new_bus = EventBus()
set_event_bus(new_bus)
try:
assert get_event_bus() is new_bus
finally:
set_event_bus(None)

View File

@@ -1,112 +0,0 @@
"""
Tests for engine.events module.
"""
from datetime import datetime
from engine import events
class TestEventType:
"""Tests for EventType enum."""
def test_event_types_exist(self):
"""All expected event types exist."""
assert hasattr(events.EventType, "NEW_HEADLINE")
assert hasattr(events.EventType, "FRAME_TICK")
assert hasattr(events.EventType, "MIC_LEVEL")
assert hasattr(events.EventType, "NTFY_MESSAGE")
assert hasattr(events.EventType, "STREAM_START")
assert hasattr(events.EventType, "STREAM_END")
class TestHeadlineEvent:
"""Tests for HeadlineEvent dataclass."""
def test_create_headline_event(self):
"""HeadlineEvent can be created with required fields."""
e = events.HeadlineEvent(
title="Test Headline",
source="Test Source",
timestamp="12:00",
)
assert e.title == "Test Headline"
assert e.source == "Test Source"
assert e.timestamp == "12:00"
def test_headline_event_optional_language(self):
"""HeadlineEvent supports optional language field."""
e = events.HeadlineEvent(
title="Test",
source="Test",
timestamp="12:00",
language="ja",
)
assert e.language == "ja"
class TestFrameTickEvent:
"""Tests for FrameTickEvent dataclass."""
def test_create_frame_tick_event(self):
"""FrameTickEvent can be created."""
now = datetime.now()
e = events.FrameTickEvent(
frame_number=100,
timestamp=now,
delta_seconds=0.05,
)
assert e.frame_number == 100
assert e.timestamp == now
assert e.delta_seconds == 0.05
class TestMicLevelEvent:
"""Tests for MicLevelEvent dataclass."""
def test_create_mic_level_event(self):
"""MicLevelEvent can be created."""
now = datetime.now()
e = events.MicLevelEvent(
db_level=60.0,
excess_above_threshold=10.0,
timestamp=now,
)
assert e.db_level == 60.0
assert e.excess_above_threshold == 10.0
class TestNtfyMessageEvent:
"""Tests for NtfyMessageEvent dataclass."""
def test_create_ntfy_message_event(self):
"""NtfyMessageEvent can be created with required fields."""
e = events.NtfyMessageEvent(
title="Test Title",
body="Test Body",
)
assert e.title == "Test Title"
assert e.body == "Test Body"
assert e.message_id is None
def test_ntfy_message_event_with_id(self):
"""NtfyMessageEvent supports optional message_id."""
e = events.NtfyMessageEvent(
title="Test",
body="Test",
message_id="abc123",
)
assert e.message_id == "abc123"
class TestStreamEvent:
"""Tests for StreamEvent dataclass."""
def test_create_stream_event(self):
"""StreamEvent can be created."""
e = events.StreamEvent(
event_type=events.EventType.STREAM_START,
headline_count=100,
)
assert e.event_type == events.EventType.STREAM_START
assert e.headline_count == 100

View File

@@ -1,63 +0,0 @@
"""
Tests for engine.frame module.
"""
import time
from engine.frame import FrameTimer, calculate_scroll_step
class TestFrameTimer:
"""Tests for FrameTimer class."""
def test_init_default(self):
"""FrameTimer initializes with default values."""
timer = FrameTimer()
assert timer.target_frame_dt == 0.05
assert timer.fps >= 0
def test_init_custom(self):
"""FrameTimer accepts custom frame duration."""
timer = FrameTimer(target_frame_dt=0.1)
assert timer.target_frame_dt == 0.1
def test_fps_calculation(self):
"""FrameTimer calculates FPS correctly."""
timer = FrameTimer()
timer._frame_count = 10
timer._start_time = time.monotonic() - 1.0
assert timer.fps >= 9.0
def test_reset(self):
"""FrameTimer.reset() clears frame count."""
timer = FrameTimer()
timer._frame_count = 100
timer.reset()
assert timer._frame_count == 0
class TestCalculateScrollStep:
"""Tests for calculate_scroll_step function."""
def test_basic_calculation(self):
"""calculate_scroll_step returns positive value."""
result = calculate_scroll_step(5.0, 24)
assert result > 0
def test_with_padding(self):
"""calculate_scroll_step respects padding parameter."""
without_padding = calculate_scroll_step(5.0, 24, padding=0)
with_padding = calculate_scroll_step(5.0, 24, padding=15)
assert with_padding < without_padding
def test_larger_view_slower_scroll(self):
"""Larger view height results in slower scroll steps."""
small = calculate_scroll_step(5.0, 10)
large = calculate_scroll_step(5.0, 50)
assert large < small
def test_longer_duration_slower_scroll(self):
"""Longer scroll duration results in slower scroll steps."""
fast = calculate_scroll_step(2.0, 24)
slow = calculate_scroll_step(10.0, 24)
assert slow > fast

View File

@@ -1,96 +0,0 @@
"""
Tests for engine.layers module.
"""
import time
from engine import layers
class TestRenderMessageOverlay:
"""Tests for render_message_overlay function."""
def test_no_message_returns_empty(self):
"""Returns empty list when msg is None."""
result, cache = layers.render_message_overlay(None, 80, 24, (None, None))
assert result == []
assert cache[0] is None
def test_message_returns_overlay_lines(self):
"""Returns non-empty list when message is present."""
msg = ("Test Title", "Test Body", time.monotonic())
result, cache = layers.render_message_overlay(msg, 80, 24, (None, None))
assert len(result) > 0
assert cache[0] is not None
def test_cache_key_changes_with_text(self):
"""Cache key changes when message text changes."""
msg1 = ("Title1", "Body1", time.monotonic())
msg2 = ("Title2", "Body2", time.monotonic())
_, cache1 = layers.render_message_overlay(msg1, 80, 24, (None, None))
_, cache2 = layers.render_message_overlay(msg2, 80, 24, cache1)
assert cache1[0] != cache2[0]
def test_cache_reuse_avoids_recomputation(self):
"""Cache is returned when same message is passed (interface test)."""
msg = ("Same Title", "Same Body", time.monotonic())
result1, cache1 = layers.render_message_overlay(msg, 80, 24, (None, None))
result2, cache2 = layers.render_message_overlay(msg, 80, 24, cache1)
assert len(result1) > 0
assert len(result2) > 0
assert cache1[0] == cache2[0]
class TestRenderFirehose:
"""Tests for render_firehose function."""
def test_no_firehose_returns_empty(self):
"""Returns empty list when firehose height is 0."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 0, 24)
assert result == []
def test_firehose_returns_lines(self):
"""Returns lines when firehose height > 0."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 4, 24)
assert len(result) == 4
def test_firehose_includes_ansi_escapes(self):
"""Returns lines containing ANSI escape sequences."""
items = [("Headline", "Source", "12:00")]
result = layers.render_firehose(items, 80, 1, 24)
assert "\033[" in result[0]
class TestApplyGlitch:
"""Tests for apply_glitch function."""
def test_empty_buffer_unchanged(self):
"""Empty buffer is returned unchanged."""
result = layers.apply_glitch([], 0, 0.0, 80)
assert result == []
def test_buffer_length_preserved(self):
"""Buffer length is preserved after glitch application."""
buf = [f"\033[{i + 1};1Htest\033[K" for i in range(10)]
result = layers.apply_glitch(buf, 0, 0.5, 80)
assert len(result) == len(buf)
class TestRenderTickerZone:
"""Tests for render_ticker_zone function - focusing on interface."""
def test_returns_list(self):
"""Returns a list of strings."""
result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0)
assert isinstance(result, list)
def test_returns_dict_for_cache(self):
"""Returns a dict for the noise cache."""
result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0)
assert isinstance(cache, dict)

View File

@@ -2,11 +2,8 @@
Tests for engine.mic module.
"""
from datetime import datetime
from unittest.mock import patch
from engine.events import MicLevelEvent
class TestMicMonitorImport:
"""Tests for module import behavior."""
@@ -84,66 +81,3 @@ class TestMicMonitorStop:
monitor = MicMonitor()
monitor.stop()
assert monitor._stream is None
class TestMicMonitorEventEmission:
"""Tests for MicMonitor event emission."""
def test_subscribe_adds_callback(self):
"""subscribe() adds a callback."""
from engine.mic import MicMonitor
monitor = MicMonitor()
def callback(e):
return None
monitor.subscribe(callback)
assert callback in monitor._subscribers
def test_unsubscribe_removes_callback(self):
"""unsubscribe() removes a callback."""
from engine.mic import MicMonitor
monitor = MicMonitor()
def callback(e):
return None
monitor.subscribe(callback)
monitor.unsubscribe(callback)
assert callback not in monitor._subscribers
def test_emit_calls_subscribers(self):
"""_emit() calls all subscribers."""
from engine.mic import MicMonitor
monitor = MicMonitor()
received = []
def callback(event):
received.append(event)
monitor.subscribe(callback)
event = MicLevelEvent(
db_level=60.0, excess_above_threshold=10.0, timestamp=datetime.now()
)
monitor._emit(event)
assert len(received) == 1
assert received[0].db_level == 60.0
def test_emit_handles_subscriber_exception(self):
"""_emit() handles exceptions in subscribers gracefully."""
from engine.mic import MicMonitor
monitor = MicMonitor()
def bad_callback(event):
raise RuntimeError("test")
monitor.subscribe(bad_callback)
event = MicLevelEvent(
db_level=60.0, excess_above_threshold=10.0, timestamp=datetime.now()
)
monitor._emit(event)

View File

@@ -5,7 +5,6 @@ Tests for engine.ntfy module.
import time
from unittest.mock import MagicMock, patch
from engine.events import NtfyMessageEvent
from engine.ntfy import NtfyPoller
@@ -69,54 +68,3 @@ class TestNtfyPollerDismiss:
poller.dismiss()
assert poller._message is None
class TestNtfyPollerEventEmission:
"""Tests for NtfyPoller event emission."""
def test_subscribe_adds_callback(self):
"""subscribe() adds a callback."""
poller = NtfyPoller("http://example.com/topic")
def callback(e):
return None
poller.subscribe(callback)
assert callback in poller._subscribers
def test_unsubscribe_removes_callback(self):
"""unsubscribe() removes a callback."""
poller = NtfyPoller("http://example.com/topic")
def callback(e):
return None
poller.subscribe(callback)
poller.unsubscribe(callback)
assert callback not in poller._subscribers
def test_emit_calls_subscribers(self):
"""_emit() calls all subscribers."""
poller = NtfyPoller("http://example.com/topic")
received = []
def callback(event):
received.append(event)
poller.subscribe(callback)
event = NtfyMessageEvent(title="Test", body="Body")
poller._emit(event)
assert len(received) == 1
assert received[0].title == "Test"
def test_emit_handles_subscriber_exception(self):
"""_emit() handles exceptions in subscribers gracefully."""
poller = NtfyPoller("http://example.com/topic")
def bad_callback(event):
raise RuntimeError("test")
poller.subscribe(bad_callback)
event = NtfyMessageEvent(title="Test", body="Body")
poller._emit(event)

View File

@@ -1,95 +0,0 @@
"""
Tests for engine.types module.
"""
from engine.types import (
Block,
FetchResult,
HeadlineItem,
items_to_tuples,
tuples_to_items,
)
class TestHeadlineItem:
"""Tests for HeadlineItem dataclass."""
def test_create_headline_item(self):
"""Can create HeadlineItem with required fields."""
item = HeadlineItem(title="Test", source="Source", timestamp="12:00")
assert item.title == "Test"
assert item.source == "Source"
assert item.timestamp == "12:00"
def test_to_tuple(self):
"""to_tuple returns correct tuple."""
item = HeadlineItem(title="Test", source="Source", timestamp="12:00")
assert item.to_tuple() == ("Test", "Source", "12:00")
def test_from_tuple(self):
"""from_tuple creates HeadlineItem from tuple."""
item = HeadlineItem.from_tuple(("Test", "Source", "12:00"))
assert item.title == "Test"
assert item.source == "Source"
assert item.timestamp == "12:00"
class TestItemsConversion:
"""Tests for list conversion functions."""
def test_items_to_tuples(self):
"""Converts list of HeadlineItem to list of tuples."""
items = [
HeadlineItem(title="A", source="S", timestamp="10:00"),
HeadlineItem(title="B", source="T", timestamp="11:00"),
]
result = items_to_tuples(items)
assert result == [("A", "S", "10:00"), ("B", "T", "11:00")]
def test_tuples_to_items(self):
"""Converts list of tuples to list of HeadlineItem."""
tuples = [("A", "S", "10:00"), ("B", "T", "11:00")]
result = tuples_to_items(tuples)
assert len(result) == 2
assert result[0].title == "A"
assert result[1].title == "B"
class TestFetchResult:
"""Tests for FetchResult dataclass."""
def test_create_fetch_result(self):
"""Can create FetchResult."""
items = [HeadlineItem(title="Test", source="Source", timestamp="12:00")]
result = FetchResult(items=items, linked=1, failed=0)
assert len(result.items) == 1
assert result.linked == 1
assert result.failed == 0
def test_to_legacy_tuple(self):
"""to_legacy_tuple returns correct format."""
items = [HeadlineItem(title="Test", source="Source", timestamp="12:00")]
result = FetchResult(items=items, linked=1, failed=0)
legacy = result.to_legacy_tuple()
assert legacy[0] == [("Test", "Source", "12:00")]
assert legacy[1] == 1
assert legacy[2] == 0
class TestBlock:
"""Tests for Block dataclass."""
def test_create_block(self):
"""Can create Block."""
block = Block(
content=["line1", "line2"], color="\033[38;5;46m", meta_row_index=1
)
assert len(block.content) == 2
assert block.color == "\033[38;5;46m"
assert block.meta_row_index == 1
def test_to_legacy_tuple(self):
"""to_legacy_tuple returns correct format."""
block = Block(content=["line1"], color="green", meta_row_index=0)
legacy = block.to_legacy_tuple()
assert legacy == (["line1"], "green", 0)

View File

@@ -1,64 +0,0 @@
"""
Tests for engine.viewport module.
"""
from engine import viewport
class TestViewportTw:
"""Tests for tw() function."""
def test_tw_returns_int(self):
"""tw() returns an integer."""
result = viewport.tw()
assert isinstance(result, int)
def test_tw_positive(self):
"""tw() returns a positive value."""
assert viewport.tw() > 0
class TestViewportTh:
"""Tests for th() function."""
def test_th_returns_int(self):
"""th() returns an integer."""
result = viewport.th()
assert isinstance(result, int)
def test_th_positive(self):
"""th() returns a positive value."""
assert viewport.th() > 0
class TestViewportMoveTo:
"""Tests for move_to() function."""
def test_move_to_format(self):
"""move_to() returns correctly formatted ANSI escape."""
result = viewport.move_to(5, 10)
assert result == "\033[5;10H"
def test_move_to_default_col(self):
"""move_to() defaults to column 1."""
result = viewport.move_to(5)
assert result == "\033[5;1H"
class TestViewportClearScreen:
"""Tests for clear_screen() function."""
def test_clear_screen_format(self):
"""clear_screen() returns clear screen ANSI escape."""
result = viewport.clear_screen()
assert "\033[2J" in result
assert "\033[H" in result
class TestViewportClearLine:
"""Tests for clear_line() function."""
def test_clear_line_format(self):
"""clear_line() returns clear line ANSI escape."""
result = viewport.clear_line()
assert result == "\033[K"