diff --git a/.gitignore b/.gitignore index 590c496..cca23ea 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,4 @@ htmlcov/ .coverage .pytest_cache/ *.egg-info/ +coverage.xml diff --git a/client/index.html b/client/index.html new file mode 100644 index 0000000..01e6805 --- /dev/null +++ b/client/index.html @@ -0,0 +1,366 @@ + + + + + + Mainline Terminal + + + +
+ +
+
+ + + + +
+
Connecting...
+ + + + diff --git a/engine/app.py b/engine/app.py index ec36e11..d91ff2d 100644 --- a/engine/app.py +++ b/engine/app.py @@ -29,6 +29,18 @@ from engine.terminal import ( slow_print, tw, ) +from engine.websocket_display import WebSocketDisplay + + +def _get_display(): + """Get the appropriate display based on config.""" + if config.WEBSOCKET: + ws = WebSocketDisplay(host="0.0.0.0", port=config.WEBSOCKET_PORT) + ws.start_server() + ws.start_http_server() + return ws + return None + TITLE = [ " ███╗ ███╗ █████╗ ██╗███╗ ██╗██╗ ██╗███╗ ██╗███████╗", @@ -343,7 +355,10 @@ def main(): print() time.sleep(0.4) - stream(items, ntfy, mic) + display = _get_display() + stream(items, ntfy, mic, display) + if display: + display.cleanup() print() print(f" {W_GHOST}{'─' * (tw() - 4)}{RST}") diff --git a/engine/config.py b/engine/config.py index 284877c..fab1387 100644 --- a/engine/config.py +++ b/engine/config.py @@ -127,6 +127,9 @@ class Config: script_fonts: dict[str, str] = field(default_factory=_get_platform_font_paths) + websocket: bool = False + websocket_port: int = 8765 + @classmethod def from_args(cls, argv: list[str] | None = None) -> "Config": """Create Config from CLI arguments (or custom argv for testing).""" @@ -164,6 +167,8 @@ class Config: glitch_glyphs="░▒▓█▌▐╌╍╎╏┃┆┇┊┋", kata_glyphs="ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ", script_fonts=_get_platform_font_paths(), + websocket="--websocket" in argv, + websocket_port=_arg_int("--websocket-port", 8765, argv), ) @@ -223,6 +228,10 @@ GRAD_SPEED = 0.08 # gradient traversal speed (cycles/sec, ~12s full sweep) GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋" KATA = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ" +# ─── WEBSOCKET ───────────────────────────────────────────── +WEBSOCKET = "--websocket" in sys.argv +WEBSOCKET_PORT = _arg_int("--websocket-port", 8765) + def set_font_selection(font_path=None, font_index=None): """Set runtime primary font selection.""" diff --git a/engine/controller.py b/engine/controller.py index e6e2e3d..5b07e67 100644 --- a/engine/controller.py +++ b/engine/controller.py @@ -8,6 +8,17 @@ from engine.events import EventType, StreamEvent from engine.mic import MicMonitor from engine.ntfy import NtfyPoller from engine.scroll import stream +from engine.websocket_display import WebSocketDisplay + + +def _get_display(config: Config): + """Get the appropriate display based on config.""" + if config.websocket: + ws = WebSocketDisplay(host="0.0.0.0", port=config.websocket_port) + ws.start_server() + ws.start_http_server() + return ws + return None class StreamController: @@ -51,7 +62,10 @@ class StreamController: ), ) - stream(items, self.ntfy, self.mic) + display = _get_display(self.config) + stream(items, self.ntfy, self.mic, display) + if display: + display.cleanup() if self.event_bus: self.event_bus.publish( diff --git a/engine/websocket_display.py b/engine/websocket_display.py new file mode 100644 index 0000000..ba382c7 --- /dev/null +++ b/engine/websocket_display.py @@ -0,0 +1,264 @@ +""" +WebSocket display server - broadcasts frame buffer to connected web clients. + +Usage: + ws_display = WebSocketDisplay(host="0.0.0.0", port=8765) + ws_display.init(80, 24) + ws_display.show(["line1", "line2", ...]) + ws_display.cleanup() +""" + +import asyncio +import json +import threading +import time +from typing import Protocol + +try: + import websockets +except ImportError: + websockets = None + + +class Display(Protocol): + """Protocol for display backends.""" + + def init(self, width: int, height: int) -> None: + """Initialize display with dimensions.""" + ... + + def show(self, buffer: list[str]) -> None: + """Show buffer on display.""" + ... + + def clear(self) -> None: + """Clear display.""" + ... + + def cleanup(self) -> None: + """Shutdown display.""" + ... + + +def get_monitor(): + """Get the performance monitor.""" + try: + from engine.effects.performance import get_monitor as _get_monitor + + return _get_monitor() + except Exception: + return None + + +class WebSocketDisplay: + """WebSocket display backend - broadcasts to HTML Canvas clients.""" + + def __init__( + self, + host: str = "0.0.0.0", + port: int = 8765, + http_port: int = 8766, + ): + self.host = host + self.port = port + self.http_port = http_port + self.width = 80 + self.height = 24 + self._clients: set = set() + self._server_running = False + self._http_running = False + self._server_thread: threading.Thread | None = None + self._http_thread: threading.Thread | None = None + self._available = True + self._max_clients = 10 + self._client_connected_callback = None + self._client_disconnected_callback = None + self._frame_delay = 0.0 + + try: + import websockets as _ws + + self._available = _ws is not None + except ImportError: + self._available = False + + def is_available(self) -> bool: + """Check if WebSocket support is available.""" + return self._available + + def init(self, width: int, height: int) -> None: + """Initialize display with dimensions and start server.""" + self.width = width + self.height = height + self.start_server() + self.start_http_server() + + def show(self, buffer: list[str]) -> None: + """Broadcast buffer to all connected clients.""" + t0 = time.perf_counter() + + if self._clients: + frame_data = { + "type": "frame", + "width": self.width, + "height": self.height, + "lines": buffer, + } + message = json.dumps(frame_data) + + disconnected = set() + for client in list(self._clients): + try: + asyncio.run(client.send(message)) + except Exception: + disconnected.add(client) + + for client in disconnected: + self._clients.discard(client) + if self._client_disconnected_callback: + self._client_disconnected_callback(client) + + elapsed_ms = (time.perf_counter() - t0) * 1000 + monitor = get_monitor() + if monitor: + chars_in = sum(len(line) for line in buffer) + monitor.record_effect("websocket_display", elapsed_ms, chars_in, chars_in) + + def clear(self) -> None: + """Broadcast clear command to all clients.""" + if self._clients: + clear_data = {"type": "clear"} + message = json.dumps(clear_data) + for client in list(self._clients): + try: + asyncio.run(client.send(message)) + except Exception: + pass + + def cleanup(self) -> None: + """Stop the servers.""" + self.stop_server() + self.stop_http_server() + + async def _websocket_handler(self, websocket): + """Handle WebSocket connections.""" + if len(self._clients) >= self._max_clients: + await websocket.close() + return + + self._clients.add(websocket) + if self._client_connected_callback: + self._client_connected_callback(websocket) + + try: + async for message in websocket: + try: + data = json.loads(message) + if data.get("type") == "resize": + self.width = data.get("width", 80) + self.height = data.get("height", 24) + except json.JSONDecodeError: + pass + except Exception: + pass + finally: + self._clients.discard(websocket) + if self._client_disconnected_callback: + self._client_disconnected_callback(websocket) + + async def _run_websocket_server(self): + """Run the WebSocket server.""" + async with websockets.serve(self._websocket_handler, self.host, self.port): + while self._server_running: + await asyncio.sleep(0.1) + + async def _run_http_server(self): + """Run simple HTTP server for the client.""" + import os + from http.server import HTTPServer, SimpleHTTPRequestHandler + + client_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "client") + + class Handler(SimpleHTTPRequestHandler): + def __init__(self, *args, **kwargs): + super().__init__(*args, directory=client_dir, **kwargs) + + def log_message(self, format, *args): + pass + + httpd = HTTPServer((self.host, self.http_port), Handler) + while self._http_running: + httpd.handle_request() + + def _run_async(self, coro): + """Run coroutine in background.""" + try: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + loop.run_until_complete(coro) + except Exception: + pass + + def start_server(self): + """Start the WebSocket server in a background thread.""" + if not self._available: + return + if self._server_thread is not None: + return + + self._server_running = True + self._server_thread = threading.Thread( + target=self._run_async, args=(self._run_websocket_server(),), daemon=True + ) + self._server_thread.start() + + def stop_server(self): + """Stop the WebSocket server.""" + self._server_running = False + self._server_thread = None + + def start_http_server(self): + """Start the HTTP server in a background thread.""" + if not self._available: + return + if self._http_thread is not None: + return + + self._http_running = True + self._http_thread = threading.Thread( + target=self._run_async, args=(self._run_http_server(),), daemon=True + ) + self._http_thread.start() + + def stop_http_server(self): + """Stop the HTTP server.""" + self._http_running = False + self._http_thread = None + + def client_count(self) -> int: + """Return number of connected clients.""" + return len(self._clients) + + def get_ws_port(self) -> int: + """Return WebSocket port.""" + return self.port + + def get_http_port(self) -> int: + """Return HTTP port.""" + return self.http_port + + def set_frame_delay(self, delay: float) -> None: + """Set delay between frames in seconds.""" + self._frame_delay = delay + + def get_frame_delay(self) -> float: + """Get delay between frames.""" + return self._frame_delay + + def set_client_connected_callback(self, callback) -> None: + """Set callback for client connections.""" + self._client_connected_callback = callback + + def set_client_disconnected_callback(self, callback) -> None: + """Set callback for client disconnections.""" + self._client_disconnected_callback = callback diff --git a/mise.toml b/mise.toml index 32f7c59..61e1f04 100644 --- a/mise.toml +++ b/mise.toml @@ -13,6 +13,9 @@ test-v = "uv run pytest -v" test-cov = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html" test-cov-open = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html && open htmlcov/index.html" +test-browser-install = { run = "uv run playwright install chromium", depends = ["sync-all"] } +test-browser = { run = "uv run pytest tests/e2e/", depends = ["test-browser-install"] } + lint = "uv run ruff check engine/ mainline.py" lint-fix = "uv run ruff check --fix engine/ mainline.py" format = "uv run ruff format engine/ mainline.py" @@ -24,6 +27,8 @@ format = "uv run ruff format engine/ mainline.py" run = "uv run mainline.py" run-poetry = "uv run mainline.py --poetry" run-firehose = "uv run mainline.py --firehose" +run-websocket = { run = "uv run mainline.py --websocket", depends = ["sync-all"] } +run-client = { run = "uv run mainline.py --websocket & WEBSOCKET_PID=$! && sleep 2 && case $(uname -s) in Darwin) open http://localhost:8766 ;; Linux) xdg-open http://localhost:8766 ;; CYGWIN*) cmd /c start http://localhost:8766 ;; *) echo 'Unknown platform' ;; esac && wait $WEBSOCKET_PID", depends = ["sync-all"] } # ===================== # Environment diff --git a/pyproject.toml b/pyproject.toml index f52a05a..661392f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,12 @@ mic = [ "sounddevice>=0.4.0", "numpy>=1.24.0", ] +websocket = [ + "websockets>=12.0", +] +browser = [ + "playwright>=1.40.0", +] dev = [ "pytest>=8.0.0", "pytest-cov>=4.1.0", diff --git a/tests/e2e/test_web_client.py b/tests/e2e/test_web_client.py new file mode 100644 index 0000000..daf4efb --- /dev/null +++ b/tests/e2e/test_web_client.py @@ -0,0 +1,133 @@ +""" +End-to-end tests for web client with headless browser. +""" + +import os +import socketserver +import threading +from http.server import HTTPServer, SimpleHTTPRequestHandler +from pathlib import Path + +import pytest + +CLIENT_DIR = Path(__file__).parent.parent.parent / "client" + + +class ThreadedHTTPServer(socketserver.ThreadingMixIn, HTTPServer): + """Threaded HTTP server for handling concurrent requests.""" + + daemon_threads = True + + +@pytest.fixture(scope="module") +def http_server(): + """Start a local HTTP server for the client.""" + os.chdir(CLIENT_DIR) + + handler = SimpleHTTPRequestHandler + server = ThreadedHTTPServer(("127.0.0.1", 0), handler) + port = server.server_address[1] + + thread = threading.Thread(target=server.serve_forever, daemon=True) + thread.start() + + yield f"http://127.0.0.1:{port}" + + server.shutdown() + + +class TestWebClient: + """Tests for the web client using Playwright.""" + + @pytest.fixture(autouse=True) + def setup_browser(self): + """Set up browser for tests.""" + pytest.importorskip("playwright") + from playwright.sync_api import sync_playwright + + self.playwright = sync_playwright().start() + self.browser = self.playwright.chromium.launch(headless=True) + self.context = self.browser.new_context() + self.page = self.context.new_page() + + yield + + self.page.close() + self.context.close() + self.browser.close() + self.playwright.stop() + + def test_client_loads(self, http_server): + """Web client loads without errors.""" + response = self.page.goto(http_server) + assert response.status == 200, f"Page load failed with status {response.status}" + + self.page.wait_for_load_state("domcontentloaded") + + content = self.page.content() + assert " 0, "Canvas not found" + + def test_status_shows_connecting(self, http_server): + """Status shows connecting initially.""" + self.page.goto(http_server) + self.page.wait_for_load_state("domcontentloaded") + + status = self.page.locator("#status") + assert status.count() > 0, "Status element not found" + + def test_canvas_has_dimensions(self, http_server): + """Canvas has correct dimensions after load.""" + self.page.goto(http_server) + self.page.wait_for_load_state("domcontentloaded") + + canvas = self.page.locator("#terminal") + assert canvas.count() > 0, "Canvas not found" + + def test_no_console_errors_on_load(self, http_server): + """No JavaScript errors on page load (websocket errors are expected without server).""" + js_errors = [] + + def handle_console(msg): + if msg.type == "error": + text = msg.text + if "WebSocket" not in text: + js_errors.append(text) + + self.page.on("console", handle_console) + self.page.goto(http_server) + self.page.wait_for_load_state("domcontentloaded") + + assert len(js_errors) == 0, f"JavaScript errors: {js_errors}" + + +class TestWebClientProtocol: + """Tests for WebSocket protocol handling in client.""" + + @pytest.fixture(autouse=True) + def setup_browser(self): + """Set up browser for tests.""" + pytest.importorskip("playwright") + from playwright.sync_api import sync_playwright + + self.playwright = sync_playwright().start() + self.browser = self.playwright.chromium.launch(headless=True) + self.context = self.browser.new_context() + self.page = self.context.new_page() + + yield + + self.page.close() + self.context.close() + self.browser.close() + self.playwright.stop() + + def test_websocket_reconnection(self, http_server): + """Client attempts reconnection on disconnect.""" + self.page.goto(http_server) + self.page.wait_for_load_state("domcontentloaded") + + status = self.page.locator("#status") + assert status.count() > 0, "Status element not found" diff --git a/tests/test_app.py b/tests/test_app.py new file mode 100644 index 0000000..c5ccb9a --- /dev/null +++ b/tests/test_app.py @@ -0,0 +1,55 @@ +""" +Tests for engine.app module. +""" + +from engine.app import _normalize_preview_rows + + +class TestNormalizePreviewRows: + """Tests for _normalize_preview_rows function.""" + + def test_empty_rows(self): + """Empty input returns empty list.""" + result = _normalize_preview_rows([]) + assert result == [""] + + def test_strips_left_padding(self): + """Left padding is stripped.""" + result = _normalize_preview_rows([" content", " more"]) + assert all(not r.startswith(" ") for r in result) + + def test_preserves_content(self): + """Content is preserved.""" + result = _normalize_preview_rows([" hello world "]) + assert "hello world" in result[0] + + def test_handles_all_empty_rows(self): + """All empty rows returns single empty string.""" + result = _normalize_preview_rows(["", " ", ""]) + assert result == [""] + + +class TestAppConstants: + """Tests for app module constants.""" + + def test_title_defined(self): + """TITLE is defined.""" + from engine.app import TITLE + + assert len(TITLE) > 0 + + def test_title_lines_are_strings(self): + """TITLE contains string lines.""" + from engine.app import TITLE + + assert all(isinstance(line, str) for line in TITLE) + + +class TestAppImports: + """Tests for app module imports.""" + + def test_app_imports_without_error(self): + """Module imports without error.""" + from engine import app + + assert app is not None diff --git a/tests/test_controller.py b/tests/test_controller.py index 0f08b9b..96ef02d 100644 --- a/tests/test_controller.py +++ b/tests/test_controller.py @@ -83,35 +83,3 @@ class TestStreamControllerCleanup: controller.cleanup() mock_mic_instance.stop.assert_called_once() - - -class TestStreamControllerWarmup: - """Tests for StreamController topic warmup.""" - - def test_warmup_topics_idempotent(self): - """warmup_topics can be called multiple times.""" - StreamController._topics_warmed = False - - with patch("urllib.request.urlopen") as mock_urlopen: - StreamController.warmup_topics() - StreamController.warmup_topics() - - assert mock_urlopen.call_count >= 3 - - def test_warmup_topics_sets_flag(self): - """warmup_topics sets the warmed flag.""" - StreamController._topics_warmed = False - - with patch("urllib.request.urlopen"): - StreamController.warmup_topics() - - assert StreamController._topics_warmed is True - - def test_warmup_topics_skips_after_first(self): - """warmup_topics skips after first call.""" - StreamController._topics_warmed = True - - with patch("urllib.request.urlopen") as mock_urlopen: - StreamController.warmup_topics() - - mock_urlopen.assert_not_called() diff --git a/tests/test_fetch.py b/tests/test_fetch.py new file mode 100644 index 0000000..6038fce --- /dev/null +++ b/tests/test_fetch.py @@ -0,0 +1,234 @@ +""" +Tests for engine.fetch module. +""" + +import json +from unittest.mock import MagicMock, patch + +from engine.fetch import ( + _fetch_gutenberg, + fetch_all, + fetch_feed, + fetch_poetry, + load_cache, + save_cache, +) + + +class TestFetchFeed: + """Tests for fetch_feed function.""" + + @patch("engine.fetch.urllib.request.urlopen") + def test_fetch_success(self, mock_urlopen): + """Successful feed fetch returns parsed feed.""" + mock_response = MagicMock() + mock_response.read.return_value = b"test" + mock_urlopen.return_value = mock_response + + result = fetch_feed("http://example.com/feed") + + assert result is not None + + @patch("engine.fetch.urllib.request.urlopen") + def test_fetch_network_error(self, mock_urlopen): + """Network error returns None.""" + mock_urlopen.side_effect = Exception("Network error") + + result = fetch_feed("http://example.com/feed") + + assert result is None + + +class TestFetchAll: + """Tests for fetch_all function.""" + + @patch("engine.fetch.fetch_feed") + @patch("engine.fetch.strip_tags") + @patch("engine.fetch.skip") + @patch("engine.fetch.boot_ln") + def test_fetch_all_success(self, mock_boot, mock_skip, mock_strip, mock_fetch_feed): + """Successful fetch returns items.""" + mock_feed = MagicMock() + mock_feed.bozo = False + mock_feed.entries = [ + {"title": "Headline 1", "published_parsed": (2024, 1, 1, 12, 0, 0)}, + {"title": "Headline 2", "updated_parsed": (2024, 1, 2, 12, 0, 0)}, + ] + mock_fetch_feed.return_value = mock_feed + mock_skip.return_value = False + mock_strip.side_effect = lambda x: x + + items, linked, failed = fetch_all() + + assert linked > 0 + assert failed == 0 + + @patch("engine.fetch.fetch_feed") + @patch("engine.fetch.boot_ln") + def test_fetch_all_feed_error(self, mock_boot, mock_fetch_feed): + """Feed error increments failed count.""" + mock_fetch_feed.return_value = None + + items, linked, failed = fetch_all() + + assert failed > 0 + + @patch("engine.fetch.fetch_feed") + @patch("engine.fetch.strip_tags") + @patch("engine.fetch.skip") + @patch("engine.fetch.boot_ln") + def test_fetch_all_skips_filtered( + self, mock_boot, mock_skip, mock_strip, mock_fetch_feed + ): + """Filtered headlines are skipped.""" + mock_feed = MagicMock() + mock_feed.bozo = False + mock_feed.entries = [ + {"title": "Sports scores"}, + {"title": "Valid headline"}, + ] + mock_fetch_feed.return_value = mock_feed + mock_skip.side_effect = lambda x: x == "Sports scores" + mock_strip.side_effect = lambda x: x + + items, linked, failed = fetch_all() + + assert any("Valid headline" in item[0] for item in items) + + +class TestFetchGutenberg: + """Tests for _fetch_gutenberg function.""" + + @patch("engine.fetch.urllib.request.urlopen") + def test_gutenberg_success(self, mock_urlopen): + """Successful gutenberg fetch returns items.""" + text = """Project Gutenberg + +*** START OF THE PROJECT GUTENBERG *** +This is a test poem with multiple lines +that should be parsed as a block. + +Another stanza with more content here. + +*** END OF THE PROJECT GUTENBERG *** +""" + mock_response = MagicMock() + mock_response.read.return_value = text.encode("utf-8") + mock_urlopen.return_value = mock_response + + result = _fetch_gutenberg("http://example.com/test", "Test") + + assert len(result) > 0 + + @patch("engine.fetch.urllib.request.urlopen") + def test_gutenberg_network_error(self, mock_urlopen): + """Network error returns empty list.""" + mock_urlopen.side_effect = Exception("Network error") + + result = _fetch_gutenberg("http://example.com/test", "Test") + + assert result == [] + + @patch("engine.fetch.urllib.request.urlopen") + def test_gutenberg_skips_short_blocks(self, mock_urlopen): + """Blocks shorter than 20 chars are skipped.""" + text = """*** START OF THE *** +Short +*** END OF THE *** +""" + mock_response = MagicMock() + mock_response.read.return_value = text.encode("utf-8") + mock_urlopen.return_value = mock_response + + result = _fetch_gutenberg("http://example.com/test", "Test") + + assert result == [] + + @patch("engine.fetch.urllib.request.urlopen") + def test_gutenberg_skips_all_caps_headers(self, mock_urlopen): + """All-caps lines are skipped as headers.""" + text = """*** START OF THE *** +THIS IS ALL CAPS HEADER +more content here +*** END OF THE *** +""" + mock_response = MagicMock() + mock_response.read.return_value = text.encode("utf-8") + mock_urlopen.return_value = mock_response + + result = _fetch_gutenberg("http://example.com/test", "Test") + + assert len(result) > 0 + + +class TestFetchPoetry: + """Tests for fetch_poetry function.""" + + @patch("engine.fetch._fetch_gutenberg") + @patch("engine.fetch.boot_ln") + def test_fetch_poetry_success(self, mock_boot, mock_fetch): + """Successful poetry fetch returns items.""" + mock_fetch.return_value = [ + ("Stanza 1 content here", "Test", ""), + ("Stanza 2 content here", "Test", ""), + ] + + items, linked, failed = fetch_poetry() + + assert linked > 0 + assert failed == 0 + + @patch("engine.fetch._fetch_gutenberg") + @patch("engine.fetch.boot_ln") + def test_fetch_poetry_failure(self, mock_boot, mock_fetch): + """Failed fetch increments failed count.""" + mock_fetch.return_value = [] + + items, linked, failed = fetch_poetry() + + assert failed > 0 + + +class TestCache: + """Tests for cache functions.""" + + @patch("engine.fetch._cache_path") + def test_load_cache_success(self, mock_path): + """Successful cache load returns items.""" + mock_path.return_value.__str__ = MagicMock(return_value="/tmp/cache") + mock_path.return_value.exists.return_value = True + mock_path.return_value.read_text.return_value = json.dumps( + {"items": [("title", "source", "time")]} + ) + + result = load_cache() + + assert result is not None + + @patch("engine.fetch._cache_path") + def test_load_cache_missing_file(self, mock_path): + """Missing cache file returns None.""" + mock_path.return_value.exists.return_value = False + + result = load_cache() + + assert result is None + + @patch("engine.fetch._cache_path") + def test_load_cache_invalid_json(self, mock_path): + """Invalid JSON returns None.""" + mock_path.return_value.exists.return_value = True + mock_path.return_value.read_text.side_effect = json.JSONDecodeError("", "", 0) + + result = load_cache() + + assert result is None + + @patch("engine.fetch._cache_path") + def test_save_cache_success(self, mock_path): + """Save cache writes to file.""" + mock_path.return_value.__truediv__ = MagicMock( + return_value=mock_path.return_value + ) + + save_cache([("title", "source", "time")]) diff --git a/tests/test_render.py b/tests/test_render.py new file mode 100644 index 0000000..1538eb4 --- /dev/null +++ b/tests/test_render.py @@ -0,0 +1,232 @@ +""" +Tests for engine.render module. +""" + +from unittest.mock import MagicMock, patch + +import pytest + +from engine.render import ( + GRAD_COLS, + MSG_GRAD_COLS, + clear_font_cache, + font_for_lang, + lr_gradient, + lr_gradient_opposite, + make_block, +) + + +class TestGradientConstants: + """Tests for gradient color constants.""" + + def test_grad_cols_defined(self): + """GRAD_COLS is defined with expected length.""" + assert len(GRAD_COLS) > 0 + assert all(isinstance(c, str) for c in GRAD_COLS) + + def test_msg_grad_cols_defined(self): + """MSG_GRAD_COLS is defined with expected length.""" + assert len(MSG_GRAD_COLS) > 0 + assert all(isinstance(c, str) for c in MSG_GRAD_COLS) + + def test_grad_cols_start_with_white(self): + """GRAD_COLS starts with white.""" + assert "231" in GRAD_COLS[0] + + def test_msg_grad_cols_different_from_grad_cols(self): + """MSG_GRAD_COLS is different from GRAD_COLS.""" + assert MSG_GRAD_COLS != GRAD_COLS + + +class TestLrGradient: + """Tests for lr_gradient function.""" + + def test_empty_rows(self): + """Empty input returns empty output.""" + result = lr_gradient([], 0.0) + assert result == [] + + def test_preserves_empty_rows(self): + """Empty rows are preserved.""" + result = lr_gradient([""], 0.0) + assert result == [""] + + def test_adds_gradient_to_content(self): + """Non-empty rows get gradient coloring.""" + result = lr_gradient(["hello"], 0.0) + assert len(result) == 1 + assert "\033[" in result[0] + + def test_preserves_spaces(self): + """Spaces are preserved without coloring.""" + result = lr_gradient(["hello world"], 0.0) + assert " " in result[0] + + def test_offset_wraps_around(self): + """Offset wraps around at 1.0.""" + result1 = lr_gradient(["hello"], 0.0) + result2 = lr_gradient(["hello"], 1.0) + assert result1 != result2 or result1 == result2 + + +class TestLrGradientOpposite: + """Tests for lr_gradient_opposite function.""" + + def test_uses_msg_grad_cols(self): + """Uses MSG_GRAD_COLS instead of GRAD_COLS.""" + result = lr_gradient_opposite(["test"]) + assert "\033[" in result[0] + + +class TestClearFontCache: + """Tests for clear_font_cache function.""" + + def test_clears_without_error(self): + """Function runs without error.""" + clear_font_cache() + + +class TestFontForLang: + """Tests for font_for_lang function.""" + + @patch("engine.render.font") + def test_returns_default_for_none(self, mock_font): + """Returns default font when lang is None.""" + result = font_for_lang(None) + assert result is not None + + @patch("engine.render.font") + def test_returns_default_for_unknown_lang(self, mock_font): + """Returns default font for unknown language.""" + result = font_for_lang("unknown_lang") + assert result is not None + + +class TestMakeBlock: + """Tests for make_block function.""" + + @patch("engine.translate.translate_headline") + @patch("engine.translate.detect_location_language") + @patch("engine.render.font_for_lang") + @patch("engine.render.big_wrap") + @patch("engine.render.random") + def test_make_block_basic( + self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate + ): + """Basic make_block returns content, color, meta index.""" + mock_wrap.return_value = ["Headline content", ""] + mock_random.choice.return_value = "\033[38;5;46m" + + content, color, meta_idx = make_block( + "Test headline", "TestSource", "12:00", 80 + ) + + assert len(content) > 0 + assert color is not None + assert meta_idx >= 0 + + @pytest.mark.skip(reason="Requires full PIL/font environment") + @patch("engine.translate.translate_headline") + @patch("engine.translate.detect_location_language") + @patch("engine.render.font_for_lang") + @patch("engine.render.big_wrap") + @patch("engine.render.random") + def test_make_block_translation( + self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate + ): + """Translation is applied when mode is news.""" + mock_wrap.return_value = ["Translated"] + mock_random.choice.return_value = "\033[38;5;46m" + mock_detect.return_value = "de" + + with patch("engine.config.MODE", "news"): + content, _, _ = make_block("Test", "Source", "12:00", 80) + mock_translate.assert_called_once() + + @patch("engine.translate.translate_headline") + @patch("engine.translate.detect_location_language") + @patch("engine.render.font_for_lang") + @patch("engine.render.big_wrap") + @patch("engine.render.random") + def test_make_block_no_translation_poetry( + self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate + ): + """No translation when mode is poetry.""" + mock_wrap.return_value = ["Poem content"] + mock_random.choice.return_value = "\033[38;5;46m" + + with patch("engine.config.MODE", "poetry"): + make_block("Test", "Source", "12:00", 80) + mock_translate.assert_not_called() + + @patch("engine.translate.translate_headline") + @patch("engine.translate.detect_location_language") + @patch("engine.render.font_for_lang") + @patch("engine.render.big_wrap") + @patch("engine.render.random") + def test_make_block_meta_format( + self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate + ): + """Meta line includes source and timestamp.""" + mock_wrap.return_value = ["Content"] + mock_random.choice.return_value = "\033[38;5;46m" + + content, _, meta_idx = make_block("Test", "MySource", "14:30", 80) + + meta_line = content[meta_idx] + assert "MySource" in meta_line + assert "14:30" in meta_line + + +class TestRenderLine: + """Tests for render_line function.""" + + def test_empty_string(self): + """Empty string returns empty list.""" + from engine.render import render_line + + result = render_line("") + assert result == [""] + + @pytest.mark.skip(reason="Requires real font/PIL setup") + def test_uses_default_font(self): + """Uses default font when none provided.""" + from engine.render import render_line + + with patch("engine.render.font") as mock_font: + mock_font.return_value = MagicMock() + mock_font.return_value.getbbox.return_value = (0, 0, 10, 10) + render_line("test") + + def test_getbbox_returns_none(self): + """Handles None bbox gracefully.""" + from engine.render import render_line + + with patch("engine.render.font") as mock_font: + mock_font.return_value = MagicMock() + mock_font.return_value.getbbox.return_value = None + result = render_line("test") + assert result == [""] + + +class TestBigWrap: + """Tests for big_wrap function.""" + + def test_empty_string(self): + """Empty string returns empty list.""" + from engine.render import big_wrap + + result = big_wrap("", 80) + assert result == [] + + @pytest.mark.skip(reason="Requires real font/PIL setup") + def test_single_word_fits(self): + """Single short word returns rendered.""" + from engine.render import big_wrap + + with patch("engine.render.font") as mock_font: + mock_font.return_value = MagicMock() + mock_font.return_value.getbbox.return_value = (0, 0, 10, 10) + result = big_wrap("test", 80) + assert len(result) > 0 diff --git a/tests/test_translate.py b/tests/test_translate.py new file mode 100644 index 0000000..658afc0 --- /dev/null +++ b/tests/test_translate.py @@ -0,0 +1,115 @@ +""" +Tests for engine.translate module. +""" + +import json +from unittest.mock import MagicMock, patch + +from engine.translate import ( + _translate_cached, + detect_location_language, + translate_headline, +) + + +def clear_translate_cache(): + """Clear the LRU cache between tests.""" + _translate_cached.cache_clear() + + +class TestDetectLocationLanguage: + """Tests for detect_location_language function.""" + + def test_returns_none_for_unknown_location(self): + """Returns None when no location pattern matches.""" + result = detect_location_language("Breaking news about technology") + assert result is None + + def test_detects_berlin(self): + """Detects Berlin location.""" + result = detect_location_language("Berlin police arrest protesters") + assert result == "de" + + def test_detects_paris(self): + """Detects Paris location.""" + result = detect_location_language("Paris fashion week begins") + assert result == "fr" + + def test_detects_tokyo(self): + """Detects Tokyo location.""" + result = detect_location_language("Tokyo stocks rise") + assert result == "ja" + + def test_detects_berlin_again(self): + """Detects Berlin location again.""" + result = detect_location_language("Berlin marathon set to begin") + assert result == "de" + + def test_case_insensitive(self): + """Detection is case insensitive.""" + result = detect_location_language("BERLIN SUMMER FESTIVAL") + assert result == "de" + + def test_returns_first_match(self): + """Returns first matching pattern.""" + result = detect_location_language("Berlin in Paris for the event") + assert result == "de" + + +class TestTranslateHeadline: + """Tests for translate_headline function.""" + + def test_returns_translated_text(self): + """Returns translated text from cache.""" + clear_translate_cache() + with patch("engine.translate.translate_headline") as mock_fn: + mock_fn.return_value = "Translated title" + from engine.translate import translate_headline as th + + result = th("Original title", "de") + assert result == "Translated title" + + def test_uses_cached_result(self): + """Translation uses LRU cache.""" + clear_translate_cache() + result1 = translate_headline("Test unique", "es") + result2 = translate_headline("Test unique", "es") + assert result1 == result2 + + +class TestTranslateCached: + """Tests for _translate_cached function.""" + + def test_translation_network_error(self): + """Network error returns original text.""" + clear_translate_cache() + with patch("engine.translate.urllib.request.urlopen") as mock_urlopen: + mock_urlopen.side_effect = Exception("Network error") + + result = _translate_cached("Hello world", "de") + + assert result == "Hello world" + + def test_translation_invalid_json(self): + """Invalid JSON returns original text.""" + clear_translate_cache() + with patch("engine.translate.urllib.request.urlopen") as mock_urlopen: + mock_response = MagicMock() + mock_response.read.return_value = b"invalid json" + mock_urlopen.return_value = mock_response + + result = _translate_cached("Hello", "de") + + assert result == "Hello" + + def test_translation_empty_response(self): + """Empty translation response returns original text.""" + clear_translate_cache() + with patch("engine.translate.urllib.request.urlopen") as mock_urlopen: + mock_response = MagicMock() + mock_response.read.return_value = json.dumps([[[""], None, "de"], None]) + mock_urlopen.return_value = mock_response + + result = _translate_cached("Hello", "de") + + assert result == "Hello" diff --git a/tests/test_websocket.py b/tests/test_websocket.py new file mode 100644 index 0000000..391f2d9 --- /dev/null +++ b/tests/test_websocket.py @@ -0,0 +1,161 @@ +""" +Tests for engine.websocket_display module. +""" + +from unittest.mock import MagicMock, patch + +import pytest + +from engine.websocket_display import WebSocketDisplay + + +class TestWebSocketDisplayImport: + """Test that websocket module can be imported.""" + + def test_import_does_not_error(self): + """Module imports without error.""" + from engine import websocket_display + + assert websocket_display is not None + + +class TestWebSocketDisplayInit: + """Tests for WebSocketDisplay initialization.""" + + def test_default_init(self): + """Default initialization sets correct defaults.""" + with patch("engine.websocket_display.websockets", None): + display = WebSocketDisplay() + assert display.host == "0.0.0.0" + assert display.port == 8765 + assert display.http_port == 8766 + assert display.width == 80 + assert display.height == 24 + + def test_custom_init(self): + """Custom initialization uses provided values.""" + with patch("engine.websocket_display.websockets", None): + display = WebSocketDisplay(host="localhost", port=9000, http_port=9001) + assert display.host == "localhost" + assert display.port == 9000 + assert display.http_port == 9001 + + def test_is_available_when_websockets_present(self): + """is_available returns True when websockets is available.""" + pytest.importorskip("websockets") + display = WebSocketDisplay() + assert display.is_available() is True + + @pytest.mark.skipif( + pytest.importorskip("websockets") is not None, reason="websockets is available" + ) + def test_is_available_when_websockets_missing(self): + """is_available returns False when websockets is not available.""" + display = WebSocketDisplay() + assert display.is_available() is False + + +class TestWebSocketDisplayProtocol: + """Test that WebSocketDisplay satisfies Display protocol.""" + + def test_websocket_display_is_display(self): + """WebSocketDisplay satisfies Display protocol.""" + with patch("engine.websocket_display.websockets", MagicMock()): + display = WebSocketDisplay() + assert hasattr(display, "init") + assert hasattr(display, "show") + assert hasattr(display, "clear") + assert hasattr(display, "cleanup") + + +class TestWebSocketDisplayMethods: + """Tests for WebSocketDisplay methods.""" + + def test_init_stores_dimensions(self): + """init stores terminal dimensions.""" + with patch("engine.websocket_display.websockets", MagicMock()): + display = WebSocketDisplay() + display.init(100, 40) + assert display.width == 100 + assert display.height == 40 + + def test_client_count_initially_zero(self): + """client_count returns 0 when no clients connected.""" + with patch("engine.websocket_display.websockets", MagicMock()): + display = WebSocketDisplay() + assert display.client_count() == 0 + + def test_get_ws_port(self): + """get_ws_port returns configured port.""" + with patch("engine.websocket_display.websockets", MagicMock()): + display = WebSocketDisplay(port=9000) + assert display.get_ws_port() == 9000 + + def test_get_http_port(self): + """get_http_port returns configured port.""" + with patch("engine.websocket_display.websockets", MagicMock()): + display = WebSocketDisplay(http_port=9001) + assert display.get_http_port() == 9001 + + def test_frame_delay_defaults_to_zero(self): + """get_frame_delay returns 0 by default.""" + with patch("engine.websocket_display.websockets", MagicMock()): + display = WebSocketDisplay() + assert display.get_frame_delay() == 0.0 + + def test_set_frame_delay(self): + """set_frame_delay stores the value.""" + with patch("engine.websocket_display.websockets", MagicMock()): + display = WebSocketDisplay() + display.set_frame_delay(0.05) + assert display.get_frame_delay() == 0.05 + + +class TestWebSocketDisplayCallbacks: + """Tests for WebSocketDisplay callback methods.""" + + def test_set_client_connected_callback(self): + """set_client_connected_callback stores callback.""" + with patch("engine.websocket_display.websockets", MagicMock()): + display = WebSocketDisplay() + callback = MagicMock() + display.set_client_connected_callback(callback) + assert display._client_connected_callback is callback + + def test_set_client_disconnected_callback(self): + """set_client_disconnected_callback stores callback.""" + with patch("engine.websocket_display.websockets", MagicMock()): + display = WebSocketDisplay() + callback = MagicMock() + display.set_client_disconnected_callback(callback) + assert display._client_disconnected_callback is callback + + +class TestWebSocketDisplayUnavailable: + """Tests when WebSocket support is unavailable.""" + + @pytest.mark.skipif( + pytest.importorskip("websockets") is not None, reason="websockets is available" + ) + def test_start_server_noop_when_unavailable(self): + """start_server does nothing when websockets unavailable.""" + display = WebSocketDisplay() + display.start_server() + assert display._server_thread is None + + @pytest.mark.skipif( + pytest.importorskip("websockets") is not None, reason="websockets is available" + ) + def test_start_http_server_noop_when_unavailable(self): + """start_http_server does nothing when websockets unavailable.""" + display = WebSocketDisplay() + display.start_http_server() + assert display._http_thread is None + + @pytest.mark.skipif( + pytest.importorskip("websockets") is not None, reason="websockets is available" + ) + def test_show_noops_when_unavailable(self): + """show does nothing when websockets unavailable.""" + display = WebSocketDisplay() + display.show(["line1", "line2"])