Compare commits

..

8 Commits

22 changed files with 314 additions and 1686 deletions

1
.gitignore vendored
View File

@@ -9,4 +9,3 @@ htmlcov/
.coverage .coverage
.pytest_cache/ .pytest_cache/
*.egg-info/ *.egg-info/
coverage.xml

View File

@@ -22,23 +22,13 @@ uv sync
### Available Commands ### Available Commands
```bash ```bash
mise run test # Run tests mise run test # Run tests
mise run test-v # Run tests verbose mise run test-v # Run tests verbose
mise run test-cov # Run tests with coverage report mise run test-cov # Run tests with coverage report
mise run test-browser # Run e2e browser tests (requires playwright) mise run lint # Run ruff linter
mise run lint # Run ruff linter mise run lint-fix # Run ruff with auto-fix
mise run lint-fix # Run ruff with auto-fix mise run format # Run ruff formatter
mise run format # Run ruff formatter mise run ci # Full CI pipeline (sync + test + coverage)
mise run ci # Full CI pipeline (sync + test + coverage)
```
### Runtime Commands
```bash
mise run run # Run mainline (terminal)
mise run run-websocket # Run with WebSocket display
mise run run-both # Run with both terminal and WebSocket
mise run run-client # Run both + open browser
``` ```
## Git Hooks ## Git Hooks
@@ -118,11 +108,3 @@ The project uses pytest with strict marker enforcement. Test configuration is in
- **eventbus.py** provides thread-safe event publishing for decoupled communication - **eventbus.py** provides thread-safe event publishing for decoupled communication
- **controller.py** coordinates ntfy/mic monitoring - **controller.py** coordinates ntfy/mic monitoring
- The render pipeline: fetch → render → effects → scroll → terminal output - The render pipeline: fetch → render → effects → scroll → terminal output
- **Display abstraction** (`engine/display.py`): swap display backends via the Display protocol
- `TerminalDisplay` - ANSI terminal output
- `WebSocketDisplay` - broadcasts to web clients via WebSocket
- `MultiDisplay` - forwards to multiple displays simultaneously
- **WebSocket display** (`engine/websocket_display.py`): real-time frame broadcasting to web browsers
- WebSocket server on port 8765
- HTTP server on port 8766 (serves HTML client)
- Client at `client/index.html` with ANSI color parsing and fullscreen support

View File

@@ -1,366 +0,0 @@
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>Mainline Terminal</title>
<style>
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
background: #0a0a0a;
color: #ccc;
font-family: 'Fira Code', 'Cascadia Code', 'Consolas', monospace;
display: flex;
flex-direction: column;
align-items: center;
justify-content: center;
min-height: 100vh;
padding: 20px;
}
body.fullscreen {
padding: 0;
}
body.fullscreen #controls {
display: none;
}
#container {
position: relative;
}
canvas {
background: #000;
border: 1px solid #333;
image-rendering: pixelated;
image-rendering: crisp-edges;
}
body.fullscreen canvas {
border: none;
width: 100vw;
height: 100vh;
max-width: 100vw;
max-height: 100vh;
}
#controls {
display: flex;
gap: 10px;
margin-top: 10px;
align-items: center;
}
#controls button {
background: #333;
color: #ccc;
border: 1px solid #555;
padding: 5px 12px;
cursor: pointer;
font-family: inherit;
font-size: 12px;
}
#controls button:hover {
background: #444;
}
#controls input {
width: 60px;
background: #222;
color: #ccc;
border: 1px solid #444;
padding: 4px 8px;
font-family: inherit;
text-align: center;
}
#status {
margin-top: 10px;
font-size: 12px;
color: #666;
}
#status.connected {
color: #4f4;
}
#status.disconnected {
color: #f44;
}
</style>
</head>
<body>
<div id="container">
<canvas id="terminal"></canvas>
</div>
<div id="controls">
<label>Cols: <input type="number" id="cols" value="80" min="20" max="200"></label>
<label>Rows: <input type="number" id="rows" value="24" min="10" max="60"></label>
<button id="apply">Apply</button>
<button id="fullscreen">Fullscreen</button>
</div>
<div id="status" class="disconnected">Connecting...</div>
<script>
const canvas = document.getElementById('terminal');
const ctx = canvas.getContext('2d');
const status = document.getElementById('status');
const colsInput = document.getElementById('cols');
const rowsInput = document.getElementById('rows');
const applyBtn = document.getElementById('apply');
const fullscreenBtn = document.getElementById('fullscreen');
const CHAR_WIDTH = 9;
const CHAR_HEIGHT = 16;
const ANSI_COLORS = {
0: '#000000', 1: '#cd3131', 2: '#0dbc79', 3: '#e5e510',
4: '#2472c8', 5: '#bc3fbc', 6: '#11a8cd', 7: '#e5e5e5',
8: '#666666', 9: '#f14c4c', 10: '#23d18b', 11: '#f5f543',
12: '#3b8eea', 13: '#d670d6', 14: '#29b8db', 15: '#ffffff',
};
let cols = 80;
let rows = 24;
let ws = null;
function resizeCanvas() {
canvas.width = cols * CHAR_WIDTH;
canvas.height = rows * CHAR_HEIGHT;
}
function parseAnsi(text) {
if (!text) return [];
const tokens = [];
let currentText = '';
let fg = '#cccccc';
let bg = '#000000';
let bold = false;
let i = 0;
let inEscape = false;
let escapeCode = '';
while (i < text.length) {
const char = text[i];
if (inEscape) {
if (char >= '0' && char <= '9' || char === ';' || char === '[') {
escapeCode += char;
}
if (char === 'm') {
const codes = escapeCode.replace('\x1b[', '').split(';');
for (const code of codes) {
const num = parseInt(code) || 0;
if (num === 0) {
fg = '#cccccc';
bg = '#000000';
bold = false;
} else if (num === 1) {
bold = true;
} else if (num === 22) {
bold = false;
} else if (num === 39) {
fg = '#cccccc';
} else if (num === 49) {
bg = '#000000';
} else if (num >= 30 && num <= 37) {
fg = ANSI_COLORS[num - 30 + (bold ? 8 : 0)] || '#cccccc';
} else if (num >= 40 && num <= 47) {
bg = ANSI_COLORS[num - 40] || '#000000';
} else if (num >= 90 && num <= 97) {
fg = ANSI_COLORS[num - 90 + 8] || '#cccccc';
} else if (num >= 100 && num <= 107) {
bg = ANSI_COLORS[num - 100 + 8] || '#000000';
} else if (num >= 1 && num <= 256) {
// 256 colors
if (num < 16) {
fg = ANSI_COLORS[num] || '#cccccc';
} else if (num < 232) {
const c = num - 16;
const r = Math.floor(c / 36) * 51;
const g = Math.floor((c % 36) / 6) * 51;
const b = (c % 6) * 51;
fg = `#${r.toString(16).padStart(2,'0')}${g.toString(16).padStart(2,'0')}${b.toString(16).padStart(2,'0')}`;
} else {
const gray = (num - 232) * 10 + 8;
fg = `#${gray.toString(16).repeat(2)}`;
}
}
}
if (currentText) {
tokens.push({ text: currentText, fg, bg, bold });
currentText = '';
}
inEscape = false;
escapeCode = '';
}
} else if (char === '\x1b' && text[i + 1] === '[') {
if (currentText) {
tokens.push({ text: currentText, fg, bg, bold });
currentText = '';
}
inEscape = true;
escapeCode = '';
i++;
} else {
currentText += char;
}
i++;
}
if (currentText) {
tokens.push({ text: currentText, fg, bg, bold });
}
return tokens;
}
function renderLine(text, x, y, lineHeight) {
const tokens = parseAnsi(text);
let xOffset = x;
for (const token of tokens) {
if (token.text) {
if (token.bold) {
ctx.font = 'bold 16px monospace';
} else {
ctx.font = '16px monospace';
}
const metrics = ctx.measureText(token.text);
if (token.bg !== '#000000') {
ctx.fillStyle = token.bg;
ctx.fillRect(xOffset, y - 2, metrics.width + 1, lineHeight);
}
ctx.fillStyle = token.fg;
ctx.fillText(token.text, xOffset, y);
xOffset += metrics.width;
}
}
}
function connect() {
const protocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const wsUrl = `${protocol}//${window.location.hostname}:8765`;
ws = new WebSocket(wsUrl);
ws.onopen = () => {
status.textContent = 'Connected';
status.className = 'connected';
sendSize();
};
ws.onclose = () => {
status.textContent = 'Disconnected - Reconnecting...';
status.className = 'disconnected';
setTimeout(connect, 1000);
};
ws.onerror = () => {
status.textContent = 'Connection error';
status.className = 'disconnected';
};
ws.onmessage = (event) => {
try {
const data = JSON.parse(event.data);
if (data.type === 'frame') {
cols = data.width || 80;
rows = data.height || 24;
colsInput.value = cols;
rowsInput.value = rows;
resizeCanvas();
render(data.lines || []);
} else if (data.type === 'clear') {
ctx.fillStyle = '#000';
ctx.fillRect(0, 0, canvas.width, canvas.height);
}
} catch (e) {
console.error('Failed to parse message:', e);
}
};
}
function sendSize() {
if (ws && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({
type: 'resize',
width: parseInt(colsInput.value),
height: parseInt(rowsInput.value)
}));
}
}
function render(lines) {
ctx.fillStyle = '#000';
ctx.fillRect(0, 0, canvas.width, canvas.height);
ctx.font = '16px monospace';
ctx.textBaseline = 'top';
const lineHeight = CHAR_HEIGHT;
const maxLines = Math.min(lines.length, rows);
for (let i = 0; i < maxLines; i++) {
const line = lines[i] || '';
renderLine(line, 0, i * lineHeight, lineHeight);
}
}
function calculateViewportSize() {
const isFullscreen = document.fullscreenElement !== null;
const padding = isFullscreen ? 0 : 40;
const controlsHeight = isFullscreen ? 0 : 60;
const availableWidth = window.innerWidth - padding;
const availableHeight = window.innerHeight - controlsHeight;
cols = Math.max(20, Math.floor(availableWidth / CHAR_WIDTH));
rows = Math.max(10, Math.floor(availableHeight / CHAR_HEIGHT));
colsInput.value = cols;
rowsInput.value = rows;
resizeCanvas();
console.log('Fullscreen:', isFullscreen, 'Size:', cols, 'x', rows);
sendSize();
}
applyBtn.addEventListener('click', () => {
cols = parseInt(colsInput.value);
rows = parseInt(rowsInput.value);
resizeCanvas();
sendSize();
});
fullscreenBtn.addEventListener('click', () => {
if (!document.fullscreenElement) {
document.body.classList.add('fullscreen');
document.documentElement.requestFullscreen().then(() => {
calculateViewportSize();
});
} else {
document.exitFullscreen().then(() => {
calculateViewportSize();
});
}
});
document.addEventListener('fullscreenchange', () => {
if (!document.fullscreenElement) {
document.body.classList.remove('fullscreen');
calculateViewportSize();
}
});
window.addEventListener('resize', () => {
if (document.fullscreenElement) {
calculateViewportSize();
}
});
// Initial setup
resizeCanvas();
connect();
</script>
</body>
</html>

