forked from genewildish/Mainline
- Replace estimate_block_height (PIL-based) with estimate_simple_height (word wrap) - Update viewport filter tests to match new height-based filtering (~4 items vs 24) - Fix CI task duplication in mise.toml (remove redundant depends) Closes #38 Closes #36
269 lines
6.5 KiB
Python
269 lines
6.5 KiB
Python
"""
Streaming protocol utilities for efficient frame transmission.

Provides:
- Frame differencing: Only send changed lines
- Run-length encoding: Compress repeated lines
- Binary encoding: Compact message format
"""
|
|
|
|
import json
|
|
import zlib
|
|
from dataclasses import dataclass
|
|
from enum import IntEnum
|
|
|
|
|
|
class MessageType(IntEnum):
    """Integer tags that identify each kind of streaming-protocol message.

    As an ``IntEnum``, every member compares equal to its plain integer
    value, and ``MessageType(n)`` maps an integer back to its member.
    """

    FULL_FRAME = 1
    DIFF_FRAME = 2
    STATE = 3
    CLEAR = 4
    PING = 5
    PONG = 6
|
|
|
|
|
|
@dataclass
class FrameDiff:
    """Line-level description of how one frame differs from the previous one."""

    # Frame width
    width: int
    # Frame height (number of lines in the new frame)
    height: int
    # One (line_index, content) pair per changed line
    changed_lines: list[tuple[int, str]]
|
|
|
|
|
|
def compute_diff(old_buffer: list[str], new_buffer: list[str]) -> FrameDiff:
    """Build the line-level diff that turns ``old_buffer`` into ``new_buffer``.

    Args:
        old_buffer: Previous frame buffer
        new_buffer: Current frame buffer

    Returns:
        FrameDiff whose ``changed_lines`` holds every line of ``new_buffer``
        that is either past the end of ``old_buffer`` or differs from the
        old line at the same index. ``width`` is the length of the first
        new line (0 for an empty frame) and ``height`` is the number of
        lines in ``new_buffer``.
    """
    old_len = len(old_buffer)

    # A line is "changed" when the old frame has no line at that position
    # or the text at that position is different.
    changes = [
        (row, text)
        for row, text in enumerate(new_buffer)
        if row >= old_len or text != old_buffer[row]
    ]

    frame_width = len(new_buffer[0]) if new_buffer else 0
    return FrameDiff(
        width=frame_width,
        height=len(new_buffer),
        changed_lines=changes,
    )
|
|
|
|
|
|
def encode_rle(lines: list[tuple[int, str]]) -> list[tuple[int, str, int]]:
    """Run-length encode consecutive identical lines.

    A run is extended only when the next entry has the same content AND its
    index immediately follows the run. Entries with equal content but a gap
    between their indices start a new run; otherwise decoding the run (which
    expands to ``start_index, start_index + 1, ...``) would place the content
    on the wrong lines.

    Args:
        lines: List of (index, content) tuples (must be sorted by index)

    Returns:
        List of (start_index, content, run_length) tuples
    """
    if not lines:
        return []

    encoded: list[tuple[int, str, int]] = []
    start_idx, current_line = lines[0]
    current_rle = 1

    for idx, line in lines[1:]:
        # start_idx + current_rle is the index the run would cover next.
        if line == current_line and idx == start_idx + current_rle:
            current_rle += 1
        else:
            encoded.append((start_idx, current_line, current_rle))
            start_idx, current_line, current_rle = idx, line, 1

    # Flush the run that was still open when the input ended.
    encoded.append((start_idx, current_line, current_rle))
    return encoded
|
|
|
|
|
|
def decode_rle(encoded: list[tuple[int, str, int]]) -> list[tuple[int, str]]:
    """Expand run-length encoded lines back into individual lines.

    Args:
        encoded: List of (start_index, content, run_length) tuples

    Returns:
        List of (index, content) tuples; each run contributes ``run_length``
        entries with consecutive indices starting at ``start_index``
    """
    return [
        (first + offset, text)
        for first, text, count in encoded
        for offset in range(count)
    ]
|
|
|
|
|
|
def compress_frame(buffer: list[str], level: int = 6) -> bytes:
    """Compress a frame buffer with zlib.

    The lines are joined with newlines and UTF-8 encoded before compression.

    Args:
        buffer: Frame buffer (list of lines)
        level: Compression level (0-9)

    Returns:
        Compressed bytes
    """
    raw = "\n".join(buffer).encode("utf-8")
    return zlib.compress(raw, level)
|
|
|
|
|
|
def decompress_frame(data: bytes, height: int) -> list[str]:
    """Decompress a zlib-compressed frame buffer.

    Args:
        data: Compressed bytes
        height: Number of lines in original buffer

    Returns:
        Frame buffer (list of lines), cut down or padded with empty strings
        to ``height`` lines
    """
    text = zlib.decompress(data).decode("utf-8")

    # Drop surplus lines, then pad with empty lines if the frame is short.
    rows = text.split("\n")[:height]
    rows.extend([""] * (height - len(rows)))
    return rows
