diff --git a/AGENTS.md b/AGENTS.md index 0351b32..87ee358 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -71,39 +71,17 @@ The project uses hk configured in `hk.pkl`: ## Benchmark Runner -Run performance benchmarks: +Benchmark tests are in `tests/test_benchmark.py` with `@pytest.mark.benchmark`. + +### Hook Mode (via pytest) + +Run benchmarks in hook mode to catch performance regressions: ```bash -mise run benchmark # Run all benchmarks (text output) -mise run benchmark-json # Run benchmarks (JSON output) -mise run benchmark-report # Run benchmarks (Markdown report) +mise run test-cov # Run with coverage ``` -### Benchmark Commands - -```bash -# Run benchmarks -uv run python -m engine.benchmark - -# Run with specific displays/effects -uv run python -m engine.benchmark --displays null,terminal --effects fade,glitch - -# Save baseline for hook comparisons -uv run python -m engine.benchmark --baseline - -# Run in hook mode (compares against baseline) -uv run python -m engine.benchmark --hook - -# Hook mode with custom threshold (default: 20% degradation) -uv run python -m engine.benchmark --hook --threshold 0.3 - -# Custom baseline location -uv run python -m engine.benchmark --hook --cache /path/to/cache.json -``` - -### Hook Mode - -The `--hook` mode compares current benchmarks against a saved baseline. If performance degrades beyond the threshold (default 20%), it exits with code 1. This is useful for preventing performance regressions in feature branches. +The benchmark tests will fail if performance degrades beyond the threshold. The pre-push hook runs benchmark in hook mode to catch performance regressions before pushing. @@ -161,12 +139,11 @@ The project uses pytest with strict marker enforcement. Test configuration is in ### Test Coverage Strategy -Current coverage: 56% (434 tests) +Current coverage: 56% (463 tests) Key areas with lower coverage (acceptable for now): - **app.py** (8%): Main entry point - integration heavy, requires terminal -- **scroll.py** (10%): Terminal-dependent rendering logic -- **benchmark.py** (0%): Standalone benchmark tool, runs separately +- **scroll.py** (10%): Terminal-dependent rendering logic (unused) Key areas with good coverage: - **display/backends/null.py** (95%): Easy to test headlessly @@ -227,7 +204,7 @@ Sensors support param bindings to drive effect parameters in real-time. #### Pipeline Introspection -- **PipelineIntrospectionSource** (`engine/pipeline_sources/pipeline_introspection.py`): Renders live ASCII visualization of pipeline DAG with metrics +- **PipelineIntrospectionSource** (`engine/data_sources/pipeline_introspection.py`): Renders live ASCII visualization of pipeline DAG with metrics - **PipelineIntrospectionDemo** (`engine/pipeline/pipeline_introspection_demo.py`): 3-phase demo controller for effect animation Preset: `pipeline-inspect` - Live pipeline introspection with DAG and performance metrics diff --git a/effects_plugins/border.py b/effects_plugins/border.py new file mode 100644 index 0000000..7b158c4 --- /dev/null +++ b/effects_plugins/border.py @@ -0,0 +1,105 @@ +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin + + +class BorderEffect(EffectPlugin): + """Simple border effect for terminal display. + + Draws a border around the buffer and optionally displays + performance metrics in the border corners. + + Internally crops to display dimensions to ensure border fits. + """ + + name = "border" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + if not buf: + return buf + + # Get actual display dimensions from context + display_w = ctx.terminal_width + display_h = ctx.terminal_height + + # If dimensions are reasonable, crop first - use slightly smaller to ensure fit + if display_w >= 10 and display_h >= 3: + # Subtract 2 for border characters (left and right) + crop_w = display_w - 2 + crop_h = display_h - 2 + buf = self._crop_to_size(buf, crop_w, crop_h) + w = display_w + h = display_h + else: + # Use buffer dimensions + h = len(buf) + w = max(len(line) for line in buf) if buf else 0 + + if w < 3 or h < 3: + return buf + + inner_w = w - 2 + + # Get metrics from context + fps = 0.0 + frame_time = 0.0 + metrics = ctx.get_state("metrics") + if metrics: + avg_ms = metrics.get("avg_ms") + frame_count = metrics.get("frame_count", 0) + if avg_ms and frame_count > 0: + fps = 1000.0 / avg_ms + frame_time = avg_ms + + # Build borders + # Top border: ┌────────────────────┐ or with FPS + if fps > 0: + fps_str = f" FPS:{fps:.0f}" + if len(fps_str) < inner_w: + right_len = inner_w - len(fps_str) + top_border = "┌" + "─" * right_len + fps_str + "┐" + else: + top_border = "┌" + "─" * inner_w + "┐" + else: + top_border = "┌" + "─" * inner_w + "┐" + + # Bottom border: └────────────────────┘ or with frame time + if frame_time > 0: + ft_str = f" {frame_time:.1f}ms" + if len(ft_str) < inner_w: + right_len = inner_w - len(ft_str) + bottom_border = "└" + "─" * right_len + ft_str + "┘" + else: + bottom_border = "└" + "─" * inner_w + "┘" + else: + bottom_border = "└" + "─" * inner_w + "┘" + + # Build result with left/right borders + result = [top_border] + for line in buf[: h - 2]: + if len(line) >= inner_w: + result.append("│" + line[:inner_w] + "│") + else: + result.append("│" + line + " " * (inner_w - len(line)) + "│") + + result.append(bottom_border) + + return result + + def _crop_to_size(self, buf: list[str], w: int, h: int) -> list[str]: + """Crop buffer to fit within w x h.""" + result = [] + for i in range(min(h, len(buf))): + line = buf[i] + if len(line) > w: + result.append(line[:w]) + else: + result.append(line + " " * (w - len(line))) + + # Pad with empty lines if needed (for border) + while len(result) < h: + result.append(" " * w) + + return result + + def configure(self, config: EffectConfig) -> None: + self.config = config diff --git a/effects_plugins/crop.py b/effects_plugins/crop.py new file mode 100644 index 0000000..6e0431a --- /dev/null +++ b/effects_plugins/crop.py @@ -0,0 +1,42 @@ +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin + + +class CropEffect(EffectPlugin): + """Crop effect that crops the input buffer to fit the display. + + This ensures the output buffer matches the actual display dimensions, + useful when the source produces a buffer larger than the viewport. + """ + + name = "crop" + config = EffectConfig(enabled=True, intensity=1.0) + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + if not buf: + return buf + + # Get actual display dimensions from context + w = ( + ctx.terminal_width + if ctx.terminal_width > 0 + else max(len(line) for line in buf) + ) + h = ctx.terminal_height if ctx.terminal_height > 0 else len(buf) + + # Crop buffer to fit + result = [] + for i in range(min(h, len(buf))): + line = buf[i] + if len(line) > w: + result.append(line[:w]) + else: + result.append(line + " " * (w - len(line))) + + # Pad with empty lines if needed + while len(result) < h: + result.append(" " * w) + + return result + + def configure(self, config: EffectConfig) -> None: + self.config = config diff --git a/effects_plugins/tint.py b/effects_plugins/tint.py new file mode 100644 index 0000000..ce8c941 --- /dev/null +++ b/effects_plugins/tint.py @@ -0,0 +1,99 @@ +from engine.effects.types import EffectConfig, EffectContext, EffectPlugin + + +class TintEffect(EffectPlugin): + """Tint effect that applies an RGB color overlay to the buffer. + + Uses ANSI escape codes to tint text with the specified RGB values. + Supports transparency (0-100%) for blending. + + Inlets: + - r: Red component (0-255) + - g: Green component (0-255) + - b: Blue component (0-255) + - a: Alpha/transparency (0.0-1.0, where 0.0 = fully transparent) + """ + + name = "tint" + config = EffectConfig(enabled=True, intensity=1.0) + + # Define inlet types for PureData-style typing + @property + def inlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.TEXT_BUFFER} + + @property + def outlet_types(self) -> set: + from engine.pipeline.core import DataType + + return {DataType.TEXT_BUFFER} + + def process(self, buf: list[str], ctx: EffectContext) -> list[str]: + if not buf: + return buf + + # Get tint values from effect params or sensors + r = self.config.params.get("r", 255) + g = self.config.params.get("g", 255) + b = self.config.params.get("b", 255) + a = self.config.params.get("a", 0.3) # Default 30% tint + + # Clamp values + r = max(0, min(255, int(r))) + g = max(0, min(255, int(g))) + b = max(0, min(255, int(b))) + a = max(0.0, min(1.0, float(a))) + + if a <= 0: + return buf + + # Convert RGB to ANSI 256 color + ansi_color = self._rgb_to_ansi256(r, g, b) + + # Apply tint with transparency effect + result = [] + for line in buf: + if not line.strip(): + result.append(line) + continue + + # Check if line already has ANSI codes + if "\033[" in line: + # For lines with existing colors, wrap the whole line + result.append(f"\033[38;5;{ansi_color}m{line}\033[0m") + else: + # Apply tint to plain text lines + result.append(f"\033[38;5;{ansi_color}m{line}\033[0m") + + return result + + def _rgb_to_ansi256(self, r: int, g: int, b: int) -> int: + """Convert RGB (0-255 each) to ANSI 256 color code.""" + if r == g == b == 0: + return 16 + if r == g == b == 255: + return 231 + + # Calculate grayscale + gray = int((0.299 * r + 0.587 * g + 0.114 * b) / 255 * 24) + 232 + + # Calculate color cube + ri = int(r / 51) + gi = int(g / 51) + bi = int(b / 51) + color = 16 + 36 * ri + 6 * gi + bi + + # Use whichever is closer - gray or color + gray_dist = abs(r - gray) + color_dist = ( + (r - ri * 51) ** 2 + (g - gi * 51) ** 2 + (b - bi * 51) ** 2 + ) ** 0.5 + + if gray_dist < color_dist: + return gray + return color + + def configure(self, config: EffectConfig) -> None: + self.config = config diff --git a/engine/benchmark.py b/engine/benchmark.py deleted file mode 100644 index 0aef02e..0000000 --- a/engine/benchmark.py +++ /dev/null @@ -1,730 +0,0 @@ -#!/usr/bin/env python3 -""" -Benchmark runner for mainline - tests performance across effects and displays. - -Usage: - python -m engine.benchmark - python -m engine.benchmark --output report.md - python -m engine.benchmark --displays terminal,websocket --effects glitch,fade - python -m engine.benchmark --format json --output benchmark.json - -Headless mode (default): suppress all terminal output during benchmarks. -""" - -import argparse -import json -import sys -import time -from dataclasses import dataclass, field -from datetime import datetime -from io import StringIO -from pathlib import Path -from typing import Any - -import numpy as np - - -@dataclass -class BenchmarkResult: - """Result of a single benchmark run.""" - - name: str - display: str - effect: str | None - iterations: int - total_time_ms: float - avg_time_ms: float - std_dev_ms: float - min_ms: float - max_ms: float - fps: float - chars_processed: int - chars_per_sec: float - - -@dataclass -class BenchmarkReport: - """Complete benchmark report.""" - - timestamp: str - python_version: str - results: list[BenchmarkResult] = field(default_factory=list) - summary: dict[str, Any] = field(default_factory=dict) - - -def get_sample_buffer(width: int = 80, height: int = 24) -> list[str]: - """Generate a sample buffer for benchmarking.""" - lines = [] - for i in range(height): - line = f"\x1b[32mLine {i}\x1b[0m " + "A" * (width - 10) - lines.append(line) - return lines - - -def benchmark_display( - display_class, - buffer: list[str], - iterations: int = 100, - display=None, - reuse: bool = False, -) -> BenchmarkResult | None: - """Benchmark a single display. - - Args: - display_class: Display class to instantiate - buffer: Buffer to display - iterations: Number of iterations - display: Optional existing display instance to reuse - reuse: If True and display provided, use reuse mode - """ - old_stdout = sys.stdout - old_stderr = sys.stderr - - try: - sys.stdout = StringIO() - sys.stderr = StringIO() - - if display is None: - display = display_class() - display.init(80, 24, reuse=False) - should_cleanup = True - else: - should_cleanup = False - - times = [] - chars = sum(len(line) for line in buffer) - - for _ in range(iterations): - t0 = time.perf_counter() - display.show(buffer) - elapsed = (time.perf_counter() - t0) * 1000 - times.append(elapsed) - - if should_cleanup and hasattr(display, "cleanup"): - display.cleanup(quit_pygame=False) - - except Exception: - return None - finally: - sys.stdout = old_stdout - sys.stderr = old_stderr - - times_arr = np.array(times) - - return BenchmarkResult( - name=f"display_{display_class.__name__}", - display=display_class.__name__, - effect=None, - iterations=iterations, - total_time_ms=sum(times), - avg_time_ms=float(np.mean(times_arr)), - std_dev_ms=float(np.std(times_arr)), - min_ms=float(np.min(times_arr)), - max_ms=float(np.max(times_arr)), - fps=float(1000.0 / np.mean(times_arr)) if np.mean(times_arr) > 0 else 0.0, - chars_processed=chars * iterations, - chars_per_sec=float((chars * iterations) / (sum(times) / 1000)) - if sum(times) > 0 - else 0.0, - ) - - -def benchmark_effect_with_display( - effect_class, display, buffer: list[str], iterations: int = 100, reuse: bool = False -) -> BenchmarkResult | None: - """Benchmark an effect with a display. - - Args: - effect_class: Effect class to instantiate - display: Display instance to use - buffer: Buffer to process and display - iterations: Number of iterations - reuse: If True, use reuse mode for display - """ - old_stdout = sys.stdout - old_stderr = sys.stderr - - try: - from engine.effects.types import EffectConfig, EffectContext - - sys.stdout = StringIO() - sys.stderr = StringIO() - - effect = effect_class() - effect.configure(EffectConfig(enabled=True, intensity=1.0)) - - ctx = EffectContext( - terminal_width=80, - terminal_height=24, - scroll_cam=0, - ticker_height=0, - mic_excess=0.0, - grad_offset=0.0, - frame_number=0, - has_message=False, - ) - - times = [] - chars = sum(len(line) for line in buffer) - - for _ in range(iterations): - processed = effect.process(buffer, ctx) - t0 = time.perf_counter() - display.show(processed) - elapsed = (time.perf_counter() - t0) * 1000 - times.append(elapsed) - - if not reuse and hasattr(display, "cleanup"): - display.cleanup(quit_pygame=False) - - except Exception: - return None - finally: - sys.stdout = old_stdout - sys.stderr = old_stderr - - times_arr = np.array(times) - - return BenchmarkResult( - name=f"effect_{effect_class.__name__}_with_{display.__class__.__name__}", - display=display.__class__.__name__, - effect=effect_class.__name__, - iterations=iterations, - total_time_ms=sum(times), - avg_time_ms=float(np.mean(times_arr)), - std_dev_ms=float(np.std(times_arr)), - min_ms=float(np.min(times_arr)), - max_ms=float(np.max(times_arr)), - fps=float(1000.0 / np.mean(times_arr)) if np.mean(times_arr) > 0 else 0.0, - chars_processed=chars * iterations, - chars_per_sec=float((chars * iterations) / (sum(times) / 1000)) - if sum(times) > 0 - else 0.0, - ) - - -def get_available_displays(): - """Get available display classes.""" - from engine.display import ( - DisplayRegistry, - NullDisplay, - TerminalDisplay, - ) - - DisplayRegistry.initialize() - - displays = [ - ("null", NullDisplay), - ("terminal", TerminalDisplay), - ] - - try: - from engine.display.backends.websocket import WebSocketDisplay - - displays.append(("websocket", WebSocketDisplay)) - except Exception: - pass - - try: - from engine.display.backends.sixel import SixelDisplay - - displays.append(("sixel", SixelDisplay)) - except Exception: - pass - - try: - from engine.display.backends.pygame import PygameDisplay - - displays.append(("pygame", PygameDisplay)) - except Exception: - pass - - return displays - - -def get_available_effects(): - """Get available effect classes.""" - try: - from engine.effects import get_registry - - try: - from effects_plugins import discover_plugins - - discover_plugins() - except Exception: - pass - except Exception: - return [] - - effects = [] - registry = get_registry() - - for name, effect in registry.list_all().items(): - if effect: - effect_cls = type(effect) - effects.append((name, effect_cls)) - - return effects - - -def run_benchmarks( - displays: list[tuple[str, Any]] | None = None, - effects: list[tuple[str, Any]] | None = None, - iterations: int = 100, - verbose: bool = False, -) -> BenchmarkReport: - """Run all benchmarks and return report.""" - from datetime import datetime - - if displays is None: - displays = get_available_displays() - - if effects is None: - effects = get_available_effects() - - buffer = get_sample_buffer(80, 24) - results = [] - - if verbose: - print(f"Running benchmarks ({iterations} iterations each)...") - - pygame_display = None - for name, display_class in displays: - if verbose: - print(f"Benchmarking display: {name}") - - result = benchmark_display(display_class, buffer, iterations) - if result: - results.append(result) - if verbose: - print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg") - - if name == "pygame": - pygame_display = result - - if verbose: - print() - - pygame_instance = None - if pygame_display: - try: - from engine.display.backends.pygame import PygameDisplay - - PygameDisplay.reset_state() - pygame_instance = PygameDisplay() - pygame_instance.init(80, 24, reuse=False) - except Exception: - pygame_instance = None - - for effect_name, effect_class in effects: - for display_name, display_class in displays: - if display_name == "websocket": - continue - - if display_name == "pygame": - if verbose: - print(f"Benchmarking effect: {effect_name} with {display_name}") - - if pygame_instance: - result = benchmark_effect_with_display( - effect_class, pygame_instance, buffer, iterations, reuse=True - ) - if result: - results.append(result) - if verbose: - print( - f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg" - ) - continue - - if verbose: - print(f"Benchmarking effect: {effect_name} with {display_name}") - - display = display_class() - display.init(80, 24) - result = benchmark_effect_with_display( - effect_class, display, buffer, iterations - ) - if result: - results.append(result) - if verbose: - print(f" {result.fps:.1f} FPS, {result.avg_time_ms:.2f}ms avg") - - if pygame_instance: - try: - pygame_instance.cleanup(quit_pygame=True) - except Exception: - pass - - summary = generate_summary(results) - - return BenchmarkReport( - timestamp=datetime.now().isoformat(), - python_version=sys.version, - results=results, - summary=summary, - ) - - -def generate_summary(results: list[BenchmarkResult]) -> dict[str, Any]: - """Generate summary statistics from results.""" - by_display: dict[str, list[BenchmarkResult]] = {} - by_effect: dict[str, list[BenchmarkResult]] = {} - - for r in results: - if r.display not in by_display: - by_display[r.display] = [] - by_display[r.display].append(r) - - if r.effect: - if r.effect not in by_effect: - by_effect[r.effect] = [] - by_effect[r.effect].append(r) - - summary = { - "by_display": {}, - "by_effect": {}, - "overall": { - "total_tests": len(results), - "displays_tested": len(by_display), - "effects_tested": len(by_effect), - }, - } - - for display, res in by_display.items(): - fps_values = [r.fps for r in res] - summary["by_display"][display] = { - "avg_fps": float(np.mean(fps_values)), - "min_fps": float(np.min(fps_values)), - "max_fps": float(np.max(fps_values)), - "tests": len(res), - } - - for effect, res in by_effect.items(): - fps_values = [r.fps for r in res] - summary["by_effect"][effect] = { - "avg_fps": float(np.mean(fps_values)), - "min_fps": float(np.min(fps_values)), - "max_fps": float(np.max(fps_values)), - "tests": len(res), - } - - return summary - - -DEFAULT_CACHE_PATH = Path.home() / ".mainline_benchmark_cache.json" - - -def load_baseline(cache_path: Path | None = None) -> dict[str, Any] | None: - """Load baseline benchmark results from cache.""" - path = cache_path or DEFAULT_CACHE_PATH - if not path.exists(): - return None - try: - with open(path) as f: - return json.load(f) - except Exception: - return None - - -def save_baseline( - results: list[BenchmarkResult], - cache_path: Path | None = None, -) -> None: - """Save benchmark results as baseline to cache.""" - path = cache_path or DEFAULT_CACHE_PATH - baseline = { - "timestamp": datetime.now().isoformat(), - "results": { - r.name: { - "fps": r.fps, - "avg_time_ms": r.avg_time_ms, - "chars_per_sec": r.chars_per_sec, - } - for r in results - }, - } - path.parent.mkdir(parents=True, exist_ok=True) - with open(path, "w") as f: - json.dump(baseline, f, indent=2) - - -def compare_with_baseline( - results: list[BenchmarkResult], - baseline: dict[str, Any], - threshold: float = 0.2, - verbose: bool = True, -) -> tuple[bool, list[str]]: - """Compare current results with baseline. Returns (pass, messages).""" - baseline_results = baseline.get("results", {}) - failures = [] - warnings = [] - - for r in results: - if r.name not in baseline_results: - warnings.append(f"New test: {r.name} (no baseline)") - continue - - b = baseline_results[r.name] - if b["fps"] == 0: - continue - - degradation = (b["fps"] - r.fps) / b["fps"] - if degradation > threshold: - failures.append( - f"{r.name}: FPS degraded {degradation * 100:.1f}% " - f"(baseline: {b['fps']:.1f}, current: {r.fps:.1f})" - ) - elif verbose: - print(f" {r.name}: {r.fps:.1f} FPS (baseline: {b['fps']:.1f})") - - passed = len(failures) == 0 - messages = [] - if failures: - messages.extend(failures) - if warnings: - messages.extend(warnings) - - return passed, messages - - -def run_hook_mode( - displays: list[tuple[str, Any]] | None = None, - effects: list[tuple[str, Any]] | None = None, - iterations: int = 20, - threshold: float = 0.2, - cache_path: Path | None = None, - verbose: bool = False, -) -> int: - """Run in hook mode: compare against baseline, exit 0 on pass, 1 on fail.""" - baseline = load_baseline(cache_path) - - if baseline is None: - print("No baseline found. Run with --baseline to create one.") - return 1 - - report = run_benchmarks(displays, effects, iterations, verbose) - - passed, messages = compare_with_baseline( - report.results, baseline, threshold, verbose - ) - - print("\n=== Benchmark Hook Results ===") - if passed: - print("PASSED - No significant performance degradation") - return 0 - else: - print("FAILED - Performance degradation detected:") - for msg in messages: - print(f" - {msg}") - return 1 - - -def format_report_text(report: BenchmarkReport) -> str: - """Format report as human-readable text.""" - lines = [ - "# Mainline Performance Benchmark Report", - "", - f"Generated: {report.timestamp}", - f"Python: {report.python_version}", - "", - "## Summary", - "", - f"Total tests: {report.summary['overall']['total_tests']}", - f"Displays tested: {report.summary['overall']['displays_tested']}", - f"Effects tested: {report.summary['overall']['effects_tested']}", - "", - "## By Display", - "", - ] - - for display, stats in report.summary["by_display"].items(): - lines.append(f"### {display}") - lines.append(f"- Avg FPS: {stats['avg_fps']:.1f}") - lines.append(f"- Min FPS: {stats['min_fps']:.1f}") - lines.append(f"- Max FPS: {stats['max_fps']:.1f}") - lines.append(f"- Tests: {stats['tests']}") - lines.append("") - - if report.summary["by_effect"]: - lines.append("## By Effect") - lines.append("") - - for effect, stats in report.summary["by_effect"].items(): - lines.append(f"### {effect}") - lines.append(f"- Avg FPS: {stats['avg_fps']:.1f}") - lines.append(f"- Min FPS: {stats['min_fps']:.1f}") - lines.append(f"- Max FPS: {stats['max_fps']:.1f}") - lines.append(f"- Tests: {stats['tests']}") - lines.append("") - - lines.append("## Detailed Results") - lines.append("") - lines.append("| Display | Effect | FPS | Avg ms | StdDev ms | Min ms | Max ms |") - lines.append("|---------|--------|-----|--------|-----------|--------|--------|") - - for r in report.results: - effect_col = r.effect if r.effect else "-" - lines.append( - f"| {r.display} | {effect_col} | {r.fps:.1f} | {r.avg_time_ms:.2f} | " - f"{r.std_dev_ms:.2f} | {r.min_ms:.2f} | {r.max_ms:.2f} |" - ) - - return "\n".join(lines) - - -def format_report_json(report: BenchmarkReport) -> str: - """Format report as JSON.""" - data = { - "timestamp": report.timestamp, - "python_version": report.python_version, - "summary": report.summary, - "results": [ - { - "name": r.name, - "display": r.display, - "effect": r.effect, - "iterations": r.iterations, - "total_time_ms": r.total_time_ms, - "avg_time_ms": r.avg_time_ms, - "std_dev_ms": r.std_dev_ms, - "min_ms": r.min_ms, - "max_ms": r.max_ms, - "fps": r.fps, - "chars_processed": r.chars_processed, - "chars_per_sec": r.chars_per_sec, - } - for r in report.results - ], - } - return json.dumps(data, indent=2) - - -def main(): - parser = argparse.ArgumentParser(description="Run mainline benchmarks") - parser.add_argument( - "--displays", - help="Comma-separated list of displays to test (default: all)", - ) - parser.add_argument( - "--effects", - help="Comma-separated list of effects to test (default: all)", - ) - parser.add_argument( - "--iterations", - type=int, - default=100, - help="Number of iterations per test (default: 100)", - ) - parser.add_argument( - "--output", - help="Output file path (default: stdout)", - ) - parser.add_argument( - "--format", - choices=["text", "json"], - default="text", - help="Output format (default: text)", - ) - parser.add_argument( - "--verbose", - "-v", - action="store_true", - help="Show progress during benchmarking", - ) - parser.add_argument( - "--hook", - action="store_true", - help="Run in hook mode: compare against baseline, exit 0 pass, 1 fail", - ) - parser.add_argument( - "--baseline", - action="store_true", - help="Save current results as baseline for future hook comparisons", - ) - parser.add_argument( - "--threshold", - type=float, - default=0.2, - help="Performance degradation threshold for hook mode (default: 0.2 = 20%%)", - ) - parser.add_argument( - "--cache", - type=str, - default=None, - help="Path to baseline cache file (default: ~/.mainline_benchmark_cache.json)", - ) - - args = parser.parse_args() - - cache_path = Path(args.cache) if args.cache else DEFAULT_CACHE_PATH - - if args.hook: - displays = None - if args.displays: - display_map = dict(get_available_displays()) - displays = [ - (name, display_map[name]) - for name in args.displays.split(",") - if name in display_map - ] - - effects = None - if args.effects: - effect_map = dict(get_available_effects()) - effects = [ - (name, effect_map[name]) - for name in args.effects.split(",") - if name in effect_map - ] - - return run_hook_mode( - displays, - effects, - iterations=args.iterations, - threshold=args.threshold, - cache_path=cache_path, - verbose=args.verbose, - ) - - displays = None - if args.displays: - display_map = dict(get_available_displays()) - displays = [ - (name, display_map[name]) - for name in args.displays.split(",") - if name in display_map - ] - - effects = None - if args.effects: - effect_map = dict(get_available_effects()) - effects = [ - (name, effect_map[name]) - for name in args.effects.split(",") - if name in effect_map - ] - - report = run_benchmarks(displays, effects, args.iterations, args.verbose) - - if args.baseline: - save_baseline(report.results, cache_path) - print(f"Baseline saved to {cache_path}") - return 0 - - if args.format == "json": - output = format_report_json(report) - else: - output = format_report_text(report) - - if args.output: - with open(args.output, "w") as f: - f.write(output) - else: - print(output) - - return 0 - - -if __name__ == "__main__": - sys.exit(main()) diff --git a/engine/camera.py b/engine/camera.py index 7d55800..a038d4b 100644 --- a/engine/camera.py +++ b/engine/camera.py @@ -21,6 +21,7 @@ class CameraMode(Enum): HORIZONTAL = auto() OMNI = auto() FLOATING = auto() + BOUNCE = auto() @dataclass @@ -135,8 +136,12 @@ class Camera: self._update_omni(dt) elif self.mode == CameraMode.FLOATING: self._update_floating(dt) + elif self.mode == CameraMode.BOUNCE: + self._update_bounce(dt) - self._clamp_to_bounds() + # Bounce mode handles its own bounds checking + if self.mode != CameraMode.BOUNCE: + self._clamp_to_bounds() def _clamp_to_bounds(self) -> None: """Clamp camera position to stay within canvas bounds. @@ -170,6 +175,43 @@ class Camera: self.y = int(math.sin(self._time * 2) * base) self.x = int(math.cos(self._time * 1.5) * base * 0.5) + def _update_bounce(self, dt: float) -> None: + """Bouncing DVD-style camera that bounces off canvas edges.""" + vw = self.viewport_width + vh = self.viewport_height + + # Initialize direction if not set + if not hasattr(self, "_bounce_dx"): + self._bounce_dx = 1 + self._bounce_dy = 1 + + # Calculate max positions + max_x = max(0, self.canvas_width - vw) + max_y = max(0, self.canvas_height - vh) + + # Move + move_speed = self.speed * dt * 60 + + # Bounce off edges - reverse direction when hitting bounds + self.x += int(move_speed * self._bounce_dx) + self.y += int(move_speed * self._bounce_dy) + + # Bounce horizontally + if self.x <= 0: + self.x = 0 + self._bounce_dx = 1 + elif self.x >= max_x: + self.x = max_x + self._bounce_dx = -1 + + # Bounce vertically + if self.y <= 0: + self.y = 0 + self._bounce_dy = 1 + elif self.y >= max_y: + self.y = max_y + self._bounce_dy = -1 + def reset(self) -> None: """Reset camera position.""" self.x = 0 @@ -212,6 +254,13 @@ class Camera: mode=CameraMode.FLOATING, speed=speed, canvas_width=200, canvas_height=200 ) + @classmethod + def bounce(cls, speed: float = 1.0) -> "Camera": + """Create a bouncing DVD-style camera that bounces off canvas edges.""" + return cls( + mode=CameraMode.BOUNCE, speed=speed, canvas_width=200, canvas_height=200 + ) + @classmethod def custom(cls, update_fn: Callable[["Camera", float], None]) -> "Camera": """Create a camera with custom update function.""" diff --git a/engine/data_sources/__init__.py b/engine/data_sources/__init__.py new file mode 100644 index 0000000..2f5493c --- /dev/null +++ b/engine/data_sources/__init__.py @@ -0,0 +1,12 @@ +""" +Data source implementations for the pipeline architecture. + +Import directly from submodules: + from engine.data_sources.sources import DataSource, SourceItem, HeadlinesDataSource + from engine.data_sources.pipeline_introspection import PipelineIntrospectionSource +""" + +# Re-export for convenience +from engine.data_sources.sources import ImageItem, SourceItem + +__all__ = ["ImageItem", "SourceItem"] diff --git a/engine/pipeline_sources/pipeline_introspection.py b/engine/data_sources/pipeline_introspection.py similarity index 65% rename from engine/pipeline_sources/pipeline_introspection.py rename to engine/data_sources/pipeline_introspection.py index 6761f9b..b7c372d 100644 --- a/engine/pipeline_sources/pipeline_introspection.py +++ b/engine/data_sources/pipeline_introspection.py @@ -15,7 +15,7 @@ Example: from typing import TYPE_CHECKING -from engine.sources_v2 import DataSource, SourceItem +from engine.data_sources.sources import DataSource, SourceItem if TYPE_CHECKING: from engine.pipeline.controller import Pipeline @@ -37,14 +37,25 @@ class PipelineIntrospectionSource(DataSource): def __init__( self, - pipelines: list["Pipeline"] | None = None, + pipeline: "Pipeline | None" = None, viewport_width: int = 100, viewport_height: int = 35, ): - self._pipelines = pipelines or [] + self._pipeline = pipeline # May be None initially, set later via set_pipeline() self.viewport_width = viewport_width self.viewport_height = viewport_height self.frame = 0 + self._ready = False + + def set_pipeline(self, pipeline: "Pipeline") -> None: + """Set the pipeline to introspect (call after pipeline is built).""" + self._pipeline = [pipeline] # Wrap in list for iteration + self._ready = True + + @property + def ready(self) -> bool: + """Check if source is ready to fetch.""" + return self._ready @property def name(self) -> str: @@ -68,16 +79,39 @@ class PipelineIntrospectionSource(DataSource): def add_pipeline(self, pipeline: "Pipeline") -> None: """Add a pipeline to visualize.""" - if pipeline not in self._pipelines: - self._pipelines.append(pipeline) + if self._pipeline is None: + self._pipeline = [pipeline] + elif isinstance(self._pipeline, list): + self._pipeline.append(pipeline) + else: + self._pipeline = [self._pipeline, pipeline] + self._ready = True def remove_pipeline(self, pipeline: "Pipeline") -> None: """Remove a pipeline from visualization.""" - if pipeline in self._pipelines: - self._pipelines.remove(pipeline) + if self._pipeline is None: + return + elif isinstance(self._pipeline, list): + self._pipeline = [p for p in self._pipeline if p is not pipeline] + if not self._pipeline: + self._pipeline = None + self._ready = False + elif self._pipeline is pipeline: + self._pipeline = None + self._ready = False def fetch(self) -> list[SourceItem]: """Fetch the introspection visualization.""" + if not self._ready: + # Return a placeholder until ready + return [ + SourceItem( + content="Initializing...", + source="pipeline-inspect", + timestamp="init", + ) + ] + lines = self._render() self.frame += 1 content = "\n".join(lines) @@ -97,27 +131,35 @@ class PipelineIntrospectionSource(DataSource): # Header lines.extend(self._render_header()) - if not self._pipelines: - lines.append(" No pipelines to visualize") - return lines - - # Render each pipeline's DAG - for i, pipeline in enumerate(self._pipelines): - if len(self._pipelines) > 1: - lines.append(f" Pipeline {i + 1}:") - lines.extend(self._render_pipeline(pipeline)) + # Render pipeline(s) if ready + if self._ready and self._pipeline: + pipelines = ( + self._pipeline if isinstance(self._pipeline, list) else [self._pipeline] + ) + for pipeline in pipelines: + lines.extend(self._render_pipeline(pipeline)) # Footer with sparkline lines.extend(self._render_footer()) return lines + @property + def _pipelines(self) -> list: + """Return pipelines as a list for iteration.""" + if self._pipeline is None: + return [] + elif isinstance(self._pipeline, list): + return self._pipeline + else: + return [self._pipeline] + def _render_header(self) -> list[str]: """Render the header with frame info and metrics summary.""" lines: list[str] = [] - if not self._pipelines: - return ["┌─ PIPELINE INTROSPECTION ──────────────────────────────┐"] + if not self._pipeline: + return ["PIPELINE INTROSPECTION"] # Get aggregate metrics total_ms = 0.0 @@ -128,13 +170,17 @@ class PipelineIntrospectionSource(DataSource): try: metrics = pipeline.get_metrics_summary() if metrics and "error" not in metrics: - total_ms = max(total_ms, metrics.get("avg_ms", 0)) - fps = max(fps, metrics.get("fps", 0)) + # Get avg_ms from pipeline metrics + pipeline_avg = metrics.get("pipeline", {}).get("avg_ms", 0) + total_ms = max(total_ms, pipeline_avg) + # Calculate FPS from avg_ms + if pipeline_avg > 0: + fps = max(fps, 1000.0 / pipeline_avg) frame_count = max(frame_count, metrics.get("frame_count", 0)) except Exception: pass - header = f"┌─ PIPELINE INTROSPECTION ── frame: {self.frame} ─ avg: {total_ms:.1f}ms ─ fps: {fps:.1f} ─┐" + header = f"PIPELINE INTROSPECTION -- frame: {self.frame} -- avg: {total_ms:.1f}ms -- fps: {fps:.1f}" lines.append(header) return lines @@ -175,8 +221,8 @@ class PipelineIntrospectionSource(DataSource): total_time = sum(s["ms"] for s in stage_infos) or 1.0 # Render DAG - group by category - lines.append("│") - lines.append("│ Signal Flow:") + lines.append("") + lines.append(" Signal Flow:") # Group stages by category for display categories: dict[str, list[dict]] = {} @@ -195,20 +241,20 @@ class PipelineIntrospectionSource(DataSource): cat_stages = categories[cat] cat_names = [s["name"] for s in cat_stages] - lines.append(f"│ {cat}: {' → '.join(cat_names)}") + lines.append(f" {cat}: {' → '.join(cat_names)}") # Render timing breakdown - lines.append("│") - lines.append("│ Stage Timings:") + lines.append("") + lines.append(" Stage Timings:") for info in stage_infos: name = info["name"] ms = info["ms"] pct = (ms / total_time) * 100 bar = self._render_bar(pct, 20) - lines.append(f"│ {name:12s} {ms:6.2f}ms {bar} {pct:5.1f}%") + lines.append(f" {name:12s} {ms:6.2f}ms {bar} {pct:5.1f}%") - lines.append("│") + lines.append("") return lines @@ -217,9 +263,10 @@ class PipelineIntrospectionSource(DataSource): lines: list[str] = [] # Get frame history from first pipeline - if self._pipelines: + pipelines = self._pipelines + if pipelines: try: - frame_times = self._pipelines[0].get_frame_times() + frame_times = pipelines[0].get_frame_times() except Exception: frame_times = [] else: @@ -227,21 +274,13 @@ class PipelineIntrospectionSource(DataSource): if frame_times: sparkline = self._render_sparkline(frame_times[-60:], 50) - lines.append( - f"├─ Frame Time History (last {len(frame_times[-60:])} frames) ─────────────────────────────┤" - ) - lines.append(f"│{sparkline}│") + lines.append(f" Frame Time History (last {len(frame_times[-60:])} frames)") + lines.append(f" {sparkline}") else: - lines.append( - "├─ Frame Time History ─────────────────────────────────────────┤" - ) - lines.append( - "│ (collecting data...) │" - ) + lines.append(" Frame Time History") + lines.append(" (collecting data...)") - lines.append( - "└────────────────────────────────────────────────────────────────┘" - ) + lines.append("") return lines diff --git a/engine/sources_v2.py b/engine/data_sources/sources.py similarity index 70% rename from engine/sources_v2.py rename to engine/data_sources/sources.py index dcd0afa..c7c1289 100644 --- a/engine/sources_v2.py +++ b/engine/data_sources/sources.py @@ -1,11 +1,11 @@ """ -Data source abstraction - Treat data sources as first-class citizens in the pipeline. +Data sources for the pipeline architecture. -Each data source implements a common interface: -- name: Display name for the source -- fetch(): Fetch fresh data -- stream(): Stream data continuously (optional) -- get_items(): Get current items +This module contains all DataSource implementations: +- DataSource: Abstract base class +- SourceItem, ImageItem: Data containers +- HeadlinesDataSource, PoetryDataSource, ImageDataSource: Concrete sources +- SourceRegistry: Registry for source discovery """ from abc import ABC, abstractmethod @@ -24,6 +24,17 @@ class SourceItem: metadata: dict[str, Any] | None = None +@dataclass +class ImageItem: + """An image item from a data source - wraps a PIL Image.""" + + image: Any # PIL Image + source: str + timestamp: str + path: str | None = None # File path or URL if applicable + metadata: dict[str, Any] | None = None + + class DataSource(ABC): """Abstract base class for data sources. @@ -80,6 +91,31 @@ class HeadlinesDataSource(DataSource): return [SourceItem(content=t, source=s, timestamp=ts) for t, s, ts in items] +class EmptyDataSource(DataSource): + """Empty data source that produces blank lines for testing. + + Useful for testing display borders, effects, and other pipeline + components without needing actual content. + """ + + def __init__(self, width: int = 80, height: int = 24): + self.width = width + self.height = height + + @property + def name(self) -> str: + return "empty" + + @property + def is_dynamic(self) -> bool: + return False + + def fetch(self) -> list[SourceItem]: + # Return empty lines as content + content = "\n".join([" " * self.width for _ in range(self.height)]) + return [SourceItem(content=content, source="empty", timestamp="0")] + + class PoetryDataSource(DataSource): """Data source for Poetry DB.""" @@ -94,6 +130,94 @@ class PoetryDataSource(DataSource): return [SourceItem(content=t, source=s, timestamp=ts) for t, s, ts in items] +class ImageDataSource(DataSource): + """Data source that loads PNG images from file paths or URLs. + + Supports: + - Local file paths (e.g., /path/to/image.png) + - URLs (e.g., https://example.com/image.png) + + Yields ImageItem objects containing PIL Image objects that can be + converted to text buffers by an ImageToTextTransform stage. + """ + + def __init__( + self, + path: str | list[str] | None = None, + urls: str | list[str] | None = None, + ): + """ + Args: + path: Single path or list of paths to PNG files + urls: Single URL or list of URLs to PNG images + """ + self._paths = [path] if isinstance(path, str) else (path or []) + self._urls = [urls] if isinstance(urls, str) else (urls or []) + self._images: list[ImageItem] = [] + self._load_images() + + def _load_images(self) -> None: + """Load all images from paths and URLs.""" + from datetime import datetime + from io import BytesIO + from urllib.request import urlopen + + timestamp = datetime.now().isoformat() + + for path in self._paths: + try: + from PIL import Image + + img = Image.open(path) + if img.mode != "RGBA": + img = img.convert("RGBA") + self._images.append( + ImageItem( + image=img, + source=f"file:{path}", + timestamp=timestamp, + path=path, + ) + ) + except Exception: + pass + + for url in self._urls: + try: + from PIL import Image + + with urlopen(url) as response: + img = Image.open(BytesIO(response.read())) + if img.mode != "RGBA": + img = img.convert("RGBA") + self._images.append( + ImageItem( + image=img, + source=f"url:{url}", + timestamp=timestamp, + path=url, + ) + ) + except Exception: + pass + + @property + def name(self) -> str: + return "image" + + @property + def is_dynamic(self) -> bool: + return False # Static images, not updating + + def fetch(self) -> list[ImageItem]: + """Return loaded images as ImageItem list.""" + return self._images + + def get_items(self) -> list[ImageItem]: + """Return current image items.""" + return self._images + + class MetricsDataSource(DataSource): """Data source that renders live pipeline metrics as ASCII art. diff --git a/engine/display/__init__.py b/engine/display/__init__.py index ed72a15..33d6394 100644 --- a/engine/display/__init__.py +++ b/engine/display/__init__.py @@ -55,8 +55,13 @@ class Display(Protocol): """ ... - def show(self, buffer: list[str]) -> None: - """Show buffer on display.""" + def show(self, buffer: list[str], border: bool = False) -> None: + """Show buffer on display. + + Args: + buffer: Buffer to display + border: If True, render border around buffer (default False) + """ ... def clear(self) -> None: @@ -136,10 +141,90 @@ def get_monitor(): return None +def _strip_ansi(s: str) -> str: + """Strip ANSI escape sequences from string for length calculation.""" + import re + + return re.sub(r"\x1b\[[0-9;]*[a-zA-Z]", "", s) + + +def render_border( + buf: list[str], width: int, height: int, fps: float = 0.0, frame_time: float = 0.0 +) -> list[str]: + """Render a border around the buffer. + + Args: + buf: Input buffer (list of strings) + width: Display width in characters + height: Display height in rows + fps: Current FPS to display in top border (optional) + frame_time: Frame time in ms to display in bottom border (optional) + + Returns: + Buffer with border applied + """ + if not buf or width < 3 or height < 3: + return buf + + inner_w = width - 2 + inner_h = height - 2 + + # Crop buffer to fit inside border + cropped = [] + for i in range(min(inner_h, len(buf))): + line = buf[i] + # Calculate visible width (excluding ANSI codes) + visible_len = len(_strip_ansi(line)) + if visible_len > inner_w: + # Truncate carefully - this is approximate for ANSI text + cropped.append(line[:inner_w]) + else: + cropped.append(line + " " * (inner_w - visible_len)) + + # Pad with empty lines if needed + while len(cropped) < inner_h: + cropped.append(" " * inner_w) + + # Build borders + if fps > 0: + fps_str = f" FPS:{fps:.0f}" + if len(fps_str) < inner_w: + right_len = inner_w - len(fps_str) + top_border = "┌" + "─" * right_len + fps_str + "┐" + else: + top_border = "┌" + "─" * inner_w + "┐" + else: + top_border = "┌" + "─" * inner_w + "┐" + + if frame_time > 0: + ft_str = f" {frame_time:.1f}ms" + if len(ft_str) < inner_w: + right_len = inner_w - len(ft_str) + bottom_border = "└" + "─" * right_len + ft_str + "┘" + else: + bottom_border = "└" + "─" * inner_w + "┘" + else: + bottom_border = "└" + "─" * inner_w + "┘" + + # Build result with left/right borders + result = [top_border] + for line in cropped: + # Ensure exactly inner_w characters before adding right border + if len(line) < inner_w: + line = line + " " * (inner_w - len(line)) + elif len(line) > inner_w: + line = line[:inner_w] + result.append("│" + line + "│") + result.append(bottom_border) + + return result + + __all__ = [ "Display", "DisplayRegistry", "get_monitor", + "render_border", "TerminalDisplay", "NullDisplay", "WebSocketDisplay", diff --git a/engine/display/backends/kitty.py b/engine/display/backends/kitty.py index e6a5d89..61f1ac7 100644 --- a/engine/display/backends/kitty.py +++ b/engine/display/backends/kitty.py @@ -68,11 +68,31 @@ class KittyDisplay: return self._font_path - def show(self, buffer: list[str]) -> None: + def show(self, buffer: list[str], border: bool = False) -> None: import sys t0 = time.perf_counter() + # Get metrics for border display + fps = 0.0 + frame_time = 0.0 + from engine.display import get_monitor + + monitor = get_monitor() + if monitor: + stats = monitor.get_stats() + avg_ms = stats.get("avg_ms", 0) if stats else 0 + frame_count = stats.get("frame_count", 0) if stats else 0 + if avg_ms and frame_count > 0: + fps = 1000.0 / avg_ms + frame_time = avg_ms + + # Apply border if requested + if border: + from engine.display import render_border + + buffer = render_border(buffer, self.width, self.height, fps, frame_time) + img_width = self.width * self.cell_width img_height = self.height * self.cell_height @@ -150,3 +170,11 @@ class KittyDisplay: def cleanup(self) -> None: self.clear() + + def get_dimensions(self) -> tuple[int, int]: + """Get current dimensions. + + Returns: + (width, height) in character cells + """ + return (self.width, self.height) diff --git a/engine/display/backends/null.py b/engine/display/backends/null.py index 5c89086..399a8b5 100644 --- a/engine/display/backends/null.py +++ b/engine/display/backends/null.py @@ -26,7 +26,7 @@ class NullDisplay: self.width = width self.height = height - def show(self, buffer: list[str]) -> None: + def show(self, buffer: list[str], border: bool = False) -> None: from engine.display import get_monitor monitor = get_monitor() @@ -41,3 +41,11 @@ class NullDisplay: def cleanup(self) -> None: pass + + def get_dimensions(self) -> tuple[int, int]: + """Get current dimensions. + + Returns: + (width, height) in character cells + """ + return (self.width, self.height) diff --git a/engine/display/backends/pygame.py b/engine/display/backends/pygame.py index 0988c36..0ae9811 100644 --- a/engine/display/backends/pygame.py +++ b/engine/display/backends/pygame.py @@ -17,7 +17,6 @@ class PygameDisplay: width: int = 80 window_width: int = 800 window_height: int = 600 - _pygame_initialized: bool = False def __init__( self, @@ -25,6 +24,7 @@ class PygameDisplay: cell_height: int = 18, window_width: int = 800, window_height: int = 600, + target_fps: float = 30.0, ): self.width = 80 self.height = 24 @@ -32,12 +32,15 @@ class PygameDisplay: self.cell_height = cell_height self.window_width = window_width self.window_height = window_height + self.target_fps = target_fps self._initialized = False self._pygame = None self._screen = None self._font = None self._resized = False self._quit_requested = False + self._last_frame_time = 0.0 + self._frame_period = 1.0 / target_fps if target_fps > 0 else 0 def _get_font_path(self) -> str | None: """Get font path for rendering.""" @@ -130,7 +133,7 @@ class PygameDisplay: self._initialized = True - def show(self, buffer: list[str]) -> None: + def show(self, buffer: list[str], border: bool = False) -> None: if not self._initialized or not self._pygame: return @@ -154,6 +157,34 @@ class PygameDisplay: self.height = max(1, self.window_height // self.cell_height) self._resized = True + # FPS limiting - skip frame if we're going too fast + if self._frame_period > 0: + now = time.perf_counter() + elapsed = now - self._last_frame_time + if elapsed < self._frame_period: + return # Skip this frame + self._last_frame_time = now + + # Get metrics for border display + fps = 0.0 + frame_time = 0.0 + from engine.display import get_monitor + + monitor = get_monitor() + if monitor: + stats = monitor.get_stats() + avg_ms = stats.get("avg_ms", 0) if stats else 0 + frame_count = stats.get("frame_count", 0) if stats else 0 + if avg_ms and frame_count > 0: + fps = 1000.0 / avg_ms + frame_time = avg_ms + + # Apply border if requested + if border: + from engine.display import render_border + + buffer = render_border(buffer, self.width, self.height, fps, frame_time) + self._screen.fill((0, 0, 0)) for row_idx, line in enumerate(buffer[: self.height]): @@ -180,9 +211,6 @@ class PygameDisplay: elapsed_ms = (time.perf_counter() - t0) * 1000 - from engine.display import get_monitor - - monitor = get_monitor() if monitor: chars_in = sum(len(line) for line in buffer) monitor.record_effect("pygame_display", elapsed_ms, chars_in, chars_in) @@ -198,8 +226,17 @@ class PygameDisplay: Returns: (width, height) in character cells """ - if self._resized: - self._resized = False + # Query actual window size and recalculate character cells + if self._screen and self._pygame: + try: + w, h = self._screen.get_size() + if w != self.window_width or h != self.window_height: + self.window_width = w + self.window_height = h + self.width = max(1, w // self.cell_width) + self.height = max(1, h // self.cell_height) + except Exception: + pass return self.width, self.height def cleanup(self, quit_pygame: bool = True) -> None: diff --git a/engine/display/backends/sixel.py b/engine/display/backends/sixel.py index adc8c7b..d692895 100644 --- a/engine/display/backends/sixel.py +++ b/engine/display/backends/sixel.py @@ -122,11 +122,31 @@ class SixelDisplay: self.height = height self._initialized = True - def show(self, buffer: list[str]) -> None: + def show(self, buffer: list[str], border: bool = False) -> None: import sys t0 = time.perf_counter() + # Get metrics for border display + fps = 0.0 + frame_time = 0.0 + from engine.display import get_monitor + + monitor = get_monitor() + if monitor: + stats = monitor.get_stats() + avg_ms = stats.get("avg_ms", 0) if stats else 0 + frame_count = stats.get("frame_count", 0) if stats else 0 + if avg_ms and frame_count > 0: + fps = 1000.0 / avg_ms + frame_time = avg_ms + + # Apply border if requested + if border: + from engine.display import render_border + + buffer = render_border(buffer, self.width, self.height, fps, frame_time) + img_width = self.width * self.cell_width img_height = self.height * self.cell_height @@ -198,3 +218,11 @@ class SixelDisplay: def cleanup(self) -> None: pass + + def get_dimensions(self) -> tuple[int, int]: + """Get current dimensions. + + Returns: + (width, height) in character cells + """ + return (self.width, self.height) diff --git a/engine/display/backends/terminal.py b/engine/display/backends/terminal.py index d3d490d..61106c9 100644 --- a/engine/display/backends/terminal.py +++ b/engine/display/backends/terminal.py @@ -2,6 +2,7 @@ ANSI terminal display backend. """ +import os import time @@ -10,40 +11,106 @@ class TerminalDisplay: Renders buffer to stdout using ANSI escape codes. Supports reuse - when reuse=True, skips re-initializing terminal state. + Auto-detects terminal dimensions on init. """ width: int = 80 height: int = 24 _initialized: bool = False + def __init__(self, target_fps: float = 30.0): + self.target_fps = target_fps + self._frame_period = 1.0 / target_fps if target_fps > 0 else 0 + self._last_frame_time = 0.0 + def init(self, width: int, height: int, reuse: bool = False) -> None: """Initialize display with dimensions. + If width/height are not provided (0/None), auto-detects terminal size. + Otherwise uses provided dimensions or falls back to terminal size + if the provided dimensions exceed terminal capacity. + Args: - width: Terminal width in characters - height: Terminal height in rows + width: Desired terminal width (0 = auto-detect) + height: Desired terminal height (0 = auto-detect) reuse: If True, skip terminal re-initialization """ from engine.terminal import CURSOR_OFF - self.width = width - self.height = height + # Auto-detect terminal size (handle case where no terminal) + try: + term_size = os.get_terminal_size() + term_width = term_size.columns + term_height = term_size.lines + except OSError: + # No terminal available (e.g., in tests) + term_width = width if width > 0 else 80 + term_height = height if height > 0 else 24 + + # Use provided dimensions if valid, otherwise use terminal size + if width > 0 and height > 0: + self.width = min(width, term_width) + self.height = min(height, term_height) + else: + self.width = term_width + self.height = term_height if not reuse or not self._initialized: print(CURSOR_OFF, end="", flush=True) self._initialized = True - def show(self, buffer: list[str]) -> None: + def get_dimensions(self) -> tuple[int, int]: + """Get current terminal dimensions. + + Returns: + (width, height) in character cells + """ + try: + term_size = os.get_terminal_size() + return (term_size.columns, term_size.lines) + except OSError: + return (self.width, self.height) + + def show(self, buffer: list[str], border: bool = False) -> None: import sys + from engine.display import get_monitor, render_border + t0 = time.perf_counter() - sys.stdout.buffer.write("".join(buffer).encode()) + + # FPS limiting - skip frame if we're going too fast + if self._frame_period > 0: + now = time.perf_counter() + elapsed = now - self._last_frame_time + if elapsed < self._frame_period: + # Skip this frame - too soon + return + self._last_frame_time = now + + # Get metrics for border display + fps = 0.0 + frame_time = 0.0 + monitor = get_monitor() + if monitor: + stats = monitor.get_stats() + avg_ms = stats.get("avg_ms", 0) if stats else 0 + frame_count = stats.get("frame_count", 0) if stats else 0 + if avg_ms and frame_count > 0: + fps = 1000.0 / avg_ms + frame_time = avg_ms + + # Apply border if requested + if border: + buffer = render_border(buffer, self.width, self.height, fps, frame_time) + + # Clear screen and home cursor before each frame + from engine.terminal import CLR + + output = CLR + "".join(buffer) + sys.stdout.buffer.write(output.encode()) sys.stdout.flush() elapsed_ms = (time.perf_counter() - t0) * 1000 - from engine.display import get_monitor - - monitor = get_monitor() if monitor: chars_in = sum(len(line) for line in buffer) monitor.record_effect("terminal_display", elapsed_ms, chars_in, chars_in) diff --git a/engine/display/backends/websocket.py b/engine/display/backends/websocket.py index f7d6c38..7ac31ce 100644 --- a/engine/display/backends/websocket.py +++ b/engine/display/backends/websocket.py @@ -101,10 +101,28 @@ class WebSocketDisplay: self.start_server() self.start_http_server() - def show(self, buffer: list[str]) -> None: + def show(self, buffer: list[str], border: bool = False) -> None: """Broadcast buffer to all connected clients.""" t0 = time.perf_counter() + # Get metrics for border display + fps = 0.0 + frame_time = 0.0 + monitor = get_monitor() + if monitor: + stats = monitor.get_stats() + avg_ms = stats.get("avg_ms", 0) if stats else 0 + frame_count = stats.get("frame_count", 0) if stats else 0 + if avg_ms and frame_count > 0: + fps = 1000.0 / avg_ms + frame_time = avg_ms + + # Apply border if requested + if border: + from engine.display import render_border + + buffer = render_border(buffer, self.width, self.height, fps, frame_time) + if self._clients: frame_data = { "type": "frame", @@ -272,3 +290,11 @@ class WebSocketDisplay: def set_client_disconnected_callback(self, callback) -> None: """Set callback for client disconnections.""" self._client_disconnected_callback = callback + + def get_dimensions(self) -> tuple[int, int]: + """Get current dimensions. + + Returns: + (width, height) in character cells + """ + return (self.width, self.height) diff --git a/engine/pipeline.py b/engine/pipeline.py index 9d89677..45b414b 100644 --- a/engine/pipeline.py +++ b/engine/pipeline.py @@ -233,7 +233,7 @@ class PipelineIntrospector: def introspect_sources_v2(self) -> None: """Introspect data sources v2 (new abstraction).""" - from engine.sources_v2 import SourceRegistry, init_default_sources + from engine.data_sources.sources import SourceRegistry, init_default_sources init_default_sources() SourceRegistry() @@ -241,7 +241,7 @@ class PipelineIntrospector: self.add_node( PipelineNode( name="SourceRegistry", - module="engine.sources_v2", + module="engine.data_sources.sources", class_name="SourceRegistry", description="Source discovery and management", ) diff --git a/engine/pipeline/adapters.py b/engine/pipeline/adapters.py index b69d74a..47cc86b 100644 --- a/engine/pipeline/adapters.py +++ b/engine/pipeline/adapters.py @@ -264,6 +264,92 @@ class DataSourceStage(Stage): return data +class PassthroughStage(Stage): + """Simple stage that passes data through unchanged. + + Used for sources that already provide the data in the correct format + (e.g., pipeline introspection that outputs text directly). + """ + + def __init__(self, name: str = "passthrough"): + self.name = name + self.category = "render" + self.optional = True + + @property + def stage_type(self) -> str: + return "render" + + @property + def capabilities(self) -> set[str]: + return {"render.output"} + + @property + def dependencies(self) -> set[str]: + return {"source"} + + def process(self, data: Any, ctx: PipelineContext) -> Any: + """Pass data through unchanged.""" + return data + + +class SourceItemsToBufferStage(Stage): + """Convert SourceItem objects to text buffer. + + Takes a list of SourceItem objects and extracts their content, + splitting on newlines to create a proper text buffer for display. + """ + + def __init__(self, name: str = "items-to-buffer"): + self.name = name + self.category = "render" + self.optional = True + + @property + def stage_type(self) -> str: + return "render" + + @property + def capabilities(self) -> set[str]: + return {"render.output"} + + @property + def dependencies(self) -> set[str]: + return {"source"} + + def process(self, data: Any, ctx: PipelineContext) -> Any: + """Convert SourceItem list to text buffer.""" + if data is None: + return [] + + # If already a list of strings, return as-is + if isinstance(data, list) and data and isinstance(data[0], str): + return data + + # If it's a list of SourceItem, extract content + from engine.data_sources import SourceItem + + if isinstance(data, list): + result = [] + for item in data: + if isinstance(item, SourceItem): + # Split content by newline to get individual lines + lines = item.content.split("\n") + result.extend(lines) + elif hasattr(item, "content"): # Has content attribute + lines = str(item.content).split("\n") + result.extend(lines) + else: + result.append(str(item)) + return result + + # Single item + if isinstance(data, SourceItem): + return data.content.split("\n") + + return [str(data)] + + class ItemsStage(Stage): """Stage that holds pre-fetched items and provides them to the pipeline. @@ -430,6 +516,113 @@ class FontStage(Stage): return data +class ImageToTextStage(Stage): + """Transform that converts PIL Image to ASCII text buffer. + + Takes an ImageItem or PIL Image and converts it to a text buffer + using ASCII character density mapping. The output can be displayed + directly or further processed by effects. + + Attributes: + width: Output width in characters + height: Output height in characters + charset: Character set for density mapping (default: simple ASCII) + """ + + def __init__( + self, + width: int = 80, + height: int = 24, + charset: str = " .:-=+*#%@", + name: str = "image-to-text", + ): + self.name = name + self.category = "transform" + self.optional = False + self.width = width + self.height = height + self.charset = charset + + @property + def stage_type(self) -> str: + return "transform" + + @property + def capabilities(self) -> set[str]: + from engine.pipeline.core import DataType + + return {f"transform.{self.name}", DataType.TEXT_BUFFER} + + @property + def dependencies(self) -> set[str]: + return {"source"} + + def process(self, data: Any, ctx: PipelineContext) -> Any: + """Convert PIL Image to text buffer.""" + if data is None: + return None + + from engine.data_sources.sources import ImageItem + + # Extract PIL Image from various input types + pil_image = None + + if isinstance(data, ImageItem) or hasattr(data, "image"): + pil_image = data.image + else: + # Assume it's already a PIL Image + pil_image = data + + # Check if it's a PIL Image + if not hasattr(pil_image, "resize"): + # Not a PIL Image, return as-is + return data if isinstance(data, list) else [str(data)] + + # Convert to grayscale and resize + try: + if pil_image.mode != "L": + pil_image = pil_image.convert("L") + except Exception: + return ["[image conversion error]"] + + # Calculate cell aspect ratio correction (characters are taller than wide) + aspect_ratio = 0.5 + target_w = self.width + target_h = int(self.height * aspect_ratio) + + # Resize image to target dimensions + try: + resized = pil_image.resize((target_w, target_h)) + except Exception: + return ["[image resize error]"] + + # Map pixels to characters + result = [] + pixels = list(resized.getdata()) + + for row in range(target_h): + line = "" + for col in range(target_w): + idx = row * target_w + col + if idx < len(pixels): + brightness = pixels[idx] + char_idx = int((brightness / 255) * (len(self.charset) - 1)) + line += self.charset[char_idx] + else: + line += " " + result.append(line) + + # Pad or trim to exact height + while len(result) < self.height: + result.append(" " * self.width) + result = result[: self.height] + + # Pad lines to width + result = [line.ljust(self.width) for line in result] + + return result + + def create_stage_from_display(display, name: str = "terminal") -> DisplayStage: """Create a Stage from a Display instance.""" return DisplayStage(display, name) diff --git a/engine/pipeline/controller.py b/engine/pipeline/controller.py index ff6dbd7..6810a66 100644 --- a/engine/pipeline/controller.py +++ b/engine/pipeline/controller.py @@ -519,8 +519,8 @@ def create_pipeline_from_params(params: PipelineParams) -> Pipeline: def create_default_pipeline() -> Pipeline: """Create a default pipeline with all standard components.""" + from engine.data_sources.sources import HeadlinesDataSource from engine.pipeline.adapters import DataSourceStage - from engine.sources_v2 import HeadlinesDataSource pipeline = Pipeline() diff --git a/engine/pipeline/core.py b/engine/pipeline/core.py index e6ea66d..e3d566c 100644 --- a/engine/pipeline/core.py +++ b/engine/pipeline/core.py @@ -29,12 +29,14 @@ class DataType(Enum): ITEM_TUPLES: List[tuple] - (title, source, timestamp) tuples TEXT_BUFFER: List[str] - rendered ANSI buffer for display RAW_TEXT: str - raw text strings + PIL_IMAGE: PIL Image object """ SOURCE_ITEMS = auto() # List[SourceItem] - from DataSource ITEM_TUPLES = auto() # List[tuple] - (title, source, ts) TEXT_BUFFER = auto() # List[str] - ANSI buffer RAW_TEXT = auto() # str - raw text + PIL_IMAGE = auto() # PIL Image object ANY = auto() # Accepts any type NONE = auto() # No data (terminator) diff --git a/engine/pipeline/params.py b/engine/pipeline/params.py index 2c7468c..4f29c3a 100644 --- a/engine/pipeline/params.py +++ b/engine/pipeline/params.py @@ -23,6 +23,7 @@ class PipelineParams: # Display config display: str = "terminal" + border: bool = False # Camera config camera_mode: str = "vertical" diff --git a/engine/pipeline/presets.py b/engine/pipeline/presets.py index ceda711..970146f 100644 --- a/engine/pipeline/presets.py +++ b/engine/pipeline/presets.py @@ -47,12 +47,14 @@ class PipelinePreset: display: str = "terminal" camera: str = "vertical" effects: list[str] = field(default_factory=list) + border: bool = False def to_params(self) -> PipelineParams: """Convert to PipelineParams.""" params = PipelineParams() params.source = self.source params.display = self.display + params.border = self.border params.camera_mode = self.camera params.effect_order = self.effects.copy() return params @@ -67,6 +69,7 @@ class PipelinePreset: display=data.get("display", "terminal"), camera=data.get("camera", "vertical"), effects=data.get("effects", []), + border=data.get("border", False), ) diff --git a/engine/pipeline/registry.py b/engine/pipeline/registry.py index e06b90a..59dc3f9 100644 --- a/engine/pipeline/registry.py +++ b/engine/pipeline/registry.py @@ -87,7 +87,7 @@ def discover_stages() -> None: # Import and register all stage implementations try: - from engine.sources_v2 import ( + from engine.data_sources.sources import ( HeadlinesDataSource, PoetryDataSource, ) @@ -102,7 +102,7 @@ def discover_stages() -> None: # Register pipeline introspection source try: - from engine.pipeline_sources.pipeline_introspection import ( + from engine.data_sources.pipeline_introspection import ( PipelineIntrospectionSource, ) diff --git a/engine/pipeline_sources/__init__.py b/engine/pipeline_sources/__init__.py deleted file mode 100644 index 47a1ce4..0000000 --- a/engine/pipeline_sources/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -""" -Data source implementations for the pipeline architecture. -""" - -from engine.pipeline_sources.pipeline_introspection import PipelineIntrospectionSource - -__all__ = ["PipelineIntrospectionSource"] diff --git a/presets.toml b/presets.toml index ea97fa3..f43f2c6 100644 --- a/presets.toml +++ b/presets.toml @@ -13,7 +13,7 @@ description = "Demo mode with effect cycling and camera modes" source = "headlines" display = "pygame" camera = "vertical" -effects = ["noise", "fade", "glitch", "firehose", "hud"] +effects = ["noise", "fade", "glitch", "firehose"] viewport_width = 80 viewport_height = 24 camera_speed = 1.0 @@ -24,18 +24,29 @@ description = "Poetry feed with subtle effects" source = "poetry" display = "pygame" camera = "vertical" -effects = ["fade", "hud"] +effects = ["fade"] viewport_width = 80 viewport_height = 24 camera_speed = 0.5 + +[presets.border-test] +description = "Test border rendering with empty buffer" +source = "empty" +display = "terminal" +camera = "vertical" +effects = [] +viewport_width = 80 +viewport_height = 24 +camera_speed = 1.0 firehose_enabled = false +border = true [presets.websocket] description = "WebSocket display mode" source = "headlines" display = "websocket" camera = "vertical" -effects = ["noise", "fade", "glitch", "hud"] +effects = ["noise", "fade", "glitch"] viewport_width = 80 viewport_height = 24 camera_speed = 1.0 @@ -46,7 +57,7 @@ description = "Sixel graphics display mode" source = "headlines" display = "sixel" camera = "vertical" -effects = ["noise", "fade", "glitch", "hud"] +effects = ["noise", "fade", "glitch"] viewport_width = 80 viewport_height = 24 camera_speed = 1.0 @@ -57,7 +68,7 @@ description = "High-speed firehose mode" source = "headlines" display = "pygame" camera = "vertical" -effects = ["noise", "fade", "glitch", "firehose", "hud"] +effects = ["noise", "fade", "glitch", "firehose"] viewport_width = 80 viewport_height = 24 camera_speed = 2.0 @@ -66,9 +77,9 @@ firehose_enabled = true [presets.pipeline-inspect] description = "Live pipeline introspection with DAG and performance metrics" source = "pipeline-inspect" -display = "terminal" +display = "pygame" camera = "vertical" -effects = ["hud"] +effects = ["crop"] viewport_width = 100 viewport_height = 35 camera_speed = 0.3 diff --git a/tests/test_border_effect.py b/tests/test_border_effect.py new file mode 100644 index 0000000..a7fac37 --- /dev/null +++ b/tests/test_border_effect.py @@ -0,0 +1,112 @@ +""" +Tests for BorderEffect. +""" + + +from effects_plugins.border import BorderEffect +from engine.effects.types import EffectContext + + +def make_ctx(terminal_width: int = 80, terminal_height: int = 24) -> EffectContext: + """Create a mock EffectContext.""" + return EffectContext( + terminal_width=terminal_width, + terminal_height=terminal_height, + scroll_cam=0, + ticker_height=terminal_height, + ) + + +class TestBorderEffect: + """Tests for BorderEffect.""" + + def test_basic_init(self): + """BorderEffect initializes with defaults.""" + effect = BorderEffect() + assert effect.name == "border" + assert effect.config.enabled is True + + def test_adds_border(self): + """BorderEffect adds border around content.""" + effect = BorderEffect() + buf = [ + "Hello World", + "Test Content", + "Third Line", + ] + ctx = make_ctx(terminal_width=20, terminal_height=10) + + result = effect.process(buf, ctx) + + # Should have top and bottom borders + assert len(result) >= 3 + # First line should start with border character + assert result[0][0] in "┌┎┍" + # Last line should end with border character + assert result[-1][-1] in "┘┖┚" + + def test_border_with_small_buffer(self): + """BorderEffect handles small buffer (too small for border).""" + effect = BorderEffect() + buf = ["ab"] # Too small for proper border + ctx = make_ctx(terminal_width=10, terminal_height=5) + + result = effect.process(buf, ctx) + + # Should still try to add border but result may differ + # At minimum should have output + assert len(result) >= 1 + + def test_metrics_in_border(self): + """BorderEffect includes FPS and frame time in border.""" + effect = BorderEffect() + buf = ["x" * 10] * 5 + ctx = make_ctx(terminal_width=20, terminal_height=10) + + # Add metrics to context + ctx.set_state( + "metrics", + { + "avg_ms": 16.5, + "frame_count": 100, + "fps": 60.0, + }, + ) + + result = effect.process(buf, ctx) + + # Check for FPS in top border + top_line = result[0] + assert "FPS" in top_line or "60" in top_line + + # Check for frame time in bottom border + bottom_line = result[-1] + assert "ms" in bottom_line or "16" in bottom_line + + def test_no_metrics(self): + """BorderEffect works without metrics.""" + effect = BorderEffect() + buf = ["content"] * 5 + ctx = make_ctx(terminal_width=20, terminal_height=10) + # No metrics set + + result = effect.process(buf, ctx) + + # Should still have border characters + assert len(result) >= 3 + assert result[0][0] in "┌┎┍" + + def test_crops_before_bordering(self): + """BorderEffect crops input before adding border.""" + effect = BorderEffect() + buf = ["x" * 100] * 50 # Very large buffer + ctx = make_ctx(terminal_width=20, terminal_height=10) + + result = effect.process(buf, ctx) + + # Should be cropped to fit, then bordered + # Result should be <= terminal_height with border + assert len(result) <= ctx.terminal_height + # Each line should be <= terminal_width + for line in result: + assert len(line) <= ctx.terminal_width diff --git a/tests/test_crop_effect.py b/tests/test_crop_effect.py new file mode 100644 index 0000000..aa99baf --- /dev/null +++ b/tests/test_crop_effect.py @@ -0,0 +1,100 @@ +""" +Tests for CropEffect. +""" + + +from effects_plugins.crop import CropEffect +from engine.effects.types import EffectContext + + +def make_ctx(terminal_width: int = 80, terminal_height: int = 24) -> EffectContext: + """Create a mock EffectContext.""" + return EffectContext( + terminal_width=terminal_width, + terminal_height=terminal_height, + scroll_cam=0, + ticker_height=terminal_height, + ) + + +class TestCropEffect: + """Tests for CropEffect.""" + + def test_basic_init(self): + """CropEffect initializes with defaults.""" + effect = CropEffect() + assert effect.name == "crop" + assert effect.config.enabled is True + + def test_crop_wider_buffer(self): + """CropEffect crops wide buffer to terminal width.""" + effect = CropEffect() + buf = [ + "This is a very long line that exceeds the terminal width of eighty characters!", + "Another long line that should also be cropped to fit within the terminal bounds!", + "Short", + ] + ctx = make_ctx(terminal_width=40, terminal_height=10) + + result = effect.process(buf, ctx) + + # Lines should be cropped to 40 chars + assert len(result[0]) == 40 + assert len(result[1]) == 40 + assert result[2] == "Short" + " " * 35 # padded to width + + def test_crop_taller_buffer(self): + """CropEffect crops tall buffer to terminal height.""" + effect = CropEffect() + buf = ["line"] * 30 # 30 lines + ctx = make_ctx(terminal_width=80, terminal_height=10) + + result = effect.process(buf, ctx) + + # Should be cropped to 10 lines + assert len(result) == 10 + + def test_pad_shorter_lines(self): + """CropEffect pads lines shorter than width.""" + effect = CropEffect() + buf = ["short", "medium length", ""] + ctx = make_ctx(terminal_width=20, terminal_height=5) + + result = effect.process(buf, ctx) + + assert len(result[0]) == 20 # padded + assert len(result[1]) == 20 # padded + assert len(result[2]) == 20 # padded (was empty) + + def test_pad_to_height(self): + """CropEffect pads with empty lines if buffer is too short.""" + effect = CropEffect() + buf = ["line1", "line2"] + ctx = make_ctx(terminal_width=20, terminal_height=10) + + result = effect.process(buf, ctx) + + # Should have 10 lines + assert len(result) == 10 + # Last 8 should be empty padding + for i in range(2, 10): + assert result[i] == " " * 20 + + def test_empty_buffer(self): + """CropEffect handles empty buffer.""" + effect = CropEffect() + ctx = make_ctx() + + result = effect.process([], ctx) + + assert result == [] + + def test_uses_context_dimensions(self): + """CropEffect uses context terminal_width/terminal_height.""" + effect = CropEffect() + buf = ["x" * 100] + ctx = make_ctx(terminal_width=50, terminal_height=1) + + result = effect.process(buf, ctx) + + assert len(result[0]) == 50 diff --git a/tests/test_pipeline.py b/tests/test_pipeline.py index 9495655..5aaf5ba 100644 --- a/tests/test_pipeline.py +++ b/tests/test_pipeline.py @@ -678,8 +678,8 @@ class TestDataSourceStage: def test_datasource_stage_capabilities(self): """DataSourceStage declares correct capabilities.""" + from engine.data_sources.sources import HeadlinesDataSource from engine.pipeline.adapters import DataSourceStage - from engine.sources_v2 import HeadlinesDataSource source = HeadlinesDataSource() stage = DataSourceStage(source, name="headlines") @@ -690,9 +690,9 @@ class TestDataSourceStage: """DataSourceStage fetches from DataSource.""" from unittest.mock import patch + from engine.data_sources.sources import HeadlinesDataSource from engine.pipeline.adapters import DataSourceStage from engine.pipeline.core import PipelineContext - from engine.sources_v2 import HeadlinesDataSource mock_items = [ ("Test Headline 1", "TestSource", "12:00"), @@ -859,8 +859,8 @@ class TestFullPipeline: def test_datasource_stage_capabilities_match_render_deps(self): """DataSourceStage provides capability that RenderStage can depend on.""" + from engine.data_sources.sources import HeadlinesDataSource from engine.pipeline.adapters import DataSourceStage, RenderStage - from engine.sources_v2 import HeadlinesDataSource # DataSourceStage provides "source.headlines" ds_stage = DataSourceStage(HeadlinesDataSource(), name="headlines") diff --git a/tests/test_pipeline_introspection.py b/tests/test_pipeline_introspection.py index aa09c75..23c6888 100644 --- a/tests/test_pipeline_introspection.py +++ b/tests/test_pipeline_introspection.py @@ -2,7 +2,7 @@ Tests for PipelineIntrospectionSource. """ -from engine.pipeline_sources.pipeline_introspection import PipelineIntrospectionSource +from engine.data_sources.pipeline_introspection import PipelineIntrospectionSource class TestPipelineIntrospectionSource: @@ -14,19 +14,17 @@ class TestPipelineIntrospectionSource: assert source.name == "pipeline-inspect" assert source.is_dynamic is True assert source.frame == 0 + assert source.ready is False - def test_init_with_pipelines(self): - """Source initializes with custom pipelines list.""" - source = PipelineIntrospectionSource( - pipelines=[], viewport_width=100, viewport_height=40 - ) + def test_init_with_params(self): + """Source initializes with custom params.""" + source = PipelineIntrospectionSource(viewport_width=100, viewport_height=40) assert source.viewport_width == 100 assert source.viewport_height == 40 def test_inlet_outlet_types(self): """Source has correct inlet/outlet types.""" source = PipelineIntrospectionSource() - # inlet should be NONE (source), outlet should be SOURCE_ITEMS from engine.pipeline.core import DataType assert DataType.NONE in source.inlet_types @@ -40,9 +38,24 @@ class TestPipelineIntrospectionSource: assert items[0].source == "pipeline-inspect" def test_fetch_increments_frame(self): - """fetch() increments frame counter.""" + """fetch() increments frame counter when ready.""" source = PipelineIntrospectionSource() assert source.frame == 0 + + # Set pipeline first to make source ready + class MockPipeline: + stages = {} + execution_order = [] + + def get_metrics_summary(self): + return {"avg_ms": 10.0, "fps": 60, "stages": {}} + + def get_frame_times(self): + return [10.0, 12.0, 11.0] + + source.set_pipeline(MockPipeline()) + assert source.ready is True + source.fetch() assert source.frame == 1 source.fetch() @@ -56,27 +69,30 @@ class TestPipelineIntrospectionSource: assert len(items) > 0 assert items[0].source == "pipeline-inspect" - def test_add_pipeline(self): - """add_pipeline() adds pipeline to list.""" + def test_set_pipeline(self): + """set_pipeline() marks source as ready.""" source = PipelineIntrospectionSource() - mock_pipeline = object() - source.add_pipeline(mock_pipeline) - assert mock_pipeline in source._pipelines + assert source.ready is False - def test_remove_pipeline(self): - """remove_pipeline() removes pipeline from list.""" - source = PipelineIntrospectionSource() - mock_pipeline = object() - source.add_pipeline(mock_pipeline) - source.remove_pipeline(mock_pipeline) - assert mock_pipeline not in source._pipelines + class MockPipeline: + stages = {} + execution_order = [] + + def get_metrics_summary(self): + return {"avg_ms": 10.0, "fps": 60, "stages": {}} + + def get_frame_times(self): + return [10.0, 12.0, 11.0] + + source.set_pipeline(MockPipeline()) + assert source.ready is True class TestPipelineIntrospectionRender: """Tests for rendering methods.""" - def test_render_header_no_pipelines(self): - """_render_header returns default when no pipelines.""" + def test_render_header_no_pipeline(self): + """_render_header returns default when no pipeline.""" source = PipelineIntrospectionSource() lines = source._render_header() assert len(lines) == 1 @@ -115,19 +131,18 @@ class TestPipelineIntrospectionRender: sparkline = source._render_sparkline([], 10) assert sparkline == " " * 10 - def test_render_footer_no_pipelines(self): - """_render_footer shows collecting data when no pipelines.""" + def test_render_footer_no_pipeline(self): + """_render_footer shows collecting data when no pipeline.""" source = PipelineIntrospectionSource() lines = source._render_footer() assert len(lines) >= 2 - assert "collecting data" in lines[1] or "Frame Time" in lines[0] class TestPipelineIntrospectionFull: """Integration tests.""" def test_render_empty(self): - """_render works with no pipelines.""" + """_render works when not ready.""" source = PipelineIntrospectionSource() lines = source._render() assert len(lines) > 0 @@ -151,6 +166,6 @@ class TestPipelineIntrospectionFull: def get_frame_times(self): return [1.0, 2.0, 3.0] - source.add_pipeline(MockPipeline()) + source.set_pipeline(MockPipeline()) lines = source._render() assert len(lines) > 0 diff --git a/tests/test_tint_effect.py b/tests/test_tint_effect.py new file mode 100644 index 0000000..c015167 --- /dev/null +++ b/tests/test_tint_effect.py @@ -0,0 +1,125 @@ +import pytest + +from effects_plugins.tint import TintEffect +from engine.effects.types import EffectConfig + + +@pytest.fixture +def effect(): + return TintEffect() + + +@pytest.fixture +def effect_with_params(r=255, g=128, b=64, a=0.5): + e = TintEffect() + config = EffectConfig( + enabled=True, + intensity=1.0, + params={"r": r, "g": g, "b": b, "a": a}, + ) + e.configure(config) + return e + + +@pytest.fixture +def mock_context(): + class MockContext: + terminal_width = 80 + terminal_height = 24 + + def get_state(self, key): + return None + + return MockContext() + + +class TestTintEffect: + def test_name(self, effect): + assert effect.name == "tint" + + def test_enabled_by_default(self, effect): + assert effect.config.enabled is True + + def test_returns_input_when_empty(self, effect, mock_context): + result = effect.process([], mock_context) + assert result == [] + + def test_returns_input_when_transparency_zero( + self, effect_with_params, mock_context + ): + effect_with_params.config.params["a"] = 0.0 + buf = ["hello world"] + result = effect_with_params.process(buf, mock_context) + assert result == buf + + def test_applies_tint_to_plain_text(self, effect_with_params, mock_context): + buf = ["hello world"] + result = effect_with_params.process(buf, mock_context) + assert len(result) == 1 + assert "\033[" in result[0] # Has ANSI codes + assert "hello world" in result[0] + + def test_tint_preserves_content(self, effect_with_params, mock_context): + buf = ["hello world", "test line"] + result = effect_with_params.process(buf, mock_context) + assert "hello world" in result[0] + assert "test line" in result[1] + + def test_rgb_to_ansi256_black(self, effect): + assert effect._rgb_to_ansi256(0, 0, 0) == 16 + + def test_rgb_to_ansi256_white(self, effect): + assert effect._rgb_to_ansi256(255, 255, 255) == 231 + + def test_rgb_to_ansi256_red(self, effect): + color = effect._rgb_to_ansi256(255, 0, 0) + assert 196 <= color <= 197 # Red in 256 color + + def test_rgb_to_ansi256_green(self, effect): + color = effect._rgb_to_ansi256(0, 255, 0) + assert 34 <= color <= 46 + + def test_rgb_to_ansi256_blue(self, effect): + color = effect._rgb_to_ansi256(0, 0, 255) + assert 20 <= color <= 33 + + def test_configure_updates_params(self, effect): + config = EffectConfig( + enabled=True, + intensity=1.0, + params={"r": 100, "g": 150, "b": 200, "a": 0.8}, + ) + effect.configure(config) + assert effect.config.params["r"] == 100 + assert effect.config.params["g"] == 150 + assert effect.config.params["b"] == 200 + assert effect.config.params["a"] == 0.8 + + def test_clamp_rgb_values(self, effect_with_params, mock_context): + effect_with_params.config.params["r"] = 300 + effect_with_params.config.params["g"] = -10 + effect_with_params.config.params["b"] = 1.5 + buf = ["test"] + result = effect_with_params.process(buf, mock_context) + assert "\033[" in result[0] + + def test_clamp_alpha_above_one(self, effect_with_params, mock_context): + effect_with_params.config.params["a"] = 1.5 + buf = ["test"] + result = effect_with_params.process(buf, mock_context) + assert "\033[" in result[0] + + def test_preserves_empty_lines(self, effect_with_params, mock_context): + buf = ["hello", "", "world"] + result = effect_with_params.process(buf, mock_context) + assert result[1] == "" + + def test_inlet_types_includes_text_buffer(self, effect): + from engine.pipeline.core import DataType + + assert DataType.TEXT_BUFFER in effect.inlet_types + + def test_outlet_types_includes_text_buffer(self, effect): + from engine.pipeline.core import DataType + + assert DataType.TEXT_BUFFER in effect.outlet_types