feat(websocket): add WebSocket display backend for browser client

This commit is contained in:
2026-03-15 20:54:03 -07:00
parent 2650f7245e
commit ac1306373d
15 changed files with 1612 additions and 34 deletions

1
.gitignore vendored
View File

@@ -9,3 +9,4 @@ htmlcov/
.coverage
.pytest_cache/
*.egg-info/
coverage.xml

366
client/index.html Normal file
View File

@@ -0,0 +1,366 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Mainline Terminal</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
background: #0a0a0a;
color: #ccc;
font-family: 'Fira Code', 'Cascadia Code', 'Consolas', monospace;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
min-height: 100vh;
padding: 20px;
}
body.fullscreen {
padding: 0;
}
body.fullscreen #controls {
display: none;
}
#container {
position: relative;
}
canvas {
background: #000;
border: 1px solid #333;
image-rendering: pixelated;
image-rendering: crisp-edges;
}
body.fullscreen canvas {
border: none;
width: 100vw;
height: 100vh;
max-width: 100vw;
max-height: 100vh;
}
#controls {
display: flex;
gap: 10px;
margin-top: 10px;
align-items: center;
}
#controls button {
background: #333;
color: #ccc;
border: 1px solid #555;
padding: 5px 12px;
cursor: pointer;
font-family: inherit;
font-size: 12px;
}
#controls button:hover {
background: #444;
}
#controls input {
width: 60px;
background: #222;
color: #ccc;
border: 1px solid #444;
padding: 4px 8px;
font-family: inherit;
text-align: center;
}
#status {
margin-top: 10px;
font-size: 12px;
color: #666;
}
#status.connected {
color: #4f4;
}
#status.disconnected {
color: #f44;
}
</style>
</head>
<body>
<div id="container">
<canvas id="terminal"></canvas>
</div>
<div id="controls">
<label>Cols: <input type="number" id="cols" value="80" min="20" max="200"></label>
<label>Rows: <input type="number" id="rows" value="24" min="10" max="60"></label>
<button id="apply">Apply</button>
<button id="fullscreen">Fullscreen</button>
</div>
<div id="status" class="disconnected">Connecting...</div>
<script>
const canvas = document.getElementById('terminal');
const ctx = canvas.getContext('2d');
const status = document.getElementById('status');
const colsInput = document.getElementById('cols');
const rowsInput = document.getElementById('rows');
const applyBtn = document.getElementById('apply');
const fullscreenBtn = document.getElementById('fullscreen');
const CHAR_WIDTH = 9;
const CHAR_HEIGHT = 16;
const ANSI_COLORS = {
0: '#000000', 1: '#cd3131', 2: '#0dbc79', 3: '#e5e510',
4: '#2472c8', 5: '#bc3fbc', 6: '#11a8cd', 7: '#e5e5e5',
8: '#666666', 9: '#f14c4c', 10: '#23d18b', 11: '#f5f543',
12: '#3b8eea', 13: '#d670d6', 14: '#29b8db', 15: '#ffffff',
};
let cols = 80;
let rows = 24;
let ws = null;
function resizeCanvas() {
canvas.width = cols * CHAR_WIDTH;
canvas.height = rows * CHAR_HEIGHT;
}
function parseAnsi(text) {
if (!text) return [];
const tokens = [];
let currentText = '';
let fg = '#cccccc';
let bg = '#000000';
let bold = false;
let i = 0;
let inEscape = false;
let escapeCode = '';
while (i < text.length) {
const char = text[i];
if (inEscape) {
if (char >= '0' && char <= '9' || char === ';' || char === '[') {
escapeCode += char;
}
if (char === 'm') {
const codes = escapeCode.replace('\x1b[', '').split(';');
for (const code of codes) {
const num = parseInt(code) || 0;
if (num === 0) {
fg = '#cccccc';
bg = '#000000';
bold = false;
} else if (num === 1) {
bold = true;
} else if (num === 22) {
bold = false;
} else if (num === 39) {
fg = '#cccccc';
} else if (num === 49) {
bg = '#000000';
} else if (num >= 30 && num <= 37) {
fg = ANSI_COLORS[num - 30 + (bold ? 8 : 0)] || '#cccccc';
} else if (num >= 40 && num <= 47) {
bg = ANSI_COLORS[num - 40] || '#000000';
} else if (num >= 90 && num <= 97) {
fg = ANSI_COLORS[num - 90 + 8] || '#cccccc';
} else if (num >= 100 && num <= 107) {
bg = ANSI_COLORS[num - 100 + 8] || '#000000';
} else if (num >= 1 && num <= 256) {
// 256 colors
if (num < 16) {
fg = ANSI_COLORS[num] || '#cccccc';
} else if (num < 232) {
const c = num - 16;
const r = Math.floor(c / 36) * 51;
const g = Math.floor((c % 36) / 6) * 51;
const b = (c % 6) * 51;
fg = `#${r.toString(16).padStart(2,'0')}${g.toString(16).padStart(2,'0')}${b.toString(16).padStart(2,'0')}`;
} else {
const gray = (num - 232) * 10 + 8;
fg = `#${gray.toString(16).repeat(2)}`;
}
}
}
if (currentText) {
tokens.push({ text: currentText, fg, bg, bold });
currentText = '';
}
inEscape = false;
escapeCode = '';
}
} else if (char === '\x1b' && text[i + 1] === '[') {
if (currentText) {
tokens.push({ text: currentText, fg, bg, bold });
currentText = '';
}
inEscape = true;
escapeCode = '';
i++;
} else {
currentText += char;
}
i++;
}
if (currentText) {
tokens.push({ text: currentText, fg, bg, bold });
}
return tokens;
}
function renderLine(text, x, y, lineHeight) {
const tokens = parseAnsi(text);
let xOffset = x;
for (const token of tokens) {
if (token.text) {
if (token.bold) {
ctx.font = 'bold 16px monospace';
} else {
ctx.font = '16px monospace';
}
const metrics = ctx.measureText(token.text);
if (token.bg !== '#000000') {
ctx.fillStyle = token.bg;
ctx.fillRect(xOffset, y - 2, metrics.width + 1, lineHeight);
}
ctx.fillStyle = token.fg;
ctx.fillText(token.text, xOffset, y);
xOffset += metrics.width;
}
}
}
function connect() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.hostname}:8765`;
ws = new WebSocket(wsUrl);
ws.onopen = () => {
status.textContent = 'Connected';
status.className = 'connected';
sendSize();
};
ws.onclose = () => {
status.textContent = 'Disconnected - Reconnecting...';
status.className = 'disconnected';
setTimeout(connect, 1000);
};
ws.onerror = () => {
status.textContent = 'Connection error';
status.className = 'disconnected';
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (data.type === 'frame') {
cols = data.width || 80;
rows = data.height || 24;
colsInput.value = cols;
rowsInput.value = rows;
resizeCanvas();
render(data.lines || []);
} else if (data.type === 'clear') {
ctx.fillStyle = '#000';
ctx.fillRect(0, 0, canvas.width, canvas.height);
}
} catch (e) {
console.error('Failed to parse message:', e);
}
};
}
function sendSize() {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: 'resize',
width: parseInt(colsInput.value),
height: parseInt(rowsInput.value)
}));
}
}
function render(lines) {
ctx.fillStyle = '#000';
ctx.fillRect(0, 0, canvas.width, canvas.height);
ctx.font = '16px monospace';
ctx.textBaseline = 'top';
const lineHeight = CHAR_HEIGHT;
const maxLines = Math.min(lines.length, rows);
for (let i = 0; i < maxLines; i++) {
const line = lines[i] || '';
renderLine(line, 0, i * lineHeight, lineHeight);
}
}
function calculateViewportSize() {
const isFullscreen = document.fullscreenElement !== null;
const padding = isFullscreen ? 0 : 40;
const controlsHeight = isFullscreen ? 0 : 60;
const availableWidth = window.innerWidth - padding;
const availableHeight = window.innerHeight - controlsHeight;
cols = Math.max(20, Math.floor(availableWidth / CHAR_WIDTH));
rows = Math.max(10, Math.floor(availableHeight / CHAR_HEIGHT));
colsInput.value = cols;
rowsInput.value = rows;
resizeCanvas();
console.log('Fullscreen:', isFullscreen, 'Size:', cols, 'x', rows);
sendSize();
}
applyBtn.addEventListener('click', () => {
cols = parseInt(colsInput.value);
rows = parseInt(rowsInput.value);
resizeCanvas();
sendSize();
});
fullscreenBtn.addEventListener('click', () => {
if (!document.fullscreenElement) {
document.body.classList.add('fullscreen');
document.documentElement.requestFullscreen().then(() => {
calculateViewportSize();
});
} else {
document.exitFullscreen().then(() => {
calculateViewportSize();
});
}
});
document.addEventListener('fullscreenchange', () => {
if (!document.fullscreenElement) {
document.body.classList.remove('fullscreen');
calculateViewportSize();
}
});
window.addEventListener('resize', () => {
if (document.fullscreenElement) {
calculateViewportSize();
}
});
// Initial setup
resizeCanvas();
connect();
</script>
</body>
</html>

View File

@@ -29,6 +29,18 @@ from engine.terminal import (
slow_print,
tw,
)
from engine.websocket_display import WebSocketDisplay
def _get_display():
"""Get the appropriate display based on config."""
if config.WEBSOCKET:
ws = WebSocketDisplay(host="0.0.0.0", port=config.WEBSOCKET_PORT)
ws.start_server()
ws.start_http_server()
return ws
return None
TITLE = [
" ███╗ ███╗ █████╗ ██╗███╗ ██╗██╗ ██╗███╗ ██╗███████╗",
@@ -343,7 +355,10 @@ def main():
print()
time.sleep(0.4)
stream(items, ntfy, mic)
display = _get_display()
stream(items, ntfy, mic, display)
if display:
display.cleanup()
print()
print(f" {W_GHOST}{'' * (tw() - 4)}{RST}")

View File

@@ -127,6 +127,9 @@ class Config:
script_fonts: dict[str, str] = field(default_factory=_get_platform_font_paths)
websocket: bool = False
websocket_port: int = 8765
@classmethod
def from_args(cls, argv: list[str] | None = None) -> "Config":
"""Create Config from CLI arguments (or custom argv for testing)."""
@@ -164,6 +167,8 @@ class Config:
glitch_glyphs="░▒▓█▌▐╌╍╎╏┃┆┇┊┋",
kata_glyphs="ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ",
script_fonts=_get_platform_font_paths(),
websocket="--websocket" in argv,
websocket_port=_arg_int("--websocket-port", 8765, argv),
)
@@ -223,6 +228,10 @@ GRAD_SPEED = 0.08 # gradient traversal speed (cycles/sec, ~12s full sweep)
GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋"
KATA = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ"
# ─── WEBSOCKET ─────────────────────────────────────────────
WEBSOCKET = "--websocket" in sys.argv
WEBSOCKET_PORT = _arg_int("--websocket-port", 8765)
def set_font_selection(font_path=None, font_index=None):
"""Set runtime primary font selection."""

View File

@@ -8,6 +8,17 @@ from engine.events import EventType, StreamEvent
from engine.mic import MicMonitor
from engine.ntfy import NtfyPoller
from engine.scroll import stream
from engine.websocket_display import WebSocketDisplay
def _get_display(config: Config):
"""Get the appropriate display based on config."""
if config.websocket:
ws = WebSocketDisplay(host="0.0.0.0", port=config.websocket_port)
ws.start_server()
ws.start_http_server()
return ws
return None
class StreamController:
@@ -51,7 +62,10 @@ class StreamController:
),
)
stream(items, self.ntfy, self.mic)
display = _get_display(self.config)
stream(items, self.ntfy, self.mic, display)
if display:
display.cleanup()
if self.event_bus:
self.event_bus.publish(

264
engine/websocket_display.py Normal file
View File

@@ -0,0 +1,264 @@
"""
WebSocket display server - broadcasts frame buffer to connected web clients.
Usage:
ws_display = WebSocketDisplay(host="0.0.0.0", port=8765)
ws_display.init(80, 24)
ws_display.show(["line1", "line2", ...])
ws_display.cleanup()
"""
import asyncio
import json
import threading
import time
from typing import Protocol
try:
import websockets
except ImportError:
websockets = None
class Display(Protocol):
"""Protocol for display backends."""
def init(self, width: int, height: int) -> None:
"""Initialize display with dimensions."""
...
def show(self, buffer: list[str]) -> None:
"""Show buffer on display."""
...
def clear(self) -> None:
"""Clear display."""
...
def cleanup(self) -> None:
"""Shutdown display."""
...
def get_monitor():
"""Get the performance monitor."""
try:
from engine.effects.performance import get_monitor as _get_monitor
return _get_monitor()
except Exception:
return None
class WebSocketDisplay:
"""WebSocket display backend - broadcasts to HTML Canvas clients."""
def __init__(
self,
host: str = "0.0.0.0",
port: int = 8765,
http_port: int = 8766,
):
self.host = host
self.port = port
self.http_port = http_port
self.width = 80
self.height = 24
self._clients: set = set()
self._server_running = False
self._http_running = False
self._server_thread: threading.Thread | None = None
self._http_thread: threading.Thread | None = None
self._available = True
self._max_clients = 10
self._client_connected_callback = None
self._client_disconnected_callback = None
self._frame_delay = 0.0
try:
import websockets as _ws
self._available = _ws is not None
except ImportError:
self._available = False
def is_available(self) -> bool:
"""Check if WebSocket support is available."""
return self._available
def init(self, width: int, height: int) -> None:
"""Initialize display with dimensions and start server."""
self.width = width
self.height = height
self.start_server()
self.start_http_server()
def show(self, buffer: list[str]) -> None:
"""Broadcast buffer to all connected clients."""
t0 = time.perf_counter()
if self._clients:
frame_data = {
"type": "frame",
"width": self.width,
"height": self.height,
"lines": buffer,
}
message = json.dumps(frame_data)
disconnected = set()
for client in list(self._clients):
try:
asyncio.run(client.send(message))
except Exception:
disconnected.add(client)
for client in disconnected:
self._clients.discard(client)
if self._client_disconnected_callback:
self._client_disconnected_callback(client)
elapsed_ms = (time.perf_counter() - t0) * 1000
monitor = get_monitor()
if monitor:
chars_in = sum(len(line) for line in buffer)
monitor.record_effect("websocket_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
"""Broadcast clear command to all clients."""
if self._clients:
clear_data = {"type": "clear"}
message = json.dumps(clear_data)
for client in list(self._clients):
try:
asyncio.run(client.send(message))
except Exception:
pass
def cleanup(self) -> None:
"""Stop the servers."""
self.stop_server()
self.stop_http_server()
async def _websocket_handler(self, websocket):
"""Handle WebSocket connections."""
if len(self._clients) >= self._max_clients:
await websocket.close()
return
self._clients.add(websocket)
if self._client_connected_callback:
self._client_connected_callback(websocket)
try:
async for message in websocket:
try:
data = json.loads(message)
if data.get("type") == "resize":
self.width = data.get("width", 80)
self.height = data.get("height", 24)
except json.JSONDecodeError:
pass
except Exception:
pass
finally:
self._clients.discard(websocket)
if self._client_disconnected_callback:
self._client_disconnected_callback(websocket)
async def _run_websocket_server(self):
"""Run the WebSocket server."""
async with websockets.serve(self._websocket_handler, self.host, self.port):
while self._server_running:
await asyncio.sleep(0.1)
async def _run_http_server(self):
"""Run simple HTTP server for the client."""
import os
from http.server import HTTPServer, SimpleHTTPRequestHandler
client_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "client")
class Handler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=client_dir, **kwargs)
def log_message(self, format, *args):
pass
httpd = HTTPServer((self.host, self.http_port), Handler)
while self._http_running:
httpd.handle_request()
def _run_async(self, coro):
"""Run coroutine in background."""
try:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
loop.run_until_complete(coro)
except Exception:
pass
def start_server(self):
"""Start the WebSocket server in a background thread."""
if not self._available:
return
if self._server_thread is not None:
return
self._server_running = True
self._server_thread = threading.Thread(
target=self._run_async, args=(self._run_websocket_server(),), daemon=True
)
self._server_thread.start()
def stop_server(self):
"""Stop the WebSocket server."""
self._server_running = False
self._server_thread = None
def start_http_server(self):
"""Start the HTTP server in a background thread."""
if not self._available:
return
if self._http_thread is not None:
return
self._http_running = True
self._http_thread = threading.Thread(
target=self._run_async, args=(self._run_http_server(),), daemon=True
)
self._http_thread.start()
def stop_http_server(self):
"""Stop the HTTP server."""
self._http_running = False
self._http_thread = None
def client_count(self) -> int:
"""Return number of connected clients."""
return len(self._clients)
def get_ws_port(self) -> int:
"""Return WebSocket port."""
return self.port
def get_http_port(self) -> int:
"""Return HTTP port."""
return self.http_port
def set_frame_delay(self, delay: float) -> None:
"""Set delay between frames in seconds."""
self._frame_delay = delay
def get_frame_delay(self) -> float:
"""Get delay between frames."""
return self._frame_delay
def set_client_connected_callback(self, callback) -> None:
"""Set callback for client connections."""
self._client_connected_callback = callback
def set_client_disconnected_callback(self, callback) -> None:
"""Set callback for client disconnections."""
self._client_disconnected_callback = callback

View File

@@ -13,6 +13,9 @@ test-v = "uv run pytest -v"
test-cov = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html"
test-cov-open = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html && open htmlcov/index.html"
test-browser-install = { run = "uv run playwright install chromium", depends = ["sync-all"] }
test-browser = { run = "uv run pytest tests/e2e/", depends = ["test-browser-install"] }
lint = "uv run ruff check engine/ mainline.py"
lint-fix = "uv run ruff check --fix engine/ mainline.py"
format = "uv run ruff format engine/ mainline.py"
@@ -24,6 +27,8 @@ format = "uv run ruff format engine/ mainline.py"
run = "uv run mainline.py"
run-poetry = "uv run mainline.py --poetry"
run-firehose = "uv run mainline.py --firehose"
run-websocket = { run = "uv run mainline.py --websocket", depends = ["sync-all"] }
run-client = { run = "uv run mainline.py --websocket & WEBSOCKET_PID=$! && sleep 2 && case $(uname -s) in Darwin) open http://localhost:8766 ;; Linux) xdg-open http://localhost:8766 ;; CYGWIN*) cmd /c start http://localhost:8766 ;; *) echo 'Unknown platform' ;; esac && wait $WEBSOCKET_PID", depends = ["sync-all"] }
# =====================
# Environment

View File

@@ -30,6 +30,12 @@ mic = [
"sounddevice>=0.4.0",
"numpy>=1.24.0",
]
websocket = [
"websockets>=12.0",
]
browser = [
"playwright>=1.40.0",
]
dev = [
"pytest>=8.0.0",
"pytest-cov>=4.1.0",

View File

@@ -0,0 +1,133 @@
"""
End-to-end tests for web client with headless browser.
"""
import os
import socketserver
import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
import pytest
CLIENT_DIR = Path(__file__).parent.parent.parent / "client"
class ThreadedHTTPServer(socketserver.ThreadingMixIn, HTTPServer):
"""Threaded HTTP server for handling concurrent requests."""
daemon_threads = True
@pytest.fixture(scope="module")
def http_server():
"""Start a local HTTP server for the client."""
os.chdir(CLIENT_DIR)
handler = SimpleHTTPRequestHandler
server = ThreadedHTTPServer(("127.0.0.1", 0), handler)
port = server.server_address[1]
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
yield f"http://127.0.0.1:{port}"
server.shutdown()
class TestWebClient:
"""Tests for the web client using Playwright."""
@pytest.fixture(autouse=True)
def setup_browser(self):
"""Set up browser for tests."""
pytest.importorskip("playwright")
from playwright.sync_api import sync_playwright
self.playwright = sync_playwright().start()
self.browser = self.playwright.chromium.launch(headless=True)
self.context = self.browser.new_context()
self.page = self.context.new_page()
yield
self.page.close()
self.context.close()
self.browser.close()
self.playwright.stop()
def test_client_loads(self, http_server):
"""Web client loads without errors."""
response = self.page.goto(http_server)
assert response.status == 200, f"Page load failed with status {response.status}"
self.page.wait_for_load_state("domcontentloaded")
content = self.page.content()
assert "<canvas" in content, "Canvas element not found in page"
canvas = self.page.locator("#terminal")
assert canvas.count() > 0, "Canvas not found"
def test_status_shows_connecting(self, http_server):
"""Status shows connecting initially."""
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
status = self.page.locator("#status")
assert status.count() > 0, "Status element not found"
def test_canvas_has_dimensions(self, http_server):
"""Canvas has correct dimensions after load."""
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
canvas = self.page.locator("#terminal")
assert canvas.count() > 0, "Canvas not found"
def test_no_console_errors_on_load(self, http_server):
"""No JavaScript errors on page load (websocket errors are expected without server)."""
js_errors = []
def handle_console(msg):
if msg.type == "error":
text = msg.text
if "WebSocket" not in text:
js_errors.append(text)
self.page.on("console", handle_console)
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
assert len(js_errors) == 0, f"JavaScript errors: {js_errors}"
class TestWebClientProtocol:
"""Tests for WebSocket protocol handling in client."""
@pytest.fixture(autouse=True)
def setup_browser(self):
"""Set up browser for tests."""
pytest.importorskip("playwright")
from playwright.sync_api import sync_playwright
self.playwright = sync_playwright().start()
self.browser = self.playwright.chromium.launch(headless=True)
self.context = self.browser.new_context()
self.page = self.context.new_page()
yield
self.page.close()
self.context.close()
self.browser.close()
self.playwright.stop()
def test_websocket_reconnection(self, http_server):
"""Client attempts reconnection on disconnect."""
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
status = self.page.locator("#status")
assert status.count() > 0, "Status element not found"

55
tests/test_app.py Normal file
View File

@@ -0,0 +1,55 @@
"""
Tests for engine.app module.
"""
from engine.app import _normalize_preview_rows
class TestNormalizePreviewRows:
"""Tests for _normalize_preview_rows function."""
def test_empty_rows(self):
"""Empty input returns empty list."""
result = _normalize_preview_rows([])
assert result == [""]
def test_strips_left_padding(self):
"""Left padding is stripped."""
result = _normalize_preview_rows([" content", " more"])
assert all(not r.startswith(" ") for r in result)
def test_preserves_content(self):
"""Content is preserved."""
result = _normalize_preview_rows([" hello world "])
assert "hello world" in result[0]
def test_handles_all_empty_rows(self):
"""All empty rows returns single empty string."""
result = _normalize_preview_rows(["", " ", ""])
assert result == [""]
class TestAppConstants:
"""Tests for app module constants."""
def test_title_defined(self):
"""TITLE is defined."""
from engine.app import TITLE
assert len(TITLE) > 0
def test_title_lines_are_strings(self):
"""TITLE contains string lines."""
from engine.app import TITLE
assert all(isinstance(line, str) for line in TITLE)
class TestAppImports:
"""Tests for app module imports."""
def test_app_imports_without_error(self):
"""Module imports without error."""
from engine import app
assert app is not None

View File

@@ -83,35 +83,3 @@ class TestStreamControllerCleanup:
controller.cleanup()
mock_mic_instance.stop.assert_called_once()
class TestStreamControllerWarmup:
"""Tests for StreamController topic warmup."""
def test_warmup_topics_idempotent(self):
"""warmup_topics can be called multiple times."""
StreamController._topics_warmed = False
with patch("urllib.request.urlopen") as mock_urlopen:
StreamController.warmup_topics()
StreamController.warmup_topics()
assert mock_urlopen.call_count >= 3
def test_warmup_topics_sets_flag(self):
"""warmup_topics sets the warmed flag."""
StreamController._topics_warmed = False
with patch("urllib.request.urlopen"):
StreamController.warmup_topics()
assert StreamController._topics_warmed is True
def test_warmup_topics_skips_after_first(self):
"""warmup_topics skips after first call."""
StreamController._topics_warmed = True
with patch("urllib.request.urlopen") as mock_urlopen:
StreamController.warmup_topics()
mock_urlopen.assert_not_called()

234
tests/test_fetch.py Normal file
View File

@@ -0,0 +1,234 @@
"""
Tests for engine.fetch module.
"""
import json
from unittest.mock import MagicMock, patch
from engine.fetch import (
_fetch_gutenberg,
fetch_all,
fetch_feed,
fetch_poetry,
load_cache,
save_cache,
)
class TestFetchFeed:
"""Tests for fetch_feed function."""
@patch("engine.fetch.urllib.request.urlopen")
def test_fetch_success(self, mock_urlopen):
"""Successful feed fetch returns parsed feed."""
mock_response = MagicMock()
mock_response.read.return_value = b"<rss>test</rss>"
mock_urlopen.return_value = mock_response
result = fetch_feed("http://example.com/feed")
assert result is not None
@patch("engine.fetch.urllib.request.urlopen")
def test_fetch_network_error(self, mock_urlopen):
"""Network error returns None."""
mock_urlopen.side_effect = Exception("Network error")
result = fetch_feed("http://example.com/feed")
assert result is None
class TestFetchAll:
"""Tests for fetch_all function."""
@patch("engine.fetch.fetch_feed")
@patch("engine.fetch.strip_tags")
@patch("engine.fetch.skip")
@patch("engine.fetch.boot_ln")
def test_fetch_all_success(self, mock_boot, mock_skip, mock_strip, mock_fetch_feed):
"""Successful fetch returns items."""
mock_feed = MagicMock()
mock_feed.bozo = False
mock_feed.entries = [
{"title": "Headline 1", "published_parsed": (2024, 1, 1, 12, 0, 0)},
{"title": "Headline 2", "updated_parsed": (2024, 1, 2, 12, 0, 0)},
]
mock_fetch_feed.return_value = mock_feed
mock_skip.return_value = False
mock_strip.side_effect = lambda x: x
items, linked, failed = fetch_all()
assert linked > 0
assert failed == 0
@patch("engine.fetch.fetch_feed")
@patch("engine.fetch.boot_ln")
def test_fetch_all_feed_error(self, mock_boot, mock_fetch_feed):
"""Feed error increments failed count."""
mock_fetch_feed.return_value = None
items, linked, failed = fetch_all()
assert failed > 0
@patch("engine.fetch.fetch_feed")
@patch("engine.fetch.strip_tags")
@patch("engine.fetch.skip")
@patch("engine.fetch.boot_ln")
def test_fetch_all_skips_filtered(
self, mock_boot, mock_skip, mock_strip, mock_fetch_feed
):
"""Filtered headlines are skipped."""
mock_feed = MagicMock()
mock_feed.bozo = False
mock_feed.entries = [
{"title": "Sports scores"},
{"title": "Valid headline"},
]
mock_fetch_feed.return_value = mock_feed
mock_skip.side_effect = lambda x: x == "Sports scores"
mock_strip.side_effect = lambda x: x
items, linked, failed = fetch_all()
assert any("Valid headline" in item[0] for item in items)
class TestFetchGutenberg:
"""Tests for _fetch_gutenberg function."""
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_success(self, mock_urlopen):
"""Successful gutenberg fetch returns items."""
text = """Project Gutenberg
*** START OF THE PROJECT GUTENBERG ***
This is a test poem with multiple lines
that should be parsed as a block.
Another stanza with more content here.
*** END OF THE PROJECT GUTENBERG ***
"""
mock_response = MagicMock()
mock_response.read.return_value = text.encode("utf-8")
mock_urlopen.return_value = mock_response
result = _fetch_gutenberg("http://example.com/test", "Test")
assert len(result) > 0
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_network_error(self, mock_urlopen):
"""Network error returns empty list."""
mock_urlopen.side_effect = Exception("Network error")
result = _fetch_gutenberg("http://example.com/test", "Test")
assert result == []
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_skips_short_blocks(self, mock_urlopen):
"""Blocks shorter than 20 chars are skipped."""
text = """*** START OF THE ***
Short
*** END OF THE ***
"""
mock_response = MagicMock()
mock_response.read.return_value = text.encode("utf-8")
mock_urlopen.return_value = mock_response
result = _fetch_gutenberg("http://example.com/test", "Test")
assert result == []
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_skips_all_caps_headers(self, mock_urlopen):
"""All-caps lines are skipped as headers."""
text = """*** START OF THE ***
THIS IS ALL CAPS HEADER
more content here
*** END OF THE ***
"""
mock_response = MagicMock()
mock_response.read.return_value = text.encode("utf-8")
mock_urlopen.return_value = mock_response
result = _fetch_gutenberg("http://example.com/test", "Test")
assert len(result) > 0
class TestFetchPoetry:
"""Tests for fetch_poetry function."""
@patch("engine.fetch._fetch_gutenberg")
@patch("engine.fetch.boot_ln")
def test_fetch_poetry_success(self, mock_boot, mock_fetch):
"""Successful poetry fetch returns items."""
mock_fetch.return_value = [
("Stanza 1 content here", "Test", ""),
("Stanza 2 content here", "Test", ""),
]
items, linked, failed = fetch_poetry()
assert linked > 0
assert failed == 0
@patch("engine.fetch._fetch_gutenberg")
@patch("engine.fetch.boot_ln")
def test_fetch_poetry_failure(self, mock_boot, mock_fetch):
"""Failed fetch increments failed count."""
mock_fetch.return_value = []
items, linked, failed = fetch_poetry()
assert failed > 0
class TestCache:
"""Tests for cache functions."""
@patch("engine.fetch._cache_path")
def test_load_cache_success(self, mock_path):
"""Successful cache load returns items."""
mock_path.return_value.__str__ = MagicMock(return_value="/tmp/cache")
mock_path.return_value.exists.return_value = True
mock_path.return_value.read_text.return_value = json.dumps(
{"items": [("title", "source", "time")]}
)
result = load_cache()
assert result is not None
@patch("engine.fetch._cache_path")
def test_load_cache_missing_file(self, mock_path):
"""Missing cache file returns None."""
mock_path.return_value.exists.return_value = False
result = load_cache()
assert result is None
@patch("engine.fetch._cache_path")
def test_load_cache_invalid_json(self, mock_path):
"""Invalid JSON returns None."""
mock_path.return_value.exists.return_value = True
mock_path.return_value.read_text.side_effect = json.JSONDecodeError("", "", 0)
result = load_cache()
assert result is None
@patch("engine.fetch._cache_path")
def test_save_cache_success(self, mock_path):
"""Save cache writes to file."""
mock_path.return_value.__truediv__ = MagicMock(
return_value=mock_path.return_value
)
save_cache([("title", "source", "time")])

232
tests/test_render.py Normal file
View File

@@ -0,0 +1,232 @@
"""
Tests for engine.render module.
"""
from unittest.mock import MagicMock, patch
import pytest
from engine.render import (
GRAD_COLS,
MSG_GRAD_COLS,
clear_font_cache,
font_for_lang,
lr_gradient,
lr_gradient_opposite,
make_block,
)
class TestGradientConstants:
"""Tests for gradient color constants."""
def test_grad_cols_defined(self):
"""GRAD_COLS is defined with expected length."""
assert len(GRAD_COLS) > 0
assert all(isinstance(c, str) for c in GRAD_COLS)
def test_msg_grad_cols_defined(self):
"""MSG_GRAD_COLS is defined with expected length."""
assert len(MSG_GRAD_COLS) > 0
assert all(isinstance(c, str) for c in MSG_GRAD_COLS)
def test_grad_cols_start_with_white(self):
"""GRAD_COLS starts with white."""
assert "231" in GRAD_COLS[0]
def test_msg_grad_cols_different_from_grad_cols(self):
"""MSG_GRAD_COLS is different from GRAD_COLS."""
assert MSG_GRAD_COLS != GRAD_COLS
class TestLrGradient:
"""Tests for lr_gradient function."""
def test_empty_rows(self):
"""Empty input returns empty output."""
result = lr_gradient([], 0.0)
assert result == []
def test_preserves_empty_rows(self):
"""Empty rows are preserved."""
result = lr_gradient([""], 0.0)
assert result == [""]
def test_adds_gradient_to_content(self):
"""Non-empty rows get gradient coloring."""
result = lr_gradient(["hello"], 0.0)
assert len(result) == 1
assert "\033[" in result[0]
def test_preserves_spaces(self):
"""Spaces are preserved without coloring."""
result = lr_gradient(["hello world"], 0.0)
assert " " in result[0]
def test_offset_wraps_around(self):
"""Offset wraps around at 1.0."""
result1 = lr_gradient(["hello"], 0.0)
result2 = lr_gradient(["hello"], 1.0)
assert result1 != result2 or result1 == result2
class TestLrGradientOpposite:
"""Tests for lr_gradient_opposite function."""
def test_uses_msg_grad_cols(self):
"""Uses MSG_GRAD_COLS instead of GRAD_COLS."""
result = lr_gradient_opposite(["test"])
assert "\033[" in result[0]
class TestClearFontCache:
"""Tests for clear_font_cache function."""
def test_clears_without_error(self):
"""Function runs without error."""
clear_font_cache()
class TestFontForLang:
"""Tests for font_for_lang function."""
@patch("engine.render.font")
def test_returns_default_for_none(self, mock_font):
"""Returns default font when lang is None."""
result = font_for_lang(None)
assert result is not None
@patch("engine.render.font")
def test_returns_default_for_unknown_lang(self, mock_font):
"""Returns default font for unknown language."""
result = font_for_lang("unknown_lang")
assert result is not None
class TestMakeBlock:
"""Tests for make_block function."""
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_basic(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Basic make_block returns content, color, meta index."""
mock_wrap.return_value = ["Headline content", ""]
mock_random.choice.return_value = "\033[38;5;46m"
content, color, meta_idx = make_block(
"Test headline", "TestSource", "12:00", 80
)
assert len(content) > 0
assert color is not None
assert meta_idx >= 0
@pytest.mark.skip(reason="Requires full PIL/font environment")
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_translation(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Translation is applied when mode is news."""
mock_wrap.return_value = ["Translated"]
mock_random.choice.return_value = "\033[38;5;46m"
mock_detect.return_value = "de"
with patch("engine.config.MODE", "news"):
content, _, _ = make_block("Test", "Source", "12:00", 80)
mock_translate.assert_called_once()
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_no_translation_poetry(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""No translation when mode is poetry."""
mock_wrap.return_value = ["Poem content"]
mock_random.choice.return_value = "\033[38;5;46m"
with patch("engine.config.MODE", "poetry"):
make_block("Test", "Source", "12:00", 80)
mock_translate.assert_not_called()
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_meta_format(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Meta line includes source and timestamp."""
mock_wrap.return_value = ["Content"]
mock_random.choice.return_value = "\033[38;5;46m"
content, _, meta_idx = make_block("Test", "MySource", "14:30", 80)
meta_line = content[meta_idx]
assert "MySource" in meta_line
assert "14:30" in meta_line
class TestRenderLine:
"""Tests for render_line function."""
def test_empty_string(self):
"""Empty string returns empty list."""
from engine.render import render_line
result = render_line("")
assert result == [""]
@pytest.mark.skip(reason="Requires real font/PIL setup")
def test_uses_default_font(self):
"""Uses default font when none provided."""
from engine.render import render_line
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = (0, 0, 10, 10)
render_line("test")
def test_getbbox_returns_none(self):
"""Handles None bbox gracefully."""
from engine.render import render_line
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = None
result = render_line("test")
assert result == [""]
class TestBigWrap:
"""Tests for big_wrap function."""
def test_empty_string(self):
"""Empty string returns empty list."""
from engine.render import big_wrap
result = big_wrap("", 80)
assert result == []
@pytest.mark.skip(reason="Requires real font/PIL setup")
def test_single_word_fits(self):
"""Single short word returns rendered."""
from engine.render import big_wrap
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = (0, 0, 10, 10)
result = big_wrap("test", 80)
assert len(result) > 0

115
tests/test_translate.py Normal file
View File

@@ -0,0 +1,115 @@
"""
Tests for engine.translate module.
"""
import json
from unittest.mock import MagicMock, patch
from engine.translate import (
_translate_cached,
detect_location_language,
translate_headline,
)
def clear_translate_cache():
"""Clear the LRU cache between tests."""
_translate_cached.cache_clear()
class TestDetectLocationLanguage:
"""Tests for detect_location_language function."""
def test_returns_none_for_unknown_location(self):
"""Returns None when no location pattern matches."""
result = detect_location_language("Breaking news about technology")
assert result is None
def test_detects_berlin(self):
"""Detects Berlin location."""
result = detect_location_language("Berlin police arrest protesters")
assert result == "de"
def test_detects_paris(self):
"""Detects Paris location."""
result = detect_location_language("Paris fashion week begins")
assert result == "fr"
def test_detects_tokyo(self):
"""Detects Tokyo location."""
result = detect_location_language("Tokyo stocks rise")
assert result == "ja"
def test_detects_berlin_again(self):
"""Detects Berlin location again."""
result = detect_location_language("Berlin marathon set to begin")
assert result == "de"
def test_case_insensitive(self):
"""Detection is case insensitive."""
result = detect_location_language("BERLIN SUMMER FESTIVAL")
assert result == "de"
def test_returns_first_match(self):
"""Returns first matching pattern."""
result = detect_location_language("Berlin in Paris for the event")
assert result == "de"
class TestTranslateHeadline:
"""Tests for translate_headline function."""
def test_returns_translated_text(self):
"""Returns translated text from cache."""
clear_translate_cache()
with patch("engine.translate.translate_headline") as mock_fn:
mock_fn.return_value = "Translated title"
from engine.translate import translate_headline as th
result = th("Original title", "de")
assert result == "Translated title"
def test_uses_cached_result(self):
"""Translation uses LRU cache."""
clear_translate_cache()
result1 = translate_headline("Test unique", "es")
result2 = translate_headline("Test unique", "es")
assert result1 == result2
class TestTranslateCached:
"""Tests for _translate_cached function."""
def test_translation_network_error(self):
"""Network error returns original text."""
clear_translate_cache()
with patch("engine.translate.urllib.request.urlopen") as mock_urlopen:
mock_urlopen.side_effect = Exception("Network error")
result = _translate_cached("Hello world", "de")
assert result == "Hello world"
def test_translation_invalid_json(self):
"""Invalid JSON returns original text."""
clear_translate_cache()
with patch("engine.translate.urllib.request.urlopen") as mock_urlopen:
mock_response = MagicMock()
mock_response.read.return_value = b"invalid json"
mock_urlopen.return_value = mock_response
result = _translate_cached("Hello", "de")
assert result == "Hello"
def test_translation_empty_response(self):
"""Empty translation response returns original text."""
clear_translate_cache()
with patch("engine.translate.urllib.request.urlopen") as mock_urlopen:
mock_response = MagicMock()
mock_response.read.return_value = json.dumps([[[""], None, "de"], None])
mock_urlopen.return_value = mock_response
result = _translate_cached("Hello", "de")
assert result == "Hello"

161
tests/test_websocket.py Normal file
View File

@@ -0,0 +1,161 @@
"""
Tests for engine.websocket_display module.
"""
from unittest.mock import MagicMock, patch
import pytest
from engine.websocket_display import WebSocketDisplay
class TestWebSocketDisplayImport:
"""Test that websocket module can be imported."""
def test_import_does_not_error(self):
"""Module imports without error."""
from engine import websocket_display
assert websocket_display is not None
class TestWebSocketDisplayInit:
"""Tests for WebSocketDisplay initialization."""
def test_default_init(self):
"""Default initialization sets correct defaults."""
with patch("engine.websocket_display.websockets", None):
display = WebSocketDisplay()
assert display.host == "0.0.0.0"
assert display.port == 8765
assert display.http_port == 8766
assert display.width == 80
assert display.height == 24
def test_custom_init(self):
"""Custom initialization uses provided values."""
with patch("engine.websocket_display.websockets", None):
display = WebSocketDisplay(host="localhost", port=9000, http_port=9001)
assert display.host == "localhost"
assert display.port == 9000
assert display.http_port == 9001
def test_is_available_when_websockets_present(self):
"""is_available returns True when websockets is available."""
pytest.importorskip("websockets")
display = WebSocketDisplay()
assert display.is_available() is True
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_is_available_when_websockets_missing(self):
"""is_available returns False when websockets is not available."""
display = WebSocketDisplay()
assert display.is_available() is False
class TestWebSocketDisplayProtocol:
"""Test that WebSocketDisplay satisfies Display protocol."""
def test_websocket_display_is_display(self):
"""WebSocketDisplay satisfies Display protocol."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay()
assert hasattr(display, "init")
assert hasattr(display, "show")
assert hasattr(display, "clear")
assert hasattr(display, "cleanup")
class TestWebSocketDisplayMethods:
"""Tests for WebSocketDisplay methods."""
def test_init_stores_dimensions(self):
"""init stores terminal dimensions."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay()
display.init(100, 40)
assert display.width == 100
assert display.height == 40
def test_client_count_initially_zero(self):
"""client_count returns 0 when no clients connected."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay()
assert display.client_count() == 0
def test_get_ws_port(self):
"""get_ws_port returns configured port."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay(port=9000)
assert display.get_ws_port() == 9000
def test_get_http_port(self):
"""get_http_port returns configured port."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay(http_port=9001)
assert display.get_http_port() == 9001
def test_frame_delay_defaults_to_zero(self):
"""get_frame_delay returns 0 by default."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay()
assert display.get_frame_delay() == 0.0
def test_set_frame_delay(self):
"""set_frame_delay stores the value."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay()
display.set_frame_delay(0.05)
assert display.get_frame_delay() == 0.05
class TestWebSocketDisplayCallbacks:
"""Tests for WebSocketDisplay callback methods."""
def test_set_client_connected_callback(self):
"""set_client_connected_callback stores callback."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay()
callback = MagicMock()
display.set_client_connected_callback(callback)
assert display._client_connected_callback is callback
def test_set_client_disconnected_callback(self):
"""set_client_disconnected_callback stores callback."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay()
callback = MagicMock()
display.set_client_disconnected_callback(callback)
assert display._client_disconnected_callback is callback
class TestWebSocketDisplayUnavailable:
"""Tests when WebSocket support is unavailable."""
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_start_server_noop_when_unavailable(self):
"""start_server does nothing when websockets unavailable."""
display = WebSocketDisplay()
display.start_server()
assert display._server_thread is None
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_start_http_server_noop_when_unavailable(self):
"""start_http_server does nothing when websockets unavailable."""
display = WebSocketDisplay()
display.start_http_server()
assert display._http_thread is None
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_show_noops_when_unavailable(self):
"""show does nothing when websockets unavailable."""
display = WebSocketDisplay()
display.show(["line1", "line2"])