From 31cabe91284299b493e584077b27b2d22ceb8739 Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Mon, 16 Mar 2026 03:39:29 -0700 Subject: [PATCH] feat(pipeline): add metrics collection and v2 run mode - Add RenderStage adapter that handles rendering pipeline - Add EffectPluginStage with proper EffectContext - Add DisplayStage with init handling - Add ItemsStage for pre-fetched items - Add metrics collection to Pipeline (StageMetrics, FrameMetrics) - Add get_metrics_summary() and reset_metrics() methods - Add --pipeline and --pipeline-preset flags for v2 mode - Add PipelineNode.metrics for self-documenting introspection - Add introspect_new_pipeline() method with performance data - Add mise tasks: run-v2, run-v2-demo, run-v2-poetry, run-v2-websocket, run-v2-firehose --- engine/app.py | 132 ++++++++++++++- engine/config.py | 4 + engine/pipeline.py | 81 +++++++++ engine/pipeline/adapters.py | 299 ++++++++++++++++++++++++++++++++++ engine/pipeline/controller.py | 99 ++++++++++- mise.toml | 10 ++ 6 files changed, 622 insertions(+), 3 deletions(-) create mode 100644 engine/pipeline/adapters.py diff --git a/engine/app.py b/engine/app.py index 3e06eb8..3647bdd 100644 --- a/engine/app.py +++ b/engine/app.py @@ -839,12 +839,17 @@ def run_preset_mode(preset_name: str): def main(): from engine import config - from engine.pipeline import generate_pipeline_diagram if config.PIPELINE_DIAGRAM: + from engine.pipeline import generate_pipeline_diagram + print(generate_pipeline_diagram()) return + if config.PIPELINE_MODE: + run_pipeline_mode(config.PIPELINE_PRESET) + return + if config.PIPELINE_DEMO: run_pipeline_demo() return @@ -955,3 +960,128 @@ def main(): print(f" {G_DIM}> {config.HEADLINE_LIMIT} SIGNALS PROCESSED{RST}") print(f" {W_GHOST}> end of stream{RST}") print() + + +def run_pipeline_mode(preset_name: str = "demo"): + """Run using the new unified pipeline architecture.""" + import effects_plugins + from engine.display import DisplayRegistry + from engine.effects import get_registry + from engine.fetch import fetch_all, fetch_poetry, load_cache + from engine.pipeline import ( + Pipeline, + PipelineConfig, + get_preset, + ) + from engine.pipeline.adapters import ( + RenderStage, + create_items_stage, + create_stage_from_display, + create_stage_from_effect, + ) + + print(" \033[1;38;5;46mPIPELINE MODE\033[0m") + print(" \033[38;5;245mUsing unified pipeline architecture\033[0m") + + effects_plugins.discover_plugins() + + preset = get_preset(preset_name) + if not preset: + print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m") + sys.exit(1) + + print(f" \033[38;5;245mPreset: {preset.name} - {preset.description}\033[0m") + + params = preset.to_params() + params.viewport_width = 80 + params.viewport_height = 24 + + pipeline = Pipeline( + config=PipelineConfig( + source=preset.source, + display=preset.display, + camera=preset.camera, + effects=preset.effects, + ) + ) + + print(" \033[38;5;245mFetching content...\033[0m") + cached = load_cache() + if cached: + items = cached + elif preset.source == "poetry": + items, _, _ = fetch_poetry() + else: + items, _, _ = fetch_all() + + if not items: + print(" \033[38;5;196mNo content available\033[0m") + sys.exit(1) + + print(f" \033[38;5;82mLoaded {len(items)} items\033[0m") + + display = DisplayRegistry.create(preset.display) + if not display: + print(f" \033[38;5;196mFailed to create display: {preset.display}\033[0m") + sys.exit(1) + + display.init(80, 24) + + effect_registry = get_registry() + + pipeline.add_stage("source", create_items_stage(items, preset.source)) + pipeline.add_stage( + "render", + RenderStage( + items, + width=80, + height=24, + camera_speed=params.camera_speed, + camera_mode=preset.camera, + firehose_enabled=params.firehose_enabled, + ), + ) + + for effect_name in preset.effects: + effect = effect_registry.get(effect_name) + if effect: + pipeline.add_stage( + f"effect_{effect_name}", create_stage_from_effect(effect, effect_name) + ) + + pipeline.add_stage("display", create_stage_from_display(display, preset.display)) + + pipeline.build() + + if not pipeline.initialize(): + print(" \033[38;5;196mFailed to initialize pipeline\033[0m") + sys.exit(1) + + print(" \033[38;5;82mStarting pipeline...\033[0m") + print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n") + + ctx = pipeline.context + ctx.params = params + ctx.set("display", display) + ctx.set("items", items) + ctx.set("pipeline", pipeline) + + try: + frame = 0 + while True: + params.frame_number = frame + ctx.params = params + + result = pipeline.execute(items) + if result.success: + display.show(result.data) + + time.sleep(1 / 60) + frame += 1 + + except KeyboardInterrupt: + pass + finally: + pipeline.cleanup() + display.cleanup() + print("\n \033[38;5;245mPipeline stopped\033[0m") diff --git a/engine/config.py b/engine/config.py index 7db5787..bf227ba 100644 --- a/engine/config.py +++ b/engine/config.py @@ -246,6 +246,10 @@ DEMO = "--demo" in sys.argv DEMO_EFFECT_DURATION = 5.0 # seconds per effect PIPELINE_DEMO = "--pipeline-demo" in sys.argv +# ─── PIPELINE MODE (new unified architecture) ───────────── +PIPELINE_MODE = "--pipeline" in sys.argv +PIPELINE_PRESET = _arg_value("--pipeline-preset", sys.argv) or "demo" + # ─── PRESET MODE ──────────────────────────────────────────── PRESET = _arg_value("--preset", sys.argv) diff --git a/engine/pipeline.py b/engine/pipeline.py index 593d30a..752969f 100644 --- a/engine/pipeline.py +++ b/engine/pipeline.py @@ -36,6 +36,7 @@ class PipelineNode: description: str = "" inputs: list[str] | None = None outputs: list[str] | None = None + metrics: dict | None = None # Performance metrics (avg_ms, min_ms, max_ms) class PipelineIntrospector: @@ -76,6 +77,14 @@ class PipelineIntrospector: if node.description: label += f"\\n{node.description}" + if node.metrics: + avg = node.metrics.get("avg_ms", 0) + if avg > 0: + label += f"\\n⏱ {avg:.1f}ms" + impact = node.metrics.get("impact_pct", 0) + if impact > 0: + label += f" ({impact:.0f}%)" + node_entry = f' {node_id}["{label}"]' if "DataSource" in node.name or "SourceRegistry" in node.name: @@ -501,6 +510,78 @@ class PipelineIntrospector: ) ) + def introspect_new_pipeline(self, pipeline=None) -> None: + """Introspect new unified pipeline stages with metrics. + + Args: + pipeline: Optional Pipeline instance to collect metrics from + """ + + stages_info = [ + ( + "ItemsSource", + "engine.pipeline.adapters", + "ItemsStage", + "Provides pre-fetched items", + ), + ( + "Render", + "engine.pipeline.adapters", + "RenderStage", + "Renders items to buffer", + ), + ( + "Effect", + "engine.pipeline.adapters", + "EffectPluginStage", + "Applies effect", + ), + ( + "Display", + "engine.pipeline.adapters", + "DisplayStage", + "Outputs to display", + ), + ] + + metrics = None + if pipeline and hasattr(pipeline, "get_metrics_summary"): + metrics = pipeline.get_metrics_summary() + if "error" in metrics: + metrics = None + + total_avg = metrics.get("pipeline", {}).get("avg_ms", 0) if metrics else 0 + + for stage_name, module, class_name, desc in stages_info: + node_metrics = None + if metrics and "stages" in metrics: + for name, stats in metrics["stages"].items(): + if stage_name.lower() in name.lower(): + impact_pct = ( + (stats.get("avg_ms", 0) / total_avg * 100) + if total_avg > 0 + else 0 + ) + node_metrics = { + "avg_ms": stats.get("avg_ms", 0), + "min_ms": stats.get("min_ms", 0), + "max_ms": stats.get("max_ms", 0), + "impact_pct": impact_pct, + } + break + + self.add_node( + PipelineNode( + name=f"Pipeline: {stage_name}", + module=module, + class_name=class_name, + description=desc, + inputs=["data"], + outputs=["data"], + metrics=node_metrics, + ) + ) + def run(self) -> str: """Run full introspection.""" self.introspect_sources() diff --git a/engine/pipeline/adapters.py b/engine/pipeline/adapters.py new file mode 100644 index 0000000..6636760 --- /dev/null +++ b/engine/pipeline/adapters.py @@ -0,0 +1,299 @@ +""" +Stage adapters - Bridge existing components to the Stage interface. + +This module provides adapters that wrap existing components +(EffectPlugin, Display, DataSource, Camera) as Stage implementations. +""" + +import random +from typing import Any + +from engine.pipeline.core import PipelineContext, Stage + + +class RenderStage(Stage): + """Stage that renders items to a text buffer for display. + + This mimics the old demo's render pipeline: + - Selects headlines and renders them to blocks + - Applies camera scroll position + - Adds firehose layer if enabled + """ + + def __init__( + self, + items: list, + width: int = 80, + height: int = 24, + camera_speed: float = 1.0, + camera_mode: str = "vertical", + firehose_enabled: bool = False, + name: str = "render", + ): + self.name = name + self.category = "render" + self.optional = False + self._items = items + self._width = width + self._height = height + self._camera_speed = camera_speed + self._camera_mode = camera_mode + self._firehose_enabled = firehose_enabled + + self._camera_y = 0.0 + self._camera_x = 0 + self._scroll_accum = 0.0 + self._ticker_next_y = 0 + self._active: list = [] + self._seen: set = set() + self._pool: list = list(items) + self._noise_cache: dict = {} + self._frame_count = 0 + + @property + def capabilities(self) -> set[str]: + return {"render.output"} + + @property + def dependencies(self) -> set[str]: + return {"source.items"} + + def init(self, ctx: PipelineContext) -> bool: + random.shuffle(self._pool) + return True + + def process(self, data: Any, ctx: PipelineContext) -> Any: + """Render items to a text buffer.""" + from engine.effects import next_headline + from engine.layers import render_firehose, render_ticker_zone + from engine.render import make_block + + items = data or self._items + w = ctx.params.viewport_width if ctx.params else self._width + h = ctx.params.viewport_height if ctx.params else self._height + camera_speed = ctx.params.camera_speed if ctx.params else self._camera_speed + firehose = ctx.params.firehose_enabled if ctx.params else self._firehose_enabled + + scroll_step = 0.5 / (camera_speed * 10) + self._scroll_accum += scroll_step + + GAP = 3 + + while self._scroll_accum >= scroll_step: + self._scroll_accum -= scroll_step + self._camera_y += 1.0 + + while ( + self._ticker_next_y < int(self._camera_y) + h + 10 + and len(self._active) < 50 + ): + t, src, ts = next_headline(self._pool, items, self._seen) + ticker_content, hc, midx = make_block(t, src, ts, w) + self._active.append((ticker_content, hc, self._ticker_next_y, midx)) + self._ticker_next_y += len(ticker_content) + GAP + + self._active = [ + (c, hc, by, mi) + for c, hc, by, mi in self._active + if by + len(c) > int(self._camera_y) + ] + for k in list(self._noise_cache): + if k < int(self._camera_y): + del self._noise_cache[k] + + grad_offset = (self._frame_count * 0.01) % 1.0 + + buf, self._noise_cache = render_ticker_zone( + self._active, + scroll_cam=int(self._camera_y), + camera_x=self._camera_x, + ticker_h=h, + w=w, + noise_cache=self._noise_cache, + grad_offset=grad_offset, + ) + + if firehose: + firehose_buf = render_firehose(items, w, 0, h) + buf.extend(firehose_buf) + + self._frame_count += 1 + return buf + + +class EffectPluginStage(Stage): + """Adapter wrapping EffectPlugin as a Stage.""" + + def __init__(self, effect_plugin, name: str = "effect"): + self._effect = effect_plugin + self.name = name + self.category = "effect" + self.optional = False + + @property + def capabilities(self) -> set[str]: + return {f"effect.{self.name}"} + + @property + def dependencies(self) -> set[str]: + return set() + + def process(self, data: Any, ctx: PipelineContext) -> Any: + """Process data through the effect.""" + if data is None: + return None + from engine.effects import EffectContext + + w = ctx.params.viewport_width if ctx.params else 80 + h = ctx.params.viewport_height if ctx.params else 24 + frame = ctx.params.frame_number if ctx.params else 0 + + effect_ctx = EffectContext( + terminal_width=w, + terminal_height=h, + scroll_cam=0, + ticker_height=h, + camera_x=0, + mic_excess=0.0, + grad_offset=(frame * 0.01) % 1.0, + frame_number=frame, + has_message=False, + items=ctx.get("items", []), + ) + return self._effect.process(data, effect_ctx) + + +class DisplayStage(Stage): + """Adapter wrapping Display as a Stage.""" + + def __init__(self, display, name: str = "terminal"): + self._display = display + self.name = name + self.category = "display" + self.optional = False + + @property + def capabilities(self) -> set[str]: + return {"display.output"} + + @property + def dependencies(self) -> set[str]: + return set() + + def init(self, ctx: PipelineContext) -> bool: + w = ctx.params.viewport_width if ctx.params else 80 + h = ctx.params.viewport_height if ctx.params else 24 + result = self._display.init(w, h) + return result is not False + + def process(self, data: Any, ctx: PipelineContext) -> Any: + """Output data to display.""" + if data is not None: + self._display.show(data) + return data + + def cleanup(self) -> None: + self._display.cleanup() + + +class DataSourceStage(Stage): + """Adapter wrapping DataSource as a Stage.""" + + def __init__(self, data_source, name: str = "headlines"): + self._source = data_source + self.name = name + self.category = "source" + self.optional = False + + @property + def capabilities(self) -> set[str]: + return {f"source.{self.name}"} + + @property + def dependencies(self) -> set[str]: + return set() + + def process(self, data: Any, ctx: PipelineContext) -> Any: + """Fetch data from source.""" + if hasattr(self._source, "get_items"): + return self._source.get_items() + return data + + +class ItemsStage(Stage): + """Stage that holds pre-fetched items and provides them to the pipeline.""" + + def __init__(self, items, name: str = "headlines"): + self._items = items + self.name = name + self.category = "source" + self.optional = False + + @property + def capabilities(self) -> set[str]: + return {f"source.{self.name}"} + + @property + def dependencies(self) -> set[str]: + return set() + + def process(self, data: Any, ctx: PipelineContext) -> Any: + """Return the pre-fetched items.""" + return self._items + + +class CameraStage(Stage): + """Adapter wrapping Camera as a Stage.""" + + def __init__(self, camera, name: str = "vertical"): + self._camera = camera + self.name = name + self.category = "camera" + self.optional = True + + @property + def capabilities(self) -> set[str]: + return {"camera"} + + @property + def dependencies(self) -> set[str]: + return {"source.items"} + + def process(self, data: Any, ctx: PipelineContext) -> Any: + """Apply camera transformation to data.""" + if data is None: + return None + if hasattr(self._camera, "apply"): + return self._camera.apply( + data, ctx.params.viewport_width if ctx.params else 80 + ) + return data + + def cleanup(self) -> None: + if hasattr(self._camera, "reset"): + self._camera.reset() + + +def create_stage_from_display(display, name: str = "terminal") -> DisplayStage: + """Create a Stage from a Display instance.""" + return DisplayStage(display, name) + + +def create_stage_from_effect(effect_plugin, name: str) -> EffectPluginStage: + """Create a Stage from an EffectPlugin.""" + return EffectPluginStage(effect_plugin, name) + + +def create_stage_from_source(data_source, name: str = "headlines") -> DataSourceStage: + """Create a Stage from a DataSource.""" + return DataSourceStage(data_source, name) + + +def create_stage_from_camera(camera, name: str = "vertical") -> CameraStage: + """Create a Stage from a Camera.""" + return CameraStage(camera, name) + + +def create_items_stage(items, name: str = "headlines") -> ItemsStage: + """Create a Stage that holds pre-fetched items.""" + return ItemsStage(items, name) diff --git a/engine/pipeline/controller.py b/engine/pipeline/controller.py index d03aa88..11e39d1 100644 --- a/engine/pipeline/controller.py +++ b/engine/pipeline/controller.py @@ -5,6 +5,7 @@ The Pipeline class orchestrates stages in dependency order, handling the complete render cycle from source to display. """ +import time from dataclasses import dataclass, field from typing import Any @@ -21,6 +22,26 @@ class PipelineConfig: display: str = "terminal" camera: str = "vertical" effects: list[str] = field(default_factory=list) + enable_metrics: bool = True + + +@dataclass +class StageMetrics: + """Metrics for a single stage execution.""" + + name: str + duration_ms: float + chars_in: int = 0 + chars_out: int = 0 + + +@dataclass +class FrameMetrics: + """Metrics for a single frame through the pipeline.""" + + frame_number: int + total_ms: float + stages: list[StageMetrics] = field(default_factory=list) class Pipeline: @@ -41,6 +62,11 @@ class Pipeline: self._execution_order: list[str] = [] self._initialized = False + self._metrics_enabled = self.config.enable_metrics + self._frame_metrics: list[FrameMetrics] = [] + self._max_metrics_frames = 60 + self._current_frame_number = 0 + def add_stage(self, name: str, stage: Stage) -> "Pipeline": """Add a stage to the pipeline.""" self._stages[name] = stage @@ -112,12 +138,16 @@ class Pipeline: ) current_data = data + frame_start = time.perf_counter() if self._metrics_enabled else 0 + stage_timings: list[StageMetrics] = [] for name in self._execution_order: stage = self._stages.get(name) if not stage or not stage.is_enabled(): continue + stage_start = time.perf_counter() if self._metrics_enabled else 0 + try: current_data = stage.process(current_data, self.context) except Exception as e: @@ -128,9 +158,34 @@ class Pipeline: error=str(e), stage_name=name, ) - # Skip optional stage on error continue + if self._metrics_enabled: + stage_duration = (time.perf_counter() - stage_start) * 1000 + chars_in = len(str(data)) if data else 0 + chars_out = len(str(current_data)) if current_data else 0 + stage_timings.append( + StageMetrics( + name=name, + duration_ms=stage_duration, + chars_in=chars_in, + chars_out=chars_out, + ) + ) + + if self._metrics_enabled: + total_duration = (time.perf_counter() - frame_start) * 1000 + self._frame_metrics.append( + FrameMetrics( + frame_number=self._current_frame_number, + total_ms=total_duration, + stages=stage_timings, + ) + ) + if len(self._frame_metrics) > self._max_metrics_frames: + self._frame_metrics.pop(0) + self._current_frame_number += 1 + return StageResult(success=True, data=current_data) def cleanup(self) -> None: @@ -159,6 +214,46 @@ class Pipeline: """Get list of stage names.""" return list(self._stages.keys()) + def get_metrics_summary(self) -> dict: + """Get summary of collected metrics.""" + if not self._frame_metrics: + return {"error": "No metrics collected"} + + total_times = [f.total_ms for f in self._frame_metrics] + avg_total = sum(total_times) / len(total_times) + min_total = min(total_times) + max_total = max(total_times) + + stage_stats: dict[str, dict] = {} + for frame in self._frame_metrics: + for stage in frame.stages: + if stage.name not in stage_stats: + stage_stats[stage.name] = {"times": [], "total_chars": 0} + stage_stats[stage.name]["times"].append(stage.duration_ms) + stage_stats[stage.name]["total_chars"] += stage.chars_out + + for name, stats in stage_stats.items(): + times = stats["times"] + stats["avg_ms"] = sum(times) / len(times) + stats["min_ms"] = min(times) + stats["max_ms"] = max(times) + del stats["times"] + + return { + "frame_count": len(self._frame_metrics), + "pipeline": { + "avg_ms": avg_total, + "min_ms": min_total, + "max_ms": max_total, + }, + "stages": stage_stats, + } + + def reset_metrics(self) -> None: + """Reset collected metrics.""" + self._frame_metrics.clear() + self._current_frame_number = 0 + class PipelineRunner: """High-level pipeline runner with animation support.""" @@ -180,7 +275,7 @@ class PipelineRunner: def step(self, input_data: Any | None = None) -> Any: """Execute one pipeline step.""" self.params.frame_number += 1 - self.context.params = self.params + self.pipeline.context.params = self.params result = self.pipeline.execute(input_data) return result.data if result.success else None diff --git a/mise.toml b/mise.toml index 8c05609..17ebbdb 100644 --- a/mise.toml +++ b/mise.toml @@ -42,6 +42,16 @@ run-demo = { run = "uv run mainline.py --demo --display pygame", depends = ["syn run-pipeline = "uv run mainline.py --pipeline-diagram" run-pipeline-demo = { run = "uv run mainline.py --pipeline-demo --display pygame", depends = ["sync-all"] } +# ===================== +# New Pipeline Architecture (unified Stage-based) +# ===================== + +run-v2 = { run = "uv run mainline.py --pipeline --display pygame", depends = ["sync-all"] } +run-v2-demo = { run = "uv run mainline.py --pipeline --pipeline-preset demo --display pygame", depends = ["sync-all"] } +run-v2-poetry = { run = "uv run mainline.py --pipeline --pipeline-preset poetry --display pygame", depends = ["sync-all"] } +run-v2-websocket = { run = "uv run mainline.py --pipeline --pipeline-preset websocket", depends = ["sync-all"] } +run-v2-firehose = { run = "uv run mainline.py --pipeline --pipeline-preset firehose --display pygame", depends = ["sync-all"] } + # ===================== # Presets (Animation-controlled modes) # =====================