|
|
|
|
|
|
def encode_binary_message(
    msg_type: MessageType, width: int, height: int, payload: bytes
) -> bytes:
    """Frame a payload as a binary protocol message.

    Message format (network byte order):
    - 1 byte: message type
    - 2 bytes: width (uint16)
    - 2 bytes: height (uint16)
    - 4 bytes: payload length (uint32)
    - N bytes: payload

    Args:
        msg_type: Message type
        width: Frame width
        height: Frame height
        payload: Message payload

    Returns:
        Encoded binary message (header followed by the payload)
    """
    import struct

    header_fields = (msg_type.value, width, height, len(payload))
    return struct.pack("!BHHI", *header_fields) + payload
|
|
|
|
|
|
def decode_binary_message(data: bytes) -> tuple[MessageType, int, int, bytes]:
    """Decode a binary message.

    Expects the layout produced by ``encode_binary_message``: a header of
    message type (uint8), width (uint16), height (uint16) and payload length
    (uint32) in network byte order, followed by the payload. Bytes beyond
    the declared payload length are ignored.

    Args:
        data: Binary message data

    Returns:
        Tuple of (msg_type, width, height, payload)

    Raises:
        struct.error: If ``data`` is shorter than the header.
        ValueError: If ``data`` holds fewer payload bytes than the header
            declares, or the type byte is not a valid ``MessageType``.
    """
    import struct

    header_format = "!BHHI"
    header_size = struct.calcsize(header_format)

    msg_type_val, width, height, payload_len = struct.unpack(
        header_format, data[:header_size]
    )
    payload = data[header_size : header_size + payload_len]

    # Slicing never fails on short input, so a truncated message would
    # otherwise be returned with a silently shortened payload.
    if len(payload) != payload_len:
        raise ValueError(
            f"truncated message: header declares {payload_len} payload bytes, "
            f"got {len(payload)}"
        )

    return MessageType(msg_type_val), width, height, payload
|
|
|
|
|
|
def encode_diff_message(diff: FrameDiff, use_rle: bool = True) -> bytes:
    """Serialise a frame diff's changed lines as UTF-8 encoded JSON.

    Args:
        diff: Frame diff
        use_rle: Whether to use run-length encoding

    Returns:
        Encoded diff payload: a JSON array of ``[index, content, run_length]``
        entries when ``use_rle`` is true, otherwise ``[index, content]``
        entries
    """
    if use_rle:
        runs = encode_rle(diff.changed_lines)
        rows = [[start, text, count] for start, text, count in runs]
    else:
        rows = [[row, text] for row, text in diff.changed_lines]

    return json.dumps(rows).encode("utf-8")
|
|
|
|
|
|
def decode_diff_message(payload: bytes, use_rle: bool = True) -> list[tuple[int, str]]:
    """Parse a diff payload back into its changed lines.

    Args:
        payload: Encoded diff payload
        use_rle: Whether run-length encoding was used

    Returns:
        List of (line_index, content) tuples
    """
    rows = json.loads(payload.decode("utf-8"))

    # Plain payloads already carry one entry per line.
    if not use_rle:
        return [(row, text) for row, text in rows]

    # Run-length payloads carry (start, content, count) runs to expand.
    return decode_rle([(start, text, count) for start, text, count in rows])
|
|
|
|
|
|
def should_use_diff(
    old_buffer: list[str], new_buffer: list[str], threshold: float = 0.3
) -> bool:
    """Decide whether sending a diff beats sending the full frame.

    Args:
        old_buffer: Previous frame
        new_buffer: Current frame
        threshold: Max changed ratio to use diff (0.0-1.0)

    Returns:
        True if diff is more efficient, i.e. the share of changed lines in
        ``new_buffer`` does not exceed ``threshold``; always False when
        either buffer is empty
    """
    # With either frame empty, a diff is never chosen.
    if not (old_buffer and new_buffer):
        return False

    changed_count = len(compute_diff(old_buffer, new_buffer).changed_lines)

    # new_buffer is non-empty past the guard, so the division is safe.
    return changed_count / len(new_buffer) <= threshold
|
|
|
|
|
|
def apply_diff(old_buffer: list[str], diff: FrameDiff) -> list[str]:
    """Apply a diff to an old buffer to get the new buffer.

    The result always has exactly ``diff.height`` lines (none if the height
    is not positive). Lines not mentioned in the diff are carried over from
    ``old_buffer`` or, past its end, left empty. Changes whose index falls
    outside ``0 <= index < diff.height`` are ignored: a negative index must
    not overwrite a line counted from the end, and an oversized index must
    not force the buffer to be padded far beyond the frame only to be cut
    off again.

    Args:
        old_buffer: Previous frame buffer (not modified)
        diff: Frame diff to apply

    Returns:
        New frame buffer
    """
    height = max(diff.height, 0)

    # Size the buffer to the target frame first so every in-range index
    # below is a valid position.
    new_buffer = list(old_buffer)[:height]
    new_buffer.extend([""] * (height - len(new_buffer)))

    for line_idx, content in diff.changed_lines:
        if 0 <= line_idx < height:
            new_buffer[line_idx] = content

    return new_buffer
|