Compare commits

...

3 Commits

Author SHA1 Message Date
ab3e1766b1 feat(benchmark): add hook mode with baseline cache for pre-push checks
- Fix lint errors and LSP issues in benchmark.py
- Add --hook mode to compare against saved baseline
- Add --baseline flag to save results as baseline
- Add --threshold to configure degradation threshold (default 20%)
- Add benchmark step to pre-push hook in hk.pkl
- Update AGENTS.md with hk documentation links and benchmark runner docs
2026-03-15 22:41:13 -07:00
829c4ab63d refactor: modularize display backends and add benchmark runner
- Create engine/display/ package with registry pattern
- Move displays to engine/display/backends/ (terminal, null, websocket, sixel)
- Add DisplayRegistry with auto-discovery
- Add benchmark.py for performance testing effects × displays matrix
- Add mise tasks: benchmark, benchmark-json, benchmark-report
- Update controller to use new display module
2026-03-15 22:25:28 -07:00
22dd063baa feat: add SixelDisplay backend for terminal graphics
- Implement pure Python Sixel encoder (no C dependency)
- Add SixelDisplay class to display.py with ANSI parsing
- Update controller._get_display() to handle sixel mode
- Add --display sixel CLI flag
- Add mise run-sixel task
- Update docs with display modes
2026-03-15 22:13:44 -07:00
14 changed files with 1249 additions and 163 deletions

View File

