feat(display): add reuse flag to Display protocol

- Add reuse parameter to Display.init() for all backends
- PygameDisplay: reuse existing SDL window via class-level flag
- TerminalDisplay: skip re-init when reuse=True
- WebSocketDisplay: skip server start when reuse=True
- SixelDisplay, KittyDisplay, NullDisplay: ignore reuse (not applicable)
- MultiDisplay: pass reuse to child displays
- Update benchmark.py to reuse pygame display for effect benchmarks
- Add test_websocket_e2e.py with e2e marker
- Register e2e marker in pyproject.toml
This commit is contained in:
2026-03-16 00:30:52 -07:00
parent f9991c24af
commit f5de2c62e0
13 changed files with 309 additions and 34 deletions

View File

@@ -62,9 +62,21 @@ def get_sample_buffer(width: int = 80, height: int = 24) -> list[str]:
def benchmark_display( def benchmark_display(
display_class, buffer: list[str], iterations: int = 100 display_class,
buffer: list[str],
iterations: int = 100,
display=None,
reuse: bool = False,
) -> BenchmarkResult | None: ) -> BenchmarkResult | None:
"""Benchmark a single display.""" """Benchmark a single display.
Args:
display_class: Display class to instantiate
buffer: Buffer to display
iterations: Number of iterations
display: Optional existing display instance to reuse
reuse: If True and display provided, use reuse mode
"""
old_stdout = sys.stdout old_stdout = sys.stdout
old_stderr = sys.stderr old_stderr = sys.stderr
@@ -72,8 +84,12 @@ def benchmark_display(
sys.stdout = StringIO() sys.stdout = StringIO()
sys.stderr = StringIO() sys.stderr = StringIO()
if display is None:
display = display_class() display = display_class()
display.init(80, 24) display.init(80, 24, reuse=False)
should_cleanup = True
else:
should_cleanup = False
times = [] times = []
chars = sum(len(line) for line in buffer) chars = sum(len(line) for line in buffer)
@@ -84,7 +100,8 @@ def benchmark_display(
elapsed = (time.perf_counter() - t0) * 1000 elapsed = (time.perf_counter() - t0) * 1000
times.append(elapsed) times.append(elapsed)
display.cleanup() if should_cleanup and hasattr(display, "cleanup"):
display.cleanup(quit_pygame=False)
except Exception: except Exception:
return None return None
@@ -113,9 +130,17 @@ def benchmark_display(
def benchmark_effect_with_display( def benchmark_effect_with_display(
effect_class, display, buffer: list[str], iterations: int = 100 effect_class, display, buffer: list[str], iterations: int = 100, reuse: bool = False
) -> BenchmarkResult | None: ) -> BenchmarkResult | None:
"""Benchmark an effect with a display.""" """Benchmark an effect with a display.
Args:
effect_class: Effect class to instantiate
display: Display instance to use
buffer: Buffer to process and display
iterations: Number of iterations
reuse: If True, use reuse mode for display
"""
old_stdout = sys.stdout old_stdout = sys.stdout
old_stderr = sys.stderr old_stderr = sys.stderr
@@ -149,7 +174,8 @@ def benchmark_effect_with_display(
elapsed = (time.perf_counter() - t0) * 1000 elapsed = (time.perf_counter() - t0) * 1000
times.append(elapsed) times.append(elapsed)
display.cleanup() if not reuse and hasattr(display, "cleanup"):
display.cleanup(quit_pygame=False)
except Exception: except Exception:
return None return None
@@ -206,6 +232,13 @@ def get_available_displays():
except Exception: except Exception:
pass pass
try:
from engine.display.backends.pygame import PygameDisplay
displays.append(("pygame", PygameDisplay))
except Exception:
pass
return displays return displays
@@ -255,6 +288,7 @@ def run_benchmarks(
if verbose: if verbose:
print(f"Running benchmarks ({iterations} iterations each)...") print(f"Running benchmarks ({iterations} iterations each)...")
pygame_display = None
for name, display_class in displays: for name, display_class in displays:
if verbose: if verbose:
print(f"Benchmarking display: {name}") print(f"Benchmarking display: {name}")
@@ -265,13 +299,44 @@ def run_benchmarks(
if verbose: if verbose:
print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg") print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg")
if name == "pygame":
pygame_display = result
if verbose: if verbose:
print() print()
pygame_instance = None
if pygame_display:
try:
from engine.display.backends.pygame import PygameDisplay
PygameDisplay.reset_state()
pygame_instance = PygameDisplay()
pygame_instance.init(80, 24, reuse=False)
except Exception:
pygame_instance = None
for effect_name, effect_class in effects: for effect_name, effect_class in effects:
for display_name, display_class in displays: for display_name, display_class in displays:
if display_name == "websocket": if display_name == "websocket":
continue continue
if display_name == "pygame":
if verbose:
print(f"Benchmarking effect: {effect_name} with {display_name}")
if pygame_instance:
result = benchmark_effect_with_display(
effect_class, pygame_instance, buffer, iterations, reuse=True
)
if result:
results.append(result)
if verbose:
print(
f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg"
)
continue
if verbose: if verbose:
print(f"Benchmarking effect: {effect_name} with {display_name}") print(f"Benchmarking effect: {effect_name} with {display_name}")
@@ -285,6 +350,12 @@ def run_benchmarks(
if verbose: if verbose:
print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg") print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg")
if pygame_instance:
try:
pygame_instance.cleanup(quit_pygame=True)
except Exception:
pass
summary = generate_summary(results) summary = generate_summary(results)
return BenchmarkReport( return BenchmarkReport(

View File

@@ -5,8 +5,10 @@ Stream controller - manages input sources and orchestrates the render stream.
from engine.config import Config, get_config from engine.config import Config, get_config
from engine.display import ( from engine.display import (
DisplayRegistry, DisplayRegistry,
KittyDisplay,
MultiDisplay, MultiDisplay,
NullDisplay, NullDisplay,
PygameDisplay,
SixelDisplay, SixelDisplay,
TerminalDisplay, TerminalDisplay,
WebSocketDisplay, WebSocketDisplay,
@@ -38,6 +40,12 @@ def _get_display(config: Config):
if display_mode == "sixel": if display_mode == "sixel":
displays.append(SixelDisplay()) displays.append(SixelDisplay())
if display_mode == "kitty":
displays.append(KittyDisplay())
if display_mode == "pygame":
displays.append(PygameDisplay())
if not displays: if not displays:
return NullDisplay() return NullDisplay()

View File

@@ -17,13 +17,30 @@ from engine.display.backends.websocket import WebSocketDisplay
class Display(Protocol): class Display(Protocol):
"""Protocol for display backends.""" """Protocol for display backends.
All display backends must implement:
- width, height: Terminal dimensions
- init(width, height, reuse=False): Initialize the display
- show(buffer): Render buffer to display
- clear(): Clear the display
- cleanup(): Shutdown the display
The reuse flag allows attaching to an existing display instance
rather than creating a new window/connection.
"""
width: int width: int
height: int height: int
def init(self, width: int, height: int) -> None: def init(self, width: int, height: int, reuse: bool = False) -> None:
"""Initialize display with dimensions.""" """Initialize display with dimensions.
Args:
width: Terminal width in characters
height: Terminal height in rows
reuse: If True, attach to existing display instead of creating new
"""
... ...
def show(self, buffer: list[str]) -> None: def show(self, buffer: list[str]) -> None:

View File

@@ -126,7 +126,14 @@ class KittyDisplay:
self._initialized = False self._initialized = False
self._font_path = None self._font_path = None
def init(self, width: int, height: int) -> None: def init(self, width: int, height: int, reuse: bool = False) -> None:
"""Initialize display with dimensions.
Args:
width: Terminal width in characters
height: Terminal height in rows
reuse: Ignored for KittyDisplay (protocol doesn't support reuse)
"""
self.width = width self.width = width
self.height = height self.height = height
self._initialized = True self._initialized = True

View File

@@ -4,7 +4,10 @@ Multi display backend - forwards to multiple displays.
class MultiDisplay: class MultiDisplay:
"""Display that forwards to multiple displays.""" """Display that forwards to multiple displays.
Supports reuse - passes reuse flag to all child displays.
"""
width: int = 80 width: int = 80
height: int = 24 height: int = 24
@@ -14,11 +17,18 @@ class MultiDisplay:
self.width = 80 self.width = 80
self.height = 24 self.height = 24
def init(self, width: int, height: int) -> None: def init(self, width: int, height: int, reuse: bool = False) -> None:
"""Initialize all child displays with dimensions.
Args:
width: Terminal width in characters
height: Terminal height in rows
reuse: If True, use reuse mode for child displays
"""
self.width = width self.width = width
self.height = height self.height = height
for d in self.displays: for d in self.displays:
d.init(width, height) d.init(width, height, reuse=reuse)
def show(self, buffer: list[str]) -> None: def show(self, buffer: list[str]) -> None:
for d in self.displays: for d in self.displays:

View File

@@ -6,12 +6,23 @@ import time
class NullDisplay: class NullDisplay:
"""Headless/null display - discards all output.""" """Headless/null display - discards all output.
This display does nothing - useful for headless benchmarking
or when no display output is needed.
"""
width: int = 80 width: int = 80
height: int = 24 height: int = 24
def init(self, width: int, height: int) -> None: def init(self, width: int, height: int, reuse: bool = False) -> None:
"""Initialize display with dimensions.
Args:
width: Terminal width in characters
height: Terminal height in rows
reuse: Ignored for NullDisplay (no resources to reuse)
"""
self.width = width self.width = width
self.height = height self.height = height

View File

@@ -6,12 +6,16 @@ import time
class PygameDisplay: class PygameDisplay:
"""Pygame display backend - renders to native window.""" """Pygame display backend - renders to native window.
Supports reuse mode - when reuse=True, skips SDL initialization
and reuses the existing pygame window from a previous instance.
"""
width: int = 80 width: int = 80
height: int = 24
window_width: int = 800 window_width: int = 800
window_height: int = 600 window_height: int = 600
_pygame_initialized: bool = False
def __init__( def __init__(
self, self,
@@ -76,20 +80,37 @@ class PygameDisplay:
return None return None
def init(self, width: int, height: int) -> None: def init(self, width: int, height: int, reuse: bool = False) -> None:
"""Initialize display with dimensions.
Args:
width: Terminal width in characters
height: Terminal height in rows
reuse: If True, attach to existing pygame window instead of creating new
"""
self.width = width self.width = width
self.height = height self.height = height
import os
os.environ["SDL_VIDEODRIVER"] = "x11"
try: try:
import pygame import pygame
except ImportError: except ImportError:
return return
pygame.init() if reuse and PygameDisplay._pygame_initialized:
self._pygame = pygame self._pygame = pygame
self._initialized = True
return
pygame.init()
pygame.display.set_caption("Mainline")
self._screen = pygame.display.set_mode((self.window_width, self.window_height)) self._screen = pygame.display.set_mode((self.window_width, self.window_height))
pygame.display.set_caption("Mainline") self._pygame = pygame
PygameDisplay._pygame_initialized = True
font_path = self._get_font_path() font_path = self._get_font_path()
if font_path: if font_path:
@@ -218,6 +239,18 @@ class PygameDisplay:
self._screen.fill((0, 0, 0)) self._screen.fill((0, 0, 0))
self._pygame.display.flip() self._pygame.display.flip()
def cleanup(self) -> None: def cleanup(self, quit_pygame: bool = True) -> None:
if self._pygame: """Cleanup display resources.
Args:
quit_pygame: If True, quit pygame entirely. Set to False when
reusing the display to avoid closing shared window.
"""
if quit_pygame and self._pygame:
self._pygame.quit() self._pygame.quit()
PygameDisplay._pygame_initialized = False
@classmethod
def reset_state(cls) -> None:
"""Reset pygame state - useful for testing."""
cls._pygame_initialized = False

View File

@@ -271,7 +271,14 @@ class SixelDisplay:
return None return None
def init(self, width: int, height: int) -> None: def init(self, width: int, height: int, reuse: bool = False) -> None:
"""Initialize display with dimensions.
Args:
width: Terminal width in characters
height: Terminal height in rows
reuse: Ignored for SixelDisplay
"""
self.width = width self.width = width
self.height = height self.height = height
self._initialized = True self._initialized = True

View File

@@ -6,17 +6,32 @@ import time
class TerminalDisplay: class TerminalDisplay:
"""ANSI terminal display backend.""" """ANSI terminal display backend.
Renders buffer to stdout using ANSI escape codes.
Supports reuse - when reuse=True, skips re-initializing terminal state.
"""
width: int = 80 width: int = 80
height: int = 24 height: int = 24
_initialized: bool = False
def init(self, width: int, height: int) -> None: def init(self, width: int, height: int, reuse: bool = False) -> None:
"""Initialize display with dimensions.
Args:
width: Terminal width in characters
height: Terminal height in rows
reuse: If True, skip terminal re-initialization
"""
from engine.terminal import CURSOR_OFF from engine.terminal import CURSOR_OFF
self.width = width self.width = width
self.height = height self.height = height
if not reuse or not self._initialized:
print(CURSOR_OFF, end="", flush=True) print(CURSOR_OFF, end="", flush=True)
self._initialized = True
def show(self, buffer: list[str]) -> None: def show(self, buffer: list[str]) -> None:
import sys import sys

View File

@@ -86,10 +86,18 @@ class WebSocketDisplay:
"""Check if WebSocket support is available.""" """Check if WebSocket support is available."""
return self._available return self._available
def init(self, width: int, height: int) -> None: def init(self, width: int, height: int, reuse: bool = False) -> None:
"""Initialize display with dimensions and start server.""" """Initialize display with dimensions and start server.
Args:
width: Terminal width in characters
height: Terminal height in rows
reuse: If True, skip starting servers (assume already running)
"""
self.width = width self.width = width
self.height = height self.height = height
if not reuse or not self._server_running:
self.start_server() self.start_server()
self.start_http_server() self.start_http_server()

View File

@@ -75,6 +75,7 @@ addopts = [
] ]
markers = [ markers = [
"benchmark: marks tests as performance benchmarks (may be slow)", "benchmark: marks tests as performance benchmarks (may be slow)",
"e2e: marks tests as end-to-end tests (require network/display)",
] ]
filterwarnings = [ filterwarnings = [
"ignore::DeprecationWarning", "ignore::DeprecationWarning",

View File

@@ -155,8 +155,8 @@ class TestMultiDisplay:
assert multi.width == 120 assert multi.width == 120
assert multi.height == 40 assert multi.height == 40
mock_display1.init.assert_called_once_with(120, 40) mock_display1.init.assert_called_once_with(120, 40, reuse=False)
mock_display2.init.assert_called_once_with(120, 40) mock_display2.init.assert_called_once_with(120, 40, reuse=False)
def test_show_forwards_to_all_displays(self): def test_show_forwards_to_all_displays(self):
"""show forwards buffer to all displays.""" """show forwards buffer to all displays."""
@@ -199,3 +199,12 @@ class TestMultiDisplay:
multi.show(["test"]) multi.show(["test"])
multi.clear() multi.clear()
multi.cleanup() multi.cleanup()
def test_init_with_reuse(self):
"""init passes reuse flag to child displays."""
mock_display = MagicMock()
multi = MultiDisplay([mock_display])
multi.init(80, 24, reuse=True)
mock_display.init.assert_called_once_with(80, 24, reuse=True)

View File

@@ -0,0 +1,78 @@
"""
End-to-end tests for WebSocket display using Playwright.
"""
import time
import pytest
class TestWebSocketE2E:
"""End-to-end tests for WebSocket display with browser."""
@pytest.mark.e2e
def test_websocket_server_starts(self):
"""Test that WebSocket server starts and serves HTTP."""
import threading
from engine.display.backends.websocket import WebSocketDisplay
display = WebSocketDisplay(host="127.0.0.1", port=18765)
server_thread = threading.Thread(target=display.start_http_server)
server_thread.daemon = True
server_thread.start()
time.sleep(1)
try:
import urllib.request
response = urllib.request.urlopen("http://127.0.0.1:18765", timeout=5)
assert response.status == 200
content = response.read().decode("utf-8")
assert len(content) > 0
finally:
display.cleanup()
time.sleep(0.5)
@pytest.mark.e2e
@pytest.mark.skipif(
not pytest.importorskip("playwright", reason="playwright not installed"),
reason="playwright not installed",
)
def test_websocket_browser_connection(self):
"""Test WebSocket connection with actual browser."""
import threading
from playwright.sync_api import sync_playwright
from engine.display.backends.websocket import WebSocketDisplay
display = WebSocketDisplay(host="127.0.0.1", port=18767)
server_thread = threading.Thread(target=display.start_server)
server_thread.daemon = True
server_thread.start()
http_thread = threading.Thread(target=display.start_http_server)
http_thread.daemon = True
http_thread.start()
time.sleep(1)
try:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.goto("http://127.0.0.1:18767")
time.sleep(0.5)
title = page.title()
assert len(title) >= 0
browser.close()
finally:
display.cleanup()
time.sleep(0.5)