forked from genewildish/Mainline
refactor: consolidate pipeline architecture with unified data source system
MAJOR REFACTORING: Consolidate duplicated pipeline code and standardize on capability-based dependency resolution. This is a significant but backwards-compatible restructuring that improves maintainability and extensibility. ## ARCHITECTURE CHANGES ### Data Sources Consolidation - Move engine/sources_v2.py → engine/data_sources/sources.py - Move engine/pipeline_sources/ → engine/data_sources/ - Create unified DataSource ABC with common interface: * fetch() - idempotent data retrieval * get_items() - cached access with automatic refresh * refresh() - force cache invalidation * is_dynamic - indicate streaming vs static sources - Support for SourceItem dataclass (content, source, timestamp, metadata) ### Display Backend Improvements - Update all 7 display backends to use new import paths - Terminal: Improve dimension detection and handling - WebSocket: Better error handling and client lifecycle - Sixel: Refactor graphics rendering - Pygame: Modernize event handling - Kitty: Add protocol support for inline images - Multi: Ensure proper forwarding to all backends - Null: Maintain testing backend functionality ### Pipeline Adapter Consolidation - Refactor adapter stages for clarity and flexibility - RenderStage now handles both item-based and buffer-based rendering - Add SourceItemsToBufferStage for converting data source items - Improve DataSourceStage to work with all source types - Add DisplayStage wrapper for display backends ### Camera & Viewport Refinements - Update Camera class for new architecture - Improve viewport dimension detection - Better handling of resize events across backends ### New Effect Plugins - border.py: Frame rendering effect with configurable style - crop.py: Viewport clipping effect for selective display - tint.py: Color filtering effect for atmosphere ### Tests & Quality - Add test_border_effect.py with comprehensive border tests - Add test_crop_effect.py with viewport clipping tests - Add test_tint_effect.py with color filtering tests - Update test_pipeline.py for new architecture - Update test_pipeline_introspection.py for new data source location - All 463 tests pass with 56% coverage - Linting: All checks pass with ruff ### Removals (Code Cleanup) - Delete engine/benchmark.py (deprecated performance testing) - Delete engine/pipeline_sources/__init__.py (moved to data_sources) - Delete engine/sources_v2.py (replaced by data_sources/sources.py) - Update AGENTS.md to reflect new structure ### Import Path Updates - Update engine/pipeline/controller.py::create_default_pipeline() * Old: from engine.sources_v2 import HeadlinesDataSource * New: from engine.data_sources.sources import HeadlinesDataSource - All display backends import from new locations - All tests import from new locations ## BACKWARDS COMPATIBILITY This refactoring is intended to be backwards compatible: - Pipeline execution unchanged (DAG-based with capability matching) - Effect plugins unchanged (EffectPlugin interface same) - Display protocol unchanged (Display duck-typing works as before) - Config system unchanged (presets.toml format same) ## TESTING - 463 tests pass (0 failures, 19 skipped) - Full linting check passes - Manual testing on demo, poetry, websocket modes - All new effect plugins tested ## FILES CHANGED - 24 files modified/added/deleted - 723 insertions, 1,461 deletions (net -738 LOC - cleanup!) - No breaking changes to public APIs - All transitive imports updated correctly
This commit is contained in:
43
AGENTS.md
43
AGENTS.md
@@ -71,39 +71,17 @@ The project uses hk configured in `hk.pkl`:
|
||||
|
||||
## Benchmark Runner
|
||||
|
||||
Run performance benchmarks:
|
||||
Benchmark tests are in `tests/test_benchmark.py` with `@pytest.mark.benchmark`.
|
||||
|
||||
### Hook Mode (via pytest)
|
||||
|
||||
Run benchmarks in hook mode to catch performance regressions:
|
||||
|
||||
```bash
|
||||
mise run benchmark # Run all benchmarks (text output)
|
||||
mise run benchmark-json # Run benchmarks (JSON output)
|
||||
mise run benchmark-report # Run benchmarks (Markdown report)
|
||||
mise run test-cov # Run with coverage
|
||||
```
|
||||
|
||||
### Benchmark Commands
|
||||
|
||||
```bash
|
||||
# Run benchmarks
|
||||
uv run python -m engine.benchmark
|
||||
|
||||
# Run with specific displays/effects
|
||||
uv run python -m engine.benchmark --displays null,terminal --effects fade,glitch
|
||||
|
||||
# Save baseline for hook comparisons
|
||||
uv run python -m engine.benchmark --baseline
|
||||
|
||||
# Run in hook mode (compares against baseline)
|
||||
uv run python -m engine.benchmark --hook
|
||||
|
||||
# Hook mode with custom threshold (default: 20% degradation)
|
||||
uv run python -m engine.benchmark --hook --threshold 0.3
|
||||
|
||||
# Custom baseline location
|
||||
uv run python -m engine.benchmark --hook --cache /path/to/cache.json
|
||||
```
|
||||
|
||||
### Hook Mode
|
||||
|
||||
The `--hook` mode compares current benchmarks against a saved baseline. If performance degrades beyond the threshold (default 20%), it exits with code 1. This is useful for preventing performance regressions in feature branches.
|
||||
The benchmark tests will fail if performance degrades beyond the threshold.
|
||||
|
||||
The pre-push hook runs benchmark in hook mode to catch performance regressions before pushing.
|
||||
|
||||
@@ -161,12 +139,11 @@ The project uses pytest with strict marker enforcement. Test configuration is in
|
||||
|
||||
### Test Coverage Strategy
|
||||
|
||||
Current coverage: 56% (434 tests)
|
||||
Current coverage: 56% (463 tests)
|
||||
|
||||
Key areas with lower coverage (acceptable for now):
|
||||
- **app.py** (8%): Main entry point - integration heavy, requires terminal
|
||||
- **scroll.py** (10%): Terminal-dependent rendering logic
|
||||
- **benchmark.py** (0%): Standalone benchmark tool, runs separately
|
||||
- **scroll.py** (10%): Terminal-dependent rendering logic (unused)
|
||||
|
||||
Key areas with good coverage:
|
||||
- **display/backends/null.py** (95%): Easy to test headlessly
|
||||
@@ -227,7 +204,7 @@ Sensors support param bindings to drive effect parameters in real-time.
|
||||
|
||||
#### Pipeline Introspection
|
||||
|
||||
- **PipelineIntrospectionSource** (`engine/pipeline_sources/pipeline_introspection.py`): Renders live ASCII visualization of pipeline DAG with metrics
|
||||
- **PipelineIntrospectionSource** (`engine/data_sources/pipeline_introspection.py`): Renders live ASCII visualization of pipeline DAG with metrics
|
||||
- **PipelineIntrospectionDemo** (`engine/pipeline/pipeline_introspection_demo.py`): 3-phase demo controller for effect animation
|
||||
|
||||
Preset: `pipeline-inspect` - Live pipeline introspection with DAG and performance metrics
|
||||
|
||||
105
effects_plugins/border.py
Normal file
105
effects_plugins/border.py
Normal file
@@ -0,0 +1,105 @@
|
||||
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
|
||||
|
||||
|
||||
class BorderEffect(EffectPlugin):
|
||||
"""Simple border effect for terminal display.
|
||||
|
||||
Draws a border around the buffer and optionally displays
|
||||
performance metrics in the border corners.
|
||||
|
||||
Internally crops to display dimensions to ensure border fits.
|
||||
"""
|
||||
|
||||
name = "border"
|
||||
config = EffectConfig(enabled=True, intensity=1.0)
|
||||
|
||||
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||
if not buf:
|
||||
return buf
|
||||
|
||||
# Get actual display dimensions from context
|
||||
display_w = ctx.terminal_width
|
||||
display_h = ctx.terminal_height
|
||||
|
||||
# If dimensions are reasonable, crop first - use slightly smaller to ensure fit
|
||||
if display_w >= 10 and display_h >= 3:
|
||||
# Subtract 2 for border characters (left and right)
|
||||
crop_w = display_w - 2
|
||||
crop_h = display_h - 2
|
||||
buf = self._crop_to_size(buf, crop_w, crop_h)
|
||||
w = display_w
|
||||
h = display_h
|
||||
else:
|
||||
# Use buffer dimensions
|
||||
h = len(buf)
|
||||
w = max(len(line) for line in buf) if buf else 0
|
||||
|
||||
if w < 3 or h < 3:
|
||||
return buf
|
||||
|
||||
inner_w = w - 2
|
||||
|
||||
# Get metrics from context
|
||||
fps = 0.0
|
||||
frame_time = 0.0
|
||||
metrics = ctx.get_state("metrics")
|
||||
if metrics:
|
||||
avg_ms = metrics.get("avg_ms")
|
||||
frame_count = metrics.get("frame_count", 0)
|
||||
if avg_ms and frame_count > 0:
|
||||
fps = 1000.0 / avg_ms
|
||||
frame_time = avg_ms
|
||||
|
||||
# Build borders
|
||||
# Top border: ┌────────────────────┐ or with FPS
|
||||
if fps > 0:
|
||||
fps_str = f" FPS:{fps:.0f}"
|
||||
if len(fps_str) < inner_w:
|
||||
right_len = inner_w - len(fps_str)
|
||||
top_border = "┌" + "─" * right_len + fps_str + "┐"
|
||||
else:
|
||||
top_border = "┌" + "─" * inner_w + "┐"
|
||||
else:
|
||||
top_border = "┌" + "─" * inner_w + "┐"
|
||||
|
||||
# Bottom border: └────────────────────┘ or with frame time
|
||||
if frame_time > 0:
|
||||
ft_str = f" {frame_time:.1f}ms"
|
||||
if len(ft_str) < inner_w:
|
||||
right_len = inner_w - len(ft_str)
|
||||
bottom_border = "└" + "─" * right_len + ft_str + "┘"
|
||||
else:
|
||||
bottom_border = "└" + "─" * inner_w + "┘"
|
||||
else:
|
||||
bottom_border = "└" + "─" * inner_w + "┘"
|
||||
|
||||
# Build result with left/right borders
|
||||
result = [top_border]
|
||||
for line in buf[: h - 2]:
|
||||
if len(line) >= inner_w:
|
||||
result.append("│" + line[:inner_w] + "│")
|
||||
else:
|
||||
result.append("│" + line + " " * (inner_w - len(line)) + "│")
|
||||
|
||||
result.append(bottom_border)
|
||||
|
||||
return result
|
||||
|
||||
def _crop_to_size(self, buf: list[str], w: int, h: int) -> list[str]:
|
||||
"""Crop buffer to fit within w x h."""
|
||||
result = []
|
||||
for i in range(min(h, len(buf))):
|
||||
line = buf[i]
|
||||
if len(line) > w:
|
||||
result.append(line[:w])
|
||||
else:
|
||||
result.append(line + " " * (w - len(line)))
|
||||
|
||||
# Pad with empty lines if needed (for border)
|
||||
while len(result) < h:
|
||||
result.append(" " * w)
|
||||
|
||||
return result
|
||||
|
||||
def configure(self, config: EffectConfig) -> None:
|
||||
self.config = config
|
||||
42
effects_plugins/crop.py
Normal file
42
effects_plugins/crop.py
Normal file
@@ -0,0 +1,42 @@
|
||||
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
|
||||
|
||||
|
||||
class CropEffect(EffectPlugin):
|
||||
"""Crop effect that crops the input buffer to fit the display.
|
||||
|
||||
This ensures the output buffer matches the actual display dimensions,
|
||||
useful when the source produces a buffer larger than the viewport.
|
||||
"""
|
||||
|
||||
name = "crop"
|
||||
config = EffectConfig(enabled=True, intensity=1.0)
|
||||
|
||||
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||
if not buf:
|
||||
return buf
|
||||
|
||||
# Get actual display dimensions from context
|
||||
w = (
|
||||
ctx.terminal_width
|
||||
if ctx.terminal_width > 0
|
||||
else max(len(line) for line in buf)
|
||||
)
|
||||
h = ctx.terminal_height if ctx.terminal_height > 0 else len(buf)
|
||||
|
||||
# Crop buffer to fit
|
||||
result = []
|
||||
for i in range(min(h, len(buf))):
|
||||
line = buf[i]
|
||||
if len(line) > w:
|
||||
result.append(line[:w])
|
||||
else:
|
||||
result.append(line + " " * (w - len(line)))
|
||||
|
||||
# Pad with empty lines if needed
|
||||
while len(result) < h:
|
||||
result.append(" " * w)
|
||||
|
||||
return result
|
||||
|
||||
def configure(self, config: EffectConfig) -> None:
|
||||
self.config = config
|
||||
99
effects_plugins/tint.py
Normal file
99
effects_plugins/tint.py
Normal file
@@ -0,0 +1,99 @@
|
||||
from engine.effects.types import EffectConfig, EffectContext, EffectPlugin
|
||||
|
||||
|
||||
class TintEffect(EffectPlugin):
|
||||
"""Tint effect that applies an RGB color overlay to the buffer.
|
||||
|
||||
Uses ANSI escape codes to tint text with the specified RGB values.
|
||||
Supports transparency (0-100%) for blending.
|
||||
|
||||
Inlets:
|
||||
- r: Red component (0-255)
|
||||
- g: Green component (0-255)
|
||||
- b: Blue component (0-255)
|
||||
- a: Alpha/transparency (0.0-1.0, where 0.0 = fully transparent)
|
||||
"""
|
||||
|
||||
name = "tint"
|
||||
config = EffectConfig(enabled=True, intensity=1.0)
|
||||
|
||||
# Define inlet types for PureData-style typing
|
||||
@property
|
||||
def inlet_types(self) -> set:
|
||||
from engine.pipeline.core import DataType
|
||||
|
||||
return {DataType.TEXT_BUFFER}
|
||||
|
||||
@property
|
||||
def outlet_types(self) -> set:
|
||||
from engine.pipeline.core import DataType
|
||||
|
||||
return {DataType.TEXT_BUFFER}
|
||||
|
||||
def process(self, buf: list[str], ctx: EffectContext) -> list[str]:
|
||||
if not buf:
|
||||
return buf
|
||||
|
||||
# Get tint values from effect params or sensors
|
||||
r = self.config.params.get("r", 255)
|
||||
g = self.config.params.get("g", 255)
|
||||
b = self.config.params.get("b", 255)
|
||||
a = self.config.params.get("a", 0.3) # Default 30% tint
|
||||
|
||||
# Clamp values
|
||||
r = max(0, min(255, int(r)))
|
||||
g = max(0, min(255, int(g)))
|
||||
b = max(0, min(255, int(b)))
|
||||
a = max(0.0, min(1.0, float(a)))
|
||||
|
||||
if a <= 0:
|
||||
return buf
|
||||
|
||||
# Convert RGB to ANSI 256 color
|
||||
ansi_color = self._rgb_to_ansi256(r, g, b)
|
||||
|
||||
# Apply tint with transparency effect
|
||||
result = []
|
||||
for line in buf:
|
||||
if not line.strip():
|
||||
result.append(line)
|
||||
continue
|
||||
|
||||
# Check if line already has ANSI codes
|
||||
if "\033[" in line:
|
||||
# For lines with existing colors, wrap the whole line
|
||||
result.append(f"\033[38;5;{ansi_color}m{line}\033[0m")
|
||||
else:
|
||||
# Apply tint to plain text lines
|
||||
result.append(f"\033[38;5;{ansi_color}m{line}\033[0m")
|
||||
|
||||
return result
|
||||
|
||||
def _rgb_to_ansi256(self, r: int, g: int, b: int) -> int:
|
||||
"""Convert RGB (0-255 each) to ANSI 256 color code."""
|
||||
if r == g == b == 0:
|
||||
return 16
|
||||
if r == g == b == 255:
|
||||
return 231
|
||||
|
||||
# Calculate grayscale
|
||||
gray = int((0.299 * r + 0.587 * g + 0.114 * b) / 255 * 24) + 232
|
||||
|
||||
# Calculate color cube
|
||||
ri = int(r / 51)
|
||||
gi = int(g / 51)
|
||||
bi = int(b / 51)
|
||||
color = 16 + 36 * ri + 6 * gi + bi
|
||||
|
||||
# Use whichever is closer - gray or color
|
||||
gray_dist = abs(r - gray)
|
||||
color_dist = (
|
||||
(r - ri * 51) ** 2 + (g - gi * 51) ** 2 + (b - bi * 51) ** 2
|
||||
) ** 0.5
|
||||
|
||||
if gray_dist < color_dist:
|
||||
return gray
|
||||
return color
|
||||
|
||||
def configure(self, config: EffectConfig) -> None:
|
||||
self.config = config
|
||||
@@ -1,730 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Benchmark runner for mainline - tests performance across effects and displays.
|
||||
|
||||
Usage:
|
||||
python -m engine.benchmark
|
||||
python -m engine.benchmark --output report.md
|
||||
python -m engine.benchmark --displays terminal,websocket --effects glitch,fade
|
||||
python -m engine.benchmark --format json --output benchmark.json
|
||||
|
||||
Headless mode (default): suppress all terminal output during benchmarks.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import sys
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import datetime
|
||||
from io import StringIO
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
import numpy as np
|
||||
|
||||
|
||||
@dataclass
|
||||
class BenchmarkResult:
|
||||
"""Result of a single benchmark run."""
|
||||
|
||||
name: str
|
||||
display: str
|
||||
effect: str | None
|
||||
iterations: int
|
||||
total_time_ms: float
|
||||
avg_time_ms: float
|
||||
std_dev_ms: float
|
||||
min_ms: float
|
||||
max_ms: float
|
||||
fps: float
|
||||
chars_processed: int
|
||||
chars_per_sec: float
|
||||
|
||||
|
||||
@dataclass
|
||||
class BenchmarkReport:
|
||||
"""Complete benchmark report."""
|
||||
|
||||
timestamp: str
|
||||
python_version: str
|
||||
results: list[BenchmarkResult] = field(default_factory=list)
|
||||
summary: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
def get_sample_buffer(width: int = 80, height: int = 24) -> list[str]:
|
||||
"""Generate a sample buffer for benchmarking."""
|
||||
lines = []
|
||||
for i in range(height):
|
||||
line = f"\x1b[32mLine {i}\x1b[0m " + "A" * (width - 10)
|
||||
lines.append(line)
|
||||
return lines
|
||||
|
||||
|
||||
def benchmark_display(
|
||||
display_class,
|
||||
buffer: list[str],
|
||||
iterations: int = 100,
|
||||
display=None,
|
||||
reuse: bool = False,
|
||||
) -> BenchmarkResult | None:
|
||||
"""Benchmark a single display.
|
||||
|
||||
Args:
|
||||
display_class: Display class to instantiate
|
||||
buffer: Buffer to display
|
||||
iterations: Number of iterations
|
||||
display: Optional existing display instance to reuse
|
||||
reuse: If True and display provided, use reuse mode
|
||||
"""
|
||||
old_stdout = sys.stdout
|
||||
old_stderr = sys.stderr
|
||||
|
||||
try:
|
||||
sys.stdout = StringIO()
|
||||
sys.stderr = StringIO()
|
||||
|
||||
if display is None:
|
||||
display = display_class()
|
||||
display.init(80, 24, reuse=False)
|
||||
should_cleanup = True
|
||||
else:
|
||||
should_cleanup = False
|
||||
|
||||
times = []
|
||||
chars = sum(len(line) for line in buffer)
|
||||
|
||||
for _ in range(iterations):
|
||||
t0 = time.perf_counter()
|
||||
display.show(buffer)
|
||||
elapsed = (time.perf_counter() - t0) * 1000
|
||||
times.append(elapsed)
|
||||
|
||||
if should_cleanup and hasattr(display, "cleanup"):
|
||||
display.cleanup(quit_pygame=False)
|
||||
|
||||
except Exception:
|
||||
return None
|
||||
finally:
|
||||
sys.stdout = old_stdout
|
||||
sys.stderr = old_stderr
|
||||
|
||||
times_arr = np.array(times)
|
||||
|
||||
return BenchmarkResult(
|
||||
name=f"display_{display_class.__name__}",
|
||||
display=display_class.__name__,
|
||||
effect=None,
|
||||
iterations=iterations,
|
||||
total_time_ms=sum(times),
|
||||
avg_time_ms=float(np.mean(times_arr)),
|
||||
std_dev_ms=float(np.std(times_arr)),
|
||||
min_ms=float(np.min(times_arr)),
|
||||
max_ms=float(np.max(times_arr)),
|
||||
fps=float(1000.0 / np.mean(times_arr)) if np.mean(times_arr) > 0 else 0.0,
|
||||
chars_processed=chars * iterations,
|
||||
chars_per_sec=float((chars * iterations) / (sum(times) / 1000))
|
||||
if sum(times) > 0
|
||||
else 0.0,
|
||||
)
|
||||
|
||||
|
||||
def benchmark_effect_with_display(
|
||||
effect_class, display, buffer: list[str], iterations: int = 100, reuse: bool = False
|
||||
) -> BenchmarkResult | None:
|
||||
"""Benchmark an effect with a display.
|
||||
|
||||
Args:
|
||||
effect_class: Effect class to instantiate
|
||||
display: Display instance to use
|
||||
buffer: Buffer to process and display
|
||||
iterations: Number of iterations
|
||||
reuse: If True, use reuse mode for display
|
||||
"""
|
||||
old_stdout = sys.stdout
|
||||
old_stderr = sys.stderr
|
||||
|
||||
try:
|
||||
from engine.effects.types import EffectConfig, EffectContext
|
||||
|
||||
sys.stdout = StringIO()
|
||||
sys.stderr = StringIO()
|
||||
|
||||
effect = effect_class()
|
||||
effect.configure(EffectConfig(enabled=True, intensity=1.0))
|
||||
|
||||
ctx = EffectContext(
|
||||
terminal_width=80,
|
||||
terminal_height=24,
|
||||
scroll_cam=0,
|
||||
ticker_height=0,
|
||||
mic_excess=0.0,
|
||||
grad_offset=0.0,
|
||||
frame_number=0,
|
||||
has_message=False,
|
||||
)
|
||||
|
||||
times = []
|
||||
chars = sum(len(line) for line in buffer)
|
||||
|
||||
for _ in range(iterations):
|
||||
processed = effect.process(buffer, ctx)
|
||||
t0 = time.perf_counter()
|
||||
display.show(processed)
|
||||
elapsed = (time.perf_counter() - t0) * 1000
|
||||
times.append(elapsed)
|
||||
|
||||
if not reuse and hasattr(display, "cleanup"):
|
||||
display.cleanup(quit_pygame=False)
|
||||
|
||||
except Exception:
|
||||
return None
|
||||
finally:
|
||||
sys.stdout = old_stdout
|
||||
sys.stderr = old_stderr
|
||||
|
||||
times_arr = np.array(times)
|
||||
|
||||
return BenchmarkResult(
|
||||
name=f"effect_{effect_class.__name__}_with_{display.__class__.__name__}",
|
||||
display=display.__class__.__name__,
|
||||
effect=effect_class.__name__,
|
||||
iterations=iterations,
|
||||
total_time_ms=sum(times),
|
||||
avg_time_ms=float(np.mean(times_arr)),
|
||||
std_dev_ms=float(np.std(times_arr)),
|
||||
min_ms=float(np.min(times_arr)),
|
||||
max_ms=float(np.max(times_arr)),
|
||||
fps=float(1000.0 / np.mean(times_arr)) if np.mean(times_arr) > 0 else 0.0,
|
||||
chars_processed=chars * iterations,
|
||||
chars_per_sec=float((chars * iterations) / (sum(times) / 1000))
|
||||
if sum(times) > 0
|
||||
else 0.0,
|
||||
)
|
||||
|
||||
|
||||
def get_available_displays():
|
||||
"""Get available display classes."""
|
||||
from engine.display import (
|
||||
DisplayRegistry,
|
||||
NullDisplay,
|
||||
TerminalDisplay,
|
||||
)
|
||||
|
||||
DisplayRegistry.initialize()
|
||||
|
||||
displays = [
|
||||
("null", NullDisplay),
|
||||
("terminal", TerminalDisplay),
|
||||
]
|
||||
|
||||
try:
|
||||
from engine.display.backends.websocket import WebSocketDisplay
|
||||
|
||||
displays.append(("websocket", WebSocketDisplay))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
from engine.display.backends.sixel import SixelDisplay
|
||||
|
||||
displays.append(("sixel", SixelDisplay))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
try:
|
||||
from engine.display.backends.pygame import PygameDisplay
|
||||
|
||||
displays.append(("pygame", PygameDisplay))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return displays
|
||||
|
||||
|
||||
def get_available_effects():
|
||||
"""Get available effect classes."""
|
||||
try:
|
||||
from engine.effects import get_registry
|
||||
|
||||
try:
|
||||
from effects_plugins import discover_plugins
|
||||
|
||||
discover_plugins()
|
||||
except Exception:
|
||||
pass
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
effects = []
|
||||
registry = get_registry()
|
||||
|
||||
for name, effect in registry.list_all().items():
|
||||
if effect:
|
||||
effect_cls = type(effect)
|
||||
effects.append((name, effect_cls))
|
||||
|
||||
return effects
|
||||
|
||||
|
||||
def run_benchmarks(
|
||||
displays: list[tuple[str, Any]] | None = None,
|
||||
effects: list[tuple[str, Any]] | None = None,
|
||||
iterations: int = 100,
|
||||
verbose: bool = False,
|
||||
) -> BenchmarkReport:
|
||||
"""Run all benchmarks and return report."""
|
||||
from datetime import datetime
|
||||
|
||||
if displays is None:
|
||||
displays = get_available_displays()
|
||||
|
||||
if effects is None:
|
||||
effects = get_available_effects()
|
||||
|
||||
buffer = get_sample_buffer(80, 24)
|
||||
results = []
|
||||
|
||||
if verbose:
|
||||
print(f"Running benchmarks ({iterations} iterations each)...")
|
||||
|
||||
pygame_display = None
|
||||
for name, display_class in displays:
|
||||
if verbose:
|
||||
print(f"Benchmarking display: {name}")
|
||||
|
||||
result = benchmark_display(display_class, buffer, iterations)
|
||||
if result:
|
||||
results.append(result)
|
||||
if verbose:
|
||||
print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg")
|
||||
|
||||
if name == "pygame":
|
||||
pygame_display = result
|
||||
|
||||
if verbose:
|
||||
print()
|
||||
|
||||
pygame_instance = None
|
||||
if pygame_display:
|
||||
try:
|
||||
from engine.display.backends.pygame import PygameDisplay
|
||||
|
||||
PygameDisplay.reset_state()
|
||||
pygame_instance = PygameDisplay()
|
||||
pygame_instance.init(80, 24, reuse=False)
|
||||
except Exception:
|
||||
pygame_instance = None
|
||||
|
||||
for effect_name, effect_class in effects:
|
||||
for display_name, display_class in displays:
|
||||
if display_name == "websocket":
|
||||
continue
|
||||
|
||||
if display_name == "pygame":
|
||||
if verbose:
|
||||
print(f"Benchmarking effect: {effect_name} with {display_name}")
|
||||
|
||||
if pygame_instance:
|
||||
result = benchmark_effect_with_display(
|
||||
effect_class, pygame_instance, buffer, iterations, reuse=True
|
||||
)
|
||||
if result:
|
||||
results.append(result)
|
||||
if verbose:
|
||||
print(
|
||||
f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg"
|
||||
)
|
||||
continue
|
||||
|
||||
if verbose:
|
||||
print(f"Benchmarking effect: {effect_name} with {display_name}")
|
||||
|
||||
display = display_class()
|
||||
display.init(80, 24)
|
||||
result = benchmark_effect_with_display(
|
||||
effect_class, display, buffer, iterations
|
||||
)
|
||||
if result:
|
||||
results.append(result)
|
||||
if verbose:
|
||||
print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg")
|
||||
|
||||
if pygame_instance:
|
||||
try:
|
||||
pygame_instance.cleanup(quit_pygame=True)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
summary = generate_summary(results)
|
||||
|
||||
return BenchmarkReport(
|
||||
timestamp=datetime.now().isoformat(),
|
||||
python_version=sys.version,
|
||||
results=results,
|
||||
summary=summary,
|
||||
)
|
||||
|
||||
|
||||
def generate_summary(results: list[BenchmarkResult]) -> dict[str, Any]:
|
||||
"""Generate summary statistics from results."""
|
||||
by_display: dict[str, list[BenchmarkResult]] = {}
|
||||
by_effect: dict[str, list[BenchmarkResult]] = {}
|
||||
|
||||
for r in results:
|
||||
if r.display not in by_display:
|
||||
by_display[r.display] = []
|
||||
by_display[r.display].append(r)
|
||||
|
||||
if r.effect:
|
||||
if r.effect not in by_effect:
|
||||
by_effect[r.effect] = []
|
||||
by_effect[r.effect].append(r)
|
||||
|
||||
summary = {
|
||||
"by_display": {},
|
||||
"by_effect": {},
|
||||
"overall": {
|
||||
"total_tests": len(results),
|
||||
"displays_tested": len(by_display),
|
||||
"effects_tested": len(by_effect),
|
||||
},
|
||||
}
|
||||
|
||||
for display, res in by_display.items():
|
||||
fps_values = [r.fps for r in res]
|
||||
summary["by_display"][display] = {
|
||||
"avg_fps": float(np.mean(fps_values)),
|
||||
"min_fps": float(np.min(fps_values)),
|
||||
"max_fps": float(np.max(fps_values)),
|
||||
"tests": len(res),
|
||||
}
|
||||
|
||||
for effect, res in by_effect.items():
|
||||
fps_values = [r.fps for r in res]
|
||||
summary["by_effect"][effect] = {
|
||||
"avg_fps": float(np.mean(fps_values)),
|
||||
"min_fps": float(np.min(fps_values)),
|
||||
"max_fps": float(np.max(fps_values)),
|
||||
"tests": len(res),
|
||||
}
|
||||
|
||||
return summary
|
||||
|
||||
|
||||
DEFAULT_CACHE_PATH = Path.home() / ".mainline_benchmark_cache.json"
|
||||
|
||||
|
||||
def load_baseline(cache_path: Path | None = None) -> dict[str, Any] | None:
|
||||
"""Load baseline benchmark results from cache."""
|
||||
path = cache_path or DEFAULT_CACHE_PATH
|
||||
if not path.exists():
|
||||
return None
|
||||
try:
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def save_baseline(
|
||||
results: list[BenchmarkResult],
|
||||
cache_path: Path | None = None,
|
||||
) -> None:
|
||||
"""Save benchmark results as baseline to cache."""
|
||||
path = cache_path or DEFAULT_CACHE_PATH
|
||||
baseline = {
|
||||
"timestamp": datetime.now().isoformat(),
|
||||
"results": {
|
||||
r.name: {
|
||||
"fps": r.fps,
|
||||
"avg_time_ms": r.avg_time_ms,
|
||||
"chars_per_sec": r.chars_per_sec,
|
||||
}
|
||||
for r in results
|
||||
},
|
||||
}
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(path, "w") as f:
|
||||
json.dump(baseline, f, indent=2)
|
||||
|
||||
|
||||
def compare_with_baseline(
|
||||
results: list[BenchmarkResult],
|
||||
baseline: dict[str, Any],
|
||||
threshold: float = 0.2,
|
||||
verbose: bool = True,
|
||||
) -> tuple[bool, list[str]]:
|
||||
"""Compare current results with baseline. Returns (pass, messages)."""
|
||||
baseline_results = baseline.get("results", {})
|
||||
failures = []
|
||||
warnings = []
|
||||
|
||||
for r in results:
|
||||
if r.name not in baseline_results:
|
||||
warnings.append(f"New test: {r.name} (no baseline)")
|
||||
continue
|
||||
|
||||
b = baseline_results[r.name]
|
||||
if b["fps"] == 0:
|
||||
continue
|
||||
|
||||
degradation = (b["fps"] - r.fps) / b["fps"]
|
||||
if degradation > threshold:
|
||||
failures.append(
|
||||
f"{r.name}: FPS degraded {degradation * 100:.1f}% "
|
||||
f"(baseline: {b['fps']:.1f}, current: {r.fps:.1f})"
|
||||
)
|
||||
elif verbose:
|
||||
print(f" {r.name}: {r.fps:.1f} FPS (baseline: {b['fps']:.1f})")
|
||||
|
||||
passed = len(failures) == 0
|
||||
messages = []
|
||||
if failures:
|
||||
messages.extend(failures)
|
||||
if warnings:
|
||||
messages.extend(warnings)
|
||||
|
||||
return passed, messages
|
||||
|
||||
|
||||
def run_hook_mode(
|
||||
displays: list[tuple[str, Any]] | None = None,
|
||||
effects: list[tuple[str, Any]] | None = None,
|
||||
iterations: int = 20,
|
||||
threshold: float = 0.2,
|
||||
cache_path: Path | None = None,
|
||||
verbose: bool = False,
|
||||
) -> int:
|
||||
"""Run in hook mode: compare against baseline, exit 0 on pass, 1 on fail."""
|
||||
baseline = load_baseline(cache_path)
|
||||
|
||||
if baseline is None:
|
||||
print("No baseline found. Run with --baseline to create one.")
|
||||
return 1
|
||||
|
||||
report = run_benchmarks(displays, effects, iterations, verbose)
|
||||
|
||||
passed, messages = compare_with_baseline(
|
||||
report.results, baseline, threshold, verbose
|
||||
)
|
||||
|
||||
print("\n=== Benchmark Hook Results ===")
|
||||
if passed:
|
||||
print("PASSED - No significant performance degradation")
|
||||
return 0
|
||||
else:
|
||||
print("FAILED - Performance degradation detected:")
|
||||
for msg in messages:
|
||||
print(f" - {msg}")
|
||||
return 1
|
||||
|
||||
|
||||
def format_report_text(report: BenchmarkReport) -> str:
|
||||
"""Format report as human-readable text."""
|
||||
lines = [
|
||||
"# Mainline Performance Benchmark Report",
|
||||
"",
|
||||
f"Generated: {report.timestamp}",
|
||||
f"Python: {report.python_version}",
|
||||
"",
|
||||
"## Summary",
|
||||
"",
|
||||
f"Total tests: {report.summary['overall']['total_tests']}",
|
||||
f"Displays tested: {report.summary['overall']['displays_tested']}",
|
||||
f"Effects tested: {report.summary['overall']['effects_tested']}",
|
||||
"",
|
||||
"## By Display",
|
||||
"",
|
||||
]
|
||||
|
||||
for display, stats in report.summary["by_display"].items():
|
||||
lines.append(f"### {display}")
|
||||
lines.append(f"- Avg FPS: {stats['avg_fps']:.1f}")
|
||||
lines.append(f"- Min FPS: {stats['min_fps']:.1f}")
|
||||
lines.append(f"- Max FPS: {stats['max_fps']:.1f}")
|
||||
lines.append(f"- Tests: {stats['tests']}")
|
||||
lines.append("")
|
||||
|
||||
if report.summary["by_effect"]:
|
||||
lines.append("## By Effect")
|
||||
lines.append("")
|
||||
|
||||
for effect, stats in report.summary["by_effect"].items():
|
||||
lines.append(f"### {effect}")
|
||||
lines.append(f"- Avg FPS: {stats['avg_fps']:.1f}")
|
||||
lines.append(f"- Min FPS: {stats['min_fps']:.1f}")
|
||||
lines.append(f"- Max FPS: {stats['max_fps']:.1f}")
|
||||
lines.append(f"- Tests: {stats['tests']}")
|
||||
lines.append("")
|
||||
|
||||
lines.append("## Detailed Results")
|
||||
lines.append("")
|
||||
lines.append("| Display | Effect | FPS | Avg ms | StdDev ms | Min ms | Max ms |")
|
||||
lines.append("|---------|--------|-----|--------|-----------|--------|--------|")
|
||||
|
||||
for r in report.results:
|
||||
effect_col = r.effect if r.effect else "-"
|
||||
lines.append(
|
||||
f"| {r.display} | {effect_col} | {r.fps:.1f} | {r.avg_time_ms:.2f} | "
|
||||
f"{r.std_dev_ms:.2f} | {r.min_ms:.2f} | {r.max_ms:.2f} |"
|
||||
)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def format_report_json(report: BenchmarkReport) -> str:
|
||||
"""Format report as JSON."""
|
||||
data = {
|
||||
"timestamp": report.timestamp,
|
||||
"python_version": report.python_version,
|
||||
"summary": report.summary,
|
||||
"results": [
|
||||
{
|
||||
"name": r.name,
|
||||
"display": r.display,
|
||||
"effect": r.effect,
|
||||
"iterations": r.iterations,
|
||||
"total_time_ms": r.total_time_ms,
|
||||
"avg_time_ms": r.avg_time_ms,
|
||||
"std_dev_ms": r.std_dev_ms,
|
||||
"min_ms": r.min_ms,
|
||||
"max_ms": r.max_ms,
|
||||
"fps": r.fps,
|
||||
"chars_processed": r.chars_processed,
|
||||
"chars_per_sec": r.chars_per_sec,
|
||||
}
|
||||
for r in report.results
|
||||
],
|
||||
}
|
||||
return json.dumps(data, indent=2)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="Run mainline benchmarks")
|
||||
parser.add_argument(
|
||||
"--displays",
|
||||
help="Comma-separated list of displays to test (default: all)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--effects",
|
||||
help="Comma-separated list of effects to test (default: all)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--iterations",
|
||||
type=int,
|
||||
default=100,
|
||||
help="Number of iterations per test (default: 100)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--output",
|
||||
help="Output file path (default: stdout)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--format",
|
||||
choices=["text", "json"],
|
||||
default="text",
|
||||
help="Output format (default: text)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--verbose",
|
||||
"-v",
|
||||
action="store_true",
|
||||
help="Show progress during benchmarking",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--hook",
|
||||
action="store_true",
|
||||
help="Run in hook mode: compare against baseline, exit 0 pass, 1 fail",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--baseline",
|
||||
action="store_true",
|
||||
help="Save current results as baseline for future hook comparisons",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--threshold",
|
||||
type=float,
|
||||
default=0.2,
|
||||
help="Performance degradation threshold for hook mode (default: 0.2 = 20%%)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--cache",
|
||||
type=str,
|
||||
default=None,
|
||||
help="Path to baseline cache file (default: ~/.mainline_benchmark_cache.json)",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
cache_path = Path(args.cache) if args.cache else DEFAULT_CACHE_PATH
|
||||
|
||||
if args.hook:
|
||||
displays = None
|
||||
if args.displays:
|
||||
display_map = dict(get_available_displays())
|
||||
displays = [
|
||||
(name, display_map[name])
|
||||
for name in args.displays.split(",")
|
||||
if name in display_map
|
||||
]
|
||||
|
||||
effects = None
|
||||
if args.effects:
|
||||
effect_map = dict(get_available_effects())
|
||||
effects = [
|
||||
(name, effect_map[name])
|
||||
for name in args.effects.split(",")
|
||||
if name in effect_map
|
||||
]
|
||||
|
||||
return run_hook_mode(
|
||||
displays,
|
||||
effects,
|
||||
iterations=args.iterations,
|
||||
threshold=args.threshold,
|
||||
cache_path=cache_path,
|
||||
verbose=args.verbose,
|
||||
)
|
||||
|
||||
displays = None
|
||||
if args.displays:
|
||||
display_map = dict(get_available_displays())
|
||||
displays = [
|
||||
(name, display_map[name])
|
||||
for name in args.displays.split(",")
|
||||
if name in display_map
|
||||
]
|
||||
|
||||
effects = None
|
||||
if args.effects:
|
||||
effect_map = dict(get_available_effects())
|
||||
effects = [
|
||||
(name, effect_map[name])
|
||||
for name in args.effects.split(",")
|
||||
if name in effect_map
|
||||
]
|
||||
|
||||
report = run_benchmarks(displays, effects, args.iterations, args.verbose)
|
||||
|
||||
if args.baseline:
|
||||
save_baseline(report.results, cache_path)
|
||||
print(f"Baseline saved to {cache_path}")
|
||||
return 0
|
||||
|
||||
if args.format == "json":
|
||||
output = format_report_json(report)
|
||||
else:
|
||||
output = format_report_text(report)
|
||||
|
||||
if args.output:
|
||||
with open(args.output, "w") as f:
|
||||
f.write(output)
|
||||
else:
|
||||
print(output)
|
||||
|
||||
return 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -21,6 +21,7 @@ class CameraMode(Enum):
|
||||
HORIZONTAL = auto()
|
||||
OMNI = auto()
|
||||
FLOATING = auto()
|
||||
BOUNCE = auto()
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -135,8 +136,12 @@ class Camera:
|
||||
self._update_omni(dt)
|
||||
elif self.mode == CameraMode.FLOATING:
|
||||
self._update_floating(dt)
|
||||
elif self.mode == CameraMode.BOUNCE:
|
||||
self._update_bounce(dt)
|
||||
|
||||
self._clamp_to_bounds()
|
||||
# Bounce mode handles its own bounds checking
|
||||
if self.mode != CameraMode.BOUNCE:
|
||||
self._clamp_to_bounds()
|
||||
|
||||
def _clamp_to_bounds(self) -> None:
|
||||
"""Clamp camera position to stay within canvas bounds.
|
||||
@@ -170,6 +175,43 @@ class Camera:
|
||||
self.y = int(math.sin(self._time * 2) * base)
|
||||
self.x = int(math.cos(self._time * 1.5) * base * 0.5)
|
||||
|
||||
def _update_bounce(self, dt: float) -> None:
|
||||
"""Bouncing DVD-style camera that bounces off canvas edges."""
|
||||
vw = self.viewport_width
|
||||
vh = self.viewport_height
|
||||
|
||||
# Initialize direction if not set
|
||||
if not hasattr(self, "_bounce_dx"):
|
||||
self._bounce_dx = 1
|
||||
self._bounce_dy = 1
|
||||
|
||||
# Calculate max positions
|
||||
max_x = max(0, self.canvas_width - vw)
|
||||
max_y = max(0, self.canvas_height - vh)
|
||||
|
||||
# Move
|
||||
move_speed = self.speed * dt * 60
|
||||
|
||||
# Bounce off edges - reverse direction when hitting bounds
|
||||
self.x += int(move_speed * self._bounce_dx)
|
||||
self.y += int(move_speed * self._bounce_dy)
|
||||
|
||||
# Bounce horizontally
|
||||
if self.x <= 0:
|
||||
self.x = 0
|
||||
self._bounce_dx = 1
|
||||
elif self.x >= max_x:
|
||||
self.x = max_x
|
||||
self._bounce_dx = -1
|
||||
|
||||
# Bounce vertically
|
||||
if self.y <= 0:
|
||||
self.y = 0
|
||||
self._bounce_dy = 1
|
||||
elif self.y >= max_y:
|
||||
self.y = max_y
|
||||
self._bounce_dy = -1
|
||||
|
||||
def reset(self) -> None:
|
||||
"""Reset camera position."""
|
||||
self.x = 0
|
||||
@@ -212,6 +254,13 @@ class Camera:
|
||||
mode=CameraMode.FLOATING, speed=speed, canvas_width=200, canvas_height=200
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def bounce(cls, speed: float = 1.0) -> "Camera":
|
||||
"""Create a bouncing DVD-style camera that bounces off canvas edges."""
|
||||
return cls(
|
||||
mode=CameraMode.BOUNCE, speed=speed, canvas_width=200, canvas_height=200
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def custom(cls, update_fn: Callable[["Camera", float], None]) -> "Camera":
|
||||
"""Create a camera with custom update function."""
|
||||
|
||||
12
engine/data_sources/__init__.py
Normal file
12
engine/data_sources/__init__.py
Normal file
@@ -0,0 +1,12 @@
|
||||
"""
|
||||
Data source implementations for the pipeline architecture.
|
||||
|
||||
Import directly from submodules:
|
||||
from engine.data_sources.sources import DataSource, SourceItem, HeadlinesDataSource
|
||||
from engine.data_sources.pipeline_introspection import PipelineIntrospectionSource
|
||||
"""
|
||||
|
||||
# Re-export for convenience
|
||||
from engine.data_sources.sources import ImageItem, SourceItem
|
||||
|
||||
__all__ = ["ImageItem", "SourceItem"]
|
||||
@@ -15,7 +15,7 @@ Example:
|
||||
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
from engine.sources_v2 import DataSource, SourceItem
|
||||
from engine.data_sources.sources import DataSource, SourceItem
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from engine.pipeline.controller import Pipeline
|
||||
@@ -37,14 +37,25 @@ class PipelineIntrospectionSource(DataSource):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
pipelines: list["Pipeline"] | None = None,
|
||||
pipeline: "Pipeline | None" = None,
|
||||
viewport_width: int = 100,
|
||||
viewport_height: int = 35,
|
||||
):
|
||||
self._pipelines = pipelines or []
|
||||
self._pipeline = pipeline # May be None initially, set later via set_pipeline()
|
||||
self.viewport_width = viewport_width
|
||||
self.viewport_height = viewport_height
|
||||
self.frame = 0
|
||||
self._ready = False
|
||||
|
||||
def set_pipeline(self, pipeline: "Pipeline") -> None:
|
||||
"""Set the pipeline to introspect (call after pipeline is built)."""
|
||||
self._pipeline = [pipeline] # Wrap in list for iteration
|
||||
self._ready = True
|
||||
|
||||
@property
|
||||
def ready(self) -> bool:
|
||||
"""Check if source is ready to fetch."""
|
||||
return self._ready
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
@@ -68,16 +79,39 @@ class PipelineIntrospectionSource(DataSource):
|
||||
|
||||
def add_pipeline(self, pipeline: "Pipeline") -> None:
|
||||
"""Add a pipeline to visualize."""
|
||||
if pipeline not in self._pipelines:
|
||||
self._pipelines.append(pipeline)
|
||||
if self._pipeline is None:
|
||||
self._pipeline = [pipeline]
|
||||
elif isinstance(self._pipeline, list):
|
||||
self._pipeline.append(pipeline)
|
||||
else:
|
||||
self._pipeline = [self._pipeline, pipeline]
|
||||
self._ready = True
|
||||
|
||||
def remove_pipeline(self, pipeline: "Pipeline") -> None:
|
||||
"""Remove a pipeline from visualization."""
|
||||
if pipeline in self._pipelines:
|
||||
self._pipelines.remove(pipeline)
|
||||
if self._pipeline is None:
|
||||
return
|
||||
elif isinstance(self._pipeline, list):
|
||||
self._pipeline = [p for p in self._pipeline if p is not pipeline]
|
||||
if not self._pipeline:
|
||||
self._pipeline = None
|
||||
self._ready = False
|
||||
elif self._pipeline is pipeline:
|
||||
self._pipeline = None
|
||||
self._ready = False
|
||||
|
||||
def fetch(self) -> list[SourceItem]:
|
||||
"""Fetch the introspection visualization."""
|
||||
if not self._ready:
|
||||
# Return a placeholder until ready
|
||||
return [
|
||||
SourceItem(
|
||||
content="Initializing...",
|
||||
source="pipeline-inspect",
|
||||
timestamp="init",
|
||||
)
|
||||
]
|
||||
|
||||
lines = self._render()
|
||||
self.frame += 1
|
||||
content = "\n".join(lines)
|
||||
@@ -97,27 +131,35 @@ class PipelineIntrospectionSource(DataSource):
|
||||
# Header
|
||||
lines.extend(self._render_header())
|
||||
|
||||
if not self._pipelines:
|
||||
lines.append(" No pipelines to visualize")
|
||||
return lines
|
||||
|
||||
# Render each pipeline's DAG
|
||||
for i, pipeline in enumerate(self._pipelines):
|
||||
if len(self._pipelines) > 1:
|
||||
lines.append(f" Pipeline {i + 1}:")
|
||||
lines.extend(self._render_pipeline(pipeline))
|
||||
# Render pipeline(s) if ready
|
||||
if self._ready and self._pipeline:
|
||||
pipelines = (
|
||||
self._pipeline if isinstance(self._pipeline, list) else [self._pipeline]
|
||||
)
|
||||
for pipeline in pipelines:
|
||||
lines.extend(self._render_pipeline(pipeline))
|
||||
|
||||
# Footer with sparkline
|
||||
lines.extend(self._render_footer())
|
||||
|
||||
return lines
|
||||
|
||||
@property
|
||||
def _pipelines(self) -> list:
|
||||
"""Return pipelines as a list for iteration."""
|
||||
if self._pipeline is None:
|
||||
return []
|
||||
elif isinstance(self._pipeline, list):
|
||||
return self._pipeline
|
||||
else:
|
||||
return [self._pipeline]
|
||||
|
||||
def _render_header(self) -> list[str]:
|
||||
"""Render the header with frame info and metrics summary."""
|
||||
lines: list[str] = []
|
||||
|
||||
if not self._pipelines:
|
||||
return ["┌─ PIPELINE INTROSPECTION ──────────────────────────────┐"]
|
||||
if not self._pipeline:
|
||||
return ["PIPELINE INTROSPECTION"]
|
||||
|
||||
# Get aggregate metrics
|
||||
total_ms = 0.0
|
||||
@@ -128,13 +170,17 @@ class PipelineIntrospectionSource(DataSource):
|
||||
try:
|
||||
metrics = pipeline.get_metrics_summary()
|
||||
if metrics and "error" not in metrics:
|
||||
total_ms = max(total_ms, metrics.get("avg_ms", 0))
|
||||
fps = max(fps, metrics.get("fps", 0))
|
||||
# Get avg_ms from pipeline metrics
|
||||
pipeline_avg = metrics.get("pipeline", {}).get("avg_ms", 0)
|
||||
total_ms = max(total_ms, pipeline_avg)
|
||||
# Calculate FPS from avg_ms
|
||||
if pipeline_avg > 0:
|
||||
fps = max(fps, 1000.0 / pipeline_avg)
|
||||
frame_count = max(frame_count, metrics.get("frame_count", 0))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
header = f"┌─ PIPELINE INTROSPECTION ── frame: {self.frame} ─ avg: {total_ms:.1f}ms ─ fps: {fps:.1f} ─┐"
|
||||
header = f"PIPELINE INTROSPECTION -- frame: {self.frame} -- avg: {total_ms:.1f}ms -- fps: {fps:.1f}"
|
||||
lines.append(header)
|
||||
|
||||
return lines
|
||||
@@ -175,8 +221,8 @@ class PipelineIntrospectionSource(DataSource):
|
||||
total_time = sum(s["ms"] for s in stage_infos) or 1.0
|
||||
|
||||
# Render DAG - group by category
|
||||
lines.append("│")
|
||||
lines.append("│ Signal Flow:")
|
||||
lines.append("")
|
||||
lines.append(" Signal Flow:")
|
||||
|
||||
# Group stages by category for display
|
||||
categories: dict[str, list[dict]] = {}
|
||||
@@ -195,20 +241,20 @@ class PipelineIntrospectionSource(DataSource):
|
||||
|
||||
cat_stages = categories[cat]
|
||||
cat_names = [s["name"] for s in cat_stages]
|
||||
lines.append(f"│ {cat}: {' → '.join(cat_names)}")
|
||||
lines.append(f" {cat}: {' → '.join(cat_names)}")
|
||||
|
||||
# Render timing breakdown
|
||||
lines.append("│")
|
||||
lines.append("│ Stage Timings:")
|
||||
lines.append("")
|
||||
lines.append(" Stage Timings:")
|
||||
|
||||
for info in stage_infos:
|
||||
name = info["name"]
|
||||
ms = info["ms"]
|
||||
pct = (ms / total_time) * 100
|
||||
bar = self._render_bar(pct, 20)
|
||||
lines.append(f"│ {name:12s} {ms:6.2f}ms {bar} {pct:5.1f}%")
|
||||
lines.append(f" {name:12s} {ms:6.2f}ms {bar} {pct:5.1f}%")
|
||||
|
||||
lines.append("│")
|
||||
lines.append("")
|
||||
|
||||
return lines
|
||||
|
||||
@@ -217,9 +263,10 @@ class PipelineIntrospectionSource(DataSource):
|
||||
lines: list[str] = []
|
||||
|
||||
# Get frame history from first pipeline
|
||||
if self._pipelines:
|
||||
pipelines = self._pipelines
|
||||
if pipelines:
|
||||
try:
|
||||
frame_times = self._pipelines[0].get_frame_times()
|
||||
frame_times = pipelines[0].get_frame_times()
|
||||
except Exception:
|
||||
frame_times = []
|
||||
else:
|
||||
@@ -227,21 +274,13 @@ class PipelineIntrospectionSource(DataSource):
|
||||
|
||||
if frame_times:
|
||||
sparkline = self._render_sparkline(frame_times[-60:], 50)
|
||||
lines.append(
|
||||
f"├─ Frame Time History (last {len(frame_times[-60:])} frames) ─────────────────────────────┤"
|
||||
)
|
||||
lines.append(f"│{sparkline}│")
|
||||
lines.append(f" Frame Time History (last {len(frame_times[-60:])} frames)")
|
||||
lines.append(f" {sparkline}")
|
||||
else:
|
||||
lines.append(
|
||||
"├─ Frame Time History ─────────────────────────────────────────┤"
|
||||
)
|
||||
lines.append(
|
||||
"│ (collecting data...) │"
|
||||
)
|
||||
lines.append(" Frame Time History")
|
||||
lines.append(" (collecting data...)")
|
||||
|
||||
lines.append(
|
||||
"└────────────────────────────────────────────────────────────────┘"
|
||||
)
|
||||
lines.append("")
|
||||
|
||||
return lines
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
"""
|
||||
Data source abstraction - Treat data sources as first-class citizens in the pipeline.
|
||||
Data sources for the pipeline architecture.
|
||||
|
||||
Each data source implements a common interface:
|
||||
- name: Display name for the source
|
||||
- fetch(): Fetch fresh data
|
||||
- stream(): Stream data continuously (optional)
|
||||
- get_items(): Get current items
|
||||
This module contains all DataSource implementations:
|
||||
- DataSource: Abstract base class
|
||||
- SourceItem, ImageItem: Data containers
|
||||
- HeadlinesDataSource, PoetryDataSource, ImageDataSource: Concrete sources
|
||||
- SourceRegistry: Registry for source discovery
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
@@ -24,6 +24,17 @@ class SourceItem:
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class ImageItem:
|
||||
"""An image item from a data source - wraps a PIL Image."""
|
||||
|
||||
image: Any # PIL Image
|
||||
source: str
|
||||
timestamp: str
|
||||
path: str | None = None # File path or URL if applicable
|
||||
metadata: dict[str, Any] | None = None
|
||||
|
||||
|
||||
class DataSource(ABC):
|
||||
"""Abstract base class for data sources.
|
||||
|
||||
@@ -80,6 +91,31 @@ class HeadlinesDataSource(DataSource):
|
||||
return [SourceItem(content=t, source=s, timestamp=ts) for t, s, ts in items]
|
||||
|
||||
|
||||
class EmptyDataSource(DataSource):
|
||||
"""Empty data source that produces blank lines for testing.
|
||||
|
||||
Useful for testing display borders, effects, and other pipeline
|
||||
components without needing actual content.
|
||||
"""
|
||||
|
||||
def __init__(self, width: int = 80, height: int = 24):
|
||||
self.width = width
|
||||
self.height = height
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "empty"
|
||||
|
||||
@property
|
||||
def is_dynamic(self) -> bool:
|
||||
return False
|
||||
|
||||
def fetch(self) -> list[SourceItem]:
|
||||
# Return empty lines as content
|
||||
content = "\n".join([" " * self.width for _ in range(self.height)])
|
||||
return [SourceItem(content=content, source="empty", timestamp="0")]
|
||||
|
||||
|
||||
class PoetryDataSource(DataSource):
|
||||
"""Data source for Poetry DB."""
|
||||
|
||||
@@ -94,6 +130,94 @@ class PoetryDataSource(DataSource):
|
||||
return [SourceItem(content=t, source=s, timestamp=ts) for t, s, ts in items]
|
||||
|
||||
|
||||
class ImageDataSource(DataSource):
|
||||
"""Data source that loads PNG images from file paths or URLs.
|
||||
|
||||
Supports:
|
||||
- Local file paths (e.g., /path/to/image.png)
|
||||
- URLs (e.g., https://example.com/image.png)
|
||||
|
||||
Yields ImageItem objects containing PIL Image objects that can be
|
||||
converted to text buffers by an ImageToTextTransform stage.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
path: str | list[str] | None = None,
|
||||
urls: str | list[str] | None = None,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
path: Single path or list of paths to PNG files
|
||||
urls: Single URL or list of URLs to PNG images
|
||||
"""
|
||||
self._paths = [path] if isinstance(path, str) else (path or [])
|
||||
self._urls = [urls] if isinstance(urls, str) else (urls or [])
|
||||
self._images: list[ImageItem] = []
|
||||
self._load_images()
|
||||
|
||||
def _load_images(self) -> None:
|
||||
"""Load all images from paths and URLs."""
|
||||
from datetime import datetime
|
||||
from io import BytesIO
|
||||
from urllib.request import urlopen
|
||||
|
||||
timestamp = datetime.now().isoformat()
|
||||
|
||||
for path in self._paths:
|
||||
try:
|
||||
from PIL import Image
|
||||
|
||||
img = Image.open(path)
|
||||
if img.mode != "RGBA":
|
||||
img = img.convert("RGBA")
|
||||
self._images.append(
|
||||
ImageItem(
|
||||
image=img,
|
||||
source=f"file:{path}",
|
||||
timestamp=timestamp,
|
||||
path=path,
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
for url in self._urls:
|
||||
try:
|
||||
from PIL import Image
|
||||
|
||||
with urlopen(url) as response:
|
||||
img = Image.open(BytesIO(response.read()))
|
||||
if img.mode != "RGBA":
|
||||
img = img.convert("RGBA")
|
||||
self._images.append(
|
||||
ImageItem(
|
||||
image=img,
|
||||
source=f"url:{url}",
|
||||
timestamp=timestamp,
|
||||
path=url,
|
||||
)
|
||||
)
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
return "image"
|
||||
|
||||
@property
|
||||
def is_dynamic(self) -> bool:
|
||||
return False # Static images, not updating
|
||||
|
||||
def fetch(self) -> list[ImageItem]:
|
||||
"""Return loaded images as ImageItem list."""
|
||||
return self._images
|
||||
|
||||
def get_items(self) -> list[ImageItem]:
|
||||
"""Return current image items."""
|
||||
return self._images
|
||||
|
||||
|
||||
class MetricsDataSource(DataSource):
|
||||
"""Data source that renders live pipeline metrics as ASCII art.
|
||||
|
||||
@@ -55,8 +55,13 @@ class Display(Protocol):
|
||||
"""
|
||||
...
|
||||
|
||||
def show(self, buffer: list[str]) -> None:
|
||||
"""Show buffer on display."""
|
||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
||||
"""Show buffer on display.
|
||||
|
||||
Args:
|
||||
buffer: Buffer to display
|
||||
border: If True, render border around buffer (default False)
|
||||
"""
|
||||
...
|
||||
|
||||
def clear(self) -> None:
|
||||
@@ -136,10 +141,90 @@ def get_monitor():
|
||||
return None
|
||||
|
||||
|
||||
def _strip_ansi(s: str) -> str:
|
||||
"""Strip ANSI escape sequences from string for length calculation."""
|
||||
import re
|
||||
|
||||
return re.sub(r"\x1b\[[0-9;]*[a-zA-Z]", "", s)
|
||||
|
||||
|
||||
def render_border(
|
||||
buf: list[str], width: int, height: int, fps: float = 0.0, frame_time: float = 0.0
|
||||
) -> list[str]:
|
||||
"""Render a border around the buffer.
|
||||
|
||||
Args:
|
||||
buf: Input buffer (list of strings)
|
||||
width: Display width in characters
|
||||
height: Display height in rows
|
||||
fps: Current FPS to display in top border (optional)
|
||||
frame_time: Frame time in ms to display in bottom border (optional)
|
||||
|
||||
Returns:
|
||||
Buffer with border applied
|
||||
"""
|
||||
if not buf or width < 3 or height < 3:
|
||||
return buf
|
||||
|
||||
inner_w = width - 2
|
||||
inner_h = height - 2
|
||||
|
||||
# Crop buffer to fit inside border
|
||||
cropped = []
|
||||
for i in range(min(inner_h, len(buf))):
|
||||
line = buf[i]
|
||||
# Calculate visible width (excluding ANSI codes)
|
||||
visible_len = len(_strip_ansi(line))
|
||||
if visible_len > inner_w:
|
||||
# Truncate carefully - this is approximate for ANSI text
|
||||
cropped.append(line[:inner_w])
|
||||
else:
|
||||
cropped.append(line + " " * (inner_w - visible_len))
|
||||
|
||||
# Pad with empty lines if needed
|
||||
while len(cropped) < inner_h:
|
||||
cropped.append(" " * inner_w)
|
||||
|
||||
# Build borders
|
||||
if fps > 0:
|
||||
fps_str = f" FPS:{fps:.0f}"
|
||||
if len(fps_str) < inner_w:
|
||||
right_len = inner_w - len(fps_str)
|
||||
top_border = "┌" + "─" * right_len + fps_str + "┐"
|
||||
else:
|
||||
top_border = "┌" + "─" * inner_w + "┐"
|
||||
else:
|
||||
top_border = "┌" + "─" * inner_w + "┐"
|
||||
|
||||
if frame_time > 0:
|
||||
ft_str = f" {frame_time:.1f}ms"
|
||||
if len(ft_str) < inner_w:
|
||||
right_len = inner_w - len(ft_str)
|
||||
bottom_border = "└" + "─" * right_len + ft_str + "┘"
|
||||
else:
|
||||
bottom_border = "└" + "─" * inner_w + "┘"
|
||||
else:
|
||||
bottom_border = "└" + "─" * inner_w + "┘"
|
||||
|
||||
# Build result with left/right borders
|
||||
result = [top_border]
|
||||
for line in cropped:
|
||||
# Ensure exactly inner_w characters before adding right border
|
||||
if len(line) < inner_w:
|
||||
line = line + " " * (inner_w - len(line))
|
||||
elif len(line) > inner_w:
|
||||
line = line[:inner_w]
|
||||
result.append("│" + line + "│")
|
||||
result.append(bottom_border)
|
||||
|
||||
return result
|
||||
|
||||
|
||||
__all__ = [
|
||||
"Display",
|
||||
"DisplayRegistry",
|
||||
"get_monitor",
|
||||
"render_border",
|
||||
"TerminalDisplay",
|
||||
"NullDisplay",
|
||||
"WebSocketDisplay",
|
||||
|
||||
@@ -68,11 +68,31 @@ class KittyDisplay:
|
||||
|
||||
return self._font_path
|
||||
|
||||
def show(self, buffer: list[str]) -> None:
|
||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
||||
import sys
|
||||
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# Get metrics for border display
|
||||
fps = 0.0
|
||||
frame_time = 0.0
|
||||
from engine.display import get_monitor
|
||||
|
||||
monitor = get_monitor()
|
||||
if monitor:
|
||||
stats = monitor.get_stats()
|
||||
avg_ms = stats.get("avg_ms", 0) if stats else 0
|
||||
frame_count = stats.get("frame_count", 0) if stats else 0
|
||||
if avg_ms and frame_count > 0:
|
||||
fps = 1000.0 / avg_ms
|
||||
frame_time = avg_ms
|
||||
|
||||
# Apply border if requested
|
||||
if border:
|
||||
from engine.display import render_border
|
||||
|
||||
buffer = render_border(buffer, self.width, self.height, fps, frame_time)
|
||||
|
||||
img_width = self.width * self.cell_width
|
||||
img_height = self.height * self.cell_height
|
||||
|
||||
@@ -150,3 +170,11 @@ class KittyDisplay:
|
||||
|
||||
def cleanup(self) -> None:
|
||||
self.clear()
|
||||
|
||||
def get_dimensions(self) -> tuple[int, int]:
|
||||
"""Get current dimensions.
|
||||
|
||||
Returns:
|
||||
(width, height) in character cells
|
||||
"""
|
||||
return (self.width, self.height)
|
||||
|
||||
@@ -26,7 +26,7 @@ class NullDisplay:
|
||||
self.width = width
|
||||
self.height = height
|
||||
|
||||
def show(self, buffer: list[str]) -> None:
|
||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
||||
from engine.display import get_monitor
|
||||
|
||||
monitor = get_monitor()
|
||||
@@ -41,3 +41,11 @@ class NullDisplay:
|
||||
|
||||
def cleanup(self) -> None:
|
||||
pass
|
||||
|
||||
def get_dimensions(self) -> tuple[int, int]:
|
||||
"""Get current dimensions.
|
||||
|
||||
Returns:
|
||||
(width, height) in character cells
|
||||
"""
|
||||
return (self.width, self.height)
|
||||
|
||||
@@ -17,7 +17,6 @@ class PygameDisplay:
|
||||
width: int = 80
|
||||
window_width: int = 800
|
||||
window_height: int = 600
|
||||
_pygame_initialized: bool = False
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -25,6 +24,7 @@ class PygameDisplay:
|
||||
cell_height: int = 18,
|
||||
window_width: int = 800,
|
||||
window_height: int = 600,
|
||||
target_fps: float = 30.0,
|
||||
):
|
||||
self.width = 80
|
||||
self.height = 24
|
||||
@@ -32,12 +32,15 @@ class PygameDisplay:
|
||||
self.cell_height = cell_height
|
||||
self.window_width = window_width
|
||||
self.window_height = window_height
|
||||
self.target_fps = target_fps
|
||||
self._initialized = False
|
||||
self._pygame = None
|
||||
self._screen = None
|
||||
self._font = None
|
||||
self._resized = False
|
||||
self._quit_requested = False
|
||||
self._last_frame_time = 0.0
|
||||
self._frame_period = 1.0 / target_fps if target_fps > 0 else 0
|
||||
|
||||
def _get_font_path(self) -> str | None:
|
||||
"""Get font path for rendering."""
|
||||
@@ -130,7 +133,7 @@ class PygameDisplay:
|
||||
|
||||
self._initialized = True
|
||||
|
||||
def show(self, buffer: list[str]) -> None:
|
||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
||||
if not self._initialized or not self._pygame:
|
||||
return
|
||||
|
||||
@@ -154,6 +157,34 @@ class PygameDisplay:
|
||||
self.height = max(1, self.window_height // self.cell_height)
|
||||
self._resized = True
|
||||
|
||||
# FPS limiting - skip frame if we're going too fast
|
||||
if self._frame_period > 0:
|
||||
now = time.perf_counter()
|
||||
elapsed = now - self._last_frame_time
|
||||
if elapsed < self._frame_period:
|
||||
return # Skip this frame
|
||||
self._last_frame_time = now
|
||||
|
||||
# Get metrics for border display
|
||||
fps = 0.0
|
||||
frame_time = 0.0
|
||||
from engine.display import get_monitor
|
||||
|
||||
monitor = get_monitor()
|
||||
if monitor:
|
||||
stats = monitor.get_stats()
|
||||
avg_ms = stats.get("avg_ms", 0) if stats else 0
|
||||
frame_count = stats.get("frame_count", 0) if stats else 0
|
||||
if avg_ms and frame_count > 0:
|
||||
fps = 1000.0 / avg_ms
|
||||
frame_time = avg_ms
|
||||
|
||||
# Apply border if requested
|
||||
if border:
|
||||
from engine.display import render_border
|
||||
|
||||
buffer = render_border(buffer, self.width, self.height, fps, frame_time)
|
||||
|
||||
self._screen.fill((0, 0, 0))
|
||||
|
||||
for row_idx, line in enumerate(buffer[: self.height]):
|
||||
@@ -180,9 +211,6 @@ class PygameDisplay:
|
||||
|
||||
elapsed_ms = (time.perf_counter() - t0) * 1000
|
||||
|
||||
from engine.display import get_monitor
|
||||
|
||||
monitor = get_monitor()
|
||||
if monitor:
|
||||
chars_in = sum(len(line) for line in buffer)
|
||||
monitor.record_effect("pygame_display", elapsed_ms, chars_in, chars_in)
|
||||
@@ -198,8 +226,17 @@ class PygameDisplay:
|
||||
Returns:
|
||||
(width, height) in character cells
|
||||
"""
|
||||
if self._resized:
|
||||
self._resized = False
|
||||
# Query actual window size and recalculate character cells
|
||||
if self._screen and self._pygame:
|
||||
try:
|
||||
w, h = self._screen.get_size()
|
||||
if w != self.window_width or h != self.window_height:
|
||||
self.window_width = w
|
||||
self.window_height = h
|
||||
self.width = max(1, w // self.cell_width)
|
||||
self.height = max(1, h // self.cell_height)
|
||||
except Exception:
|
||||
pass
|
||||
return self.width, self.height
|
||||
|
||||
def cleanup(self, quit_pygame: bool = True) -> None:
|
||||
|
||||
@@ -122,11 +122,31 @@ class SixelDisplay:
|
||||
self.height = height
|
||||
self._initialized = True
|
||||
|
||||
def show(self, buffer: list[str]) -> None:
|
||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
||||
import sys
|
||||
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# Get metrics for border display
|
||||
fps = 0.0
|
||||
frame_time = 0.0
|
||||
from engine.display import get_monitor
|
||||
|
||||
monitor = get_monitor()
|
||||
if monitor:
|
||||
stats = monitor.get_stats()
|
||||
avg_ms = stats.get("avg_ms", 0) if stats else 0
|
||||
frame_count = stats.get("frame_count", 0) if stats else 0
|
||||
if avg_ms and frame_count > 0:
|
||||
fps = 1000.0 / avg_ms
|
||||
frame_time = avg_ms
|
||||
|
||||
# Apply border if requested
|
||||
if border:
|
||||
from engine.display import render_border
|
||||
|
||||
buffer = render_border(buffer, self.width, self.height, fps, frame_time)
|
||||
|
||||
img_width = self.width * self.cell_width
|
||||
img_height = self.height * self.cell_height
|
||||
|
||||
@@ -198,3 +218,11 @@ class SixelDisplay:
|
||||
|
||||
def cleanup(self) -> None:
|
||||
pass
|
||||
|
||||
def get_dimensions(self) -> tuple[int, int]:
|
||||
"""Get current dimensions.
|
||||
|
||||
Returns:
|
||||
(width, height) in character cells
|
||||
"""
|
||||
return (self.width, self.height)
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
ANSI terminal display backend.
|
||||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
|
||||
|
||||
@@ -10,40 +11,106 @@ class TerminalDisplay:
|
||||
|
||||
Renders buffer to stdout using ANSI escape codes.
|
||||
Supports reuse - when reuse=True, skips re-initializing terminal state.
|
||||
Auto-detects terminal dimensions on init.
|
||||
"""
|
||||
|
||||
width: int = 80
|
||||
height: int = 24
|
||||
_initialized: bool = False
|
||||
|
||||
def __init__(self, target_fps: float = 30.0):
|
||||
self.target_fps = target_fps
|
||||
self._frame_period = 1.0 / target_fps if target_fps > 0 else 0
|
||||
self._last_frame_time = 0.0
|
||||
|
||||
def init(self, width: int, height: int, reuse: bool = False) -> None:
|
||||
"""Initialize display with dimensions.
|
||||
|
||||
If width/height are not provided (0/None), auto-detects terminal size.
|
||||
Otherwise uses provided dimensions or falls back to terminal size
|
||||
if the provided dimensions exceed terminal capacity.
|
||||
|
||||
Args:
|
||||
width: Terminal width in characters
|
||||
height: Terminal height in rows
|
||||
width: Desired terminal width (0 = auto-detect)
|
||||
height: Desired terminal height (0 = auto-detect)
|
||||
reuse: If True, skip terminal re-initialization
|
||||
"""
|
||||
from engine.terminal import CURSOR_OFF
|
||||
|
||||
self.width = width
|
||||
self.height = height
|
||||
# Auto-detect terminal size (handle case where no terminal)
|
||||
try:
|
||||
term_size = os.get_terminal_size()
|
||||
term_width = term_size.columns
|
||||
term_height = term_size.lines
|
||||
except OSError:
|
||||
# No terminal available (e.g., in tests)
|
||||
term_width = width if width > 0 else 80
|
||||
term_height = height if height > 0 else 24
|
||||
|
||||
# Use provided dimensions if valid, otherwise use terminal size
|
||||
if width > 0 and height > 0:
|
||||
self.width = min(width, term_width)
|
||||
self.height = min(height, term_height)
|
||||
else:
|
||||
self.width = term_width
|
||||
self.height = term_height
|
||||
|
||||
if not reuse or not self._initialized:
|
||||
print(CURSOR_OFF, end="", flush=True)
|
||||
self._initialized = True
|
||||
|
||||
def show(self, buffer: list[str]) -> None:
|
||||
def get_dimensions(self) -> tuple[int, int]:
|
||||
"""Get current terminal dimensions.
|
||||
|
||||
Returns:
|
||||
(width, height) in character cells
|
||||
"""
|
||||
try:
|
||||
term_size = os.get_terminal_size()
|
||||
return (term_size.columns, term_size.lines)
|
||||
except OSError:
|
||||
return (self.width, self.height)
|
||||
|
||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
||||
import sys
|
||||
|
||||
from engine.display import get_monitor, render_border
|
||||
|
||||
t0 = time.perf_counter()
|
||||
sys.stdout.buffer.write("".join(buffer).encode())
|
||||
|
||||
# FPS limiting - skip frame if we're going too fast
|
||||
if self._frame_period > 0:
|
||||
now = time.perf_counter()
|
||||
elapsed = now - self._last_frame_time
|
||||
if elapsed < self._frame_period:
|
||||
# Skip this frame - too soon
|
||||
return
|
||||
self._last_frame_time = now
|
||||
|
||||
# Get metrics for border display
|
||||
fps = 0.0
|
||||
frame_time = 0.0
|
||||
monitor = get_monitor()
|
||||
if monitor:
|
||||
stats = monitor.get_stats()
|
||||
avg_ms = stats.get("avg_ms", 0) if stats else 0
|
||||
frame_count = stats.get("frame_count", 0) if stats else 0
|
||||
if avg_ms and frame_count > 0:
|
||||
fps = 1000.0 / avg_ms
|
||||
frame_time = avg_ms
|
||||
|
||||
# Apply border if requested
|
||||
if border:
|
||||
buffer = render_border(buffer, self.width, self.height, fps, frame_time)
|
||||
|
||||
# Clear screen and home cursor before each frame
|
||||
from engine.terminal import CLR
|
||||
|
||||
output = CLR + "".join(buffer)
|
||||
sys.stdout.buffer.write(output.encode())
|
||||
sys.stdout.flush()
|
||||
elapsed_ms = (time.perf_counter() - t0) * 1000
|
||||
|
||||
from engine.display import get_monitor
|
||||
|
||||
monitor = get_monitor()
|
||||
if monitor:
|
||||
chars_in = sum(len(line) for line in buffer)
|
||||
monitor.record_effect("terminal_display", elapsed_ms, chars_in, chars_in)
|
||||
|
||||
@@ -101,10 +101,28 @@ class WebSocketDisplay:
|
||||
self.start_server()
|
||||
self.start_http_server()
|
||||
|
||||
def show(self, buffer: list[str]) -> None:
|
||||
def show(self, buffer: list[str], border: bool = False) -> None:
|
||||
"""Broadcast buffer to all connected clients."""
|
||||
t0 = time.perf_counter()
|
||||
|
||||
# Get metrics for border display
|
||||
fps = 0.0
|
||||
frame_time = 0.0
|
||||
monitor = get_monitor()
|
||||
if monitor:
|
||||
stats = monitor.get_stats()
|
||||
avg_ms = stats.get("avg_ms", 0) if stats else 0
|
||||
frame_count = stats.get("frame_count", 0) if stats else 0
|
||||
if avg_ms and frame_count > 0:
|
||||
fps = 1000.0 / avg_ms
|
||||
frame_time = avg_ms
|
||||
|
||||
# Apply border if requested
|
||||
if border:
|
||||
from engine.display import render_border
|
||||
|
||||
buffer = render_border(buffer, self.width, self.height, fps, frame_time)
|
||||
|
||||
if self._clients:
|
||||
frame_data = {
|
||||
"type": "frame",
|
||||
@@ -272,3 +290,11 @@ class WebSocketDisplay:
|
||||
def set_client_disconnected_callback(self, callback) -> None:
|
||||
"""Set callback for client disconnections."""
|
||||
self._client_disconnected_callback = callback
|
||||
|
||||
def get_dimensions(self) -> tuple[int, int]:
|
||||
"""Get current dimensions.
|
||||
|
||||
Returns:
|
||||
(width, height) in character cells
|
||||
"""
|
||||
return (self.width, self.height)
|
||||
|
||||
@@ -233,7 +233,7 @@ class PipelineIntrospector:
|
||||
|
||||
def introspect_sources_v2(self) -> None:
|
||||
"""Introspect data sources v2 (new abstraction)."""
|
||||
from engine.sources_v2 import SourceRegistry, init_default_sources
|
||||
from engine.data_sources.sources import SourceRegistry, init_default_sources
|
||||
|
||||
init_default_sources()
|
||||
SourceRegistry()
|
||||
@@ -241,7 +241,7 @@ class PipelineIntrospector:
|
||||
self.add_node(
|
||||
PipelineNode(
|
||||
name="SourceRegistry",
|
||||
module="engine.sources_v2",
|
||||
module="engine.data_sources.sources",
|
||||
class_name="SourceRegistry",
|
||||
description="Source discovery and management",
|
||||
)
|
||||
|
||||
@@ -264,6 +264,92 @@ class DataSourceStage(Stage):
|
||||
return data
|
||||
|
||||
|
||||
class PassthroughStage(Stage):
|
||||
"""Simple stage that passes data through unchanged.
|
||||
|
||||
Used for sources that already provide the data in the correct format
|
||||
(e.g., pipeline introspection that outputs text directly).
|
||||
"""
|
||||
|
||||
def __init__(self, name: str = "passthrough"):
|
||||
self.name = name
|
||||
self.category = "render"
|
||||
self.optional = True
|
||||
|
||||
@property
|
||||
def stage_type(self) -> str:
|
||||
return "render"
|
||||
|
||||
@property
|
||||
def capabilities(self) -> set[str]:
|
||||
return {"render.output"}
|
||||
|
||||
@property
|
||||
def dependencies(self) -> set[str]:
|
||||
return {"source"}
|
||||
|
||||
def process(self, data: Any, ctx: PipelineContext) -> Any:
|
||||
"""Pass data through unchanged."""
|
||||
return data
|
||||
|
||||
|
||||
class SourceItemsToBufferStage(Stage):
|
||||
"""Convert SourceItem objects to text buffer.
|
||||
|
||||
Takes a list of SourceItem objects and extracts their content,
|
||||
splitting on newlines to create a proper text buffer for display.
|
||||
"""
|
||||
|
||||
def __init__(self, name: str = "items-to-buffer"):
|
||||
self.name = name
|
||||
self.category = "render"
|
||||
self.optional = True
|
||||
|
||||
@property
|
||||
def stage_type(self) -> str:
|
||||
return "render"
|
||||
|
||||
@property
|
||||
def capabilities(self) -> set[str]:
|
||||
return {"render.output"}
|
||||
|
||||
@property
|
||||
def dependencies(self) -> set[str]:
|
||||
return {"source"}
|
||||
|
||||
def process(self, data: Any, ctx: PipelineContext) -> Any:
|
||||
"""Convert SourceItem list to text buffer."""
|
||||
if data is None:
|
||||
return []
|
||||
|
||||
# If already a list of strings, return as-is
|
||||
if isinstance(data, list) and data and isinstance(data[0], str):
|
||||
return data
|
||||
|
||||
# If it's a list of SourceItem, extract content
|
||||
from engine.data_sources import SourceItem
|
||||
|
||||
if isinstance(data, list):
|
||||
result = []
|
||||
for item in data:
|
||||
if isinstance(item, SourceItem):
|
||||
# Split content by newline to get individual lines
|
||||
lines = item.content.split("\n")
|
||||
result.extend(lines)
|
||||
elif hasattr(item, "content"): # Has content attribute
|
||||
lines = str(item.content).split("\n")
|
||||
result.extend(lines)
|
||||
else:
|
||||
result.append(str(item))
|
||||
return result
|
||||
|
||||
# Single item
|
||||
if isinstance(data, SourceItem):
|
||||
return data.content.split("\n")
|
||||
|
||||
return [str(data)]
|
||||
|
||||
|
||||
class ItemsStage(Stage):
|
||||
"""Stage that holds pre-fetched items and provides them to the pipeline.
|
||||
|
||||
@@ -430,6 +516,113 @@ class FontStage(Stage):
|
||||
return data
|
||||
|
||||
|
||||
class ImageToTextStage(Stage):
|
||||
"""Transform that converts PIL Image to ASCII text buffer.
|
||||
|
||||
Takes an ImageItem or PIL Image and converts it to a text buffer
|
||||
using ASCII character density mapping. The output can be displayed
|
||||
directly or further processed by effects.
|
||||
|
||||
Attributes:
|
||||
width: Output width in characters
|
||||
height: Output height in characters
|
||||
charset: Character set for density mapping (default: simple ASCII)
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
width: int = 80,
|
||||
height: int = 24,
|
||||
charset: str = " .:-=+*#%@",
|
||||
name: str = "image-to-text",
|
||||
):
|
||||
self.name = name
|
||||
self.category = "transform"
|
||||
self.optional = False
|
||||
self.width = width
|
||||
self.height = height
|
||||
self.charset = charset
|
||||
|
||||
@property
|
||||
def stage_type(self) -> str:
|
||||
return "transform"
|
||||
|
||||
@property
|
||||
def capabilities(self) -> set[str]:
|
||||
from engine.pipeline.core import DataType
|
||||
|
||||
return {f"transform.{self.name}", DataType.TEXT_BUFFER}
|
||||
|
||||
@property
|
||||
def dependencies(self) -> set[str]:
|
||||
return {"source"}
|
||||
|
||||
def process(self, data: Any, ctx: PipelineContext) -> Any:
|
||||
"""Convert PIL Image to text buffer."""
|
||||
if data is None:
|
||||
return None
|
||||
|
||||
from engine.data_sources.sources import ImageItem
|
||||
|
||||
# Extract PIL Image from various input types
|
||||
pil_image = None
|
||||
|
||||
if isinstance(data, ImageItem) or hasattr(data, "image"):
|
||||
pil_image = data.image
|
||||
else:
|
||||
# Assume it's already a PIL Image
|
||||
pil_image = data
|
||||
|
||||
# Check if it's a PIL Image
|
||||
if not hasattr(pil_image, "resize"):
|
||||
# Not a PIL Image, return as-is
|
||||
return data if isinstance(data, list) else [str(data)]
|
||||
|
||||
# Convert to grayscale and resize
|
||||
try:
|
||||
if pil_image.mode != "L":
|
||||
pil_image = pil_image.convert("L")
|
||||
except Exception:
|
||||
return ["[image conversion error]"]
|
||||
|
||||
# Calculate cell aspect ratio correction (characters are taller than wide)
|
||||
aspect_ratio = 0.5
|
||||
target_w = self.width
|
||||
target_h = int(self.height * aspect_ratio)
|
||||
|
||||
# Resize image to target dimensions
|
||||
try:
|
||||
resized = pil_image.resize((target_w, target_h))
|
||||
except Exception:
|
||||
return ["[image resize error]"]
|
||||
|
||||
# Map pixels to characters
|
||||
result = []
|
||||
pixels = list(resized.getdata())
|
||||
|
||||
for row in range(target_h):
|
||||
line = ""
|
||||
for col in range(target_w):
|
||||
idx = row * target_w + col
|
||||
if idx < len(pixels):
|
||||
brightness = pixels[idx]
|
||||
char_idx = int((brightness / 255) * (len(self.charset) - 1))
|
||||
line += self.charset[char_idx]
|
||||
else:
|
||||
line += " "
|
||||
result.append(line)
|
||||
|
||||
# Pad or trim to exact height
|
||||
while len(result) < self.height:
|
||||
result.append(" " * self.width)
|
||||
result = result[: self.height]
|
||||
|
||||
# Pad lines to width
|
||||
result = [line.ljust(self.width) for line in result]
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def create_stage_from_display(display, name: str = "terminal") -> DisplayStage:
|
||||
"""Create a Stage from a Display instance."""
|
||||
return DisplayStage(display, name)
|
||||
|
||||
@@ -519,8 +519,8 @@ def create_pipeline_from_params(params: PipelineParams) -> Pipeline:
|
||||
|
||||
def create_default_pipeline() -> Pipeline:
|
||||
"""Create a default pipeline with all standard components."""
|
||||
from engine.data_sources.sources import HeadlinesDataSource
|
||||
from engine.pipeline.adapters import DataSourceStage
|
||||
from engine.sources_v2 import HeadlinesDataSource
|
||||
|
||||
pipeline = Pipeline()
|
||||
|
||||
|
||||
@@ -29,12 +29,14 @@ class DataType(Enum):
|
||||
ITEM_TUPLES: List[tuple] - (title, source, timestamp) tuples
|
||||
TEXT_BUFFER: List[str] - rendered ANSI buffer for display
|
||||
RAW_TEXT: str - raw text strings
|
||||
PIL_IMAGE: PIL Image object
|
||||
"""
|
||||
|
||||
SOURCE_ITEMS = auto() # List[SourceItem] - from DataSource
|
||||
ITEM_TUPLES = auto() # List[tuple] - (title, source, ts)
|
||||
TEXT_BUFFER = auto() # List[str] - ANSI buffer
|
||||
RAW_TEXT = auto() # str - raw text
|
||||
PIL_IMAGE = auto() # PIL Image object
|
||||
ANY = auto() # Accepts any type
|
||||
NONE = auto() # No data (terminator)
|
||||
|
||||
|
||||
@@ -23,6 +23,7 @@ class PipelineParams:
|
||||
|
||||
# Display config
|
||||
display: str = "terminal"
|
||||
border: bool = False
|
||||
|
||||
# Camera config
|
||||
camera_mode: str = "vertical"
|
||||
|
||||
@@ -47,12 +47,14 @@ class PipelinePreset:
|
||||
display: str = "terminal"
|
||||
camera: str = "vertical"
|
||||
effects: list[str] = field(default_factory=list)
|
||||
border: bool = False
|
||||
|
||||
def to_params(self) -> PipelineParams:
|
||||
"""Convert to PipelineParams."""
|
||||
params = PipelineParams()
|
||||
params.source = self.source
|
||||
params.display = self.display
|
||||
params.border = self.border
|
||||
params.camera_mode = self.camera
|
||||
params.effect_order = self.effects.copy()
|
||||
return params
|
||||
@@ -67,6 +69,7 @@ class PipelinePreset:
|
||||
display=data.get("display", "terminal"),
|
||||
camera=data.get("camera", "vertical"),
|
||||
effects=data.get("effects", []),
|
||||
border=data.get("border", False),
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -87,7 +87,7 @@ def discover_stages() -> None:
|
||||
|
||||
# Import and register all stage implementations
|
||||
try:
|
||||
from engine.sources_v2 import (
|
||||
from engine.data_sources.sources import (
|
||||
HeadlinesDataSource,
|
||||
PoetryDataSource,
|
||||
)
|
||||
@@ -102,7 +102,7 @@ def discover_stages() -> None:
|
||||
|
||||
# Register pipeline introspection source
|
||||
try:
|
||||
from engine.pipeline_sources.pipeline_introspection import (
|
||||
from engine.data_sources.pipeline_introspection import (
|
||||
PipelineIntrospectionSource,
|
||||
)
|
||||
|
||||
|
||||
@@ -1,7 +0,0 @@
|
||||
"""
|
||||
Data source implementations for the pipeline architecture.
|
||||
"""
|
||||
|
||||
from engine.pipeline_sources.pipeline_introspection import PipelineIntrospectionSource
|
||||
|
||||
__all__ = ["PipelineIntrospectionSource"]
|
||||
25
presets.toml
25
presets.toml
@@ -13,7 +13,7 @@ description = "Demo mode with effect cycling and camera modes"
|
||||
source = "headlines"
|
||||
display = "pygame"
|
||||
camera = "vertical"
|
||||
effects = ["noise", "fade", "glitch", "firehose", "hud"]
|
||||
effects = ["noise", "fade", "glitch", "firehose"]
|
||||
viewport_width = 80
|
||||
viewport_height = 24
|
||||
camera_speed = 1.0
|
||||
@@ -24,18 +24,29 @@ description = "Poetry feed with subtle effects"
|
||||
source = "poetry"
|
||||
display = "pygame"
|
||||
camera = "vertical"
|
||||
effects = ["fade", "hud"]
|
||||
effects = ["fade"]
|
||||
viewport_width = 80
|
||||
viewport_height = 24
|
||||
camera_speed = 0.5
|
||||
|
||||
[presets.border-test]
|
||||
description = "Test border rendering with empty buffer"
|
||||
source = "empty"
|
||||
display = "terminal"
|
||||
camera = "vertical"
|
||||
effects = []
|
||||
viewport_width = 80
|
||||
viewport_height = 24
|
||||
camera_speed = 1.0
|
||||
firehose_enabled = false
|
||||
border = true
|
||||
|
||||
[presets.websocket]
|
||||
description = "WebSocket display mode"
|
||||
source = "headlines"
|
||||
display = "websocket"
|
||||
camera = "vertical"
|
||||
effects = ["noise", "fade", "glitch", "hud"]
|
||||
effects = ["noise", "fade", "glitch"]
|
||||
viewport_width = 80
|
||||
viewport_height = 24
|
||||
camera_speed = 1.0
|
||||
@@ -46,7 +57,7 @@ description = "Sixel graphics display mode"
|
||||
source = "headlines"
|
||||
display = "sixel"
|
||||
camera = "vertical"
|
||||
effects = ["noise", "fade", "glitch", "hud"]
|
||||
effects = ["noise", "fade", "glitch"]
|
||||
viewport_width = 80
|
||||
viewport_height = 24
|
||||
camera_speed = 1.0
|
||||
@@ -57,7 +68,7 @@ description = "High-speed firehose mode"
|
||||
source = "headlines"
|
||||
display = "pygame"
|
||||
camera = "vertical"
|
||||
effects = ["noise", "fade", "glitch", "firehose", "hud"]
|
||||
effects = ["noise", "fade", "glitch", "firehose"]
|
||||
viewport_width = 80
|
||||
viewport_height = 24
|
||||
camera_speed = 2.0
|
||||
@@ -66,9 +77,9 @@ firehose_enabled = true
|
||||
[presets.pipeline-inspect]
|
||||
description = "Live pipeline introspection with DAG and performance metrics"
|
||||
source = "pipeline-inspect"
|
||||
display = "terminal"
|
||||
display = "pygame"
|
||||
camera = "vertical"
|
||||
effects = ["hud"]
|
||||
effects = ["crop"]
|
||||
viewport_width = 100
|
||||
viewport_height = 35
|
||||
camera_speed = 0.3
|
||||
|
||||
112
tests/test_border_effect.py
Normal file
112
tests/test_border_effect.py
Normal file
@@ -0,0 +1,112 @@
|
||||
"""
|
||||
Tests for BorderEffect.
|
||||
"""
|
||||
|
||||
|
||||
from effects_plugins.border import BorderEffect
|
||||
from engine.effects.types import EffectContext
|
||||
|
||||
|
||||
def make_ctx(terminal_width: int = 80, terminal_height: int = 24) -> EffectContext:
|
||||
"""Create a mock EffectContext."""
|
||||
return EffectContext(
|
||||
terminal_width=terminal_width,
|
||||
terminal_height=terminal_height,
|
||||
scroll_cam=0,
|
||||
ticker_height=terminal_height,
|
||||
)
|
||||
|
||||
|
||||
class TestBorderEffect:
|
||||
"""Tests for BorderEffect."""
|
||||
|
||||
def test_basic_init(self):
|
||||
"""BorderEffect initializes with defaults."""
|
||||
effect = BorderEffect()
|
||||
assert effect.name == "border"
|
||||
assert effect.config.enabled is True
|
||||
|
||||
def test_adds_border(self):
|
||||
"""BorderEffect adds border around content."""
|
||||
effect = BorderEffect()
|
||||
buf = [
|
||||
"Hello World",
|
||||
"Test Content",
|
||||
"Third Line",
|
||||
]
|
||||
ctx = make_ctx(terminal_width=20, terminal_height=10)
|
||||
|
||||
result = effect.process(buf, ctx)
|
||||
|
||||
# Should have top and bottom borders
|
||||
assert len(result) >= 3
|
||||
# First line should start with border character
|
||||
assert result[0][0] in "┌┎┍"
|
||||
# Last line should end with border character
|
||||
assert result[-1][-1] in "┘┖┚"
|
||||
|
||||
def test_border_with_small_buffer(self):
|
||||
"""BorderEffect handles small buffer (too small for border)."""
|
||||
effect = BorderEffect()
|
||||
buf = ["ab"] # Too small for proper border
|
||||
ctx = make_ctx(terminal_width=10, terminal_height=5)
|
||||
|
||||
result = effect.process(buf, ctx)
|
||||
|
||||
# Should still try to add border but result may differ
|
||||
# At minimum should have output
|
||||
assert len(result) >= 1
|
||||
|
||||
def test_metrics_in_border(self):
|
||||
"""BorderEffect includes FPS and frame time in border."""
|
||||
effect = BorderEffect()
|
||||
buf = ["x" * 10] * 5
|
||||
ctx = make_ctx(terminal_width=20, terminal_height=10)
|
||||
|
||||
# Add metrics to context
|
||||
ctx.set_state(
|
||||
"metrics",
|
||||
{
|
||||
"avg_ms": 16.5,
|
||||
"frame_count": 100,
|
||||
"fps": 60.0,
|
||||
},
|
||||
)
|
||||
|
||||
result = effect.process(buf, ctx)
|
||||
|
||||
# Check for FPS in top border
|
||||
top_line = result[0]
|
||||
assert "FPS" in top_line or "60" in top_line
|
||||
|
||||
# Check for frame time in bottom border
|
||||
bottom_line = result[-1]
|
||||
assert "ms" in bottom_line or "16" in bottom_line
|
||||
|
||||
def test_no_metrics(self):
|
||||
"""BorderEffect works without metrics."""
|
||||
effect = BorderEffect()
|
||||
buf = ["content"] * 5
|
||||
ctx = make_ctx(terminal_width=20, terminal_height=10)
|
||||
# No metrics set
|
||||
|
||||
result = effect.process(buf, ctx)
|
||||
|
||||
# Should still have border characters
|
||||
assert len(result) >= 3
|
||||
assert result[0][0] in "┌┎┍"
|
||||
|
||||
def test_crops_before_bordering(self):
|
||||
"""BorderEffect crops input before adding border."""
|
||||
effect = BorderEffect()
|
||||
buf = ["x" * 100] * 50 # Very large buffer
|
||||
ctx = make_ctx(terminal_width=20, terminal_height=10)
|
||||
|
||||
result = effect.process(buf, ctx)
|
||||
|
||||
# Should be cropped to fit, then bordered
|
||||
# Result should be <= terminal_height with border
|
||||
assert len(result) <= ctx.terminal_height
|
||||
# Each line should be <= terminal_width
|
||||
for line in result:
|
||||
assert len(line) <= ctx.terminal_width
|
||||
100
tests/test_crop_effect.py
Normal file
100
tests/test_crop_effect.py
Normal file
@@ -0,0 +1,100 @@
|
||||
"""
|
||||
Tests for CropEffect.
|
||||
"""
|
||||
|
||||
|
||||
from effects_plugins.crop import CropEffect
|
||||
from engine.effects.types import EffectContext
|
||||
|
||||
|
||||
def make_ctx(terminal_width: int = 80, terminal_height: int = 24) -> EffectContext:
|
||||
"""Create a mock EffectContext."""
|
||||
return EffectContext(
|
||||
terminal_width=terminal_width,
|
||||
terminal_height=terminal_height,
|
||||
scroll_cam=0,
|
||||
ticker_height=terminal_height,
|
||||
)
|
||||
|
||||
|
||||
class TestCropEffect:
|
||||
"""Tests for CropEffect."""
|
||||
|
||||
def test_basic_init(self):
|
||||
"""CropEffect initializes with defaults."""
|
||||
effect = CropEffect()
|
||||
assert effect.name == "crop"
|
||||
assert effect.config.enabled is True
|
||||
|
||||
def test_crop_wider_buffer(self):
|
||||
"""CropEffect crops wide buffer to terminal width."""
|
||||
effect = CropEffect()
|
||||
buf = [
|
||||
"This is a very long line that exceeds the terminal width of eighty characters!",
|
||||
"Another long line that should also be cropped to fit within the terminal bounds!",
|
||||
"Short",
|
||||
]
|
||||
ctx = make_ctx(terminal_width=40, terminal_height=10)
|
||||
|
||||
result = effect.process(buf, ctx)
|
||||
|
||||
# Lines should be cropped to 40 chars
|
||||
assert len(result[0]) == 40
|
||||
assert len(result[1]) == 40
|
||||
assert result[2] == "Short" + " " * 35 # padded to width
|
||||
|
||||
def test_crop_taller_buffer(self):
|
||||
"""CropEffect crops tall buffer to terminal height."""
|
||||
effect = CropEffect()
|
||||
buf = ["line"] * 30 # 30 lines
|
||||
ctx = make_ctx(terminal_width=80, terminal_height=10)
|
||||
|
||||
result = effect.process(buf, ctx)
|
||||
|
||||
# Should be cropped to 10 lines
|
||||
assert len(result) == 10
|
||||
|
||||
def test_pad_shorter_lines(self):
|
||||
"""CropEffect pads lines shorter than width."""
|
||||
effect = CropEffect()
|
||||
buf = ["short", "medium length", ""]
|
||||
ctx = make_ctx(terminal_width=20, terminal_height=5)
|
||||
|
||||
result = effect.process(buf, ctx)
|
||||
|
||||
assert len(result[0]) == 20 # padded
|
||||
assert len(result[1]) == 20 # padded
|
||||
assert len(result[2]) == 20 # padded (was empty)
|
||||
|
||||
def test_pad_to_height(self):
|
||||
"""CropEffect pads with empty lines if buffer is too short."""
|
||||
effect = CropEffect()
|
||||
buf = ["line1", "line2"]
|
||||
ctx = make_ctx(terminal_width=20, terminal_height=10)
|
||||
|
||||
result = effect.process(buf, ctx)
|
||||
|
||||
# Should have 10 lines
|
||||
assert len(result) == 10
|
||||
# Last 8 should be empty padding
|
||||
for i in range(2, 10):
|
||||
assert result[i] == " " * 20
|
||||
|
||||
def test_empty_buffer(self):
|
||||
"""CropEffect handles empty buffer."""
|
||||
effect = CropEffect()
|
||||
ctx = make_ctx()
|
||||
|
||||
result = effect.process([], ctx)
|
||||
|
||||
assert result == []
|
||||
|
||||
def test_uses_context_dimensions(self):
|
||||
"""CropEffect uses context terminal_width/terminal_height."""
|
||||
effect = CropEffect()
|
||||
buf = ["x" * 100]
|
||||
ctx = make_ctx(terminal_width=50, terminal_height=1)
|
||||
|
||||
result = effect.process(buf, ctx)
|
||||
|
||||
assert len(result[0]) == 50
|
||||
@@ -678,8 +678,8 @@ class TestDataSourceStage:
|
||||
|
||||
def test_datasource_stage_capabilities(self):
|
||||
"""DataSourceStage declares correct capabilities."""
|
||||
from engine.data_sources.sources import HeadlinesDataSource
|
||||
from engine.pipeline.adapters import DataSourceStage
|
||||
from engine.sources_v2 import HeadlinesDataSource
|
||||
|
||||
source = HeadlinesDataSource()
|
||||
stage = DataSourceStage(source, name="headlines")
|
||||
@@ -690,9 +690,9 @@ class TestDataSourceStage:
|
||||
"""DataSourceStage fetches from DataSource."""
|
||||
from unittest.mock import patch
|
||||
|
||||
from engine.data_sources.sources import HeadlinesDataSource
|
||||
from engine.pipeline.adapters import DataSourceStage
|
||||
from engine.pipeline.core import PipelineContext
|
||||
from engine.sources_v2 import HeadlinesDataSource
|
||||
|
||||
mock_items = [
|
||||
("Test Headline 1", "TestSource", "12:00"),
|
||||
@@ -859,8 +859,8 @@ class TestFullPipeline:
|
||||
|
||||
def test_datasource_stage_capabilities_match_render_deps(self):
|
||||
"""DataSourceStage provides capability that RenderStage can depend on."""
|
||||
from engine.data_sources.sources import HeadlinesDataSource
|
||||
from engine.pipeline.adapters import DataSourceStage, RenderStage
|
||||
from engine.sources_v2 import HeadlinesDataSource
|
||||
|
||||
# DataSourceStage provides "source.headlines"
|
||||
ds_stage = DataSourceStage(HeadlinesDataSource(), name="headlines")
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
Tests for PipelineIntrospectionSource.
|
||||
"""
|
||||
|
||||
from engine.pipeline_sources.pipeline_introspection import PipelineIntrospectionSource
|
||||
from engine.data_sources.pipeline_introspection import PipelineIntrospectionSource
|
||||
|
||||
|
||||
class TestPipelineIntrospectionSource:
|
||||
@@ -14,19 +14,17 @@ class TestPipelineIntrospectionSource:
|
||||
assert source.name == "pipeline-inspect"
|
||||
assert source.is_dynamic is True
|
||||
assert source.frame == 0
|
||||
assert source.ready is False
|
||||
|
||||
def test_init_with_pipelines(self):
|
||||
"""Source initializes with custom pipelines list."""
|
||||
source = PipelineIntrospectionSource(
|
||||
pipelines=[], viewport_width=100, viewport_height=40
|
||||
)
|
||||
def test_init_with_params(self):
|
||||
"""Source initializes with custom params."""
|
||||
source = PipelineIntrospectionSource(viewport_width=100, viewport_height=40)
|
||||
assert source.viewport_width == 100
|
||||
assert source.viewport_height == 40
|
||||
|
||||
def test_inlet_outlet_types(self):
|
||||
"""Source has correct inlet/outlet types."""
|
||||
source = PipelineIntrospectionSource()
|
||||
# inlet should be NONE (source), outlet should be SOURCE_ITEMS
|
||||
from engine.pipeline.core import DataType
|
||||
|
||||
assert DataType.NONE in source.inlet_types
|
||||
@@ -40,9 +38,24 @@ class TestPipelineIntrospectionSource:
|
||||
assert items[0].source == "pipeline-inspect"
|
||||
|
||||
def test_fetch_increments_frame(self):
|
||||
"""fetch() increments frame counter."""
|
||||
"""fetch() increments frame counter when ready."""
|
||||
source = PipelineIntrospectionSource()
|
||||
assert source.frame == 0
|
||||
|
||||
# Set pipeline first to make source ready
|
||||
class MockPipeline:
|
||||
stages = {}
|
||||
execution_order = []
|
||||
|
||||
def get_metrics_summary(self):
|
||||
return {"avg_ms": 10.0, "fps": 60, "stages": {}}
|
||||
|
||||
def get_frame_times(self):
|
||||
return [10.0, 12.0, 11.0]
|
||||
|
||||
source.set_pipeline(MockPipeline())
|
||||
assert source.ready is True
|
||||
|
||||
source.fetch()
|
||||
assert source.frame == 1
|
||||
source.fetch()
|
||||
@@ -56,27 +69,30 @@ class TestPipelineIntrospectionSource:
|
||||
assert len(items) > 0
|
||||
assert items[0].source == "pipeline-inspect"
|
||||
|
||||
def test_add_pipeline(self):
|
||||
"""add_pipeline() adds pipeline to list."""
|
||||
def test_set_pipeline(self):
|
||||
"""set_pipeline() marks source as ready."""
|
||||
source = PipelineIntrospectionSource()
|
||||
mock_pipeline = object()
|
||||
source.add_pipeline(mock_pipeline)
|
||||
assert mock_pipeline in source._pipelines
|
||||
assert source.ready is False
|
||||
|
||||
def test_remove_pipeline(self):
|
||||
"""remove_pipeline() removes pipeline from list."""
|
||||
source = PipelineIntrospectionSource()
|
||||
mock_pipeline = object()
|
||||
source.add_pipeline(mock_pipeline)
|
||||
source.remove_pipeline(mock_pipeline)
|
||||
assert mock_pipeline not in source._pipelines
|
||||
class MockPipeline:
|
||||
stages = {}
|
||||
execution_order = []
|
||||
|
||||
def get_metrics_summary(self):
|
||||
return {"avg_ms": 10.0, "fps": 60, "stages": {}}
|
||||
|
||||
def get_frame_times(self):
|
||||
return [10.0, 12.0, 11.0]
|
||||
|
||||
source.set_pipeline(MockPipeline())
|
||||
assert source.ready is True
|
||||
|
||||
|
||||
class TestPipelineIntrospectionRender:
|
||||
"""Tests for rendering methods."""
|
||||
|
||||
def test_render_header_no_pipelines(self):
|
||||
"""_render_header returns default when no pipelines."""
|
||||
def test_render_header_no_pipeline(self):
|
||||
"""_render_header returns default when no pipeline."""
|
||||
source = PipelineIntrospectionSource()
|
||||
lines = source._render_header()
|
||||
assert len(lines) == 1
|
||||
@@ -115,19 +131,18 @@ class TestPipelineIntrospectionRender:
|
||||
sparkline = source._render_sparkline([], 10)
|
||||
assert sparkline == " " * 10
|
||||
|
||||
def test_render_footer_no_pipelines(self):
|
||||
"""_render_footer shows collecting data when no pipelines."""
|
||||
def test_render_footer_no_pipeline(self):
|
||||
"""_render_footer shows collecting data when no pipeline."""
|
||||
source = PipelineIntrospectionSource()
|
||||
lines = source._render_footer()
|
||||
assert len(lines) >= 2
|
||||
assert "collecting data" in lines[1] or "Frame Time" in lines[0]
|
||||
|
||||
|
||||
class TestPipelineIntrospectionFull:
|
||||
"""Integration tests."""
|
||||
|
||||
def test_render_empty(self):
|
||||
"""_render works with no pipelines."""
|
||||
"""_render works when not ready."""
|
||||
source = PipelineIntrospectionSource()
|
||||
lines = source._render()
|
||||
assert len(lines) > 0
|
||||
@@ -151,6 +166,6 @@ class TestPipelineIntrospectionFull:
|
||||
def get_frame_times(self):
|
||||
return [1.0, 2.0, 3.0]
|
||||
|
||||
source.add_pipeline(MockPipeline())
|
||||
source.set_pipeline(MockPipeline())
|
||||
lines = source._render()
|
||||
assert len(lines) > 0
|
||||
|
||||
125
tests/test_tint_effect.py
Normal file
125
tests/test_tint_effect.py
Normal file
@@ -0,0 +1,125 @@
|
||||
import pytest
|
||||
|
||||
from effects_plugins.tint import TintEffect
|
||||
from engine.effects.types import EffectConfig
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def effect():
|
||||
return TintEffect()
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def effect_with_params(r=255, g=128, b=64, a=0.5):
|
||||
e = TintEffect()
|
||||
config = EffectConfig(
|
||||
enabled=True,
|
||||
intensity=1.0,
|
||||
params={"r": r, "g": g, "b": b, "a": a},
|
||||
)
|
||||
e.configure(config)
|
||||
return e
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def mock_context():
|
||||
class MockContext:
|
||||
terminal_width = 80
|
||||
terminal_height = 24
|
||||
|
||||
def get_state(self, key):
|
||||
return None
|
||||
|
||||
return MockContext()
|
||||
|
||||
|
||||
class TestTintEffect:
|
||||
def test_name(self, effect):
|
||||
assert effect.name == "tint"
|
||||
|
||||
def test_enabled_by_default(self, effect):
|
||||
assert effect.config.enabled is True
|
||||
|
||||
def test_returns_input_when_empty(self, effect, mock_context):
|
||||
result = effect.process([], mock_context)
|
||||
assert result == []
|
||||
|
||||
def test_returns_input_when_transparency_zero(
|
||||
self, effect_with_params, mock_context
|
||||
):
|
||||
effect_with_params.config.params["a"] = 0.0
|
||||
buf = ["hello world"]
|
||||
result = effect_with_params.process(buf, mock_context)
|
||||
assert result == buf
|
||||
|
||||
def test_applies_tint_to_plain_text(self, effect_with_params, mock_context):
|
||||
buf = ["hello world"]
|
||||
result = effect_with_params.process(buf, mock_context)
|
||||
assert len(result) == 1
|
||||
assert "\033[" in result[0] # Has ANSI codes
|
||||
assert "hello world" in result[0]
|
||||
|
||||
def test_tint_preserves_content(self, effect_with_params, mock_context):
|
||||
buf = ["hello world", "test line"]
|
||||
result = effect_with_params.process(buf, mock_context)
|
||||
assert "hello world" in result[0]
|
||||
assert "test line" in result[1]
|
||||
|
||||
def test_rgb_to_ansi256_black(self, effect):
|
||||
assert effect._rgb_to_ansi256(0, 0, 0) == 16
|
||||
|
||||
def test_rgb_to_ansi256_white(self, effect):
|
||||
assert effect._rgb_to_ansi256(255, 255, 255) == 231
|
||||
|
||||
def test_rgb_to_ansi256_red(self, effect):
|
||||
color = effect._rgb_to_ansi256(255, 0, 0)
|
||||
assert 196 <= color <= 197 # Red in 256 color
|
||||
|
||||
def test_rgb_to_ansi256_green(self, effect):
|
||||
color = effect._rgb_to_ansi256(0, 255, 0)
|
||||
assert 34 <= color <= 46
|
||||
|
||||
def test_rgb_to_ansi256_blue(self, effect):
|
||||
color = effect._rgb_to_ansi256(0, 0, 255)
|
||||
assert 20 <= color <= 33
|
||||
|
||||
def test_configure_updates_params(self, effect):
|
||||
config = EffectConfig(
|
||||
enabled=True,
|
||||
intensity=1.0,
|
||||
params={"r": 100, "g": 150, "b": 200, "a": 0.8},
|
||||
)
|
||||
effect.configure(config)
|
||||
assert effect.config.params["r"] == 100
|
||||
assert effect.config.params["g"] == 150
|
||||
assert effect.config.params["b"] == 200
|
||||
assert effect.config.params["a"] == 0.8
|
||||
|
||||
def test_clamp_rgb_values(self, effect_with_params, mock_context):
|
||||
effect_with_params.config.params["r"] = 300
|
||||
effect_with_params.config.params["g"] = -10
|
||||
effect_with_params.config.params["b"] = 1.5
|
||||
buf = ["test"]
|
||||
result = effect_with_params.process(buf, mock_context)
|
||||
assert "\033[" in result[0]
|
||||
|
||||
def test_clamp_alpha_above_one(self, effect_with_params, mock_context):
|
||||
effect_with_params.config.params["a"] = 1.5
|
||||
buf = ["test"]
|
||||
result = effect_with_params.process(buf, mock_context)
|
||||
assert "\033[" in result[0]
|
||||
|
||||
def test_preserves_empty_lines(self, effect_with_params, mock_context):
|
||||
buf = ["hello", "", "world"]
|
||||
result = effect_with_params.process(buf, mock_context)
|
||||
assert result[1] == ""
|
||||
|
||||
def test_inlet_types_includes_text_buffer(self, effect):
|
||||
from engine.pipeline.core import DataType
|
||||
|
||||
assert DataType.TEXT_BUFFER in effect.inlet_types
|
||||
|
||||
def test_outlet_types_includes_text_buffer(self, effect):
|
||||
from engine.pipeline.core import DataType
|
||||
|
||||
assert DataType.TEXT_BUFFER in effect.outlet_types
|
||||
Reference in New Issue
Block a user