@@ -36,12 +36,13 @@ mise run ci # Full CI pipeline (topics-init + lint + test-cov)
```bash
mise run run # Run mainline (terminal)
mise run run-poetry # Run with poetry feed
mise run run-firehose # Run in firehose mode
mise run run-websocket # Run with WebSocket display only
mise run run-both # Run with both terminal and WebSocket
mise run run-client # Run both + open browser
mise run cmd # Run C&C command interface
mise run run-poetry # Run with poetry feed
mise run run-firehose # Run in firehose mode
mise run run-websocket # Run with WebSocket display only
mise run run-sixel # Run with Sixel graphics display
mise run run-both # Run with both terminal and WebSocket
mise run run-client # Run both + open browser
mise run cmd # Run C&C command interface
```
## Git Hooks
@@ -59,9 +60,52 @@ hk init --mise
mise run pre-commit
```
**IMPORTANT**: Always review the hk documentation before modifying `hk.pkl`:
- [hk Configuration Guide](https://hk.jdx.dev/configuration.html)
- [hk Hooks Reference](https://hk.jdx.dev/hooks.html)
- [hk Builtins](https://hk.jdx.dev/builtins.html)
The project uses hk configured in `hk.pkl`:
- **pre-commit**: runs ruff-format and ruff (with auto-fix)
- **pre-push**: runs ruff check
- **pre-push**: runs ruff check + benchmark hook
## Benchmark Runner
Run performance benchmarks:
```bash
mise run benchmark # Run all benchmarks (text output)
mise run benchmark-json # Run benchmarks (JSON output)
mise run benchmark-report # Run benchmarks (Markdown report)
```
### Benchmark Commands
```bash
# Run benchmarks
uv run python -m engine.benchmark
# Run with specific displays/effects
uv run python -m engine.benchmark --displays null,terminal --effects fade,glitch
# Save baseline for hook comparisons
uv run python -m engine.benchmark --baseline
# Run in hook mode (compares against baseline)
uv run python -m engine.benchmark --hook
# Hook mode with custom threshold (default: 20% degradation)
uv run python -m engine.benchmark --hook --threshold 0.3
# Custom baseline location
uv run python -m engine.benchmark --hook --cache /path/to/cache.json
```
### Hook Mode
The `--hook` mode compares current benchmarks against a saved baseline. If performance degrades beyond the threshold (default 20%), it exits with code 1. This is useful for preventing performance regressions in feature branches.
The pre-push hook runs benchmark in hook mode to catch performance regressions before pushing.
## Workflow Rules
@@ -128,6 +172,7 @@ The project uses pytest with strict marker enforcement. Test configuration is in
- **Display abstraction** (`engine/display.py`): swap display backends via the Display protocol
- `TerminalDisplay` - ANSI terminal output
- `WebSocketDisplay` - broadcasts to web clients via WebSocket
- `SixelDisplay` - renders to Sixel graphics (pure Python, no C dependency)
- `MultiDisplay` - forwards to multiple displays simultaneously
- **WebSocket display** (`engine/websocket_display.py`): real-time frame broadcasting to web browsers
@@ -135,6 +180,12 @@ The project uses pytest with strict marker enforcement. Test configuration is in
- HTTP server on port 8766 (serves HTML client)
- Client at `client/index.html` with ANSI color parsing and fullscreen support
- **Display modes** (`--display` flag):
- `terminal` - Default ANSI terminal output
- `websocket` - Web browser display (requires websockets package)
- `sixel` - Sixel graphics in supported terminals (iTerm2, mintty, etc.)
- `both` - Terminal + WebSocket simultaneously
### Command & Control
- C&C uses separate ntfy topics for commands and responses

638
engine/benchmark.py Normal file
View File

@@ -0,0 +1,638 @@
#!/usr/bin/env python3
"""
Benchmark runner for mainline - tests performance across effects and displays.
Usage:
python -m engine.benchmark
python -m engine.benchmark --output report.md
python -m engine.benchmark --displays terminal,websocket --effects glitch,fade
python -m engine.benchmark --format json --output benchmark.json
Headless mode (default): suppress all terminal output during benchmarks.
"""
import argparse
import json
import sys
import time
from dataclasses import dataclass, field
from datetime import datetime
from io import StringIO
from pathlib import Path
from typing import Any
import numpy as np
@dataclass
class BenchmarkResult:
"""Result of a single benchmark run."""
name: str
display: str
effect: str | None
iterations: int
total_time_ms: float
avg_time_ms: float
std_dev_ms: float
min_ms: float
max_ms: float
fps: float
chars_processed: int
chars_per_sec: float
@dataclass
class BenchmarkReport:
"""Complete benchmark report."""
timestamp: str
python_version: str
results: list[BenchmarkResult] = field(default_factory=list)
summary: dict[str, Any] = field(default_factory=dict)
def get_sample_buffer(width: int = 80, height: int = 24) -> list[str]:
"""Generate a sample buffer for benchmarking."""
lines = []
for i in range(height):
line = f"\x1b[32mLine {i}\x1b[0m " + "A" * (width - 10)
lines.append(line)
return lines
def benchmark_display(
display_class, buffer: list[str], iterations: int = 100
) -> BenchmarkResult | None:
"""Benchmark a single display."""
old_stdout = sys.stdout
old_stderr = sys.stderr
try:
sys.stdout = StringIO()
sys.stderr = StringIO()
display = display_class()
display.init(80, 24)
times = []
chars = sum(len(line) for line in buffer)
for _ in range(iterations):
t0 = time.perf_counter()
display.show(buffer)
elapsed = (time.perf_counter() - t0) * 1000
times.append(elapsed)
display.cleanup()
except Exception:
return None
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
times_arr = np.array(times)
return BenchmarkResult(
name=f"display_{display_class.__name__}",
display=display_class.__name__,
effect=None,
iterations=iterations,
total_time_ms=sum(times),
avg_time_ms=float(np.mean(times_arr)),
std_dev_ms=float(np.std(times_arr)),
min_ms=float(np.min(times_arr)),
max_ms=float(np.max(times_arr)),
fps=float(1000.0 / np.mean(times_arr)) if np.mean(times_arr) > 0 else 0.0,
chars_processed=chars * iterations,
chars_per_sec=float((chars * iterations) / (sum(times) / 1000))
if sum(times) > 0
else 0.0,
)
def benchmark_effect_with_display(
effect_class, display, buffer: list[str], iterations: int = 100
) -> BenchmarkResult | None:
"""Benchmark an effect with a display."""
old_stdout = sys.stdout
old_stderr = sys.stderr
try:
sys.stdout = StringIO()
sys.stderr = StringIO()
effect = effect_class()
effect.configure(enabled=True, intensity=1.0)
times = []
chars = sum(len(line) for line in buffer)
for _ in range(iterations):
processed = effect.process(buffer)
t0 = time.perf_counter()
display.show(processed)
elapsed = (time.perf_counter() - t0) * 1000
times.append(elapsed)
display.cleanup()
except Exception:
return None
finally:
sys.stdout = old_stdout
sys.stderr = old_stderr
times_arr = np.array(times)
return BenchmarkResult(
name=f"effect_{effect_class.__name__}_with_{display.__class__.__name__}",
display=display.__class__.__name__,
effect=effect_class.__name__,
iterations=iterations,
total_time_ms=sum(times),
avg_time_ms=float(np.mean(times_arr)),
std_dev_ms=float(np.std(times_arr)),
min_ms=float(np.min(times_arr)),
max_ms=float(np.max(times_arr)),
fps=float(1000.0 / np.mean(times_arr)) if np.mean(times_arr) > 0 else 0.0,
chars_processed=chars * iterations,
chars_per_sec=float((chars * iterations) / (sum(times) / 1000))
if sum(times) > 0
else 0.0,
)
def get_available_displays():
"""Get available display classes."""
from engine.display import (
DisplayRegistry,
NullDisplay,
TerminalDisplay,
)
DisplayRegistry.initialize()
displays = [
("null", NullDisplay),
("terminal", TerminalDisplay),
]
try:
from engine.display.backends.websocket import WebSocketDisplay
displays.append(("websocket", WebSocketDisplay))
except Exception:
pass
try:
from engine.display.backends.sixel import SixelDisplay
displays.append(("sixel", SixelDisplay))
except Exception:
pass
return displays
def get_available_effects():
"""Get available effect classes."""
try:
from engine.effects import get_registry
except Exception:
return []
effects = []
registry = get_registry()
for name, effect in registry.list_all().items():
if effect:
effects.append((name, effect))
return effects
def run_benchmarks(
displays: list[tuple[str, Any]] | None = None,
effects: list[tuple[str, Any]] | None = None,
iterations: int = 100,
verbose: bool = False,
) -> BenchmarkReport:
"""Run all benchmarks and return report."""
from datetime import datetime
if displays is None:
displays = get_available_displays()
if effects is None:
effects = get_available_effects()
buffer = get_sample_buffer(80, 24)
results = []
if verbose:
print(f"Running benchmarks ({iterations} iterations each)...")
for name, display_class in displays:
if verbose:
print(f"Benchmarking display: {name}")
result = benchmark_display(display_class, buffer, iterations)
if result:
results.append(result)
if verbose:
print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg")
if verbose:
print()
for effect_name, effect_class in effects:
for display_name, display_class in displays:
if display_name == "websocket":
continue
if verbose:
print(f"Benchmarking effect: {effect_name} with {display_name}")
display = display_class()
display.init(80, 24)
result = benchmark_effect_with_display(
effect_class, display, buffer, iterations
)
if result:
results.append(result)
if verbose:
print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg")
summary = generate_summary(results)
return BenchmarkReport(
timestamp=datetime.now().isoformat(),
python_version=sys.version,
results=results,
summary=summary,
)
def generate_summary(results: list[BenchmarkResult]) -> dict[str, Any]:
"""Generate summary statistics from results."""
by_display: dict[str, list[BenchmarkResult]] = {}
by_effect: dict[str, list[BenchmarkResult]] = {}
for r in results:
if r.display not in by_display:
by_display[r.display] = []
by_display[r.display].append(r)
if r.effect:
if r.effect not in by_effect:
by_effect[r.effect] = []
by_effect[r.effect].append(r)
summary = {
"by_display": {},
"by_effect": {},
"overall": {
"total_tests": len(results),
"displays_tested": len(by_display),
"effects_tested": len(by_effect),
},
}
for display, res in by_display.items():
fps_values = [r.fps for r in res]
summary["by_display"][display] = {
"avg_fps": float(np.mean(fps_values)),
"min_fps": float(np.min(fps_values)),
"max_fps": float(np.max(fps_values)),
"tests": len(res),
}
for effect, res in by_effect.items():
fps_values = [r.fps for r in res]
summary["by_effect"][effect] = {
"avg_fps": float(np.mean(fps_values)),
"min_fps": float(np.min(fps_values)),
"max_fps": float(np.max(fps_values)),
"tests": len(res),
}
return summary
DEFAULT_CACHE_PATH = Path.home() / ".mainline_benchmark_cache.json"
def load_baseline(cache_path: Path | None = None) -> dict[str, Any] | None:
"""Load baseline benchmark results from cache."""
path = cache_path or DEFAULT_CACHE_PATH
if not path.exists():
return None
try:
with open(path) as f:
return json.load(f)
except Exception:
return None
def save_baseline(
results: list[BenchmarkResult],
cache_path: Path | None = None,
) -> None:
"""Save benchmark results as baseline to cache."""
path = cache_path or DEFAULT_CACHE_PATH
baseline = {
"timestamp": datetime.now().isoformat(),
"results": {
r.name: {
"fps": r.fps,
"avg_time_ms": r.avg_time_ms,
"chars_per_sec": r.chars_per_sec,
}
for r in results
},
}
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
json.dump(baseline, f, indent=2)
def compare_with_baseline(
results: list[BenchmarkResult],
baseline: dict[str, Any],
threshold: float = 0.2,
verbose: bool = True,
) -> tuple[bool, list[str]]:
"""Compare current results with baseline. Returns (pass, messages)."""
baseline_results = baseline.get("results", {})
failures = []
warnings = []
for r in results:
if r.name not in baseline_results:
warnings.append(f"New test: {r.name} (no baseline)")
continue
b = baseline_results[r.name]
if b["fps"] == 0:
continue
degradation = (b["fps"] - r.fps) / b["fps"]
if degradation > threshold:
failures.append(
f"{r.name}: FPS degraded {degradation * 100:.1f}% "
f"(baseline: {b['fps']:.1f}, current: {r.fps:.1f})"
)
elif verbose:
print(f" {r.name}: {r.fps:.1f} FPS (baseline: {b['fps']:.1f})")
passed = len(failures) == 0
messages = []
if failures:
messages.extend(failures)
if warnings:
messages.extend(warnings)
return passed, messages
def run_hook_mode(
displays: list[tuple[str, Any]] | None = None,
effects: list[tuple[str, Any]] | None = None,
iterations: int = 20,
threshold: float = 0.2,
cache_path: Path | None = None,
verbose: bool = False,
) -> int:
"""Run in hook mode: compare against baseline, exit 0 on pass, 1 on fail."""
baseline = load_baseline(cache_path)
if baseline is None:
print("No baseline found. Run with --baseline to create one.")
return 1
report = run_benchmarks(displays, effects, iterations, verbose)
passed, messages = compare_with_baseline(
report.results, baseline, threshold, verbose
)
print("\n=== Benchmark Hook Results ===")
if passed:
print("PASSED - No significant performance degradation")
return 0
else:
print("FAILED - Performance degradation detected:")
for msg in messages:
print(f" - {msg}")
return 1
def format_report_text(report: BenchmarkReport) -> str:
"""Format report as human-readable text."""
lines = [
"# Mainline Performance Benchmark Report",
"",
f"Generated: {report.timestamp}",
f"Python: {report.python_version}",
"",
"## Summary",
"",
f"Total tests: {report.summary['overall']['total_tests']}",
f"Displays tested: {report.summary['overall']['displays_tested']}",
f"Effects tested: {report.summary['overall']['effects_tested']}",
"",
"## By Display",
"",
]
for display, stats in report.summary["by_display"].items():
lines.append(f"### {display}")
lines.append(f"- Avg FPS: {stats['avg_fps']:.1f}")
lines.append(f"- Min FPS: {stats['min_fps']:.1f}")
lines.append(f"- Max FPS: {stats['max_fps']:.1f}")
lines.append(f"- Tests: {stats['tests']}")
lines.append("")
if report.summary["by_effect"]:
lines.append("## By Effect")
lines.append("")
for effect, stats in report.summary["by_effect"].items():
lines.append(f"### {effect}")
lines.append(f"- Avg FPS: {stats['avg_fps']:.1f}")
lines.append(f"- Min FPS: {stats['min_fps']:.1f}")
lines.append(f"- Max FPS: {stats['max_fps']:.1f}")
lines.append(f"- Tests: {stats['tests']}")
lines.append("")
lines.append("## Detailed Results")
lines.append("")
lines.append("| Display | Effect | FPS | Avg ms | StdDev ms | Min ms | Max ms |")
lines.append("|---------|--------|-----|--------|-----------|--------|--------|")
for r in report.results:
effect_col = r.effect if r.effect else "-"
lines.append(
f"| {r.display} | {effect_col} | {r.fps:.1f} | {r.avg_time_ms:.2f} | "
f"{r.std_dev_ms:.2f} | {r.min_ms:.2f} | {r.max_ms:.2f} |"
)
return "\n".join(lines)
def format_report_json(report: BenchmarkReport) -> str:
"""Format report as JSON."""
data = {
"timestamp": report.timestamp,
"python_version": report.python_version,
"summary": report.summary,
"results": [
{
"name": r.name,
"display": r.display,
"effect": r.effect,
"iterations": r.iterations,
"total_time_ms": r.total_time_ms,
"avg_time_ms": r.avg_time_ms,
"std_dev_ms": r.std_dev_ms,
"min_ms": r.min_ms,
"max_ms": r.max_ms,
"fps": r.fps,
"chars_processed": r.chars_processed,
"chars_per_sec": r.chars_per_sec,
}
for r in report.results
],
}
return json.dumps(data, indent=2)
def main():
parser = argparse.ArgumentParser(description="Run mainline benchmarks")
parser.add_argument(
"--displays",
help="Comma-separated list of displays to test (default: all)",
)
parser.add_argument(
"--effects",
help="Comma-separated list of effects to test (default: all)",
)
parser.add_argument(
"--iterations",
type=int,
default=100,
help="Number of iterations per test (default: 100)",
)
parser.add_argument(
"--output",
help="Output file path (default: stdout)",
)
parser.add_argument(
"--format",
choices=["text", "json"],
default="text",
help="Output format (default: text)",
)
parser.add_argument(
"--verbose",
"-v",
action="store_true",
help="Show progress during benchmarking",
)
parser.add_argument(
"--hook",
action="store_true",
help="Run in hook mode: compare against baseline, exit 0 pass, 1 fail",
)
parser.add_argument(
"--baseline",
action="store_true",
help="Save current results as baseline for future hook comparisons",
)
parser.add_argument(
"--threshold",
type=float,
default=0.2,
help="Performance degradation threshold for hook mode (default: 0.2 = 20%%)",
)
parser.add_argument(
"--cache",
type=str,
default=None,
help="Path to baseline cache file (default: ~/.mainline_benchmark_cache.json)",
)
args = parser.parse_args()
cache_path = Path(args.cache) if args.cache else DEFAULT_CACHE_PATH
if args.hook:
displays = None
if args.displays:
display_map = dict(get_available_displays())
displays = [
(name, display_map[name])
for name in args.displays.split(",")
if name in display_map
]
effects = None
if args.effects:
effect_map = dict(get_available_effects())
effects = [
(name, effect_map[name])
for name in args.effects.split(",")
if name in effect_map
]
return run_hook_mode(
displays,
effects,
iterations=args.iterations,
threshold=args.threshold,
cache_path=cache_path,
verbose=args.verbose,
)
displays = None
if args.displays:
display_map = dict(get_available_displays())
displays = [
(name, display_map[name])
for name in args.displays.split(",")
if name in display_map
]
effects = None
if args.effects:
effect_map = dict(get_available_effects())
effects = [
(name, effect_map[name])
for name in args.effects.split(",")
if name in effect_map
]
report = run_benchmarks(displays, effects, args.iterations, args.verbose)
if args.baseline:
save_baseline(report.results, cache_path)
print(f"Baseline saved to {cache_path}")
return 0
if args.format == "json":
output = format_report_json(report)
else:
output = format_report_text(report)
if args.output:
with open(args.output, "w") as f:
f.write(output)
else:
print(output)
return 0
if __name__ == "__main__":
sys.exit(main())

View File

@@ -3,23 +3,48 @@ Stream controller - manages input sources and orchestrates the render stream.
"""
from engine.config import Config, get_config
from engine.display import (
DisplayRegistry,
MultiDisplay,
NullDisplay,
SixelDisplay,
TerminalDisplay,
WebSocketDisplay,
)
from engine.effects.controller import handle_effects_command
from engine.eventbus import EventBus
from engine.events import EventType, StreamEvent
from engine.mic import MicMonitor
from engine.ntfy import NtfyPoller
from engine.scroll import stream
from engine.websocket_display import WebSocketDisplay
def _get_display(config: Config):
"""Get the appropriate display based on config."""
if config.websocket:
DisplayRegistry.initialize()
display_mode = config.display.lower()
displays = []
if display_mode in ("terminal", "both"):
displays.append(TerminalDisplay())
if display_mode in ("websocket", "both"):
ws = WebSocketDisplay(host="0.0.0.0", port=config.websocket_port)
ws.start_server()
ws.start_http_server()
return ws
return None
displays.append(ws)
if display_mode == "sixel":
displays.append(SixelDisplay())
if not displays:
return NullDisplay()
if len(displays) == 1:
return displays[0]
return MultiDisplay(displays)
class StreamController:

View File

@@ -1,129 +0,0 @@
"""
Display output abstraction - allows swapping output backends.
Protocol:
- init(width, height): Initialize display with terminal dimensions
- show(buffer): Render buffer (list of strings) to display
- clear(): Clear the display
- cleanup(): Shutdown display
"""
import time
from typing import Protocol
class Display(Protocol):
"""Protocol for display backends."""
def init(self, width: int, height: int) -> None:
"""Initialize display with dimensions."""
...
def show(self, buffer: list[str]) -> None:
"""Show buffer on display."""
...
def clear(self) -> None:
"""Clear display."""
...
def cleanup(self) -> None:
"""Shutdown display."""
...
def get_monitor():
"""Get the performance monitor."""
try:
from engine.effects.performance import get_monitor as _get_monitor
return _get_monitor()
except Exception:
return None
class TerminalDisplay:
"""ANSI terminal display backend."""
def __init__(self):
self.width = 80
self.height = 24
def init(self, width: int, height: int) -> None:
from engine.terminal import CURSOR_OFF
self.width = width
self.height = height
print(CURSOR_OFF, end="", flush=True)
def show(self, buffer: list[str]) -> None:
import sys
t0 = time.perf_counter()
sys.stdout.buffer.write("".join(buffer).encode())
sys.stdout.flush()
elapsed_ms = (time.perf_counter() - t0) * 1000
monitor = get_monitor()
if monitor:
chars_in = sum(len(line) for line in buffer)
monitor.record_effect("terminal_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
from engine.terminal import CLR
print(CLR, end="", flush=True)
def cleanup(self) -> None:
from engine.terminal import CURSOR_ON
print(CURSOR_ON, end="", flush=True)
class NullDisplay:
"""Headless/null display - discards all output."""
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
def show(self, buffer: list[str]) -> None:
monitor = get_monitor()
if monitor:
t0 = time.perf_counter()
chars_in = sum(len(line) for line in buffer)
elapsed_ms = (time.perf_counter() - t0) * 1000
monitor.record_effect("null_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
pass
def cleanup(self) -> None:
pass
class MultiDisplay:
"""Display that forwards to multiple displays."""
def __init__(self, displays: list[Display]):
self.displays = displays
self.width = 80
self.height = 24
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
for d in self.displays:
d.init(width, height)
def show(self, buffer: list[str]) -> None:
for d in self.displays:
d.show(buffer)
def clear(self) -> None:
for d in self.displays:
d.clear()
def cleanup(self) -> None:
for d in self.displays:
d.cleanup()

102
engine/display/__init__.py Normal file
View File

@@ -0,0 +1,102 @@
"""
Display backend system with registry pattern.
Allows swapping output backends via the Display protocol.
Supports auto-discovery of display backends.
"""
from typing import Protocol
from engine.display.backends.multi import MultiDisplay
from engine.display.backends.null import NullDisplay
from engine.display.backends.sixel import SixelDisplay
from engine.display.backends.terminal import TerminalDisplay
from engine.display.backends.websocket import WebSocketDisplay
class Display(Protocol):
"""Protocol for display backends."""
width: int
height: int
def init(self, width: int, height: int) -> None:
"""Initialize display with dimensions."""
...
def show(self, buffer: list[str]) -> None:
"""Show buffer on display."""
...
def clear(self) -> None:
"""Clear display."""
...
def cleanup(self) -> None:
"""Shutdown display."""
...
class DisplayRegistry:
"""Registry for display backends with auto-discovery."""
_backends: dict[str, type[Display]] = {}
_initialized = False
@classmethod
def register(cls, name: str, backend_class: type[Display]) -> None:
"""Register a display backend."""
cls._backends[name.lower()] = backend_class
@classmethod
def get(cls, name: str) -> type[Display] | None:
"""Get a display backend class by name."""
return cls._backends.get(name.lower())
@classmethod
def list_backends(cls) -> list[str]:
"""List all available display backend names."""
return list(cls._backends.keys())
@classmethod
def create(cls, name: str, **kwargs) -> Display | None:
"""Create a display instance by name."""
backend_class = cls.get(name)
if backend_class:
return backend_class(**kwargs)
return None
@classmethod
def initialize(cls) -> None:
"""Initialize and register all built-in backends."""
if cls._initialized:
return
cls.register("terminal", TerminalDisplay)
cls.register("null", NullDisplay)
cls.register("websocket", WebSocketDisplay)
cls.register("sixel", SixelDisplay)
cls._initialized = True
def get_monitor():
"""Get the performance monitor."""
try:
from engine.effects.performance import get_monitor as _get_monitor
return _get_monitor()
except Exception:
return None
__all__ = [
"Display",
"DisplayRegistry",
"get_monitor",
"TerminalDisplay",
"NullDisplay",
"WebSocketDisplay",
"SixelDisplay",
"MultiDisplay",
]

View File

@@ -0,0 +1,33 @@
"""
Multi display backend - forwards to multiple displays.
"""
class MultiDisplay:
"""Display that forwards to multiple displays."""
width: int = 80
height: int = 24
def __init__(self, displays: list):
self.displays = displays
self.width = 80
self.height = 24
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
for d in self.displays:
d.init(width, height)
def show(self, buffer: list[str]) -> None:
for d in self.displays:
d.show(buffer)
def clear(self) -> None:
for d in self.displays:
d.clear()
def cleanup(self) -> None:
for d in self.displays:
d.cleanup()

View File

@@ -0,0 +1,32 @@
"""
Null/headless display backend.
"""
import time
class NullDisplay:
"""Headless/null display - discards all output."""
width: int = 80
height: int = 24
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
def show(self, buffer: list[str]) -> None:
from engine.display import get_monitor
monitor = get_monitor()
if monitor:
t0 = time.perf_counter()
chars_in = sum(len(line) for line in buffer)
elapsed_ms = (time.perf_counter() - t0) * 1000
monitor.record_effect("null_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
pass
def cleanup(self) -> None:
pass

View File

@@ -0,0 +1,269 @@
"""
Sixel graphics display backend - renders to sixel graphics in terminal.
"""
import time
def _parse_ansi(
text: str,
) -> list[tuple[str, tuple[int, int, int], tuple[int, int, int], bool]]:
"""Parse ANSI text into tokens with fg/bg colors.
Returns list of (text, fg_rgb, bg_rgb, bold).
"""
tokens = []
current_text = ""
fg = (204, 204, 204)
bg = (0, 0, 0)
bold = False
i = 0
ANSI_COLORS = {
0: (0, 0, 0),
1: (205, 49, 49),
2: (13, 188, 121),
3: (229, 229, 16),
4: (36, 114, 200),
5: (188, 63, 188),
6: (17, 168, 205),
7: (229, 229, 229),
8: (102, 102, 102),
9: (241, 76, 76),
10: (35, 209, 139),
11: (245, 245, 67),
12: (59, 142, 234),
13: (214, 112, 214),
14: (41, 184, 219),
15: (255, 255, 255),
}
while i < len(text):
char = text[i]
if char == "\x1b" and i + 1 < len(text) and text[i + 1] == "[":
if current_text:
tokens.append((current_text, fg, bg, bold))
current_text = ""
i += 2
code = ""
while i < len(text):
c = text[i]
if c.isalpha():
break
code += c
i += 1
if code:
codes = code.split(";")
for c in codes:
try:
n = int(c) if c else 0
except ValueError:
continue
if n == 0:
fg = (204, 204, 204)
bg = (0, 0, 0)
bold = False
elif n == 1:
bold = True
elif n == 22:
bold = False
elif n == 39:
fg = (204, 204, 204)
elif n == 49:
bg = (0, 0, 0)
elif 30 <= n <= 37:
fg = ANSI_COLORS.get(n - 30 + (8 if bold else 0), fg)
elif 40 <= n <= 47:
bg = ANSI_COLORS.get(n - 40, bg)
elif 90 <= n <= 97:
fg = ANSI_COLORS.get(n - 90 + 8, fg)
elif 100 <= n <= 107:
bg = ANSI_COLORS.get(n - 100 + 8, bg)
elif 1 <= n <= 256:
if n < 16:
fg = ANSI_COLORS.get(n, fg)
elif n < 232:
c = n - 16
r = (c // 36) * 51
g = ((c % 36) // 6) * 51
b = (c % 6) * 51
fg = (r, g, b)
else:
gray = (n - 232) * 10 + 8
fg = (gray, gray, gray)
else:
current_text += char
i += 1
if current_text:
tokens.append((current_text, fg, bg, bold))
return tokens if tokens else [("", fg, bg, bold)]
def _encode_sixel(image) -> str:
"""Encode a PIL Image to sixel format (pure Python)."""
img = image.convert("RGBA")
width, height = img.size
pixels = img.load()
palette = []
pixel_palette_idx = {}
def get_color_idx(r, g, b, a):
if a < 128:
return -1
key = (r // 32, g // 32, b // 32)
if key not in pixel_palette_idx:
idx = len(palette)
if idx < 256:
palette.append((r, g, b))
pixel_palette_idx[key] = idx
return pixel_palette_idx.get(key, 0)
for y in range(height):
for x in range(width):
r, g, b, a = pixels[x, y]
get_color_idx(r, g, b, a)
if not palette:
return ""
if len(palette) == 1:
palette = [palette[0], (0, 0, 0)]
sixel_data = []
sixel_data.append(
f'"{"".join(f"#{i};2;{r};{g};{b}" for i, (r, g, b) in enumerate(palette))}'
)
for x in range(width):
col_data = []
for y in range(0, height, 6):
bits = 0
color_idx = -1
for dy in range(6):
if y + dy < height:
r, g, b, a = pixels[x, y + dy]
if a >= 128:
bits |= 1 << dy
idx = get_color_idx(r, g, b, a)
if color_idx == -1:
color_idx = idx
elif color_idx != idx:
color_idx = -2
if color_idx >= 0:
col_data.append(
chr(63 + color_idx) + chr(63 + bits)
if bits
else chr(63 + color_idx) + "?"
)
elif color_idx == -2:
pass
if col_data:
sixel_data.append("".join(col_data) + "$")
else:
sixel_data.append("-" if x < width - 1 else "$")
sixel_data.append("\x1b\\")
return "\x1bPq" + "".join(sixel_data)
class SixelDisplay:
"""Sixel graphics display backend - renders to sixel graphics in terminal."""
width: int = 80
height: int = 24
def __init__(self, cell_width: int = 9, cell_height: int = 16):
self.width = 80
self.height = 24
self.cell_width = cell_width
self.cell_height = cell_height
self._initialized = False
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
self._initialized = True
def show(self, buffer: list[str]) -> None:
import sys
t0 = time.perf_counter()
img_width = self.width * self.cell_width
img_height = self.height * self.cell_height
try:
from PIL import Image, ImageDraw, ImageFont
except ImportError:
return
img = Image.new("RGBA", (img_width, img_height), (0, 0, 0, 255))
draw = ImageDraw.Draw(img)
try:
font = ImageFont.truetype(
"/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf",
self.cell_height - 2,
)
except Exception:
try:
font = ImageFont.load_default()
except Exception:
font = None
for row_idx, line in enumerate(buffer[: self.height]):
if row_idx >= self.height:
break
tokens = _parse_ansi(line)
x_pos = 0
y_pos = row_idx * self.cell_height
for text, fg, bg, bold in tokens:
if not text:
continue
if bg != (0, 0, 0):
bbox = draw.textbbox((x_pos, y_pos), text, font=font)
draw.rectangle(bbox, fill=(*bg, 255))
if bold and font:
draw.text((x_pos - 1, y_pos - 1), text, fill=(*fg, 255), font=font)
draw.text((x_pos, y_pos), text, fill=(*fg, 255), font=font)
if font:
x_pos += draw.textlength(text, font=font)
sixel = _encode_sixel(img)
sys.stdout.buffer.write(sixel.encode("utf-8"))
sys.stdout.flush()
elapsed_ms = (time.perf_counter() - t0) * 1000
from engine.display import get_monitor
monitor = get_monitor()
if monitor:
chars_in = sum(len(line) for line in buffer)
monitor.record_effect("sixel_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
import sys
sys.stdout.buffer.write(b"\x1b[2J\x1b[H")
sys.stdout.flush()
def cleanup(self) -> None:
pass

View File

@@ -0,0 +1,48 @@
"""
ANSI terminal display backend.
"""
import time
class TerminalDisplay:
"""ANSI terminal display backend."""
width: int = 80
height: int = 24
def __init__(self):
self.width = 80
self.height = 24
def init(self, width: int, height: int) -> None:
from engine.terminal import CURSOR_OFF
self.width = width
self.height = height
print(CURSOR_OFF, end="", flush=True)
def show(self, buffer: list[str]) -> None:
import sys
t0 = time.perf_counter()
sys.stdout.buffer.write("".join(buffer).encode())
sys.stdout.flush()
elapsed_ms = (time.perf_counter() - t0) * 1000
from engine.display import get_monitor
monitor = get_monitor()
if monitor:
chars_in = sum(len(line) for line in buffer)
monitor.record_effect("terminal_display", elapsed_ms, chars_in, chars_in)
def clear(self) -> None:
from engine.terminal import CLR
print(CLR, end="", flush=True)
def cleanup(self) -> None:
from engine.terminal import CURSOR_ON
print(CURSOR_ON, end="", flush=True)

View File

@@ -1,11 +1,5 @@
"""
WebSocket display server - broadcasts frame buffer to connected web clients.
Usage:
ws_display = WebSocketDisplay(host="0.0.0.0", port=8765)
ws_display.init(80, 24)
ws_display.show(["line1", "line2", ...])
ws_display.cleanup()
WebSocket display backend - broadcasts frame buffer to connected web clients.
"""
import asyncio
@@ -23,6 +17,9 @@ except ImportError:
class Display(Protocol):
"""Protocol for display backends."""
width: int
height: int
def init(self, width: int, height: int) -> None:
"""Initialize display with dimensions."""
...
@@ -53,6 +50,9 @@ def get_monitor():
class WebSocketDisplay:
"""WebSocket display backend - broadcasts to HTML Canvas clients."""
width: int = 80
height: int = 24
def __init__(
self,
host: str = "0.0.0.0",
@@ -177,7 +177,9 @@ class WebSocketDisplay:
import os
from http.server import HTTPServer, SimpleHTTPRequestHandler
client_dir = os.path.join(os.path.dirname(os.path.dirname(__file__)), "client")
client_dir = os.path.join(
os.path.dirname(os.path.dirname(os.path.dirname(__file__))), "client"
)
class Handler(SimpleHTTPRequestHandler):
def __init__(self, *args, **kwargs):

3
hk.pkl
View File

@@ -22,6 +22,9 @@ hooks {
prefix = "uv run"
check = "ruff check engine/ tests/"
}
["benchmark"] {
check = "uv run python -m engine.benchmark --hook --displays null --iterations 20"
}
}
}
}

View File

@@ -33,6 +33,7 @@ run-poetry = "uv run mainline.py --poetry"
run-firehose = "uv run mainline.py --firehose"
run-websocket = { run = "uv run mainline.py --display websocket", depends = ["sync-all"] }
run-sixel = { run = "uv run mainline.py --display sixel", depends = ["sync-all"] }
run-both = { run = "uv run mainline.py --display both", depends = ["sync-all"] }
run-client = { run = "mise run run-both & sleep 2 && $(open http://localhost:8766 2>/dev/null || xdg-open http://localhost:8766 2>/dev/null || echo 'Open http://localhost:8766 manually'); wait", depends = ["sync-all"] }
@@ -43,6 +44,14 @@ run-client = { run = "mise run run-both & sleep 2 && $(open http://localhost:876
cmd = "uv run cmdline.py"
cmd-stats = { run = "uv run cmdline.py -w \"/effects stats\"", depends = ["sync-all"] }
# =====================
# Benchmark
# =====================
benchmark = { run = "uv run python -m engine.benchmark", depends = ["sync-all"] }
benchmark-json = { run = "uv run python -m engine.benchmark --format json --output benchmark.json", depends = ["sync-all"] }
benchmark-report = { run = "uv run python -m engine.benchmark --output BENCHMARK.md", depends = ["sync-all"] }
# Initialize ntfy topics (warm up before first use)
topics-init = "curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline_cc_cmd > /dev/null && curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline_cc_resp > /dev/null && curl -s -d 'init' https://ntfy.sh/klubhaus_terminal_mainline > /dev/null"

View File

@@ -33,6 +33,9 @@ mic = [
websocket = [
"websockets>=12.0",
]
sixel = [
"pysixel>=0.1.0",
]
browser = [
"playwright>=1.40.0",
]

View File

@@ -1,12 +1,12 @@
"""
Tests for engine.websocket_display module.
Tests for engine.display.backends.websocket module.
"""
from unittest.mock import MagicMock, patch
import pytest
from engine.websocket_display import WebSocketDisplay
from engine.display.backends.websocket import WebSocketDisplay
class TestWebSocketDisplayImport:
@@ -14,9 +14,9 @@ class TestWebSocketDisplayImport:
def test_import_does_not_error(self):
"""Module imports without error."""
from engine import websocket_display
from engine.display import backends
assert websocket_display is not None
assert backends is not None
class TestWebSocketDisplayInit:
@@ -24,7 +24,7 @@ class TestWebSocketDisplayInit:
def test_default_init(self):
"""Default initialization sets correct defaults."""
with patch("engine.websocket_display.websockets", None):
with patch("engine.display.backends.websocket.websockets", None):
display = WebSocketDisplay()
assert display.host == "0.0.0.0"
assert display.port == 8765
@@ -34,7 +34,7 @@ class TestWebSocketDisplayInit:
def test_custom_init(self):
"""Custom initialization uses provided values."""
with patch("engine.websocket_display.websockets", None):
with patch("engine.display.backends.websocket.websockets", None):
display = WebSocketDisplay(host="localhost", port=9000, http_port=9001)
assert display.host == "localhost"
assert display.port == 9000
@@ -60,7 +60,7 @@ class TestWebSocketDisplayProtocol:
def test_websocket_display_is_display(self):
"""WebSocketDisplay satisfies Display protocol."""
with patch("engine.websocket_display.websockets", MagicMock()):
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
assert hasattr(display, "init")
assert hasattr(display, "show")
@@ -73,7 +73,7 @@ class TestWebSocketDisplayMethods:
def test_init_stores_dimensions(self):
"""init stores terminal dimensions."""
with patch("engine.websocket_display.websockets", MagicMock()):
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
display.init(100, 40)
assert display.width == 100
@@ -81,31 +81,31 @@ class TestWebSocketDisplayMethods:
def test_client_count_initially_zero(self):
"""client_count returns 0 when no clients connected."""
with patch("engine.websocket_display.websockets", MagicMock()):
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
assert display.client_count() == 0
def test_get_ws_port(self):
"""get_ws_port returns configured port."""
with patch("engine.websocket_display.websockets", MagicMock()):
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay(port=9000)
assert display.get_ws_port() == 9000
def test_get_http_port(self):
"""get_http_port returns configured port."""
with patch("engine.websocket_display.websockets", MagicMock()):
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay(http_port=9001)
assert display.get_http_port() == 9001
def test_frame_delay_defaults_to_zero(self):
"""get_frame_delay returns 0 by default."""
with patch("engine.websocket_display.websockets", MagicMock()):
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
assert display.get_frame_delay() == 0.0
def test_set_frame_delay(self):
"""set_frame_delay stores the value."""
with patch("engine.websocket_display.websockets", MagicMock()):
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
display.set_frame_delay(0.05)
assert display.get_frame_delay() == 0.05
@@ -116,7 +116,7 @@ class TestWebSocketDisplayCallbacks:
def test_set_client_connected_callback(self):
"""set_client_connected_callback stores callback."""
with patch("engine.websocket_display.websockets", MagicMock()):
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
callback = MagicMock()
display.set_client_connected_callback(callback)
@@ -124,7 +124,7 @@ class TestWebSocketDisplayCallbacks:
def test_set_client_disconnected_callback(self):
"""set_client_disconnected_callback stores callback."""
with patch("engine.websocket_display.websockets", MagicMock()):
with patch("engine.display.backends.websocket.websockets", MagicMock()):
display = WebSocketDisplay()
callback = MagicMock()
display.set_client_disconnected_callback(callback)