From bcb4ef0cfe5597620e4cf581c572a753e0498542 Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Mon, 16 Mar 2026 03:11:24 -0700 Subject: [PATCH] feat(pipeline): add unified pipeline architecture with Stage abstraction - Add engine/pipeline/ module with Stage ABC, PipelineContext, PipelineParams - Stage provides unified interface for sources, effects, displays, cameras - Pipeline class handles DAG-based execution with dependency resolution - PipelinePreset for pre-configured pipelines (demo, poetry, pipeline, etc.) - Add PipelineParams as params layer for animation-driven config - Add StageRegistry for unified stage registration - Add sources_v2.py with DataSource.is_dynamic property - Add animation.py with Preset and AnimationController - Skip ntfy integration tests by default (require -m integration) - Skip e2e tests by default (require -m e2e) - Update pipeline.py with comprehensive introspection methods --- docs/PIPELINE.md | 101 +++++++++- engine/animation.py | 340 +++++++++++++++++++++++++++++++++ engine/app.py | 146 +++++++++++++- engine/config.py | 3 + engine/pipeline.py | 275 +++++++++++++++++++++++++- engine/pipeline/__init__.py | 107 +++++++++++ engine/pipeline/controller.py | 229 ++++++++++++++++++++++ engine/pipeline/core.py | 221 +++++++++++++++++++++ engine/pipeline/params.py | 144 ++++++++++++++ engine/pipeline/presets.py | 155 +++++++++++++++ engine/pipeline/registry.py | 127 ++++++++++++ engine/pipeline_viz.py | 293 +++++++++++++++++++++++++--- engine/sources_v2.py | 214 +++++++++++++++++++++ mise.toml | 7 + pyproject.toml | 2 + tests/conftest.py | 36 ++++ tests/test_ntfy_integration.py | 4 + 17 files changed, 2356 insertions(+), 48 deletions(-) create mode 100644 engine/animation.py create mode 100644 engine/pipeline/__init__.py create mode 100644 engine/pipeline/controller.py create mode 100644 engine/pipeline/core.py create mode 100644 engine/pipeline/params.py create mode 100644 engine/pipeline/presets.py create mode 100644 engine/pipeline/registry.py create mode 100644 engine/sources_v2.py create mode 100644 tests/conftest.py diff --git a/docs/PIPELINE.md b/docs/PIPELINE.md index 843aef1..fab35a0 100644 --- a/docs/PIPELINE.md +++ b/docs/PIPELINE.md @@ -1,12 +1,41 @@ # Mainline Pipeline +## Architecture Overview + +``` +Sources (static/dynamic) → Fetch → Prepare → Scroll → Effects → Render → Display + ↓ + NtfyPoller ← MicMonitor (async) +``` + +### Data Source Abstraction (sources_v2.py) + +- **Static sources**: Data fetched once and cached (HeadlinesDataSource, PoetryDataSource) +- **Dynamic sources**: Idempotent fetch for runtime updates (PipelineDataSource) +- **SourceRegistry**: Discovery and management of data sources + +### Camera Modes + +- **Vertical**: Scroll up (default) +- **Horizontal**: Scroll left +- **Omni**: Diagonal scroll +- **Floating**: Sinusoidal bobbing +- **Trace**: Follow network path node-by-node (for pipeline viz) + ## Content to Display Rendering Pipeline ```mermaid flowchart TD - subgraph Sources["Data Sources"] + subgraph Sources["Data Sources (v2)"] + Headlines[HeadlinesDataSource] + Poetry[PoetryDataSource] + Pipeline[PipelineDataSource] + Registry[SourceRegistry] + end + + subgraph SourcesLegacy["Data Sources (legacy)"] RSS[("RSS Feeds")] - Poetry[("Poetry Feed")] + PoetryFeed[("Poetry Feed")] Ntfy[("Ntfy Messages")] Mic[("Microphone")] end @@ -24,9 +53,10 @@ flowchart TD end subgraph Scroll["Scroll Engine"] + SC[StreamController] CAM[Camera] - NH[next_headline] RTZ[render_ticker_zone] + Msg[render_message_overlay] Grad[lr_gradient] VT[vis_trunc / vis_offset] end @@ -44,8 +74,8 @@ flowchart TD end subgraph Render["Render Layer"] + BW[big_wrap] RL[render_line] - TL[apply_ticker_layout] end subgraph Display["Display Backends"] @@ -57,33 +87,78 @@ flowchart TD ND[NullDisplay] end + subgraph Async["Async Sources"] + NTFY[NtfyPoller] + MIC[MicMonitor] + end + + subgraph Animation["Animation System"] + AC[AnimationController] + PR[Preset] + end + Sources --> Fetch RSS --> FC - Poetry --> FP + PoetryFeed --> FP FC --> Cache FP --> Cache Cache --> MB Strip --> MB Trans --> MB - MB --> NH - NH --> RTZ + MB --> SC + NTFY --> SC + SC --> RTZ CAM --> RTZ Grad --> RTZ VT --> RTZ RTZ --> EC EC --> ER ER --> EffectsPlugins - EffectsPlugins --> RL + EffectsPlugins --> BW + BW --> RL RL --> Display Ntfy --> RL Mic --> RL + MIC --> RL style Sources fill:#f9f,stroke:#333 style Fetch fill:#bbf,stroke:#333 + style Prepare fill:#bff,stroke:#333 style Scroll fill:#bfb,stroke:#333 style Effects fill:#fbf,stroke:#333 style Render fill:#ffb,stroke:#333 - style Display fill:#bff,stroke:#333 + style Display fill:#bbf,stroke:#333 + style Async fill:#fbb,stroke:#333 + style Animation fill:#bfb,stroke:#333 +``` + +## Animation & Presets + +```mermaid +flowchart LR + subgraph Preset["Preset"] + PP[PipelineParams] + AC[AnimationController] + end + + subgraph AnimationController["AnimationController"] + Clock[Clock] + Events[Events] + Triggers[Triggers] + end + + subgraph Triggers["Trigger Types"] + TIME[TIME] + FRAME[FRAME] + CYCLE[CYCLE] + COND[CONDITION] + MANUAL[MANUAL] + end + + PP --> AC + Clock --> AC + Events --> AC + Triggers --> Events ``` ## Camera Modes @@ -94,7 +169,8 @@ stateDiagram-v2 Vertical --> Horizontal: mode change Horizontal --> Omni: mode change Omni --> Floating: mode change - Floating --> Vertical: mode change + Floating --> Trace: mode change + Trace --> Vertical: mode change state Vertical { [*] --> ScrollUp @@ -115,4 +191,9 @@ stateDiagram-v2 [*] --> Bobbing Bobbing --> Bobbing: sin(time) for x,y } + + state Trace { + [*] --> FollowPath + FollowPath --> FollowPath: node by node + } ``` diff --git a/engine/animation.py b/engine/animation.py new file mode 100644 index 0000000..6b6cd7b --- /dev/null +++ b/engine/animation.py @@ -0,0 +1,340 @@ +""" +Animation system - Clock, events, triggers, durations, and animation controller. +""" + +import time +from collections.abc import Callable +from dataclasses import dataclass, field +from enum import Enum, auto +from typing import Any + + +class Clock: + """High-resolution clock for animation timing.""" + + def __init__(self): + self._start_time = time.perf_counter() + self._paused = False + self._pause_offset = 0.0 + self._pause_start = 0.0 + + def reset(self) -> None: + self._start_time = time.perf_counter() + self._paused = False + self._pause_offset = 0.0 + self._pause_start = 0.0 + + def elapsed(self) -> float: + if self._paused: + return self._pause_start - self._start_time - self._pause_offset + return time.perf_counter() - self._start_time - self._pause_offset + + def elapsed_ms(self) -> float: + return self.elapsed() * 1000 + + def elapsed_frames(self, fps: float = 60.0) -> int: + return int(self.elapsed() * fps) + + def pause(self) -> None: + if not self._paused: + self._paused = True + self._pause_start = time.perf_counter() + + def resume(self) -> None: + if self._paused: + self._pause_offset += time.perf_counter() - self._pause_start + self._paused = False + + +class TriggerType(Enum): + TIME = auto() # Trigger after elapsed time + FRAME = auto() # Trigger after N frames + CYCLE = auto() # Trigger on cycle repeat + CONDITION = auto() # Trigger when condition is met + MANUAL = auto() # Trigger manually + + +@dataclass +class Trigger: + """Event trigger configuration.""" + + type: TriggerType + value: float | int = 0 + condition: Callable[["AnimationController"], bool] | None = None + repeat: bool = False + repeat_interval: float = 0.0 + + +@dataclass +class Event: + """An event with trigger, duration, and action.""" + + name: str + trigger: Trigger + action: Callable[["AnimationController", float], None] + duration: float = 0.0 + ease: Callable[[float], float] | None = None + + def __post_init__(self): + if self.ease is None: + self.ease = linear_ease + + +def linear_ease(t: float) -> float: + return t + + +def ease_in_out(t: float) -> float: + return t * t * (3 - 2 * t) + + +def ease_out_bounce(t: float) -> float: + if t < 1 / 2.75: + return 7.5625 * t * t + elif t < 2 / 2.75: + t -= 1.5 / 2.75 + return 7.5625 * t * t + 0.75 + elif t < 2.5 / 2.75: + t -= 2.25 / 2.75 + return 7.5625 * t * t + 0.9375 + else: + t -= 2.625 / 2.75 + return 7.5625 * t * t + 0.984375 + + +class AnimationController: + """Controls animation parameters with clock and events.""" + + def __init__(self, fps: float = 60.0): + self.clock = Clock() + self.fps = fps + self.frame = 0 + self._events: list[Event] = [] + self._active_events: dict[str, float] = {} + self._params: dict[str, Any] = {} + self._cycled = 0 + + def add_event(self, event: Event) -> "AnimationController": + self._events.append(event) + return self + + def set_param(self, key: str, value: Any) -> None: + self._params[key] = value + + def get_param(self, key: str, default: Any = None) -> Any: + return self._params.get(key, default) + + def update(self) -> dict[str, Any]: + """Update animation state, return current params.""" + elapsed = self.clock.elapsed() + + for event in self._events: + triggered = False + + if event.trigger.type == TriggerType.TIME: + if self.clock.elapsed() >= event.trigger.value: + triggered = True + elif event.trigger.type == TriggerType.FRAME: + if self.frame >= event.trigger.value: + triggered = True + elif event.trigger.type == TriggerType.CYCLE: + cycle_duration = event.trigger.value + if cycle_duration > 0: + current_cycle = int(elapsed / cycle_duration) + if current_cycle > self._cycled: + self._cycled = current_cycle + triggered = True + elif event.trigger.type == TriggerType.CONDITION: + if event.trigger.condition and event.trigger.condition(self): + triggered = True + elif event.trigger.type == TriggerType.MANUAL: + pass + + if triggered: + if event.name not in self._active_events: + self._active_events[event.name] = 0.0 + + progress = 0.0 + if event.duration > 0: + self._active_events[event.name] += 1 / self.fps + progress = min( + 1.0, self._active_events[event.name] / event.duration + ) + eased_progress = event.ease(progress) + event.action(self, eased_progress) + + if progress >= 1.0: + if event.trigger.repeat: + self._active_events[event.name] = 0.0 + else: + del self._active_events[event.name] + else: + event.action(self, 1.0) + if not event.trigger.repeat: + del self._active_events[event.name] + else: + self._active_events[event.name] = 0.0 + + self.frame += 1 + return dict(self._params) + + +@dataclass +class PipelineParams: + """Snapshot of pipeline parameters for animation.""" + + effect_enabled: dict[str, bool] = field(default_factory=dict) + effect_intensity: dict[str, float] = field(default_factory=dict) + camera_mode: str = "vertical" + camera_speed: float = 1.0 + camera_x: int = 0 + camera_y: int = 0 + display_backend: str = "terminal" + scroll_speed: float = 1.0 + + +class Preset: + """Packages a starting pipeline config + Animation controller.""" + + def __init__( + self, + name: str, + description: str = "", + initial_params: PipelineParams | None = None, + animation: AnimationController | None = None, + ): + self.name = name + self.description = description + self.initial_params = initial_params or PipelineParams() + self.animation = animation or AnimationController() + + def create_controller(self) -> AnimationController: + controller = AnimationController() + for key, value in self.initial_params.__dict__.items(): + controller.set_param(key, value) + for event in self.animation._events: + controller.add_event(event) + return controller + + +def create_demo_preset() -> Preset: + """Create the demo preset with effect cycling and camera modes.""" + animation = AnimationController(fps=60) + + effects = ["noise", "fade", "glitch", "firehose"] + camera_modes = ["vertical", "horizontal", "omni", "floating", "trace"] + + def make_effect_action(eff): + def action(ctrl, t): + ctrl.set_param("current_effect", eff) + ctrl.set_param("effect_intensity", t) + + return action + + def make_camera_action(cam_mode): + def action(ctrl, t): + ctrl.set_param("camera_mode", cam_mode) + + return action + + for i, effect in enumerate(effects): + effect_duration = 5.0 + + animation.add_event( + Event( + name=f"effect_{effect}", + trigger=Trigger( + type=TriggerType.TIME, + value=i * effect_duration, + repeat=True, + repeat_interval=len(effects) * effect_duration, + ), + duration=effect_duration, + action=make_effect_action(effect), + ease=ease_in_out, + ) + ) + + for i, mode in enumerate(camera_modes): + camera_duration = 10.0 + animation.add_event( + Event( + name=f"camera_{mode}", + trigger=Trigger( + type=TriggerType.TIME, + value=i * camera_duration, + repeat=True, + repeat_interval=len(camera_modes) * camera_duration, + ), + duration=0.5, + action=make_camera_action(mode), + ) + ) + + animation.add_event( + Event( + name="pulse", + trigger=Trigger(type=TriggerType.CYCLE, value=2.0, repeat=True), + duration=1.0, + action=lambda ctrl, t: ctrl.set_param("pulse", t), + ease=ease_out_bounce, + ) + ) + + return Preset( + name="demo", + description="Demo mode with effect cycling and camera modes", + initial_params=PipelineParams( + effect_enabled={ + "noise": False, + "fade": False, + "glitch": False, + "firehose": False, + "hud": True, + }, + effect_intensity={ + "noise": 0.0, + "fade": 0.0, + "glitch": 0.0, + "firehose": 0.0, + }, + camera_mode="vertical", + camera_speed=1.0, + display_backend="pygame", + ), + animation=animation, + ) + + +def create_pipeline_preset() -> Preset: + """Create preset for pipeline visualization.""" + animation = AnimationController(fps=60) + + animation.add_event( + Event( + name="camera_trace", + trigger=Trigger(type=TriggerType.CYCLE, value=8.0, repeat=True), + duration=8.0, + action=lambda ctrl, t: ctrl.set_param("camera_mode", "trace"), + ) + ) + + animation.add_event( + Event( + name="highlight_path", + trigger=Trigger(type=TriggerType.CYCLE, value=4.0, repeat=True), + duration=4.0, + action=lambda ctrl, t: ctrl.set_param("path_progress", t), + ) + ) + + return Preset( + name="pipeline", + description="Pipeline visualization with trace camera", + initial_params=PipelineParams( + camera_mode="trace", + camera_speed=1.0, + display_backend="pygame", + ), + animation=animation, + ) diff --git a/engine/app.py b/engine/app.py index f15d547..3e06eb8 100644 --- a/engine/app.py +++ b/engine/app.py @@ -572,7 +572,7 @@ def run_pipeline_demo(): get_registry, set_monitor, ) - from engine.pipeline_viz import generate_network_pipeline + from engine.pipeline_viz import generate_large_network_viewport print(" \033[1;38;5;46mMAINLINE PIPELINE DEMO\033[0m") print(" \033[38;5;245mInitializing...\033[0m") @@ -667,7 +667,7 @@ def run_pipeline_demo(): camera.update(config.FRAME_DT) - buf = generate_network_pipeline(w, h, frame_number) + buf = generate_large_network_viewport(w, h, frame_number) ctx = EffectContext( terminal_width=w, @@ -699,6 +699,144 @@ def run_pipeline_demo(): print("\n \033[38;5;245mPipeline demo ended\033[0m") +def run_preset_mode(preset_name: str): + """Run mode using animation presets.""" + from engine import config + from engine.animation import ( + create_demo_preset, + create_pipeline_preset, + ) + from engine.camera import Camera + from engine.display import DisplayRegistry + from engine.effects import ( + EffectContext, + PerformanceMonitor, + get_effect_chain, + get_registry, + set_monitor, + ) + from engine.sources_v2 import ( + PipelineDataSource, + get_source_registry, + init_default_sources, + ) + + w, h = 80, 24 + + if preset_name == "demo": + preset = create_demo_preset() + init_default_sources() + source = get_source_registry().default() + elif preset_name == "pipeline": + preset = create_pipeline_preset() + source = PipelineDataSource(w, h) + else: + print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m") + print(" Available: demo, pipeline") + sys.exit(1) + + print(f" \033[1;38;5;46mMAINLINE PRESET: {preset.name}\033[0m") + print(f" \033[38;5;245m{preset.description}\033[0m") + print(" \033[38;5;245mInitializing...\033[0m") + + import effects_plugins + + effects_plugins.discover_plugins() + + registry = get_registry() + chain = get_effect_chain() + chain.set_order(["noise", "fade", "glitch", "firehose", "hud"]) + + monitor = PerformanceMonitor() + set_monitor(monitor) + chain._monitor = monitor + + display = DisplayRegistry.create(preset.initial_params.display_backend) + if not display: + print( + f" \033[38;5;196mFailed to create {preset.initial_params.display_backend} display\033[0m" + ) + sys.exit(1) + + display.init(w, h) + display.clear() + + camera = Camera.vertical() + + print(" \033[38;5;82mStarting preset animation...\033[0m") + print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n") + + controller = preset.create_controller() + frame_number = 0 + + try: + while True: + params = controller.update() + + effect_name = params.get("current_effect", "none") + intensity = params.get("effect_intensity", 0.0) + camera_mode = params.get("camera_mode", "vertical") + + if camera_mode == "vertical": + camera = Camera.vertical(speed=params.get("camera_speed", 1.0)) + elif camera_mode == "horizontal": + camera = Camera.horizontal(speed=params.get("camera_speed", 1.0)) + elif camera_mode == "omni": + camera = Camera.omni(speed=params.get("camera_speed", 1.0)) + elif camera_mode == "floating": + camera = Camera.floating(speed=params.get("camera_speed", 1.0)) + + camera.update(config.FRAME_DT) + + for eff in registry.list_all().values(): + if eff.name == effect_name: + eff.config.enabled = True + eff.config.intensity = intensity + elif eff.name not in ("hud",): + eff.config.enabled = False + + hud_effect = registry.get("hud") + if hud_effect: + hud_effect.config.params["display_effect"] = ( + f"{effect_name} / {camera_mode}" + ) + hud_effect.config.params["display_intensity"] = intensity + + source.viewport_width = w + source.viewport_height = h + items = source.get_items() + buffer = items[0].content.split("\n") if items else [""] * h + + ctx = EffectContext( + terminal_width=w, + terminal_height=h, + scroll_cam=camera.y, + ticker_height=h, + camera_x=camera.x, + mic_excess=0.0, + grad_offset=0.0, + frame_number=frame_number, + has_message=False, + items=[], + ) + + result = chain.process(buffer, ctx) + display.show(result) + + new_w, new_h = display.get_dimensions() + if new_w != w or new_h != h: + w, h = new_w, new_h + + frame_number += 1 + time.sleep(1 / 60) + + except KeyboardInterrupt: + pass + finally: + display.cleanup() + print("\n \033[38;5;245mPreset ended\033[0m") + + def main(): from engine import config from engine.pipeline import generate_pipeline_diagram @@ -711,6 +849,10 @@ def main(): run_pipeline_demo() return + if config.PRESET: + run_preset_mode(config.PRESET) + return + if config.DEMO: run_demo_mode() return diff --git a/engine/config.py b/engine/config.py index 6aea065..7db5787 100644 --- a/engine/config.py +++ b/engine/config.py @@ -246,6 +246,9 @@ DEMO = "--demo" in sys.argv DEMO_EFFECT_DURATION = 5.0 # seconds per effect PIPELINE_DEMO = "--pipeline-demo" in sys.argv +# ─── PRESET MODE ──────────────────────────────────────────── +PRESET = _arg_value("--preset", sys.argv) + # ─── PIPELINE DIAGRAM ──────────────────────────────────── PIPELINE_DIAGRAM = "--pipeline-diagram" in sys.argv diff --git a/engine/pipeline.py b/engine/pipeline.py index 70e0f63..593d30a 100644 --- a/engine/pipeline.py +++ b/engine/pipeline.py @@ -1,5 +1,23 @@ """ Pipeline introspection - generates self-documenting diagrams of the render pipeline. + +Pipeline Architecture: +- Sources: Data providers (RSS, Poetry, Ntfy, Mic) - static or dynamic +- Fetch: Retrieve data from sources +- Prepare: Transform raw data (make_block, strip_tags, translate) +- Scroll: Camera-based viewport rendering (ticker zone, message overlay) +- Effects: Post-processing chain (noise, fade, glitch, firehose, hud) +- Render: Final line rendering and layout +- Display: Output backends (terminal, pygame, websocket, sixel, kitty) + +Key abstractions: +- DataSource: Sources can be static (cached) or dynamic (idempotent fetch) +- Camera: Viewport controller (vertical, horizontal, omni, floating, trace) +- EffectChain: Ordered effect processing pipeline +- Display: Pluggable output backends +- SourceRegistry: Source discovery and management +- AnimationController: Time-based parameter animation +- Preset: Package of initial params + animation for demo modes """ from __future__ import annotations @@ -33,8 +51,22 @@ class PipelineIntrospector: """Generate a Mermaid flowchart of the pipeline.""" lines = ["```mermaid", "flowchart TD"] + subgraph_groups = { + "Sources": [], + "Fetch": [], + "Prepare": [], + "Scroll": [], + "Effects": [], + "Display": [], + "Async": [], + "Animation": [], + "Viz": [], + } + + other_nodes = [] + for node in self.nodes: - node_id = node.name.replace("-", "_").replace(" ", "_") + node_id = node.name.replace("-", "_").replace(" ", "_").replace(":", "_") label = node.name if node.class_name: label = f"{node.name}\\n({node.class_name})" @@ -44,15 +76,55 @@ class PipelineIntrospector: if node.description: label += f"\\n{node.description}" - lines.append(f' {node_id}["{label}"]') + node_entry = f' {node_id}["{label}"]' + + if "DataSource" in node.name or "SourceRegistry" in node.name: + subgraph_groups["Sources"].append(node_entry) + elif "fetch" in node.name.lower(): + subgraph_groups["Fetch"].append(node_entry) + elif ( + "make_block" in node.name + or "strip_tags" in node.name + or "translate" in node.name + ): + subgraph_groups["Prepare"].append(node_entry) + elif ( + "StreamController" in node.name + or "render_ticker" in node.name + or "render_message" in node.name + or "Camera" in node.name + ): + subgraph_groups["Scroll"].append(node_entry) + elif "Effect" in node.name or "effect" in node.module: + subgraph_groups["Effects"].append(node_entry) + elif "Display:" in node.name: + subgraph_groups["Display"].append(node_entry) + elif "Ntfy" in node.name or "Mic" in node.name: + subgraph_groups["Async"].append(node_entry) + elif "Animation" in node.name or "Preset" in node.name: + subgraph_groups["Animation"].append(node_entry) + elif "pipeline_viz" in node.module or "CameraLarge" in node.name: + subgraph_groups["Viz"].append(node_entry) + else: + other_nodes.append(node_entry) + + for group_name, nodes in subgraph_groups.items(): + if nodes: + lines.append(f" subgraph {group_name}") + for node in nodes: + lines.append(node) + lines.append(" end") + + for node in other_nodes: + lines.append(node) lines.append("") for node in self.nodes: - node_id = node.name.replace("-", "_").replace(" ", "_") + node_id = node.name.replace("-", "_").replace(" ", "_").replace(":", "_") if node.inputs: for inp in node.inputs: - inp_id = inp.replace("-", "_").replace(" ", "_") + inp_id = inp.replace("-", "_").replace(" ", "_").replace(":", "_") lines.append(f" {inp_id} --> {node_id}") lines.append("```") @@ -85,7 +157,8 @@ class PipelineIntrospector: lines.append(" Vertical --> Horizontal: set_mode()") lines.append(" Horizontal --> Omni: set_mode()") lines.append(" Omni --> Floating: set_mode()") - lines.append(" Floating --> Vertical: set_mode()") + lines.append(" Floating --> Trace: set_mode()") + lines.append(" Trace --> Vertical: set_mode()") lines.append(" state Vertical {") lines.append(" [*] --> ScrollUp") @@ -107,6 +180,11 @@ class PipelineIntrospector: lines.append(" Bobbing --> Bobbing: sin(time)") lines.append(" }") + lines.append(" state Trace {") + lines.append(" [*] --> FollowPath") + lines.append(" FollowPath --> FollowPath: node by node") + lines.append(" }") + lines.append("```") return "\n".join(lines) @@ -144,6 +222,71 @@ class PipelineIntrospector: ) ) + def introspect_sources_v2(self) -> None: + """Introspect data sources v2 (new abstraction).""" + from engine.sources_v2 import SourceRegistry, init_default_sources + + init_default_sources() + SourceRegistry() + + self.add_node( + PipelineNode( + name="SourceRegistry", + module="engine.sources_v2", + class_name="SourceRegistry", + description="Source discovery and management", + ) + ) + + for name, desc in [ + ("HeadlinesDataSource", "RSS feed headlines"), + ("PoetryDataSource", "Poetry DB"), + ("PipelineDataSource", "Pipeline viz (dynamic)"), + ]: + self.add_node( + PipelineNode( + name=f"DataSource: {name}", + module="engine.sources_v2", + class_name=name, + description=f"{desc}", + ) + ) + + def introspect_prepare(self) -> None: + """Introspect prepare layer (transformation).""" + self.add_node( + PipelineNode( + name="make_block", + module="engine.render", + func_name="make_block", + description="Transform headline into display block", + inputs=["title", "source", "timestamp", "width"], + outputs=["block"], + ) + ) + + self.add_node( + PipelineNode( + name="strip_tags", + module="engine.filter", + func_name="strip_tags", + description="Remove HTML tags from content", + inputs=["html"], + outputs=["plain_text"], + ) + ) + + self.add_node( + PipelineNode( + name="translate_headline", + module="engine.translate", + func_name="translate_headline", + description="Translate headline to target language", + inputs=["title", "target_lang"], + outputs=["translated_title"], + ) + ) + def introspect_fetch(self) -> None: """Introspect fetch layer.""" self.add_node( @@ -190,6 +333,121 @@ class PipelineIntrospector: ) ) + self.add_node( + PipelineNode( + name="render_message_overlay", + module="engine.layers", + func_name="render_message_overlay", + description="Render ntfy message overlay", + inputs=["msg", "width", "height"], + outputs=["overlay", "cache"], + ) + ) + + def introspect_render(self) -> None: + """Introspect render layer.""" + self.add_node( + PipelineNode( + name="big_wrap", + module="engine.render", + func_name="big_wrap", + description="Word-wrap text to width", + inputs=["text", "width"], + outputs=["lines"], + ) + ) + + self.add_node( + PipelineNode( + name="lr_gradient", + module="engine.render", + func_name="lr_gradient", + description="Apply left-right gradient to lines", + inputs=["lines", "position"], + outputs=["styled_lines"], + ) + ) + + def introspect_async_sources(self) -> None: + """Introspect async data sources (ntfy, mic).""" + self.add_node( + PipelineNode( + name="NtfyPoller", + module="engine.ntfy", + class_name="NtfyPoller", + description="Poll ntfy for messages (async)", + inputs=["topic"], + outputs=["message"], + ) + ) + + self.add_node( + PipelineNode( + name="MicMonitor", + module="engine.mic", + class_name="MicMonitor", + description="Monitor microphone input (async)", + outputs=["audio_level"], + ) + ) + + def introspect_eventbus(self) -> None: + """Introspect event bus for decoupled communication.""" + self.add_node( + PipelineNode( + name="EventBus", + module="engine.eventbus", + class_name="EventBus", + description="Thread-safe event publishing", + inputs=["event"], + outputs=["subscribers"], + ) + ) + + def introspect_animation(self) -> None: + """Introspect animation system.""" + self.add_node( + PipelineNode( + name="AnimationController", + module="engine.animation", + class_name="AnimationController", + description="Time-based parameter animation", + inputs=["dt"], + outputs=["params"], + ) + ) + + self.add_node( + PipelineNode( + name="Preset", + module="engine.animation", + class_name="Preset", + description="Package of initial params + animation", + ) + ) + + def introspect_pipeline_viz(self) -> None: + """Introspect pipeline visualization.""" + self.add_node( + PipelineNode( + name="generate_large_network_viewport", + module="engine.pipeline_viz", + func_name="generate_large_network_viewport", + description="Large animated network visualization", + inputs=["viewport_w", "viewport_h", "frame"], + outputs=["buffer"], + ) + ) + + self.add_node( + PipelineNode( + name="CameraLarge", + module="engine.pipeline_viz", + class_name="CameraLarge", + description="Large grid camera (trace mode)", + ) + ) + def introspect_camera(self) -> None: """Introspect camera system.""" self.add_node( @@ -246,11 +504,18 @@ class PipelineIntrospector: def run(self) -> str: """Run full introspection.""" self.introspect_sources() + self.introspect_sources_v2() self.introspect_fetch() + self.introspect_prepare() self.introspect_scroll() + self.introspect_render() self.introspect_camera() self.introspect_effects() self.introspect_display() + self.introspect_async_sources() + self.introspect_eventbus() + self.introspect_animation() + self.introspect_pipeline_viz() return self.generate_full_diagram() diff --git a/engine/pipeline/__init__.py b/engine/pipeline/__init__.py new file mode 100644 index 0000000..73b3f63 --- /dev/null +++ b/engine/pipeline/__init__.py @@ -0,0 +1,107 @@ +""" +Unified Pipeline Architecture. + +This module provides a clean, dependency-managed pipeline system: +- Stage: Base class for all pipeline components +- Pipeline: DAG-based execution orchestrator +- PipelineParams: Runtime configuration for animation +- PipelinePreset: Pre-configured pipeline configurations +- StageRegistry: Unified registration for all stage types + +The pipeline architecture supports: +- Sources: Data providers (headlines, poetry, pipeline viz) +- Effects: Post-processors (noise, fade, glitch, hud) +- Displays: Output backends (terminal, pygame, websocket) +- Cameras: Viewport controllers (vertical, horizontal, omni) + +Example: + from engine.pipeline import Pipeline, PipelineConfig, StageRegistry + + pipeline = Pipeline(PipelineConfig(source="headlines", display="terminal")) + pipeline.add_stage("source", StageRegistry.create("source", "headlines")) + pipeline.add_stage("display", StageRegistry.create("display", "terminal")) + pipeline.build().initialize() + + result = pipeline.execute(initial_data) +""" + +from engine.pipeline.controller import ( + Pipeline, + PipelineConfig, + PipelineRunner, + create_default_pipeline, + create_pipeline_from_params, +) +from engine.pipeline.core import ( + PipelineContext, + Stage, + StageConfig, + StageError, + StageResult, +) +from engine.pipeline.params import ( + DEFAULT_HEADLINE_PARAMS, + DEFAULT_PIPELINE_PARAMS, + DEFAULT_PYGAME_PARAMS, + PipelineParams, +) +from engine.pipeline.presets import ( + DEMO_PRESET, + FIREHOSE_PRESET, + PIPELINE_VIZ_PRESET, + POETRY_PRESET, + PRESETS, + SIXEL_PRESET, + WEBSOCKET_PRESET, + PipelinePreset, + create_preset_from_params, + get_preset, + list_presets, +) +from engine.pipeline.registry import ( + StageRegistry, + discover_stages, + register_camera, + register_display, + register_effect, + register_source, +) + +__all__ = [ + # Core + "Stage", + "StageConfig", + "StageError", + "StageResult", + "PipelineContext", + # Controller + "Pipeline", + "PipelineConfig", + "PipelineRunner", + "create_default_pipeline", + "create_pipeline_from_params", + # Params + "PipelineParams", + "DEFAULT_HEADLINE_PARAMS", + "DEFAULT_PIPELINE_PARAMS", + "DEFAULT_PYGAME_PARAMS", + # Presets + "PipelinePreset", + "PRESETS", + "DEMO_PRESET", + "POETRY_PRESET", + "PIPELINE_VIZ_PRESET", + "WEBSOCKET_PRESET", + "SIXEL_PRESET", + "FIREHOSE_PRESET", + "get_preset", + "list_presets", + "create_preset_from_params", + # Registry + "StageRegistry", + "discover_stages", + "register_source", + "register_effect", + "register_display", + "register_camera", +] diff --git a/engine/pipeline/controller.py b/engine/pipeline/controller.py new file mode 100644 index 0000000..d03aa88 --- /dev/null +++ b/engine/pipeline/controller.py @@ -0,0 +1,229 @@ +""" +Pipeline controller - DAG-based pipeline execution. + +The Pipeline class orchestrates stages in dependency order, handling +the complete render cycle from source to display. +""" + +from dataclasses import dataclass, field +from typing import Any + +from engine.pipeline.core import PipelineContext, Stage, StageError, StageResult +from engine.pipeline.params import PipelineParams +from engine.pipeline.registry import StageRegistry + + +@dataclass +class PipelineConfig: + """Configuration for a pipeline instance.""" + + source: str = "headlines" + display: str = "terminal" + camera: str = "vertical" + effects: list[str] = field(default_factory=list) + + +class Pipeline: + """Main pipeline orchestrator. + + Manages the execution of all stages in dependency order, + handling initialization, processing, and cleanup. + """ + + def __init__( + self, + config: PipelineConfig | None = None, + context: PipelineContext | None = None, + ): + self.config = config or PipelineConfig() + self.context = context or PipelineContext() + self._stages: dict[str, Stage] = {} + self._execution_order: list[str] = [] + self._initialized = False + + def add_stage(self, name: str, stage: Stage) -> "Pipeline": + """Add a stage to the pipeline.""" + self._stages[name] = stage + return self + + def remove_stage(self, name: str) -> None: + """Remove a stage from the pipeline.""" + if name in self._stages: + del self._stages[name] + + def get_stage(self, name: str) -> Stage | None: + """Get a stage by name.""" + return self._stages.get(name) + + def build(self) -> "Pipeline": + """Build execution order based on dependencies.""" + self._execution_order = self._resolve_dependencies() + self._initialized = True + return self + + def _resolve_dependencies(self) -> list[str]: + """Resolve stage execution order using topological sort.""" + ordered = [] + visited = set() + temp_mark = set() + + def visit(name: str) -> None: + if name in temp_mark: + raise StageError(name, "Circular dependency detected") + if name in visited: + return + + temp_mark.add(name) + stage = self._stages.get(name) + if stage: + for dep in stage.dependencies: + dep_stage = self._stages.get(dep) + if dep_stage: + visit(dep) + + temp_mark.remove(name) + visited.add(name) + ordered.append(name) + + for name in self._stages: + if name not in visited: + visit(name) + + return ordered + + def initialize(self) -> bool: + """Initialize all stages in execution order.""" + for name in self._execution_order: + stage = self._stages.get(name) + if stage and not stage.init(self.context) and not stage.optional: + return False + return True + + def execute(self, data: Any | None = None) -> StageResult: + """Execute the pipeline with the given input data.""" + if not self._initialized: + self.build() + + if not self._initialized: + return StageResult( + success=False, + data=None, + error="Pipeline not initialized", + ) + + current_data = data + + for name in self._execution_order: + stage = self._stages.get(name) + if not stage or not stage.is_enabled(): + continue + + try: + current_data = stage.process(current_data, self.context) + except Exception as e: + if not stage.optional: + return StageResult( + success=False, + data=current_data, + error=str(e), + stage_name=name, + ) + # Skip optional stage on error + continue + + return StageResult(success=True, data=current_data) + + def cleanup(self) -> None: + """Clean up all stages in reverse order.""" + for name in reversed(self._execution_order): + stage = self._stages.get(name) + if stage: + try: + stage.cleanup() + except Exception: + pass + self._stages.clear() + self._initialized = False + + @property + def stages(self) -> dict[str, Stage]: + """Get all stages.""" + return self._stages.copy() + + @property + def execution_order(self) -> list[str]: + """Get execution order.""" + return self._execution_order.copy() + + def get_stage_names(self) -> list[str]: + """Get list of stage names.""" + return list(self._stages.keys()) + + +class PipelineRunner: + """High-level pipeline runner with animation support.""" + + def __init__( + self, + pipeline: Pipeline, + params: PipelineParams | None = None, + ): + self.pipeline = pipeline + self.params = params or PipelineParams() + self._running = False + + def start(self) -> bool: + """Start the pipeline.""" + self._running = True + return self.pipeline.initialize() + + def step(self, input_data: Any | None = None) -> Any: + """Execute one pipeline step.""" + self.params.frame_number += 1 + self.context.params = self.params + result = self.pipeline.execute(input_data) + return result.data if result.success else None + + def stop(self) -> None: + """Stop and clean up the pipeline.""" + self._running = False + self.pipeline.cleanup() + + @property + def is_running(self) -> bool: + """Check if runner is active.""" + return self._running + + +def create_pipeline_from_params(params: PipelineParams) -> Pipeline: + """Create a pipeline from PipelineParams.""" + config = PipelineConfig( + source=params.source, + display=params.display, + camera=params.camera_mode, + effects=params.effect_order, + ) + return Pipeline(config=config) + + +def create_default_pipeline() -> Pipeline: + """Create a default pipeline with all standard components.""" + pipeline = Pipeline() + + # Add source stage + source = StageRegistry.create("source", "headlines") + if source: + pipeline.add_stage("source", source) + + # Add effect stages + for effect_name in ["noise", "fade", "glitch", "firehose", "hud"]: + effect = StageRegistry.create("effect", effect_name) + if effect: + pipeline.add_stage(f"effect_{effect_name}", effect) + + # Add display stage + display = StageRegistry.create("display", "terminal") + if display: + pipeline.add_stage("display", display) + + return pipeline.build() diff --git a/engine/pipeline/core.py b/engine/pipeline/core.py new file mode 100644 index 0000000..20eab3b --- /dev/null +++ b/engine/pipeline/core.py @@ -0,0 +1,221 @@ +""" +Pipeline core - Unified Stage abstraction and PipelineContext. + +This module provides the foundation for a clean, dependency-managed pipeline: +- Stage: Base class for all pipeline components (sources, effects, displays, cameras) +- PipelineContext: Dependency injection context for runtime data exchange +- Capability system: Explicit capability declarations with duck-typing support +""" + +from abc import ABC, abstractmethod +from collections.abc import Callable +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, Any + +if TYPE_CHECKING: + from engine.pipeline.params import PipelineParams + + +@dataclass +class StageConfig: + """Configuration for a single stage.""" + + name: str + category: str + enabled: bool = True + optional: bool = False + params: dict[str, Any] = field(default_factory=dict) + + +class Stage(ABC): + """Abstract base class for all pipeline stages. + + A Stage is a single component in the rendering pipeline. Stages can be: + - Sources: Data providers (headlines, poetry, pipeline viz) + - Effects: Post-processors (noise, fade, glitch, hud) + - Displays: Output backends (terminal, pygame, websocket) + - Cameras: Viewport controllers (vertical, horizontal, omni) + + Stages declare: + - capabilities: What they provide to other stages + - dependencies: What they need from other stages + + Duck-typing is supported: any class with the required methods can act as a Stage. + """ + + name: str + category: str # "source", "effect", "display", "camera" + optional: bool = False # If True, pipeline continues even if stage fails + + @property + def capabilities(self) -> set[str]: + """Return set of capabilities this stage provides. + + Examples: + - "source.headlines" + - "effect.noise" + - "display.output" + - "camera" + """ + return {f"{self.category}.{self.name}"} + + @property + def dependencies(self) -> set[str]: + """Return set of capability names this stage needs. + + Examples: + - {"display.output"} + - {"source.headlines"} + - {"camera"} + """ + return set() + + def init(self, ctx: "PipelineContext") -> bool: + """Initialize stage with pipeline context. + + Args: + ctx: PipelineContext for accessing services + + Returns: + True if initialization succeeded, False otherwise + """ + return True + + @abstractmethod + def process(self, data: Any, ctx: "PipelineContext") -> Any: + """Process input data and return output. + + Args: + data: Input data from previous stage (or initial data for first stage) + ctx: PipelineContext for accessing services and state + + Returns: + Processed data for next stage + """ + ... + + def cleanup(self) -> None: # noqa: B027 + """Clean up resources when pipeline shuts down.""" + pass + + def get_config(self) -> StageConfig: + """Return current configuration of this stage.""" + return StageConfig( + name=self.name, + category=self.category, + optional=self.optional, + ) + + def set_enabled(self, enabled: bool) -> None: + """Enable or disable this stage.""" + self._enabled = enabled # type: ignore[attr-defined] + + def is_enabled(self) -> bool: + """Check if stage is enabled.""" + return getattr(self, "_enabled", True) + + +@dataclass +class StageResult: + """Result of stage processing, including success/failure info.""" + + success: bool + data: Any + error: str | None = None + stage_name: str = "" + + +class PipelineContext: + """Dependency injection context passed through the pipeline. + + Provides: + - services: Named services (display, config, event_bus, etc.) + - state: Runtime state shared between stages + - params: PipelineParams for animation-driven config + + Services can be injected at construction time or lazily resolved. + """ + + def __init__( + self, + services: dict[str, Any] | None = None, + initial_state: dict[str, Any] | None = None, + ): + self.services: dict[str, Any] = services or {} + self.state: dict[str, Any] = initial_state or {} + self._params: PipelineParams | None = None + + # Lazy resolvers for common services + self._lazy_resolvers: dict[str, Callable[[], Any]] = { + "config": self._resolve_config, + "event_bus": self._resolve_event_bus, + } + + def _resolve_config(self) -> Any: + from engine.config import get_config + + return get_config() + + def _resolve_event_bus(self) -> Any: + from engine.eventbus import get_event_bus + + return get_event_bus() + + def get(self, key: str, default: Any = None) -> Any: + """Get a service or state value by key. + + First checks services, then state, then lazy resolution. + """ + if key in self.services: + return self.services[key] + if key in self.state: + return self.state[key] + if key in self._lazy_resolvers: + try: + return self._lazy_resolvers[key]() + except Exception: + return default + return default + + def set(self, key: str, value: Any) -> None: + """Set a service or state value.""" + self.services[key] = value + + def set_state(self, key: str, value: Any) -> None: + """Set a runtime state value.""" + self.state[key] = value + + def get_state(self, key: str, default: Any = None) -> Any: + """Get a runtime state value.""" + return self.state.get(key, default) + + @property + def params(self) -> "PipelineParams | None": + """Get current pipeline params (for animation).""" + return self._params + + @params.setter + def params(self, value: "PipelineParams") -> None: + """Set pipeline params (from animation controller).""" + self._params = value + + def has_capability(self, capability: str) -> bool: + """Check if a capability is available.""" + return capability in self.services or capability in self._lazy_resolvers + + +class StageError(Exception): + """Raised when a stage fails to process.""" + + def __init__(self, stage_name: str, message: str, is_optional: bool = False): + self.stage_name = stage_name + self.message = message + self.is_optional = is_optional + super().__init__(f"Stage '{stage_name}' failed: {message}") + + +def create_stage_error( + stage_name: str, error: Exception, is_optional: bool = False +) -> StageError: + """Helper to create a StageError from an exception.""" + return StageError(stage_name, str(error), is_optional) diff --git a/engine/pipeline/params.py b/engine/pipeline/params.py new file mode 100644 index 0000000..2c7468c --- /dev/null +++ b/engine/pipeline/params.py @@ -0,0 +1,144 @@ +""" +Pipeline parameters - Runtime configuration layer for animation control. + +PipelineParams is the target for AnimationController - animation events +modify these params, which the pipeline then applies to its stages. +""" + +from dataclasses import dataclass, field +from typing import Any + + +@dataclass +class PipelineParams: + """Runtime configuration for the pipeline. + + This is the canonical config object that AnimationController modifies. + Stages read from these params to adjust their behavior. + """ + + # Source config + source: str = "headlines" + source_refresh_interval: float = 60.0 + + # Display config + display: str = "terminal" + + # Camera config + camera_mode: str = "vertical" + camera_speed: float = 1.0 + camera_x: int = 0 # For horizontal scrolling + + # Effect config + effect_order: list[str] = field( + default_factory=lambda: ["noise", "fade", "glitch", "firehose", "hud"] + ) + effect_enabled: dict[str, bool] = field(default_factory=dict) + effect_intensity: dict[str, float] = field(default_factory=dict) + + # Animation-driven state (set by AnimationController) + pulse: float = 0.0 + current_effect: str | None = None + path_progress: float = 0.0 + + # Viewport + viewport_width: int = 80 + viewport_height: int = 24 + + # Firehose + firehose_enabled: bool = False + + # Runtime state + frame_number: int = 0 + fps: float = 60.0 + + def get_effect_config(self, name: str) -> tuple[bool, float]: + """Get (enabled, intensity) for an effect.""" + enabled = self.effect_enabled.get(name, True) + intensity = self.effect_intensity.get(name, 1.0) + return enabled, intensity + + def set_effect_config(self, name: str, enabled: bool, intensity: float) -> None: + """Set effect configuration.""" + self.effect_enabled[name] = enabled + self.effect_intensity[name] = intensity + + def is_effect_enabled(self, name: str) -> bool: + """Check if an effect is enabled.""" + if name not in self.effect_enabled: + return True # Default to enabled + return self.effect_enabled.get(name, True) + + def get_effect_intensity(self, name: str) -> float: + """Get effect intensity (0.0 to 1.0).""" + return self.effect_intensity.get(name, 1.0) + + def to_dict(self) -> dict[str, Any]: + """Convert to dictionary for serialization.""" + return { + "source": self.source, + "display": self.display, + "camera_mode": self.camera_mode, + "camera_speed": self.camera_speed, + "effect_order": self.effect_order, + "effect_enabled": self.effect_enabled.copy(), + "effect_intensity": self.effect_intensity.copy(), + "pulse": self.pulse, + "current_effect": self.current_effect, + "viewport_width": self.viewport_width, + "viewport_height": self.viewport_height, + "firehose_enabled": self.firehose_enabled, + } + + @classmethod + def from_dict(cls, data: dict[str, Any]) -> "PipelineParams": + """Create from dictionary.""" + params = cls() + for key, value in data.items(): + if hasattr(params, key): + setattr(params, key, value) + return params + + def copy(self) -> "PipelineParams": + """Create a copy of this params object.""" + params = PipelineParams() + params.source = self.source + params.display = self.display + params.camera_mode = self.camera_mode + params.camera_speed = self.camera_speed + params.camera_x = self.camera_x + params.effect_order = self.effect_order.copy() + params.effect_enabled = self.effect_enabled.copy() + params.effect_intensity = self.effect_intensity.copy() + params.pulse = self.pulse + params.current_effect = self.current_effect + params.path_progress = self.path_progress + params.viewport_width = self.viewport_width + params.viewport_height = self.viewport_height + params.firehose_enabled = self.firehose_enabled + params.frame_number = self.frame_number + params.fps = self.fps + return params + + +# Default params for different modes +DEFAULT_HEADLINE_PARAMS = PipelineParams( + source="headlines", + display="terminal", + camera_mode="vertical", + effect_order=["noise", "fade", "glitch", "firehose", "hud"], +) + +DEFAULT_PYGAME_PARAMS = PipelineParams( + source="headlines", + display="pygame", + camera_mode="vertical", + effect_order=["noise", "fade", "glitch", "firehose", "hud"], +) + +DEFAULT_PIPELINE_PARAMS = PipelineParams( + source="pipeline", + display="pygame", + camera_mode="trace", + effect_order=["hud"], # Just HUD for pipeline viz +) diff --git a/engine/pipeline/presets.py b/engine/pipeline/presets.py new file mode 100644 index 0000000..6e5fb32 --- /dev/null +++ b/engine/pipeline/presets.py @@ -0,0 +1,155 @@ +""" +Pipeline presets - Pre-configured pipeline configurations. + +Provides PipelinePreset as a unified preset system that wraps +the existing Preset class from animation.py for backwards compatibility. +""" + +from dataclasses import dataclass, field + +from engine.animation import Preset as AnimationPreset +from engine.pipeline.params import PipelineParams + + +@dataclass +class PipelinePreset: + """Pre-configured pipeline with stages and animation. + + A PipelinePreset packages: + - Initial params: Starting configuration + - Stages: List of stage configurations to create + - Animation: Optional animation controller + + This is the new unified preset that works with the Pipeline class. + """ + + name: str + description: str = "" + source: str = "headlines" + display: str = "terminal" + camera: str = "vertical" + effects: list[str] = field(default_factory=list) + initial_params: PipelineParams | None = None + animation_preset: AnimationPreset | None = None + + def to_params(self) -> PipelineParams: + """Convert to PipelineParams.""" + if self.initial_params: + return self.initial_params.copy() + params = PipelineParams() + params.source = self.source + params.display = self.display + params.camera_mode = self.camera + params.effect_order = self.effects.copy() + return params + + @classmethod + def from_animation_preset(cls, preset: AnimationPreset) -> "PipelinePreset": + """Create a PipelinePreset from an existing animation Preset.""" + params = preset.initial_params + return cls( + name=preset.name, + description=preset.description, + source=params.source, + display=params.display, + camera=params.camera_mode, + effects=params.effect_order.copy(), + initial_params=params, + animation_preset=preset, + ) + + def create_animation_controller(self): + """Create an AnimationController from this preset.""" + if self.animation_preset: + return self.animation_preset.create_controller() + return None + + +# Built-in presets +DEMO_PRESET = PipelinePreset( + name="demo", + description="Demo mode with effect cycling and camera modes", + source="headlines", + display="terminal", + camera="vertical", + effects=["noise", "fade", "glitch", "firehose", "hud"], +) + +POETRY_PRESET = PipelinePreset( + name="poetry", + description="Poetry feed with subtle effects", + source="poetry", + display="terminal", + camera="vertical", + effects=["fade", "hud"], +) + +PIPELINE_VIZ_PRESET = PipelinePreset( + name="pipeline", + description="Pipeline visualization mode", + source="pipeline", + display="terminal", + camera="trace", + effects=["hud"], +) + +WEBSOCKET_PRESET = PipelinePreset( + name="websocket", + description="WebSocket display mode", + source="headlines", + display="websocket", + camera="vertical", + effects=["noise", "fade", "glitch", "hud"], +) + +SIXEL_PRESET = PipelinePreset( + name="sixel", + description="Sixel graphics display mode", + source="headlines", + display="sixel", + camera="vertical", + effects=["noise", "fade", "glitch", "hud"], +) + +FIREHOSE_PRESET = PipelinePreset( + name="firehose", + description="High-speed firehose mode", + source="headlines", + display="terminal", + camera="vertical", + effects=["noise", "fade", "glitch", "firehose", "hud"], +) + + +PRESETS: dict[str, PipelinePreset] = { + "demo": DEMO_PRESET, + "poetry": POETRY_PRESET, + "pipeline": PIPELINE_VIZ_PRESET, + "websocket": WEBSOCKET_PRESET, + "sixel": SIXEL_PRESET, + "firehose": FIREHOSE_PRESET, +} + + +def get_preset(name: str) -> PipelinePreset | None: + """Get a preset by name.""" + return PRESETS.get(name) + + +def list_presets() -> list[str]: + """List all available preset names.""" + return list(PRESETS.keys()) + + +def create_preset_from_params( + params: PipelineParams, name: str = "custom" +) -> PipelinePreset: + """Create a preset from PipelineParams.""" + return PipelinePreset( + name=name, + source=params.source, + display=params.display, + camera=params.camera_mode, + effects=params.effect_order.copy(), + initial_params=params, + ) diff --git a/engine/pipeline/registry.py b/engine/pipeline/registry.py new file mode 100644 index 0000000..4e0f969 --- /dev/null +++ b/engine/pipeline/registry.py @@ -0,0 +1,127 @@ +""" +Stage registry - Unified registration for all pipeline stages. + +Provides a single registry for sources, effects, displays, and cameras. +""" + +from __future__ import annotations + +from engine.pipeline.core import Stage + + +class StageRegistry: + """Unified registry for all pipeline stage types.""" + + _categories: dict[str, dict[str, type[Stage]]] = {} + _discovered: bool = False + _instances: dict[str, Stage] = {} + + @classmethod + def register(cls, category: str, stage_class: type[Stage]) -> None: + """Register a stage class in a category. + + Args: + category: Category name (source, effect, display, camera) + stage_class: Stage subclass to register + """ + if category not in cls._categories: + cls._categories[category] = {} + + # Use class name as key + key = stage_class.__name__ + cls._categories[category][key] = stage_class + + @classmethod + def get(cls, category: str, name: str) -> type[Stage] | None: + """Get a stage class by category and name.""" + return cls._categories.get(category, {}).get(name) + + @classmethod + def list(cls, category: str) -> list[str]: + """List all stage names in a category.""" + return list(cls._categories.get(category, {}).keys()) + + @classmethod + def list_categories(cls) -> list[str]: + """List all registered categories.""" + return list(cls._categories.keys()) + + @classmethod + def create(cls, category: str, name: str, **kwargs) -> Stage | None: + """Create a stage instance by category and name.""" + stage_class = cls.get(category, name) + if stage_class: + return stage_class(**kwargs) + return None + + @classmethod + def create_instance(cls, stage: Stage | type[Stage], **kwargs) -> Stage: + """Create an instance from a stage class or return as-is.""" + if isinstance(stage, Stage): + return stage + if isinstance(stage, type) and issubclass(stage, Stage): + return stage(**kwargs) + raise TypeError(f"Expected Stage class or instance, got {type(stage)}") + + @classmethod + def register_instance(cls, name: str, stage: Stage) -> None: + """Register a stage instance by name.""" + cls._instances[name] = stage + + @classmethod + def get_instance(cls, name: str) -> Stage | None: + """Get a registered stage instance by name.""" + return cls._instances.get(name) + + +def discover_stages() -> None: + """Auto-discover and register all stage implementations.""" + if StageRegistry._discovered: + return + + # Import and register all stage implementations + try: + from engine.sources_v2 import ( + HeadlinesDataSource, + PipelineDataSource, + PoetryDataSource, + ) + + StageRegistry.register("source", HeadlinesDataSource) + StageRegistry.register("source", PoetryDataSource) + StageRegistry.register("source", PipelineDataSource) + except ImportError: + pass + + try: + from engine.effects.types import EffectPlugin # noqa: F401 + except ImportError: + pass + + try: + from engine.display import Display # noqa: F401 + except ImportError: + pass + + StageRegistry._discovered = True + + +# Convenience functions +def register_source(stage_class: type[Stage]) -> None: + """Register a source stage.""" + StageRegistry.register("source", stage_class) + + +def register_effect(stage_class: type[Stage]) -> None: + """Register an effect stage.""" + StageRegistry.register("effect", stage_class) + + +def register_display(stage_class: type[Stage]) -> None: + """Register a display stage.""" + StageRegistry.register("display", stage_class) + + +def register_camera(stage_class: type[Stage]) -> None: + """Register a camera stage.""" + StageRegistry.register("camera", stage_class) diff --git a/engine/pipeline_viz.py b/engine/pipeline_viz.py index 8ffd1db..d55c7ab 100644 --- a/engine/pipeline_viz.py +++ b/engine/pipeline_viz.py @@ -1,15 +1,212 @@ """ -Pipeline visualization - Uses beautiful-mermaid to render the pipeline as ASCII network. +Pipeline visualization - Large animated network visualization with camera modes. """ +import math + +NODE_NETWORK = { + "sources": [ + {"id": "RSS", "label": "RSS FEEDS", "x": 20, "y": 20}, + {"id": "POETRY", "label": "POETRY DB", "x": 100, "y": 20}, + {"id": "NTFY", "label": "NTFY MSG", "x": 180, "y": 20}, + {"id": "MIC", "label": "MICROPHONE", "x": 260, "y": 20}, + ], + "fetch": [ + {"id": "FETCH", "label": "FETCH LAYER", "x": 140, "y": 100}, + {"id": "CACHE", "label": "CACHE", "x": 220, "y": 100}, + ], + "scroll": [ + {"id": "STREAM", "label": "STREAM CTRL", "x": 60, "y": 180}, + {"id": "CAMERA", "label": "CAMERA", "x": 140, "y": 180}, + {"id": "RENDER", "label": "RENDER", "x": 220, "y": 180}, + ], + "effects": [ + {"id": "NOISE", "label": "NOISE", "x": 20, "y": 260}, + {"id": "FADE", "label": "FADE", "x": 80, "y": 260}, + {"id": "GLITCH", "label": "GLITCH", "x": 140, "y": 260}, + {"id": "FIRE", "label": "FIREHOSE", "x": 200, "y": 260}, + {"id": "HUD", "label": "HUD", "x": 260, "y": 260}, + ], + "display": [ + {"id": "TERM", "label": "TERMINAL", "x": 20, "y": 340}, + {"id": "WEB", "label": "WEBSOCKET", "x": 80, "y": 340}, + {"id": "PYGAME", "label": "PYGAME", "x": 140, "y": 340}, + {"id": "SIXEL", "label": "SIXEL", "x": 200, "y": 340}, + {"id": "KITTY", "label": "KITTY", "x": 260, "y": 340}, + ], +} + +ALL_NODES = [] +for group_nodes in NODE_NETWORK.values(): + ALL_NODES.extend(group_nodes) + +NETWORK_PATHS = [ + ["RSS", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "NOISE", "TERM"], + ["POETRY", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "FADE", "WEB"], + ["NTFY", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "GLITCH", "PYGAME"], + ["MIC", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "FIRE", "SIXEL"], + ["RSS", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "HUD", "KITTY"], +] + +GRID_WIDTH = 300 +GRID_HEIGHT = 400 + + +def get_node_by_id(node_id: str): + for node in ALL_NODES: + if node["id"] == node_id: + return node + return None + + +def draw_network_to_grid(frame: int = 0) -> list[list[str]]: + grid = [[" " for _ in range(GRID_WIDTH)] for _ in range(GRID_HEIGHT)] + + active_path_idx = (frame // 60) % len(NETWORK_PATHS) + active_path = NETWORK_PATHS[active_path_idx] + + for node in ALL_NODES: + x, y = node["x"], node["y"] + label = node["label"] + is_active = node["id"] in active_path + is_highlight = node["id"] == active_path[(frame // 15) % len(active_path)] + + node_w, node_h = 20, 7 + + for dy in range(node_h): + for dx in range(node_w): + gx, gy = x + dx, y + dy + if 0 <= gx < GRID_WIDTH and 0 <= gy < GRID_HEIGHT: + if dy == 0: + char = "┌" if dx == 0 else ("┐" if dx == node_w - 1 else "─") + elif dy == node_h - 1: + char = "└" if dx == 0 else ("┘" if dx == node_w - 1 else "─") + elif dy == node_h // 2: + if dx == 0 or dx == node_w - 1: + char = "│" + else: + pad = (node_w - 2 - len(label)) // 2 + if dx - 1 == pad and len(label) <= node_w - 2: + char = ( + label[dx - 1 - pad] + if dx - 1 - pad < len(label) + else " " + ) + else: + char = " " + else: + char = "│" if dx == 0 or dx == node_w - 1 else " " + + if char.strip(): + if is_highlight: + grid[gy][gx] = "\033[1;38;5;46m" + char + "\033[0m" + elif is_active: + grid[gy][gx] = "\033[1;38;5;220m" + char + "\033[0m" + else: + grid[gy][gx] = "\033[38;5;240m" + char + "\033[0m" + + for i, node_id in enumerate(active_path[:-1]): + curr = get_node_by_id(node_id) + next_id = active_path[i + 1] + next_node = get_node_by_id(next_id) + if curr and next_node: + x1, y1 = curr["x"] + 7, curr["y"] + 2 + x2, y2 = next_node["x"] + 7, next_node["y"] + 2 + + step = 1 if x2 >= x1 else -1 + for x in range(x1, x2 + step, step): + if 0 <= x < GRID_WIDTH and 0 <= y1 < GRID_HEIGHT: + grid[y1][x] = "\033[38;5;45m─\033[0m" + + step = 1 if y2 >= y1 else -1 + for y in range(y1, y2 + step, step): + if 0 <= x2 < GRID_WIDTH and 0 <= y < GRID_HEIGHT: + grid[y][x2] = "\033[38;5;45m│\033[0m" + + return grid + + +class TraceCamera: + def __init__(self): + self.x = 0 + self.y = 0 + self.target_x = 0 + self.target_y = 0 + self.current_node_idx = 0 + self.path = [] + self.frame = 0 + + def update(self, dt: float, frame: int = 0) -> None: + self.frame = frame + active_path = NETWORK_PATHS[(frame // 60) % len(NETWORK_PATHS)] + + if self.path != active_path: + self.path = active_path + self.current_node_idx = 0 + + if self.current_node_idx < len(self.path): + node_id = self.path[self.current_node_idx] + node = get_node_by_id(node_id) + if node: + self.target_x = max(0, node["x"] - 40) + self.target_y = max(0, node["y"] - 10) + + self.current_node_idx += 1 + + self.x += int((self.target_x - self.x) * 0.1) + self.y += int((self.target_y - self.y) * 0.1) + + +class CameraLarge: + def __init__(self, viewport_w: int, viewport_h: int, frame: int): + self.viewport_w = viewport_w + self.viewport_h = viewport_h + self.frame = frame + self.x = 0 + self.y = 0 + self.mode = "trace" + self.trace_camera = TraceCamera() + + def set_vertical_mode(self): + self.mode = "vertical" + + def set_horizontal_mode(self): + self.mode = "horizontal" + + def set_omni_mode(self): + self.mode = "omni" + + def set_floating_mode(self): + self.mode = "floating" + + def set_trace_mode(self): + self.mode = "trace" + + def update(self, dt: float): + self.frame += 1 + + if self.mode == "vertical": + self.y = int((self.frame * 0.5) % (GRID_HEIGHT - self.viewport_h)) + elif self.mode == "horizontal": + self.x = int((self.frame * 0.5) % (GRID_WIDTH - self.viewport_w)) + elif self.mode == "omni": + self.x = int((self.frame * 0.3) % (GRID_WIDTH - self.viewport_w)) + self.y = int((self.frame * 0.5) % (GRID_HEIGHT - self.viewport_h)) + elif self.mode == "floating": + self.x = int(50 + math.sin(self.frame * 0.02) * 30) + self.y = int(50 + math.cos(self.frame * 0.015) * 30) + elif self.mode == "trace": + self.trace_camera.update(dt, self.frame) + self.x = self.trace_camera.x + self.y = self.trace_camera.y + def generate_mermaid_graph(frame: int = 0) -> str: - """Generate Mermaid flowchart for the pipeline.""" effects = ["NOISE", "FADE", "GLITCH", "FIREHOSE"] - active_effect = effects[(frame // 10) % 4] + active_effect = effects[(frame // 30) % 4] - cam_modes = ["VERTICAL", "HORIZONTAL", "OMNI", "FLOATING"] - active_cam = cam_modes[(frame // 40) % 4] + cam_modes = ["VERTICAL", "HORIZONTAL", "OMNI", "FLOATING", "TRACE"] + active_cam = cam_modes[(frame // 100) % 5] return f"""graph LR subgraph SOURCES @@ -46,6 +243,7 @@ def generate_mermaid_graph(frame: int = 0) -> str: RSS --> Fetch Poetry --> Fetch + Ntfy --> Fetch Fetch --> Cache Cache --> Scroll Scroll --> Noise @@ -55,28 +253,9 @@ def generate_mermaid_graph(frame: int = 0) -> str: Scroll --> Hud Noise --> Term - Fade --> Term - Glitch --> Term - Fire --> Term - Hud --> Term - - Noise --> Web Fade --> Web - Glitch --> Web - Fire --> Web - Hud --> Web - - Noise --> Pygame - Fade --> Pygame Glitch --> Pygame - Fire --> Pygame - Hud --> Pygame - - Noise --> Sixel - Fade --> Sixel - Glitch --> Sixel Fire --> Sixel - Hud --> Sixel style {active_effect} fill:#90EE90 style Camera fill:#87CEEB @@ -86,12 +265,11 @@ def generate_mermaid_graph(frame: int = 0) -> str: def generate_network_pipeline( width: int = 80, height: int = 24, frame: int = 0 ) -> list[str]: - """Generate dimensional ASCII network visualization using beautiful-mermaid.""" try: from engine.beautiful_mermaid import render_mermaid_ascii mermaid_graph = generate_mermaid_graph(frame) - ascii_output = render_mermaid_ascii(mermaid_graph, padding_x=3, padding_y=2) + ascii_output = render_mermaid_ascii(mermaid_graph, padding_x=2, padding_y=1) lines = ascii_output.split("\n") @@ -110,14 +288,14 @@ def generate_network_pipeline( status_y = height - 2 if status_y < height: fps = 60 - (frame % 15) - frame_time = 16.6 + (frame % 5) * 0.1 - cam_modes = ["VERTICAL", "HORIZONTAL", "OMNI", "FLOATING"] - cam = cam_modes[(frame // 40) % 4] + + cam_modes = ["VERTICAL", "HORIZONTAL", "OMNI", "FLOATING", "TRACE"] + cam = cam_modes[(frame // 100) % 5] effects = ["NOISE", "FADE", "GLITCH", "FIREHOSE"] - eff = effects[(frame // 10) % 4] + eff = effects[(frame // 30) % 4] anim = "▓▒░ "[frame % 4] - status = f" FPS:{fps:3.0f} │ Frame:{frame_time:4.1f}ms │ {anim} {eff} │ Camera:{cam}" + status = f" FPS:{fps:3.0f} │ {anim} {eff} │ Cam:{cam}" status = status[: width - 4].ljust(width - 4) result[status_y] = "║ " + status + " ║" @@ -131,3 +309,56 @@ def generate_network_pipeline( return [ f"Error: {e}" + " " * (width - len(f"Error: {e}")) for _ in range(height) ] + + +def generate_large_network_viewport( + viewport_w: int = 80, viewport_h: int = 24, frame: int = 0 +) -> list[str]: + cam_modes = ["VERTICAL", "HORIZONTAL", "OMNI", "FLOATING", "TRACE"] + camera_mode = cam_modes[(frame // 100) % 5] + + camera = CameraLarge(viewport_w, viewport_h, frame) + + if camera_mode == "TRACE": + camera.set_trace_mode() + elif camera_mode == "VERTICAL": + camera.set_vertical_mode() + elif camera_mode == "HORIZONTAL": + camera.set_horizontal_mode() + elif camera_mode == "OMNI": + camera.set_omni_mode() + elif camera_mode == "FLOATING": + camera.set_floating_mode() + + camera.update(1 / 60) + + grid = draw_network_to_grid(frame) + + result = [] + for vy in range(viewport_h): + line = "" + for vx in range(viewport_w): + gx = camera.x + vx + gy = camera.y + vy + if 0 <= gx < GRID_WIDTH and 0 <= gy < GRID_HEIGHT: + line += grid[gy][gx] + else: + line += " " + result.append(line) + + fps = 60 - (frame % 15) + + active_path = NETWORK_PATHS[(frame // 60) % len(NETWORK_PATHS)] + active_node = active_path[(frame // 15) % len(active_path)] + + anim = "▓▒░ "[frame % 4] + status = f" FPS:{fps:3.0f} │ {anim} {camera_mode:9s} │ Node:{active_node}" + status = status[: viewport_w - 4].ljust(viewport_w - 4) + if viewport_h > 2: + result[viewport_h - 2] = "║ " + status + " ║" + + if viewport_h > 0: + result[0] = "═" * viewport_w + result[viewport_h - 1] = "═" * viewport_w + + return result diff --git a/engine/sources_v2.py b/engine/sources_v2.py new file mode 100644 index 0000000..9a3aa67 --- /dev/null +++ b/engine/sources_v2.py @@ -0,0 +1,214 @@ +""" +Data source abstraction - Treat data sources as first-class citizens in the pipeline. + +Each data source implements a common interface: +- name: Display name for the source +- fetch(): Fetch fresh data +- stream(): Stream data continuously (optional) +- get_items(): Get current items +""" + +from abc import ABC, abstractmethod +from dataclasses import dataclass +from typing import Any + + +@dataclass +class SourceItem: + """A single item from a data source.""" + + content: str + source: str + timestamp: str + metadata: dict[str, Any] | None = None + + +class DataSource(ABC): + """Abstract base class for data sources. + + Static sources: Data fetched once and cached. Safe to call fetch() multiple times. + Dynamic sources: Data changes over time. fetch() should be idempotent. + """ + + @property + @abstractmethod + def name(self) -> str: + """Display name for this source.""" + ... + + @property + def is_dynamic(self) -> bool: + """Whether this source updates dynamically while the app runs. Default False.""" + return False + + @abstractmethod + def fetch(self) -> list[SourceItem]: + """Fetch fresh data from the source. Must be idempotent.""" + ... + + def get_items(self) -> list[SourceItem]: + """Get current items. Default implementation returns cached fetch results.""" + if not hasattr(self, "_items") or self._items is None: + self._items = self.fetch() + return self._items + + def refresh(self) -> list[SourceItem]: + """Force refresh - clear cache and fetch fresh data.""" + self._items = self.fetch() + return self._items + + def stream(self): + """Optional: Yield items continuously. Override for streaming sources.""" + raise NotImplementedError + + def __post_init__(self): + self._items: list[SourceItem] | None = None + + +class HeadlinesDataSource(DataSource): + """Data source for RSS feed headlines.""" + + @property + def name(self) -> str: + return "headlines" + + def fetch(self) -> list[SourceItem]: + from engine.fetch import fetch_all + + items, _, _ = fetch_all() + return [SourceItem(content=t, source=s, timestamp=ts) for t, s, ts in items] + + +class PoetryDataSource(DataSource): + """Data source for Poetry DB.""" + + @property + def name(self) -> str: + return "poetry" + + def fetch(self) -> list[SourceItem]: + from engine.fetch import fetch_poetry + + items, _, _ = fetch_poetry() + return [SourceItem(content=t, source=s, timestamp=ts) for t, s, ts in items] + + +class PipelineDataSource(DataSource): + """Data source for pipeline visualization (demo mode). Dynamic - updates every frame.""" + + def __init__(self, viewport_width: int = 80, viewport_height: int = 24): + self.viewport_width = viewport_width + self.viewport_height = viewport_height + self.frame = 0 + + @property + def name(self) -> str: + return "pipeline" + + @property + def is_dynamic(self) -> bool: + return True + + def fetch(self) -> list[SourceItem]: + from engine.pipeline_viz import generate_large_network_viewport + + buffer = generate_large_network_viewport( + self.viewport_width, self.viewport_height, self.frame + ) + self.frame += 1 + content = "\n".join(buffer) + return [ + SourceItem(content=content, source="pipeline", timestamp=f"f{self.frame}") + ] + + def get_items(self) -> list[SourceItem]: + return self.fetch() + + +class CachedDataSource(DataSource): + """Data source that wraps another source with caching.""" + + def __init__(self, source: DataSource, max_items: int = 100): + self.source = source + self.max_items = max_items + + @property + def name(self) -> str: + return f"cached:{self.source.name}" + + def fetch(self) -> list[SourceItem]: + items = self.source.fetch() + return items[: self.max_items] + + def get_items(self) -> list[SourceItem]: + if not hasattr(self, "_items") or self._items is None: + self._items = self.fetch() + return self._items + + +class CompositeDataSource(DataSource): + """Data source that combines multiple sources.""" + + def __init__(self, sources: list[DataSource]): + self.sources = sources + + @property + def name(self) -> str: + return "composite" + + def fetch(self) -> list[SourceItem]: + items = [] + for source in self.sources: + items.extend(source.fetch()) + return items + + +class SourceRegistry: + """Registry for data sources.""" + + def __init__(self): + self._sources: dict[str, DataSource] = {} + self._default: str | None = None + + def register(self, source: DataSource, default: bool = False) -> None: + self._sources[source.name] = source + if default or self._default is None: + self._default = source.name + + def get(self, name: str) -> DataSource | None: + return self._sources.get(name) + + def list_all(self) -> dict[str, DataSource]: + return dict(self._sources) + + def default(self) -> DataSource | None: + if self._default: + return self._sources.get(self._default) + return None + + def create_headlines(self) -> HeadlinesDataSource: + return HeadlinesDataSource() + + def create_poetry(self) -> PoetryDataSource: + return PoetryDataSource() + + def create_pipeline(self, width: int = 80, height: int = 24) -> PipelineDataSource: + return PipelineDataSource(width, height) + + +_global_registry: SourceRegistry | None = None + + +def get_source_registry() -> SourceRegistry: + global _global_registry + if _global_registry is None: + _global_registry = SourceRegistry() + return _global_registry + + +def init_default_sources() -> SourceRegistry: + """Initialize the default source registry with standard sources.""" + registry = get_source_registry() + registry.register(HeadlinesDataSource(), default=True) + registry.register(PoetryDataSource()) + return registry diff --git a/mise.toml b/mise.toml index 449b13b..8c05609 100644 --- a/mise.toml +++ b/mise.toml @@ -42,6 +42,13 @@ run-demo = { run = "uv run mainline.py --demo --display pygame", depends = ["syn run-pipeline = "uv run mainline.py --pipeline-diagram" run-pipeline-demo = { run = "uv run mainline.py --pipeline-demo --display pygame", depends = ["sync-all"] } +# ===================== +# Presets (Animation-controlled modes) +# ===================== + +run-preset-demo = { run = "uv run mainline.py --preset demo --display pygame", depends = ["sync-all"] } +run-preset-pipeline = { run = "uv run mainline.py --preset pipeline --display pygame", depends = ["sync-all"] } + # ===================== # Command & Control # ===================== diff --git a/pyproject.toml b/pyproject.toml index f3f5f6f..e922913 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -76,6 +76,8 @@ addopts = [ markers = [ "benchmark: marks tests as performance benchmarks (may be slow)", "e2e: marks tests as end-to-end tests (require network/display)", + "integration: marks tests as integration tests (require external services)", + "ntfy: marks tests that require ntfy service", ] filterwarnings = [ "ignore::DeprecationWarning", diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..b664a7e --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,36 @@ +""" +Pytest configuration for mainline. +""" + +import pytest + + +def pytest_configure(config): + """Configure pytest to skip integration tests by default.""" + config.addinivalue_line( + "markers", + "integration: marks tests as integration tests (require external services)", + ) + config.addinivalue_line("markers", "ntfy: marks tests that require ntfy service") + + +def pytest_collection_modifyitems(config, items): + """Skip integration/e2e tests unless explicitly requested with -m.""" + # Get the current marker expression + marker_expr = config.getoption("-m", default="") + + # If explicitly running integration or e2e, don't skip them + if marker_expr in ("integration", "e2e", "integration or e2e"): + return + + # Skip integration tests + skip_integration = pytest.mark.skip(reason="need -m integration to run") + for item in items: + if "integration" in item.keywords: + item.add_marker(skip_integration) + + # Skip e2e tests by default (they require browser/display) + skip_e2e = pytest.mark.skip(reason="need -m e2e to run") + for item in items: + if "e2e" in item.keywords and "integration" not in item.keywords: + item.add_marker(skip_e2e) diff --git a/tests/test_ntfy_integration.py b/tests/test_ntfy_integration.py index d21acab..a6aaa5d 100644 --- a/tests/test_ntfy_integration.py +++ b/tests/test_ntfy_integration.py @@ -6,7 +6,11 @@ import json import time import urllib.request +import pytest + +@pytest.mark.integration +@pytest.mark.ntfy class TestNtfyTopics: def test_cc_cmd_topic_exists_and_writable(self): """Verify C&C CMD topic exists and accepts messages."""