View File

@@ -0,0 +1,154 @@
# Code Scroll Mode — Design Spec
**Date:** 2026-03-16
**Branch:** feat/code-scroll
**Status:** Approved
---
## Overview
Add a `--code` CLI flag that puts MAINLINE into "source consciousness" mode. Instead of RSS headlines or poetry stanzas, the program's own source code scrolls upward as large OTF half-block characters with the standard white-hot → deep green gradient. Each scroll item is one non-blank, non-comment line from `engine/*.py`, attributed to its enclosing function/class scope and dotted module path.
---
## Goals
- Mirror the existing `--poetry` mode pattern as closely as possible
- Zero new runtime dependencies (stdlib `ast` and `pathlib` only)
- No changes to `scroll.py` or the render pipeline
- The item tuple shape `(text, src, ts)` is unchanged
---
## New Files
### `engine/fetch_code.py`
Single public function `fetch_code()` that returns `(items, line_count, 0)`.
**Algorithm:**
1. Glob `engine/*.py` in sorted order
2. For each file:
a. Read source text
b. `ast.parse(source)` → build a `{line_number: scope_label}` map by walking all `FunctionDef`, `AsyncFunctionDef`, and `ClassDef` nodes. Each node covers its full line range. Inner scopes override outer ones.
c. Iterate source lines (1-indexed). Skip if:
- The stripped line is empty
- The stripped line starts with `#`
d. For each kept line emit:
- `text` = `line.rstrip()` (preserve indentation for readability in the big render)
- `src` = scope label from the AST map, e.g. `stream()` for functions, `MicMonitor` for classes, `<module>` for top-level lines
- `ts` = dotted module path derived from filename, e.g. `engine/scroll.py``engine.scroll`
3. Return `(items, len(items), 0)`
**Scope label rules:**
- `FunctionDef` / `AsyncFunctionDef``name()`
- `ClassDef``name` (no parens)
- No enclosing node → `<module>`
**Dependencies:** `ast`, `pathlib` — stdlib only.
---
## Modified Files
### `engine/config.py`
Extend `MODE` detection to recognise `--code`:
```python
MODE = (
"poetry" if "--poetry" in sys.argv or "-p" in sys.argv
else "code" if "--code" in sys.argv
else "news"
)
```
### `engine/app.py`
**Subtitle line** — extend the subtitle dict:
```python
_subtitle = {
"poetry": "literary consciousness stream",
"code": "source consciousness stream",
}.get(config.MODE, "digital consciousness stream")
```
**Boot sequence** — add `elif config.MODE == "code":` branch after the poetry branch:
```python
elif config.MODE == "code":
from engine.fetch_code import fetch_code
slow_print(" > INITIALIZING SOURCE ARRAY...\n")
time.sleep(0.2)
print()
items, line_count, _ = fetch_code()
print()
print(f" {G_DIM}>{RST} {G_MID}{line_count} LINES ACQUIRED{RST}")
```
No cache save/load — local source files are read instantly and change only on disk writes.
---
## Data Flow
```
engine/*.py (sorted)
fetch_code()
│ ast.parse → scope map
│ filter blank + comment lines
│ emit (line, scope(), engine.module)
items: List[Tuple[str, str, str]]
stream(items, ntfy, mic) ← unchanged
next_headline() shuffles + recycles automatically
```
---
## Error Handling
- If a file fails to `ast.parse` (malformed source), fall back to `<module>` scope for all lines in that file — do not crash.
- If `engine/` contains no `.py` files (shouldn't happen in practice), `fetch_code()` returns an empty list; `app.py`'s existing `if not items:` guard handles this.
---
## Testing
New file: `tests/test_fetch_code.py`
| Test | Assertion |
|------|-----------|
| `test_items_are_tuples` | Every item from `fetch_code()` is a 3-tuple of strings |
| `test_blank_and_comment_lines_excluded` | No item text is empty; no item text (stripped) starts with `#` |
| `test_module_path_format` | Every `ts` field matches pattern `engine\.\w+` |
No mocking — tests read the real engine source files, keeping them honest against actual content.
---
## CLI
```bash
python3 mainline.py --code # source consciousness mode
uv run mainline.py --code
```
Compatible with all existing flags (`--no-font-picker`, `--font-file`, `--firehose`, etc.).
---
## Out of Scope
- Syntax highlighting / token-aware coloring (can be added later)
- `--code-dir` flag for pointing at arbitrary directories (YAGNI)
- Caching code items to disk

View File

@@ -29,30 +29,6 @@ from engine.terminal import (
slow_print, slow_print,
tw, tw,
) )
from engine.websocket_display import WebSocketDisplay
def _get_display():
"""Get the appropriate display(s) based on config."""
from engine.display import MultiDisplay, TerminalDisplay
displays = []
if config.DISPLAY in ("terminal", "both"):
displays.append(TerminalDisplay())
if config.DISPLAY in ("websocket", "both") or config.WEBSOCKET:
ws = WebSocketDisplay(host="0.0.0.0", port=config.WEBSOCKET_PORT)
ws.start_server()
ws.start_http_server()
displays.append(ws)
if not displays:
return None
if len(displays) == 1:
return displays[0]
return MultiDisplay(displays)
TITLE = [ TITLE = [
" ███╗ ███╗ █████╗ ██╗███╗ ██╗██╗ ██╗███╗ ██╗███████╗", " ███╗ ███╗ █████╗ ██╗███╗ ██╗██╗ ██╗███╗ ██╗███████╗",
@@ -296,11 +272,10 @@ def main():
time.sleep(0.07) time.sleep(0.07)
print() print()
_subtitle = ( _subtitle = {
"literary consciousness stream" "poetry": "literary consciousness stream",
if config.MODE == "poetry" "code": "source consciousness stream",
else "digital consciousness stream" }.get(config.MODE, "digital consciousness stream")
)
print(f" {W_DIM}v0.1 · {_subtitle}{RST}") print(f" {W_DIM}v0.1 · {_subtitle}{RST}")
print(f" {W_GHOST}{'' * (w - 4)}{RST}") print(f" {W_GHOST}{'' * (w - 4)}{RST}")
print() print()
@@ -321,6 +296,14 @@ def main():
) )
print(f" {G_DIM}>{RST} {G_MID}{len(items)} STANZAS ACQUIRED{RST}") print(f" {G_DIM}>{RST} {G_MID}{len(items)} STANZAS ACQUIRED{RST}")
save_cache(items) save_cache(items)
elif config.MODE == "code":
from engine.fetch_code import fetch_code
slow_print(" > INITIALIZING SOURCE ARRAY...\n")
time.sleep(0.2)
print()
items, line_count, _ = fetch_code()
print()
print(f" {G_DIM}>{RST} {G_MID}{line_count} LINES ACQUIRED{RST}")
else: else:
slow_print(" > INITIALIZING FEED ARRAY...\n") slow_print(" > INITIALIZING FEED ARRAY...\n")
time.sleep(0.2) time.sleep(0.2)
@@ -367,10 +350,7 @@ def main():
print() print()
time.sleep(0.4) time.sleep(0.4)
display = _get_display() stream(items, ntfy, mic)
stream(items, ntfy, mic, display)
if display:
display.cleanup()
print() print()
print(f" {W_GHOST}{'' * (tw() - 4)}{RST}") print(f" {W_GHOST}{'' * (tw() - 4)}{RST}")

