Compare commits
5 Commits
19fb4bc4fe
...
7ff78c66ed
| Author | SHA1 | Date | |
|---|---|---|---|
| 7ff78c66ed | |||
| 15de46722a | |||
| 35e5c8d38b | |||
| cdc8094de2 | |||
| f170143939 |
110
AGENTS.md
Normal file
110
AGENTS.md
Normal file
@@ -0,0 +1,110 @@
|
|||||||
|
# Agent Development Guide
|
||||||
|
|
||||||
|
## Development Environment
|
||||||
|
|
||||||
|
This project uses:
|
||||||
|
- **mise** (mise.jdx.dev) - tool version manager and task runner
|
||||||
|
- **hk** (hk.jdx.dev) - git hook manager
|
||||||
|
- **uv** - fast Python package installer
|
||||||
|
- **ruff** - linter and formatter
|
||||||
|
- **pytest** - test runner
|
||||||
|
|
||||||
|
### Setup
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Install dependencies
|
||||||
|
mise run install
|
||||||
|
|
||||||
|
# Or equivalently:
|
||||||
|
uv sync
|
||||||
|
```
|
||||||
|
|
||||||
|
### Available Commands
|
||||||
|
|
||||||
|
```bash
|
||||||
|
mise run test # Run tests
|
||||||
|
mise run test-v # Run tests verbose
|
||||||
|
mise run test-cov # Run tests with coverage report
|
||||||
|
mise run lint # Run ruff linter
|
||||||
|
mise run lint-fix # Run ruff with auto-fix
|
||||||
|
mise run format # Run ruff formatter
|
||||||
|
mise run ci # Full CI pipeline (sync + test + coverage)
|
||||||
|
```
|
||||||
|
|
||||||
|
## Git Hooks
|
||||||
|
|
||||||
|
**At the start of every agent session**, verify hooks are installed:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
ls -la .git/hooks/pre-commit
|
||||||
|
```
|
||||||
|
|
||||||
|
If hooks are not installed, install them with:
|
||||||
|
|
||||||
|
```bash
|
||||||
|
hk init --mise
|
||||||
|
mise run pre-commit
|
||||||
|
```
|
||||||
|
|
||||||
|
The project uses hk configured in `hk.pkl`:
|
||||||
|
- **pre-commit**: runs ruff-format and ruff (with auto-fix)
|
||||||
|
- **pre-push**: runs ruff check
|
||||||
|
|
||||||
|
## Workflow Rules
|
||||||
|
|
||||||
|
### Before Committing
|
||||||
|
|
||||||
|
1. **Always run the test suite** - never commit code that fails tests:
|
||||||
|
```bash
|
||||||
|
mise run test
|
||||||
|
```
|
||||||
|
|
||||||
|
2. **Always run the linter**:
|
||||||
|
```bash
|
||||||
|
mise run lint
|
||||||
|
```
|
||||||
|
|
||||||
|
3. **Fix any lint errors** before committing (or let the pre-commit hook handle it).
|
||||||
|
|
||||||
|
4. **Review your changes** using `git diff` to understand what will be committed.
|
||||||
|
|
||||||
|
### On Failing Tests
|
||||||
|
|
||||||
|
When tests fail, **determine whether it's an out-of-date test or a correctly failing test**:
|
||||||
|
|
||||||
|
- **Out-of-date test**: The test was written for old behavior that has legitimately changed. Update the test to match the new expected behavior.
|
||||||
|
|
||||||
|
- **Correctly failing test**: The test correctly identifies a broken contract. Fix the implementation, not the test.
|
||||||
|
|
||||||
|
**Never** modify a test to make it pass without understanding why it failed.
|
||||||
|
|
||||||
|
### Code Review
|
||||||
|
|
||||||
|
Before committing significant changes:
|
||||||
|
- Run `git diff` to review all changes
|
||||||
|
- Ensure new code follows existing patterns in the codebase
|
||||||
|
- Check that type hints are added for new functions
|
||||||
|
- Verify that tests exist for new functionality
|
||||||
|
|
||||||
|
## Testing
|
||||||
|
|
||||||
|
Tests live in `tests/` and follow the pattern `test_*.py`.
|
||||||
|
|
||||||
|
Run all tests:
|
||||||
|
```bash
|
||||||
|
mise run test
|
||||||
|
```
|
||||||
|
|
||||||
|
Run with coverage:
|
||||||
|
```bash
|
||||||
|
mise run test-cov
|
||||||
|
```
|
||||||
|
|
||||||
|
The project uses pytest with strict marker enforcement. Test configuration is in `pyproject.toml` under `[tool.pytest.ini_options]`.
|
||||||
|
|
||||||
|
## Architecture Notes
|
||||||
|
|
||||||
|
- **ntfy.py** and **mic.py** are standalone modules with zero internal dependencies
|
||||||
|
- **eventbus.py** provides thread-safe event publishing for decoupled communication
|
||||||
|
- **controller.py** coordinates ntfy/mic monitoring
|
||||||
|
- The render pipeline: fetch → render → effects → scroll → terminal output
|
||||||
41
README.md
41
README.md
@@ -101,26 +101,27 @@ Update `NTFY_TOPIC` in `engine/config.py` to point at your own topic.
|
|||||||
|
|
||||||
```
|
```
|
||||||
engine/
|
engine/
|
||||||
config.py constants, CLI flags, glyph tables
|
__init__.py package marker
|
||||||
sources.py FEEDS, POETRY_SOURCES, language/script maps
|
app.py main(), font picker TUI, boot sequence, signal handler
|
||||||
terminal.py ANSI codes, tw/th, type_out, boot_ln
|
config.py constants, CLI flags, glyph tables
|
||||||
filter.py HTML stripping, content filter
|
sources.py FEEDS, POETRY_SOURCES, language/script maps
|
||||||
translate.py Google Translate wrapper + region detection
|
terminal.py ANSI codes, tw/th, type_out, boot_ln
|
||||||
render.py OTF → half-block pipeline (SSAA, gradient)
|
filter.py HTML stripping, content filter
|
||||||
effects.py noise, glitch_bar, fade, firehose
|
translate.py Google Translate wrapper + region detection
|
||||||
fetch.py RSS/Gutenberg fetching + cache load/save
|
render.py OTF → half-block pipeline (SSAA, gradient)
|
||||||
ntfy.py NtfyPoller — standalone, zero internal deps
|
effects.py noise, glitch_bar, fade, firehose
|
||||||
mic.py MicMonitor — standalone, graceful fallback
|
fetch.py RSS/Gutenberg fetching + cache load/save
|
||||||
scroll.py stream() frame loop + message rendering
|
ntfy.py NtfyPoller — standalone, zero internal deps
|
||||||
app.py main(), font picker TUI, boot sequence, signal handler
|
mic.py MicMonitor — standalone, graceful fallback
|
||||||
|
scroll.py stream() frame loop + message rendering
|
||||||
tests/
|
viewport.py terminal dimension tracking (tw/th)
|
||||||
test_config.py
|
frame.py scroll step calculation, timing
|
||||||
test_filter.py
|
layers.py ticker zone, firehose, message overlay rendering
|
||||||
test_mic.py
|
eventbus.py thread-safe event publishing for decoupled communication
|
||||||
test_ntfy.py
|
events.py event types and definitions
|
||||||
test_sources.py
|
controller.py coordinates ntfy/mic monitoring and event publishing
|
||||||
test_terminal.py
|
emitters.py background emitters for ntfy and mic
|
||||||
|
types.py type definitions and dataclasses
|
||||||
```
|
```
|
||||||
|
|
||||||
`ntfy.py` and `mic.py` have zero internal dependencies and can be imported by any other visualizer.
|
`ntfy.py` and `mic.py` have zero internal dependencies and can be imported by any other visualizer.
|
||||||
|
|||||||
143
engine/config.py
143
engine/config.py
@@ -1,25 +1,28 @@
|
|||||||
"""
|
"""
|
||||||
Configuration constants, CLI flags, and glyph tables.
|
Configuration constants, CLI flags, and glyph tables.
|
||||||
|
Supports both global constants (backward compatible) and injected config for testing.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import sys
|
import sys
|
||||||
|
from dataclasses import dataclass, field
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
|
|
||||||
_REPO_ROOT = Path(__file__).resolve().parent.parent
|
_REPO_ROOT = Path(__file__).resolve().parent.parent
|
||||||
_FONT_EXTENSIONS = {".otf", ".ttf", ".ttc"}
|
_FONT_EXTENSIONS = {".otf", ".ttf", ".ttc"}
|
||||||
|
|
||||||
|
|
||||||
def _arg_value(flag):
|
def _arg_value(flag, argv: list[str] | None = None):
|
||||||
"""Get value following a CLI flag, if present."""
|
"""Get value following a CLI flag, if present."""
|
||||||
if flag not in sys.argv:
|
argv = argv or sys.argv
|
||||||
|
if flag not in argv:
|
||||||
return None
|
return None
|
||||||
i = sys.argv.index(flag)
|
i = argv.index(flag)
|
||||||
return sys.argv[i + 1] if i + 1 < len(sys.argv) else None
|
return argv[i + 1] if i + 1 < len(argv) else None
|
||||||
|
|
||||||
|
|
||||||
def _arg_int(flag, default):
|
def _arg_int(flag, default, argv: list[str] | None = None):
|
||||||
"""Get int CLI argument with safe fallback."""
|
"""Get int CLI argument with safe fallback."""
|
||||||
raw = _arg_value(flag)
|
raw = _arg_value(flag, argv)
|
||||||
if raw is None:
|
if raw is None:
|
||||||
return default
|
return default
|
||||||
try:
|
try:
|
||||||
@@ -53,6 +56,134 @@ def list_repo_font_files():
|
|||||||
return _list_font_files(FONT_DIR)
|
return _list_font_files(FONT_DIR)
|
||||||
|
|
||||||
|
|
||||||
|
def _get_platform_font_paths() -> dict[str, str]:
|
||||||
|
"""Get platform-appropriate font paths for non-Latin scripts."""
|
||||||
|
import platform
|
||||||
|
|
||||||
|
system = platform.system()
|
||||||
|
|
||||||
|
if system == "Darwin":
|
||||||
|
return {
|
||||||
|
"zh-cn": "/System/Library/Fonts/STHeiti Medium.ttc",
|
||||||
|
"ja": "/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc",
|
||||||
|
"ko": "/System/Library/Fonts/AppleSDGothicNeo.ttc",
|
||||||
|
"ru": "/System/Library/Fonts/Supplemental/Arial.ttf",
|
||||||
|
"uk": "/System/Library/Fonts/Supplemental/Arial.ttf",
|
||||||
|
"el": "/System/Library/Fonts/Supplemental/Arial.ttf",
|
||||||
|
"he": "/System/Library/Fonts/Supplemental/Arial.ttf",
|
||||||
|
"ar": "/System/Library/Fonts/GeezaPro.ttc",
|
||||||
|
"fa": "/System/Library/Fonts/GeezaPro.ttc",
|
||||||
|
"hi": "/System/Library/Fonts/Kohinoor.ttc",
|
||||||
|
"th": "/System/Library/Fonts/ThonburiUI.ttc",
|
||||||
|
}
|
||||||
|
elif system == "Linux":
|
||||||
|
return {
|
||||||
|
"zh-cn": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
|
||||||
|
"ja": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
|
||||||
|
"ko": "/usr/share/fonts/truetype/noto/NotoSansCJK-Regular.ttc",
|
||||||
|
"ru": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
|
||||||
|
"uk": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
|
||||||
|
"el": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
|
||||||
|
"he": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
|
||||||
|
"ar": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
|
||||||
|
"fa": "/usr/share/fonts/truetype/dejavu/DejaVuSans.ttf",
|
||||||
|
"hi": "/usr/share/fonts/truetype/noto/NotoSansDevanagari-Regular.ttf",
|
||||||
|
"th": "/usr/share/fonts/truetype/noto/NotoSansThai-Regular.ttf",
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass(frozen=True)
|
||||||
|
class Config:
|
||||||
|
"""Immutable configuration container for injected config."""
|
||||||
|
|
||||||
|
headline_limit: int = 1000
|
||||||
|
feed_timeout: int = 10
|
||||||
|
mic_threshold_db: int = 50
|
||||||
|
mode: str = "news"
|
||||||
|
firehose: bool = False
|
||||||
|
|
||||||
|
ntfy_topic: str = "https://ntfy.sh/klubhaus_terminal_mainline/json"
|
||||||
|
ntfy_reconnect_delay: int = 5
|
||||||
|
message_display_secs: int = 30
|
||||||
|
|
||||||
|
font_dir: str = "fonts"
|
||||||
|
font_path: str = ""
|
||||||
|
font_index: int = 0
|
||||||
|
font_picker: bool = True
|
||||||
|
font_sz: int = 60
|
||||||
|
render_h: int = 8
|
||||||
|
|
||||||
|
ssaa: int = 4
|
||||||
|
|
||||||
|
scroll_dur: float = 5.625
|
||||||
|
frame_dt: float = 0.05
|
||||||
|
firehose_h: int = 12
|
||||||
|
grad_speed: float = 0.08
|
||||||
|
|
||||||
|
glitch_glyphs: str = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋"
|
||||||
|
kata_glyphs: str = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ"
|
||||||
|
|
||||||
|
script_fonts: dict[str, str] = field(default_factory=_get_platform_font_paths)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_args(cls, argv: list[str] | None = None) -> "Config":
|
||||||
|
"""Create Config from CLI arguments (or custom argv for testing)."""
|
||||||
|
argv = argv or sys.argv
|
||||||
|
|
||||||
|
font_dir = _resolve_font_path(_arg_value("--font-dir", argv) or "fonts")
|
||||||
|
font_file_arg = _arg_value("--font-file", argv)
|
||||||
|
font_files = _list_font_files(font_dir)
|
||||||
|
font_path = (
|
||||||
|
_resolve_font_path(font_file_arg)
|
||||||
|
if font_file_arg
|
||||||
|
else (font_files[0] if font_files else "")
|
||||||
|
)
|
||||||
|
|
||||||
|
return cls(
|
||||||
|
headline_limit=1000,
|
||||||
|
feed_timeout=10,
|
||||||
|
mic_threshold_db=50,
|
||||||
|
mode="poetry" if "--poetry" in argv or "-p" in argv else "news",
|
||||||
|
firehose="--firehose" in argv,
|
||||||
|
ntfy_topic="https://ntfy.sh/klubhaus_terminal_mainline/json",
|
||||||
|
ntfy_reconnect_delay=5,
|
||||||
|
message_display_secs=30,
|
||||||
|
font_dir=font_dir,
|
||||||
|
font_path=font_path,
|
||||||
|
font_index=max(0, _arg_int("--font-index", 0, argv)),
|
||||||
|
font_picker="--no-font-picker" not in argv,
|
||||||
|
font_sz=60,
|
||||||
|
render_h=8,
|
||||||
|
ssaa=4,
|
||||||
|
scroll_dur=5.625,
|
||||||
|
frame_dt=0.05,
|
||||||
|
firehose_h=12,
|
||||||
|
grad_speed=0.08,
|
||||||
|
glitch_glyphs="░▒▓█▌▐╌╍╎╏┃┆┇┊┋",
|
||||||
|
kata_glyphs="ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ",
|
||||||
|
script_fonts=_get_platform_font_paths(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
_config: Config | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_config() -> Config:
|
||||||
|
"""Get the global config instance (lazy-loaded)."""
|
||||||
|
global _config
|
||||||
|
if _config is None:
|
||||||
|
_config = Config.from_args()
|
||||||
|
return _config
|
||||||
|
|
||||||
|
|
||||||
|
def set_config(config: Config) -> None:
|
||||||
|
"""Set the global config instance (for testing)."""
|
||||||
|
global _config
|
||||||
|
_config = config
|
||||||
|
|
||||||
|
|
||||||
# ─── RUNTIME ──────────────────────────────────────────────
|
# ─── RUNTIME ──────────────────────────────────────────────
|
||||||
HEADLINE_LIMIT = 1000
|
HEADLINE_LIMIT = 1000
|
||||||
FEED_TIMEOUT = 10
|
FEED_TIMEOUT = 10
|
||||||
|
|||||||
68
engine/controller.py
Normal file
68
engine/controller.py
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
"""
|
||||||
|
Stream controller - manages input sources and orchestrates the render stream.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from engine.config import Config, get_config
|
||||||
|
from engine.eventbus import EventBus
|
||||||
|
from engine.events import EventType, StreamEvent
|
||||||
|
from engine.mic import MicMonitor
|
||||||
|
from engine.ntfy import NtfyPoller
|
||||||
|
from engine.scroll import stream
|
||||||
|
|
||||||
|
|
||||||
|
class StreamController:
|
||||||
|
"""Controls the stream lifecycle - initializes sources and runs the stream."""
|
||||||
|
|
||||||
|
def __init__(self, config: Config | None = None, event_bus: EventBus | None = None):
|
||||||
|
self.config = config or get_config()
|
||||||
|
self.event_bus = event_bus
|
||||||
|
self.mic: MicMonitor | None = None
|
||||||
|
self.ntfy: NtfyPoller | None = None
|
||||||
|
|
||||||
|
def initialize_sources(self) -> tuple[bool, bool]:
|
||||||
|
"""Initialize microphone and ntfy sources.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(mic_ok, ntfy_ok) - success status for each source
|
||||||
|
"""
|
||||||
|
self.mic = MicMonitor(threshold_db=self.config.mic_threshold_db)
|
||||||
|
mic_ok = self.mic.start() if self.mic.available else False
|
||||||
|
|
||||||
|
self.ntfy = NtfyPoller(
|
||||||
|
self.config.ntfy_topic,
|
||||||
|
reconnect_delay=self.config.ntfy_reconnect_delay,
|
||||||
|
display_secs=self.config.message_display_secs,
|
||||||
|
)
|
||||||
|
ntfy_ok = self.ntfy.start()
|
||||||
|
|
||||||
|
return bool(mic_ok), ntfy_ok
|
||||||
|
|
||||||
|
def run(self, items: list) -> None:
|
||||||
|
"""Run the stream with initialized sources."""
|
||||||
|
if self.mic is None or self.ntfy is None:
|
||||||
|
self.initialize_sources()
|
||||||
|
|
||||||
|
if self.event_bus:
|
||||||
|
self.event_bus.publish(
|
||||||
|
EventType.STREAM_START,
|
||||||
|
StreamEvent(
|
||||||
|
event_type=EventType.STREAM_START,
|
||||||
|
headline_count=len(items),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
stream(items, self.ntfy, self.mic)
|
||||||
|
|
||||||
|
if self.event_bus:
|
||||||
|
self.event_bus.publish(
|
||||||
|
EventType.STREAM_END,
|
||||||
|
StreamEvent(
|
||||||
|
event_type=EventType.STREAM_END,
|
||||||
|
headline_count=len(items),
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def cleanup(self) -> None:
|
||||||
|
"""Clean up resources."""
|
||||||
|
if self.mic:
|
||||||
|
self.mic.stop()
|
||||||
25
engine/emitters.py
Normal file
25
engine/emitters.py
Normal file
@@ -0,0 +1,25 @@
|
|||||||
|
"""
|
||||||
|
Event emitter protocols - abstract interfaces for event-producing components.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from collections.abc import Callable
|
||||||
|
from typing import Any, Protocol
|
||||||
|
|
||||||
|
|
||||||
|
class EventEmitter(Protocol):
|
||||||
|
"""Protocol for components that emit events."""
|
||||||
|
|
||||||
|
def subscribe(self, callback: Callable[[Any], None]) -> None: ...
|
||||||
|
def unsubscribe(self, callback: Callable[[Any], None]) -> None: ...
|
||||||
|
|
||||||
|
|
||||||
|
class Startable(Protocol):
|
||||||
|
"""Protocol for components that can be started."""
|
||||||
|
|
||||||
|
def start(self) -> Any: ...
|
||||||
|
|
||||||
|
|
||||||
|
class Stoppable(Protocol):
|
||||||
|
"""Protocol for components that can be stopped."""
|
||||||
|
|
||||||
|
def stop(self) -> None: ...
|
||||||
72
engine/eventbus.py
Normal file
72
engine/eventbus.py
Normal file
@@ -0,0 +1,72 @@
|
|||||||
|
"""
|
||||||
|
Event bus - pub/sub messaging for decoupled component communication.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import threading
|
||||||
|
from collections import defaultdict
|
||||||
|
from collections.abc import Callable
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from engine.events import EventType
|
||||||
|
|
||||||
|
|
||||||
|
class EventBus:
|
||||||
|
"""Thread-safe event bus for publish-subscribe messaging."""
|
||||||
|
|
||||||
|
def __init__(self):
|
||||||
|
self._subscribers: dict[EventType, list[Callable[[Any], None]]] = defaultdict(
|
||||||
|
list
|
||||||
|
)
|
||||||
|
self._lock = threading.Lock()
|
||||||
|
|
||||||
|
def subscribe(self, event_type: EventType, callback: Callable[[Any], None]) -> None:
|
||||||
|
"""Register a callback for a specific event type."""
|
||||||
|
with self._lock:
|
||||||
|
self._subscribers[event_type].append(callback)
|
||||||
|
|
||||||
|
def unsubscribe(
|
||||||
|
self, event_type: EventType, callback: Callable[[Any], None]
|
||||||
|
) -> None:
|
||||||
|
"""Remove a callback for a specific event type."""
|
||||||
|
with self._lock:
|
||||||
|
if callback in self._subscribers[event_type]:
|
||||||
|
self._subscribers[event_type].remove(callback)
|
||||||
|
|
||||||
|
def publish(self, event_type: EventType, event: Any = None) -> None:
|
||||||
|
"""Publish an event to all subscribers."""
|
||||||
|
with self._lock:
|
||||||
|
callbacks = list(self._subscribers.get(event_type, []))
|
||||||
|
for callback in callbacks:
|
||||||
|
try:
|
||||||
|
callback(event)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def clear(self) -> None:
|
||||||
|
"""Remove all subscribers."""
|
||||||
|
with self._lock:
|
||||||
|
self._subscribers.clear()
|
||||||
|
|
||||||
|
def subscriber_count(self, event_type: EventType | None = None) -> int:
|
||||||
|
"""Get subscriber count for an event type, or total if None."""
|
||||||
|
with self._lock:
|
||||||
|
if event_type is None:
|
||||||
|
return sum(len(cb) for cb in self._subscribers.values())
|
||||||
|
return len(self._subscribers.get(event_type, []))
|
||||||
|
|
||||||
|
|
||||||
|
_event_bus: EventBus | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def get_event_bus() -> EventBus:
|
||||||
|
"""Get the global event bus instance."""
|
||||||
|
global _event_bus
|
||||||
|
if _event_bus is None:
|
||||||
|
_event_bus = EventBus()
|
||||||
|
return _event_bus
|
||||||
|
|
||||||
|
|
||||||
|
def set_event_bus(bus: EventBus) -> None:
|
||||||
|
"""Set the global event bus instance (for testing)."""
|
||||||
|
global _event_bus
|
||||||
|
_event_bus = bus
|
||||||
67
engine/events.py
Normal file
67
engine/events.py
Normal file
@@ -0,0 +1,67 @@
|
|||||||
|
"""
|
||||||
|
Event types for the mainline application.
|
||||||
|
Defines the core events that flow through the system.
|
||||||
|
These types support a future migration to an event-driven architecture.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
from datetime import datetime
|
||||||
|
from enum import Enum, auto
|
||||||
|
|
||||||
|
|
||||||
|
class EventType(Enum):
|
||||||
|
"""Core event types in the mainline application."""
|
||||||
|
|
||||||
|
NEW_HEADLINE = auto()
|
||||||
|
FRAME_TICK = auto()
|
||||||
|
MIC_LEVEL = auto()
|
||||||
|
NTFY_MESSAGE = auto()
|
||||||
|
STREAM_START = auto()
|
||||||
|
STREAM_END = auto()
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class HeadlineEvent:
|
||||||
|
"""Event emitted when a new headline is ready for display."""
|
||||||
|
|
||||||
|
title: str
|
||||||
|
source: str
|
||||||
|
timestamp: str
|
||||||
|
language: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class FrameTickEvent:
|
||||||
|
"""Event emitted on each render frame."""
|
||||||
|
|
||||||
|
frame_number: int
|
||||||
|
timestamp: datetime
|
||||||
|
delta_seconds: float
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class MicLevelEvent:
|
||||||
|
"""Event emitted when microphone level changes significantly."""
|
||||||
|
|
||||||
|
db_level: float
|
||||||
|
excess_above_threshold: float
|
||||||
|
timestamp: datetime
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class NtfyMessageEvent:
|
||||||
|
"""Event emitted when an ntfy message is received."""
|
||||||
|
|
||||||
|
title: str
|
||||||
|
body: str
|
||||||
|
message_id: str | None = None
|
||||||
|
timestamp: datetime | None = None
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class StreamEvent:
|
||||||
|
"""Event emitted when stream starts or ends."""
|
||||||
|
|
||||||
|
event_type: EventType
|
||||||
|
headline_count: int = 0
|
||||||
|
timestamp: datetime | None = None
|
||||||
@@ -8,6 +8,7 @@ import pathlib
|
|||||||
import re
|
import re
|
||||||
import urllib.request
|
import urllib.request
|
||||||
from datetime import datetime
|
from datetime import datetime
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
import feedparser
|
import feedparser
|
||||||
|
|
||||||
@@ -16,9 +17,13 @@ from engine.filter import skip, strip_tags
|
|||||||
from engine.sources import FEEDS, POETRY_SOURCES
|
from engine.sources import FEEDS, POETRY_SOURCES
|
||||||
from engine.terminal import boot_ln
|
from engine.terminal import boot_ln
|
||||||
|
|
||||||
|
# Type alias for headline items
|
||||||
|
HeadlineTuple = tuple[str, str, str]
|
||||||
|
|
||||||
|
|
||||||
# ─── SINGLE FEED ──────────────────────────────────────────
|
# ─── SINGLE FEED ──────────────────────────────────────────
|
||||||
def fetch_feed(url):
|
def fetch_feed(url: str) -> Any | None:
|
||||||
|
"""Fetch and parse a single RSS feed URL."""
|
||||||
try:
|
try:
|
||||||
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
|
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
|
||||||
resp = urllib.request.urlopen(req, timeout=config.FEED_TIMEOUT)
|
resp = urllib.request.urlopen(req, timeout=config.FEED_TIMEOUT)
|
||||||
@@ -28,8 +33,9 @@ def fetch_feed(url):
|
|||||||
|
|
||||||
|
|
||||||
# ─── ALL RSS FEEDS ────────────────────────────────────────
|
# ─── ALL RSS FEEDS ────────────────────────────────────────
|
||||||
def fetch_all():
|
def fetch_all() -> tuple[list[HeadlineTuple], int, int]:
|
||||||
items = []
|
"""Fetch all RSS feeds and return items, linked count, failed count."""
|
||||||
|
items: list[HeadlineTuple] = []
|
||||||
linked = failed = 0
|
linked = failed = 0
|
||||||
for src, url in FEEDS.items():
|
for src, url in FEEDS.items():
|
||||||
feed = fetch_feed(url)
|
feed = fetch_feed(url)
|
||||||
@@ -59,7 +65,7 @@ def fetch_all():
|
|||||||
|
|
||||||
|
|
||||||
# ─── PROJECT GUTENBERG ────────────────────────────────────
|
# ─── PROJECT GUTENBERG ────────────────────────────────────
|
||||||
def _fetch_gutenberg(url, label):
|
def _fetch_gutenberg(url: str, label: str) -> list[HeadlineTuple]:
|
||||||
"""Download and parse stanzas/passages from a Project Gutenberg text."""
|
"""Download and parse stanzas/passages from a Project Gutenberg text."""
|
||||||
try:
|
try:
|
||||||
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
|
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
|
||||||
|
|||||||
57
engine/frame.py
Normal file
57
engine/frame.py
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
"""
|
||||||
|
Frame timing utilities — FPS control and precise timing.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
|
||||||
|
class FrameTimer:
|
||||||
|
"""Frame timer for consistent render loop timing."""
|
||||||
|
|
||||||
|
def __init__(self, target_frame_dt: float = 0.05):
|
||||||
|
self.target_frame_dt = target_frame_dt
|
||||||
|
self._frame_count = 0
|
||||||
|
self._start_time = time.monotonic()
|
||||||
|
self._last_frame_time = self._start_time
|
||||||
|
|
||||||
|
@property
|
||||||
|
def fps(self) -> float:
|
||||||
|
"""Current FPS based on elapsed frames."""
|
||||||
|
elapsed = time.monotonic() - self._start_time
|
||||||
|
if elapsed > 0:
|
||||||
|
return self._frame_count / elapsed
|
||||||
|
return 0.0
|
||||||
|
|
||||||
|
def sleep_until_next_frame(self) -> float:
|
||||||
|
"""Sleep to maintain target frame rate. Returns actual elapsed time."""
|
||||||
|
now = time.monotonic()
|
||||||
|
elapsed = now - self._last_frame_time
|
||||||
|
self._last_frame_time = now
|
||||||
|
self._frame_count += 1
|
||||||
|
|
||||||
|
sleep_time = max(0, self.target_frame_dt - elapsed)
|
||||||
|
if sleep_time > 0:
|
||||||
|
time.sleep(sleep_time)
|
||||||
|
return elapsed
|
||||||
|
|
||||||
|
def reset(self) -> None:
|
||||||
|
"""Reset frame counter and start time."""
|
||||||
|
self._frame_count = 0
|
||||||
|
self._start_time = time.monotonic()
|
||||||
|
self._last_frame_time = self._start_time
|
||||||
|
|
||||||
|
|
||||||
|
def calculate_scroll_step(
|
||||||
|
scroll_dur: float, view_height: int, padding: int = 15
|
||||||
|
) -> float:
|
||||||
|
"""Calculate scroll step interval for smooth scrolling.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
scroll_dur: Duration in seconds for one headline to scroll through view
|
||||||
|
view_height: Terminal height in rows
|
||||||
|
padding: Extra rows for off-screen content
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Time in seconds between scroll steps
|
||||||
|
"""
|
||||||
|
return scroll_dur / (view_height + padding) * 2
|
||||||
201
engine/layers.py
Normal file
201
engine/layers.py
Normal file
@@ -0,0 +1,201 @@
|
|||||||
|
"""
|
||||||
|
Layer compositing — message overlay, ticker zone, firehose, noise.
|
||||||
|
Depends on: config, render, effects.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import random
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from engine import config
|
||||||
|
from engine.effects import (
|
||||||
|
fade_line,
|
||||||
|
firehose_line,
|
||||||
|
glitch_bar,
|
||||||
|
noise,
|
||||||
|
vis_trunc,
|
||||||
|
)
|
||||||
|
from engine.render import big_wrap, lr_gradient, lr_gradient_opposite
|
||||||
|
from engine.terminal import RST, W_COOL
|
||||||
|
|
||||||
|
MSG_META = "\033[38;5;245m"
|
||||||
|
MSG_BORDER = "\033[2;38;5;37m"
|
||||||
|
|
||||||
|
|
||||||
|
def render_message_overlay(
|
||||||
|
msg: tuple[str, str, float] | None,
|
||||||
|
w: int,
|
||||||
|
h: int,
|
||||||
|
msg_cache: tuple,
|
||||||
|
) -> tuple[list[str], tuple]:
|
||||||
|
"""Render ntfy message overlay.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
msg: (title, body, timestamp) or None
|
||||||
|
w: terminal width
|
||||||
|
h: terminal height
|
||||||
|
msg_cache: (cache_key, rendered_rows) for caching
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(list of ANSI strings, updated cache)
|
||||||
|
"""
|
||||||
|
overlay = []
|
||||||
|
if msg is None:
|
||||||
|
return overlay, msg_cache
|
||||||
|
|
||||||
|
m_title, m_body, m_ts = msg
|
||||||
|
display_text = m_body or m_title or "(empty)"
|
||||||
|
display_text = re.sub(r"\s+", " ", display_text.upper())
|
||||||
|
|
||||||
|
cache_key = (display_text, w)
|
||||||
|
if msg_cache[0] != cache_key:
|
||||||
|
msg_rows = big_wrap(display_text, w - 4)
|
||||||
|
msg_cache = (cache_key, msg_rows)
|
||||||
|
else:
|
||||||
|
msg_rows = msg_cache[1]
|
||||||
|
|
||||||
|
msg_rows = lr_gradient_opposite(
|
||||||
|
msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0
|
||||||
|
)
|
||||||
|
|
||||||
|
elapsed_s = int(time.monotonic() - m_ts)
|
||||||
|
remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s)
|
||||||
|
ts_str = datetime.now().strftime("%H:%M:%S")
|
||||||
|
panel_h = len(msg_rows) + 2
|
||||||
|
panel_top = max(0, (h - panel_h) // 2)
|
||||||
|
|
||||||
|
row_idx = 0
|
||||||
|
for mr in msg_rows:
|
||||||
|
ln = vis_trunc(mr, w)
|
||||||
|
overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K")
|
||||||
|
row_idx += 1
|
||||||
|
|
||||||
|
meta_parts = []
|
||||||
|
if m_title and m_title != m_body:
|
||||||
|
meta_parts.append(m_title)
|
||||||
|
meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
|
||||||
|
meta = (
|
||||||
|
" " + " \u00b7 ".join(meta_parts)
|
||||||
|
if len(meta_parts) > 1
|
||||||
|
else " " + meta_parts[0]
|
||||||
|
)
|
||||||
|
overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}\033[0m\033[K")
|
||||||
|
row_idx += 1
|
||||||
|
|
||||||
|
bar = "\u2500" * (w - 4)
|
||||||
|
overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}\033[0m\033[K")
|
||||||
|
|
||||||
|
return overlay, msg_cache
|
||||||
|
|
||||||
|
|
||||||
|
def render_ticker_zone(
|
||||||
|
active: list,
|
||||||
|
scroll_cam: int,
|
||||||
|
ticker_h: int,
|
||||||
|
w: int,
|
||||||
|
noise_cache: dict,
|
||||||
|
grad_offset: float,
|
||||||
|
) -> tuple[list[str], dict]:
|
||||||
|
"""Render the ticker scroll zone.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
active: list of (content_rows, color, canvas_y, meta_idx)
|
||||||
|
scroll_cam: camera position (viewport top)
|
||||||
|
ticker_h: height of ticker zone
|
||||||
|
w: terminal width
|
||||||
|
noise_cache: dict of cy -> noise string
|
||||||
|
grad_offset: gradient animation offset
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
(list of ANSI strings, updated noise_cache)
|
||||||
|
"""
|
||||||
|
buf = []
|
||||||
|
top_zone = max(1, int(ticker_h * 0.25))
|
||||||
|
bot_zone = max(1, int(ticker_h * 0.10))
|
||||||
|
|
||||||
|
def noise_at(cy):
|
||||||
|
if cy not in noise_cache:
|
||||||
|
noise_cache[cy] = noise(w) if random.random() < 0.15 else None
|
||||||
|
return noise_cache[cy]
|
||||||
|
|
||||||
|
for r in range(ticker_h):
|
||||||
|
scr_row = r + 1
|
||||||
|
cy = scroll_cam + r
|
||||||
|
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
|
||||||
|
bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0
|
||||||
|
row_fade = min(top_f, bot_f)
|
||||||
|
drawn = False
|
||||||
|
|
||||||
|
for content, hc, by, midx in active:
|
||||||
|
cr = cy - by
|
||||||
|
if 0 <= cr < len(content):
|
||||||
|
raw = content[cr]
|
||||||
|
if cr != midx:
|
||||||
|
colored = lr_gradient([raw], grad_offset)[0]
|
||||||
|
else:
|
||||||
|
colored = raw
|
||||||
|
ln = vis_trunc(colored, w)
|
||||||
|
if row_fade < 1.0:
|
||||||
|
ln = fade_line(ln, row_fade)
|
||||||
|
|
||||||
|
if cr == midx:
|
||||||
|
buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K")
|
||||||
|
elif ln.strip():
|
||||||
|
buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K")
|
||||||
|
else:
|
||||||
|
buf.append(f"\033[{scr_row};1H\033[K")
|
||||||
|
drawn = True
|
||||||
|
break
|
||||||
|
|
||||||
|
if not drawn:
|
||||||
|
n = noise_at(cy)
|
||||||
|
if row_fade < 1.0 and n:
|
||||||
|
n = fade_line(n, row_fade)
|
||||||
|
if n:
|
||||||
|
buf.append(f"\033[{scr_row};1H{n}")
|
||||||
|
else:
|
||||||
|
buf.append(f"\033[{scr_row};1H\033[K")
|
||||||
|
|
||||||
|
return buf, noise_cache
|
||||||
|
|
||||||
|
|
||||||
|
def apply_glitch(
|
||||||
|
buf: list[str],
|
||||||
|
ticker_buf_start: int,
|
||||||
|
mic_excess: float,
|
||||||
|
w: int,
|
||||||
|
) -> list[str]:
|
||||||
|
"""Apply glitch effect to ticker buffer.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
buf: current buffer
|
||||||
|
ticker_buf_start: index where ticker starts in buffer
|
||||||
|
mic_excess: mic level above threshold
|
||||||
|
w: terminal width
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Updated buffer with glitches applied
|
||||||
|
"""
|
||||||
|
glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
|
||||||
|
n_hits = 4 + int(mic_excess / 2)
|
||||||
|
ticker_buf_len = len(buf) - ticker_buf_start
|
||||||
|
|
||||||
|
if random.random() < glitch_prob and ticker_buf_len > 0:
|
||||||
|
for _ in range(min(n_hits, ticker_buf_len)):
|
||||||
|
gi = random.randint(0, ticker_buf_len - 1)
|
||||||
|
scr_row = gi + 1
|
||||||
|
buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}"
|
||||||
|
|
||||||
|
return buf
|
||||||
|
|
||||||
|
|
||||||
|
def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]:
|
||||||
|
"""Render firehose strip at bottom of screen."""
|
||||||
|
buf = []
|
||||||
|
if fh > 0:
|
||||||
|
for fr in range(fh):
|
||||||
|
scr_row = h - fh + fr + 1
|
||||||
|
fline = firehose_line(items, w)
|
||||||
|
buf.append(f"\033[{scr_row};1H{fline}\033[K")
|
||||||
|
return buf
|
||||||
@@ -4,6 +4,8 @@ Gracefully degrades if sounddevice/numpy are unavailable.
|
|||||||
"""
|
"""
|
||||||
|
|
||||||
import atexit
|
import atexit
|
||||||
|
from collections.abc import Callable
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import numpy as _np
|
import numpy as _np
|
||||||
@@ -14,6 +16,9 @@ except Exception:
|
|||||||
_HAS_MIC = False
|
_HAS_MIC = False
|
||||||
|
|
||||||
|
|
||||||
|
from engine.events import MicLevelEvent
|
||||||
|
|
||||||
|
|
||||||
class MicMonitor:
|
class MicMonitor:
|
||||||
"""Background mic stream that exposes current RMS dB level."""
|
"""Background mic stream that exposes current RMS dB level."""
|
||||||
|
|
||||||
@@ -21,6 +26,7 @@ class MicMonitor:
|
|||||||
self.threshold_db = threshold_db
|
self.threshold_db = threshold_db
|
||||||
self._db = -99.0
|
self._db = -99.0
|
||||||
self._stream = None
|
self._stream = None
|
||||||
|
self._subscribers: list[Callable[[MicLevelEvent], None]] = []
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def available(self):
|
def available(self):
|
||||||
@@ -37,6 +43,23 @@ class MicMonitor:
|
|||||||
"""dB above threshold (clamped to 0)."""
|
"""dB above threshold (clamped to 0)."""
|
||||||
return max(0.0, self._db - self.threshold_db)
|
return max(0.0, self._db - self.threshold_db)
|
||||||
|
|
||||||
|
def subscribe(self, callback: Callable[[MicLevelEvent], None]) -> None:
|
||||||
|
"""Register a callback to be called when mic level changes."""
|
||||||
|
self._subscribers.append(callback)
|
||||||
|
|
||||||
|
def unsubscribe(self, callback: Callable[[MicLevelEvent], None]) -> None:
|
||||||
|
"""Remove a registered callback."""
|
||||||
|
if callback in self._subscribers:
|
||||||
|
self._subscribers.remove(callback)
|
||||||
|
|
||||||
|
def _emit(self, event: MicLevelEvent) -> None:
|
||||||
|
"""Emit an event to all subscribers."""
|
||||||
|
for cb in self._subscribers:
|
||||||
|
try:
|
||||||
|
cb(event)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
"""Start background mic stream. Returns True on success, False/None otherwise."""
|
"""Start background mic stream. Returns True on success, False/None otherwise."""
|
||||||
if not _HAS_MIC:
|
if not _HAS_MIC:
|
||||||
@@ -45,6 +68,13 @@ class MicMonitor:
|
|||||||
def _cb(indata, frames, t, status):
|
def _cb(indata, frames, t, status):
|
||||||
rms = float(_np.sqrt(_np.mean(indata**2)))
|
rms = float(_np.sqrt(_np.mean(indata**2)))
|
||||||
self._db = 20 * _np.log10(rms) if rms > 0 else -99.0
|
self._db = 20 * _np.log10(rms) if rms > 0 else -99.0
|
||||||
|
if self._subscribers:
|
||||||
|
event = MicLevelEvent(
|
||||||
|
db_level=self._db,
|
||||||
|
excess_above_threshold=max(0.0, self._db - self.threshold_db),
|
||||||
|
timestamp=datetime.now(),
|
||||||
|
)
|
||||||
|
self._emit(event)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self._stream = _sd.InputStream(
|
self._stream = _sd.InputStream(
|
||||||
|
|||||||
@@ -16,8 +16,12 @@ import json
|
|||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
import urllib.request
|
import urllib.request
|
||||||
|
from collections.abc import Callable
|
||||||
|
from datetime import datetime
|
||||||
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
|
from urllib.parse import parse_qs, urlencode, urlparse, urlunparse
|
||||||
|
|
||||||
|
from engine.events import NtfyMessageEvent
|
||||||
|
|
||||||
|
|
||||||
class NtfyPoller:
|
class NtfyPoller:
|
||||||
"""SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT)."""
|
"""SSE stream listener for ntfy.sh topics. Messages arrive in ~1s (network RTT)."""
|
||||||
@@ -28,6 +32,24 @@ class NtfyPoller:
|
|||||||
self.display_secs = display_secs
|
self.display_secs = display_secs
|
||||||
self._message = None # (title, body, monotonic_timestamp) or None
|
self._message = None # (title, body, monotonic_timestamp) or None
|
||||||
self._lock = threading.Lock()
|
self._lock = threading.Lock()
|
||||||
|
self._subscribers: list[Callable[[NtfyMessageEvent], None]] = []
|
||||||
|
|
||||||
|
def subscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
|
||||||
|
"""Register a callback to be called when a message is received."""
|
||||||
|
self._subscribers.append(callback)
|
||||||
|
|
||||||
|
def unsubscribe(self, callback: Callable[[NtfyMessageEvent], None]) -> None:
|
||||||
|
"""Remove a registered callback."""
|
||||||
|
if callback in self._subscribers:
|
||||||
|
self._subscribers.remove(callback)
|
||||||
|
|
||||||
|
def _emit(self, event: NtfyMessageEvent) -> None:
|
||||||
|
"""Emit an event to all subscribers."""
|
||||||
|
for cb in self._subscribers:
|
||||||
|
try:
|
||||||
|
cb(event)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
"""Start background stream thread. Returns True."""
|
"""Start background stream thread. Returns True."""
|
||||||
@@ -88,6 +110,13 @@ class NtfyPoller:
|
|||||||
data.get("message", ""),
|
data.get("message", ""),
|
||||||
time.monotonic(),
|
time.monotonic(),
|
||||||
)
|
)
|
||||||
|
event = NtfyMessageEvent(
|
||||||
|
title=data.get("title", ""),
|
||||||
|
body=data.get("message", ""),
|
||||||
|
message_id=data.get("id"),
|
||||||
|
timestamp=datetime.now(),
|
||||||
|
)
|
||||||
|
self._emit(event)
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
time.sleep(self.reconnect_delay)
|
time.sleep(self.reconnect_delay)
|
||||||
|
|||||||
177
engine/scroll.py
177
engine/scroll.py
@@ -1,25 +1,22 @@
|
|||||||
"""
|
"""
|
||||||
Render engine — ticker content, scroll motion, message panel, and firehose overlay.
|
Render engine — ticker content, scroll motion, message panel, and firehose overlay.
|
||||||
Depends on: config, terminal, render, effects, ntfy, mic.
|
Orchestrates viewport, frame timing, and layers.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import random
|
import random
|
||||||
import re
|
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
from datetime import datetime
|
|
||||||
|
|
||||||
from engine import config
|
from engine import config
|
||||||
from engine.effects import (
|
from engine.frame import calculate_scroll_step
|
||||||
fade_line,
|
from engine.layers import (
|
||||||
firehose_line,
|
apply_glitch,
|
||||||
glitch_bar,
|
render_firehose,
|
||||||
next_headline,
|
render_message_overlay,
|
||||||
noise,
|
render_ticker_zone,
|
||||||
vis_trunc,
|
|
||||||
)
|
)
|
||||||
from engine.render import big_wrap, lr_gradient, lr_gradient_opposite, make_block
|
from engine.terminal import CLR
|
||||||
from engine.terminal import CLR, RST, W_COOL, th, tw
|
from engine.viewport import th, tw
|
||||||
|
|
||||||
|
|
||||||
def stream(items, ntfy_poller, mic_monitor):
|
def stream(items, ntfy_poller, mic_monitor):
|
||||||
@@ -35,114 +32,51 @@ def stream(items, ntfy_poller, mic_monitor):
|
|||||||
|
|
||||||
w, h = tw(), th()
|
w, h = tw(), th()
|
||||||
fh = config.FIREHOSE_H if config.FIREHOSE else 0
|
fh = config.FIREHOSE_H if config.FIREHOSE else 0
|
||||||
ticker_view_h = h - fh # reserve fixed firehose strip at bottom
|
ticker_view_h = h - fh
|
||||||
GAP = 3 # blank rows between headlines
|
GAP = 3
|
||||||
scroll_step_interval = config.SCROLL_DUR / (ticker_view_h + 15) * 2
|
scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h)
|
||||||
|
|
||||||
# Taxonomy:
|
|
||||||
# - message: centered ntfy overlay panel
|
|
||||||
# - ticker: large headline text content
|
|
||||||
# - scroll: upward camera motion applied to ticker content
|
|
||||||
# - firehose: fixed carriage-return style strip pinned at bottom
|
|
||||||
# Active ticker blocks: (content_rows, color, canvas_y, meta_idx)
|
|
||||||
active = []
|
active = []
|
||||||
scroll_cam = 0 # viewport top in virtual canvas coords
|
scroll_cam = 0
|
||||||
ticker_next_y = (
|
ticker_next_y = ticker_view_h
|
||||||
ticker_view_h # canvas-y where next block starts (off-screen bottom)
|
|
||||||
)
|
|
||||||
noise_cache = {}
|
noise_cache = {}
|
||||||
scroll_motion_accum = 0.0
|
scroll_motion_accum = 0.0
|
||||||
|
msg_cache = (None, None)
|
||||||
|
|
||||||
def _noise_at(cy):
|
while True:
|
||||||
if cy not in noise_cache:
|
if queued >= config.HEADLINE_LIMIT and not active:
|
||||||
noise_cache[cy] = noise(w) if random.random() < 0.15 else None
|
break
|
||||||
return noise_cache[cy]
|
|
||||||
|
|
||||||
# Message color: bright cyan/white — distinct from headline greens
|
|
||||||
MSG_META = "\033[38;5;245m" # cool grey
|
|
||||||
MSG_BORDER = "\033[2;38;5;37m" # dim teal
|
|
||||||
_msg_cache = (None, None) # (cache_key, rendered_rows)
|
|
||||||
|
|
||||||
while queued < config.HEADLINE_LIMIT or active:
|
|
||||||
t0 = time.monotonic()
|
t0 = time.monotonic()
|
||||||
w, h = tw(), th()
|
w, h = tw(), th()
|
||||||
fh = config.FIREHOSE_H if config.FIREHOSE else 0
|
fh = config.FIREHOSE_H if config.FIREHOSE else 0
|
||||||
ticker_view_h = h - fh
|
ticker_view_h = h - fh
|
||||||
|
scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h)
|
||||||
|
|
||||||
# ── Check for ntfy message ────────────────────────
|
|
||||||
msg_h = 0
|
|
||||||
msg_overlay = []
|
|
||||||
msg = ntfy_poller.get_active_message()
|
msg = ntfy_poller.get_active_message()
|
||||||
|
msg_overlay, msg_cache = render_message_overlay(msg, w, h, msg_cache)
|
||||||
|
|
||||||
buf = []
|
buf = []
|
||||||
if msg is not None:
|
ticker_h = ticker_view_h
|
||||||
m_title, m_body, m_ts = msg
|
|
||||||
# ── Message overlay: centered in the viewport ──
|
|
||||||
display_text = m_body or m_title or "(empty)"
|
|
||||||
display_text = re.sub(r"\s+", " ", display_text.upper())
|
|
||||||
cache_key = (display_text, w)
|
|
||||||
if _msg_cache[0] != cache_key:
|
|
||||||
msg_rows = big_wrap(display_text, w - 4)
|
|
||||||
_msg_cache = (cache_key, msg_rows)
|
|
||||||
else:
|
|
||||||
msg_rows = _msg_cache[1]
|
|
||||||
msg_rows = lr_gradient_opposite(
|
|
||||||
msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0
|
|
||||||
)
|
|
||||||
# Layout: rendered text + meta + border
|
|
||||||
elapsed_s = int(time.monotonic() - m_ts)
|
|
||||||
remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s)
|
|
||||||
ts_str = datetime.now().strftime("%H:%M:%S")
|
|
||||||
panel_h = len(msg_rows) + 2 # meta + border
|
|
||||||
panel_top = max(0, (h - panel_h) // 2)
|
|
||||||
row_idx = 0
|
|
||||||
for mr in msg_rows:
|
|
||||||
ln = vis_trunc(mr, w)
|
|
||||||
msg_overlay.append(
|
|
||||||
f"\033[{panel_top + row_idx + 1};1H {ln}{RST}\033[K"
|
|
||||||
)
|
|
||||||
row_idx += 1
|
|
||||||
# Meta line: title (if distinct) + source + countdown
|
|
||||||
meta_parts = []
|
|
||||||
if m_title and m_title != m_body:
|
|
||||||
meta_parts.append(m_title)
|
|
||||||
meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s")
|
|
||||||
meta = (
|
|
||||||
" " + " \u00b7 ".join(meta_parts)
|
|
||||||
if len(meta_parts) > 1
|
|
||||||
else " " + meta_parts[0]
|
|
||||||
)
|
|
||||||
msg_overlay.append(
|
|
||||||
f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}{RST}\033[K"
|
|
||||||
)
|
|
||||||
row_idx += 1
|
|
||||||
# Border — constant boundary under message panel
|
|
||||||
bar = "\u2500" * (w - 4)
|
|
||||||
msg_overlay.append(
|
|
||||||
f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}{RST}\033[K"
|
|
||||||
)
|
|
||||||
|
|
||||||
# Ticker draws above the fixed firehose strip; message is a centered overlay.
|
|
||||||
ticker_h = ticker_view_h - msg_h
|
|
||||||
|
|
||||||
# ── Ticker content + scroll motion (always runs) ──
|
|
||||||
scroll_motion_accum += config.FRAME_DT
|
scroll_motion_accum += config.FRAME_DT
|
||||||
while scroll_motion_accum >= scroll_step_interval:
|
while scroll_motion_accum >= scroll_step_interval:
|
||||||
scroll_motion_accum -= scroll_step_interval
|
scroll_motion_accum -= scroll_step_interval
|
||||||
scroll_cam += 1
|
scroll_cam += 1
|
||||||
|
|
||||||
# Enqueue new headlines when room at the bottom
|
|
||||||
while (
|
while (
|
||||||
ticker_next_y < scroll_cam + ticker_view_h + 10
|
ticker_next_y < scroll_cam + ticker_view_h + 10
|
||||||
and queued < config.HEADLINE_LIMIT
|
and queued < config.HEADLINE_LIMIT
|
||||||
):
|
):
|
||||||
|
from engine.effects import next_headline
|
||||||
|
from engine.render import make_block
|
||||||
|
|
||||||
t, src, ts = next_headline(pool, items, seen)
|
t, src, ts = next_headline(pool, items, seen)
|
||||||
ticker_content, hc, midx = make_block(t, src, ts, w)
|
ticker_content, hc, midx = make_block(t, src, ts, w)
|
||||||
active.append((ticker_content, hc, ticker_next_y, midx))
|
active.append((ticker_content, hc, ticker_next_y, midx))
|
||||||
ticker_next_y += len(ticker_content) + GAP
|
ticker_next_y += len(ticker_content) + GAP
|
||||||
queued += 1
|
queued += 1
|
||||||
|
|
||||||
# Prune off-screen blocks and stale noise
|
|
||||||
active = [
|
active = [
|
||||||
(c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > scroll_cam
|
(c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > scroll_cam
|
||||||
]
|
]
|
||||||
@@ -150,69 +84,26 @@ def stream(items, ntfy_poller, mic_monitor):
|
|||||||
if k < scroll_cam:
|
if k < scroll_cam:
|
||||||
del noise_cache[k]
|
del noise_cache[k]
|
||||||
|
|
||||||
# Draw ticker zone (above fixed firehose strip)
|
|
||||||
top_zone = max(1, int(ticker_h * 0.25))
|
|
||||||
bot_zone = max(1, int(ticker_h * 0.10))
|
|
||||||
grad_offset = (time.monotonic() * config.GRAD_SPEED) % 1.0
|
grad_offset = (time.monotonic() * config.GRAD_SPEED) % 1.0
|
||||||
ticker_buf_start = len(buf) # track where ticker rows start in buf
|
ticker_buf_start = len(buf)
|
||||||
for r in range(ticker_h):
|
|
||||||
scr_row = r + 1 # 1-indexed ANSI screen row
|
ticker_buf, noise_cache = render_ticker_zone(
|
||||||
cy = scroll_cam + r
|
active, scroll_cam, ticker_h, w, noise_cache, grad_offset
|
||||||
top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0
|
)
|
||||||
bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0
|
buf.extend(ticker_buf)
|
||||||
row_fade = min(top_f, bot_f)
|
|
||||||
drawn = False
|
|
||||||
for content, hc, by, midx in active:
|
|
||||||
cr = cy - by
|
|
||||||
if 0 <= cr < len(content):
|
|
||||||
raw = content[cr]
|
|
||||||
if cr != midx:
|
|
||||||
colored = lr_gradient([raw], grad_offset)[0]
|
|
||||||
else:
|
|
||||||
colored = raw
|
|
||||||
ln = vis_trunc(colored, w)
|
|
||||||
if row_fade < 1.0:
|
|
||||||
ln = fade_line(ln, row_fade)
|
|
||||||
if cr == midx:
|
|
||||||
buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K")
|
|
||||||
elif ln.strip():
|
|
||||||
buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K")
|
|
||||||
else:
|
|
||||||
buf.append(f"\033[{scr_row};1H\033[K")
|
|
||||||
drawn = True
|
|
||||||
break
|
|
||||||
if not drawn:
|
|
||||||
n = _noise_at(cy)
|
|
||||||
if row_fade < 1.0 and n:
|
|
||||||
n = fade_line(n, row_fade)
|
|
||||||
if n:
|
|
||||||
buf.append(f"\033[{scr_row};1H{n}")
|
|
||||||
else:
|
|
||||||
buf.append(f"\033[{scr_row};1H\033[K")
|
|
||||||
|
|
||||||
# Glitch — base rate + mic-reactive spikes (ticker zone only)
|
|
||||||
mic_excess = mic_monitor.excess
|
mic_excess = mic_monitor.excess
|
||||||
glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
|
buf = apply_glitch(buf, ticker_buf_start, mic_excess, w)
|
||||||
n_hits = 4 + int(mic_excess / 2)
|
|
||||||
ticker_buf_len = len(buf) - ticker_buf_start
|
firehose_buf = render_firehose(items, w, fh, h)
|
||||||
if random.random() < glitch_prob and ticker_buf_len > 0:
|
buf.extend(firehose_buf)
|
||||||
for _ in range(min(n_hits, ticker_buf_len)):
|
|
||||||
gi = random.randint(0, ticker_buf_len - 1)
|
|
||||||
scr_row = gi + 1
|
|
||||||
buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}"
|
|
||||||
|
|
||||||
if config.FIREHOSE and fh > 0:
|
|
||||||
for fr in range(fh):
|
|
||||||
scr_row = h - fh + fr + 1
|
|
||||||
fline = firehose_line(items, w)
|
|
||||||
buf.append(f"\033[{scr_row};1H{fline}\033[K")
|
|
||||||
if msg_overlay:
|
if msg_overlay:
|
||||||
buf.extend(msg_overlay)
|
buf.extend(msg_overlay)
|
||||||
|
|
||||||
sys.stdout.buffer.write("".join(buf).encode())
|
sys.stdout.buffer.write("".join(buf).encode())
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
|
|
||||||
# Precise frame timing
|
|
||||||
elapsed = time.monotonic() - t0
|
elapsed = time.monotonic() - t0
|
||||||
time.sleep(max(0, config.FRAME_DT - elapsed))
|
time.sleep(max(0, config.FRAME_DT - elapsed))
|
||||||
|
|
||||||
|
|||||||
@@ -7,26 +7,16 @@ import json
|
|||||||
import re
|
import re
|
||||||
import urllib.parse
|
import urllib.parse
|
||||||
import urllib.request
|
import urllib.request
|
||||||
|
from functools import lru_cache
|
||||||
|
|
||||||
from engine.sources import LOCATION_LANGS
|
from engine.sources import LOCATION_LANGS
|
||||||
|
|
||||||
_TRANSLATE_CACHE = {}
|
TRANSLATE_CACHE_SIZE = 500
|
||||||
|
|
||||||
|
|
||||||
def detect_location_language(title):
|
@lru_cache(maxsize=TRANSLATE_CACHE_SIZE)
|
||||||
"""Detect if headline mentions a location, return target language."""
|
def _translate_cached(title: str, target_lang: str) -> str:
|
||||||
title_lower = title.lower()
|
"""Cached translation implementation."""
|
||||||
for pattern, lang in LOCATION_LANGS.items():
|
|
||||||
if re.search(pattern, title_lower):
|
|
||||||
return lang
|
|
||||||
return None
|
|
||||||
|
|
||||||
|
|
||||||
def translate_headline(title, target_lang):
|
|
||||||
"""Translate headline via Google Translate API (zero dependencies)."""
|
|
||||||
key = (title, target_lang)
|
|
||||||
if key in _TRANSLATE_CACHE:
|
|
||||||
return _TRANSLATE_CACHE[key]
|
|
||||||
try:
|
try:
|
||||||
q = urllib.parse.quote(title)
|
q = urllib.parse.quote(title)
|
||||||
url = (
|
url = (
|
||||||
@@ -39,5 +29,18 @@ def translate_headline(title, target_lang):
|
|||||||
result = "".join(p[0] for p in data[0] if p[0]) or title
|
result = "".join(p[0] for p in data[0] if p[0]) or title
|
||||||
except Exception:
|
except Exception:
|
||||||
result = title
|
result = title
|
||||||
_TRANSLATE_CACHE[key] = result
|
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def detect_location_language(title):
|
||||||
|
"""Detect if headline mentions a location, return target language."""
|
||||||
|
title_lower = title.lower()
|
||||||
|
for pattern, lang in LOCATION_LANGS.items():
|
||||||
|
if re.search(pattern, title_lower):
|
||||||
|
return lang
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def translate_headline(title: str, target_lang: str) -> str:
|
||||||
|
"""Translate headline via Google Translate API (zero dependencies)."""
|
||||||
|
return _translate_cached(title, target_lang)
|
||||||
|
|||||||
60
engine/types.py
Normal file
60
engine/types.py
Normal file
@@ -0,0 +1,60 @@
|
|||||||
|
"""
|
||||||
|
Shared dataclasses for the mainline application.
|
||||||
|
Provides named types for tuple returns across modules.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class HeadlineItem:
|
||||||
|
"""A single headline item: title, source, and timestamp."""
|
||||||
|
|
||||||
|
title: str
|
||||||
|
source: str
|
||||||
|
timestamp: str
|
||||||
|
|
||||||
|
def to_tuple(self) -> tuple[str, str, str]:
|
||||||
|
"""Convert to tuple for backward compatibility."""
|
||||||
|
return (self.title, self.source, self.timestamp)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def from_tuple(cls, t: tuple[str, str, str]) -> "HeadlineItem":
|
||||||
|
"""Create from tuple for backward compatibility."""
|
||||||
|
return cls(title=t[0], source=t[1], timestamp=t[2])
|
||||||
|
|
||||||
|
|
||||||
|
def items_to_tuples(items: list[HeadlineItem]) -> list[tuple[str, str, str]]:
|
||||||
|
"""Convert list of HeadlineItem to list of tuples."""
|
||||||
|
return [item.to_tuple() for item in items]
|
||||||
|
|
||||||
|
|
||||||
|
def tuples_to_items(tuples: list[tuple[str, str, str]]) -> list[HeadlineItem]:
|
||||||
|
"""Convert list of tuples to list of HeadlineItem."""
|
||||||
|
return [HeadlineItem.from_tuple(t) for t in tuples]
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class FetchResult:
|
||||||
|
"""Result from fetch_all() or fetch_poetry()."""
|
||||||
|
|
||||||
|
items: list[HeadlineItem]
|
||||||
|
linked: int
|
||||||
|
failed: int
|
||||||
|
|
||||||
|
def to_legacy_tuple(self) -> tuple[list[tuple], int, int]:
|
||||||
|
"""Convert to legacy tuple format for backward compatibility."""
|
||||||
|
return ([item.to_tuple() for item in self.items], self.linked, self.failed)
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class Block:
|
||||||
|
"""Rendered headline block from make_block()."""
|
||||||
|
|
||||||
|
content: list[str]
|
||||||
|
color: str
|
||||||
|
meta_row_index: int
|
||||||
|
|
||||||
|
def to_legacy_tuple(self) -> tuple[list[str], str, int]:
|
||||||
|
"""Convert to legacy tuple format for backward compatibility."""
|
||||||
|
return (self.content, self.color, self.meta_row_index)
|
||||||
37
engine/viewport.py
Normal file
37
engine/viewport.py
Normal file
@@ -0,0 +1,37 @@
|
|||||||
|
"""
|
||||||
|
Viewport utilities — terminal dimensions and ANSI positioning helpers.
|
||||||
|
No internal dependencies.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
def tw() -> int:
|
||||||
|
"""Get terminal width (columns)."""
|
||||||
|
try:
|
||||||
|
return os.get_terminal_size().columns
|
||||||
|
except Exception:
|
||||||
|
return 80
|
||||||
|
|
||||||
|
|
||||||
|
def th() -> int:
|
||||||
|
"""Get terminal height (lines)."""
|
||||||
|
try:
|
||||||
|
return os.get_terminal_size().lines
|
||||||
|
except Exception:
|
||||||
|
return 24
|
||||||
|
|
||||||
|
|
||||||
|
def move_to(row: int, col: int = 1) -> str:
|
||||||
|
"""Generate ANSI escape to move cursor to row, col (1-indexed)."""
|
||||||
|
return f"\033[{row};{col}H"
|
||||||
|
|
||||||
|
|
||||||
|
def clear_screen() -> str:
|
||||||
|
"""Clear screen and move cursor to home."""
|
||||||
|
return "\033[2J\033[H"
|
||||||
|
|
||||||
|
|
||||||
|
def clear_line() -> str:
|
||||||
|
"""Clear current line."""
|
||||||
|
return "\033[K"
|
||||||
@@ -22,6 +22,7 @@ classifiers = [
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"feedparser>=6.0.0",
|
"feedparser>=6.0.0",
|
||||||
"Pillow>=10.0.0",
|
"Pillow>=10.0.0",
|
||||||
|
"pyright>=1.1.408",
|
||||||
]
|
]
|
||||||
|
|
||||||
[project.optional-dependencies]
|
[project.optional-dependencies]
|
||||||
|
|||||||
236
tests/fixtures/__init__.py
vendored
Normal file
236
tests/fixtures/__init__.py
vendored
Normal file
@@ -0,0 +1,236 @@
|
|||||||
|
"""
|
||||||
|
Pytest fixtures for mocking external dependencies (network, filesystem).
|
||||||
|
"""
|
||||||
|
|
||||||
|
import json
|
||||||
|
from unittest.mock import MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_feed_response():
|
||||||
|
"""Mock RSS feed response data."""
|
||||||
|
return b"""<?xml version="1.0" encoding="UTF-8" ?>
|
||||||
|
<rss version="2.0">
|
||||||
|
<channel>
|
||||||
|
<title>Test Feed</title>
|
||||||
|
<link>https://example.com</link>
|
||||||
|
<item>
|
||||||
|
<title>Test Headline One</title>
|
||||||
|
<pubDate>Sat, 15 Mar 2025 12:00:00 GMT</pubDate>
|
||||||
|
</item>
|
||||||
|
<item>
|
||||||
|
<title>Test Headline Two</title>
|
||||||
|
<pubDate>Sat, 15 Mar 2025 11:00:00 GMT</pubDate>
|
||||||
|
</item>
|
||||||
|
<item>
|
||||||
|
<title>Sports: Team Wins Championship</title>
|
||||||
|
<pubDate>Sat, 15 Mar 2025 10:00:00 GMT</pubDate>
|
||||||
|
</item>
|
||||||
|
</channel>
|
||||||
|
</rss>"""
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_gutenberg_response():
|
||||||
|
"""Mock Project Gutenberg text response."""
|
||||||
|
return """Project Gutenberg's Collection, by Various
|
||||||
|
|
||||||
|
*** START OF SOME TEXT ***
|
||||||
|
This is a test poem with multiple lines
|
||||||
|
that should be parsed as stanzas.
|
||||||
|
|
||||||
|
Another stanza here with different content
|
||||||
|
and more lines to test the parsing logic.
|
||||||
|
|
||||||
|
Yet another stanza for variety
|
||||||
|
in the test data.
|
||||||
|
|
||||||
|
*** END OF SOME TEXT ***"""
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_gutenberg_empty():
|
||||||
|
"""Mock Gutenberg response with no valid stanzas."""
|
||||||
|
return """Project Gutenberg's Collection
|
||||||
|
|
||||||
|
*** START OF TEXT ***
|
||||||
|
THIS IS ALL CAPS AND SHOULD BE SKIPPED
|
||||||
|
|
||||||
|
I.
|
||||||
|
|
||||||
|
*** END OF TEXT ***"""
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_ntfy_message():
|
||||||
|
"""Mock ntfy.sh SSE message."""
|
||||||
|
return json.dumps(
|
||||||
|
{
|
||||||
|
"id": "test123",
|
||||||
|
"event": "message",
|
||||||
|
"title": "Test Title",
|
||||||
|
"message": "Test message body",
|
||||||
|
"time": 1234567890,
|
||||||
|
}
|
||||||
|
).encode()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_ntfy_keepalive():
|
||||||
|
"""Mock ntfy.sh keepalive message."""
|
||||||
|
return b'data: {"event":"keepalive"}\n\n'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_google_translate_response():
|
||||||
|
"""Mock Google Translate API response."""
|
||||||
|
return json.dumps(
|
||||||
|
[
|
||||||
|
[["Translated text", "Original text", None, 0.8], None, "en"],
|
||||||
|
None,
|
||||||
|
None,
|
||||||
|
[],
|
||||||
|
[],
|
||||||
|
[],
|
||||||
|
[],
|
||||||
|
]
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_feedparser():
|
||||||
|
"""Create a mock feedparser.parse function."""
|
||||||
|
|
||||||
|
def _mock(data):
|
||||||
|
mock_result = MagicMock()
|
||||||
|
mock_result.bozo = False
|
||||||
|
mock_result.entries = [
|
||||||
|
{
|
||||||
|
"title": "Test Headline",
|
||||||
|
"published_parsed": (2025, 3, 15, 12, 0, 0, 0, 0, 0),
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"title": "Another Headline",
|
||||||
|
"updated_parsed": (2025, 3, 15, 11, 0, 0, 0, 0, 0),
|
||||||
|
},
|
||||||
|
]
|
||||||
|
return mock_result
|
||||||
|
|
||||||
|
return _mock
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_urllib_open(mock_feed_response):
|
||||||
|
"""Create a mock urllib.request.urlopen that returns feed data."""
|
||||||
|
|
||||||
|
def _mock(url):
|
||||||
|
mock_response = MagicMock()
|
||||||
|
mock_response.read.return_value = mock_feed_response
|
||||||
|
return mock_response
|
||||||
|
|
||||||
|
return _mock
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def sample_items():
|
||||||
|
"""Sample items as returned by fetch module (title, source, timestamp)."""
|
||||||
|
return [
|
||||||
|
("Headline One", "Test Source", "12:00"),
|
||||||
|
("Headline Two", "Another Source", "11:30"),
|
||||||
|
("Headline Three", "Third Source", "10:45"),
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def sample_config():
|
||||||
|
"""Sample config for testing."""
|
||||||
|
from engine.config import Config
|
||||||
|
|
||||||
|
return Config(
|
||||||
|
headline_limit=100,
|
||||||
|
feed_timeout=10,
|
||||||
|
mic_threshold_db=50,
|
||||||
|
mode="news",
|
||||||
|
firehose=False,
|
||||||
|
ntfy_topic="https://ntfy.sh/test/json",
|
||||||
|
ntfy_reconnect_delay=5,
|
||||||
|
message_display_secs=30,
|
||||||
|
font_dir="fonts",
|
||||||
|
font_path="",
|
||||||
|
font_index=0,
|
||||||
|
font_picker=False,
|
||||||
|
font_sz=60,
|
||||||
|
render_h=8,
|
||||||
|
ssaa=4,
|
||||||
|
scroll_dur=5.625,
|
||||||
|
frame_dt=0.05,
|
||||||
|
firehose_h=12,
|
||||||
|
grad_speed=0.08,
|
||||||
|
glitch_glyphs="░▒▓█▌▐",
|
||||||
|
kata_glyphs="ハミヒーウ",
|
||||||
|
script_fonts={},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def poetry_config():
|
||||||
|
"""Sample config for poetry mode."""
|
||||||
|
from engine.config import Config
|
||||||
|
|
||||||
|
return Config(
|
||||||
|
headline_limit=100,
|
||||||
|
feed_timeout=10,
|
||||||
|
mic_threshold_db=50,
|
||||||
|
mode="poetry",
|
||||||
|
firehose=False,
|
||||||
|
ntfy_topic="https://ntfy.sh/test/json",
|
||||||
|
ntfy_reconnect_delay=5,
|
||||||
|
message_display_secs=30,
|
||||||
|
font_dir="fonts",
|
||||||
|
font_path="",
|
||||||
|
font_index=0,
|
||||||
|
font_picker=False,
|
||||||
|
font_sz=60,
|
||||||
|
render_h=8,
|
||||||
|
ssaa=4,
|
||||||
|
scroll_dur=5.625,
|
||||||
|
frame_dt=0.05,
|
||||||
|
firehose_h=12,
|
||||||
|
grad_speed=0.08,
|
||||||
|
glitch_glyphs="░▒▓█▌▐",
|
||||||
|
kata_glyphs="ハミヒーウ",
|
||||||
|
script_fonts={},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def firehose_config():
|
||||||
|
"""Sample config with firehose enabled."""
|
||||||
|
from engine.config import Config
|
||||||
|
|
||||||
|
return Config(
|
||||||
|
headline_limit=100,
|
||||||
|
feed_timeout=10,
|
||||||
|
mic_threshold_db=50,
|
||||||
|
mode="news",
|
||||||
|
firehose=True,
|
||||||
|
ntfy_topic="https://ntfy.sh/test/json",
|
||||||
|
ntfy_reconnect_delay=5,
|
||||||
|
message_display_secs=30,
|
||||||
|
font_dir="fonts",
|
||||||
|
font_path="",
|
||||||
|
font_index=0,
|
||||||
|
font_picker=False,
|
||||||
|
font_sz=60,
|
||||||
|
render_h=8,
|
||||||
|
ssaa=4,
|
||||||
|
scroll_dur=5.625,
|
||||||
|
frame_dt=0.05,
|
||||||
|
firehose_h=12,
|
||||||
|
grad_speed=0.08,
|
||||||
|
glitch_glyphs="░▒▓█▌▐",
|
||||||
|
kata_glyphs="ハミヒーウ",
|
||||||
|
script_fonts={},
|
||||||
|
)
|
||||||
@@ -7,6 +7,8 @@ import tempfile
|
|||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
from engine import config
|
from engine import config
|
||||||
|
|
||||||
|
|
||||||
@@ -160,3 +162,140 @@ class TestSetFontSelection:
|
|||||||
config.set_font_selection(font_path=None, font_index=None)
|
config.set_font_selection(font_path=None, font_index=None)
|
||||||
assert original_path == config.FONT_PATH
|
assert original_path == config.FONT_PATH
|
||||||
assert original_index == config.FONT_INDEX
|
assert original_index == config.FONT_INDEX
|
||||||
|
|
||||||
|
|
||||||
|
class TestConfigDataclass:
|
||||||
|
"""Tests for Config dataclass."""
|
||||||
|
|
||||||
|
def test_config_has_required_fields(self):
|
||||||
|
"""Config has all required fields."""
|
||||||
|
c = config.Config()
|
||||||
|
assert hasattr(c, "headline_limit")
|
||||||
|
assert hasattr(c, "feed_timeout")
|
||||||
|
assert hasattr(c, "mic_threshold_db")
|
||||||
|
assert hasattr(c, "mode")
|
||||||
|
assert hasattr(c, "firehose")
|
||||||
|
assert hasattr(c, "ntfy_topic")
|
||||||
|
assert hasattr(c, "ntfy_reconnect_delay")
|
||||||
|
assert hasattr(c, "message_display_secs")
|
||||||
|
assert hasattr(c, "font_dir")
|
||||||
|
assert hasattr(c, "font_path")
|
||||||
|
assert hasattr(c, "font_index")
|
||||||
|
assert hasattr(c, "font_picker")
|
||||||
|
assert hasattr(c, "font_sz")
|
||||||
|
assert hasattr(c, "render_h")
|
||||||
|
assert hasattr(c, "ssaa")
|
||||||
|
assert hasattr(c, "scroll_dur")
|
||||||
|
assert hasattr(c, "frame_dt")
|
||||||
|
assert hasattr(c, "firehose_h")
|
||||||
|
assert hasattr(c, "grad_speed")
|
||||||
|
assert hasattr(c, "glitch_glyphs")
|
||||||
|
assert hasattr(c, "kata_glyphs")
|
||||||
|
assert hasattr(c, "script_fonts")
|
||||||
|
|
||||||
|
def test_config_defaults(self):
|
||||||
|
"""Config has sensible defaults."""
|
||||||
|
c = config.Config()
|
||||||
|
assert c.headline_limit == 1000
|
||||||
|
assert c.feed_timeout == 10
|
||||||
|
assert c.mic_threshold_db == 50
|
||||||
|
assert c.mode == "news"
|
||||||
|
assert c.firehose is False
|
||||||
|
assert c.ntfy_reconnect_delay == 5
|
||||||
|
assert c.message_display_secs == 30
|
||||||
|
|
||||||
|
def test_config_is_immutable(self):
|
||||||
|
"""Config is frozen (immutable)."""
|
||||||
|
c = config.Config()
|
||||||
|
with pytest.raises(AttributeError):
|
||||||
|
c.headline_limit = 500 # type: ignore
|
||||||
|
|
||||||
|
def test_config_custom_values(self):
|
||||||
|
"""Config accepts custom values."""
|
||||||
|
c = config.Config(
|
||||||
|
headline_limit=500,
|
||||||
|
mode="poetry",
|
||||||
|
firehose=True,
|
||||||
|
ntfy_topic="https://ntfy.sh/test",
|
||||||
|
)
|
||||||
|
assert c.headline_limit == 500
|
||||||
|
assert c.mode == "poetry"
|
||||||
|
assert c.firehose is True
|
||||||
|
assert c.ntfy_topic == "https://ntfy.sh/test"
|
||||||
|
|
||||||
|
|
||||||
|
class TestConfigFromArgs:
|
||||||
|
"""Tests for Config.from_args method."""
|
||||||
|
|
||||||
|
def test_from_args_defaults(self):
|
||||||
|
"""from_args creates config with defaults from empty argv."""
|
||||||
|
c = config.Config.from_args(["prog"])
|
||||||
|
assert c.mode == "news"
|
||||||
|
assert c.firehose is False
|
||||||
|
assert c.font_picker is True
|
||||||
|
|
||||||
|
def test_from_args_poetry_mode(self):
|
||||||
|
"""from_args detects --poetry flag."""
|
||||||
|
c = config.Config.from_args(["prog", "--poetry"])
|
||||||
|
assert c.mode == "poetry"
|
||||||
|
|
||||||
|
def test_from_args_poetry_short_flag(self):
|
||||||
|
"""from_args detects -p short flag."""
|
||||||
|
c = config.Config.from_args(["prog", "-p"])
|
||||||
|
assert c.mode == "poetry"
|
||||||
|
|
||||||
|
def test_from_args_firehose(self):
|
||||||
|
"""from_args detects --firehose flag."""
|
||||||
|
c = config.Config.from_args(["prog", "--firehose"])
|
||||||
|
assert c.firehose is True
|
||||||
|
|
||||||
|
def test_from_args_no_font_picker(self):
|
||||||
|
"""from_args detects --no-font-picker flag."""
|
||||||
|
c = config.Config.from_args(["prog", "--no-font-picker"])
|
||||||
|
assert c.font_picker is False
|
||||||
|
|
||||||
|
def test_from_args_font_index(self):
|
||||||
|
"""from_args parses --font-index."""
|
||||||
|
c = config.Config.from_args(["prog", "--font-index", "3"])
|
||||||
|
assert c.font_index == 3
|
||||||
|
|
||||||
|
|
||||||
|
class TestGetSetConfig:
|
||||||
|
"""Tests for get_config and set_config functions."""
|
||||||
|
|
||||||
|
def test_get_config_returns_config(self):
|
||||||
|
"""get_config returns a Config instance."""
|
||||||
|
c = config.get_config()
|
||||||
|
assert isinstance(c, config.Config)
|
||||||
|
|
||||||
|
def test_set_config_allows_injection(self):
|
||||||
|
"""set_config allows injecting a custom config."""
|
||||||
|
custom = config.Config(mode="poetry", headline_limit=100)
|
||||||
|
config.set_config(custom)
|
||||||
|
assert config.get_config().mode == "poetry"
|
||||||
|
assert config.get_config().headline_limit == 100
|
||||||
|
|
||||||
|
def test_set_config_then_get_config(self):
|
||||||
|
"""set_config followed by get_config returns the set config."""
|
||||||
|
original = config.get_config()
|
||||||
|
test_config = config.Config(headline_limit=42)
|
||||||
|
config.set_config(test_config)
|
||||||
|
result = config.get_config()
|
||||||
|
assert result.headline_limit == 42
|
||||||
|
config.set_config(original)
|
||||||
|
|
||||||
|
|
||||||
|
class TestPlatformFontPaths:
|
||||||
|
"""Tests for platform font path detection."""
|
||||||
|
|
||||||
|
def test_get_platform_font_paths_returns_dict(self):
|
||||||
|
"""_get_platform_font_paths returns a dictionary."""
|
||||||
|
fonts = config._get_platform_font_paths()
|
||||||
|
assert isinstance(fonts, dict)
|
||||||
|
|
||||||
|
def test_platform_font_paths_common_languages(self):
|
||||||
|
"""Common language font mappings exist."""
|
||||||
|
fonts = config._get_platform_font_paths()
|
||||||
|
common = {"ja", "zh-cn", "ko", "ru", "ar", "hi"}
|
||||||
|
found = set(fonts.keys()) & common
|
||||||
|
assert len(found) > 0
|
||||||
|
|||||||
85
tests/test_controller.py
Normal file
85
tests/test_controller.py
Normal file
@@ -0,0 +1,85 @@
|
|||||||
|
"""
|
||||||
|
Tests for engine.controller module.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
from engine import config
|
||||||
|
from engine.controller import StreamController
|
||||||
|
|
||||||
|
|
||||||
|
class TestStreamController:
|
||||||
|
"""Tests for StreamController class."""
|
||||||
|
|
||||||
|
def test_init_default_config(self):
|
||||||
|
"""StreamController initializes with default config."""
|
||||||
|
controller = StreamController()
|
||||||
|
assert controller.config is not None
|
||||||
|
assert isinstance(controller.config, config.Config)
|
||||||
|
|
||||||
|
def test_init_custom_config(self):
|
||||||
|
"""StreamController accepts custom config."""
|
||||||
|
custom_config = config.Config(headline_limit=500)
|
||||||
|
controller = StreamController(config=custom_config)
|
||||||
|
assert controller.config.headline_limit == 500
|
||||||
|
|
||||||
|
def test_init_sources_none_by_default(self):
|
||||||
|
"""Sources are None until initialized."""
|
||||||
|
controller = StreamController()
|
||||||
|
assert controller.mic is None
|
||||||
|
assert controller.ntfy is None
|
||||||
|
|
||||||
|
@patch("engine.controller.MicMonitor")
|
||||||
|
@patch("engine.controller.NtfyPoller")
|
||||||
|
def test_initialize_sources(self, mock_ntfy, mock_mic):
|
||||||
|
"""initialize_sources creates mic and ntfy instances."""
|
||||||
|
mock_mic_instance = MagicMock()
|
||||||
|
mock_mic_instance.available = True
|
||||||
|
mock_mic_instance.start.return_value = True
|
||||||
|
mock_mic.return_value = mock_mic_instance
|
||||||
|
|
||||||
|
mock_ntfy_instance = MagicMock()
|
||||||
|
mock_ntfy_instance.start.return_value = True
|
||||||
|
mock_ntfy.return_value = mock_ntfy_instance
|
||||||
|
|
||||||
|
controller = StreamController()
|
||||||
|
mic_ok, ntfy_ok = controller.initialize_sources()
|
||||||
|
|
||||||
|
assert mic_ok is True
|
||||||
|
assert ntfy_ok is True
|
||||||
|
assert controller.mic is not None
|
||||||
|
assert controller.ntfy is not None
|
||||||
|
|
||||||
|
@patch("engine.controller.MicMonitor")
|
||||||
|
@patch("engine.controller.NtfyPoller")
|
||||||
|
def test_initialize_sources_mic_unavailable(self, mock_ntfy, mock_mic):
|
||||||
|
"""initialize_sources handles unavailable mic."""
|
||||||
|
mock_mic_instance = MagicMock()
|
||||||
|
mock_mic_instance.available = False
|
||||||
|
mock_mic.return_value = mock_mic_instance
|
||||||
|
|
||||||
|
mock_ntfy_instance = MagicMock()
|
||||||
|
mock_ntfy_instance.start.return_value = True
|
||||||
|
mock_ntfy.return_value = mock_ntfy_instance
|
||||||
|
|
||||||
|
controller = StreamController()
|
||||||
|
mic_ok, ntfy_ok = controller.initialize_sources()
|
||||||
|
|
||||||
|
assert mic_ok is False
|
||||||
|
assert ntfy_ok is True
|
||||||
|
|
||||||
|
|
||||||
|
class TestStreamControllerCleanup:
|
||||||
|
"""Tests for StreamController cleanup."""
|
||||||
|
|
||||||
|
@patch("engine.controller.MicMonitor")
|
||||||
|
def test_cleanup_stops_mic(self, mock_mic):
|
||||||
|
"""cleanup stops the microphone if running."""
|
||||||
|
mock_mic_instance = MagicMock()
|
||||||
|
mock_mic.return_value = mock_mic_instance
|
||||||
|
|
||||||
|
controller = StreamController()
|
||||||
|
controller.mic = mock_mic_instance
|
||||||
|
controller.cleanup()
|
||||||
|
|
||||||
|
mock_mic_instance.stop.assert_called_once()
|
||||||
69
tests/test_emitters.py
Normal file
69
tests/test_emitters.py
Normal file
@@ -0,0 +1,69 @@
|
|||||||
|
"""
|
||||||
|
Tests for engine.emitters module.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from engine.emitters import EventEmitter, Startable, Stoppable
|
||||||
|
|
||||||
|
|
||||||
|
class TestEventEmitterProtocol:
|
||||||
|
"""Tests for EventEmitter protocol."""
|
||||||
|
|
||||||
|
def test_protocol_exists(self):
|
||||||
|
"""EventEmitter protocol is defined."""
|
||||||
|
assert EventEmitter is not None
|
||||||
|
|
||||||
|
def test_protocol_has_subscribe_method(self):
|
||||||
|
"""EventEmitter has subscribe method in protocol."""
|
||||||
|
assert hasattr(EventEmitter, "subscribe")
|
||||||
|
|
||||||
|
def test_protocol_has_unsubscribe_method(self):
|
||||||
|
"""EventEmitter has unsubscribe method in protocol."""
|
||||||
|
assert hasattr(EventEmitter, "unsubscribe")
|
||||||
|
|
||||||
|
|
||||||
|
class TestStartableProtocol:
|
||||||
|
"""Tests for Startable protocol."""
|
||||||
|
|
||||||
|
def test_protocol_exists(self):
|
||||||
|
"""Startable protocol is defined."""
|
||||||
|
assert Startable is not None
|
||||||
|
|
||||||
|
def test_protocol_has_start_method(self):
|
||||||
|
"""Startable has start method in protocol."""
|
||||||
|
assert hasattr(Startable, "start")
|
||||||
|
|
||||||
|
|
||||||
|
class TestStoppableProtocol:
|
||||||
|
"""Tests for Stoppable protocol."""
|
||||||
|
|
||||||
|
def test_protocol_exists(self):
|
||||||
|
"""Stoppable protocol is defined."""
|
||||||
|
assert Stoppable is not None
|
||||||
|
|
||||||
|
def test_protocol_has_stop_method(self):
|
||||||
|
"""Stoppable has stop method in protocol."""
|
||||||
|
assert hasattr(Stoppable, "stop")
|
||||||
|
|
||||||
|
|
||||||
|
class TestProtocolCompliance:
|
||||||
|
"""Tests that existing classes comply with protocols."""
|
||||||
|
|
||||||
|
def test_ntfy_poller_complies_with_protocol(self):
|
||||||
|
"""NtfyPoller implements EventEmitter protocol."""
|
||||||
|
from engine.ntfy import NtfyPoller
|
||||||
|
|
||||||
|
poller = NtfyPoller("http://example.com/topic")
|
||||||
|
assert hasattr(poller, "subscribe")
|
||||||
|
assert hasattr(poller, "unsubscribe")
|
||||||
|
assert callable(poller.subscribe)
|
||||||
|
assert callable(poller.unsubscribe)
|
||||||
|
|
||||||
|
def test_mic_monitor_complies_with_protocol(self):
|
||||||
|
"""MicMonitor implements EventEmitter and Startable protocols."""
|
||||||
|
from engine.mic import MicMonitor
|
||||||
|
|
||||||
|
monitor = MicMonitor()
|
||||||
|
assert hasattr(monitor, "subscribe")
|
||||||
|
assert hasattr(monitor, "unsubscribe")
|
||||||
|
assert hasattr(monitor, "start")
|
||||||
|
assert hasattr(monitor, "stop")
|
||||||
202
tests/test_eventbus.py
Normal file
202
tests/test_eventbus.py
Normal file
@@ -0,0 +1,202 @@
|
|||||||
|
"""
|
||||||
|
Tests for engine.eventbus module.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
from engine.eventbus import EventBus, get_event_bus, set_event_bus
|
||||||
|
from engine.events import EventType, NtfyMessageEvent
|
||||||
|
|
||||||
|
|
||||||
|
class TestEventBusInit:
|
||||||
|
"""Tests for EventBus initialization."""
|
||||||
|
|
||||||
|
def test_init_creates_empty_subscribers(self):
|
||||||
|
"""EventBus starts with no subscribers."""
|
||||||
|
bus = EventBus()
|
||||||
|
assert bus.subscriber_count() == 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestEventBusSubscribe:
|
||||||
|
"""Tests for EventBus.subscribe method."""
|
||||||
|
|
||||||
|
def test_subscribe_adds_callback(self):
|
||||||
|
"""subscribe() adds a callback for an event type."""
|
||||||
|
bus = EventBus()
|
||||||
|
def callback(e):
|
||||||
|
return None
|
||||||
|
|
||||||
|
bus.subscribe(EventType.NTFY_MESSAGE, callback)
|
||||||
|
|
||||||
|
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1
|
||||||
|
|
||||||
|
def test_subscribe_multiple_callbacks_same_event(self):
|
||||||
|
"""Multiple callbacks can be subscribed to the same event type."""
|
||||||
|
bus = EventBus()
|
||||||
|
def cb1(e):
|
||||||
|
return None
|
||||||
|
def cb2(e):
|
||||||
|
return None
|
||||||
|
|
||||||
|
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
|
||||||
|
bus.subscribe(EventType.NTFY_MESSAGE, cb2)
|
||||||
|
|
||||||
|
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 2
|
||||||
|
|
||||||
|
def test_subscribe_different_event_types(self):
|
||||||
|
"""Callbacks can be subscribed to different event types."""
|
||||||
|
bus = EventBus()
|
||||||
|
def cb1(e):
|
||||||
|
return None
|
||||||
|
def cb2(e):
|
||||||
|
return None
|
||||||
|
|
||||||
|
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
|
||||||
|
bus.subscribe(EventType.MIC_LEVEL, cb2)
|
||||||
|
|
||||||
|
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 1
|
||||||
|
assert bus.subscriber_count(EventType.MIC_LEVEL) == 1
|
||||||
|
|
||||||
|
|
||||||
|
class TestEventBusUnsubscribe:
|
||||||
|
"""Tests for EventBus.unsubscribe method."""
|
||||||
|
|
||||||
|
def test_unsubscribe_removes_callback(self):
|
||||||
|
"""unsubscribe() removes a callback."""
|
||||||
|
bus = EventBus()
|
||||||
|
def callback(e):
|
||||||
|
return None
|
||||||
|
|
||||||
|
bus.subscribe(EventType.NTFY_MESSAGE, callback)
|
||||||
|
bus.unsubscribe(EventType.NTFY_MESSAGE, callback)
|
||||||
|
|
||||||
|
assert bus.subscriber_count(EventType.NTFY_MESSAGE) == 0
|
||||||
|
|
||||||
|
def test_unsubscribe_nonexistent_callback_no_error(self):
|
||||||
|
"""unsubscribe() handles non-existent callback gracefully."""
|
||||||
|
bus = EventBus()
|
||||||
|
def callback(e):
|
||||||
|
return None
|
||||||
|
|
||||||
|
bus.unsubscribe(EventType.NTFY_MESSAGE, callback)
|
||||||
|
|
||||||
|
|
||||||
|
class TestEventBusPublish:
|
||||||
|
"""Tests for EventBus.publish method."""
|
||||||
|
|
||||||
|
def test_publish_calls_subscriber(self):
|
||||||
|
"""publish() calls the subscriber callback."""
|
||||||
|
bus = EventBus()
|
||||||
|
received = []
|
||||||
|
|
||||||
|
def callback(event):
|
||||||
|
received.append(event)
|
||||||
|
|
||||||
|
bus.subscribe(EventType.NTFY_MESSAGE, callback)
|
||||||
|
event = NtfyMessageEvent(title="Test", body="Body")
|
||||||
|
bus.publish(EventType.NTFY_MESSAGE, event)
|
||||||
|
|
||||||
|
assert len(received) == 1
|
||||||
|
assert received[0].title == "Test"
|
||||||
|
|
||||||
|
def test_publish_multiple_subscribers(self):
|
||||||
|
"""publish() calls all subscribers for an event type."""
|
||||||
|
bus = EventBus()
|
||||||
|
received1 = []
|
||||||
|
received2 = []
|
||||||
|
|
||||||
|
def callback1(event):
|
||||||
|
received1.append(event)
|
||||||
|
|
||||||
|
def callback2(event):
|
||||||
|
received2.append(event)
|
||||||
|
|
||||||
|
bus.subscribe(EventType.NTFY_MESSAGE, callback1)
|
||||||
|
bus.subscribe(EventType.NTFY_MESSAGE, callback2)
|
||||||
|
event = NtfyMessageEvent(title="Test", body="Body")
|
||||||
|
bus.publish(EventType.NTFY_MESSAGE, event)
|
||||||
|
|
||||||
|
assert len(received1) == 1
|
||||||
|
assert len(received2) == 1
|
||||||
|
|
||||||
|
def test_publish_different_event_types(self):
|
||||||
|
"""publish() only calls subscribers for the specific event type."""
|
||||||
|
bus = EventBus()
|
||||||
|
ntfy_received = []
|
||||||
|
mic_received = []
|
||||||
|
|
||||||
|
def ntfy_callback(event):
|
||||||
|
ntfy_received.append(event)
|
||||||
|
|
||||||
|
def mic_callback(event):
|
||||||
|
mic_received.append(event)
|
||||||
|
|
||||||
|
bus.subscribe(EventType.NTFY_MESSAGE, ntfy_callback)
|
||||||
|
bus.subscribe(EventType.MIC_LEVEL, mic_callback)
|
||||||
|
event = NtfyMessageEvent(title="Test", body="Body")
|
||||||
|
bus.publish(EventType.NTFY_MESSAGE, event)
|
||||||
|
|
||||||
|
assert len(ntfy_received) == 1
|
||||||
|
assert len(mic_received) == 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestEventBusClear:
|
||||||
|
"""Tests for EventBus.clear method."""
|
||||||
|
|
||||||
|
def test_clear_removes_all_subscribers(self):
|
||||||
|
"""clear() removes all subscribers."""
|
||||||
|
bus = EventBus()
|
||||||
|
def cb1(e):
|
||||||
|
return None
|
||||||
|
def cb2(e):
|
||||||
|
return None
|
||||||
|
|
||||||
|
bus.subscribe(EventType.NTFY_MESSAGE, cb1)
|
||||||
|
bus.subscribe(EventType.MIC_LEVEL, cb2)
|
||||||
|
bus.clear()
|
||||||
|
|
||||||
|
assert bus.subscriber_count() == 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestEventBusThreadSafety:
|
||||||
|
"""Tests for EventBus thread safety."""
|
||||||
|
|
||||||
|
def test_concurrent_subscribe_unsubscribe(self):
|
||||||
|
"""subscribe and unsubscribe can be called concurrently."""
|
||||||
|
import threading
|
||||||
|
|
||||||
|
bus = EventBus()
|
||||||
|
callbacks = [lambda e: None for _ in range(10)]
|
||||||
|
|
||||||
|
def subscribe():
|
||||||
|
for cb in callbacks:
|
||||||
|
bus.subscribe(EventType.NTFY_MESSAGE, cb)
|
||||||
|
|
||||||
|
def unsubscribe():
|
||||||
|
for cb in callbacks:
|
||||||
|
bus.unsubscribe(EventType.NTFY_MESSAGE, cb)
|
||||||
|
|
||||||
|
t1 = threading.Thread(target=subscribe)
|
||||||
|
t2 = threading.Thread(target=unsubscribe)
|
||||||
|
t1.start()
|
||||||
|
t2.start()
|
||||||
|
t1.join()
|
||||||
|
t2.join()
|
||||||
|
|
||||||
|
|
||||||
|
class TestGlobalEventBus:
|
||||||
|
"""Tests for global event bus functions."""
|
||||||
|
|
||||||
|
def test_get_event_bus_returns_singleton(self):
|
||||||
|
"""get_event_bus() returns the same instance."""
|
||||||
|
bus1 = get_event_bus()
|
||||||
|
bus2 = get_event_bus()
|
||||||
|
assert bus1 is bus2
|
||||||
|
|
||||||
|
def test_set_event_bus_replaces_singleton(self):
|
||||||
|
"""set_event_bus() replaces the global event bus."""
|
||||||
|
new_bus = EventBus()
|
||||||
|
set_event_bus(new_bus)
|
||||||
|
try:
|
||||||
|
assert get_event_bus() is new_bus
|
||||||
|
finally:
|
||||||
|
set_event_bus(None)
|
||||||
112
tests/test_events.py
Normal file
112
tests/test_events.py
Normal file
@@ -0,0 +1,112 @@
|
|||||||
|
"""
|
||||||
|
Tests for engine.events module.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
from engine import events
|
||||||
|
|
||||||
|
|
||||||
|
class TestEventType:
|
||||||
|
"""Tests for EventType enum."""
|
||||||
|
|
||||||
|
def test_event_types_exist(self):
|
||||||
|
"""All expected event types exist."""
|
||||||
|
assert hasattr(events.EventType, "NEW_HEADLINE")
|
||||||
|
assert hasattr(events.EventType, "FRAME_TICK")
|
||||||
|
assert hasattr(events.EventType, "MIC_LEVEL")
|
||||||
|
assert hasattr(events.EventType, "NTFY_MESSAGE")
|
||||||
|
assert hasattr(events.EventType, "STREAM_START")
|
||||||
|
assert hasattr(events.EventType, "STREAM_END")
|
||||||
|
|
||||||
|
|
||||||
|
class TestHeadlineEvent:
|
||||||
|
"""Tests for HeadlineEvent dataclass."""
|
||||||
|
|
||||||
|
def test_create_headline_event(self):
|
||||||
|
"""HeadlineEvent can be created with required fields."""
|
||||||
|
e = events.HeadlineEvent(
|
||||||
|
title="Test Headline",
|
||||||
|
source="Test Source",
|
||||||
|
timestamp="12:00",
|
||||||
|
)
|
||||||
|
assert e.title == "Test Headline"
|
||||||
|
assert e.source == "Test Source"
|
||||||
|
assert e.timestamp == "12:00"
|
||||||
|
|
||||||
|
def test_headline_event_optional_language(self):
|
||||||
|
"""HeadlineEvent supports optional language field."""
|
||||||
|
e = events.HeadlineEvent(
|
||||||
|
title="Test",
|
||||||
|
source="Test",
|
||||||
|
timestamp="12:00",
|
||||||
|
language="ja",
|
||||||
|
)
|
||||||
|
assert e.language == "ja"
|
||||||
|
|
||||||
|
|
||||||
|
class TestFrameTickEvent:
|
||||||
|
"""Tests for FrameTickEvent dataclass."""
|
||||||
|
|
||||||
|
def test_create_frame_tick_event(self):
|
||||||
|
"""FrameTickEvent can be created."""
|
||||||
|
now = datetime.now()
|
||||||
|
e = events.FrameTickEvent(
|
||||||
|
frame_number=100,
|
||||||
|
timestamp=now,
|
||||||
|
delta_seconds=0.05,
|
||||||
|
)
|
||||||
|
assert e.frame_number == 100
|
||||||
|
assert e.timestamp == now
|
||||||
|
assert e.delta_seconds == 0.05
|
||||||
|
|
||||||
|
|
||||||
|
class TestMicLevelEvent:
|
||||||
|
"""Tests for MicLevelEvent dataclass."""
|
||||||
|
|
||||||
|
def test_create_mic_level_event(self):
|
||||||
|
"""MicLevelEvent can be created."""
|
||||||
|
now = datetime.now()
|
||||||
|
e = events.MicLevelEvent(
|
||||||
|
db_level=60.0,
|
||||||
|
excess_above_threshold=10.0,
|
||||||
|
timestamp=now,
|
||||||
|
)
|
||||||
|
assert e.db_level == 60.0
|
||||||
|
assert e.excess_above_threshold == 10.0
|
||||||
|
|
||||||
|
|
||||||
|
class TestNtfyMessageEvent:
|
||||||
|
"""Tests for NtfyMessageEvent dataclass."""
|
||||||
|
|
||||||
|
def test_create_ntfy_message_event(self):
|
||||||
|
"""NtfyMessageEvent can be created with required fields."""
|
||||||
|
e = events.NtfyMessageEvent(
|
||||||
|
title="Test Title",
|
||||||
|
body="Test Body",
|
||||||
|
)
|
||||||
|
assert e.title == "Test Title"
|
||||||
|
assert e.body == "Test Body"
|
||||||
|
assert e.message_id is None
|
||||||
|
|
||||||
|
def test_ntfy_message_event_with_id(self):
|
||||||
|
"""NtfyMessageEvent supports optional message_id."""
|
||||||
|
e = events.NtfyMessageEvent(
|
||||||
|
title="Test",
|
||||||
|
body="Test",
|
||||||
|
message_id="abc123",
|
||||||
|
)
|
||||||
|
assert e.message_id == "abc123"
|
||||||
|
|
||||||
|
|
||||||
|
class TestStreamEvent:
|
||||||
|
"""Tests for StreamEvent dataclass."""
|
||||||
|
|
||||||
|
def test_create_stream_event(self):
|
||||||
|
"""StreamEvent can be created."""
|
||||||
|
e = events.StreamEvent(
|
||||||
|
event_type=events.EventType.STREAM_START,
|
||||||
|
headline_count=100,
|
||||||
|
)
|
||||||
|
assert e.event_type == events.EventType.STREAM_START
|
||||||
|
assert e.headline_count == 100
|
||||||
63
tests/test_frame.py
Normal file
63
tests/test_frame.py
Normal file
@@ -0,0 +1,63 @@
|
|||||||
|
"""
|
||||||
|
Tests for engine.frame module.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
from engine.frame import FrameTimer, calculate_scroll_step
|
||||||
|
|
||||||
|
|
||||||
|
class TestFrameTimer:
|
||||||
|
"""Tests for FrameTimer class."""
|
||||||
|
|
||||||
|
def test_init_default(self):
|
||||||
|
"""FrameTimer initializes with default values."""
|
||||||
|
timer = FrameTimer()
|
||||||
|
assert timer.target_frame_dt == 0.05
|
||||||
|
assert timer.fps >= 0
|
||||||
|
|
||||||
|
def test_init_custom(self):
|
||||||
|
"""FrameTimer accepts custom frame duration."""
|
||||||
|
timer = FrameTimer(target_frame_dt=0.1)
|
||||||
|
assert timer.target_frame_dt == 0.1
|
||||||
|
|
||||||
|
def test_fps_calculation(self):
|
||||||
|
"""FrameTimer calculates FPS correctly."""
|
||||||
|
timer = FrameTimer()
|
||||||
|
timer._frame_count = 10
|
||||||
|
timer._start_time = time.monotonic() - 1.0
|
||||||
|
assert timer.fps >= 9.0
|
||||||
|
|
||||||
|
def test_reset(self):
|
||||||
|
"""FrameTimer.reset() clears frame count."""
|
||||||
|
timer = FrameTimer()
|
||||||
|
timer._frame_count = 100
|
||||||
|
timer.reset()
|
||||||
|
assert timer._frame_count == 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestCalculateScrollStep:
|
||||||
|
"""Tests for calculate_scroll_step function."""
|
||||||
|
|
||||||
|
def test_basic_calculation(self):
|
||||||
|
"""calculate_scroll_step returns positive value."""
|
||||||
|
result = calculate_scroll_step(5.0, 24)
|
||||||
|
assert result > 0
|
||||||
|
|
||||||
|
def test_with_padding(self):
|
||||||
|
"""calculate_scroll_step respects padding parameter."""
|
||||||
|
without_padding = calculate_scroll_step(5.0, 24, padding=0)
|
||||||
|
with_padding = calculate_scroll_step(5.0, 24, padding=15)
|
||||||
|
assert with_padding < without_padding
|
||||||
|
|
||||||
|
def test_larger_view_slower_scroll(self):
|
||||||
|
"""Larger view height results in slower scroll steps."""
|
||||||
|
small = calculate_scroll_step(5.0, 10)
|
||||||
|
large = calculate_scroll_step(5.0, 50)
|
||||||
|
assert large < small
|
||||||
|
|
||||||
|
def test_longer_duration_slower_scroll(self):
|
||||||
|
"""Longer scroll duration results in slower scroll steps."""
|
||||||
|
fast = calculate_scroll_step(2.0, 24)
|
||||||
|
slow = calculate_scroll_step(10.0, 24)
|
||||||
|
assert slow > fast
|
||||||
96
tests/test_layers.py
Normal file
96
tests/test_layers.py
Normal file
@@ -0,0 +1,96 @@
|
|||||||
|
"""
|
||||||
|
Tests for engine.layers module.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
from engine import layers
|
||||||
|
|
||||||
|
|
||||||
|
class TestRenderMessageOverlay:
|
||||||
|
"""Tests for render_message_overlay function."""
|
||||||
|
|
||||||
|
def test_no_message_returns_empty(self):
|
||||||
|
"""Returns empty list when msg is None."""
|
||||||
|
result, cache = layers.render_message_overlay(None, 80, 24, (None, None))
|
||||||
|
assert result == []
|
||||||
|
assert cache[0] is None
|
||||||
|
|
||||||
|
def test_message_returns_overlay_lines(self):
|
||||||
|
"""Returns non-empty list when message is present."""
|
||||||
|
msg = ("Test Title", "Test Body", time.monotonic())
|
||||||
|
result, cache = layers.render_message_overlay(msg, 80, 24, (None, None))
|
||||||
|
assert len(result) > 0
|
||||||
|
assert cache[0] is not None
|
||||||
|
|
||||||
|
def test_cache_key_changes_with_text(self):
|
||||||
|
"""Cache key changes when message text changes."""
|
||||||
|
msg1 = ("Title1", "Body1", time.monotonic())
|
||||||
|
msg2 = ("Title2", "Body2", time.monotonic())
|
||||||
|
|
||||||
|
_, cache1 = layers.render_message_overlay(msg1, 80, 24, (None, None))
|
||||||
|
_, cache2 = layers.render_message_overlay(msg2, 80, 24, cache1)
|
||||||
|
|
||||||
|
assert cache1[0] != cache2[0]
|
||||||
|
|
||||||
|
def test_cache_reuse_avoids_recomputation(self):
|
||||||
|
"""Cache is returned when same message is passed (interface test)."""
|
||||||
|
msg = ("Same Title", "Same Body", time.monotonic())
|
||||||
|
|
||||||
|
result1, cache1 = layers.render_message_overlay(msg, 80, 24, (None, None))
|
||||||
|
result2, cache2 = layers.render_message_overlay(msg, 80, 24, cache1)
|
||||||
|
|
||||||
|
assert len(result1) > 0
|
||||||
|
assert len(result2) > 0
|
||||||
|
assert cache1[0] == cache2[0]
|
||||||
|
|
||||||
|
|
||||||
|
class TestRenderFirehose:
|
||||||
|
"""Tests for render_firehose function."""
|
||||||
|
|
||||||
|
def test_no_firehose_returns_empty(self):
|
||||||
|
"""Returns empty list when firehose height is 0."""
|
||||||
|
items = [("Headline", "Source", "12:00")]
|
||||||
|
result = layers.render_firehose(items, 80, 0, 24)
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
def test_firehose_returns_lines(self):
|
||||||
|
"""Returns lines when firehose height > 0."""
|
||||||
|
items = [("Headline", "Source", "12:00")]
|
||||||
|
result = layers.render_firehose(items, 80, 4, 24)
|
||||||
|
assert len(result) == 4
|
||||||
|
|
||||||
|
def test_firehose_includes_ansi_escapes(self):
|
||||||
|
"""Returns lines containing ANSI escape sequences."""
|
||||||
|
items = [("Headline", "Source", "12:00")]
|
||||||
|
result = layers.render_firehose(items, 80, 1, 24)
|
||||||
|
assert "\033[" in result[0]
|
||||||
|
|
||||||
|
|
||||||
|
class TestApplyGlitch:
|
||||||
|
"""Tests for apply_glitch function."""
|
||||||
|
|
||||||
|
def test_empty_buffer_unchanged(self):
|
||||||
|
"""Empty buffer is returned unchanged."""
|
||||||
|
result = layers.apply_glitch([], 0, 0.0, 80)
|
||||||
|
assert result == []
|
||||||
|
|
||||||
|
def test_buffer_length_preserved(self):
|
||||||
|
"""Buffer length is preserved after glitch application."""
|
||||||
|
buf = [f"\033[{i + 1};1Htest\033[K" for i in range(10)]
|
||||||
|
result = layers.apply_glitch(buf, 0, 0.5, 80)
|
||||||
|
assert len(result) == len(buf)
|
||||||
|
|
||||||
|
|
||||||
|
class TestRenderTickerZone:
|
||||||
|
"""Tests for render_ticker_zone function - focusing on interface."""
|
||||||
|
|
||||||
|
def test_returns_list(self):
|
||||||
|
"""Returns a list of strings."""
|
||||||
|
result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0)
|
||||||
|
assert isinstance(result, list)
|
||||||
|
|
||||||
|
def test_returns_dict_for_cache(self):
|
||||||
|
"""Returns a dict for the noise cache."""
|
||||||
|
result, cache = layers.render_ticker_zone([], 0, 10, 80, {}, 0.0)
|
||||||
|
assert isinstance(cache, dict)
|
||||||
@@ -2,8 +2,11 @@
|
|||||||
Tests for engine.mic module.
|
Tests for engine.mic module.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from datetime import datetime
|
||||||
from unittest.mock import patch
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
from engine.events import MicLevelEvent
|
||||||
|
|
||||||
|
|
||||||
class TestMicMonitorImport:
|
class TestMicMonitorImport:
|
||||||
"""Tests for module import behavior."""
|
"""Tests for module import behavior."""
|
||||||
@@ -81,3 +84,66 @@ class TestMicMonitorStop:
|
|||||||
monitor = MicMonitor()
|
monitor = MicMonitor()
|
||||||
monitor.stop()
|
monitor.stop()
|
||||||
assert monitor._stream is None
|
assert monitor._stream is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestMicMonitorEventEmission:
|
||||||
|
"""Tests for MicMonitor event emission."""
|
||||||
|
|
||||||
|
def test_subscribe_adds_callback(self):
|
||||||
|
"""subscribe() adds a callback."""
|
||||||
|
from engine.mic import MicMonitor
|
||||||
|
|
||||||
|
monitor = MicMonitor()
|
||||||
|
def callback(e):
|
||||||
|
return None
|
||||||
|
|
||||||
|
monitor.subscribe(callback)
|
||||||
|
|
||||||
|
assert callback in monitor._subscribers
|
||||||
|
|
||||||
|
def test_unsubscribe_removes_callback(self):
|
||||||
|
"""unsubscribe() removes a callback."""
|
||||||
|
from engine.mic import MicMonitor
|
||||||
|
|
||||||
|
monitor = MicMonitor()
|
||||||
|
def callback(e):
|
||||||
|
return None
|
||||||
|
monitor.subscribe(callback)
|
||||||
|
|
||||||
|
monitor.unsubscribe(callback)
|
||||||
|
|
||||||
|
assert callback not in monitor._subscribers
|
||||||
|
|
||||||
|
def test_emit_calls_subscribers(self):
|
||||||
|
"""_emit() calls all subscribers."""
|
||||||
|
from engine.mic import MicMonitor
|
||||||
|
|
||||||
|
monitor = MicMonitor()
|
||||||
|
received = []
|
||||||
|
|
||||||
|
def callback(event):
|
||||||
|
received.append(event)
|
||||||
|
|
||||||
|
monitor.subscribe(callback)
|
||||||
|
event = MicLevelEvent(
|
||||||
|
db_level=60.0, excess_above_threshold=10.0, timestamp=datetime.now()
|
||||||
|
)
|
||||||
|
monitor._emit(event)
|
||||||
|
|
||||||
|
assert len(received) == 1
|
||||||
|
assert received[0].db_level == 60.0
|
||||||
|
|
||||||
|
def test_emit_handles_subscriber_exception(self):
|
||||||
|
"""_emit() handles exceptions in subscribers gracefully."""
|
||||||
|
from engine.mic import MicMonitor
|
||||||
|
|
||||||
|
monitor = MicMonitor()
|
||||||
|
|
||||||
|
def bad_callback(event):
|
||||||
|
raise RuntimeError("test")
|
||||||
|
|
||||||
|
monitor.subscribe(bad_callback)
|
||||||
|
event = MicLevelEvent(
|
||||||
|
db_level=60.0, excess_above_threshold=10.0, timestamp=datetime.now()
|
||||||
|
)
|
||||||
|
monitor._emit(event)
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ Tests for engine.ntfy module.
|
|||||||
import time
|
import time
|
||||||
from unittest.mock import MagicMock, patch
|
from unittest.mock import MagicMock, patch
|
||||||
|
|
||||||
|
from engine.events import NtfyMessageEvent
|
||||||
from engine.ntfy import NtfyPoller
|
from engine.ntfy import NtfyPoller
|
||||||
|
|
||||||
|
|
||||||
@@ -68,3 +69,54 @@ class TestNtfyPollerDismiss:
|
|||||||
poller.dismiss()
|
poller.dismiss()
|
||||||
|
|
||||||
assert poller._message is None
|
assert poller._message is None
|
||||||
|
|
||||||
|
|
||||||
|
class TestNtfyPollerEventEmission:
|
||||||
|
"""Tests for NtfyPoller event emission."""
|
||||||
|
|
||||||
|
def test_subscribe_adds_callback(self):
|
||||||
|
"""subscribe() adds a callback."""
|
||||||
|
poller = NtfyPoller("http://example.com/topic")
|
||||||
|
def callback(e):
|
||||||
|
return None
|
||||||
|
|
||||||
|
poller.subscribe(callback)
|
||||||
|
|
||||||
|
assert callback in poller._subscribers
|
||||||
|
|
||||||
|
def test_unsubscribe_removes_callback(self):
|
||||||
|
"""unsubscribe() removes a callback."""
|
||||||
|
poller = NtfyPoller("http://example.com/topic")
|
||||||
|
def callback(e):
|
||||||
|
return None
|
||||||
|
poller.subscribe(callback)
|
||||||
|
|
||||||
|
poller.unsubscribe(callback)
|
||||||
|
|
||||||
|
assert callback not in poller._subscribers
|
||||||
|
|
||||||
|
def test_emit_calls_subscribers(self):
|
||||||
|
"""_emit() calls all subscribers."""
|
||||||
|
poller = NtfyPoller("http://example.com/topic")
|
||||||
|
received = []
|
||||||
|
|
||||||
|
def callback(event):
|
||||||
|
received.append(event)
|
||||||
|
|
||||||
|
poller.subscribe(callback)
|
||||||
|
event = NtfyMessageEvent(title="Test", body="Body")
|
||||||
|
poller._emit(event)
|
||||||
|
|
||||||
|
assert len(received) == 1
|
||||||
|
assert received[0].title == "Test"
|
||||||
|
|
||||||
|
def test_emit_handles_subscriber_exception(self):
|
||||||
|
"""_emit() handles exceptions in subscribers gracefully."""
|
||||||
|
poller = NtfyPoller("http://example.com/topic")
|
||||||
|
|
||||||
|
def bad_callback(event):
|
||||||
|
raise RuntimeError("test")
|
||||||
|
|
||||||
|
poller.subscribe(bad_callback)
|
||||||
|
event = NtfyMessageEvent(title="Test", body="Body")
|
||||||
|
poller._emit(event)
|
||||||
|
|||||||
95
tests/test_types.py
Normal file
95
tests/test_types.py
Normal file
@@ -0,0 +1,95 @@
|
|||||||
|
"""
|
||||||
|
Tests for engine.types module.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from engine.types import (
|
||||||
|
Block,
|
||||||
|
FetchResult,
|
||||||
|
HeadlineItem,
|
||||||
|
items_to_tuples,
|
||||||
|
tuples_to_items,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestHeadlineItem:
|
||||||
|
"""Tests for HeadlineItem dataclass."""
|
||||||
|
|
||||||
|
def test_create_headline_item(self):
|
||||||
|
"""Can create HeadlineItem with required fields."""
|
||||||
|
item = HeadlineItem(title="Test", source="Source", timestamp="12:00")
|
||||||
|
assert item.title == "Test"
|
||||||
|
assert item.source == "Source"
|
||||||
|
assert item.timestamp == "12:00"
|
||||||
|
|
||||||
|
def test_to_tuple(self):
|
||||||
|
"""to_tuple returns correct tuple."""
|
||||||
|
item = HeadlineItem(title="Test", source="Source", timestamp="12:00")
|
||||||
|
assert item.to_tuple() == ("Test", "Source", "12:00")
|
||||||
|
|
||||||
|
def test_from_tuple(self):
|
||||||
|
"""from_tuple creates HeadlineItem from tuple."""
|
||||||
|
item = HeadlineItem.from_tuple(("Test", "Source", "12:00"))
|
||||||
|
assert item.title == "Test"
|
||||||
|
assert item.source == "Source"
|
||||||
|
assert item.timestamp == "12:00"
|
||||||
|
|
||||||
|
|
||||||
|
class TestItemsConversion:
|
||||||
|
"""Tests for list conversion functions."""
|
||||||
|
|
||||||
|
def test_items_to_tuples(self):
|
||||||
|
"""Converts list of HeadlineItem to list of tuples."""
|
||||||
|
items = [
|
||||||
|
HeadlineItem(title="A", source="S", timestamp="10:00"),
|
||||||
|
HeadlineItem(title="B", source="T", timestamp="11:00"),
|
||||||
|
]
|
||||||
|
result = items_to_tuples(items)
|
||||||
|
assert result == [("A", "S", "10:00"), ("B", "T", "11:00")]
|
||||||
|
|
||||||
|
def test_tuples_to_items(self):
|
||||||
|
"""Converts list of tuples to list of HeadlineItem."""
|
||||||
|
tuples = [("A", "S", "10:00"), ("B", "T", "11:00")]
|
||||||
|
result = tuples_to_items(tuples)
|
||||||
|
assert len(result) == 2
|
||||||
|
assert result[0].title == "A"
|
||||||
|
assert result[1].title == "B"
|
||||||
|
|
||||||
|
|
||||||
|
class TestFetchResult:
|
||||||
|
"""Tests for FetchResult dataclass."""
|
||||||
|
|
||||||
|
def test_create_fetch_result(self):
|
||||||
|
"""Can create FetchResult."""
|
||||||
|
items = [HeadlineItem(title="Test", source="Source", timestamp="12:00")]
|
||||||
|
result = FetchResult(items=items, linked=1, failed=0)
|
||||||
|
assert len(result.items) == 1
|
||||||
|
assert result.linked == 1
|
||||||
|
assert result.failed == 0
|
||||||
|
|
||||||
|
def test_to_legacy_tuple(self):
|
||||||
|
"""to_legacy_tuple returns correct format."""
|
||||||
|
items = [HeadlineItem(title="Test", source="Source", timestamp="12:00")]
|
||||||
|
result = FetchResult(items=items, linked=1, failed=0)
|
||||||
|
legacy = result.to_legacy_tuple()
|
||||||
|
assert legacy[0] == [("Test", "Source", "12:00")]
|
||||||
|
assert legacy[1] == 1
|
||||||
|
assert legacy[2] == 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestBlock:
|
||||||
|
"""Tests for Block dataclass."""
|
||||||
|
|
||||||
|
def test_create_block(self):
|
||||||
|
"""Can create Block."""
|
||||||
|
block = Block(
|
||||||
|
content=["line1", "line2"], color="\033[38;5;46m", meta_row_index=1
|
||||||
|
)
|
||||||
|
assert len(block.content) == 2
|
||||||
|
assert block.color == "\033[38;5;46m"
|
||||||
|
assert block.meta_row_index == 1
|
||||||
|
|
||||||
|
def test_to_legacy_tuple(self):
|
||||||
|
"""to_legacy_tuple returns correct format."""
|
||||||
|
block = Block(content=["line1"], color="green", meta_row_index=0)
|
||||||
|
legacy = block.to_legacy_tuple()
|
||||||
|
assert legacy == (["line1"], "green", 0)
|
||||||
64
tests/test_viewport.py
Normal file
64
tests/test_viewport.py
Normal file
@@ -0,0 +1,64 @@
|
|||||||
|
"""
|
||||||
|
Tests for engine.viewport module.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from engine import viewport
|
||||||
|
|
||||||
|
|
||||||
|
class TestViewportTw:
|
||||||
|
"""Tests for tw() function."""
|
||||||
|
|
||||||
|
def test_tw_returns_int(self):
|
||||||
|
"""tw() returns an integer."""
|
||||||
|
result = viewport.tw()
|
||||||
|
assert isinstance(result, int)
|
||||||
|
|
||||||
|
def test_tw_positive(self):
|
||||||
|
"""tw() returns a positive value."""
|
||||||
|
assert viewport.tw() > 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestViewportTh:
|
||||||
|
"""Tests for th() function."""
|
||||||
|
|
||||||
|
def test_th_returns_int(self):
|
||||||
|
"""th() returns an integer."""
|
||||||
|
result = viewport.th()
|
||||||
|
assert isinstance(result, int)
|
||||||
|
|
||||||
|
def test_th_positive(self):
|
||||||
|
"""th() returns a positive value."""
|
||||||
|
assert viewport.th() > 0
|
||||||
|
|
||||||
|
|
||||||
|
class TestViewportMoveTo:
|
||||||
|
"""Tests for move_to() function."""
|
||||||
|
|
||||||
|
def test_move_to_format(self):
|
||||||
|
"""move_to() returns correctly formatted ANSI escape."""
|
||||||
|
result = viewport.move_to(5, 10)
|
||||||
|
assert result == "\033[5;10H"
|
||||||
|
|
||||||
|
def test_move_to_default_col(self):
|
||||||
|
"""move_to() defaults to column 1."""
|
||||||
|
result = viewport.move_to(5)
|
||||||
|
assert result == "\033[5;1H"
|
||||||
|
|
||||||
|
|
||||||
|
class TestViewportClearScreen:
|
||||||
|
"""Tests for clear_screen() function."""
|
||||||
|
|
||||||
|
def test_clear_screen_format(self):
|
||||||
|
"""clear_screen() returns clear screen ANSI escape."""
|
||||||
|
result = viewport.clear_screen()
|
||||||
|
assert "\033[2J" in result
|
||||||
|
assert "\033[H" in result
|
||||||
|
|
||||||
|
|
||||||
|
class TestViewportClearLine:
|
||||||
|
"""Tests for clear_line() function."""
|
||||||
|
|
||||||
|
def test_clear_line_format(self):
|
||||||
|
"""clear_line() returns clear line ANSI escape."""
|
||||||
|
result = viewport.clear_line()
|
||||||
|
assert result == "\033[K"
|
||||||
Reference in New Issue
Block a user