""" Pipeline introspection - generates self-documenting diagrams of the render pipeline. Pipeline Architecture: - Sources: Data providers (RSS, Poetry, Ntfy, Mic) - static or dynamic - Fetch: Retrieve data from sources - Prepare: Transform raw data (make_block, strip_tags, translate) - Scroll: Camera-based viewport rendering (ticker zone, message overlay) - Effects: Post-processing chain (noise, fade, glitch, firehose, hud) - Render: Final line rendering and layout - Display: Output backends (terminal, pygame, websocket, sixel, kitty) Key abstractions: - DataSource: Sources can be static (cached) or dynamic (idempotent fetch) - Camera: Viewport controller (vertical, horizontal, omni, floating, trace) - EffectChain: Ordered effect processing pipeline - Display: Pluggable output backends - SourceRegistry: Source discovery and management - AnimationController: Time-based parameter animation - Preset: Package of initial params + animation for demo modes """ from __future__ import annotations from dataclasses import dataclass @dataclass class PipelineNode: """Represents a node in the pipeline.""" name: str module: str class_name: str | None = None func_name: str | None = None description: str = "" inputs: list[str] | None = None outputs: list[str] | None = None metrics: dict | None = None # Performance metrics (avg_ms, min_ms, max_ms) class PipelineIntrospector: """Introspects the render pipeline and generates documentation.""" def __init__(self): self.nodes: list[PipelineNode] = [] def add_node(self, node: PipelineNode) -> None: self.nodes.append(node) def generate_mermaid_flowchart(self) -> str: """Generate a Mermaid flowchart of the pipeline.""" lines = ["```mermaid", "flowchart TD"] subgraph_groups = { "Sources": [], "Fetch": [], "Prepare": [], "Scroll": [], "Effects": [], "Display": [], "Async": [], "Animation": [], "Viz": [], } other_nodes = [] for node in self.nodes: node_id = node.name.replace("-", "_").replace(" ", "_").replace(":", "_") label = node.name if node.class_name: label = f"{node.name}\\n({node.class_name})" elif node.func_name: label = f"{node.name}\\n({node.func_name})" if node.description: label += f"\\n{node.description}" if node.metrics: avg = node.metrics.get("avg_ms", 0) if avg > 0: label += f"\\n⏱ {avg:.1f}ms" impact = node.metrics.get("impact_pct", 0) if impact > 0: label += f" ({impact:.0f}%)" node_entry = f' {node_id}["{label}"]' if "DataSource" in node.name or "SourceRegistry" in node.name: subgraph_groups["Sources"].append(node_entry) elif "fetch" in node.name.lower(): subgraph_groups["Fetch"].append(node_entry) elif ( "make_block" in node.name or "strip_tags" in node.name or "translate" in node.name ): subgraph_groups["Prepare"].append(node_entry) elif ( "StreamController" in node.name or "render_ticker" in node.name or "render_message" in node.name or "Camera" in node.name ): subgraph_groups["Scroll"].append(node_entry) elif "Effect" in node.name or "effect" in node.module: subgraph_groups["Effects"].append(node_entry) elif "Display:" in node.name: subgraph_groups["Display"].append(node_entry) elif "Ntfy" in node.name or "Mic" in node.name: subgraph_groups["Async"].append(node_entry) elif "Animation" in node.name or "Preset" in node.name: subgraph_groups["Animation"].append(node_entry) elif "pipeline_viz" in node.module or "CameraLarge" in node.name: subgraph_groups["Viz"].append(node_entry) else: other_nodes.append(node_entry) for group_name, nodes in subgraph_groups.items(): if nodes: lines.append(f" subgraph {group_name}") for node in nodes: lines.append(node) lines.append(" end") for node in other_nodes: lines.append(node) lines.append("") for node in self.nodes: node_id = node.name.replace("-", "_").replace(" ", "_").replace(":", "_") if node.inputs: for inp in node.inputs: inp_id = inp.replace("-", "_").replace(" ", "_").replace(":", "_") lines.append(f" {inp_id} --> {node_id}") lines.append("```") return "\n".join(lines) def generate_mermaid_sequence(self) -> str: """Generate a Mermaid sequence diagram of message flow.""" lines = ["```mermaid", "sequenceDiagram"] lines.append(" participant Sources") lines.append(" participant Fetch") lines.append(" participant Scroll") lines.append(" participant Effects") lines.append(" participant Display") lines.append(" Sources->>Fetch: headlines") lines.append(" Fetch->>Scroll: content blocks") lines.append(" Scroll->>Effects: buffer") lines.append(" Effects->>Effects: process chain") lines.append(" Effects->>Display: rendered buffer") lines.append("```") return "\n".join(lines) def generate_mermaid_state(self) -> str: """Generate a Mermaid state diagram of camera modes.""" lines = ["```mermaid", "stateDiagram-v2"] lines.append(" [*] --> Vertical") lines.append(" Vertical --> Horizontal: set_mode()") lines.append(" Horizontal --> Omni: set_mode()") lines.append(" Omni --> Floating: set_mode()") lines.append(" Floating --> Trace: set_mode()") lines.append(" Trace --> Vertical: set_mode()") lines.append(" state Vertical {") lines.append(" [*] --> ScrollUp") lines.append(" ScrollUp --> ScrollUp: +y each frame") lines.append(" }") lines.append(" state Horizontal {") lines.append(" [*] --> ScrollLeft") lines.append(" ScrollLeft --> ScrollLeft: +x each frame") lines.append(" }") lines.append(" state Omni {") lines.append(" [*] --> Diagonal") lines.append(" Diagonal --> Diagonal: +x, +y") lines.append(" }") lines.append(" state Floating {") lines.append(" [*] --> Bobbing") lines.append(" Bobbing --> Bobbing: sin(time)") lines.append(" }") lines.append(" state Trace {") lines.append(" [*] --> FollowPath") lines.append(" FollowPath --> FollowPath: node by node") lines.append(" }") lines.append("```") return "\n".join(lines) def generate_full_diagram(self) -> str: """Generate full pipeline documentation.""" lines = [ "# Render Pipeline", "", "## Data Flow", "", self.generate_mermaid_flowchart(), "", "## Message Sequence", "", self.generate_mermaid_sequence(), "", "## Camera States", "", self.generate_mermaid_state(), ] return "\n".join(lines) def introspect_sources(self) -> None: """Introspect data sources.""" from engine import sources for name in dir(sources): obj = getattr(sources, name) if isinstance(obj, dict): self.add_node( PipelineNode( name=f"Data Source: {name}", module="engine.sources", description=f"{len(obj)} feeds configured", ) ) def introspect_sources_v2(self) -> None: """Introspect data sources v2 (new abstraction).""" from engine.sources_v2 import SourceRegistry, init_default_sources init_default_sources() SourceRegistry() self.add_node( PipelineNode( name="SourceRegistry", module="engine.sources_v2", class_name="SourceRegistry", description="Source discovery and management", ) ) for name, desc in [ ("HeadlinesDataSource", "RSS feed headlines"), ("PoetryDataSource", "Poetry DB"), ("PipelineDataSource", "Pipeline viz (dynamic)"), ]: self.add_node( PipelineNode( name=f"DataSource: {name}", module="engine.sources_v2", class_name=name, description=f"{desc}", ) ) def introspect_prepare(self) -> None: """Introspect prepare layer (transformation).""" self.add_node( PipelineNode( name="make_block", module="engine.render", func_name="make_block", description="Transform headline into display block", inputs=["title", "source", "timestamp", "width"], outputs=["block"], ) ) self.add_node( PipelineNode( name="strip_tags", module="engine.filter", func_name="strip_tags", description="Remove HTML tags from content", inputs=["html"], outputs=["plain_text"], ) ) self.add_node( PipelineNode( name="translate_headline", module="engine.translate", func_name="translate_headline", description="Translate headline to target language", inputs=["title", "target_lang"], outputs=["translated_title"], ) ) def introspect_fetch(self) -> None: """Introspect fetch layer.""" self.add_node( PipelineNode( name="fetch_all", module="engine.fetch", func_name="fetch_all", description="Fetch RSS feeds", outputs=["items"], ) ) self.add_node( PipelineNode( name="fetch_poetry", module="engine.fetch", func_name="fetch_poetry", description="Fetch Poetry DB", outputs=["items"], ) ) def introspect_scroll(self) -> None: """Introspect scroll engine (legacy - replaced by pipeline architecture).""" self.add_node( PipelineNode( name="render_ticker_zone", module="engine.layers", func_name="render_ticker_zone", description="Render scrolling ticker content", inputs=["active", "camera"], outputs=["buffer"], ) ) self.add_node( PipelineNode( name="render_message_overlay", module="engine.layers", func_name="render_message_overlay", description="Render ntfy message overlay", inputs=["msg", "width", "height"], outputs=["overlay", "cache"], ) ) def introspect_render(self) -> None: """Introspect render layer.""" self.add_node( PipelineNode( name="big_wrap", module="engine.render", func_name="big_wrap", description="Word-wrap text to width", inputs=["text", "width"], outputs=["lines"], ) ) self.add_node( PipelineNode( name="lr_gradient", module="engine.render", func_name="lr_gradient", description="Apply left-right gradient to lines", inputs=["lines", "position"], outputs=["styled_lines"], ) ) def introspect_async_sources(self) -> None: """Introspect async data sources (ntfy, mic).""" self.add_node( PipelineNode( name="NtfyPoller", module="engine.ntfy", class_name="NtfyPoller", description="Poll ntfy for messages (async)", inputs=["topic"], outputs=["message"], ) ) self.add_node( PipelineNode( name="MicMonitor", module="engine.mic", class_name="MicMonitor", description="Monitor microphone input (async)", outputs=["audio_level"], ) ) def introspect_eventbus(self) -> None: """Introspect event bus for decoupled communication.""" self.add_node( PipelineNode( name="EventBus", module="engine.eventbus", class_name="EventBus", description="Thread-safe event publishing", inputs=["event"], outputs=["subscribers"], ) ) def introspect_animation(self) -> None: """Introspect animation system.""" self.add_node( PipelineNode( name="AnimationController", module="engine.animation", class_name="AnimationController", description="Time-based parameter animation", inputs=["dt"], outputs=["params"], ) ) self.add_node( PipelineNode( name="Preset", module="engine.animation", class_name="Preset", description="Package of initial params + animation", ) ) def introspect_pipeline_viz(self) -> None: """Introspect pipeline visualization.""" self.add_node( PipelineNode( name="generate_large_network_viewport", module="engine.pipeline_viz", func_name="generate_large_network_viewport", description="Large animated network visualization", inputs=["viewport_w", "viewport_h", "frame"], outputs=["buffer"], ) ) self.add_node( PipelineNode( name="CameraLarge", module="engine.pipeline_viz", class_name="CameraLarge", description="Large grid camera (trace mode)", ) ) def introspect_camera(self) -> None: """Introspect camera system.""" self.add_node( PipelineNode( name="Camera", module="engine.camera", class_name="Camera", description="Viewport position controller", inputs=["dt"], outputs=["x", "y"], ) ) def introspect_effects(self) -> None: """Introspect effect system.""" self.add_node( PipelineNode( name="EffectChain", module="engine.effects", class_name="EffectChain", description="Process effects in sequence", inputs=["buffer", "context"], outputs=["buffer"], ) ) self.add_node( PipelineNode( name="EffectRegistry", module="engine.effects", class_name="EffectRegistry", description="Manage effect plugins", ) ) def introspect_display(self) -> None: """Introspect display backends.""" from engine.display import DisplayRegistry DisplayRegistry.initialize() backends = DisplayRegistry.list_backends() for backend in backends: self.add_node( PipelineNode( name=f"Display: {backend}", module="engine.display.backends", class_name=f"{backend.title()}Display", description=f"Render to {backend}", inputs=["buffer"], ) ) def introspect_new_pipeline(self, pipeline=None) -> None: """Introspect new unified pipeline stages with metrics. Args: pipeline: Optional Pipeline instance to collect metrics from """ stages_info = [ ( "ItemsSource", "engine.pipeline.adapters", "ItemsStage", "Provides pre-fetched items", ), ( "Render", "engine.pipeline.adapters", "RenderStage", "Renders items to buffer", ), ( "Effect", "engine.pipeline.adapters", "EffectPluginStage", "Applies effect", ), ( "Display", "engine.pipeline.adapters", "DisplayStage", "Outputs to display", ), ] metrics = None if pipeline and hasattr(pipeline, "get_metrics_summary"): metrics = pipeline.get_metrics_summary() if "error" in metrics: metrics = None total_avg = metrics.get("pipeline", {}).get("avg_ms", 0) if metrics else 0 for stage_name, module, class_name, desc in stages_info: node_metrics = None if metrics and "stages" in metrics: for name, stats in metrics["stages"].items(): if stage_name.lower() in name.lower(): impact_pct = ( (stats.get("avg_ms", 0) / total_avg * 100) if total_avg > 0 else 0 ) node_metrics = { "avg_ms": stats.get("avg_ms", 0), "min_ms": stats.get("min_ms", 0), "max_ms": stats.get("max_ms", 0), "impact_pct": impact_pct, } break self.add_node( PipelineNode( name=f"Pipeline: {stage_name}", module=module, class_name=class_name, description=desc, inputs=["data"], outputs=["data"], metrics=node_metrics, ) ) def run(self) -> str: """Run full introspection.""" self.introspect_sources() self.introspect_sources_v2() self.introspect_fetch() self.introspect_prepare() self.introspect_scroll() self.introspect_render() self.introspect_camera() self.introspect_effects() self.introspect_display() self.introspect_async_sources() self.introspect_eventbus() self.introspect_animation() self.introspect_pipeline_viz() return self.generate_full_diagram() def generate_pipeline_diagram() -> str: """Generate a self-documenting pipeline diagram.""" introspector = PipelineIntrospector() return introspector.run() if __name__ == "__main__": print(generate_pipeline_diagram())