forked from genewildish/Mainline
The old engine/pipeline/core.py file was removed as part of the Sideline/Mainline split. All imports that referenced engine.pipeline.core have been updated to use engine.pipeline which re-exports from sideline.pipeline.core. This ensures consistency and avoids duplicate DataType enum instances.
313 lines
9.4 KiB
Python
313 lines
9.4 KiB
Python
"""
|
|
Pipeline introspection source - Renders live visualization of pipeline DAG and metrics.
|
|
|
|
This DataSource introspects one or more Pipeline instances and renders
|
|
an ASCII visualization showing:
|
|
- Stage DAG with signal flow connections
|
|
- Per-stage execution times
|
|
- Sparkline of frame times
|
|
- Stage breakdown bars
|
|
|
|
Example:
|
|
source = PipelineIntrospectionSource(pipelines=[my_pipeline])
|
|
items = source.fetch() # Returns ASCII visualization
|
|
"""
|
|
|
|
from typing import TYPE_CHECKING
|
|
|
|
from engine.data_sources.sources import DataSource, SourceItem
|
|
|
|
if TYPE_CHECKING:
|
|
from engine.pipeline.controller import Pipeline
|
|
|
|
|
|
SPARKLINE_CHARS = " ▁▂▃▄▅▆▇█"
|
|
BAR_CHARS = " ▁▂▃▄▅▆▇█"
|
|
|
|
|
|
class PipelineIntrospectionSource(DataSource):
|
|
"""Data source that renders live pipeline introspection visualization.
|
|
|
|
Renders:
|
|
- DAG of stages with signal flow
|
|
- Per-stage execution times
|
|
- Sparkline of frame history
|
|
- Stage breakdown bars
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
pipeline: "Pipeline | None" = None,
|
|
viewport_width: int = 100,
|
|
viewport_height: int = 35,
|
|
):
|
|
self._pipeline = pipeline # May be None initially, set later via set_pipeline()
|
|
self.viewport_width = viewport_width
|
|
self.viewport_height = viewport_height
|
|
self.frame = 0
|
|
self._ready = False
|
|
|
|
def set_pipeline(self, pipeline: "Pipeline") -> None:
|
|
"""Set the pipeline to introspect (call after pipeline is built)."""
|
|
self._pipeline = [pipeline] # Wrap in list for iteration
|
|
self._ready = True
|
|
|
|
@property
|
|
def ready(self) -> bool:
|
|
"""Check if source is ready to fetch."""
|
|
return self._ready
|
|
|
|
@property
|
|
def name(self) -> str:
|
|
return "pipeline-inspect"
|
|
|
|
@property
|
|
def is_dynamic(self) -> bool:
|
|
return True
|
|
|
|
@property
|
|
def inlet_types(self) -> set:
|
|
from engine.pipeline import DataType
|
|
|
|
return {DataType.NONE}
|
|
|
|
@property
|
|
def outlet_types(self) -> set:
|
|
from engine.pipeline import DataType
|
|
|
|
return {DataType.SOURCE_ITEMS}
|
|
|
|
def add_pipeline(self, pipeline: "Pipeline") -> None:
|
|
"""Add a pipeline to visualize."""
|
|
if self._pipeline is None:
|
|
self._pipeline = [pipeline]
|
|
elif isinstance(self._pipeline, list):
|
|
self._pipeline.append(pipeline)
|
|
else:
|
|
self._pipeline = [self._pipeline, pipeline]
|
|
self._ready = True
|
|
|
|
def remove_pipeline(self, pipeline: "Pipeline") -> None:
|
|
"""Remove a pipeline from visualization."""
|
|
if self._pipeline is None:
|
|
return
|
|
elif isinstance(self._pipeline, list):
|
|
self._pipeline = [p for p in self._pipeline if p is not pipeline]
|
|
if not self._pipeline:
|
|
self._pipeline = None
|
|
self._ready = False
|
|
elif self._pipeline is pipeline:
|
|
self._pipeline = None
|
|
self._ready = False
|
|
|
|
def fetch(self) -> list[SourceItem]:
|
|
"""Fetch the introspection visualization."""
|
|
if not self._ready:
|
|
# Return a placeholder until ready
|
|
return [
|
|
SourceItem(
|
|
content="Initializing...",
|
|
source="pipeline-inspect",
|
|
timestamp="init",
|
|
)
|
|
]
|
|
|
|
lines = self._render()
|
|
self.frame += 1
|
|
content = "\n".join(lines)
|
|
return [
|
|
SourceItem(
|
|
content=content, source="pipeline-inspect", timestamp=f"f{self.frame}"
|
|
)
|
|
]
|
|
|
|
def get_items(self) -> list[SourceItem]:
|
|
return self.fetch()
|
|
|
|
def _render(self) -> list[str]:
|
|
"""Render the full visualization."""
|
|
lines: list[str] = []
|
|
|
|
# Header
|
|
lines.extend(self._render_header())
|
|
|
|
# Render pipeline(s) if ready
|
|
if self._ready and self._pipeline:
|
|
pipelines = (
|
|
self._pipeline if isinstance(self._pipeline, list) else [self._pipeline]
|
|
)
|
|
for pipeline in pipelines:
|
|
lines.extend(self._render_pipeline(pipeline))
|
|
|
|
# Footer with sparkline
|
|
lines.extend(self._render_footer())
|
|
|
|
return lines
|
|
|
|
@property
|
|
def _pipelines(self) -> list:
|
|
"""Return pipelines as a list for iteration."""
|
|
if self._pipeline is None:
|
|
return []
|
|
elif isinstance(self._pipeline, list):
|
|
return self._pipeline
|
|
else:
|
|
return [self._pipeline]
|
|
|
|
def _render_header(self) -> list[str]:
|
|
"""Render the header with frame info and metrics summary."""
|
|
lines: list[str] = []
|
|
|
|
if not self._pipeline:
|
|
return ["PIPELINE INTROSPECTION"]
|
|
|
|
# Get aggregate metrics
|
|
total_ms = 0.0
|
|
fps = 0.0
|
|
frame_count = 0
|
|
|
|
for pipeline in self._pipelines:
|
|
try:
|
|
metrics = pipeline.get_metrics_summary()
|
|
if metrics and "error" not in metrics:
|
|
# Get avg_ms from pipeline metrics
|
|
pipeline_avg = metrics.get("pipeline", {}).get("avg_ms", 0)
|
|
total_ms = max(total_ms, pipeline_avg)
|
|
# Calculate FPS from avg_ms
|
|
if pipeline_avg > 0:
|
|
fps = max(fps, 1000.0 / pipeline_avg)
|
|
frame_count = max(frame_count, metrics.get("frame_count", 0))
|
|
except Exception:
|
|
pass
|
|
|
|
header = f"PIPELINE INTROSPECTION -- frame: {self.frame} -- avg: {total_ms:.1f}ms -- fps: {fps:.1f}"
|
|
lines.append(header)
|
|
|
|
return lines
|
|
|
|
def _render_pipeline(self, pipeline: "Pipeline") -> list[str]:
|
|
"""Render a single pipeline's DAG."""
|
|
lines: list[str] = []
|
|
|
|
stages = pipeline.stages
|
|
execution_order = pipeline.execution_order
|
|
|
|
if not stages:
|
|
lines.append(" (no stages)")
|
|
return lines
|
|
|
|
# Build stage info
|
|
stage_infos: list[dict] = []
|
|
for name in execution_order:
|
|
stage = stages.get(name)
|
|
if not stage:
|
|
continue
|
|
|
|
try:
|
|
metrics = pipeline.get_metrics_summary()
|
|
stage_ms = metrics.get("stages", {}).get(name, {}).get("avg_ms", 0.0)
|
|
except Exception:
|
|
stage_ms = 0.0
|
|
|
|
stage_infos.append(
|
|
{
|
|
"name": name,
|
|
"category": stage.category,
|
|
"ms": stage_ms,
|
|
}
|
|
)
|
|
|
|
# Calculate total time for percentages
|
|
total_time = sum(s["ms"] for s in stage_infos) or 1.0
|
|
|
|
# Render DAG - group by category
|
|
lines.append("")
|
|
lines.append(" Signal Flow:")
|
|
|
|
# Group stages by category for display
|
|
categories: dict[str, list[dict]] = {}
|
|
for info in stage_infos:
|
|
cat = info["category"]
|
|
if cat not in categories:
|
|
categories[cat] = []
|
|
categories[cat].append(info)
|
|
|
|
# Render categories in order
|
|
cat_order = ["source", "render", "effect", "overlay", "display", "system"]
|
|
|
|
for cat in cat_order:
|
|
if cat not in categories:
|
|
continue
|
|
|
|
cat_stages = categories[cat]
|
|
cat_names = [s["name"] for s in cat_stages]
|
|
lines.append(f" {cat}: {' → '.join(cat_names)}")
|
|
|
|
# Render timing breakdown
|
|
lines.append("")
|
|
lines.append(" Stage Timings:")
|
|
|
|
for info in stage_infos:
|
|
name = info["name"]
|
|
ms = info["ms"]
|
|
pct = (ms / total_time) * 100
|
|
bar = self._render_bar(pct, 20)
|
|
lines.append(f" {name:12s} {ms:6.2f}ms {bar} {pct:5.1f}%")
|
|
|
|
lines.append("")
|
|
|
|
return lines
|
|
|
|
def _render_footer(self) -> list[str]:
|
|
"""Render the footer with sparkline."""
|
|
lines: list[str] = []
|
|
|
|
# Get frame history from first pipeline
|
|
pipelines = self._pipelines
|
|
if pipelines:
|
|
try:
|
|
frame_times = pipelines[0].get_frame_times()
|
|
except Exception:
|
|
frame_times = []
|
|
else:
|
|
frame_times = []
|
|
|
|
if frame_times:
|
|
sparkline = self._render_sparkline(frame_times[-60:], 50)
|
|
lines.append(f" Frame Time History (last {len(frame_times[-60:])} frames)")
|
|
lines.append(f" {sparkline}")
|
|
else:
|
|
lines.append(" Frame Time History")
|
|
lines.append(" (collecting data...)")
|
|
|
|
lines.append("")
|
|
|
|
return lines
|
|
|
|
def _render_bar(self, percentage: float, width: int) -> str:
|
|
"""Render a horizontal bar for percentage."""
|
|
filled = int((percentage / 100.0) * width)
|
|
bar = "█" * filled + "░" * (width - filled)
|
|
return bar
|
|
|
|
def _render_sparkline(self, values: list[float], width: int) -> str:
|
|
"""Render a sparkline from values."""
|
|
if not values:
|
|
return " " * width
|
|
|
|
min_val = min(values)
|
|
max_val = max(values)
|
|
range_val = max_val - min_val or 1.0
|
|
|
|
result = []
|
|
for v in values[-width:]:
|
|
normalized = (v - min_val) / range_val
|
|
idx = int(normalized * (len(SPARKLINE_CHARS) - 1))
|
|
idx = max(0, min(idx, len(SPARKLINE_CHARS) - 1))
|
|
result.append(SPARKLINE_CHARS[idx])
|
|
|
|
# Pad to width
|
|
while len(result) < width:
|
|
result.insert(0, " ")
|
|
return "".join(result[:width])
|