docs(analysis): Add visual output comparison analysis

- Created analysis/visual_output_comparison.md with detailed architectural comparison
- Added capture utilities for output comparison (capture_output.py, capture_upstream.py, compare_outputs.py)
- Captured and compared output from upstream/main vs sideline branch
- Documented fundamental architectural differences in rendering approaches
- Updated Gitea issue #50 with findings
This commit is contained in:
2026-03-21 15:47:20 -07:00
parent a747f67f63
commit afee03f693
6 changed files with 4505 additions and 0 deletions

201
scripts/capture_output.py Normal file
View File

@@ -0,0 +1,201 @@
#!/usr/bin/env python3
"""
Capture output utility for Mainline.
This script captures the output of a Mainline pipeline using NullDisplay
and saves it to a JSON file for comparison with other branches.
"""
import argparse
import json
import time
from pathlib import Path
from engine.display import DisplayRegistry
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext
from engine.pipeline.adapters import create_stage_from_display
from engine.pipeline.presets import get_preset
def capture_pipeline_output(
preset_name: str,
output_file: str,
frames: int = 60,
width: int = 80,
height: int = 24,
):
"""Capture pipeline output for a given preset.
Args:
preset_name: Name of preset to use
output_file: Path to save captured output
frames: Number of frames to capture
width: Terminal width
height: Terminal height
"""
print(f"Capturing output for preset '{preset_name}'...")
# Get preset
preset = get_preset(preset_name)
if not preset:
print(f"Error: Preset '{preset_name}' not found")
return False
# Create NullDisplay with recording
display = DisplayRegistry.create("null")
display.init(width, height)
display.start_recording()
# Build pipeline
config = PipelineConfig(
source=preset.source,
display="null", # Use null display
camera=preset.camera,
effects=preset.effects,
enable_metrics=False,
)
# Create pipeline context with params
from engine.pipeline.params import PipelineParams
params = PipelineParams(
source=preset.source,
display="null",
camera_mode=preset.camera,
effect_order=preset.effects,
viewport_width=preset.viewport_width,
viewport_height=preset.viewport_height,
camera_speed=preset.camera_speed,
)
ctx = PipelineContext()
ctx.params = params
pipeline = Pipeline(config=config, context=ctx)
# Add stages based on preset
from engine.data_sources.sources import HeadlinesDataSource
from engine.pipeline.adapters import DataSourceStage
# Add source stage
source = HeadlinesDataSource()
pipeline.add_stage("source", DataSourceStage(source, name="headlines"))
# Add message overlay if enabled
if getattr(preset, "enable_message_overlay", False):
from engine import config as engine_config
from engine.pipeline.adapters import MessageOverlayConfig, MessageOverlayStage
overlay_config = MessageOverlayConfig(
enabled=True,
display_secs=getattr(engine_config, "MESSAGE_DISPLAY_SECS", 30),
topic_url=getattr(engine_config, "NTFY_TOPIC", None),
)
pipeline.add_stage(
"message_overlay", MessageOverlayStage(config=overlay_config)
)
# Add display stage
pipeline.add_stage("display", create_stage_from_display(display, "null"))
# Build and initialize
pipeline.build()
if not pipeline.initialize():
print("Error: Failed to initialize pipeline")
return False
# Capture frames
print(f"Capturing {frames} frames...")
start_time = time.time()
for frame in range(frames):
try:
pipeline.execute([])
if frame % 10 == 0:
print(f" Frame {frame}/{frames}")
except Exception as e:
print(f"Error on frame {frame}: {e}")
break
elapsed = time.time() - start_time
print(f"Captured {frame + 1} frames in {elapsed:.2f}s")
# Get captured frames
captured_frames = display.get_frames()
print(f"Retrieved {len(captured_frames)} frames from display")
# Save to JSON
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
recording_data = {
"version": 1,
"preset": preset_name,
"display": "null",
"width": width,
"height": height,
"frame_count": len(captured_frames),
"frames": [
{
"frame_number": i,
"buffer": frame,
"width": width,
"height": height,
}
for i, frame in enumerate(captured_frames)
],
}
with open(output_path, "w") as f:
json.dump(recording_data, f, indent=2)
print(f"Saved recording to {output_path}")
return True
def main():
parser = argparse.ArgumentParser(description="Capture Mainline pipeline output")
parser.add_argument(
"--preset",
default="demo",
help="Preset name to use (default: demo)",
)
parser.add_argument(
"--output",
default="output/capture.json",
help="Output file path (default: output/capture.json)",
)
parser.add_argument(
"--frames",
type=int,
default=60,
help="Number of frames to capture (default: 60)",
)
parser.add_argument(
"--width",
type=int,
default=80,
help="Terminal width (default: 80)",
)
parser.add_argument(
"--height",
type=int,
default=24,
help="Terminal height (default: 24)",
)
args = parser.parse_args()
success = capture_pipeline_output(
preset_name=args.preset,
output_file=args.output,
frames=args.frames,
width=args.width,
height=args.height,
)
return 0 if success else 1
if __name__ == "__main__":
exit(main())