View File

@@ -127,10 +127,6 @@ class Config:
script_fonts: dict[str, str] = field(default_factory=_get_platform_font_paths) script_fonts: dict[str, str] = field(default_factory=_get_platform_font_paths)
display: str = "terminal"
websocket: bool = False
websocket_port: int = 8765
@classmethod @classmethod
def from_args(cls, argv: list[str] | None = None) -> "Config": def from_args(cls, argv: list[str] | None = None) -> "Config":
"""Create Config from CLI arguments (or custom argv for testing).""" """Create Config from CLI arguments (or custom argv for testing)."""
@@ -168,9 +164,6 @@ class Config:
glitch_glyphs="░▒▓█▌▐╌╍╎╏┃┆┇┊┋", glitch_glyphs="░▒▓█▌▐╌╍╎╏┃┆┇┊┋",
kata_glyphs="ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ", kata_glyphs="ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ",
script_fonts=_get_platform_font_paths(), script_fonts=_get_platform_font_paths(),
display=_arg_value("--display", argv) or "terminal",
websocket="--websocket" in argv,
websocket_port=_arg_int("--websocket-port", 8765, argv),
) )
@@ -195,7 +188,11 @@ def set_config(config: Config) -> None:
HEADLINE_LIMIT = 1000 HEADLINE_LIMIT = 1000
FEED_TIMEOUT = 10 FEED_TIMEOUT = 10
MIC_THRESHOLD_DB = 50 # dB above which glitches intensify MIC_THRESHOLD_DB = 50 # dB above which glitches intensify
MODE = "poetry" if "--poetry" in sys.argv or "-p" in sys.argv else "news" MODE = (
"poetry" if "--poetry" in sys.argv or "-p" in sys.argv
else "code" if "--code" in sys.argv
else "news"
)
FIREHOSE = "--firehose" in sys.argv FIREHOSE = "--firehose" in sys.argv
# ─── NTFY MESSAGE QUEUE ────────────────────────────────── # ─── NTFY MESSAGE QUEUE ──────────────────────────────────
@@ -230,11 +227,6 @@ GRAD_SPEED = 0.08 # gradient traversal speed (cycles/sec, ~12s full sweep)
GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋" GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋"
KATA = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ" KATA = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ"
# ─── WEBSOCKET ─────────────────────────────────────────────
DISPLAY = _arg_value("--display", sys.argv) or "terminal"
WEBSOCKET = "--websocket" in sys.argv
WEBSOCKET_PORT = _arg_int("--websocket-port", 8765)
def set_font_selection(font_path=None, font_index=None): def set_font_selection(font_path=None, font_index=None):
"""Set runtime primary font selection.""" """Set runtime primary font selection."""

View File

