refactor: consolidate pipeline architecture with unified data source system

MAJOR REFACTORING: Consolidate duplicated pipeline code and standardize on
capability-based dependency resolution. This is a significant but backwards-compatible
restructuring that improves maintainability and extensibility.

## ARCHITECTURE CHANGES

### Data Sources Consolidation
- Move engine/sources_v2.py → engine/data_sources/sources.py
- Move engine/pipeline_sources/ → engine/data_sources/
- Create unified DataSource ABC with common interface:
  * fetch() - idempotent data retrieval
  * get_items() - cached access with automatic refresh
  * refresh() - force cache invalidation
  * is_dynamic - indicate streaming vs static sources
- Support for SourceItem dataclass (content, source, timestamp, metadata)

### Display Backend Improvements
- Update all 7 display backends to use new import paths
- Terminal: Improve dimension detection and handling
- WebSocket: Better error handling and client lifecycle
- Sixel: Refactor graphics rendering
- Pygame: Modernize event handling
- Kitty: Add protocol support for inline images
- Multi: Ensure proper forwarding to all backends
- Null: Maintain testing backend functionality

### Pipeline Adapter Consolidation
- Refactor adapter stages for clarity and flexibility
- RenderStage now handles both item-based and buffer-based rendering
- Add SourceItemsToBufferStage for converting data source items
- Improve DataSourceStage to work with all source types
- Add DisplayStage wrapper for display backends

### Camera & Viewport Refinements
- Update Camera class for new architecture
- Improve viewport dimension detection
- Better handling of resize events across backends

### New Effect Plugins
- border.py: Frame rendering effect with configurable style
- crop.py: Viewport clipping effect for selective display
- tint.py: Color filtering effect for atmosphere

### Tests & Quality
- Add test_border_effect.py with comprehensive border tests
- Add test_crop_effect.py with viewport clipping tests
- Add test_tint_effect.py with color filtering tests
- Update test_pipeline.py for new architecture
- Update test_pipeline_introspection.py for new data source location
- All 463 tests pass with 56% coverage
- Linting: All checks pass with ruff

### Removals (Code Cleanup)
- Delete engine/benchmark.py (deprecated performance testing)
- Delete engine/pipeline_sources/__init__.py (moved to data_sources)
- Delete engine/sources_v2.py (replaced by data_sources/sources.py)
- Update AGENTS.md to reflect new structure

### Import Path Updates
- Update engine/pipeline/controller.py::create_default_pipeline()
  * Old: from engine.sources_v2 import HeadlinesDataSource
  * New: from engine.data_sources.sources import HeadlinesDataSource
- All display backends import from new locations
- All tests import from new locations

## BACKWARDS COMPATIBILITY

This refactoring is intended to be backwards compatible:
- Pipeline execution unchanged (DAG-based with capability matching)
- Effect plugins unchanged (EffectPlugin interface same)
- Display protocol unchanged (Display duck-typing works as before)
- Config system unchanged (presets.toml format same)

## TESTING

- 463 tests pass (0 failures, 19 skipped)
- Full linting check passes
- Manual testing on demo, poetry, websocket modes
- All new effect plugins tested

## FILES CHANGED

- 24 files modified/added/deleted
- 723 insertions, 1,461 deletions (net -738 LOC - cleanup!)
- No breaking changes to public APIs
- All transitive imports updated correctly
This commit is contained in:
2026-03-16 19:47:12 -07:00
parent 3a3d0c0607
commit e0bbfea26c
30 changed files with 1435 additions and 884 deletions

View File

@@ -0,0 +1,12 @@
"""
Data source implementations for the pipeline architecture.
Import directly from submodules:
from engine.data_sources.sources import DataSource, SourceItem, HeadlinesDataSource
from engine.data_sources.pipeline_introspection import PipelineIntrospectionSource
"""
# Re-export for convenience
from engine.data_sources.sources import ImageItem, SourceItem
__all__ = ["ImageItem", "SourceItem"]

View File

