""" Display output abstraction - allows swapping output backends. Protocol: - init(width, height): Initialize display with terminal dimensions - show(buffer): Render buffer (list of strings) to display - clear(): Clear the display - cleanup(): Shutdown display """ import time from typing import Protocol class Display(Protocol): """Protocol for display backends.""" def init(self, width: int, height: int) -> None: """Initialize display with dimensions.""" ... def show(self, buffer: list[str]) -> None: """Show buffer on display.""" ... def clear(self) -> None: """Clear display.""" ... def cleanup(self) -> None: """Shutdown display.""" ... def get_monitor(): """Get the performance monitor.""" try: from engine.effects.performance import get_monitor as _get_monitor return _get_monitor() except Exception: return None class TerminalDisplay: """ANSI terminal display backend.""" def __init__(self): self.width = 80 self.height = 24 def init(self, width: int, height: int) -> None: from engine.terminal import CURSOR_OFF self.width = width self.height = height print(CURSOR_OFF, end="", flush=True) def show(self, buffer: list[str]) -> None: import sys t0 = time.perf_counter() sys.stdout.buffer.write("".join(buffer).encode()) sys.stdout.flush() elapsed_ms = (time.perf_counter() - t0) * 1000 monitor = get_monitor() if monitor: chars_in = sum(len(line) for line in buffer) monitor.record_effect("terminal_display", elapsed_ms, chars_in, chars_in) def clear(self) -> None: from engine.terminal import CLR print(CLR, end="", flush=True) def cleanup(self) -> None: from engine.terminal import CURSOR_ON print(CURSOR_ON, end="", flush=True) class NullDisplay: """Headless/null display - discards all output.""" def init(self, width: int, height: int) -> None: self.width = width self.height = height def show(self, buffer: list[str]) -> None: monitor = get_monitor() if monitor: t0 = time.perf_counter() chars_in = sum(len(line) for line in buffer) elapsed_ms = (time.perf_counter() - t0) * 1000 monitor.record_effect("null_display", elapsed_ms, chars_in, chars_in) def clear(self) -> None: pass def cleanup(self) -> None: pass class MultiDisplay: """Display that forwards to multiple displays.""" def __init__(self, displays: list[Display]): self.displays = displays self.width = 80 self.height = 24 def init(self, width: int, height: int) -> None: self.width = width self.height = height for d in self.displays: d.init(width, height) def show(self, buffer: list[str]) -> None: for d in self.displays: d.show(buffer) def clear(self) -> None: for d in self.displays: d.clear() def cleanup(self) -> None: for d in self.displays: d.cleanup() def _parse_ansi( text: str, ) -> list[tuple[str, tuple[int, int, int], tuple[int, int, int], bool]]: """Parse ANSI text into tokens with fg/bg colors. Returns list of (text, fg_rgb, bg_rgb, bold). """ tokens = [] current_text = "" fg = (204, 204, 204) bg = (0, 0, 0) bold = False i = 0 ANSI_COLORS = { 0: (0, 0, 0), 1: (205, 49, 49), 2: (13, 188, 121), 3: (229, 229, 16), 4: (36, 114, 200), 5: (188, 63, 188), 6: (17, 168, 205), 7: (229, 229, 229), 8: (102, 102, 102), 9: (241, 76, 76), 10: (35, 209, 139), 11: (245, 245, 67), 12: (59, 142, 234), 13: (214, 112, 214), 14: (41, 184, 219), 15: (255, 255, 255), } while i < len(text): char = text[i] if char == "\x1b" and i + 1 < len(text) and text[i + 1] == "[": if current_text: tokens.append((current_text, fg, bg, bold)) current_text = "" i += 2 code = "" while i < len(text): c = text[i] if c.isalpha(): break code += c i += 1 if code: codes = code.split(";") for c in codes: try: n = int(c) if c else 0 except ValueError: continue if n == 0: fg = (204, 204, 204) bg = (0, 0, 0) bold = False elif n == 1: bold = True elif n == 22: bold = False elif n == 39: fg = (204, 204, 204) elif n == 49: bg = (0, 0, 0) elif 30 <= n <= 37: fg = ANSI_COLORS.get(n - 30 + (8 if bold else 0), fg) elif 40 <= n <= 47: bg = ANSI_COLORS.get(n - 40, bg) elif 90 <= n <= 97: fg = ANSI_COLORS.get(n - 90 + 8, fg) elif 100 <= n <= 107: bg = ANSI_COLORS.get(n - 100 + 8, bg) elif 1 <= n <= 256: if n < 16: fg = ANSI_COLORS.get(n, fg) elif n < 232: c = n - 16 r = (c // 36) * 51 g = ((c % 36) // 6) * 51 b = (c % 6) * 51 fg = (r, g, b) else: gray = (n - 232) * 10 + 8 fg = (gray, gray, gray) else: current_text += char i += 1 if current_text: tokens.append((current_text, fg, bg, bold)) return tokens if tokens else [("", fg, bg, bold)] def _encode_sixel(image) -> str: """Encode a PIL Image to sixel format (pure Python).""" img = image.convert("RGBA") width, height = img.size pixels = img.load() palette = [] pixel_palette_idx = {} def get_color_idx(r, g, b, a): if a < 128: return -1 key = (r // 32, g // 32, b // 32) if key not in pixel_palette_idx: idx = len(palette) if idx < 256: palette.append((r, g, b)) pixel_palette_idx[key] = idx return pixel_palette_idx.get(key, 0) for y in range(height): for x in range(width): r, g, b, a = pixels[x, y] get_color_idx(r, g, b, a) if not palette: return "" if len(palette) == 1: palette = [palette[0], (0, 0, 0)] sixel_data = [] sixel_data.append( f'"{"".join(f"#{i};2;{r};{g};{b}" for i, (r, g, b) in enumerate(palette))}' ) for x in range(width): col_data = [] for y in range(0, height, 6): bits = 0 color_idx = -1 for dy in range(6): if y + dy < height: r, g, b, a = pixels[x, y + dy] if a >= 128: bits |= 1 << dy idx = get_color_idx(r, g, b, a) if color_idx == -1: color_idx = idx elif color_idx != idx: color_idx = -2 if color_idx >= 0: col_data.append( chr(63 + color_idx) + chr(63 + bits) if bits else chr(63 + color_idx) + "?" ) elif color_idx == -2: pass if col_data: sixel_data.append("".join(col_data) + "$") else: sixel_data.append("-" if x < width - 1 else "$") sixel_data.append("\x1b\\") return "\x1bPq" + "".join(sixel_data) class SixelDisplay: """Sixel graphics display backend - renders to sixel graphics in terminal.""" def __init__(self, cell_width: int = 9, cell_height: int = 16): self.width = 80 self.height = 24 self.cell_width = cell_width self.cell_height = cell_height self._initialized = False def init(self, width: int, height: int) -> None: self.width = width self.height = height self._initialized = True def show(self, buffer: list[str]) -> None: import sys t0 = time.perf_counter() img_width = self.width * self.cell_width img_height = self.height * self.cell_height try: from PIL import Image, ImageDraw, ImageFont except ImportError: return img = Image.new("RGBA", (img_width, img_height), (0, 0, 0, 255)) draw = ImageDraw.Draw(img) try: font = ImageFont.truetype( "/usr/share/fonts/truetype/dejavu/DejaVuSansMono.ttf", self.cell_height - 2, ) except Exception: try: font = ImageFont.load_default() except Exception: font = None for row_idx, line in enumerate(buffer[: self.height]): if row_idx >= self.height: break tokens = _parse_ansi(line) x_pos = 0 y_pos = row_idx * self.cell_height for text, fg, bg, bold in tokens: if not text: continue if bg != (0, 0, 0): bbox = draw.textbbox((x_pos, y_pos), text, font=font) draw.rectangle(bbox, fill=(*bg, 255)) if bold and font: draw.text((x_pos - 1, y_pos - 1), text, fill=(*fg, 255), font=font) draw.text((x_pos, y_pos), text, fill=(*fg, 255), font=font) if font: x_pos += draw.textlength(text, font=font) sixel = _encode_sixel(img) sys.stdout.buffer.write(sixel.encode("utf-8")) sys.stdout.flush() elapsed_ms = (time.perf_counter() - t0) * 1000 monitor = get_monitor() if monitor: chars_in = sum(len(line) for line in buffer) monitor.record_effect("sixel_display", elapsed_ms, chars_in, chars_in) def clear(self) -> None: import sys sys.stdout.buffer.write(b"\x1b[2J\x1b[H") sys.stdout.flush() def cleanup(self) -> None: pass