186
scripts/capture_upstream.py Normal file
View File

@@ -0,0 +1,186 @@
#!/usr/bin/env python3
"""
Capture output from upstream/main branch.
This script captures the output of upstream/main Mainline using NullDisplay
and saves it to a JSON file for comparison with sideline branch.
"""
import argparse
import json
import sys
from pathlib import Path
# Add upstream/main to path
sys.path.insert(0, "/tmp/upstream_mainline")
def capture_upstream_output(
output_file: str,
frames: int = 60,
width: int = 80,
height: int = 24,
):
"""Capture upstream/main output.
Args:
output_file: Path to save captured output
frames: Number of frames to capture
width: Terminal width
height: Terminal height
"""
print(f"Capturing upstream/main output...")
try:
# Import upstream modules
from engine import config, themes
from engine.display import NullDisplay
from engine.fetch import fetch_all, load_cache
from engine.scroll import stream
from engine.ntfy import NtfyPoller
from engine.mic import MicMonitor
except ImportError as e:
print(f"Error importing upstream modules: {e}")
print("Make sure upstream/main is in the Python path")
return False
# Create a custom NullDisplay that captures frames
class CapturingNullDisplay:
def __init__(self, width, height, max_frames):
self.width = width
self.height = height
self.max_frames = max_frames
self.frame_count = 0
self.frames = []
def init(self, width: int, height: int) -> None:
self.width = width
self.height = height
def show(self, buffer: list[str], border: bool = False) -> None:
if self.frame_count < self.max_frames:
self.frames.append(list(buffer))
self.frame_count += 1
if self.frame_count >= self.max_frames:
raise StopIteration("Frame limit reached")
def clear(self) -> None:
pass
def cleanup(self) -> None:
pass
def get_frames(self):
return self.frames
display = CapturingNullDisplay(width, height, frames)
# Load items (use cached headlines)
items = load_cache()
if not items:
print("No cached items found, fetching...")
result = fetch_all()
if isinstance(result, tuple):
items, linked, failed = result
else:
items = result
if not items:
print("Error: No items available")
return False
print(f"Loaded {len(items)} items")
# Create ntfy poller and mic monitor (upstream uses these)
ntfy_poller = NtfyPoller(config.NTFY_TOPIC, reconnect_delay=5, display_secs=30)
mic_monitor = MicMonitor()
# Run stream for specified number of frames
print(f"Capturing {frames} frames...")
try:
# Run the stream
stream(
items=items,
ntfy_poller=ntfy_poller,
mic_monitor=mic_monitor,
display=display,
)
except StopIteration:
print("Frame limit reached")
except Exception as e:
print(f"Error during capture: {e}")
# Continue to save what we have
# Get captured frames
captured_frames = display.get_frames()
print(f"Retrieved {len(captured_frames)} frames from display")
# Save to JSON
output_path = Path(output_file)
output_path.parent.mkdir(parents=True, exist_ok=True)
recording_data = {
"version": 1,
"preset": "upstream_demo",
"display": "null",
"width": width,
"height": height,
"frame_count": len(captured_frames),
"frames": [
{
"frame_number": i,
"buffer": frame,
"width": width,
"height": height,
}
for i, frame in enumerate(captured_frames)
],
}
with open(output_path, "w") as f:
json.dump(recording_data, f, indent=2)
print(f"Saved recording to {output_path}")
return True
def main():
parser = argparse.ArgumentParser(description="Capture upstream/main output")
parser.add_argument(
"--output",
default="output/upstream_demo.json",
help="Output file path (default: output/upstream_demo.json)",
)
parser.add_argument(
"--frames",
type=int,
default=60,
help="Number of frames to capture (default: 60)",
)
parser.add_argument(
"--width",
type=int,
default=80,
help="Terminal width (default: 80)",
)
parser.add_argument(
"--height",
type=int,
default=24,
help="Terminal height (default: 24)",
)
args = parser.parse_args()
success = capture_upstream_output(
output_file=args.output,
frames=args.frames,
width=args.width,
height=args.height,
)
return 0 if success else 1
if __name__ == "__main__":
exit(main())

220
scripts/compare_outputs.py Normal file
View File