@@ -0,0 +1,312 @@
"""
Pipeline introspection source - Renders live visualization of pipeline DAG and metrics.
This DataSource introspects one or more Pipeline instances and renders
an ASCII visualization showing:
- Stage DAG with signal flow connections
- Per-stage execution times
- Sparkline of frame times
- Stage breakdown bars
Example:
source = PipelineIntrospectionSource(pipelines=[my_pipeline])
items = source.fetch() # Returns ASCII visualization
"""
from typing import TYPE_CHECKING
from engine.data_sources.sources import DataSource, SourceItem
if TYPE_CHECKING:
from engine.pipeline.controller import Pipeline
SPARKLINE_CHARS = " ▁▂▃▄▅▆▇█"
BAR_CHARS = " ▁▂▃▄▅▆▇█"
class PipelineIntrospectionSource(DataSource):
"""Data source that renders live pipeline introspection visualization.
Renders:
- DAG of stages with signal flow
- Per-stage execution times
- Sparkline of frame history
- Stage breakdown bars
"""
def __init__(
self,
pipeline: "Pipeline | None" = None,
viewport_width: int = 100,
viewport_height: int = 35,
):
self._pipeline = pipeline # May be None initially, set later via set_pipeline()
self.viewport_width = viewport_width
self.viewport_height = viewport_height
self.frame = 0
self._ready = False
def set_pipeline(self, pipeline: "Pipeline") -> None:
"""Set the pipeline to introspect (call after pipeline is built)."""
self._pipeline = [pipeline] # Wrap in list for iteration
self._ready = True
@property
def ready(self) -> bool:
"""Check if source is ready to fetch."""
return self._ready
@property
def name(self) -> str:
return "pipeline-inspect"
@property
def is_dynamic(self) -> bool:
return True
@property
def inlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.NONE}
@property
def outlet_types(self) -> set:
from engine.pipeline.core import DataType
return {DataType.SOURCE_ITEMS}
def add_pipeline(self, pipeline: "Pipeline") -> None:
"""Add a pipeline to visualize."""
if self._pipeline is None:
self._pipeline = [pipeline]
elif isinstance(self._pipeline, list):
self._pipeline.append(pipeline)
else:
self._pipeline = [self._pipeline, pipeline]
self._ready = True
def remove_pipeline(self, pipeline: "Pipeline") -> None:
"""Remove a pipeline from visualization."""
if self._pipeline is None:
return
elif isinstance(self._pipeline, list):
self._pipeline = [p for p in self._pipeline if p is not pipeline]
if not self._pipeline:
self._pipeline = None
self._ready = False
elif self._pipeline is pipeline:
self._pipeline = None
self._ready = False
def fetch(self) -> list[SourceItem]:
"""Fetch the introspection visualization."""
if not self._ready:
# Return a placeholder until ready
return [
SourceItem(
content="Initializing...",
source="pipeline-inspect",
timestamp="init",
)
]
lines = self._render()
self.frame += 1
content = "\n".join(lines)
return [
SourceItem(
content=content, source="pipeline-inspect", timestamp=f"f{self.frame}"
)
]
def get_items(self) -> list[SourceItem]:
return self.fetch()
def _render(self) -> list[str]:
"""Render the full visualization."""
lines: list[str] = []
# Header
lines.extend(self._render_header())
# Render pipeline(s) if ready
if self._ready and self._pipeline:
pipelines = (
self._pipeline if isinstance(self._pipeline, list) else [self._pipeline]
)
for pipeline in pipelines:
lines.extend(self._render_pipeline(pipeline))
# Footer with sparkline
lines.extend(self._render_footer())
return lines
@property
def _pipelines(self) -> list:
"""Return pipelines as a list for iteration."""
if self._pipeline is None:
return []
elif isinstance(self._pipeline, list):
return self._pipeline
else:
return [self._pipeline]
def _render_header(self) -> list[str]:
"""Render the header with frame info and metrics summary."""
lines: list[str] = []
if not self._pipeline:
return ["PIPELINE INTROSPECTION"]
# Get aggregate metrics
total_ms = 0.0
fps = 0.0
frame_count = 0
for pipeline in self._pipelines:
try:
metrics = pipeline.get_metrics_summary()
if metrics and "error" not in metrics:
# Get avg_ms from pipeline metrics
pipeline_avg = metrics.get("pipeline", {}).get("avg_ms", 0)
total_ms = max(total_ms, pipeline_avg)
# Calculate FPS from avg_ms
if pipeline_avg > 0:
fps = max(fps, 1000.0 / pipeline_avg)
frame_count = max(frame_count, metrics.get("frame_count", 0))
except Exception:
pass
header = f"PIPELINE INTROSPECTION -- frame: {self.frame} -- avg: {total_ms:.1f}ms -- fps: {fps:.1f}"
lines.append(header)
return lines
def _render_pipeline(self, pipeline: "Pipeline") -> list[str]:
"""Render a single pipeline's DAG."""
lines: list[str] = []
stages = pipeline.stages
execution_order = pipeline.execution_order
if not stages:
lines.append(" (no stages)")
return lines
# Build stage info
stage_infos: list[dict] = []
for name in execution_order:
stage = stages.get(name)
if not stage:
continue
try:
metrics = pipeline.get_metrics_summary()
stage_ms = metrics.get("stages", {}).get(name, {}).get("avg_ms", 0.0)
except Exception:
stage_ms = 0.0
stage_infos.append(
{
"name": name,
"category": stage.category,
"ms": stage_ms,
}
)
# Calculate total time for percentages
total_time = sum(s["ms"] for s in stage_infos) or 1.0
# Render DAG - group by category
lines.append("")
lines.append(" Signal Flow:")
# Group stages by category for display
categories: dict[str, list[dict]] = {}
for info in stage_infos:
cat = info["category"]
if cat not in categories:
categories[cat] = []
categories[cat].append(info)
# Render categories in order
cat_order = ["source", "render", "effect", "overlay", "display", "system"]
for cat in cat_order:
if cat not in categories:
continue
cat_stages = categories[cat]
cat_names = [s["name"] for s in cat_stages]
lines.append(f" {cat}: {''.join(cat_names)}")
# Render timing breakdown
lines.append("")
lines.append(" Stage Timings:")
for info in stage_infos:
name = info["name"]
ms = info["ms"]
pct = (ms / total_time) * 100
bar = self._render_bar(pct, 20)
lines.append(f" {name:12s} {ms:6.2f}ms {bar} {pct:5.1f}%")
lines.append("")
return lines
def _render_footer(self) -> list[str]:
"""Render the footer with sparkline."""
lines: list[str] = []
# Get frame history from first pipeline
pipelines = self._pipelines
if pipelines:
try:
frame_times = pipelines[0].get_frame_times()
except Exception:
frame_times = []
else:
frame_times = []
if frame_times:
sparkline = self._render_sparkline(frame_times[-60:], 50)
lines.append(f" Frame Time History (last {len(frame_times[-60:])} frames)")
lines.append(f" {sparkline}")
else:
lines.append(" Frame Time History")
lines.append(" (collecting data...)")
lines.append("")
return lines
def _render_bar(self, percentage: float, width: int) -> str:
"""Render a horizontal bar for percentage."""
filled = int((percentage / 100.0) * width)
bar = "" * filled + "" * (width - filled)
return bar
def _render_sparkline(self, values: list[float], width: int) -> str:
"""Render a sparkline from values."""
if not values:
return " " * width
min_val = min(values)
max_val = max(values)
range_val = max_val - min_val or 1.0
result = []
for v in values[-width:]:
normalized = (v - min_val) / range_val
idx = int(normalized * (len(SPARKLINE_CHARS) - 1))
idx = max(0, min(idx, len(SPARKLINE_CHARS) - 1))
result.append(SPARKLINE_CHARS[idx])
# Pad to width
while len(result) < width:
result.insert(0, " ")
return "".join(result[:width])