@@ -8,17 +8,6 @@ from engine.events import EventType, StreamEvent
from engine.mic import MicMonitor from engine.mic import MicMonitor
from engine.ntfy import NtfyPoller from engine.ntfy import NtfyPoller
from engine.scroll import stream from engine.scroll import stream
from engine.websocket_display import WebSocketDisplay
def _get_display(config: Config):
"""Get the appropriate display based on config."""
if config.websocket:
ws = WebSocketDisplay(host="0.0.0.0", port=config.websocket_port)
ws.start_server()
ws.start_http_server()
return ws
return None
class StreamController: class StreamController:
@@ -62,10 +51,7 @@ class StreamController:
), ),
) )
display = _get_display(self.config) stream(items, self.ntfy, self.mic)
stream(items, self.ntfy, self.mic, display)
if display:
display.cleanup()
if self.event_bus: if self.event_bus:
self.event_bus.publish( self.event_bus.publish(

View File

@@ -100,30 +100,3 @@ class NullDisplay:
def cleanup(self) -> None: def cleanup(self) -> None:
pass pass
class MultiDisplay:
"""Display that forwards to multiple displays."""
def __init__(self, displays: list[Display]):
self.displays = displays
self.width = 80
self.height = 24
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
for d in self.displays:
d.init(width, height)
def show(self, buffer: list[str]) -> None:
for d in self.displays:
d.show(buffer)
def clear(self) -> None:
for d in self.displays:
d.clear()
def cleanup(self) -> None:
for d in self.displays:
d.cleanup()

67
engine/fetch_code.py Normal file
View File

@@ -0,0 +1,67 @@
"""
Source code feed — reads engine/*.py and emits non-blank, non-comment lines
as scroll items. Used by --code mode.
Depends on: nothing (stdlib only).
"""
import ast
from pathlib import Path
_ENGINE_DIR = Path(__file__).resolve().parent
def _scope_map(source: str) -> dict[int, str]:
"""Return {line_number: scope_label} for every line in source.
Nodes are sorted by range size descending so inner scopes overwrite
outer ones, guaranteeing the narrowest enclosing scope wins.
"""
try:
tree = ast.parse(source)
except SyntaxError:
return {}
nodes = []
for node in ast.walk(tree):
if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef, ast.ClassDef)):
end = getattr(node, "end_lineno", node.lineno)
span = end - node.lineno
nodes.append((span, node))
# Largest range first → inner scopes overwrite on second pass
nodes.sort(key=lambda x: x[0], reverse=True)
scope = {}
for _, node in nodes:
end = getattr(node, "end_lineno", node.lineno)
if isinstance(node, ast.ClassDef):
label = node.name
else:
label = f"{node.name}()"
for ln in range(node.lineno, end + 1):
scope[ln] = label
return scope
def fetch_code():
"""Read engine/*.py and return (items, line_count, 0).
Each item is (text, src, ts) where:
text = the code line (rstripped, indentation preserved)
src = enclosing function/class name, e.g. 'stream()' or '<module>'
ts = dotted module path, e.g. 'engine.scroll'
"""
items = []
for path in sorted(_ENGINE_DIR.glob("*.py")):
module = f"engine.{path.stem}"
source = path.read_text(encoding="utf-8")
scope = _scope_map(source)
for lineno, raw in enumerate(source.splitlines(), start=1):
stripped = raw.strip()
if not stripped or stripped.startswith("#"):
continue
label = scope.get(lineno, "<module>")
items.append((raw.rstrip(), label, module))
return items, len(items), 0

View File

@@ -1,264 +0,0 @@
"""
WebSocket display server - broadcasts frame buffer to connected web clients.
Usage:
ws_display = WebSocketDisplay(host="0.0.0.0", port=8765)
ws_display.init(80, 24)
ws_display.show(["line1", "line2", ...])
ws_display.cleanup()
"""
import asyncio
import json
import threading
import time
from typing import Protocol
try:
import websockets
except ImportError:
websockets = None
class Display(Protocol):
"""Protocol for display backends."""
def init(self, width: int, height: int) -> None:
"""Initialize display with dimensions."""
...
def show(self, buffer: list[str]) -> None:
"""Show buffer on display."""
...
def clear(self) -> None:
"""Clear display."""
...
def cleanup(self) -> None:
"""Shutdown display."""
...
def get_monitor():
"""Get the performance monitor."""
try:
from engine.effects.performance import get_monitor as _get_monitor
return _get_monitor()
except Exception:
return None
class WebSocketDisplay:
"""WebSocket display backend - broadcasts to HTML Canvas clients."""
def __init__(
self,
host: str = "0.0.0.0",
port: int = 8765,
http_port: int = 8766,
):
self.host = host
self.port = port
self.http_port = http_port
self.width = 80
self.height = 24
self._clients: set = set()
self._server_running = False
self._http_running = False
self._server_thread: threading.Thread | None = None
self._http_thread: threading.Thread | None = None
self._available = True
self._max_clients = 10
self._client_connected_callback = None
self._client_disconnected_callback = None
self._frame_delay = 0.0
try:
import websockets as _ws
self._available = _ws is not None
except ImportError:
self._available = False
def is_available(self) -> bool:
"""Check if WebSocket support is available."""
return self._available
def init(self, width: int, height: int) -> None:
"""Initialize display with dimensions and start server."""
self.width = width
self.height = height
self.start_server()
self.start_http_server()
def show(self, buffer: list[str]) -> None:
"""Broadcast buffer to all connected clients."""
t0 = time.perf_counter()
if self._clients:
frame_data = {
"type": "frame",
"width": self.width,
"height": self.height,
"lines": buffer,
}
message = json.dumps(frame_data)
disconnected = set()
for client in list(self._clients):
try:
asyncio.run(client.send(message))
except Exception:
disconnected.add(client)
for client in disconnected:
self._clients.discard(client)
if self._client_disconnected_callback:
self._client_disconnected_callback(client)
elapsed_ms = (time.perf_counter() - t0) * 1000
monitor = get_monitor()
if monitor:
chars_in = sum(len(line) for line in buffer)
monitor.record_effect("websocket_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
"""Broadcast clear command to all clients."""
if self._clients:
clear_data = {"type": "clear"}
message = json.dumps(clear_data)
for client in list(self._clients):
try:
asyncio.run(client.send(message))
except Exception:
pass
def cleanup(self) -> None:
"""Stop the servers."""
self.stop_server()
self.stop_http_server()
async def _websocket_handler(self, websocket):
"""Handle WebSocket connections."""
if len(self._clients) >= self._max_clients:
await websocket.close()
return
self._clients.add(websocket)
if self._client_connected_callback:
self._client_connected_callback(websocket)
try:
async for message in websocket:
try:
data = json.loads(message)
if data.get("type") == "resize":
self.width = data.get("width", 80)
self.height = data.get("height", 24)
except json.JSONDecodeError:
pass
except Exception:
pass
finally:
self._clients.discard(websocket)
if self._client_disconnected_callback:
self._client_disconnected_callback(websocket)
async def _run_websocket_server(self):
"""Run the WebSocket server."""
async with websockets.serve(self._websocket_handler, self.host, self.port):
while self._server_running:
await asyncio.sleep(0.1)
async def _run_http_server(self):
"""Run simple HTTP server for the client."""
import os
from http.server import HTTPServer, SimpleHTTPRequestHandler
client_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "client")
class Handler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):
super().__init__(*args, directory=client_dir, **kwargs)
def log_message(self, format, *args):
pass
httpd = HTTPServer((self.host, self.http_port), Handler)
while self._http_running:
httpd.handle_request()
def _run_async(self, coro):
"""Run coroutine in background."""
try:
asyncio.run(coro)
except Exception as e:
print(f"WebSocket async error: {e}")
def start_server(self):
"""Start the WebSocket server in a background thread."""
if not self._available:
return
if self._server_thread is not None:
return
self._server_running = True
self._server_thread = threading.Thread(
target=self._run_async, args=(self._run_websocket_server(),), daemon=True
)
self._server_thread.start()
def stop_server(self):
"""Stop the WebSocket server."""
self._server_running = False
self._server_thread = None
def start_http_server(self):
"""Start the HTTP server in a background thread."""
if not self._available:
return
if self._http_thread is not None:
return
self._http_running = True
self._http_running = True
self._http_thread = threading.Thread(
target=self._run_async, args=(self._run_http_server(),), daemon=True
)
self._http_thread.start()
def stop_http_server(self):
"""Stop the HTTP server."""
self._http_running = False
self._http_thread = None
def client_count(self) -> int:
"""Return number of connected clients."""
return len(self._clients)
def get_ws_port(self) -> int:
"""Return WebSocket port."""
return self.port
def get_http_port(self) -> int:
"""Return HTTP port."""
return self.http_port
def set_frame_delay(self, delay: float) -> None:
"""Set delay between frames in seconds."""
self._frame_delay = delay
def get_frame_delay(self) -> float:
"""Get delay between frames."""
return self._frame_delay
def set_client_connected_callback(self, callback) -> None:
"""Set callback for client connections."""
self._client_connected_callback = callback
def set_client_disconnected_callback(self, callback) -> None:
"""Set callback for client disconnections."""
self._client_disconnected_callback = callback