@@ -0,0 +1,220 @@
#!/usr/bin/env python3
"""
Compare captured outputs from different branches or configurations.
This script loads two captured recordings and compares them frame-by-frame,
reporting any differences found.
"""
import argparse
import difflib
import json
from pathlib import Path
def load_recording(file_path: str) -> dict:
"""Load a recording from a JSON file."""
with open(file_path, "r") as f:
return json.load(f)
def compare_frame_buffers(buf1: list[str], buf2: list[str]) -> tuple[int, list[str]]:
"""Compare two frame buffers and return differences.
Returns:
tuple: (difference_count, list of difference descriptions)
"""
differences = []
# Check dimensions
if len(buf1) != len(buf2):
differences.append(f"Height mismatch: {len(buf1)} vs {len(buf2)}")
# Check each line
max_lines = max(len(buf1), len(buf2))
for i in range(max_lines):
if i >= len(buf1):
differences.append(f"Line {i}: Missing in first buffer")
continue
if i >= len(buf2):
differences.append(f"Line {i}: Missing in second buffer")
continue
line1 = buf1[i]
line2 = buf2[i]
if line1 != line2:
# Find the specific differences in the line
if len(line1) != len(line2):
differences.append(
f"Line {i}: Length mismatch ({len(line1)} vs {len(line2)})"
)
# Show a snippet of the difference
max_len = max(len(line1), len(line2))
snippet1 = line1[:50] + "..." if len(line1) > 50 else line1
snippet2 = line2[:50] + "..." if len(line2) > 50 else line2
differences.append(f"Line {i}: '{snippet1}' != '{snippet2}'")
return len(differences), differences
def compare_recordings(
recording1: dict, recording2: dict, max_frames: int = None
) -> dict:
"""Compare two recordings frame-by-frame.
Returns:
dict: Comparison results with summary and detailed differences
"""
results = {
"summary": {},
"frames": [],
"total_differences": 0,
"frames_with_differences": 0,
}
# Compare metadata
results["summary"]["recording1"] = {
"preset": recording1.get("preset", "unknown"),
"frame_count": recording1.get("frame_count", 0),
"width": recording1.get("width", 0),
"height": recording1.get("height", 0),
}
results["summary"]["recording2"] = {
"preset": recording2.get("preset", "unknown"),
"frame_count": recording2.get("frame_count", 0),
"width": recording2.get("width", 0),
"height": recording2.get("height", 0),
}
# Compare frames
frames1 = recording1.get("frames", [])
frames2 = recording2.get("frames", [])
num_frames = min(len(frames1), len(frames2))
if max_frames:
num_frames = min(num_frames, max_frames)
print(f"Comparing {num_frames} frames...")
for frame_idx in range(num_frames):
frame1 = frames1[frame_idx]
frame2 = frames2[frame_idx]
buf1 = frame1.get("buffer", [])
buf2 = frame2.get("buffer", [])
diff_count, differences = compare_frame_buffers(buf1, buf2)
if diff_count > 0:
results["total_differences"] += diff_count
results["frames_with_differences"] += 1
results["frames"].append(
{
"frame_number": frame_idx,
"differences": differences,
"diff_count": diff_count,
}
)
if frame_idx < 5: # Only print first 5 frames with differences
print(f"\nFrame {frame_idx} ({diff_count} differences):")
for diff in differences[:5]: # Limit to 5 differences per frame
print(f" - {diff}")
# Summary
results["summary"]["total_frames_compared"] = num_frames
results["summary"]["frames_with_differences"] = results["frames_with_differences"]
results["summary"]["total_differences"] = results["total_differences"]
results["summary"]["match_percentage"] = (
(1 - results["frames_with_differences"] / num_frames) * 100
if num_frames > 0
else 0
)
return results
def print_comparison_summary(results: dict):
"""Print a summary of the comparison results."""
print("\n" + "=" * 80)
print("COMPARISON SUMMARY")
print("=" * 80)
r1 = results["summary"]["recording1"]
r2 = results["summary"]["recording2"]
print(f"\nRecording 1: {r1['preset']}")
print(
f" Frames: {r1['frame_count']}, Width: {r1['width']}, Height: {r1['height']}"
)
print(f"\nRecording 2: {r2['preset']}")
print(
f" Frames: {r2['frame_count']}, Width: {r2['width']}, Height: {r2['height']}"
)
print(f"\nComparison:")
print(f" Frames compared: {results['summary']['total_frames_compared']}")
print(f" Frames with differences: {results['summary']['frames_with_differences']}")
print(f" Total differences: {results['summary']['total_differences']}")
print(f" Match percentage: {results['summary']['match_percentage']:.2f}%")
if results["summary"]["match_percentage"] == 100:
print("\n✓ Recordings match perfectly!")
else:
print("\n⚠ Recordings have differences.")
def main():
parser = argparse.ArgumentParser(
description="Compare captured outputs from different branches"
)
parser.add_argument(
"recording1",
help="First recording file (JSON)",
)
parser.add_argument(
"recording2",
help="Second recording file (JSON)",
)
parser.add_argument(
"--max-frames",
type=int,
help="Maximum number of frames to compare",
)
parser.add_argument(
"--output",
"-o",
help="Output file for detailed comparison results (JSON)",
)
args = parser.parse_args()
# Load recordings
print(f"Loading {args.recording1}...")
recording1 = load_recording(args.recording1)
print(f"Loading {args.recording2}...")
recording2 = load_recording(args.recording2)
# Compare
results = compare_recordings(recording1, recording2, args.max_frames)
# Print summary
print_comparison_summary(results)
# Save detailed results if requested
if args.output:
output_path = Path(args.output)
output_path.parent.mkdir(parents=True, exist_ok=True)
with open(output_path, "w") as f:
json.dump(results, f, indent=2)
print(f"\nDetailed results saved to {args.output}")
return 0
if __name__ == "__main__":
exit(main())