forked from genewildish/Mainline
feat(pipeline): add metrics collection and v2 run mode
- Add RenderStage adapter that handles rendering pipeline - Add EffectPluginStage with proper EffectContext - Add DisplayStage with init handling - Add ItemsStage for pre-fetched items - Add metrics collection to Pipeline (StageMetrics, FrameMetrics) - Add get_metrics_summary() and reset_metrics() methods - Add --pipeline and --pipeline-preset flags for v2 mode - Add PipelineNode.metrics for self-documenting introspection - Add introspect_new_pipeline() method with performance data - Add mise tasks: run-v2, run-v2-demo, run-v2-poetry, run-v2-websocket, run-v2-firehose
This commit is contained in:
@@ -5,6 +5,7 @@ The Pipeline class orchestrates stages in dependency order, handling
|
||||
the complete render cycle from source to display.
|
||||
"""
|
||||
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from typing import Any
|
||||
|
||||
@@ -21,6 +22,26 @@ class PipelineConfig:
|
||||
display: str = "terminal"
|
||||
camera: str = "vertical"
|
||||
effects: list[str] = field(default_factory=list)
|
||||
enable_metrics: bool = True
|
||||
|
||||
|
||||
@dataclass
|
||||
class StageMetrics:
|
||||
"""Metrics for a single stage execution."""
|
||||
|
||||
name: str
|
||||
duration_ms: float
|
||||
chars_in: int = 0
|
||||
chars_out: int = 0
|
||||
|
||||
|
||||
@dataclass
|
||||
class FrameMetrics:
|
||||
"""Metrics for a single frame through the pipeline."""
|
||||
|
||||
frame_number: int
|
||||
total_ms: float
|
||||
stages: list[StageMetrics] = field(default_factory=list)
|
||||
|
||||
|
||||
class Pipeline:
|
||||
@@ -41,6 +62,11 @@ class Pipeline:
|
||||
self._execution_order: list[str] = []
|
||||
self._initialized = False
|
||||
|
||||
self._metrics_enabled = self.config.enable_metrics
|
||||
self._frame_metrics: list[FrameMetrics] = []
|
||||
self._max_metrics_frames = 60
|
||||
self._current_frame_number = 0
|
||||
|
||||
def add_stage(self, name: str, stage: Stage) -> "Pipeline":
|
||||
"""Add a stage to the pipeline."""
|
||||
self._stages[name] = stage
|
||||
@@ -112,12 +138,16 @@ class Pipeline:
|
||||
)
|
||||
|
||||
current_data = data
|
||||
frame_start = time.perf_counter() if self._metrics_enabled else 0
|
||||
stage_timings: list[StageMetrics] = []
|
||||
|
||||
for name in self._execution_order:
|
||||
stage = self._stages.get(name)
|
||||
if not stage or not stage.is_enabled():
|
||||
continue
|
||||
|
||||
stage_start = time.perf_counter() if self._metrics_enabled else 0
|
||||
|
||||
try:
|
||||
current_data = stage.process(current_data, self.context)
|
||||
except Exception as e:
|
||||
@@ -128,9 +158,34 @@ class Pipeline:
|
||||
error=str(e),
|
||||
stage_name=name,
|
||||
)
|
||||
# Skip optional stage on error
|
||||
continue
|
||||
|
||||
if self._metrics_enabled:
|
||||
stage_duration = (time.perf_counter() - stage_start) * 1000
|
||||
chars_in = len(str(data)) if data else 0
|
||||
chars_out = len(str(current_data)) if current_data else 0
|
||||
stage_timings.append(
|
||||
StageMetrics(
|
||||
name=name,
|
||||
duration_ms=stage_duration,
|
||||
chars_in=chars_in,
|
||||
chars_out=chars_out,
|
||||
)
|
||||
)
|
||||
|
||||
if self._metrics_enabled:
|
||||
total_duration = (time.perf_counter() - frame_start) * 1000
|
||||
self._frame_metrics.append(
|
||||
FrameMetrics(
|
||||
frame_number=self._current_frame_number,
|
||||
total_ms=total_duration,
|
||||
stages=stage_timings,
|
||||
)
|
||||
)
|
||||
if len(self._frame_metrics) > self._max_metrics_frames:
|
||||
self._frame_metrics.pop(0)
|
||||
self._current_frame_number += 1
|
||||
|
||||
return StageResult(success=True, data=current_data)
|
||||
|
||||
def cleanup(self) -> None:
|
||||
@@ -159,6 +214,46 @@ class Pipeline:
|
||||
"""Get list of stage names."""
|
||||
return list(self._stages.keys())
|
||||
|
||||
def get_metrics_summary(self) -> dict:
|
||||
"""Get summary of collected metrics."""
|
||||
if not self._frame_metrics:
|
||||
return {"error": "No metrics collected"}
|
||||
|
||||
total_times = [f.total_ms for f in self._frame_metrics]
|
||||
avg_total = sum(total_times) / len(total_times)
|
||||
min_total = min(total_times)
|
||||
max_total = max(total_times)
|
||||
|
||||
stage_stats: dict[str, dict] = {}
|
||||
for frame in self._frame_metrics:
|
||||
for stage in frame.stages:
|
||||
if stage.name not in stage_stats:
|
||||
stage_stats[stage.name] = {"times": [], "total_chars": 0}
|
||||
stage_stats[stage.name]["times"].append(stage.duration_ms)
|
||||
stage_stats[stage.name]["total_chars"] += stage.chars_out
|
||||
|
||||
for name, stats in stage_stats.items():
|
||||
times = stats["times"]
|
||||
stats["avg_ms"] = sum(times) / len(times)
|
||||
stats["min_ms"] = min(times)
|
||||
stats["max_ms"] = max(times)
|
||||
del stats["times"]
|
||||
|
||||
return {
|
||||
"frame_count": len(self._frame_metrics),
|
||||
"pipeline": {
|
||||
"avg_ms": avg_total,
|
||||
"min_ms": min_total,
|
||||
"max_ms": max_total,
|
||||
},
|
||||
"stages": stage_stats,
|
||||
}
|
||||
|
||||
def reset_metrics(self) -> None:
|
||||
"""Reset collected metrics."""
|
||||
self._frame_metrics.clear()
|
||||
self._current_frame_number = 0
|
||||
|
||||
|
||||
class PipelineRunner:
|
||||
"""High-level pipeline runner with animation support."""
|
||||
@@ -180,7 +275,7 @@ class PipelineRunner:
|
||||
def step(self, input_data: Any | None = None) -> Any:
|
||||
"""Execute one pipeline step."""
|
||||
self.params.frame_number += 1
|
||||
self.context.params = self.params
|
||||
self.pipeline.context.params = self.params
|
||||
result = self.pipeline.execute(input_data)
|
||||
return result.data if result.success else None
|
||||
|
||||
|
||||
Reference in New Issue
Block a user