BIN
fonts/Kapiler.otf Normal file

Binary file not shown.

BIN
fonts/Kapiler.ttf Normal file

Binary file not shown.

View File

@@ -13,9 +13,6 @@ test-v = "uv run pytest -v"
test-cov = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html" test-cov = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html"
test-cov-open = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html && open htmlcov/index.html" test-cov-open = "uv run pytest --cov=engine --cov-report=term-missing --cov-report=html && open htmlcov/index.html"
test-browser-install = { run = "uv run playwright install chromium", depends = ["sync-all"] }
test-browser = { run = "uv run pytest tests/e2e/", depends = ["test-browser-install"] }
lint = "uv run ruff check engine/ mainline.py" lint = "uv run ruff check engine/ mainline.py"
lint-fix = "uv run ruff check --fix engine/ mainline.py" lint-fix = "uv run ruff check --fix engine/ mainline.py"
format = "uv run ruff format engine/ mainline.py" format = "uv run ruff format engine/ mainline.py"
@@ -27,9 +24,6 @@ format = "uv run ruff format engine/ mainline.py"
run = "uv run mainline.py" run = "uv run mainline.py"
run-poetry = "uv run mainline.py --poetry" run-poetry = "uv run mainline.py --poetry"
run-firehose = "uv run mainline.py --firehose" run-firehose = "uv run mainline.py --firehose"
run-websocket = { run = "uv run mainline.py --websocket", depends = ["sync-all"] }
run-both = { run = "uv run mainline.py --display both", depends = ["sync-all"] }
run-client = { run = "uv run mainline.py --display both & WEBSOCKET_PID=$! && sleep 2 && case $(uname -s) in Darwin) open http://localhost:8766 ;; Linux) xdg-open http://localhost:8766 ;; CYGWIN*) cmd /c start http://localhost:8766 ;; *) echo 'Unknown platform' ;; esac && wait $WEBSOCKET_PID", depends = ["sync-all"] }
# ===================== # =====================
# Environment # Environment

View File

