- Add RenderStage adapter that handles rendering pipeline - Add EffectPluginStage with proper EffectContext - Add DisplayStage with init handling - Add ItemsStage for pre-fetched items - Add metrics collection to Pipeline (StageMetrics, FrameMetrics) - Add get_metrics_summary() and reset_metrics() methods - Add --pipeline and --pipeline-preset flags for v2 mode - Add PipelineNode.metrics for self-documenting introspection - Add introspect_new_pipeline() method with performance data - Add mise tasks: run-v2, run-v2-demo, run-v2-poetry, run-v2-websocket, run-v2-firehose
612 lines
20 KiB
Python
612 lines
20 KiB
Python
"""
Pipeline introspection - generates self-documenting diagrams of the render pipeline.

Pipeline Architecture:
- Sources: Data providers (RSS, Poetry, Ntfy, Mic) - static or dynamic
- Fetch: Retrieve data from sources
- Prepare: Transform raw data (make_block, strip_tags, translate)
- Scroll: Camera-based viewport rendering (ticker zone, message overlay)
- Effects: Post-processing chain (noise, fade, glitch, firehose, hud)
- Render: Final line rendering and layout
- Display: Output backends (terminal, pygame, websocket, sixel, kitty)

Key abstractions:
- DataSource: Sources can be static (cached) or dynamic (idempotent fetch)
- Camera: Viewport controller (vertical, horizontal, omni, floating, trace)
- EffectChain: Ordered effect processing pipeline
- Display: Pluggable output backends
- SourceRegistry: Source discovery and management
- AnimationController: Time-based parameter animation
- Preset: Package of initial params + animation for demo modes
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
|
|
|
|
@dataclass
class PipelineNode:
    """Describes one component shown in the generated pipeline diagrams.

    Only ``name`` and ``module`` are required; the remaining fields are
    optional metadata used when building diagram labels and edges.
    """

    # Display name; the flowchart derives the Mermaid node id from it.
    name: str
    # Dotted module path the component is reported under.
    module: str
    # Implementing class, shown in the label when set (takes precedence
    # over ``func_name``).
    class_name: str | None = None
    # Implementing function, shown in the label when no class is given.
    func_name: str | None = None
    # Short summary appended to the label when non-empty.
    description: str = ""
    # Names of consumed values; each one becomes an incoming flowchart edge.
    inputs: list[str] | None = None
    # Names of produced values (not read by the flowchart generator).
    outputs: list[str] | None = None
    # Performance metrics; keys used in this module are
    # avg_ms, min_ms, max_ms and impact_pct.
    metrics: dict | None = None
|
|
|
|
|
|
class PipelineIntrospector:
    """Collects ``PipelineNode`` descriptions and renders Mermaid documentation.

    Nodes are registered through the ``introspect_*`` methods (or directly via
    ``add_node``) and then turned into Markdown-fenced Mermaid diagrams by the
    ``generate_*`` methods. ``run()`` performs the standard introspection
    passes and returns the combined document.
    """

    # Flowchart subgraph titles, in the order they are emitted.
    _GROUP_ORDER = (
        "Sources",
        "Fetch",
        "Prepare",
        "Scroll",
        "Effects",
        "Display",
        "Async",
        "Animation",
        "Viz",
    )

    # Characters that may not appear in a Mermaid node id, each mapped to "_".
    _ID_TRANSLATION = str.maketrans("- :", "___")

    def __init__(self):
        """Start with an empty node list."""
        self.nodes: list[PipelineNode] = []

    def add_node(self, node: PipelineNode) -> None:
        """Append ``node`` to the list of nodes to be documented."""
        self.nodes.append(node)

    # ------------------------------------------------------------------
    # Flowchart helpers
    # ------------------------------------------------------------------

    @classmethod
    def _mermaid_id(cls, name: str) -> str:
        """Return ``name`` with ``-``, space and ``:`` replaced by ``_``."""
        return name.translate(cls._ID_TRANSLATION)

    @staticmethod
    def _node_label(node) -> str:
        """Build the multi-line label text shown inside a flowchart node.

        Lines are joined with a literal backslash-n sequence, as in the
        original output. Double quotes are replaced by Mermaid's ``#quot;``
        entity because the label is emitted inside ``["..."]``.
        """
        label = node.name
        if node.class_name:
            label = f"{node.name}\\n({node.class_name})"
        elif node.func_name:
            label = f"{node.name}\\n({node.func_name})"

        if node.description:
            label += f"\\n{node.description}"

        if node.metrics:
            # "or 0" tolerates metrics dicts that carry explicit None values.
            avg = node.metrics.get("avg_ms", 0) or 0
            if avg > 0:
                label += f"\\n⏱ {avg:.1f}ms"
                # The percentage annotates the timing line, so it is only
                # shown together with it.
                impact = node.metrics.get("impact_pct", 0) or 0
                if impact > 0:
                    label += f" ({impact:.0f}%)"

        return label.replace('"', "#quot;")

    @staticmethod
    def _classify(node) -> str | None:
        """Return the subgraph title for ``node``, or ``None`` if ungrouped.

        Rules are checked in a fixed order and the first match wins.
        """
        name = node.name
        module = node.module

        # "Data Source" (with a space) is the prefix used by
        # introspect_sources(); "DataSource" is used by the v2 sources.
        if "DataSource" in name or "Data Source" in name or "SourceRegistry" in name:
            return "Sources"
        if "fetch" in name.lower():
            return "Fetch"
        if "make_block" in name or "strip_tags" in name or "translate" in name:
            return "Prepare"
        if (
            "StreamController" in name
            or "render_ticker" in name
            or "render_message" in name
            # CameraLarge belongs to the Viz group below; without this
            # exclusion the plain "Camera" substring test would claim it.
            or ("Camera" in name and "CameraLarge" not in name)
        ):
            return "Scroll"
        if "Effect" in name or "effect" in module:
            return "Effects"
        if "Display:" in name:
            return "Display"
        if "Ntfy" in name or "Mic" in name:
            return "Async"
        if "Animation" in name or "Preset" in name:
            return "Animation"
        if "pipeline_viz" in module or "CameraLarge" in name:
            return "Viz"
        return None

    # ------------------------------------------------------------------
    # Diagram generators
    # ------------------------------------------------------------------

    def generate_mermaid_flowchart(self) -> str:
        """Generate a Mermaid flowchart of the pipeline.

        Every registered node becomes a flowchart node, placed in a subgraph
        when a classification rule matches and listed at top level otherwise.
        Each entry of a node's ``inputs`` becomes an edge ``input --> node``.

        Returns:
            The diagram as a Markdown-fenced ``mermaid`` code block.
        """
        lines = ["```mermaid", "flowchart TD"]

        grouped: dict[str, list[str]] = {title: [] for title in self._GROUP_ORDER}
        ungrouped: list[str] = []

        for node in self.nodes:
            entry = f' {self._mermaid_id(node.name)}["{self._node_label(node)}"]'
            group = self._classify(node)
            if group is None:
                ungrouped.append(entry)
            else:
                grouped[group].append(entry)

        # Empty subgraphs are skipped entirely.
        for title, entries in grouped.items():
            if entries:
                lines.append(f" subgraph {title}")
                lines.extend(entries)
                lines.append(" end")

        lines.extend(ungrouped)
        lines.append("")

        for node in self.nodes:
            target_id = self._mermaid_id(node.name)
            for source in node.inputs or ():
                lines.append(f" {self._mermaid_id(source)} --> {target_id}")

        lines.append("```")
        return "\n".join(lines)

    def generate_mermaid_sequence(self) -> str:
        """Generate a Mermaid sequence diagram of message flow.

        The diagram is fixed text and does not depend on registered nodes.
        """
        lines = [
            "```mermaid",
            "sequenceDiagram",
            " participant Sources",
            " participant Fetch",
            " participant Scroll",
            " participant Effects",
            " participant Display",
            " Sources->>Fetch: headlines",
            " Fetch->>Scroll: content blocks",
            " Scroll->>Effects: buffer",
            " Effects->>Effects: process chain",
            " Effects->>Display: rendered buffer",
            "```",
        ]
        return "\n".join(lines)

    def generate_mermaid_state(self) -> str:
        """Generate a Mermaid state diagram of camera modes.

        The diagram is fixed text and does not depend on registered nodes.
        """
        lines = [
            "```mermaid",
            "stateDiagram-v2",
            # Mode cycle.
            " [*] --> Vertical",
            " Vertical --> Horizontal: set_mode()",
            " Horizontal --> Omni: set_mode()",
            " Omni --> Floating: set_mode()",
            " Floating --> Trace: set_mode()",
            " Trace --> Vertical: set_mode()",
            # Per-mode inner behaviour.
            " state Vertical {",
            " [*] --> ScrollUp",
            " ScrollUp --> ScrollUp: +y each frame",
            " }",
            " state Horizontal {",
            " [*] --> ScrollLeft",
            " ScrollLeft --> ScrollLeft: +x each frame",
            " }",
            " state Omni {",
            " [*] --> Diagonal",
            " Diagonal --> Diagonal: +x, +y",
            " }",
            " state Floating {",
            " [*] --> Bobbing",
            " Bobbing --> Bobbing: sin(time)",
            " }",
            " state Trace {",
            " [*] --> FollowPath",
            " FollowPath --> FollowPath: node by node",
            " }",
            "```",
        ]
        return "\n".join(lines)

    def generate_full_diagram(self) -> str:
        """Generate full pipeline documentation.

        Returns:
            A Markdown document with the flowchart, sequence diagram and
            camera state diagram under their own headings.
        """
        sections = [
            "# Render Pipeline",
            "",
            "## Data Flow",
            "",
            self.generate_mermaid_flowchart(),
            "",
            "## Message Sequence",
            "",
            self.generate_mermaid_sequence(),
            "",
            "## Camera States",
            "",
            self.generate_mermaid_state(),
        ]
        return "\n".join(sections)

    # ------------------------------------------------------------------
    # Introspection passes
    # ------------------------------------------------------------------

    def introspect_sources(self) -> None:
        """Register one node per public dict attribute of ``engine.sources``.

        Names starting with an underscore are skipped: module namespaces
        carry dict-valued internals such as ``__builtins__`` that would
        otherwise be reported as feed collections.
        """
        from engine import sources

        for name in dir(sources):
            if name.startswith("_"):
                continue
            obj = getattr(sources, name)
            if isinstance(obj, dict):
                self.add_node(
                    PipelineNode(
                        name=f"Data Source: {name}",
                        module="engine.sources",
                        description=f"{len(obj)} feeds configured",
                    )
                )

    def introspect_sources_v2(self) -> None:
        """Introspect data sources v2 (new abstraction).

        Registers the ``SourceRegistry`` node and a fixed list of data
        source classes.
        """
        from engine.sources_v2 import SourceRegistry, init_default_sources

        init_default_sources()
        # Kept from the original: the instance is created and discarded.
        # NOTE(review): confirm whether this construction is still needed.
        SourceRegistry()

        self.add_node(
            PipelineNode(
                name="SourceRegistry",
                module="engine.sources_v2",
                class_name="SourceRegistry",
                description="Source discovery and management",
            )
        )

        known_sources = [
            ("HeadlinesDataSource", "RSS feed headlines"),
            ("PoetryDataSource", "Poetry DB"),
            ("PipelineDataSource", "Pipeline viz (dynamic)"),
        ]
        for name, desc in known_sources:
            self.add_node(
                PipelineNode(
                    name=f"DataSource: {name}",
                    module="engine.sources_v2",
                    class_name=name,
                    description=f"{desc}",
                )
            )

    def introspect_prepare(self) -> None:
        """Introspect prepare layer (transformation)."""
        self.add_node(
            PipelineNode(
                name="make_block",
                module="engine.render",
                func_name="make_block",
                description="Transform headline into display block",
                inputs=["title", "source", "timestamp", "width"],
                outputs=["block"],
            )
        )
        self.add_node(
            PipelineNode(
                name="strip_tags",
                module="engine.filter",
                func_name="strip_tags",
                description="Remove HTML tags from content",
                inputs=["html"],
                outputs=["plain_text"],
            )
        )
        self.add_node(
            PipelineNode(
                name="translate_headline",
                module="engine.translate",
                func_name="translate_headline",
                description="Translate headline to target language",
                inputs=["title", "target_lang"],
                outputs=["translated_title"],
            )
        )

    def introspect_fetch(self) -> None:
        """Introspect fetch layer."""
        self.add_node(
            PipelineNode(
                name="fetch_all",
                module="engine.fetch",
                func_name="fetch_all",
                description="Fetch RSS feeds",
                outputs=["items"],
            )
        )
        self.add_node(
            PipelineNode(
                name="fetch_poetry",
                module="engine.fetch",
                func_name="fetch_poetry",
                description="Fetch Poetry DB",
                outputs=["items"],
            )
        )

    def introspect_scroll(self) -> None:
        """Introspect scroll engine."""
        self.add_node(
            PipelineNode(
                name="StreamController",
                module="engine.controller",
                class_name="StreamController",
                description="Main render loop orchestrator",
                inputs=["items", "ntfy_poller", "mic_monitor", "display"],
                outputs=["buffer"],
            )
        )
        self.add_node(
            PipelineNode(
                name="render_ticker_zone",
                module="engine.layers",
                func_name="render_ticker_zone",
                description="Render scrolling ticker content",
                inputs=["active", "camera"],
                outputs=["buffer"],
            )
        )
        self.add_node(
            PipelineNode(
                name="render_message_overlay",
                module="engine.layers",
                func_name="render_message_overlay",
                description="Render ntfy message overlay",
                inputs=["msg", "width", "height"],
                outputs=["overlay", "cache"],
            )
        )

    def introspect_render(self) -> None:
        """Introspect render layer."""
        self.add_node(
            PipelineNode(
                name="big_wrap",
                module="engine.render",
                func_name="big_wrap",
                description="Word-wrap text to width",
                inputs=["text", "width"],
                outputs=["lines"],
            )
        )
        self.add_node(
            PipelineNode(
                name="lr_gradient",
                module="engine.render",
                func_name="lr_gradient",
                description="Apply left-right gradient to lines",
                inputs=["lines", "position"],
                outputs=["styled_lines"],
            )
        )

    def introspect_async_sources(self) -> None:
        """Introspect async data sources (ntfy, mic)."""
        self.add_node(
            PipelineNode(
                name="NtfyPoller",
                module="engine.ntfy",
                class_name="NtfyPoller",
                description="Poll ntfy for messages (async)",
                inputs=["topic"],
                outputs=["message"],
            )
        )
        self.add_node(
            PipelineNode(
                name="MicMonitor",
                module="engine.mic",
                class_name="MicMonitor",
                description="Monitor microphone input (async)",
                outputs=["audio_level"],
            )
        )

    def introspect_eventbus(self) -> None:
        """Introspect event bus for decoupled communication."""
        self.add_node(
            PipelineNode(
                name="EventBus",
                module="engine.eventbus",
                class_name="EventBus",
                description="Thread-safe event publishing",
                inputs=["event"],
                outputs=["subscribers"],
            )
        )

    def introspect_animation(self) -> None:
        """Introspect animation system."""
        self.add_node(
            PipelineNode(
                name="AnimationController",
                module="engine.animation",
                class_name="AnimationController",
                description="Time-based parameter animation",
                inputs=["dt"],
                outputs=["params"],
            )
        )
        self.add_node(
            PipelineNode(
                name="Preset",
                module="engine.animation",
                class_name="Preset",
                description="Package of initial params + animation",
            )
        )

    def introspect_pipeline_viz(self) -> None:
        """Introspect pipeline visualization."""
        self.add_node(
            PipelineNode(
                name="generate_large_network_viewport",
                module="engine.pipeline_viz",
                func_name="generate_large_network_viewport",
                description="Large animated network visualization",
                inputs=["viewport_w", "viewport_h", "frame"],
                outputs=["buffer"],
            )
        )
        self.add_node(
            PipelineNode(
                name="CameraLarge",
                module="engine.pipeline_viz",
                class_name="CameraLarge",
                description="Large grid camera (trace mode)",
            )
        )

    def introspect_camera(self) -> None:
        """Introspect camera system."""
        self.add_node(
            PipelineNode(
                name="Camera",
                module="engine.camera",
                class_name="Camera",
                description="Viewport position controller",
                inputs=["dt"],
                outputs=["x", "y"],
            )
        )

    def introspect_effects(self) -> None:
        """Introspect effect system."""
        self.add_node(
            PipelineNode(
                name="EffectChain",
                module="engine.effects",
                class_name="EffectChain",
                description="Process effects in sequence",
                inputs=["buffer", "context"],
                outputs=["buffer"],
            )
        )
        self.add_node(
            PipelineNode(
                name="EffectRegistry",
                module="engine.effects",
                class_name="EffectRegistry",
                description="Manage effect plugins",
            )
        )

    def introspect_display(self) -> None:
        """Register one node per backend listed by ``DisplayRegistry``."""
        from engine.display import DisplayRegistry

        DisplayRegistry.initialize()
        backends = DisplayRegistry.list_backends()

        for backend in backends:
            self.add_node(
                PipelineNode(
                    name=f"Display: {backend}",
                    module="engine.display.backends",
                    # NOTE(review): the class name is derived by title-casing
                    # the backend key; confirm it matches the real class names.
                    class_name=f"{backend.title()}Display",
                    description=f"Render to {backend}",
                    inputs=["buffer"],
                )
            )

    @staticmethod
    def _stage_metrics(stage_name: str, stages: dict, total_avg) -> dict | None:
        """Return label metrics for the first recorded stage matching ``stage_name``.

        Matching is a case-insensitive substring test against the recorded
        stage names; ``None`` is returned when nothing matches.
        """
        needle = stage_name.lower()
        for recorded_name, stats in stages.items():
            if needle not in recorded_name.lower():
                continue
            if total_avg > 0:
                impact_pct = stats.get("avg_ms", 0) / total_avg * 100
            else:
                impact_pct = 0
            return {
                "avg_ms": stats.get("avg_ms", 0),
                "min_ms": stats.get("min_ms", 0),
                "max_ms": stats.get("max_ms", 0),
                "impact_pct": impact_pct,
            }
        return None

    def introspect_new_pipeline(self, pipeline=None) -> None:
        """Introspect new unified pipeline stages with metrics.

        Args:
            pipeline: Optional Pipeline instance to collect metrics from.
                It is only consulted when it exposes ``get_metrics_summary``;
                a summary containing an ``"error"`` key is ignored.
        """
        stages_info = [
            (
                "ItemsSource",
                "engine.pipeline.adapters",
                "ItemsStage",
                "Provides pre-fetched items",
            ),
            (
                "Render",
                "engine.pipeline.adapters",
                "RenderStage",
                "Renders items to buffer",
            ),
            (
                "Effect",
                "engine.pipeline.adapters",
                "EffectPluginStage",
                "Applies effect",
            ),
            (
                "Display",
                "engine.pipeline.adapters",
                "DisplayStage",
                "Outputs to display",
            ),
        ]

        metrics = None
        if pipeline and hasattr(pipeline, "get_metrics_summary"):
            metrics = pipeline.get_metrics_summary()
            if "error" in metrics:
                metrics = None

        # Whole-pipeline average, used to express each stage as a percentage.
        total_avg = metrics.get("pipeline", {}).get("avg_ms", 0) if metrics else 0

        for stage_name, module, class_name, desc in stages_info:
            node_metrics = None
            if metrics and "stages" in metrics:
                node_metrics = self._stage_metrics(
                    stage_name, metrics["stages"], total_avg
                )

            self.add_node(
                PipelineNode(
                    name=f"Pipeline: {stage_name}",
                    module=module,
                    class_name=class_name,
                    description=desc,
                    inputs=["data"],
                    outputs=["data"],
                    metrics=node_metrics,
                )
            )

    def run(self) -> str:
        """Run full introspection.

        Runs every introspection pass except ``introspect_new_pipeline`` and
        returns the combined Markdown document. Nodes accumulate on the
        instance, so calling ``run()`` twice registers everything twice.
        """
        passes = (
            self.introspect_sources,
            self.introspect_sources_v2,
            self.introspect_fetch,
            self.introspect_prepare,
            self.introspect_scroll,
            self.introspect_render,
            self.introspect_camera,
            self.introspect_effects,
            self.introspect_display,
            self.introspect_async_sources,
            self.introspect_eventbus,
            self.introspect_animation,
            self.introspect_pipeline_viz,
        )
        for introspect in passes:
            introspect()

        return self.generate_full_diagram()
|
|
|
|
|
|
def generate_pipeline_diagram() -> str:
    """Build a fresh introspector, run every pass and return the Markdown.

    Returns:
        The document produced by ``PipelineIntrospector.run()``.
    """
    return PipelineIntrospector().run()
|
|
|
|
|
|
if __name__ == "__main__":
    # Script entry point: print the generated pipeline documentation.
    diagram = generate_pipeline_diagram()
    print(diagram)
|