refactor(display)!: remove deprecated backends, simplify protocol, and add BorderMode/UI rendering
- Remove SixelDisplay and KittyDisplay backends (unmaintained) - Simplify Display protocol: reduce docstring noise, emphasize duck typing - Add BorderMode enum (OFF, SIMPLE, UI) for flexible border rendering - Rename render_border to _render_simple_border - Add render_ui_panel() to compose main viewport with right-side UI panel - Add new render_border() dispatcher supporting BorderMode - Update __all__ to expose BorderMode, render_ui_panel, PygameDisplay - Clean up DisplayRegistry: remove deprecated method docstrings - Update tests: remove SixelDisplay import, assert sixel not in registry - Add TODO comment to WebSocket backend about streaming improvements This is a breaking change (removal of backends) but enables cleaner architecture and interactive UI panel. Closes #13, #21
This commit is contained in:
@@ -5,102 +5,58 @@ Allows swapping output backends via the Display protocol.
|
|||||||
Supports auto-discovery of display backends.
|
Supports auto-discovery of display backends.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
from enum import Enum, auto
|
||||||
from typing import Protocol
|
from typing import Protocol
|
||||||
|
|
||||||
from engine.display.backends.kitty import KittyDisplay
|
# Optional backend - requires moderngl package
|
||||||
|
try:
|
||||||
|
from engine.display.backends.moderngl import ModernGLDisplay
|
||||||
|
|
||||||
|
_MODERNGL_AVAILABLE = True
|
||||||
|
except ImportError:
|
||||||
|
ModernGLDisplay = None
|
||||||
|
_MODERNGL_AVAILABLE = False
|
||||||
|
|
||||||
from engine.display.backends.multi import MultiDisplay
|
from engine.display.backends.multi import MultiDisplay
|
||||||
from engine.display.backends.null import NullDisplay
|
from engine.display.backends.null import NullDisplay
|
||||||
from engine.display.backends.pygame import PygameDisplay
|
from engine.display.backends.pygame import PygameDisplay
|
||||||
from engine.display.backends.sixel import SixelDisplay
|
|
||||||
from engine.display.backends.terminal import TerminalDisplay
|
from engine.display.backends.terminal import TerminalDisplay
|
||||||
from engine.display.backends.websocket import WebSocketDisplay
|
from engine.display.backends.websocket import WebSocketDisplay
|
||||||
|
|
||||||
|
|
||||||
|
class BorderMode(Enum):
|
||||||
|
"""Border rendering modes for displays."""
|
||||||
|
|
||||||
|
OFF = auto() # No border
|
||||||
|
SIMPLE = auto() # Traditional border with FPS/frame time
|
||||||
|
UI = auto() # Right-side UI panel with interactive controls
|
||||||
|
|
||||||
|
|
||||||
class Display(Protocol):
|
class Display(Protocol):
|
||||||
"""Protocol for display backends.
|
"""Protocol for display backends.
|
||||||
|
|
||||||
All display backends must implement:
|
Required attributes:
|
||||||
- width, height: Terminal dimensions
|
- width: int
|
||||||
- init(width, height, reuse=False): Initialize the display
|
- height: int
|
||||||
- show(buffer): Render buffer to display
|
|
||||||
- clear(): Clear the display
|
|
||||||
- cleanup(): Shutdown the display
|
|
||||||
|
|
||||||
Optional methods for keyboard input:
|
Required methods (duck typing - actual signatures may vary):
|
||||||
- is_quit_requested(): Returns True if user pressed Ctrl+C/Q or Escape
|
- init(width, height, reuse=False)
|
||||||
- clear_quit_request(): Clears the quit request flag
|
- show(buffer, border=False)
|
||||||
|
- clear()
|
||||||
|
- cleanup()
|
||||||
|
- get_dimensions() -> (width, height)
|
||||||
|
|
||||||
The reuse flag allows attaching to an existing display instance
|
Optional attributes (for UI mode):
|
||||||
rather than creating a new window/connection.
|
- ui_panel: UIPanel instance (set by app when border=UI)
|
||||||
|
|
||||||
Keyboard input support by backend:
|
Optional methods:
|
||||||
- terminal: No native input (relies on signal handler for Ctrl+C)
|
- is_quit_requested() -> bool
|
||||||
- pygame: Supports Ctrl+C, Ctrl+Q, Escape for graceful shutdown
|
- clear_quit_request() -> None
|
||||||
- websocket: No native input (relies on signal handler for Ctrl+C)
|
|
||||||
- sixel: No native input (relies on signal handler for Ctrl+C)
|
|
||||||
- null: No native input
|
|
||||||
- kitty: Supports Ctrl+C, Ctrl+Q, Escape (via pygame-like handling)
|
|
||||||
"""
|
"""
|
||||||
|
|
||||||
width: int
|
width: int
|
||||||
height: int
|
height: int
|
||||||
|
|
||||||
def init(self, width: int, height: int, reuse: bool = False) -> None:
|
|
||||||
"""Initialize display with dimensions.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
width: Terminal width in characters
|
|
||||||
height: Terminal height in rows
|
|
||||||
reuse: If True, attach to existing display instead of creating new
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
|
|
||||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
|
||||||
"""Show buffer on display.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
buffer: Buffer to display
|
|
||||||
border: If True, render border around buffer (default False)
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
|
|
||||||
def clear(self) -> None:
|
|
||||||
"""Clear display."""
|
|
||||||
...
|
|
||||||
|
|
||||||
def cleanup(self) -> None:
|
|
||||||
"""Shutdown display."""
|
|
||||||
...
|
|
||||||
|
|
||||||
def get_dimensions(self) -> tuple[int, int]:
|
|
||||||
"""Get current terminal dimensions.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
(width, height) in character cells
|
|
||||||
|
|
||||||
This method is called after show() to check if the display
|
|
||||||
was resized. The main loop should compare this to the current
|
|
||||||
viewport dimensions and update accordingly.
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
|
|
||||||
def is_quit_requested(self) -> bool:
|
|
||||||
"""Check if user requested quit (Ctrl+C, Ctrl+Q, or Escape).
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
True if quit was requested, False otherwise
|
|
||||||
|
|
||||||
Optional method - only implemented by backends that support keyboard input.
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
|
|
||||||
def clear_quit_request(self) -> None:
|
|
||||||
"""Clear the quit request flag.
|
|
||||||
|
|
||||||
Optional method - only implemented by backends that support keyboard input.
|
|
||||||
"""
|
|
||||||
...
|
|
||||||
|
|
||||||
|
|
||||||
class DisplayRegistry:
|
class DisplayRegistry:
|
||||||
"""Registry for display backends with auto-discovery."""
|
"""Registry for display backends with auto-discovery."""
|
||||||
@@ -110,22 +66,18 @@ class DisplayRegistry:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def register(cls, name: str, backend_class: type[Display]) -> None:
|
def register(cls, name: str, backend_class: type[Display]) -> None:
|
||||||
"""Register a display backend."""
|
|
||||||
cls._backends[name.lower()] = backend_class
|
cls._backends[name.lower()] = backend_class
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def get(cls, name: str) -> type[Display] | None:
|
def get(cls, name: str) -> type[Display] | None:
|
||||||
"""Get a display backend class by name."""
|
|
||||||
return cls._backends.get(name.lower())
|
return cls._backends.get(name.lower())
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def list_backends(cls) -> list[str]:
|
def list_backends(cls) -> list[str]:
|
||||||
"""List all available display backend names."""
|
|
||||||
return list(cls._backends.keys())
|
return list(cls._backends.keys())
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create(cls, name: str, **kwargs) -> Display | None:
|
def create(cls, name: str, **kwargs) -> Display | None:
|
||||||
"""Create a display instance by name."""
|
|
||||||
cls.initialize()
|
cls.initialize()
|
||||||
backend_class = cls.get(name)
|
backend_class = cls.get(name)
|
||||||
if backend_class:
|
if backend_class:
|
||||||
@@ -134,31 +86,18 @@ class DisplayRegistry:
|
|||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def initialize(cls) -> None:
|
def initialize(cls) -> None:
|
||||||
"""Initialize and register all built-in backends."""
|
|
||||||
if cls._initialized:
|
if cls._initialized:
|
||||||
return
|
return
|
||||||
|
|
||||||
cls.register("terminal", TerminalDisplay)
|
cls.register("terminal", TerminalDisplay)
|
||||||
cls.register("null", NullDisplay)
|
cls.register("null", NullDisplay)
|
||||||
cls.register("websocket", WebSocketDisplay)
|
cls.register("websocket", WebSocketDisplay)
|
||||||
cls.register("sixel", SixelDisplay)
|
|
||||||
cls.register("kitty", KittyDisplay)
|
|
||||||
cls.register("pygame", PygameDisplay)
|
cls.register("pygame", PygameDisplay)
|
||||||
|
if _MODERNGL_AVAILABLE:
|
||||||
|
cls.register("moderngl", ModernGLDisplay) # type: ignore[arg-type]
|
||||||
cls._initialized = True
|
cls._initialized = True
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def create_multi(cls, names: list[str]) -> "Display | None":
|
def create_multi(cls, names: list[str]) -> MultiDisplay | None:
|
||||||
"""Create a MultiDisplay from a list of backend names.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
names: List of display backend names (e.g., ["terminal", "pygame"])
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
MultiDisplay instance or None if any backend fails
|
|
||||||
"""
|
|
||||||
from engine.display.backends.multi import MultiDisplay
|
|
||||||
|
|
||||||
displays = []
|
displays = []
|
||||||
for name in names:
|
for name in names:
|
||||||
backend = cls.create(name)
|
backend = cls.create(name)
|
||||||
@@ -166,10 +105,8 @@ class DisplayRegistry:
|
|||||||
displays.append(backend)
|
displays.append(backend)
|
||||||
else:
|
else:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
if not displays:
|
if not displays:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
return MultiDisplay(displays)
|
return MultiDisplay(displays)
|
||||||
|
|
||||||
|
|
||||||
@@ -190,44 +127,28 @@ def _strip_ansi(s: str) -> str:
|
|||||||
return re.sub(r"\x1b\[[0-9;]*[a-zA-Z]", "", s)
|
return re.sub(r"\x1b\[[0-9;]*[a-zA-Z]", "", s)
|
||||||
|
|
||||||
|
|
||||||
def render_border(
|
def _render_simple_border(
|
||||||
buf: list[str], width: int, height: int, fps: float = 0.0, frame_time: float = 0.0
|
buf: list[str], width: int, height: int, fps: float = 0.0, frame_time: float = 0.0
|
||||||
) -> list[str]:
|
) -> list[str]:
|
||||||
"""Render a border around the buffer.
|
"""Render a traditional border around the buffer."""
|
||||||
|
|
||||||
Args:
|
|
||||||
buf: Input buffer (list of strings)
|
|
||||||
width: Display width in characters
|
|
||||||
height: Display height in rows
|
|
||||||
fps: Current FPS to display in top border (optional)
|
|
||||||
frame_time: Frame time in ms to display in bottom border (optional)
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
Buffer with border applied
|
|
||||||
"""
|
|
||||||
if not buf or width < 3 or height < 3:
|
if not buf or width < 3 or height < 3:
|
||||||
return buf
|
return buf
|
||||||
|
|
||||||
inner_w = width - 2
|
inner_w = width - 2
|
||||||
inner_h = height - 2
|
inner_h = height - 2
|
||||||
|
|
||||||
# Crop buffer to fit inside border
|
|
||||||
cropped = []
|
cropped = []
|
||||||
for i in range(min(inner_h, len(buf))):
|
for i in range(min(inner_h, len(buf))):
|
||||||
line = buf[i]
|
line = buf[i]
|
||||||
# Calculate visible width (excluding ANSI codes)
|
|
||||||
visible_len = len(_strip_ansi(line))
|
visible_len = len(_strip_ansi(line))
|
||||||
if visible_len > inner_w:
|
if visible_len > inner_w:
|
||||||
# Truncate carefully - this is approximate for ANSI text
|
|
||||||
cropped.append(line[:inner_w])
|
cropped.append(line[:inner_w])
|
||||||
else:
|
else:
|
||||||
cropped.append(line + " " * (inner_w - visible_len))
|
cropped.append(line + " " * (inner_w - visible_len))
|
||||||
|
|
||||||
# Pad with empty lines if needed
|
|
||||||
while len(cropped) < inner_h:
|
while len(cropped) < inner_h:
|
||||||
cropped.append(" " * inner_w)
|
cropped.append(" " * inner_w)
|
||||||
|
|
||||||
# Build borders
|
|
||||||
if fps > 0:
|
if fps > 0:
|
||||||
fps_str = f" FPS:{fps:.0f}"
|
fps_str = f" FPS:{fps:.0f}"
|
||||||
if len(fps_str) < inner_w:
|
if len(fps_str) < inner_w:
|
||||||
@@ -248,10 +169,8 @@ def render_border(
|
|||||||
else:
|
else:
|
||||||
bottom_border = "└" + "─" * inner_w + "┘"
|
bottom_border = "└" + "─" * inner_w + "┘"
|
||||||
|
|
||||||
# Build result with left/right borders
|
|
||||||
result = [top_border]
|
result = [top_border]
|
||||||
for line in cropped:
|
for line in cropped:
|
||||||
# Ensure exactly inner_w characters before adding right border
|
|
||||||
if len(line) < inner_w:
|
if len(line) < inner_w:
|
||||||
line = line + " " * (inner_w - len(line))
|
line = line + " " * (inner_w - len(line))
|
||||||
elif len(line) > inner_w:
|
elif len(line) > inner_w:
|
||||||
@@ -262,14 +181,107 @@ def render_border(
|
|||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
def render_ui_panel(
|
||||||
|
buf: list[str],
|
||||||
|
width: int,
|
||||||
|
height: int,
|
||||||
|
ui_panel,
|
||||||
|
fps: float = 0.0,
|
||||||
|
frame_time: float = 0.0,
|
||||||
|
) -> list[str]:
|
||||||
|
"""Render buffer with a right-side UI panel."""
|
||||||
|
from engine.pipeline.ui import UIPanel
|
||||||
|
|
||||||
|
if not isinstance(ui_panel, UIPanel):
|
||||||
|
return _render_simple_border(buf, width, height, fps, frame_time)
|
||||||
|
|
||||||
|
panel_width = min(ui_panel.config.panel_width, width - 4)
|
||||||
|
main_width = width - panel_width - 1
|
||||||
|
|
||||||
|
panel_lines = ui_panel.render(panel_width, height)
|
||||||
|
|
||||||
|
main_buf = buf[: height - 2]
|
||||||
|
main_result = _render_simple_border(
|
||||||
|
main_buf, main_width + 2, height, fps, frame_time
|
||||||
|
)
|
||||||
|
|
||||||
|
combined = []
|
||||||
|
for i in range(height):
|
||||||
|
if i < len(main_result):
|
||||||
|
main_line = main_result[i]
|
||||||
|
if len(main_line) >= 2:
|
||||||
|
main_content = (
|
||||||
|
main_line[1:-1] if main_line[-1] in "│┌┐└┘" else main_line[1:]
|
||||||
|
)
|
||||||
|
main_content = main_content.ljust(main_width)[:main_width]
|
||||||
|
else:
|
||||||
|
main_content = " " * main_width
|
||||||
|
else:
|
||||||
|
main_content = " " * main_width
|
||||||
|
|
||||||
|
panel_idx = i
|
||||||
|
panel_line = (
|
||||||
|
panel_lines[panel_idx][:panel_width].ljust(panel_width)
|
||||||
|
if panel_idx < len(panel_lines)
|
||||||
|
else " " * panel_width
|
||||||
|
)
|
||||||
|
|
||||||
|
separator = "│" if 0 < i < height - 1 else "┼" if i == 0 else "┴"
|
||||||
|
combined.append(main_content + separator + panel_line)
|
||||||
|
|
||||||
|
return combined
|
||||||
|
|
||||||
|
|
||||||
|
def render_border(
|
||||||
|
buf: list[str],
|
||||||
|
width: int,
|
||||||
|
height: int,
|
||||||
|
fps: float = 0.0,
|
||||||
|
frame_time: float = 0.0,
|
||||||
|
border_mode: BorderMode | bool = BorderMode.SIMPLE,
|
||||||
|
) -> list[str]:
|
||||||
|
"""Render a border or UI panel around the buffer.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
buf: Input buffer
|
||||||
|
width: Display width
|
||||||
|
height: Display height
|
||||||
|
fps: FPS for top border
|
||||||
|
frame_time: Frame time for bottom border
|
||||||
|
border_mode: Border rendering mode
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Buffer with border/panel applied
|
||||||
|
"""
|
||||||
|
# Normalize border_mode to BorderMode enum
|
||||||
|
if isinstance(border_mode, bool):
|
||||||
|
border_mode = BorderMode.SIMPLE if border_mode else BorderMode.OFF
|
||||||
|
|
||||||
|
if border_mode == BorderMode.UI:
|
||||||
|
# UI panel requires a UIPanel instance (injected separately)
|
||||||
|
# For now, this will be called by displays that have a ui_panel attribute
|
||||||
|
# This function signature doesn't include ui_panel, so we'll handle it in render_ui_panel
|
||||||
|
# Fall back to simple border if no panel available
|
||||||
|
return _render_simple_border(buf, width, height, fps, frame_time)
|
||||||
|
elif border_mode == BorderMode.SIMPLE:
|
||||||
|
return _render_simple_border(buf, width, height, fps, frame_time)
|
||||||
|
else:
|
||||||
|
return buf
|
||||||
|
|
||||||
|
|
||||||
__all__ = [
|
__all__ = [
|
||||||
"Display",
|
"Display",
|
||||||
"DisplayRegistry",
|
"DisplayRegistry",
|
||||||
"get_monitor",
|
"get_monitor",
|
||||||
"render_border",
|
"render_border",
|
||||||
|
"render_ui_panel",
|
||||||
|
"BorderMode",
|
||||||
"TerminalDisplay",
|
"TerminalDisplay",
|
||||||
"NullDisplay",
|
"NullDisplay",
|
||||||
"WebSocketDisplay",
|
"WebSocketDisplay",
|
||||||
"SixelDisplay",
|
|
||||||
"MultiDisplay",
|
"MultiDisplay",
|
||||||
|
"PygameDisplay",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
if _MODERNGL_AVAILABLE:
|
||||||
|
__all__.append("ModernGLDisplay")
|
||||||
|
|||||||
@@ -1,180 +0,0 @@
|
|||||||
"""
|
|
||||||
Kitty graphics display backend - renders using kitty's native graphics protocol.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import time
|
|
||||||
|
|
||||||
from engine.display.renderer import get_default_font_path, parse_ansi
|
|
||||||
|
|
||||||
|
|
||||||
def _encode_kitty_graphic(image_data: bytes, width: int, height: int) -> bytes:
|
|
||||||
"""Encode image data using kitty's graphics protocol."""
|
|
||||||
import base64
|
|
||||||
|
|
||||||
encoded = base64.b64encode(image_data).decode("ascii")
|
|
||||||
|
|
||||||
chunks = []
|
|
||||||
for i in range(0, len(encoded), 4096):
|
|
||||||
chunk = encoded[i : i + 4096]
|
|
||||||
if i == 0:
|
|
||||||
chunks.append(f"\x1b_Gf=100,t=d,s={width},v={height},c=1,r=1;{chunk}\x1b\\")
|
|
||||||
else:
|
|
||||||
chunks.append(f"\x1b_Gm={height};{chunk}\x1b\\")
|
|
||||||
|
|
||||||
return "".join(chunks).encode("utf-8")
|
|
||||||
|
|
||||||
|
|
||||||
class KittyDisplay:
|
|
||||||
"""Kitty graphics display backend using kitty's native protocol."""
|
|
||||||
|
|
||||||
width: int = 80
|
|
||||||
height: int = 24
|
|
||||||
|
|
||||||
def __init__(self, cell_width: int = 9, cell_height: int = 16):
|
|
||||||
self.width = 80
|
|
||||||
self.height = 24
|
|
||||||
self.cell_width = cell_width
|
|
||||||
self.cell_height = cell_height
|
|
||||||
self._initialized = False
|
|
||||||
self._font_path = None
|
|
||||||
|
|
||||||
def init(self, width: int, height: int, reuse: bool = False) -> None:
|
|
||||||
"""Initialize display with dimensions.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
width: Terminal width in characters
|
|
||||||
height: Terminal height in rows
|
|
||||||
reuse: Ignored for KittyDisplay (protocol doesn't support reuse)
|
|
||||||
"""
|
|
||||||
self.width = width
|
|
||||||
self.height = height
|
|
||||||
self._initialized = True
|
|
||||||
|
|
||||||
def _get_font_path(self) -> str | None:
|
|
||||||
"""Get font path from env or detect common locations."""
|
|
||||||
import os
|
|
||||||
|
|
||||||
if self._font_path:
|
|
||||||
return self._font_path
|
|
||||||
|
|
||||||
env_font = os.environ.get("MAINLINE_KITTY_FONT")
|
|
||||||
if env_font and os.path.exists(env_font):
|
|
||||||
self._font_path = env_font
|
|
||||||
return env_font
|
|
||||||
|
|
||||||
font_path = get_default_font_path()
|
|
||||||
if font_path:
|
|
||||||
self._font_path = font_path
|
|
||||||
|
|
||||||
return self._font_path
|
|
||||||
|
|
||||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
|
||||||
import sys
|
|
||||||
|
|
||||||
t0 = time.perf_counter()
|
|
||||||
|
|
||||||
# Get metrics for border display
|
|
||||||
fps = 0.0
|
|
||||||
frame_time = 0.0
|
|
||||||
from engine.display import get_monitor
|
|
||||||
|
|
||||||
monitor = get_monitor()
|
|
||||||
if monitor:
|
|
||||||
stats = monitor.get_stats()
|
|
||||||
avg_ms = stats.get("pipeline", {}).get("avg_ms", 0) if stats else 0
|
|
||||||
frame_count = stats.get("frame_count", 0) if stats else 0
|
|
||||||
if avg_ms and frame_count > 0:
|
|
||||||
fps = 1000.0 / avg_ms
|
|
||||||
frame_time = avg_ms
|
|
||||||
|
|
||||||
# Apply border if requested
|
|
||||||
if border:
|
|
||||||
from engine.display import render_border
|
|
||||||
|
|
||||||
buffer = render_border(buffer, self.width, self.height, fps, frame_time)
|
|
||||||
|
|
||||||
img_width = self.width * self.cell_width
|
|
||||||
img_height = self.height * self.cell_height
|
|
||||||
|
|
||||||
try:
|
|
||||||
from PIL import Image, ImageDraw, ImageFont
|
|
||||||
except ImportError:
|
|
||||||
return
|
|
||||||
|
|
||||||
img = Image.new("RGBA", (img_width, img_height), (0, 0, 0, 255))
|
|
||||||
draw = ImageDraw.Draw(img)
|
|
||||||
|
|
||||||
font_path = self._get_font_path()
|
|
||||||
font = None
|
|
||||||
if font_path:
|
|
||||||
try:
|
|
||||||
font = ImageFont.truetype(font_path, self.cell_height - 2)
|
|
||||||
except Exception:
|
|
||||||
font = None
|
|
||||||
|
|
||||||
if font is None:
|
|
||||||
try:
|
|
||||||
font = ImageFont.load_default()
|
|
||||||
except Exception:
|
|
||||||
font = None
|
|
||||||
|
|
||||||
for row_idx, line in enumerate(buffer[: self.height]):
|
|
||||||
if row_idx >= self.height:
|
|
||||||
break
|
|
||||||
|
|
||||||
tokens = parse_ansi(line)
|
|
||||||
x_pos = 0
|
|
||||||
y_pos = row_idx * self.cell_height
|
|
||||||
|
|
||||||
for text, fg, bg, bold in tokens:
|
|
||||||
if not text:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if bg != (0, 0, 0):
|
|
||||||
bbox = draw.textbbox((x_pos, y_pos), text, font=font)
|
|
||||||
draw.rectangle(bbox, fill=(*bg, 255))
|
|
||||||
|
|
||||||
if bold and font:
|
|
||||||
draw.text((x_pos - 1, y_pos - 1), text, fill=(*fg, 255), font=font)
|
|
||||||
|
|
||||||
draw.text((x_pos, y_pos), text, fill=(*fg, 255), font=font)
|
|
||||||
|
|
||||||
if font:
|
|
||||||
x_pos += draw.textlength(text, font=font)
|
|
||||||
|
|
||||||
from io import BytesIO
|
|
||||||
|
|
||||||
output = BytesIO()
|
|
||||||
img.save(output, format="PNG")
|
|
||||||
png_data = output.getvalue()
|
|
||||||
|
|
||||||
graphic = _encode_kitty_graphic(png_data, img_width, img_height)
|
|
||||||
|
|
||||||
sys.stdout.buffer.write(graphic)
|
|
||||||
sys.stdout.flush()
|
|
||||||
|
|
||||||
elapsed_ms = (time.perf_counter() - t0) * 1000
|
|
||||||
|
|
||||||
from engine.display import get_monitor
|
|
||||||
|
|
||||||
monitor = get_monitor()
|
|
||||||
if monitor:
|
|
||||||
chars_in = sum(len(line) for line in buffer)
|
|
||||||
monitor.record_effect("kitty_display", elapsed_ms, chars_in, chars_in)
|
|
||||||
|
|
||||||
def clear(self) -> None:
|
|
||||||
import sys
|
|
||||||
|
|
||||||
sys.stdout.buffer.write(b"\x1b_Ga=d\x1b\\")
|
|
||||||
sys.stdout.flush()
|
|
||||||
|
|
||||||
def cleanup(self) -> None:
|
|
||||||
self.clear()
|
|
||||||
|
|
||||||
def get_dimensions(self) -> tuple[int, int]:
|
|
||||||
"""Get current dimensions.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
(width, height) in character cells
|
|
||||||
"""
|
|
||||||
return (self.width, self.height)
|
|
||||||
@@ -1,228 +0,0 @@
|
|||||||
"""
|
|
||||||
Sixel graphics display backend - renders to sixel graphics in terminal.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import time
|
|
||||||
|
|
||||||
from engine.display.renderer import get_default_font_path, parse_ansi
|
|
||||||
|
|
||||||
|
|
||||||
def _encode_sixel(image) -> str:
|
|
||||||
"""Encode a PIL Image to sixel format (pure Python)."""
|
|
||||||
img = image.convert("RGBA")
|
|
||||||
width, height = img.size
|
|
||||||
pixels = img.load()
|
|
||||||
|
|
||||||
palette = []
|
|
||||||
pixel_palette_idx = {}
|
|
||||||
|
|
||||||
def get_color_idx(r, g, b, a):
|
|
||||||
if a < 128:
|
|
||||||
return -1
|
|
||||||
key = (r // 32, g // 32, b // 32)
|
|
||||||
if key not in pixel_palette_idx:
|
|
||||||
idx = len(palette)
|
|
||||||
if idx < 256:
|
|
||||||
palette.append((r, g, b))
|
|
||||||
pixel_palette_idx[key] = idx
|
|
||||||
return pixel_palette_idx.get(key, 0)
|
|
||||||
|
|
||||||
for y in range(height):
|
|
||||||
for x in range(width):
|
|
||||||
r, g, b, a = pixels[x, y]
|
|
||||||
get_color_idx(r, g, b, a)
|
|
||||||
|
|
||||||
if not palette:
|
|
||||||
return ""
|
|
||||||
|
|
||||||
if len(palette) == 1:
|
|
||||||
palette = [palette[0], (0, 0, 0)]
|
|
||||||
|
|
||||||
sixel_data = []
|
|
||||||
sixel_data.append(
|
|
||||||
f'"{"".join(f"#{i};2;{r};{g};{b}" for i, (r, g, b) in enumerate(palette))}'
|
|
||||||
)
|
|
||||||
|
|
||||||
for x in range(width):
|
|
||||||
col_data = []
|
|
||||||
for y in range(0, height, 6):
|
|
||||||
bits = 0
|
|
||||||
color_idx = -1
|
|
||||||
for dy in range(6):
|
|
||||||
if y + dy < height:
|
|
||||||
r, g, b, a = pixels[x, y + dy]
|
|
||||||
if a >= 128:
|
|
||||||
bits |= 1 << dy
|
|
||||||
idx = get_color_idx(r, g, b, a)
|
|
||||||
if color_idx == -1:
|
|
||||||
color_idx = idx
|
|
||||||
elif color_idx != idx:
|
|
||||||
color_idx = -2
|
|
||||||
|
|
||||||
if color_idx >= 0:
|
|
||||||
col_data.append(
|
|
||||||
chr(63 + color_idx) + chr(63 + bits)
|
|
||||||
if bits
|
|
||||||
else chr(63 + color_idx) + "?"
|
|
||||||
)
|
|
||||||
elif color_idx == -2:
|
|
||||||
pass
|
|
||||||
|
|
||||||
if col_data:
|
|
||||||
sixel_data.append("".join(col_data) + "$")
|
|
||||||
else:
|
|
||||||
sixel_data.append("-" if x < width - 1 else "$")
|
|
||||||
|
|
||||||
sixel_data.append("\x1b\\")
|
|
||||||
|
|
||||||
return "\x1bPq" + "".join(sixel_data)
|
|
||||||
|
|
||||||
|
|
||||||
class SixelDisplay:
|
|
||||||
"""Sixel graphics display backend - renders to sixel graphics in terminal."""
|
|
||||||
|
|
||||||
width: int = 80
|
|
||||||
height: int = 24
|
|
||||||
|
|
||||||
def __init__(self, cell_width: int = 9, cell_height: int = 16):
|
|
||||||
self.width = 80
|
|
||||||
self.height = 24
|
|
||||||
self.cell_width = cell_width
|
|
||||||
self.cell_height = cell_height
|
|
||||||
self._initialized = False
|
|
||||||
self._font_path = None
|
|
||||||
|
|
||||||
def _get_font_path(self) -> str | None:
|
|
||||||
"""Get font path from env or detect common locations."""
|
|
||||||
import os
|
|
||||||
|
|
||||||
if self._font_path:
|
|
||||||
return self._font_path
|
|
||||||
|
|
||||||
env_font = os.environ.get("MAINLINE_SIXEL_FONT")
|
|
||||||
if env_font and os.path.exists(env_font):
|
|
||||||
self._font_path = env_font
|
|
||||||
return env_font
|
|
||||||
|
|
||||||
font_path = get_default_font_path()
|
|
||||||
if font_path:
|
|
||||||
self._font_path = font_path
|
|
||||||
|
|
||||||
return self._font_path
|
|
||||||
|
|
||||||
def init(self, width: int, height: int, reuse: bool = False) -> None:
|
|
||||||
"""Initialize display with dimensions.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
width: Terminal width in characters
|
|
||||||
height: Terminal height in rows
|
|
||||||
reuse: Ignored for SixelDisplay
|
|
||||||
"""
|
|
||||||
self.width = width
|
|
||||||
self.height = height
|
|
||||||
self._initialized = True
|
|
||||||
|
|
||||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
|
||||||
import sys
|
|
||||||
|
|
||||||
t0 = time.perf_counter()
|
|
||||||
|
|
||||||
# Get metrics for border display
|
|
||||||
fps = 0.0
|
|
||||||
frame_time = 0.0
|
|
||||||
from engine.display import get_monitor
|
|
||||||
|
|
||||||
monitor = get_monitor()
|
|
||||||
if monitor:
|
|
||||||
stats = monitor.get_stats()
|
|
||||||
avg_ms = stats.get("pipeline", {}).get("avg_ms", 0) if stats else 0
|
|
||||||
frame_count = stats.get("frame_count", 0) if stats else 0
|
|
||||||
if avg_ms and frame_count > 0:
|
|
||||||
fps = 1000.0 / avg_ms
|
|
||||||
frame_time = avg_ms
|
|
||||||
|
|
||||||
# Apply border if requested
|
|
||||||
if border:
|
|
||||||
from engine.display import render_border
|
|
||||||
|
|
||||||
buffer = render_border(buffer, self.width, self.height, fps, frame_time)
|
|
||||||
|
|
||||||
img_width = self.width * self.cell_width
|
|
||||||
img_height = self.height * self.cell_height
|
|
||||||
|
|
||||||
try:
|
|
||||||
from PIL import Image, ImageDraw, ImageFont
|
|
||||||
except ImportError:
|
|
||||||
return
|
|
||||||
|
|
||||||
img = Image.new("RGBA", (img_width, img_height), (0, 0, 0, 255))
|
|
||||||
draw = ImageDraw.Draw(img)
|
|
||||||
|
|
||||||
font_path = self._get_font_path()
|
|
||||||
font = None
|
|
||||||
if font_path:
|
|
||||||
try:
|
|
||||||
font = ImageFont.truetype(font_path, self.cell_height - 2)
|
|
||||||
except Exception:
|
|
||||||
font = None
|
|
||||||
|
|
||||||
if font is None:
|
|
||||||
try:
|
|
||||||
font = ImageFont.load_default()
|
|
||||||
except Exception:
|
|
||||||
font = None
|
|
||||||
|
|
||||||
for row_idx, line in enumerate(buffer[: self.height]):
|
|
||||||
if row_idx >= self.height:
|
|
||||||
break
|
|
||||||
|
|
||||||
tokens = parse_ansi(line)
|
|
||||||
x_pos = 0
|
|
||||||
y_pos = row_idx * self.cell_height
|
|
||||||
|
|
||||||
for text, fg, bg, bold in tokens:
|
|
||||||
if not text:
|
|
||||||
continue
|
|
||||||
|
|
||||||
if bg != (0, 0, 0):
|
|
||||||
bbox = draw.textbbox((x_pos, y_pos), text, font=font)
|
|
||||||
draw.rectangle(bbox, fill=(*bg, 255))
|
|
||||||
|
|
||||||
if bold and font:
|
|
||||||
draw.text((x_pos - 1, y_pos - 1), text, fill=(*fg, 255), font=font)
|
|
||||||
|
|
||||||
draw.text((x_pos, y_pos), text, fill=(*fg, 255), font=font)
|
|
||||||
|
|
||||||
if font:
|
|
||||||
x_pos += draw.textlength(text, font=font)
|
|
||||||
|
|
||||||
sixel = _encode_sixel(img)
|
|
||||||
|
|
||||||
sys.stdout.buffer.write(sixel.encode("utf-8"))
|
|
||||||
sys.stdout.flush()
|
|
||||||
|
|
||||||
elapsed_ms = (time.perf_counter() - t0) * 1000
|
|
||||||
|
|
||||||
from engine.display import get_monitor
|
|
||||||
|
|
||||||
monitor = get_monitor()
|
|
||||||
if monitor:
|
|
||||||
chars_in = sum(len(line) for line in buffer)
|
|
||||||
monitor.record_effect("sixel_display", elapsed_ms, chars_in, chars_in)
|
|
||||||
|
|
||||||
def clear(self) -> None:
|
|
||||||
import sys
|
|
||||||
|
|
||||||
sys.stdout.buffer.write(b"\x1b[2J\x1b[H")
|
|
||||||
sys.stdout.flush()
|
|
||||||
|
|
||||||
def cleanup(self) -> None:
|
|
||||||
pass
|
|
||||||
|
|
||||||
def get_dimensions(self) -> tuple[int, int]:
|
|
||||||
"""Get current dimensions.
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
(width, height) in character cells
|
|
||||||
"""
|
|
||||||
return (self.width, self.height)
|
|
||||||
@@ -1,5 +1,14 @@
|
|||||||
"""
|
"""
|
||||||
WebSocket display backend - broadcasts frame buffer to connected web clients.
|
WebSocket display backend - broadcasts frame buffer to connected web clients.
|
||||||
|
|
||||||
|
TODO: Transform to a true streaming backend with:
|
||||||
|
- Proper WebSocket message streaming (currently sends full buffer each frame)
|
||||||
|
- Connection pooling and backpressure handling
|
||||||
|
- Binary protocol for efficiency (instead of JSON)
|
||||||
|
- Client management with proper async handling
|
||||||
|
- Mark for deprecation if replaced by a new streaming implementation
|
||||||
|
|
||||||
|
Current implementation: Simple broadcast of text frames to all connected clients.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
|||||||
@@ -77,11 +77,13 @@ class TestDisplayRegistry:
|
|||||||
DisplayRegistry.initialize()
|
DisplayRegistry.initialize()
|
||||||
assert DisplayRegistry.get("terminal") == TerminalDisplay
|
assert DisplayRegistry.get("terminal") == TerminalDisplay
|
||||||
assert DisplayRegistry.get("null") == NullDisplay
|
assert DisplayRegistry.get("null") == NullDisplay
|
||||||
from engine.display.backends.sixel import SixelDisplay
|
from engine.display.backends.pygame import PygameDisplay
|
||||||
from engine.display.backends.websocket import WebSocketDisplay
|
from engine.display.backends.websocket import WebSocketDisplay
|
||||||
|
|
||||||
assert DisplayRegistry.get("websocket") == WebSocketDisplay
|
assert DisplayRegistry.get("websocket") == WebSocketDisplay
|
||||||
assert DisplayRegistry.get("sixel") == SixelDisplay
|
assert DisplayRegistry.get("pygame") == PygameDisplay
|
||||||
|
# Removed backends (sixel, kitty) should not be present
|
||||||
|
assert DisplayRegistry.get("sixel") is None
|
||||||
|
|
||||||
def test_initialize_idempotent(self):
|
def test_initialize_idempotent(self):
|
||||||
"""initialize can be called multiple times safely."""
|
"""initialize can be called multiple times safely."""
|
||||||
|
|||||||
@@ -1,128 +0,0 @@
|
|||||||
"""
|
|
||||||
Tests for engine.display.backends.sixel module.
|
|
||||||
"""
|
|
||||||
|
|
||||||
from unittest.mock import MagicMock, patch
|
|
||||||
|
|
||||||
|
|
||||||
class TestSixelDisplay:
|
|
||||||
"""Tests for SixelDisplay class."""
|
|
||||||
|
|
||||||
def test_init_stores_dimensions(self):
|
|
||||||
"""init stores dimensions."""
|
|
||||||
from engine.display.backends.sixel import SixelDisplay
|
|
||||||
|
|
||||||
display = SixelDisplay()
|
|
||||||
display.init(80, 24)
|
|
||||||
assert display.width == 80
|
|
||||||
assert display.height == 24
|
|
||||||
|
|
||||||
def test_init_custom_cell_size(self):
|
|
||||||
"""init accepts custom cell size."""
|
|
||||||
from engine.display.backends.sixel import SixelDisplay
|
|
||||||
|
|
||||||
display = SixelDisplay(cell_width=12, cell_height=18)
|
|
||||||
assert display.cell_width == 12
|
|
||||||
assert display.cell_height == 18
|
|
||||||
|
|
||||||
def test_show_handles_empty_buffer(self):
|
|
||||||
"""show handles empty buffer gracefully."""
|
|
||||||
from engine.display.backends.sixel import SixelDisplay
|
|
||||||
|
|
||||||
display = SixelDisplay()
|
|
||||||
display.init(80, 24)
|
|
||||||
|
|
||||||
with patch("engine.display.backends.sixel._encode_sixel") as mock_encode:
|
|
||||||
mock_encode.return_value = ""
|
|
||||||
display.show([])
|
|
||||||
|
|
||||||
def test_show_handles_pil_import_error(self):
|
|
||||||
"""show gracefully handles missing PIL."""
|
|
||||||
from engine.display.backends.sixel import SixelDisplay
|
|
||||||
|
|
||||||
display = SixelDisplay()
|
|
||||||
display.init(80, 24)
|
|
||||||
|
|
||||||
with patch.dict("sys.modules", {"PIL": None}):
|
|
||||||
display.show(["test line"])
|
|
||||||
|
|
||||||
def test_clear_sends_escape_sequence(self):
|
|
||||||
"""clear sends clear screen escape sequence."""
|
|
||||||
from engine.display.backends.sixel import SixelDisplay
|
|
||||||
|
|
||||||
display = SixelDisplay()
|
|
||||||
|
|
||||||
with patch("sys.stdout") as mock_stdout:
|
|
||||||
display.clear()
|
|
||||||
mock_stdout.buffer.write.assert_called()
|
|
||||||
|
|
||||||
def test_cleanup_does_nothing(self):
|
|
||||||
"""cleanup does nothing."""
|
|
||||||
from engine.display.backends.sixel import SixelDisplay
|
|
||||||
|
|
||||||
display = SixelDisplay()
|
|
||||||
display.cleanup()
|
|
||||||
|
|
||||||
|
|
||||||
class TestSixelAnsiParsing:
|
|
||||||
"""Tests for ANSI parsing in SixelDisplay."""
|
|
||||||
|
|
||||||
def test_parse_empty_string(self):
|
|
||||||
"""handles empty string."""
|
|
||||||
from engine.display.renderer import parse_ansi
|
|
||||||
|
|
||||||
result = parse_ansi("")
|
|
||||||
assert len(result) > 0
|
|
||||||
|
|
||||||
def test_parse_plain_text(self):
|
|
||||||
"""parses plain text without ANSI codes."""
|
|
||||||
from engine.display.renderer import parse_ansi
|
|
||||||
|
|
||||||
result = parse_ansi("hello world")
|
|
||||||
assert len(result) == 1
|
|
||||||
text, fg, bg, bold = result[0]
|
|
||||||
assert text == "hello world"
|
|
||||||
|
|
||||||
def test_parse_with_color_codes(self):
|
|
||||||
"""parses ANSI color codes."""
|
|
||||||
from engine.display.renderer import parse_ansi
|
|
||||||
|
|
||||||
result = parse_ansi("\033[31mred\033[0m")
|
|
||||||
assert len(result) == 1
|
|
||||||
assert result[0][0] == "red"
|
|
||||||
assert result[0][1] == (205, 49, 49)
|
|
||||||
|
|
||||||
def test_parse_with_bold(self):
|
|
||||||
"""parses bold codes."""
|
|
||||||
from engine.display.renderer import parse_ansi
|
|
||||||
|
|
||||||
result = parse_ansi("\033[1mbold\033[0m")
|
|
||||||
assert len(result) == 1
|
|
||||||
assert result[0][0] == "bold"
|
|
||||||
assert result[0][3] is True
|
|
||||||
|
|
||||||
def test_parse_256_color(self):
|
|
||||||
"""parses 256 color codes."""
|
|
||||||
from engine.display.renderer import parse_ansi
|
|
||||||
|
|
||||||
result = parse_ansi("\033[38;5;196mred\033[0m")
|
|
||||||
assert len(result) == 1
|
|
||||||
assert result[0][0] == "red"
|
|
||||||
|
|
||||||
|
|
||||||
class TestSixelEncoding:
|
|
||||||
"""Tests for Sixel encoding."""
|
|
||||||
|
|
||||||
def test_encode_empty_image(self):
|
|
||||||
"""handles empty image."""
|
|
||||||
from engine.display.backends.sixel import _encode_sixel
|
|
||||||
|
|
||||||
with patch("PIL.Image.Image") as mock_image:
|
|
||||||
mock_img_instance = MagicMock()
|
|
||||||
mock_img_instance.convert.return_value = mock_img_instance
|
|
||||||
mock_img_instance.size = (0, 0)
|
|
||||||
mock_img_instance.load.return_value = {}
|
|
||||||
mock_image.return_value = mock_img_instance
|
|
||||||
|
|
||||||
result = _encode_sixel(mock_img_instance)
|
|
||||||
assert result == ""
|
|
||||||
Reference in New Issue
Block a user