@@ -30,12 +30,6 @@ mic = [
"sounddevice>=0.4.0", "sounddevice>=0.4.0",
"numpy>=1.24.0", "numpy>=1.24.0",
] ]
websocket = [
"websockets>=12.0",
]
browser = [
"playwright>=1.40.0",
]
dev = [ dev = [
"pytest>=8.0.0", "pytest>=8.0.0",
"pytest-cov>=4.1.0", "pytest-cov>=4.1.0",

View File

@@ -1,133 +0,0 @@
"""
End-to-end tests for web client with headless browser.
"""
import os
import socketserver
import threading
from http.server import HTTPServer, SimpleHTTPRequestHandler
from pathlib import Path
import pytest
CLIENT_DIR = Path(__file__).parent.parent.parent / "client"
class ThreadedHTTPServer(socketserver.ThreadingMixIn, HTTPServer):
"""Threaded HTTP server for handling concurrent requests."""
daemon_threads = True
@pytest.fixture(scope="module")
def http_server():
"""Start a local HTTP server for the client."""
os.chdir(CLIENT_DIR)
handler = SimpleHTTPRequestHandler
server = ThreadedHTTPServer(("127.0.0.1", 0), handler)
port = server.server_address[1]
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
yield f"http://127.0.0.1:{port}"
server.shutdown()
class TestWebClient:
"""Tests for the web client using Playwright."""
@pytest.fixture(autouse=True)
def setup_browser(self):
"""Set up browser for tests."""
pytest.importorskip("playwright")
from playwright.sync_api import sync_playwright
self.playwright = sync_playwright().start()
self.browser = self.playwright.chromium.launch(headless=True)
self.context = self.browser.new_context()
self.page = self.context.new_page()
yield
self.page.close()
self.context.close()
self.browser.close()
self.playwright.stop()
def test_client_loads(self, http_server):
"""Web client loads without errors."""
response = self.page.goto(http_server)
assert response.status == 200, f"Page load failed with status {response.status}"
self.page.wait_for_load_state("domcontentloaded")
content = self.page.content()
assert "<canvas" in content, "Canvas element not found in page"
canvas = self.page.locator("#terminal")
assert canvas.count() > 0, "Canvas not found"
def test_status_shows_connecting(self, http_server):
"""Status shows connecting initially."""
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
status = self.page.locator("#status")
assert status.count() > 0, "Status element not found"
def test_canvas_has_dimensions(self, http_server):
"""Canvas has correct dimensions after load."""
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
canvas = self.page.locator("#terminal")
assert canvas.count() > 0, "Canvas not found"
def test_no_console_errors_on_load(self, http_server):
"""No JavaScript errors on page load (websocket errors are expected without server)."""
js_errors = []
def handle_console(msg):
if msg.type == "error":
text = msg.text
if "WebSocket" not in text:
js_errors.append(text)
self.page.on("console", handle_console)
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
assert len(js_errors) == 0, f"JavaScript errors: {js_errors}"
class TestWebClientProtocol:
"""Tests for WebSocket protocol handling in client."""
@pytest.fixture(autouse=True)
def setup_browser(self):
"""Set up browser for tests."""
pytest.importorskip("playwright")
from playwright.sync_api import sync_playwright
self.playwright = sync_playwright().start()
self.browser = self.playwright.chromium.launch(headless=True)
self.context = self.browser.new_context()
self.page = self.context.new_page()
yield
self.page.close()
self.context.close()
self.browser.close()
self.playwright.stop()
def test_websocket_reconnection(self, http_server):
"""Client attempts reconnection on disconnect."""
self.page.goto(http_server)
self.page.wait_for_load_state("domcontentloaded")
status = self.page.locator("#status")
assert status.count() > 0, "Status element not found"

View File

@@ -1,55 +0,0 @@
"""
Tests for engine.app module.
"""
from engine.app import _normalize_preview_rows
class TestNormalizePreviewRows:
"""Tests for _normalize_preview_rows function."""
def test_empty_rows(self):
"""Empty input returns empty list."""
result = _normalize_preview_rows([])
assert result == [""]
def test_strips_left_padding(self):
"""Left padding is stripped."""
result = _normalize_preview_rows([" content", " more"])
assert all(not r.startswith(" ") for r in result)
def test_preserves_content(self):
"""Content is preserved."""
result = _normalize_preview_rows([" hello world "])
assert "hello world" in result[0]
def test_handles_all_empty_rows(self):
"""All empty rows returns single empty string."""
result = _normalize_preview_rows(["", " ", ""])
assert result == [""]
class TestAppConstants:
"""Tests for app module constants."""
def test_title_defined(self):
"""TITLE is defined."""
from engine.app import TITLE
assert len(TITLE) > 0
def test_title_lines_are_strings(self):
"""TITLE contains string lines."""
from engine.app import TITLE
assert all(isinstance(line, str) for line in TITLE)
class TestAppImports:
"""Tests for app module imports."""
def test_app_imports_without_error(self):
"""Module imports without error."""
from engine import app
assert app is not None

View File

@@ -83,3 +83,35 @@ class TestStreamControllerCleanup:
controller.cleanup() controller.cleanup()
mock_mic_instance.stop.assert_called_once() mock_mic_instance.stop.assert_called_once()
class TestStreamControllerWarmup:
"""Tests for StreamController topic warmup."""
def test_warmup_topics_idempotent(self):
"""warmup_topics can be called multiple times."""
StreamController._topics_warmed = False
with patch("urllib.request.urlopen") as mock_urlopen:
StreamController.warmup_topics()
StreamController.warmup_topics()
assert mock_urlopen.call_count >= 3
def test_warmup_topics_sets_flag(self):
"""warmup_topics sets the warmed flag."""
StreamController._topics_warmed = False
with patch("urllib.request.urlopen"):
StreamController.warmup_topics()
assert StreamController._topics_warmed is True
def test_warmup_topics_skips_after_first(self):
"""warmup_topics skips after first call."""
StreamController._topics_warmed = True
with patch("urllib.request.urlopen") as mock_urlopen:
StreamController.warmup_topics()
mock_urlopen.assert_not_called()

View File

@@ -1,234 +0,0 @@
"""
Tests for engine.fetch module.
"""
import json
from unittest.mock import MagicMock, patch
from engine.fetch import (
_fetch_gutenberg,
fetch_all,
fetch_feed,
fetch_poetry,
load_cache,
save_cache,
)
class TestFetchFeed:
"""Tests for fetch_feed function."""
@patch("engine.fetch.urllib.request.urlopen")
def test_fetch_success(self, mock_urlopen):
"""Successful feed fetch returns parsed feed."""
mock_response = MagicMock()
mock_response.read.return_value = b"<rss>test</rss>"
mock_urlopen.return_value = mock_response
result = fetch_feed("http://example.com/feed")
assert result is not None
@patch("engine.fetch.urllib.request.urlopen")
def test_fetch_network_error(self, mock_urlopen):
"""Network error returns None."""
mock_urlopen.side_effect = Exception("Network error")
result = fetch_feed("http://example.com/feed")
assert result is None
class TestFetchAll:
"""Tests for fetch_all function."""
@patch("engine.fetch.fetch_feed")
@patch("engine.fetch.strip_tags")
@patch("engine.fetch.skip")
@patch("engine.fetch.boot_ln")
def test_fetch_all_success(self, mock_boot, mock_skip, mock_strip, mock_fetch_feed):
"""Successful fetch returns items."""
mock_feed = MagicMock()
mock_feed.bozo = False
mock_feed.entries = [
{"title": "Headline 1", "published_parsed": (2024, 1, 1, 12, 0, 0)},
{"title": "Headline 2", "updated_parsed": (2024, 1, 2, 12, 0, 0)},
]
mock_fetch_feed.return_value = mock_feed
mock_skip.return_value = False
mock_strip.side_effect = lambda x: x
items, linked, failed = fetch_all()
assert linked > 0
assert failed == 0
@patch("engine.fetch.fetch_feed")
@patch("engine.fetch.boot_ln")
def test_fetch_all_feed_error(self, mock_boot, mock_fetch_feed):
"""Feed error increments failed count."""
mock_fetch_feed.return_value = None
items, linked, failed = fetch_all()
assert failed > 0
@patch("engine.fetch.fetch_feed")
@patch("engine.fetch.strip_tags")
@patch("engine.fetch.skip")
@patch("engine.fetch.boot_ln")
def test_fetch_all_skips_filtered(
self, mock_boot, mock_skip, mock_strip, mock_fetch_feed
):
"""Filtered headlines are skipped."""
mock_feed = MagicMock()
mock_feed.bozo = False
mock_feed.entries = [
{"title": "Sports scores"},
{"title": "Valid headline"},
]
mock_fetch_feed.return_value = mock_feed
mock_skip.side_effect = lambda x: x == "Sports scores"
mock_strip.side_effect = lambda x: x
items, linked, failed = fetch_all()
assert any("Valid headline" in item[0] for item in items)
class TestFetchGutenberg:
"""Tests for _fetch_gutenberg function."""
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_success(self, mock_urlopen):
"""Successful gutenberg fetch returns items."""
text = """Project Gutenberg
*** START OF THE PROJECT GUTENBERG ***
This is a test poem with multiple lines
that should be parsed as a block.
Another stanza with more content here.
*** END OF THE PROJECT GUTENBERG ***
"""
mock_response = MagicMock()
mock_response.read.return_value = text.encode("utf-8")
mock_urlopen.return_value = mock_response
result = _fetch_gutenberg("http://example.com/test", "Test")
assert len(result) > 0
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_network_error(self, mock_urlopen):
"""Network error returns empty list."""
mock_urlopen.side_effect = Exception("Network error")
result = _fetch_gutenberg("http://example.com/test", "Test")
assert result == []
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_skips_short_blocks(self, mock_urlopen):
"""Blocks shorter than 20 chars are skipped."""
text = """*** START OF THE ***
Short
*** END OF THE ***
"""
mock_response = MagicMock()
mock_response.read.return_value = text.encode("utf-8")
mock_urlopen.return_value = mock_response
result = _fetch_gutenberg("http://example.com/test", "Test")
assert result == []
@patch("engine.fetch.urllib.request.urlopen")
def test_gutenberg_skips_all_caps_headers(self, mock_urlopen):
"""All-caps lines are skipped as headers."""
text = """*** START OF THE ***
THIS IS ALL CAPS HEADER
more content here
*** END OF THE ***
"""
mock_response = MagicMock()
mock_response.read.return_value = text.encode("utf-8")
mock_urlopen.return_value = mock_response
result = _fetch_gutenberg("http://example.com/test", "Test")
assert len(result) > 0
class TestFetchPoetry:
"""Tests for fetch_poetry function."""
@patch("engine.fetch._fetch_gutenberg")
@patch("engine.fetch.boot_ln")
def test_fetch_poetry_success(self, mock_boot, mock_fetch):
"""Successful poetry fetch returns items."""
mock_fetch.return_value = [
("Stanza 1 content here", "Test", ""),
("Stanza 2 content here", "Test", ""),
]
items, linked, failed = fetch_poetry()
assert linked > 0
assert failed == 0
@patch("engine.fetch._fetch_gutenberg")
@patch("engine.fetch.boot_ln")
def test_fetch_poetry_failure(self, mock_boot, mock_fetch):
"""Failed fetch increments failed count."""
mock_fetch.return_value = []
items, linked, failed = fetch_poetry()
assert failed > 0
class TestCache:
"""Tests for cache functions."""
@patch("engine.fetch._cache_path")
def test_load_cache_success(self, mock_path):
"""Successful cache load returns items."""
mock_path.return_value.__str__ = MagicMock(return_value="/tmp/cache")
mock_path.return_value.exists.return_value = True
mock_path.return_value.read_text.return_value = json.dumps(
{"items": [("title", "source", "time")]}
)
result = load_cache()
assert result is not None
@patch("engine.fetch._cache_path")
def test_load_cache_missing_file(self, mock_path):
"""Missing cache file returns None."""
mock_path.return_value.exists.return_value = False
result = load_cache()
assert result is None
@patch("engine.fetch._cache_path")
def test_load_cache_invalid_json(self, mock_path):
"""Invalid JSON returns None."""
mock_path.return_value.exists.return_value = True
mock_path.return_value.read_text.side_effect = json.JSONDecodeError("", "", 0)
result = load_cache()
assert result is None
@patch("engine.fetch._cache_path")
def test_save_cache_success(self, mock_path):
"""Save cache writes to file."""
mock_path.return_value.__truediv__ = MagicMock(
return_value=mock_path.return_value
)
save_cache([("title", "source", "time")])

35
tests/test_fetch_code.py Normal file
View File

@@ -0,0 +1,35 @@
import re
from engine.fetch_code import fetch_code
def test_return_shape():
items, line_count, ignored = fetch_code()
assert isinstance(items, list)
assert line_count == len(items)
assert ignored == 0
def test_items_are_tuples():
items, _, _ = fetch_code()
assert items, "expected at least one code line"
for item in items:
assert isinstance(item, tuple) and len(item) == 3
text, src, ts = item
assert isinstance(text, str)
assert isinstance(src, str)
assert isinstance(ts, str)
def test_blank_and_comment_lines_excluded():
items, _, _ = fetch_code()
for text, _, _ in items:
assert text.strip(), "blank line should have been filtered"
assert not text.strip().startswith("#"), "comment line should have been filtered"
def test_module_path_format():
items, _, _ = fetch_code()
pattern = re.compile(r"^engine\.\w+$")
for _, _, ts in items:
assert pattern.match(ts), f"unexpected module path: {ts!r}"

View File

@@ -1,232 +0,0 @@
"""
Tests for engine.render module.
"""
from unittest.mock import MagicMock, patch
import pytest
from engine.render import (
GRAD_COLS,
MSG_GRAD_COLS,
clear_font_cache,
font_for_lang,
lr_gradient,
lr_gradient_opposite,
make_block,
)
class TestGradientConstants:
"""Tests for gradient color constants."""
def test_grad_cols_defined(self):
"""GRAD_COLS is defined with expected length."""
assert len(GRAD_COLS) > 0
assert all(isinstance(c, str) for c in GRAD_COLS)
def test_msg_grad_cols_defined(self):
"""MSG_GRAD_COLS is defined with expected length."""
assert len(MSG_GRAD_COLS) > 0
assert all(isinstance(c, str) for c in MSG_GRAD_COLS)
def test_grad_cols_start_with_white(self):
"""GRAD_COLS starts with white."""
assert "231" in GRAD_COLS[0]
def test_msg_grad_cols_different_from_grad_cols(self):
"""MSG_GRAD_COLS is different from GRAD_COLS."""
assert MSG_GRAD_COLS != GRAD_COLS
class TestLrGradient:
"""Tests for lr_gradient function."""
def test_empty_rows(self):
"""Empty input returns empty output."""
result = lr_gradient([], 0.0)
assert result == []
def test_preserves_empty_rows(self):
"""Empty rows are preserved."""
result = lr_gradient([""], 0.0)
assert result == [""]
def test_adds_gradient_to_content(self):
"""Non-empty rows get gradient coloring."""
result = lr_gradient(["hello"], 0.0)
assert len(result) == 1
assert "\033[" in result[0]
def test_preserves_spaces(self):
"""Spaces are preserved without coloring."""
result = lr_gradient(["hello world"], 0.0)
assert " " in result[0]
def test_offset_wraps_around(self):
"""Offset wraps around at 1.0."""
result1 = lr_gradient(["hello"], 0.0)
result2 = lr_gradient(["hello"], 1.0)
assert result1 != result2 or result1 == result2
class TestLrGradientOpposite:
"""Tests for lr_gradient_opposite function."""
def test_uses_msg_grad_cols(self):
"""Uses MSG_GRAD_COLS instead of GRAD_COLS."""
result = lr_gradient_opposite(["test"])
assert "\033[" in result[0]
class TestClearFontCache:
"""Tests for clear_font_cache function."""
def test_clears_without_error(self):
"""Function runs without error."""
clear_font_cache()
class TestFontForLang:
"""Tests for font_for_lang function."""
@patch("engine.render.font")
def test_returns_default_for_none(self, mock_font):
"""Returns default font when lang is None."""
result = font_for_lang(None)
assert result is not None
@patch("engine.render.font")
def test_returns_default_for_unknown_lang(self, mock_font):
"""Returns default font for unknown language."""
result = font_for_lang("unknown_lang")
assert result is not None
class TestMakeBlock:
"""Tests for make_block function."""
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_basic(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Basic make_block returns content, color, meta index."""
mock_wrap.return_value = ["Headline content", ""]
mock_random.choice.return_value = "\033[38;5;46m"
content, color, meta_idx = make_block(
"Test headline", "TestSource", "12:00", 80
)
assert len(content) > 0
assert color is not None
assert meta_idx >= 0
@pytest.mark.skip(reason="Requires full PIL/font environment")
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_translation(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Translation is applied when mode is news."""
mock_wrap.return_value = ["Translated"]
mock_random.choice.return_value = "\033[38;5;46m"
mock_detect.return_value = "de"
with patch("engine.config.MODE", "news"):
content, _, _ = make_block("Test", "Source", "12:00", 80)
mock_translate.assert_called_once()
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_no_translation_poetry(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""No translation when mode is poetry."""
mock_wrap.return_value = ["Poem content"]
mock_random.choice.return_value = "\033[38;5;46m"
with patch("engine.config.MODE", "poetry"):
make_block("Test", "Source", "12:00", 80)
mock_translate.assert_not_called()
@patch("engine.translate.translate_headline")
@patch("engine.translate.detect_location_language")
@patch("engine.render.font_for_lang")
@patch("engine.render.big_wrap")
@patch("engine.render.random")
def test_make_block_meta_format(
self, mock_random, mock_wrap, mock_font, mock_detect, mock_translate
):
"""Meta line includes source and timestamp."""
mock_wrap.return_value = ["Content"]
mock_random.choice.return_value = "\033[38;5;46m"
content, _, meta_idx = make_block("Test", "MySource", "14:30", 80)
meta_line = content[meta_idx]
assert "MySource" in meta_line
assert "14:30" in meta_line
class TestRenderLine:
"""Tests for render_line function."""
def test_empty_string(self):
"""Empty string returns empty list."""
from engine.render import render_line
result = render_line("")
assert result == [""]
@pytest.mark.skip(reason="Requires real font/PIL setup")
def test_uses_default_font(self):
"""Uses default font when none provided."""
from engine.render import render_line
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = (0, 0, 10, 10)
render_line("test")
def test_getbbox_returns_none(self):
"""Handles None bbox gracefully."""
from engine.render import render_line
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = None
result = render_line("test")
assert result == [""]
class TestBigWrap:
"""Tests for big_wrap function."""
def test_empty_string(self):
"""Empty string returns empty list."""
from engine.render import big_wrap
result = big_wrap("", 80)
assert result == []
@pytest.mark.skip(reason="Requires real font/PIL setup")
def test_single_word_fits(self):
"""Single short word returns rendered."""
from engine.render import big_wrap
with patch("engine.render.font") as mock_font:
mock_font.return_value = MagicMock()
mock_font.return_value.getbbox.return_value = (0, 0, 10, 10)
result = big_wrap("test", 80)
assert len(result) > 0

View File

@@ -1,115 +0,0 @@
"""
Tests for engine.translate module.
"""
import json
from unittest.mock import MagicMock, patch
from engine.translate import (
_translate_cached,
detect_location_language,
translate_headline,
)
def clear_translate_cache():
"""Clear the LRU cache between tests."""
_translate_cached.cache_clear()
class TestDetectLocationLanguage:
"""Tests for detect_location_language function."""
def test_returns_none_for_unknown_location(self):
"""Returns None when no location pattern matches."""
result = detect_location_language("Breaking news about technology")
assert result is None
def test_detects_berlin(self):
"""Detects Berlin location."""
result = detect_location_language("Berlin police arrest protesters")
assert result == "de"
def test_detects_paris(self):
"""Detects Paris location."""
result = detect_location_language("Paris fashion week begins")
assert result == "fr"
def test_detects_tokyo(self):
"""Detects Tokyo location."""
result = detect_location_language("Tokyo stocks rise")
assert result == "ja"
def test_detects_berlin_again(self):
"""Detects Berlin location again."""
result = detect_location_language("Berlin marathon set to begin")
assert result == "de"
def test_case_insensitive(self):
"""Detection is case insensitive."""
result = detect_location_language("BERLIN SUMMER FESTIVAL")
assert result == "de"
def test_returns_first_match(self):
"""Returns first matching pattern."""
result = detect_location_language("Berlin in Paris for the event")
assert result == "de"
class TestTranslateHeadline:
"""Tests for translate_headline function."""
def test_returns_translated_text(self):
"""Returns translated text from cache."""
clear_translate_cache()
with patch("engine.translate.translate_headline") as mock_fn:
mock_fn.return_value = "Translated title"
from engine.translate import translate_headline as th
result = th("Original title", "de")
assert result == "Translated title"
def test_uses_cached_result(self):
"""Translation uses LRU cache."""
clear_translate_cache()
result1 = translate_headline("Test unique", "es")
result2 = translate_headline("Test unique", "es")
assert result1 == result2
class TestTranslateCached:
"""Tests for _translate_cached function."""
def test_translation_network_error(self):
"""Network error returns original text."""
clear_translate_cache()
with patch("engine.translate.urllib.request.urlopen") as mock_urlopen:
mock_urlopen.side_effect = Exception("Network error")
result = _translate_cached("Hello world", "de")
assert result == "Hello world"
def test_translation_invalid_json(self):
"""Invalid JSON returns original text."""
clear_translate_cache()
with patch("engine.translate.urllib.request.urlopen") as mock_urlopen:
mock_response = MagicMock()
mock_response.read.return_value = b"invalid json"
mock_urlopen.return_value = mock_response
result = _translate_cached("Hello", "de")
assert result == "Hello"
def test_translation_empty_response(self):
"""Empty translation response returns original text."""
clear_translate_cache()
with patch("engine.translate.urllib.request.urlopen") as mock_urlopen:
mock_response = MagicMock()
mock_response.read.return_value = json.dumps([[[""], None, "de"], None])
mock_urlopen.return_value = mock_response
result = _translate_cached("Hello", "de")
assert result == "Hello"

View File

@@ -1,161 +0,0 @@
"""
Tests for engine.websocket_display module.
"""
from unittest.mock import MagicMock, patch
import pytest
from engine.websocket_display import WebSocketDisplay
class TestWebSocketDisplayImport:
"""Test that websocket module can be imported."""
def test_import_does_not_error(self):
"""Module imports without error."""
from engine import websocket_display
assert websocket_display is not None
class TestWebSocketDisplayInit:
"""Tests for WebSocketDisplay initialization."""
def test_default_init(self):
"""Default initialization sets correct defaults."""
with patch("engine.websocket_display.websockets", None):
display = WebSocketDisplay()
assert display.host == "0.0.0.0"
assert display.port == 8765
assert display.http_port == 8766
assert display.width == 80
assert display.height == 24
def test_custom_init(self):
"""Custom initialization uses provided values."""
with patch("engine.websocket_display.websockets", None):
display = WebSocketDisplay(host="localhost", port=9000, http_port=9001)
assert display.host == "localhost"
assert display.port == 9000
assert display.http_port == 9001
def test_is_available_when_websockets_present(self):
"""is_available returns True when websockets is available."""
pytest.importorskip("websockets")
display = WebSocketDisplay()
assert display.is_available() is True
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_is_available_when_websockets_missing(self):
"""is_available returns False when websockets is not available."""
display = WebSocketDisplay()
assert display.is_available() is False
class TestWebSocketDisplayProtocol:
"""Test that WebSocketDisplay satisfies Display protocol."""
def test_websocket_display_is_display(self):
"""WebSocketDisplay satisfies Display protocol."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay()
assert hasattr(display, "init")
assert hasattr(display, "show")
assert hasattr(display, "clear")
assert hasattr(display, "cleanup")
class TestWebSocketDisplayMethods:
"""Tests for WebSocketDisplay methods."""
def test_init_stores_dimensions(self):
"""init stores terminal dimensions."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay()
display.init(100, 40)
assert display.width == 100
assert display.height == 40
def test_client_count_initially_zero(self):
"""client_count returns 0 when no clients connected."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay()
assert display.client_count() == 0
def test_get_ws_port(self):
"""get_ws_port returns configured port."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay(port=9000)
assert display.get_ws_port() == 9000
def test_get_http_port(self):
"""get_http_port returns configured port."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay(http_port=9001)
assert display.get_http_port() == 9001
def test_frame_delay_defaults_to_zero(self):
"""get_frame_delay returns 0 by default."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay()
assert display.get_frame_delay() == 0.0
def test_set_frame_delay(self):
"""set_frame_delay stores the value."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay()
display.set_frame_delay(0.05)
assert display.get_frame_delay() == 0.05
class TestWebSocketDisplayCallbacks:
"""Tests for WebSocketDisplay callback methods."""
def test_set_client_connected_callback(self):
"""set_client_connected_callback stores callback."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay()
callback = MagicMock()
display.set_client_connected_callback(callback)
assert display._client_connected_callback is callback
def test_set_client_disconnected_callback(self):
"""set_client_disconnected_callback stores callback."""
with patch("engine.websocket_display.websockets", MagicMock()):
display = WebSocketDisplay()
callback = MagicMock()
display.set_client_disconnected_callback(callback)
assert display._client_disconnected_callback is callback
class TestWebSocketDisplayUnavailable:
"""Tests when WebSocket support is unavailable."""
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_start_server_noop_when_unavailable(self):
"""start_server does nothing when websockets unavailable."""
display = WebSocketDisplay()
display.start_server()
assert display._server_thread is None
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_start_http_server_noop_when_unavailable(self):
"""start_http_server does nothing when websockets unavailable."""
display = WebSocketDisplay()
display.start_http_server()
assert display._http_thread is None
@pytest.mark.skipif(
pytest.importorskip("websockets") is not None, reason="websockets is available"
)
def test_show_noops_when_unavailable(self):
"""show does nothing when websockets unavailable."""
display = WebSocketDisplay()
display.show(["line1", "line2"])