forked from genewildish/Mainline
- Add `_quit_requested` flag to `TerminalDisplay`.
- Add `request_quit()` method to `TerminalDisplay`.
- Handle the 'ctrl_c' key in the REPL input loops in both pipeline_runner.py and main.py.
- When Ctrl+C is pressed, `request_quit()` is called, which sets the flag.
- The main loop checks `is_quit_requested()` and raises `KeyboardInterrupt`.
263 lines
9.2 KiB
Python
263 lines
9.2 KiB
Python
"""
|
|
ANSI terminal display backend.
|
|
"""
|
|
|
|
import os
|
|
import select
|
|
import sys
|
|
import termios
|
|
import tty
|
|
|
|
|
|
class TerminalDisplay:
    """ANSI terminal display backend.

    Renders a buffer of text lines to stdout using ANSI escape codes and
    reads keyboard input from stdin.

    Supports reuse: when ``init(..., reuse=True)`` is called on an instance
    that is already initialized, the cursor-hiding step is skipped.
    Terminal dimensions are auto-detected on ``init``.
    """

    width: int = 80
    height: int = 24
    _initialized: bool = False

    # Final byte of a cursor-key escape sequence (ESC [ x or ESC O x).
    _ARROW_KEYS = {"A": "up", "B": "down", "C": "right", "D": "left"}

    # Single characters that map to a named key.
    _SIMPLE_KEYS = {
        "\n": "return",
        "\r": "return",
        "\t": "tab",
        " ": " ",
        "\x7f": "backspace",  # Backspace
        "\x08": "backspace",  # Ctrl+H
        "\x03": "ctrl_c",
        "\x04": "ctrl_d",
    }

    # Byte suffixes that indicate an escape sequence may still be arriving.
    _PARTIAL_ESCAPES = (b"\x1b", b"\x1b[", b"\x1bO")
    # How long (seconds) to wait for the rest of a possibly split sequence.
    _ESCAPE_TAIL_WAIT = 0.025
    # Maximum number of bytes pulled from stdin per read.
    _READ_CHUNK = 1024

    def __init__(self, target_fps: float = 30.0):
        """Create a display without touching the terminal.

        Args:
            target_fps: Desired frame rate. A value <= 0 yields a frame
                period of 0.
        """
        self.target_fps = target_fps
        self._frame_period = 1.0 / target_fps if target_fps > 0 else 0
        self._last_frame_time = 0.0
        self._cached_dimensions: tuple[int, int] | None = None
        self._raw_mode_enabled: bool = False
        self._original_termios: list = []
        self._quit_requested: bool = False

    def init(self, width: int, height: int, reuse: bool = False) -> None:
        """Initialize display with dimensions.

        If width or height is not positive (0 or None), the detected
        terminal size is used. Otherwise the provided dimensions are used,
        clamped so they never exceed the detected terminal size. When no
        terminal is available, the provided dimensions (or 80x24) stand in
        for the terminal size.

        Args:
            width: Desired terminal width (0/None = auto-detect)
            height: Desired terminal height (0/None = auto-detect)
            reuse: If True and the display is already initialized, skip
                hiding the cursor again
        """
        from engine.terminal import CURSOR_OFF

        # The docstring allows None for "auto"; normalise so the
        # comparisons below cannot raise TypeError.
        width = width or 0
        height = height or 0

        try:
            term_size = os.get_terminal_size()
        except OSError:
            # No terminal available (e.g., in tests)
            term_width = width if width > 0 else 80
            term_height = height if height > 0 else 24
        else:
            term_width = term_size.columns
            term_height = term_size.lines

        if width > 0 and height > 0:
            self.width = min(width, term_width)
            self.height = min(height, term_height)
        else:
            self.width = term_width
            self.height = term_height

        if not reuse or not self._initialized:
            print(CURSOR_OFF, end="", flush=True)
            self._initialized = True

    def get_dimensions(self) -> tuple[int, int]:
        """Get current terminal dimensions.

        The terminal is queried on every call; if no terminal is available
        the current ``width``/``height`` are used instead. ``width``,
        ``height`` and the remembered dimensions are only overwritten when
        the queried size differs from the one remembered by the previous
        call.

        Returns:
            (width, height) in character cells
        """
        try:
            term_size = os.get_terminal_size()
        except OSError:
            new_dims = (self.width, self.height)
        else:
            new_dims = (term_size.columns, term_size.lines)

        # A remembered value of None never equals a tuple, so the first
        # call always takes this branch.
        if self._cached_dimensions != new_dims:
            self._cached_dimensions = new_dims
            self.width, self.height = new_dims

        return self._cached_dimensions

    def show(
        self, buffer: list[str], border: bool = False, positioning: str = "mixed"
    ) -> None:
        """Display buffer with optional border and positioning mode.

        Frame rate limiting is not done here; every frame received is
        written to stdout.

        Args:
            buffer: List of lines to display
            border: Whether to apply border
            positioning: Positioning mode - "mixed" (default), "absolute", or
                "relative". "relative" strips cursor-positioning codes from
                every line; the other modes write the lines unchanged.
        """
        from engine.display import BorderMode, get_monitor, render_border

        # Metrics shown in the border; stay at zero when no data is available.
        fps = 0.0
        frame_time = 0.0
        monitor = get_monitor()
        if monitor:
            stats = monitor.get_stats()
            avg_ms = stats.get("pipeline", {}).get("avg_ms", 0) if stats else 0
            frame_count = stats.get("frame_count", 0) if stats else 0
            if avg_ms and frame_count > 0:
                fps = 1000.0 / avg_ms
                frame_time = avg_ms

        if border and border != BorderMode.OFF:
            buffer = render_border(buffer, self.width, self.height, fps, frame_time)

        if positioning == "relative":
            import re

            # Drop cursor-positioning codes (ESC [ row ; col H) but keep
            # everything else, e.g. colour codes.
            buffer = [re.sub(r"\033\[[0-9;]*H", "", line) for line in buffer]
        # "absolute" and "mixed" both write the lines as-is: any cursor
        # codes they need are already embedded in the buffer.

        # Home the cursor, clear to end of screen, then write the frame.
        output = "\033[H\033[J" + "\n".join(buffer)
        sys.stdout.buffer.write(output.encode())
        sys.stdout.flush()

    def clear(self) -> None:
        """Write the clear sequence (``engine.terminal.CLR``) to stdout."""
        from engine.terminal import CLR

        print(CLR, end="", flush=True)

    def cleanup(self) -> None:
        """Restore the terminal: leave raw mode and show the cursor again."""
        from engine.terminal import CURSOR_ON

        # Restore normal terminal mode if raw mode was enabled
        self.set_raw_mode(False)

        print(CURSOR_ON, end="", flush=True)

    def is_quit_requested(self) -> bool:
        """Check if quit was requested (optional protocol method)."""
        return self._quit_requested

    def clear_quit_request(self) -> None:
        """Clear quit request (optional protocol method)."""
        self._quit_requested = False

    def request_quit(self) -> None:
        """Request quit (e.g., when Ctrl+C is pressed)."""
        self._quit_requested = True

    def set_raw_mode(self, enable: bool = True) -> None:
        """Enable/disable raw terminal mode for input capture.

        When raw mode is enabled:
        - Keystrokes are read immediately without echo
        - Special keys (arrows, Ctrl+C, etc.) are captured
        - Terminal is not in cooked/canonical mode

        Failures (stdin is not a terminal, is closed, or has no file
        descriptor) are ignored and leave the current mode flag unchanged.

        NOTE(review): tty.setraw also turns off output post-processing, so
        a bare "\\n" written while raw mode is on may not return the cursor
        to column 0 - confirm that callers of show() account for this.

        Args:
            enable: True to enable raw mode, False to restore normal mode
        """
        try:
            if enable and not self._raw_mode_enabled:
                fd = sys.stdin.fileno()
                # Save original terminal settings before changing them
                self._original_termios = termios.tcgetattr(fd)
                tty.setraw(fd)
                self._raw_mode_enabled = True
            elif not enable and self._raw_mode_enabled:
                # Restore original terminal settings
                if self._original_termios:
                    termios.tcsetattr(
                        sys.stdin, termios.TCSADRAIN, self._original_termios
                    )
                self._raw_mode_enabled = False
        except (termios.error, OSError, ValueError):
            # Terminal might not support raw mode (e.g., in tests), or stdin
            # may already be closed during shutdown (fileno() -> ValueError).
            pass

    def get_input_keys(self, timeout: float = 0.0) -> list[str]:
        """Get available keyboard input.

        Reads all bytes currently pending on stdin and decodes them into
        key symbols. Should be called with raw mode enabled for best
        results.

        Bytes are read straight from the file descriptor rather than via
        ``sys.stdin.read`` so that nothing is left behind in Python's
        buffered text layer, where ``select`` could not see it. A lone
        Escape press is reported after a short wait instead of blocking
        until more input arrives.

        Args:
            timeout: Maximum time to wait for input (seconds)

        Returns:
            List of key symbols as strings: "up", "down", "left", "right",
            "escape", "return", "tab", "backspace", "ctrl_c", "ctrl_d", or
            the printable character itself. Empty if nothing is available.
        """
        try:
            fd = sys.stdin.fileno()
        except (OSError, ValueError):
            # stdin has no usable descriptor (replaced or closed)
            return []

        try:
            if not select.select([fd], [], [], timeout)[0]:
                return []
            data = os.read(fd, self._READ_CHUNK)
            # An escape sequence can be split across reads; give its tail a
            # brief chance to arrive before treating ESC as a key of its own.
            if data.endswith(self._PARTIAL_ESCAPES) and select.select(
                [fd], [], [], self._ESCAPE_TAIL_WAIT
            )[0]:
                data += os.read(fd, self._READ_CHUNK)
        except OSError:
            return []

        return self._decode_keys(data.decode("utf-8", errors="ignore"))

    @classmethod
    def _decode_keys(cls, text: str) -> list[str]:
        """Translate raw input characters into key symbols."""
        keys: list[str] = []
        pos = 0
        end = len(text)

        while pos < end:
            char = text[pos]
            pos += 1

            if char != "\x1b":
                name = cls._SIMPLE_KEYS.get(char)
                if name is not None:
                    keys.append(name)
                elif char.isprintable():
                    keys.append(char)
                # Other control characters are dropped.
                continue

            introducer = text[pos : pos + 1]
            if introducer == "[":
                # CSI sequence: parameter bytes, then one final byte in @..~
                stop = pos + 1
                while stop < end and not ("\x40" <= text[stop] <= "\x7e"):
                    stop += 1
                body = text[pos + 1 : stop + 1]
                pos = min(stop + 1, end)
                # Unknown (or truncated) sequences are reported as "escape"
                keys.append(cls._ARROW_KEYS.get(body, "escape"))
            elif introducer == "O" and pos + 1 < end:
                # SS3 sequence: cursor keys in application cursor mode
                keys.append(cls._ARROW_KEYS.get(text[pos + 1], "escape"))
                pos += 2
            else:
                # Bare Escape; whatever follows is decoded as ordinary keys
                keys.append("escape")

        return keys

    def is_raw_mode_enabled(self) -> bool:
        """Check if raw mode is currently enabled."""
        return self._raw_mode_enabled
|