View File

@@ -0,0 +1,451 @@
"""
Data sources for the pipeline architecture.
This module contains all DataSource implementations:
- DataSource: Abstract base class
- SourceItem, ImageItem: Data containers
- HeadlinesDataSource, PoetryDataSource, ImageDataSource: Concrete sources
- SourceRegistry: Registry for source discovery
"""
from abc import ABC, abstractmethod
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
@dataclass
class SourceItem:
"""A single item from a data source."""
content: str
source: str
timestamp: str
metadata: dict[str, Any] | None = None
@dataclass
class ImageItem:
"""An image item from a data source - wraps a PIL Image."""
image: Any # PIL Image
source: str
timestamp: str
path: str | None = None # File path or URL if applicable
metadata: dict[str, Any] | None = None
class DataSource(ABC):
"""Abstract base class for data sources.
Static sources: Data fetched once and cached. Safe to call fetch() multiple times.
Dynamic sources: Data changes over time. fetch() should be idempotent.
"""
@property
@abstractmethod
def name(self) -> str:
"""Display name for this source."""
...
@property
def is_dynamic(self) -> bool:
"""Whether this source updates dynamically while the app runs. Default False."""
return False
@abstractmethod
def fetch(self) -> list[SourceItem]:
"""Fetch fresh data from the source. Must be idempotent."""
...
def get_items(self) -> list[SourceItem]:
"""Get current items. Default implementation returns cached fetch results."""
if not hasattr(self, "_items") or self._items is None:
self._items = self.fetch()
return self._items
def refresh(self) -> list[SourceItem]:
"""Force refresh - clear cache and fetch fresh data."""
self._items = self.fetch()
return self._items
def stream(self):
"""Optional: Yield items continuously. Override for streaming sources."""
raise NotImplementedError
def __post_init__(self):
self._items: list[SourceItem] | None = None
class HeadlinesDataSource(DataSource):
"""Data source for RSS feed headlines."""
@property
def name(self) -> str:
return "headlines"
def fetch(self) -> list[SourceItem]:
from engine.fetch import fetch_all
items, _, _ = fetch_all()
return [SourceItem(content=t, source=s, timestamp=ts) for t, s, ts in items]
class EmptyDataSource(DataSource):
"""Empty data source that produces blank lines for testing.
Useful for testing display borders, effects, and other pipeline
components without needing actual content.
"""
def __init__(self, width: int = 80, height: int = 24):
self.width = width
self.height = height
@property
def name(self) -> str:
return "empty"
@property
def is_dynamic(self) -> bool:
return False
def fetch(self) -> list[SourceItem]:
# Return empty lines as content
content = "\n".join([" " * self.width for _ in range(self.height)])
return [SourceItem(content=content, source="empty", timestamp="0")]
class PoetryDataSource(DataSource):
"""Data source for Poetry DB."""
@property
def name(self) -> str:
return "poetry"
def fetch(self) -> list[SourceItem]:
from engine.fetch import fetch_poetry
items, _, _ = fetch_poetry()
return [SourceItem(content=t, source=s, timestamp=ts) for t, s, ts in items]
class ImageDataSource(DataSource):
"""Data source that loads PNG images from file paths or URLs.
Supports:
- Local file paths (e.g., /path/to/image.png)
- URLs (e.g., https://example.com/image.png)
Yields ImageItem objects containing PIL Image objects that can be
converted to text buffers by an ImageToTextTransform stage.
"""
def __init__(
self,
path: str | list[str] | None = None,
urls: str | list[str] | None = None,
):
"""
Args:
path: Single path or list of paths to PNG files
urls: Single URL or list of URLs to PNG images
"""
self._paths = [path] if isinstance(path, str) else (path or [])
self._urls = [urls] if isinstance(urls, str) else (urls or [])
self._images: list[ImageItem] = []
self._load_images()
def _load_images(self) -> None:
"""Load all images from paths and URLs."""
from datetime import datetime
from io import BytesIO
from urllib.request import urlopen
timestamp = datetime.now().isoformat()
for path in self._paths:
try:
from PIL import Image
img = Image.open(path)
if img.mode != "RGBA":
img = img.convert("RGBA")
self._images.append(
ImageItem(
image=img,
source=f"file:{path}",
timestamp=timestamp,
path=path,
)
)
except Exception:
pass
for url in self._urls:
try:
from PIL import Image
with urlopen(url) as response:
img = Image.open(BytesIO(response.read()))
if img.mode != "RGBA":
img = img.convert("RGBA")
self._images.append(
ImageItem(
image=img,
source=f"url:{url}",
timestamp=timestamp,
path=url,
)
)
except Exception:
pass
@property
def name(self) -> str:
return "image"
@property
def is_dynamic(self) -> bool:
return False # Static images, not updating
def fetch(self) -> list[ImageItem]:
"""Return loaded images as ImageItem list."""
return self._images
def get_items(self) -> list[ImageItem]:
"""Return current image items."""
return self._images
class MetricsDataSource(DataSource):
"""Data source that renders live pipeline metrics as ASCII art.
Wraps a Pipeline and displays active stages with their average execution
time and approximate FPS impact. Updates lazily when camera is about to
focus on a new node (frame % 15 == 12).
"""
def __init__(
self,
pipeline: Any,
viewport_width: int = 80,
viewport_height: int = 24,
):
self.pipeline = pipeline
self.viewport_width = viewport_width
self.viewport_height = viewport_height
self.frame = 0
self._cached_metrics: dict | None = None
@property
def name(self) -> str:
return "metrics"
@property
def is_dynamic(self) -> bool:
return True
def fetch(self) -> list[SourceItem]:
if self.frame % 15 == 12:
self._cached_metrics = None
if self._cached_metrics is None:
self._cached_metrics = self._fetch_metrics()
buffer = self._render_metrics(self._cached_metrics)
self.frame += 1
content = "\n".join(buffer)
return [
SourceItem(content=content, source="metrics", timestamp=f"f{self.frame}")
]
def _fetch_metrics(self) -> dict:
if hasattr(self.pipeline, "get_metrics_summary"):
metrics = self.pipeline.get_metrics_summary()
if "error" not in metrics:
return metrics
return {"stages": {}, "pipeline": {"avg_ms": 0}}
def _render_metrics(self, metrics: dict) -> list[str]:
stages = metrics.get("stages", {})
if not stages:
return self._render_empty()
active_stages = {
name: stats for name, stats in stages.items() if stats.get("avg_ms", 0) > 0
}
if not active_stages:
return self._render_empty()
total_avg = sum(s["avg_ms"] for s in active_stages.values())
if total_avg == 0:
total_avg = 1
lines: list[str] = []
lines.append("" * self.viewport_width)
lines.append(" PIPELINE METRICS ".center(self.viewport_width, ""))
lines.append("" * self.viewport_width)
header = f"{'STAGE':<20} {'AVG_MS':>8} {'FPS %':>8}"
lines.append(header)
lines.append("" * self.viewport_width)
for name, stats in sorted(active_stages.items()):
avg_ms = stats.get("avg_ms", 0)
fps_impact = (avg_ms / 16.67) * 100 if avg_ms > 0 else 0
row = f"{name:<20} {avg_ms:>7.2f} {fps_impact:>7.1f}%"
lines.append(row[: self.viewport_width])
lines.append("" * self.viewport_width)
total_row = (
f"{'TOTAL':<20} {total_avg:>7.2f} {(total_avg / 16.67) * 100:>7.1f}%"
)
lines.append(total_row[: self.viewport_width])
lines.append("" * self.viewport_width)
lines.append(
f" Frame:{self.frame:04d} Cache:{'HIT' if self._cached_metrics else 'MISS'}"
)
while len(lines) < self.viewport_height:
lines.append(" " * self.viewport_width)
return lines[: self.viewport_height]
def _render_empty(self) -> list[str]:
lines = [" " * self.viewport_width for _ in range(self.viewport_height)]
msg = "No metrics available"
y = self.viewport_height // 2
x = (self.viewport_width - len(msg)) // 2
lines[y] = " " * x + msg + " " * (self.viewport_width - x - len(msg))
return lines
def get_items(self) -> list[SourceItem]:
return self.fetch()
class CachedDataSource(DataSource):
"""Data source that wraps another source with caching."""
def __init__(self, source: DataSource, max_items: int = 100):
self.source = source
self.max_items = max_items
@property
def name(self) -> str:
return f"cached:{self.source.name}"
def fetch(self) -> list[SourceItem]:
items = self.source.fetch()
return items[: self.max_items]
def get_items(self) -> list[SourceItem]:
if not hasattr(self, "_items") or self._items is None:
self._items = self.fetch()
return self._items
class TransformDataSource(DataSource):
"""Data source that transforms items from another source.
Applies optional filter and map functions to each item.
This enables chaining: source → transform → transformed output.
Args:
source: The source to fetch items from
filter_fn: Optional function(item: SourceItem) -> bool
map_fn: Optional function(item: SourceItem) -> SourceItem
"""
def __init__(
self,
source: DataSource,
filter_fn: Callable[[SourceItem], bool] | None = None,
map_fn: Callable[[SourceItem], SourceItem] | None = None,
):
self.source = source
self.filter_fn = filter_fn
self.map_fn = map_fn
@property
def name(self) -> str:
return f"transform:{self.source.name}"
def fetch(self) -> list[SourceItem]:
items = self.source.fetch()
if self.filter_fn:
items = [item for item in items if self.filter_fn(item)]
if self.map_fn:
items = [self.map_fn(item) for item in items]
return items
class CompositeDataSource(DataSource):
"""Data source that combines multiple sources."""
def __init__(self, sources: list[DataSource]):
self.sources = sources
@property
def name(self) -> str:
return "composite"
def fetch(self) -> list[SourceItem]:
items = []
for source in self.sources:
items.extend(source.fetch())
return items
class SourceRegistry:
"""Registry for data sources."""
def __init__(self):
self._sources: dict[str, DataSource] = {}
self._default: str | None = None
def register(self, source: DataSource, default: bool = False) -> None:
self._sources[source.name] = source
if default or self._default is None:
self._default = source.name
def get(self, name: str) -> DataSource | None:
return self._sources.get(name)
def list_all(self) -> dict[str, DataSource]:
return dict(self._sources)
def default(self) -> DataSource | None:
if self._default:
return self._sources.get(self._default)
return None
def create_headlines(self) -> HeadlinesDataSource:
return HeadlinesDataSource()
def create_poetry(self) -> PoetryDataSource:
return PoetryDataSource()
_global_registry: SourceRegistry | None = None
def get_source_registry() -> SourceRegistry:
global _global_registry
if _global_registry is None:
_global_registry = SourceRegistry()
return _global_registry
def init_default_sources() -> SourceRegistry:
"""Initialize the default source registry with standard sources."""
registry = get_source_registry()
registry.register(HeadlinesDataSource(), default=True)
registry.register(PoetryDataSource())
return registry