Compare commits

...

4 Commits

Author SHA1 Message Date
828b8489e1 feat(pipeline): improve new pipeline architecture
- Add TransformDataSource for filtering/mapping source items
- Add MetricsDataSource for rendering live pipeline metrics as ASCII art
- Fix display stage registration in StageRegistry
- Register sources with both class name and simple name aliases
- Fix DisplayStage.init() to pass reuse parameter
- Simplify create_default_pipeline to use DataSourceStage wrapper
- Set pygame as default display
- Remove old pipeline tasks from mise.toml
- Add tests for new pipeline architecture
2026-03-16 11:30:21 -07:00
31cabe9128 feat(pipeline): add metrics collection and v2 run mode
- Add RenderStage adapter that handles rendering pipeline
- Add EffectPluginStage with proper EffectContext
- Add DisplayStage with init handling
- Add ItemsStage for pre-fetched items
- Add metrics collection to Pipeline (StageMetrics, FrameMetrics)
- Add get_metrics_summary() and reset_metrics() methods
- Add --pipeline and --pipeline-preset flags for v2 mode
- Add PipelineNode.metrics for self-documenting introspection
- Add introspect_new_pipeline() method with performance data
- Add mise tasks: run-v2, run-v2-demo, run-v2-poetry, run-v2-websocket, run-v2-firehose
2026-03-16 03:39:29 -07:00
bcb4ef0cfe feat(pipeline): add unified pipeline architecture with Stage abstraction
- Add engine/pipeline/ module with Stage ABC, PipelineContext, PipelineParams
- Stage provides unified interface for sources, effects, displays, cameras
- Pipeline class handles DAG-based execution with dependency resolution
- PipelinePreset for pre-configured pipelines (demo, poetry, pipeline, etc.)
- Add PipelineParams as params layer for animation-driven config
- Add StageRegistry for unified stage registration
- Add sources_v2.py with DataSource.is_dynamic property
- Add animation.py with Preset and AnimationController
- Skip ntfy integration tests by default (require -m integration)
- Skip e2e tests by default (require -m e2e)
- Update pipeline.py with comprehensive introspection methods
2026-03-16 03:11:24 -07:00
996ba14b1d feat(demo): use beautiful-mermaid for pipeline visualization
- Add beautiful-mermaid library (single-file ASCII renderer)
- Update pipeline_viz to generate mermaid graphs and render with beautiful-mermaid
- Creates dimensional network visualization with arrows connecting nodes
- Animates through effects and highlights active camera mode
2026-03-16 02:12:03 -07:00
20 changed files with 7637 additions and 133 deletions

View File

@@ -1,12 +1,41 @@
# Mainline Pipeline
## Architecture Overview
```
Sources (static/dynamic) → Fetch → Prepare → Scroll → Effects → Render → Display
NtfyPoller ← MicMonitor (async)
```
### Data Source Abstraction (sources_v2.py)
- **Static sources**: Data fetched once and cached (HeadlinesDataSource, PoetryDataSource)
- **Dynamic sources**: Idempotent fetch for runtime updates (PipelineDataSource)
- **SourceRegistry**: Discovery and management of data sources
### Camera Modes
- **Vertical**: Scroll up (default)
- **Horizontal**: Scroll left
- **Omni**: Diagonal scroll
- **Floating**: Sinusoidal bobbing
- **Trace**: Follow network path node-by-node (for pipeline viz)
## Content to Display Rendering Pipeline
```mermaid
flowchart TD
subgraph Sources["Data Sources"]
subgraph Sources["Data Sources (v2)"]
Headlines[HeadlinesDataSource]
Poetry[PoetryDataSource]
Pipeline[PipelineDataSource]
Registry[SourceRegistry]
end
subgraph SourcesLegacy["Data Sources (legacy)"]
RSS[("RSS Feeds")]
Poetry[("Poetry Feed")]
PoetryFeed[("Poetry Feed")]
Ntfy[("Ntfy Messages")]
Mic[("Microphone")]
end
@@ -24,9 +53,10 @@ flowchart TD
end
subgraph Scroll["Scroll Engine"]
SC[StreamController]
CAM[Camera]
NH[next_headline]
RTZ[render_ticker_zone]
Msg[render_message_overlay]
Grad[lr_gradient]
VT[vis_trunc / vis_offset]
end
@@ -44,8 +74,8 @@ flowchart TD
end
subgraph Render["Render Layer"]
BW[big_wrap]
RL[render_line]
TL[apply_ticker_layout]
end
subgraph Display["Display Backends"]
@@ -57,33 +87,78 @@ flowchart TD
ND[NullDisplay]
end
subgraph Async["Async Sources"]
NTFY[NtfyPoller]
MIC[MicMonitor]
end
subgraph Animation["Animation System"]
AC[AnimationController]
PR[Preset]
end
Sources --> Fetch
RSS --> FC
Poetry --> FP
PoetryFeed --> FP
FC --> Cache
FP --> Cache
Cache --> MB
Strip --> MB
Trans --> MB
MB --> NH
NH --> RTZ
MB --> SC
NTFY --> SC
SC --> RTZ
CAM --> RTZ
Grad --> RTZ
VT --> RTZ
RTZ --> EC
EC --> ER
ER --> EffectsPlugins
EffectsPlugins --> RL
EffectsPlugins --> BW
BW --> RL
RL --> Display
Ntfy --> RL
Mic --> RL
MIC --> RL
style Sources fill:#f9f,stroke:#333
style Fetch fill:#bbf,stroke:#333
style Prepare fill:#bff,stroke:#333
style Scroll fill:#bfb,stroke:#333
style Effects fill:#fbf,stroke:#333
style Render fill:#ffb,stroke:#333
style Display fill:#bff,stroke:#333
style Display fill:#bbf,stroke:#333
style Async fill:#fbb,stroke:#333
style Animation fill:#bfb,stroke:#333
```
## Animation & Presets
```mermaid
flowchart LR
subgraph Preset["Preset"]
PP[PipelineParams]
AC[AnimationController]
end
subgraph AnimationController["AnimationController"]
Clock[Clock]
Events[Events]
Triggers[Triggers]
end
subgraph Triggers["Trigger Types"]
TIME[TIME]
FRAME[FRAME]
CYCLE[CYCLE]
COND[CONDITION]
MANUAL[MANUAL]
end
PP --> AC
Clock --> AC
Events --> AC
Triggers --> Events
```
## Camera Modes
@@ -94,7 +169,8 @@ stateDiagram-v2
Vertical --> Horizontal: mode change
Horizontal --> Omni: mode change
Omni --> Floating: mode change
Floating --> Vertical: mode change
Floating --> Trace: mode change
Trace --> Vertical: mode change
state Vertical {
[*] --> ScrollUp
@@ -115,4 +191,9 @@ stateDiagram-v2
[*] --> Bobbing
Bobbing --> Bobbing: sin(time) for x,y
}
state Trace {
[*] --> FollowPath
FollowPath --> FollowPath: node by node
}
```

340
engine/animation.py Normal file
View File

@@ -0,0 +1,340 @@
"""
Animation system - Clock, events, triggers, durations, and animation controller.
"""
import time
from collections.abc import Callable
from dataclasses import dataclass, field
from enum import Enum, auto
from typing import Any
class Clock:
"""High-resolution clock for animation timing."""
def __init__(self):
self._start_time = time.perf_counter()
self._paused = False
self._pause_offset = 0.0
self._pause_start = 0.0
def reset(self) -> None:
self._start_time = time.perf_counter()
self._paused = False
self._pause_offset = 0.0
self._pause_start = 0.0
def elapsed(self) -> float:
if self._paused:
return self._pause_start - self._start_time - self._pause_offset
return time.perf_counter() - self._start_time - self._pause_offset
def elapsed_ms(self) -> float:
return self.elapsed() * 1000
def elapsed_frames(self, fps: float = 60.0) -> int:
return int(self.elapsed() * fps)
def pause(self) -> None:
if not self._paused:
self._paused = True
self._pause_start = time.perf_counter()
def resume(self) -> None:
if self._paused:
self._pause_offset += time.perf_counter() - self._pause_start
self._paused = False
class TriggerType(Enum):
TIME = auto() # Trigger after elapsed time
FRAME = auto() # Trigger after N frames
CYCLE = auto() # Trigger on cycle repeat
CONDITION = auto() # Trigger when condition is met
MANUAL = auto() # Trigger manually
@dataclass
class Trigger:
"""Event trigger configuration."""
type: TriggerType
value: float | int = 0
condition: Callable[["AnimationController"], bool] | None = None
repeat: bool = False
repeat_interval: float = 0.0
@dataclass
class Event:
"""An event with trigger, duration, and action."""
name: str
trigger: Trigger
action: Callable[["AnimationController", float], None]
duration: float = 0.0
ease: Callable[[float], float] | None = None
def __post_init__(self):
if self.ease is None:
self.ease = linear_ease
def linear_ease(t: float) -> float:
return t
def ease_in_out(t: float) -> float:
return t * t * (3 - 2 * t)
def ease_out_bounce(t: float) -> float:
if t < 1 / 2.75:
return 7.5625 * t * t
elif t < 2 / 2.75:
t -= 1.5 / 2.75
return 7.5625 * t * t + 0.75
elif t < 2.5 / 2.75:
t -= 2.25 / 2.75
return 7.5625 * t * t + 0.9375
else:
t -= 2.625 / 2.75
return 7.5625 * t * t + 0.984375
class AnimationController:
"""Controls animation parameters with clock and events."""
def __init__(self, fps: float = 60.0):
self.clock = Clock()
self.fps = fps
self.frame = 0
self._events: list[Event] = []
self._active_events: dict[str, float] = {}
self._params: dict[str, Any] = {}
self._cycled = 0
def add_event(self, event: Event) -> "AnimationController":
self._events.append(event)
return self
def set_param(self, key: str, value: Any) -> None:
self._params[key] = value
def get_param(self, key: str, default: Any = None) -> Any:
return self._params.get(key, default)
def update(self) -> dict[str, Any]:
"""Update animation state, return current params."""
elapsed = self.clock.elapsed()
for event in self._events:
triggered = False
if event.trigger.type == TriggerType.TIME:
if self.clock.elapsed() >= event.trigger.value:
triggered = True
elif event.trigger.type == TriggerType.FRAME:
if self.frame >= event.trigger.value:
triggered = True
elif event.trigger.type == TriggerType.CYCLE:
cycle_duration = event.trigger.value
if cycle_duration > 0:
current_cycle = int(elapsed / cycle_duration)
if current_cycle > self._cycled:
self._cycled = current_cycle
triggered = True
elif event.trigger.type == TriggerType.CONDITION:
if event.trigger.condition and event.trigger.condition(self):
triggered = True
elif event.trigger.type == TriggerType.MANUAL:
pass
if triggered:
if event.name not in self._active_events:
self._active_events[event.name] = 0.0
progress = 0.0
if event.duration > 0:
self._active_events[event.name] += 1 / self.fps
progress = min(
1.0, self._active_events[event.name] / event.duration
)
eased_progress = event.ease(progress)
event.action(self, eased_progress)
if progress >= 1.0:
if event.trigger.repeat:
self._active_events[event.name] = 0.0
else:
del self._active_events[event.name]
else:
event.action(self, 1.0)
if not event.trigger.repeat:
del self._active_events[event.name]
else:
self._active_events[event.name] = 0.0
self.frame += 1
return dict(self._params)
@dataclass
class PipelineParams:
"""Snapshot of pipeline parameters for animation."""
effect_enabled: dict[str, bool] = field(default_factory=dict)
effect_intensity: dict[str, float] = field(default_factory=dict)
camera_mode: str = "vertical"
camera_speed: float = 1.0
camera_x: int = 0
camera_y: int = 0
display_backend: str = "terminal"
scroll_speed: float = 1.0
class Preset:
"""Packages a starting pipeline config + Animation controller."""
def __init__(
self,
name: str,
description: str = "",
initial_params: PipelineParams | None = None,
animation: AnimationController | None = None,
):
self.name = name
self.description = description
self.initial_params = initial_params or PipelineParams()
self.animation = animation or AnimationController()
def create_controller(self) -> AnimationController:
controller = AnimationController()
for key, value in self.initial_params.__dict__.items():
controller.set_param(key, value)
for event in self.animation._events:
controller.add_event(event)
return controller
def create_demo_preset() -> Preset:
"""Create the demo preset with effect cycling and camera modes."""
animation = AnimationController(fps=60)
effects = ["noise", "fade", "glitch", "firehose"]
camera_modes = ["vertical", "horizontal", "omni", "floating", "trace"]
def make_effect_action(eff):
def action(ctrl, t):
ctrl.set_param("current_effect", eff)
ctrl.set_param("effect_intensity", t)
return action
def make_camera_action(cam_mode):
def action(ctrl, t):
ctrl.set_param("camera_mode", cam_mode)
return action
for i, effect in enumerate(effects):
effect_duration = 5.0
animation.add_event(
Event(
name=f"effect_{effect}",
trigger=Trigger(
type=TriggerType.TIME,
value=i * effect_duration,
repeat=True,
repeat_interval=len(effects) * effect_duration,
),
duration=effect_duration,
action=make_effect_action(effect),
ease=ease_in_out,
)
)
for i, mode in enumerate(camera_modes):
camera_duration = 10.0
animation.add_event(
Event(
name=f"camera_{mode}",
trigger=Trigger(
type=TriggerType.TIME,
value=i * camera_duration,
repeat=True,
repeat_interval=len(camera_modes) * camera_duration,
),
duration=0.5,
action=make_camera_action(mode),
)
)
animation.add_event(
Event(
name="pulse",
trigger=Trigger(type=TriggerType.CYCLE, value=2.0, repeat=True),
duration=1.0,
action=lambda ctrl, t: ctrl.set_param("pulse", t),
ease=ease_out_bounce,
)
)
return Preset(
name="demo",
description="Demo mode with effect cycling and camera modes",
initial_params=PipelineParams(
effect_enabled={
"noise": False,
"fade": False,
"glitch": False,
"firehose": False,
"hud": True,
},
effect_intensity={
"noise": 0.0,
"fade": 0.0,
"glitch": 0.0,
"firehose": 0.0,
},
camera_mode="vertical",
camera_speed=1.0,
display_backend="pygame",
),
animation=animation,
)
def create_pipeline_preset() -> Preset:
"""Create preset for pipeline visualization."""
animation = AnimationController(fps=60)
animation.add_event(
Event(
name="camera_trace",
trigger=Trigger(type=TriggerType.CYCLE, value=8.0, repeat=True),
duration=8.0,
action=lambda ctrl, t: ctrl.set_param("camera_mode", "trace"),
)
)
animation.add_event(
Event(
name="highlight_path",
trigger=Trigger(type=TriggerType.CYCLE, value=4.0, repeat=True),
duration=4.0,
action=lambda ctrl, t: ctrl.set_param("path_progress", t),
)
)
return Preset(
name="pipeline",
description="Pipeline visualization with trace camera",
initial_params=PipelineParams(
camera_mode="trace",
camera_speed=1.0,
display_backend="pygame",
),
animation=animation,
)

View File

@@ -572,7 +572,7 @@ def run_pipeline_demo():
get_registry,
set_monitor,
)
from engine.pipeline_viz import generate_animated_pipeline
from engine.pipeline_viz import generate_large_network_viewport
print(" \033[1;38;5;46mMAINLINE PIPELINE DEMO\033[0m")
print(" \033[38;5;245mInitializing...\033[0m")
@@ -667,7 +667,7 @@ def run_pipeline_demo():
camera.update(config.FRAME_DT)
buf = generate_animated_pipeline(w, frame_number)
buf = generate_large_network_viewport(w, h, frame_number)
ctx = EffectContext(
terminal_width=w,
@@ -699,18 +699,165 @@ def run_pipeline_demo():
print("\n \033[38;5;245mPipeline demo ended\033[0m")
def run_preset_mode(preset_name: str):
"""Run mode using animation presets."""
from engine import config
from engine.animation import (
create_demo_preset,
create_pipeline_preset,
)
from engine.camera import Camera
from engine.display import DisplayRegistry
from engine.effects import (
EffectContext,
PerformanceMonitor,
get_effect_chain,
get_registry,
set_monitor,
)
from engine.sources_v2 import (
PipelineDataSource,
get_source_registry,
init_default_sources,
)
w, h = 80, 24
if preset_name == "demo":
preset = create_demo_preset()
init_default_sources()
source = get_source_registry().default()
elif preset_name == "pipeline":
preset = create_pipeline_preset()
source = PipelineDataSource(w, h)
else:
print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m")
print(" Available: demo, pipeline")
sys.exit(1)
print(f" \033[1;38;5;46mMAINLINE PRESET: {preset.name}\033[0m")
print(f" \033[38;5;245m{preset.description}\033[0m")
print(" \033[38;5;245mInitializing...\033[0m")
import effects_plugins
effects_plugins.discover_plugins()
registry = get_registry()
chain = get_effect_chain()
chain.set_order(["noise", "fade", "glitch", "firehose", "hud"])
monitor = PerformanceMonitor()
set_monitor(monitor)
chain._monitor = monitor
display = DisplayRegistry.create(preset.initial_params.display_backend)
if not display:
print(
f" \033[38;5;196mFailed to create {preset.initial_params.display_backend} display\033[0m"
)
sys.exit(1)
display.init(w, h)
display.clear()
camera = Camera.vertical()
print(" \033[38;5;82mStarting preset animation...\033[0m")
print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n")
controller = preset.create_controller()
frame_number = 0
try:
while True:
params = controller.update()
effect_name = params.get("current_effect", "none")
intensity = params.get("effect_intensity", 0.0)
camera_mode = params.get("camera_mode", "vertical")
if camera_mode == "vertical":
camera = Camera.vertical(speed=params.get("camera_speed", 1.0))
elif camera_mode == "horizontal":
camera = Camera.horizontal(speed=params.get("camera_speed", 1.0))
elif camera_mode == "omni":
camera = Camera.omni(speed=params.get("camera_speed", 1.0))
elif camera_mode == "floating":
camera = Camera.floating(speed=params.get("camera_speed", 1.0))
camera.update(config.FRAME_DT)
for eff in registry.list_all().values():
if eff.name == effect_name:
eff.config.enabled = True
eff.config.intensity = intensity
elif eff.name not in ("hud",):
eff.config.enabled = False
hud_effect = registry.get("hud")
if hud_effect:
hud_effect.config.params["display_effect"] = (
f"{effect_name} / {camera_mode}"
)
hud_effect.config.params["display_intensity"] = intensity
source.viewport_width = w
source.viewport_height = h
items = source.get_items()
buffer = items[0].content.split("\n") if items else [""] * h
ctx = EffectContext(
terminal_width=w,
terminal_height=h,
scroll_cam=camera.y,
ticker_height=h,
camera_x=camera.x,
mic_excess=0.0,
grad_offset=0.0,
frame_number=frame_number,
has_message=False,
items=[],
)
result = chain.process(buffer, ctx)
display.show(result)
new_w, new_h = display.get_dimensions()
if new_w != w or new_h != h:
w, h = new_w, new_h
frame_number += 1
time.sleep(1 / 60)
except KeyboardInterrupt:
pass
finally:
display.cleanup()
print("\n \033[38;5;245mPreset ended\033[0m")
def main():
from engine import config
from engine.pipeline import generate_pipeline_diagram
if config.PIPELINE_DIAGRAM:
from engine.pipeline import generate_pipeline_diagram
print(generate_pipeline_diagram())
return
if config.PIPELINE_MODE:
run_pipeline_mode(config.PIPELINE_PRESET)
return
if config.PIPELINE_DEMO:
run_pipeline_demo()
return
if config.PRESET:
run_preset_mode(config.PRESET)
return
if config.DEMO:
run_demo_mode()
return
@@ -813,3 +960,128 @@ def main():
print(f" {G_DIM}> {config.HEADLINE_LIMIT} SIGNALS PROCESSED{RST}")
print(f" {W_GHOST}> end of stream{RST}")
print()
def run_pipeline_mode(preset_name: str = "demo"):
"""Run using the new unified pipeline architecture."""
import effects_plugins
from engine.display import DisplayRegistry
from engine.effects import get_registry
from engine.fetch import fetch_all, fetch_poetry, load_cache
from engine.pipeline import (
Pipeline,
PipelineConfig,
get_preset,
)
from engine.pipeline.adapters import (
RenderStage,
create_items_stage,
create_stage_from_display,
create_stage_from_effect,
)
print(" \033[1;38;5;46mPIPELINE MODE\033[0m")
print(" \033[38;5;245mUsing unified pipeline architecture\033[0m")
effects_plugins.discover_plugins()
preset = get_preset(preset_name)
if not preset:
print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m")
sys.exit(1)
print(f" \033[38;5;245mPreset: {preset.name} - {preset.description}\033[0m")
params = preset.to_params()
params.viewport_width = 80
params.viewport_height = 24
pipeline = Pipeline(
config=PipelineConfig(
source=preset.source,
display=preset.display,
camera=preset.camera,
effects=preset.effects,
)
)
print(" \033[38;5;245mFetching content...\033[0m")
cached = load_cache()
if cached:
items = cached
elif preset.source == "poetry":
items, _, _ = fetch_poetry()
else:
items, _, _ = fetch_all()
if not items:
print(" \033[38;5;196mNo content available\033[0m")
sys.exit(1)
print(f" \033[38;5;82mLoaded {len(items)} items\033[0m")
display = DisplayRegistry.create(preset.display)
if not display:
print(f" \033[38;5;196mFailed to create display: {preset.display}\033[0m")
sys.exit(1)
display.init(80, 24)
effect_registry = get_registry()
pipeline.add_stage("source", create_items_stage(items, preset.source))
pipeline.add_stage(
"render",
RenderStage(
items,
width=80,
height=24,
camera_speed=params.camera_speed,
camera_mode=preset.camera,
firehose_enabled=params.firehose_enabled,
),
)
for effect_name in preset.effects:
effect = effect_registry.get(effect_name)
if effect:
pipeline.add_stage(
f"effect_{effect_name}", create_stage_from_effect(effect, effect_name)
)
pipeline.add_stage("display", create_stage_from_display(display, preset.display))
pipeline.build()
if not pipeline.initialize():
print(" \033[38;5;196mFailed to initialize pipeline\033[0m")
sys.exit(1)
print(" \033[38;5;82mStarting pipeline...\033[0m")
print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n")
ctx = pipeline.context
ctx.params = params
ctx.set("display", display)
ctx.set("items", items)
ctx.set("pipeline", pipeline)
try:
frame = 0
while True:
params.frame_number = frame
ctx.params = params
result = pipeline.execute(items)
if result.success:
display.show(result.data)
time.sleep(1 / 60)
frame += 1
except KeyboardInterrupt:
pass
finally:
pipeline.cleanup()
display.cleanup()
print("\n \033[38;5;245mPipeline stopped\033[0m")

4107
engine/beautiful_mermaid.py Normal file

File diff suppressed because it is too large Load Diff

View File

@@ -129,7 +129,7 @@ class Config:
script_fonts: dict[str, str] = field(default_factory=_get_platform_font_paths)
display: str = "terminal"
display: str = "pygame"
websocket: bool = False
websocket_port: int = 8765
@@ -237,7 +237,7 @@ GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋"
KATA = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ"
# ─── WEBSOCKET ─────────────────────────────────────────────
DISPLAY = _arg_value("--display", sys.argv) or "terminal"
DISPLAY = _arg_value("--display", sys.argv) or "pygame"
WEBSOCKET = "--websocket" in sys.argv
WEBSOCKET_PORT = _arg_int("--websocket-port", 8765)
@@ -246,6 +246,13 @@ DEMO = "--demo" in sys.argv
DEMO_EFFECT_DURATION = 5.0 # seconds per effect
PIPELINE_DEMO = "--pipeline-demo" in sys.argv
# ─── PIPELINE MODE (new unified architecture) ─────────────
PIPELINE_MODE = "--pipeline" in sys.argv
PIPELINE_PRESET = _arg_value("--pipeline-preset", sys.argv) or "demo"
# ─── PRESET MODE ────────────────────────────────────────────
PRESET = _arg_value("--preset", sys.argv)
# ─── PIPELINE DIAGRAM ────────────────────────────────────
PIPELINE_DIAGRAM = "--pipeline-diagram" in sys.argv

View File

@@ -1,5 +1,23 @@
"""
Pipeline introspection - generates self-documenting diagrams of the render pipeline.
Pipeline Architecture:
- Sources: Data providers (RSS, Poetry, Ntfy, Mic) - static or dynamic
- Fetch: Retrieve data from sources
- Prepare: Transform raw data (make_block, strip_tags, translate)
- Scroll: Camera-based viewport rendering (ticker zone, message overlay)
- Effects: Post-processing chain (noise, fade, glitch, firehose, hud)
- Render: Final line rendering and layout
- Display: Output backends (terminal, pygame, websocket, sixel, kitty)
Key abstractions:
- DataSource: Sources can be static (cached) or dynamic (idempotent fetch)
- Camera: Viewport controller (vertical, horizontal, omni, floating, trace)
- EffectChain: Ordered effect processing pipeline
- Display: Pluggable output backends
- SourceRegistry: Source discovery and management
- AnimationController: Time-based parameter animation
- Preset: Package of initial params + animation for demo modes
"""
from __future__ import annotations
@@ -18,6 +36,7 @@ class PipelineNode:
description: str = ""
inputs: list[str] | None = None
outputs: list[str] | None = None
metrics: dict | None = None # Performance metrics (avg_ms, min_ms, max_ms)
class PipelineIntrospector:
@@ -33,8 +52,22 @@ class PipelineIntrospector:
"""Generate a Mermaid flowchart of the pipeline."""
lines = ["```mermaid", "flowchart TD"]
subgraph_groups = {
"Sources": [],
"Fetch": [],
"Prepare": [],
"Scroll": [],
"Effects": [],
"Display": [],
"Async": [],
"Animation": [],
"Viz": [],
}
other_nodes = []
for node in self.nodes:
node_id = node.name.replace("-", "_").replace(" ", "_")
node_id = node.name.replace("-", "_").replace(" ", "_").replace(":", "_")
label = node.name
if node.class_name:
label = f"{node.name}\\n({node.class_name})"
@@ -44,15 +77,63 @@ class PipelineIntrospector:
if node.description:
label += f"\\n{node.description}"
lines.append(f' {node_id}["{label}"]')
if node.metrics:
avg = node.metrics.get("avg_ms", 0)
if avg > 0:
label += f"\\n⏱ {avg:.1f}ms"
impact = node.metrics.get("impact_pct", 0)
if impact > 0:
label += f" ({impact:.0f}%)"
node_entry = f' {node_id}["{label}"]'
if "DataSource" in node.name or "SourceRegistry" in node.name:
subgraph_groups["Sources"].append(node_entry)
elif "fetch" in node.name.lower():
subgraph_groups["Fetch"].append(node_entry)
elif (
"make_block" in node.name
or "strip_tags" in node.name
or "translate" in node.name
):
subgraph_groups["Prepare"].append(node_entry)
elif (
"StreamController" in node.name
or "render_ticker" in node.name
or "render_message" in node.name
or "Camera" in node.name
):
subgraph_groups["Scroll"].append(node_entry)
elif "Effect" in node.name or "effect" in node.module:
subgraph_groups["Effects"].append(node_entry)
elif "Display:" in node.name:
subgraph_groups["Display"].append(node_entry)
elif "Ntfy" in node.name or "Mic" in node.name:
subgraph_groups["Async"].append(node_entry)
elif "Animation" in node.name or "Preset" in node.name:
subgraph_groups["Animation"].append(node_entry)
elif "pipeline_viz" in node.module or "CameraLarge" in node.name:
subgraph_groups["Viz"].append(node_entry)
else:
other_nodes.append(node_entry)
for group_name, nodes in subgraph_groups.items():
if nodes:
lines.append(f" subgraph {group_name}")
for node in nodes:
lines.append(node)
lines.append(" end")
for node in other_nodes:
lines.append(node)
lines.append("")
for node in self.nodes:
node_id = node.name.replace("-", "_").replace(" ", "_")
node_id = node.name.replace("-", "_").replace(" ", "_").replace(":", "_")
if node.inputs:
for inp in node.inputs:
inp_id = inp.replace("-", "_").replace(" ", "_")
inp_id = inp.replace("-", "_").replace(" ", "_").replace(":", "_")
lines.append(f" {inp_id} --> {node_id}")
lines.append("```")
@@ -85,7 +166,8 @@ class PipelineIntrospector:
lines.append(" Vertical --> Horizontal: set_mode()")
lines.append(" Horizontal --> Omni: set_mode()")
lines.append(" Omni --> Floating: set_mode()")
lines.append(" Floating --> Vertical: set_mode()")
lines.append(" Floating --> Trace: set_mode()")
lines.append(" Trace --> Vertical: set_mode()")
lines.append(" state Vertical {")
lines.append(" [*] --> ScrollUp")
@@ -107,6 +189,11 @@ class PipelineIntrospector:
lines.append(" Bobbing --> Bobbing: sin(time)")
lines.append(" }")
lines.append(" state Trace {")
lines.append(" [*] --> FollowPath")
lines.append(" FollowPath --> FollowPath: node by node")
lines.append(" }")
lines.append("```")
return "\n".join(lines)
@@ -144,6 +231,71 @@ class PipelineIntrospector:
)
)
def introspect_sources_v2(self) -> None:
"""Introspect data sources v2 (new abstraction)."""
from engine.sources_v2 import SourceRegistry, init_default_sources
init_default_sources()
SourceRegistry()
self.add_node(
PipelineNode(
name="SourceRegistry",
module="engine.sources_v2",
class_name="SourceRegistry",
description="Source discovery and management",
)
)
for name, desc in [
("HeadlinesDataSource", "RSS feed headlines"),
("PoetryDataSource", "Poetry DB"),
("PipelineDataSource", "Pipeline viz (dynamic)"),
]:
self.add_node(
PipelineNode(
name=f"DataSource: {name}",
module="engine.sources_v2",
class_name=name,
description=f"{desc}",
)
)
def introspect_prepare(self) -> None:
"""Introspect prepare layer (transformation)."""
self.add_node(
PipelineNode(
name="make_block",
module="engine.render",
func_name="make_block",
description="Transform headline into display block",
inputs=["title", "source", "timestamp", "width"],
outputs=["block"],
)
)
self.add_node(
PipelineNode(
name="strip_tags",
module="engine.filter",
func_name="strip_tags",
description="Remove HTML tags from content",
inputs=["html"],
outputs=["plain_text"],
)
)
self.add_node(
PipelineNode(
name="translate_headline",
module="engine.translate",
func_name="translate_headline",
description="Translate headline to target language",
inputs=["title", "target_lang"],
outputs=["translated_title"],
)
)
def introspect_fetch(self) -> None:
"""Introspect fetch layer."""
self.add_node(
@@ -190,6 +342,121 @@ class PipelineIntrospector:
)
)
self.add_node(
PipelineNode(
name="render_message_overlay",
module="engine.layers",
func_name="render_message_overlay",
description="Render ntfy message overlay",
inputs=["msg", "width", "height"],
outputs=["overlay", "cache"],
)
)
def introspect_render(self) -> None:
"""Introspect render layer."""
self.add_node(
PipelineNode(
name="big_wrap",
module="engine.render",
func_name="big_wrap",
description="Word-wrap text to width",
inputs=["text", "width"],
outputs=["lines"],
)
)
self.add_node(
PipelineNode(
name="lr_gradient",
module="engine.render",
func_name="lr_gradient",
description="Apply left-right gradient to lines",
inputs=["lines", "position"],
outputs=["styled_lines"],
)
)
def introspect_async_sources(self) -> None:
"""Introspect async data sources (ntfy, mic)."""
self.add_node(
PipelineNode(
name="NtfyPoller",
module="engine.ntfy",
class_name="NtfyPoller",
description="Poll ntfy for messages (async)",
inputs=["topic"],
outputs=["message"],
)
)
self.add_node(
PipelineNode(
name="MicMonitor",
module="engine.mic",
class_name="MicMonitor",
description="Monitor microphone input (async)",
outputs=["audio_level"],
)
)
def introspect_eventbus(self) -> None:
"""Introspect event bus for decoupled communication."""
self.add_node(
PipelineNode(
name="EventBus",
module="engine.eventbus",
class_name="EventBus",
description="Thread-safe event publishing",
inputs=["event"],
outputs=["subscribers"],
)
)
def introspect_animation(self) -> None:
"""Introspect animation system."""
self.add_node(
PipelineNode(
name="AnimationController",
module="engine.animation",
class_name="AnimationController",
description="Time-based parameter animation",
inputs=["dt"],
outputs=["params"],
)
)
self.add_node(
PipelineNode(
name="Preset",
module="engine.animation",
class_name="Preset",
description="Package of initial params + animation",
)
)
def introspect_pipeline_viz(self) -> None:
"""Introspect pipeline visualization."""
self.add_node(
PipelineNode(
name="generate_large_network_viewport",
module="engine.pipeline_viz",
func_name="generate_large_network_viewport",
description="Large animated network visualization",
inputs=["viewport_w", "viewport_h", "frame"],
outputs=["buffer"],
)
)
self.add_node(
PipelineNode(
name="CameraLarge",
module="engine.pipeline_viz",
class_name="CameraLarge",
description="Large grid camera (trace mode)",
)
)
def introspect_camera(self) -> None:
"""Introspect camera system."""
self.add_node(
@@ -243,14 +510,93 @@ class PipelineIntrospector:
)
)
def introspect_new_pipeline(self, pipeline=None) -> None:
"""Introspect new unified pipeline stages with metrics.
Args:
pipeline: Optional Pipeline instance to collect metrics from
"""
stages_info = [
(
"ItemsSource",
"engine.pipeline.adapters",
"ItemsStage",
"Provides pre-fetched items",
),
(
"Render",
"engine.pipeline.adapters",
"RenderStage",
"Renders items to buffer",
),
(
"Effect",
"engine.pipeline.adapters",
"EffectPluginStage",
"Applies effect",
),
(
"Display",
"engine.pipeline.adapters",
"DisplayStage",
"Outputs to display",
),
]
metrics = None
if pipeline and hasattr(pipeline, "get_metrics_summary"):
metrics = pipeline.get_metrics_summary()
if "error" in metrics:
metrics = None
total_avg = metrics.get("pipeline", {}).get("avg_ms", 0) if metrics else 0
for stage_name, module, class_name, desc in stages_info:
node_metrics = None
if metrics and "stages" in metrics:
for name, stats in metrics["stages"].items():
if stage_name.lower() in name.lower():
impact_pct = (
(stats.get("avg_ms", 0) / total_avg * 100)
if total_avg > 0
else 0
)
node_metrics = {
"avg_ms": stats.get("avg_ms", 0),
"min_ms": stats.get("min_ms", 0),
"max_ms": stats.get("max_ms", 0),
"impact_pct": impact_pct,
}
break
self.add_node(
PipelineNode(
name=f"Pipeline: {stage_name}",
module=module,
class_name=class_name,
description=desc,
inputs=["data"],
outputs=["data"],
metrics=node_metrics,
)
)
def run(self) -> str:
"""Run full introspection."""
self.introspect_sources()
self.introspect_sources_v2()
self.introspect_fetch()
self.introspect_prepare()
self.introspect_scroll()
self.introspect_render()
self.introspect_camera()
self.introspect_effects()
self.introspect_display()
self.introspect_async_sources()
self.introspect_eventbus()
self.introspect_animation()
self.introspect_pipeline_viz()
return self.generate_full_diagram()

107
engine/pipeline/__init__.py Normal file
View File

@@ -0,0 +1,107 @@
"""
Unified Pipeline Architecture.
This module provides a clean, dependency-managed pipeline system:
- Stage: Base class for all pipeline components
- Pipeline: DAG-based execution orchestrator
- PipelineParams: Runtime configuration for animation
- PipelinePreset: Pre-configured pipeline configurations
- StageRegistry: Unified registration for all stage types
The pipeline architecture supports:
- Sources: Data providers (headlines, poetry, pipeline viz)
- Effects: Post-processors (noise, fade, glitch, hud)
- Displays: Output backends (terminal, pygame, websocket)
- Cameras: Viewport controllers (vertical, horizontal, omni)
Example:
from engine.pipeline import Pipeline, PipelineConfig, StageRegistry
pipeline = Pipeline(PipelineConfig(source="headlines", display="terminal"))
pipeline.add_stage("source", StageRegistry.create("source", "headlines"))
pipeline.add_stage("display", StageRegistry.create("display", "terminal"))
pipeline.build().initialize()
result = pipeline.execute(initial_data)
"""
from engine.pipeline.controller import (
Pipeline,
PipelineConfig,
PipelineRunner,
create_default_pipeline,
create_pipeline_from_params,
)
from engine.pipeline.core import (
PipelineContext,
Stage,
StageConfig,
StageError,
StageResult,
)
from engine.pipeline.params import (
DEFAULT_HEADLINE_PARAMS,
DEFAULT_PIPELINE_PARAMS,
DEFAULT_PYGAME_PARAMS,
PipelineParams,
)
from engine.pipeline.presets import (
DEMO_PRESET,
FIREHOSE_PRESET,
PIPELINE_VIZ_PRESET,
POETRY_PRESET,
PRESETS,
SIXEL_PRESET,
WEBSOCKET_PRESET,
PipelinePreset,
create_preset_from_params,
get_preset,
list_presets,
)
from engine.pipeline.registry import (
StageRegistry,
discover_stages,
register_camera,
register_display,
register_effect,
register_source,
)
__all__ = [
# Core
"Stage",
"StageConfig",
"StageError",
"StageResult",
"PipelineContext",
# Controller
"Pipeline",
"PipelineConfig",
"PipelineRunner",
"create_default_pipeline",
"create_pipeline_from_params",
# Params
"PipelineParams",
"DEFAULT_HEADLINE_PARAMS",
"DEFAULT_PIPELINE_PARAMS",
"DEFAULT_PYGAME_PARAMS",
# Presets
"PipelinePreset",
"PRESETS",
"DEMO_PRESET",
"POETRY_PRESET",
"PIPELINE_VIZ_PRESET",
"WEBSOCKET_PRESET",
"SIXEL_PRESET",
"FIREHOSE_PRESET",
"get_preset",
"list_presets",
"create_preset_from_params",
# Registry
"StageRegistry",
"discover_stages",
"register_source",
"register_effect",
"register_display",
"register_camera",
]

299
engine/pipeline/adapters.py Normal file
View File

@@ -0,0 +1,299 @@
"""
Stage adapters - Bridge existing components to the Stage interface.
This module provides adapters that wrap existing components
(EffectPlugin, Display, DataSource, Camera) as Stage implementations.
"""
import random
from typing import Any
from engine.pipeline.core import PipelineContext, Stage
class RenderStage(Stage):
"""Stage that renders items to a text buffer for display.
This mimics the old demo's render pipeline:
- Selects headlines and renders them to blocks
- Applies camera scroll position
- Adds firehose layer if enabled
"""
def __init__(
self,
items: list,
width: int = 80,
height: int = 24,
camera_speed: float = 1.0,
camera_mode: str = "vertical",
firehose_enabled: bool = False,
name: str = "render",
):
self.name = name
self.category = "render"
self.optional = False
self._items = items
self._width = width
self._height = height
self._camera_speed = camera_speed
self._camera_mode = camera_mode
self._firehose_enabled = firehose_enabled
self._camera_y = 0.0
self._camera_x = 0
self._scroll_accum = 0.0
self._ticker_next_y = 0
self._active: list = []
self._seen: set = set()
self._pool: list = list(items)
self._noise_cache: dict = {}
self._frame_count = 0
@property
def capabilities(self) -> set[str]:
return {"render.output"}
@property
def dependencies(self) -> set[str]:
return {"source.items"}
def init(self, ctx: PipelineContext) -> bool:
random.shuffle(self._pool)
return True
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Render items to a text buffer."""
from engine.effects import next_headline
from engine.layers import render_firehose, render_ticker_zone
from engine.render import make_block
items = data or self._items
w = ctx.params.viewport_width if ctx.params else self._width
h = ctx.params.viewport_height if ctx.params else self._height
camera_speed = ctx.params.camera_speed if ctx.params else self._camera_speed
firehose = ctx.params.firehose_enabled if ctx.params else self._firehose_enabled
scroll_step = 0.5 / (camera_speed * 10)
self._scroll_accum += scroll_step
GAP = 3
while self._scroll_accum >= scroll_step:
self._scroll_accum -= scroll_step
self._camera_y += 1.0
while (
self._ticker_next_y < int(self._camera_y) + h + 10
and len(self._active) < 50
):
t, src, ts = next_headline(self._pool, items, self._seen)
ticker_content, hc, midx = make_block(t, src, ts, w)
self._active.append((ticker_content, hc, self._ticker_next_y, midx))
self._ticker_next_y += len(ticker_content) + GAP
self._active = [
(c, hc, by, mi)
for c, hc, by, mi in self._active
if by + len(c) > int(self._camera_y)
]
for k in list(self._noise_cache):
if k < int(self._camera_y):
del self._noise_cache[k]
grad_offset = (self._frame_count * 0.01) % 1.0
buf, self._noise_cache = render_ticker_zone(
self._active,
scroll_cam=int(self._camera_y),
camera_x=self._camera_x,
ticker_h=h,
w=w,
noise_cache=self._noise_cache,
grad_offset=grad_offset,
)
if firehose:
firehose_buf = render_firehose(items, w, 0, h)
buf.extend(firehose_buf)
self._frame_count += 1
return buf
class EffectPluginStage(Stage):
"""Adapter wrapping EffectPlugin as a Stage."""
def __init__(self, effect_plugin, name: str = "effect"):
self._effect = effect_plugin
self.name = name
self.category = "effect"
self.optional = False
@property
def capabilities(self) -> set[str]:
return {f"effect.{self.name}"}
@property
def dependencies(self) -> set[str]:
return set()
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Process data through the effect."""
if data is None:
return None
from engine.effects import EffectContext
w = ctx.params.viewport_width if ctx.params else 80
h = ctx.params.viewport_height if ctx.params else 24
frame = ctx.params.frame_number if ctx.params else 0
effect_ctx = EffectContext(
terminal_width=w,
terminal_height=h,
scroll_cam=0,
ticker_height=h,
camera_x=0,
mic_excess=0.0,
grad_offset=(frame * 0.01) % 1.0,
frame_number=frame,
has_message=False,
items=ctx.get("items", []),
)
return self._effect.process(data, effect_ctx)
class DisplayStage(Stage):
"""Adapter wrapping Display as a Stage."""
def __init__(self, display, name: str = "terminal"):
self._display = display
self.name = name
self.category = "display"
self.optional = False
@property
def capabilities(self) -> set[str]:
return {"display.output"}
@property
def dependencies(self) -> set[str]:
return set()
def init(self, ctx: PipelineContext) -> bool:
w = ctx.params.viewport_width if ctx.params else 80
h = ctx.params.viewport_height if ctx.params else 24
result = self._display.init(w, h, reuse=False)
return result is not False
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Output data to display."""
if data is not None:
self._display.show(data)
return data
def cleanup(self) -> None:
self._display.cleanup()
class DataSourceStage(Stage):
"""Adapter wrapping DataSource as a Stage."""
def __init__(self, data_source, name: str = "headlines"):
self._source = data_source
self.name = name
self.category = "source"
self.optional = False
@property
def capabilities(self) -> set[str]:
return {f"source.{self.name}"}
@property
def dependencies(self) -> set[str]:
return set()
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Fetch data from source."""
if hasattr(self._source, "get_items"):
return self._source.get_items()
return data
class ItemsStage(Stage):
"""Stage that holds pre-fetched items and provides them to the pipeline."""
def __init__(self, items, name: str = "headlines"):
self._items = items
self.name = name
self.category = "source"
self.optional = False
@property
def capabilities(self) -> set[str]:
return {f"source.{self.name}"}
@property
def dependencies(self) -> set[str]:
return set()
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Return the pre-fetched items."""
return self._items
class CameraStage(Stage):
"""Adapter wrapping Camera as a Stage."""
def __init__(self, camera, name: str = "vertical"):
self._camera = camera
self.name = name
self.category = "camera"
self.optional = True
@property
def capabilities(self) -> set[str]:
return {"camera"}
@property
def dependencies(self) -> set[str]:
return {"source.items"}
def process(self, data: Any, ctx: PipelineContext) -> Any:
"""Apply camera transformation to data."""
if data is None:
return None
if hasattr(self._camera, "apply"):
return self._camera.apply(
data, ctx.params.viewport_width if ctx.params else 80
)
return data
def cleanup(self) -> None:
if hasattr(self._camera, "reset"):
self._camera.reset()
def create_stage_from_display(display, name: str = "terminal") -> DisplayStage:
"""Create a Stage from a Display instance."""
return DisplayStage(display, name)
def create_stage_from_effect(effect_plugin, name: str) -> EffectPluginStage:
"""Create a Stage from an EffectPlugin."""
return EffectPluginStage(effect_plugin, name)
def create_stage_from_source(data_source, name: str = "headlines") -> DataSourceStage:
"""Create a Stage from a DataSource."""
return DataSourceStage(data_source, name)
def create_stage_from_camera(camera, name: str = "vertical") -> CameraStage:
"""Create a Stage from a Camera."""
return CameraStage(camera, name)
def create_items_stage(items, name: str = "headlines") -> ItemsStage:
"""Create a Stage that holds pre-fetched items."""
return ItemsStage(items, name)

View File

@@ -0,0 +1,320 @@
"""
Pipeline controller - DAG-based pipeline execution.
The Pipeline class orchestrates stages in dependency order, handling
the complete render cycle from source to display.
"""
import time
from dataclasses import dataclass, field
from typing import Any
from engine.pipeline.core import PipelineContext, Stage, StageError, StageResult
from engine.pipeline.params import PipelineParams
from engine.pipeline.registry import StageRegistry
@dataclass
class PipelineConfig:
"""Configuration for a pipeline instance."""
source: str = "headlines"
display: str = "terminal"
camera: str = "vertical"
effects: list[str] = field(default_factory=list)
enable_metrics: bool = True
@dataclass
class StageMetrics:
"""Metrics for a single stage execution."""
name: str
duration_ms: float
chars_in: int = 0
chars_out: int = 0
@dataclass
class FrameMetrics:
"""Metrics for a single frame through the pipeline."""
frame_number: int
total_ms: float
stages: list[StageMetrics] = field(default_factory=list)
class Pipeline:
"""Main pipeline orchestrator.
Manages the execution of all stages in dependency order,
handling initialization, processing, and cleanup.
"""
def __init__(
self,
config: PipelineConfig | None = None,
context: PipelineContext | None = None,
):
self.config = config or PipelineConfig()
self.context = context or PipelineContext()
self._stages: dict[str, Stage] = {}
self._execution_order: list[str] = []
self._initialized = False
self._metrics_enabled = self.config.enable_metrics
self._frame_metrics: list[FrameMetrics] = []
self._max_metrics_frames = 60
self._current_frame_number = 0
def add_stage(self, name: str, stage: Stage) -> "Pipeline":
"""Add a stage to the pipeline."""
self._stages[name] = stage
return self
def remove_stage(self, name: str) -> None:
"""Remove a stage from the pipeline."""
if name in self._stages:
del self._stages[name]
def get_stage(self, name: str) -> Stage | None:
"""Get a stage by name."""
return self._stages.get(name)
def build(self) -> "Pipeline":
"""Build execution order based on dependencies."""
self._execution_order = self._resolve_dependencies()
self._initialized = True
return self
def _resolve_dependencies(self) -> list[str]:
"""Resolve stage execution order using topological sort."""
ordered = []
visited = set()
temp_mark = set()
def visit(name: str) -> None:
if name in temp_mark:
raise StageError(name, "Circular dependency detected")
if name in visited:
return
temp_mark.add(name)
stage = self._stages.get(name)
if stage:
for dep in stage.dependencies:
dep_stage = self._stages.get(dep)
if dep_stage:
visit(dep)
temp_mark.remove(name)
visited.add(name)
ordered.append(name)
for name in self._stages:
if name not in visited:
visit(name)
return ordered
def initialize(self) -> bool:
"""Initialize all stages in execution order."""
for name in self._execution_order:
stage = self._stages.get(name)
if stage and not stage.init(self.context) and not stage.optional:
return False
return True
def execute(self, data: Any | None = None) -> StageResult:
"""Execute the pipeline with the given input data."""
if not self._initialized:
self.build()
if not self._initialized:
return StageResult(
success=False,
data=None,
error="Pipeline not initialized",
)
current_data = data
frame_start = time.perf_counter() if self._metrics_enabled else 0
stage_timings: list[StageMetrics] = []
for name in self._execution_order:
stage = self._stages.get(name)
if not stage or not stage.is_enabled():
continue
stage_start = time.perf_counter() if self._metrics_enabled else 0
try:
current_data = stage.process(current_data, self.context)
except Exception as e:
if not stage.optional:
return StageResult(
success=False,
data=current_data,
error=str(e),
stage_name=name,
)
continue
if self._metrics_enabled:
stage_duration = (time.perf_counter() - stage_start) * 1000
chars_in = len(str(data)) if data else 0
chars_out = len(str(current_data)) if current_data else 0
stage_timings.append(
StageMetrics(
name=name,
duration_ms=stage_duration,
chars_in=chars_in,
chars_out=chars_out,
)
)
if self._metrics_enabled:
total_duration = (time.perf_counter() - frame_start) * 1000
self._frame_metrics.append(
FrameMetrics(
frame_number=self._current_frame_number,
total_ms=total_duration,
stages=stage_timings,
)
)
if len(self._frame_metrics) > self._max_metrics_frames:
self._frame_metrics.pop(0)
self._current_frame_number += 1
return StageResult(success=True, data=current_data)
def cleanup(self) -> None:
"""Clean up all stages in reverse order."""
for name in reversed(self._execution_order):
stage = self._stages.get(name)
if stage:
try:
stage.cleanup()
except Exception:
pass
self._stages.clear()
self._initialized = False
@property
def stages(self) -> dict[str, Stage]:
"""Get all stages."""
return self._stages.copy()
@property
def execution_order(self) -> list[str]:
"""Get execution order."""
return self._execution_order.copy()
def get_stage_names(self) -> list[str]:
"""Get list of stage names."""
return list(self._stages.keys())
def get_metrics_summary(self) -> dict:
"""Get summary of collected metrics."""
if not self._frame_metrics:
return {"error": "No metrics collected"}
total_times = [f.total_ms for f in self._frame_metrics]
avg_total = sum(total_times) / len(total_times)
min_total = min(total_times)
max_total = max(total_times)
stage_stats: dict[str, dict] = {}
for frame in self._frame_metrics:
for stage in frame.stages:
if stage.name not in stage_stats:
stage_stats[stage.name] = {"times": [], "total_chars": 0}
stage_stats[stage.name]["times"].append(stage.duration_ms)
stage_stats[stage.name]["total_chars"] += stage.chars_out
for name, stats in stage_stats.items():
times = stats["times"]
stats["avg_ms"] = sum(times) / len(times)
stats["min_ms"] = min(times)
stats["max_ms"] = max(times)
del stats["times"]
return {
"frame_count": len(self._frame_metrics),
"pipeline": {
"avg_ms": avg_total,
"min_ms": min_total,
"max_ms": max_total,
},
"stages": stage_stats,
}
def reset_metrics(self) -> None:
"""Reset collected metrics."""
self._frame_metrics.clear()
self._current_frame_number = 0
class PipelineRunner:
"""High-level pipeline runner with animation support."""
def __init__(
self,
pipeline: Pipeline,
params: PipelineParams | None = None,
):
self.pipeline = pipeline
self.params = params or PipelineParams()
self._running = False
def start(self) -> bool:
"""Start the pipeline."""
self._running = True
return self.pipeline.initialize()
def step(self, input_data: Any | None = None) -> Any:
"""Execute one pipeline step."""
self.params.frame_number += 1
self.pipeline.context.params = self.params
result = self.pipeline.execute(input_data)
return result.data if result.success else None
def stop(self) -> None:
"""Stop and clean up the pipeline."""
self._running = False
self.pipeline.cleanup()
@property
def is_running(self) -> bool:
"""Check if runner is active."""
return self._running
def create_pipeline_from_params(params: PipelineParams) -> Pipeline:
"""Create a pipeline from PipelineParams."""
config = PipelineConfig(
source=params.source,
display=params.display,
camera=params.camera_mode,
effects=params.effect_order,
)
return Pipeline(config=config)
def create_default_pipeline() -> Pipeline:
"""Create a default pipeline with all standard components."""
from engine.pipeline.adapters import DataSourceStage
from engine.sources_v2 import HeadlinesDataSource
pipeline = Pipeline()
# Add source stage (wrapped as Stage)
source = HeadlinesDataSource()
pipeline.add_stage("source", DataSourceStage(source, name="headlines"))
# Add display stage
display = StageRegistry.create("display", "terminal")
if display:
pipeline.add_stage("display", display)
return pipeline.build()

221
engine/pipeline/core.py Normal file
View File

@@ -0,0 +1,221 @@
"""
Pipeline core - Unified Stage abstraction and PipelineContext.
This module provides the foundation for a clean, dependency-managed pipeline:
- Stage: Base class for all pipeline components (sources, effects, displays, cameras)
- PipelineContext: Dependency injection context for runtime data exchange
- Capability system: Explicit capability declarations with duck-typing support
"""
from abc import ABC, abstractmethod
from collections.abc import Callable
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from engine.pipeline.params import PipelineParams
@dataclass
class StageConfig:
"""Configuration for a single stage."""
name: str
category: str
enabled: bool = True
optional: bool = False
params: dict[str, Any] = field(default_factory=dict)
class Stage(ABC):
"""Abstract base class for all pipeline stages.
A Stage is a single component in the rendering pipeline. Stages can be:
- Sources: Data providers (headlines, poetry, pipeline viz)
- Effects: Post-processors (noise, fade, glitch, hud)
- Displays: Output backends (terminal, pygame, websocket)
- Cameras: Viewport controllers (vertical, horizontal, omni)
Stages declare:
- capabilities: What they provide to other stages
- dependencies: What they need from other stages
Duck-typing is supported: any class with the required methods can act as a Stage.
"""
name: str
category: str # "source", "effect", "display", "camera"
optional: bool = False # If True, pipeline continues even if stage fails
@property
def capabilities(self) -> set[str]:
"""Return set of capabilities this stage provides.
Examples:
- "source.headlines"
- "effect.noise"
- "display.output"
- "camera"
"""
return {f"{self.category}.{self.name}"}
@property
def dependencies(self) -> set[str]:
"""Return set of capability names this stage needs.
Examples:
- {"display.output"}
- {"source.headlines"}
- {"camera"}
"""
return set()
def init(self, ctx: "PipelineContext") -> bool:
"""Initialize stage with pipeline context.
Args:
ctx: PipelineContext for accessing services
Returns:
True if initialization succeeded, False otherwise
"""
return True
@abstractmethod
def process(self, data: Any, ctx: "PipelineContext") -> Any:
"""Process input data and return output.
Args:
data: Input data from previous stage (or initial data for first stage)
ctx: PipelineContext for accessing services and state
Returns:
Processed data for next stage
"""
...
def cleanup(self) -> None: # noqa: B027
"""Clean up resources when pipeline shuts down."""
pass
def get_config(self) -> StageConfig:
"""Return current configuration of this stage."""
return StageConfig(
name=self.name,
category=self.category,
optional=self.optional,
)
def set_enabled(self, enabled: bool) -> None:
"""Enable or disable this stage."""
self._enabled = enabled # type: ignore[attr-defined]
def is_enabled(self) -> bool:
"""Check if stage is enabled."""
return getattr(self, "_enabled", True)
@dataclass
class StageResult:
"""Result of stage processing, including success/failure info."""
success: bool
data: Any
error: str | None = None
stage_name: str = ""
class PipelineContext:
"""Dependency injection context passed through the pipeline.
Provides:
- services: Named services (display, config, event_bus, etc.)
- state: Runtime state shared between stages
- params: PipelineParams for animation-driven config
Services can be injected at construction time or lazily resolved.
"""
def __init__(
self,
services: dict[str, Any] | None = None,
initial_state: dict[str, Any] | None = None,
):
self.services: dict[str, Any] = services or {}
self.state: dict[str, Any] = initial_state or {}
self._params: PipelineParams | None = None
# Lazy resolvers for common services
self._lazy_resolvers: dict[str, Callable[[], Any]] = {
"config": self._resolve_config,
"event_bus": self._resolve_event_bus,
}
def _resolve_config(self) -> Any:
from engine.config import get_config
return get_config()
def _resolve_event_bus(self) -> Any:
from engine.eventbus import get_event_bus
return get_event_bus()
def get(self, key: str, default: Any = None) -> Any:
"""Get a service or state value by key.
First checks services, then state, then lazy resolution.
"""
if key in self.services:
return self.services[key]
if key in self.state:
return self.state[key]
if key in self._lazy_resolvers:
try:
return self._lazy_resolvers[key]()
except Exception:
return default
return default
def set(self, key: str, value: Any) -> None:
"""Set a service or state value."""
self.services[key] = value
def set_state(self, key: str, value: Any) -> None:
"""Set a runtime state value."""
self.state[key] = value
def get_state(self, key: str, default: Any = None) -> Any:
"""Get a runtime state value."""
return self.state.get(key, default)
@property
def params(self) -> "PipelineParams | None":
"""Get current pipeline params (for animation)."""
return self._params
@params.setter
def params(self, value: "PipelineParams") -> None:
"""Set pipeline params (from animation controller)."""
self._params = value
def has_capability(self, capability: str) -> bool:
"""Check if a capability is available."""
return capability in self.services or capability in self._lazy_resolvers
class StageError(Exception):
"""Raised when a stage fails to process."""
def __init__(self, stage_name: str, message: str, is_optional: bool = False):
self.stage_name = stage_name
self.message = message
self.is_optional = is_optional
super().__init__(f"Stage '{stage_name}' failed: {message}")
def create_stage_error(
stage_name: str, error: Exception, is_optional: bool = False
) -> StageError:
"""Helper to create a StageError from an exception."""
return StageError(stage_name, str(error), is_optional)

144
engine/pipeline/params.py Normal file
View File

@@ -0,0 +1,144 @@
"""
Pipeline parameters - Runtime configuration layer for animation control.
PipelineParams is the target for AnimationController - animation events
modify these params, which the pipeline then applies to its stages.
"""
from dataclasses import dataclass, field
from typing import Any
@dataclass
class PipelineParams:
"""Runtime configuration for the pipeline.
This is the canonical config object that AnimationController modifies.
Stages read from these params to adjust their behavior.
"""
# Source config
source: str = "headlines"
source_refresh_interval: float = 60.0
# Display config
display: str = "terminal"
# Camera config
camera_mode: str = "vertical"
camera_speed: float = 1.0
camera_x: int = 0 # For horizontal scrolling
# Effect config
effect_order: list[str] = field(
default_factory=lambda: ["noise", "fade", "glitch", "firehose", "hud"]
)
effect_enabled: dict[str, bool] = field(default_factory=dict)
effect_intensity: dict[str, float] = field(default_factory=dict)
# Animation-driven state (set by AnimationController)
pulse: float = 0.0
current_effect: str | None = None
path_progress: float = 0.0
# Viewport
viewport_width: int = 80
viewport_height: int = 24
# Firehose
firehose_enabled: bool = False
# Runtime state
frame_number: int = 0
fps: float = 60.0
def get_effect_config(self, name: str) -> tuple[bool, float]:
"""Get (enabled, intensity) for an effect."""
enabled = self.effect_enabled.get(name, True)
intensity = self.effect_intensity.get(name, 1.0)
return enabled, intensity
def set_effect_config(self, name: str, enabled: bool, intensity: float) -> None:
"""Set effect configuration."""
self.effect_enabled[name] = enabled
self.effect_intensity[name] = intensity
def is_effect_enabled(self, name: str) -> bool:
"""Check if an effect is enabled."""
if name not in self.effect_enabled:
return True # Default to enabled
return self.effect_enabled.get(name, True)
def get_effect_intensity(self, name: str) -> float:
"""Get effect intensity (0.0 to 1.0)."""
return self.effect_intensity.get(name, 1.0)
def to_dict(self) -> dict[str, Any]:
"""Convert to dictionary for serialization."""
return {
"source": self.source,
"display": self.display,
"camera_mode": self.camera_mode,
"camera_speed": self.camera_speed,
"effect_order": self.effect_order,
"effect_enabled": self.effect_enabled.copy(),
"effect_intensity": self.effect_intensity.copy(),
"pulse": self.pulse,
"current_effect": self.current_effect,
"viewport_width": self.viewport_width,
"viewport_height": self.viewport_height,
"firehose_enabled": self.firehose_enabled,
}
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "PipelineParams":
"""Create from dictionary."""
params = cls()
for key, value in data.items():
if hasattr(params, key):
setattr(params, key, value)
return params
def copy(self) -> "PipelineParams":
"""Create a copy of this params object."""
params = PipelineParams()
params.source = self.source
params.display = self.display
params.camera_mode = self.camera_mode
params.camera_speed = self.camera_speed
params.camera_x = self.camera_x
params.effect_order = self.effect_order.copy()
params.effect_enabled = self.effect_enabled.copy()
params.effect_intensity = self.effect_intensity.copy()
params.pulse = self.pulse
params.current_effect = self.current_effect
params.path_progress = self.path_progress
params.viewport_width = self.viewport_width
params.viewport_height = self.viewport_height
params.firehose_enabled = self.firehose_enabled
params.frame_number = self.frame_number
params.fps = self.fps
return params
# Default params for different modes
DEFAULT_HEADLINE_PARAMS = PipelineParams(
source="headlines",
display="terminal",
camera_mode="vertical",
effect_order=["noise", "fade", "glitch", "firehose", "hud"],
)
DEFAULT_PYGAME_PARAMS = PipelineParams(
source="headlines",
display="pygame",
camera_mode="vertical",
effect_order=["noise", "fade", "glitch", "firehose", "hud"],
)
DEFAULT_PIPELINE_PARAMS = PipelineParams(
source="pipeline",
display="pygame",
camera_mode="trace",
effect_order=["hud"], # Just HUD for pipeline viz
)

155
engine/pipeline/presets.py Normal file
View File

@@ -0,0 +1,155 @@
"""
Pipeline presets - Pre-configured pipeline configurations.
Provides PipelinePreset as a unified preset system that wraps
the existing Preset class from animation.py for backwards compatibility.
"""
from dataclasses import dataclass, field
from engine.animation import Preset as AnimationPreset
from engine.pipeline.params import PipelineParams
@dataclass
class PipelinePreset:
"""Pre-configured pipeline with stages and animation.
A PipelinePreset packages:
- Initial params: Starting configuration
- Stages: List of stage configurations to create
- Animation: Optional animation controller
This is the new unified preset that works with the Pipeline class.
"""
name: str
description: str = ""
source: str = "headlines"
display: str = "terminal"
camera: str = "vertical"
effects: list[str] = field(default_factory=list)
initial_params: PipelineParams | None = None
animation_preset: AnimationPreset | None = None
def to_params(self) -> PipelineParams:
"""Convert to PipelineParams."""
if self.initial_params:
return self.initial_params.copy()
params = PipelineParams()
params.source = self.source
params.display = self.display
params.camera_mode = self.camera
params.effect_order = self.effects.copy()
return params
@classmethod
def from_animation_preset(cls, preset: AnimationPreset) -> "PipelinePreset":
"""Create a PipelinePreset from an existing animation Preset."""
params = preset.initial_params
return cls(
name=preset.name,
description=preset.description,
source=params.source,
display=params.display,
camera=params.camera_mode,
effects=params.effect_order.copy(),
initial_params=params,
animation_preset=preset,
)
def create_animation_controller(self):
"""Create an AnimationController from this preset."""
if self.animation_preset:
return self.animation_preset.create_controller()
return None
# Built-in presets
DEMO_PRESET = PipelinePreset(
name="demo",
description="Demo mode with effect cycling and camera modes",
source="headlines",
display="terminal",
camera="vertical",
effects=["noise", "fade", "glitch", "firehose", "hud"],
)
POETRY_PRESET = PipelinePreset(
name="poetry",
description="Poetry feed with subtle effects",
source="poetry",
display="terminal",
camera="vertical",
effects=["fade", "hud"],
)
PIPELINE_VIZ_PRESET = PipelinePreset(
name="pipeline",
description="Pipeline visualization mode",
source="pipeline",
display="terminal",
camera="trace",
effects=["hud"],
)
WEBSOCKET_PRESET = PipelinePreset(
name="websocket",
description="WebSocket display mode",
source="headlines",
display="websocket",
camera="vertical",
effects=["noise", "fade", "glitch", "hud"],
)
SIXEL_PRESET = PipelinePreset(
name="sixel",
description="Sixel graphics display mode",
source="headlines",
display="sixel",
camera="vertical",
effects=["noise", "fade", "glitch", "hud"],
)
FIREHOSE_PRESET = PipelinePreset(
name="firehose",
description="High-speed firehose mode",
source="headlines",
display="terminal",
camera="vertical",
effects=["noise", "fade", "glitch", "firehose", "hud"],
)
PRESETS: dict[str, PipelinePreset] = {
"demo": DEMO_PRESET,
"poetry": POETRY_PRESET,
"pipeline": PIPELINE_VIZ_PRESET,
"websocket": WEBSOCKET_PRESET,
"sixel": SIXEL_PRESET,
"firehose": FIREHOSE_PRESET,
}
def get_preset(name: str) -> PipelinePreset | None:
"""Get a preset by name."""
return PRESETS.get(name)
def list_presets() -> list[str]:
"""List all available preset names."""
return list(PRESETS.keys())
def create_preset_from_params(
params: PipelineParams, name: str = "custom"
) -> PipelinePreset:
"""Create a preset from PipelineParams."""
return PipelinePreset(
name=name,
source=params.source,
display=params.display,
camera=params.camera_mode,
effects=params.effect_order.copy(),
initial_params=params,
)

165
engine/pipeline/registry.py Normal file
View File

@@ -0,0 +1,165 @@
"""
Stage registry - Unified registration for all pipeline stages.
Provides a single registry for sources, effects, displays, and cameras.
"""
from __future__ import annotations
from engine.pipeline.core import Stage
class StageRegistry:
"""Unified registry for all pipeline stage types."""
_categories: dict[str, dict[str, type[Stage]]] = {}
_discovered: bool = False
_instances: dict[str, Stage] = {}
@classmethod
def register(cls, category: str, stage_class: type[Stage]) -> None:
"""Register a stage class in a category.
Args:
category: Category name (source, effect, display, camera)
stage_class: Stage subclass to register
"""
if category not in cls._categories:
cls._categories[category] = {}
# Use class name as key
key = getattr(stage_class, "__name__", stage_class.__class__.__name__)
cls._categories[category][key] = stage_class
@classmethod
def get(cls, category: str, name: str) -> type[Stage] | None:
"""Get a stage class by category and name."""
return cls._categories.get(category, {}).get(name)
@classmethod
def list(cls, category: str) -> list[str]:
"""List all stage names in a category."""
return list(cls._categories.get(category, {}).keys())
@classmethod
def list_categories(cls) -> list[str]:
"""List all registered categories."""
return list(cls._categories.keys())
@classmethod
def create(cls, category: str, name: str, **kwargs) -> Stage | None:
"""Create a stage instance by category and name."""
stage_class = cls.get(category, name)
if stage_class:
return stage_class(**kwargs)
return None
@classmethod
def create_instance(cls, stage: Stage | type[Stage], **kwargs) -> Stage:
"""Create an instance from a stage class or return as-is."""
if isinstance(stage, Stage):
return stage
if isinstance(stage, type) and issubclass(stage, Stage):
return stage(**kwargs)
raise TypeError(f"Expected Stage class or instance, got {type(stage)}")
@classmethod
def register_instance(cls, name: str, stage: Stage) -> None:
"""Register a stage instance by name."""
cls._instances[name] = stage
@classmethod
def get_instance(cls, name: str) -> Stage | None:
"""Get a registered stage instance by name."""
return cls._instances.get(name)
def discover_stages() -> None:
"""Auto-discover and register all stage implementations."""
if StageRegistry._discovered:
return
# Import and register all stage implementations
try:
from engine.sources_v2 import (
HeadlinesDataSource,
PipelineDataSource,
PoetryDataSource,
)
StageRegistry.register("source", HeadlinesDataSource)
StageRegistry.register("source", PoetryDataSource)
StageRegistry.register("source", PipelineDataSource)
StageRegistry._categories["source"]["headlines"] = HeadlinesDataSource
StageRegistry._categories["source"]["poetry"] = PoetryDataSource
StageRegistry._categories["source"]["pipeline"] = PipelineDataSource
except ImportError:
pass
try:
from engine.effects.types import EffectPlugin # noqa: F401
except ImportError:
pass
# Register display stages
_register_display_stages()
StageRegistry._discovered = True
def _register_display_stages() -> None:
"""Register display backends as stages."""
try:
from engine.display import DisplayRegistry
except ImportError:
return
DisplayRegistry.initialize()
for backend_name in DisplayRegistry.list_backends():
factory = _DisplayStageFactory(backend_name)
StageRegistry._categories.setdefault("display", {})[backend_name] = factory
class _DisplayStageFactory:
"""Factory that creates DisplayStage instances for a specific backend."""
def __init__(self, backend_name: str):
self._backend_name = backend_name
def __call__(self):
from engine.display import DisplayRegistry
from engine.pipeline.adapters import DisplayStage
display = DisplayRegistry.create(self._backend_name)
if display is None:
raise RuntimeError(
f"Failed to create display backend: {self._backend_name}"
)
return DisplayStage(display, name=self._backend_name)
@property
def __name__(self) -> str:
return self._backend_name.capitalize() + "Stage"
# Convenience functions
def register_source(stage_class: type[Stage]) -> None:
"""Register a source stage."""
StageRegistry.register("source", stage_class)
def register_effect(stage_class: type[Stage]) -> None:
"""Register an effect stage."""
StageRegistry.register("effect", stage_class)
def register_display(stage_class: type[Stage]) -> None:
"""Register a display stage."""
StageRegistry.register("display", stage_class)
def register_camera(stage_class: type[Stage]) -> None:
"""Register a camera stage."""
StageRegistry.register("camera", stage_class)

View File

@@ -1,123 +1,364 @@
"""
Pipeline visualization - ASCII text graphics showing the render pipeline.
Pipeline visualization - Large animated network visualization with camera modes.
"""
import math
NODE_NETWORK = {
"sources": [
{"id": "RSS", "label": "RSS FEEDS", "x": 20, "y": 20},
{"id": "POETRY", "label": "POETRY DB", "x": 100, "y": 20},
{"id": "NTFY", "label": "NTFY MSG", "x": 180, "y": 20},
{"id": "MIC", "label": "MICROPHONE", "x": 260, "y": 20},
],
"fetch": [
{"id": "FETCH", "label": "FETCH LAYER", "x": 140, "y": 100},
{"id": "CACHE", "label": "CACHE", "x": 220, "y": 100},
],
"scroll": [
{"id": "STREAM", "label": "STREAM CTRL", "x": 60, "y": 180},
{"id": "CAMERA", "label": "CAMERA", "x": 140, "y": 180},
{"id": "RENDER", "label": "RENDER", "x": 220, "y": 180},
],
"effects": [
{"id": "NOISE", "label": "NOISE", "x": 20, "y": 260},
{"id": "FADE", "label": "FADE", "x": 80, "y": 260},
{"id": "GLITCH", "label": "GLITCH", "x": 140, "y": 260},
{"id": "FIRE", "label": "FIREHOSE", "x": 200, "y": 260},
{"id": "HUD", "label": "HUD", "x": 260, "y": 260},
],
"display": [
{"id": "TERM", "label": "TERMINAL", "x": 20, "y": 340},
{"id": "WEB", "label": "WEBSOCKET", "x": 80, "y": 340},
{"id": "PYGAME", "label": "PYGAME", "x": 140, "y": 340},
{"id": "SIXEL", "label": "SIXEL", "x": 200, "y": 340},
{"id": "KITTY", "label": "KITTY", "x": 260, "y": 340},
],
}
ALL_NODES = []
for group_nodes in NODE_NETWORK.values():
ALL_NODES.extend(group_nodes)
NETWORK_PATHS = [
["RSS", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "NOISE", "TERM"],
["POETRY", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "FADE", "WEB"],
["NTFY", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "GLITCH", "PYGAME"],
["MIC", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "FIRE", "SIXEL"],
["RSS", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "HUD", "KITTY"],
]
GRID_WIDTH = 300
GRID_HEIGHT = 400
def get_node_by_id(node_id: str):
for node in ALL_NODES:
if node["id"] == node_id:
return node
return None
def draw_network_to_grid(frame: int = 0) -> list[list[str]]:
grid = [[" " for _ in range(GRID_WIDTH)] for _ in range(GRID_HEIGHT)]
active_path_idx = (frame // 60) % len(NETWORK_PATHS)
active_path = NETWORK_PATHS[active_path_idx]
for node in ALL_NODES:
x, y = node["x"], node["y"]
label = node["label"]
is_active = node["id"] in active_path
is_highlight = node["id"] == active_path[(frame // 15) % len(active_path)]
node_w, node_h = 20, 7
for dy in range(node_h):
for dx in range(node_w):
gx, gy = x + dx, y + dy
if 0 <= gx < GRID_WIDTH and 0 <= gy < GRID_HEIGHT:
if dy == 0:
char = "" if dx == 0 else ("" if dx == node_w - 1 else "")
elif dy == node_h - 1:
char = "" if dx == 0 else ("" if dx == node_w - 1 else "")
elif dy == node_h // 2:
if dx == 0 or dx == node_w - 1:
char = ""
else:
pad = (node_w - 2 - len(label)) // 2
if dx - 1 == pad and len(label) <= node_w - 2:
char = (
label[dx - 1 - pad]
if dx - 1 - pad < len(label)
else " "
)
else:
char = " "
else:
char = "" if dx == 0 or dx == node_w - 1 else " "
if char.strip():
if is_highlight:
grid[gy][gx] = "\033[1;38;5;46m" + char + "\033[0m"
elif is_active:
grid[gy][gx] = "\033[1;38;5;220m" + char + "\033[0m"
else:
grid[gy][gx] = "\033[38;5;240m" + char + "\033[0m"
for i, node_id in enumerate(active_path[:-1]):
curr = get_node_by_id(node_id)
next_id = active_path[i + 1]
next_node = get_node_by_id(next_id)
if curr and next_node:
x1, y1 = curr["x"] + 7, curr["y"] + 2
x2, y2 = next_node["x"] + 7, next_node["y"] + 2
step = 1 if x2 >= x1 else -1
for x in range(x1, x2 + step, step):
if 0 <= x < GRID_WIDTH and 0 <= y1 < GRID_HEIGHT:
grid[y1][x] = "\033[38;5;45m─\033[0m"
step = 1 if y2 >= y1 else -1
for y in range(y1, y2 + step, step):
if 0 <= x2 < GRID_WIDTH and 0 <= y < GRID_HEIGHT:
grid[y][x2] = "\033[38;5;45m│\033[0m"
return grid
class TraceCamera:
def __init__(self):
self.x = 0
self.y = 0
self.target_x = 0
self.target_y = 0
self.current_node_idx = 0
self.path = []
self.frame = 0
def update(self, dt: float, frame: int = 0) -> None:
self.frame = frame
active_path = NETWORK_PATHS[(frame // 60) % len(NETWORK_PATHS)]
if self.path != active_path:
self.path = active_path
self.current_node_idx = 0
if self.current_node_idx < len(self.path):
node_id = self.path[self.current_node_idx]
node = get_node_by_id(node_id)
if node:
self.target_x = max(0, node["x"] - 40)
self.target_y = max(0, node["y"] - 10)
self.current_node_idx += 1
self.x += int((self.target_x - self.x) * 0.1)
self.y += int((self.target_y - self.y) * 0.1)
class CameraLarge:
def __init__(self, viewport_w: int, viewport_h: int, frame: int):
self.viewport_w = viewport_w
self.viewport_h = viewport_h
self.frame = frame
self.x = 0
self.y = 0
self.mode = "trace"
self.trace_camera = TraceCamera()
def set_vertical_mode(self):
self.mode = "vertical"
def set_horizontal_mode(self):
self.mode = "horizontal"
def set_omni_mode(self):
self.mode = "omni"
def set_floating_mode(self):
self.mode = "floating"
def set_trace_mode(self):
self.mode = "trace"
def update(self, dt: float):
self.frame += 1
if self.mode == "vertical":
self.y = int((self.frame * 0.5) % (GRID_HEIGHT - self.viewport_h))
elif self.mode == "horizontal":
self.x = int((self.frame * 0.5) % (GRID_WIDTH - self.viewport_w))
elif self.mode == "omni":
self.x = int((self.frame * 0.3) % (GRID_WIDTH - self.viewport_w))
self.y = int((self.frame * 0.5) % (GRID_HEIGHT - self.viewport_h))
elif self.mode == "floating":
self.x = int(50 + math.sin(self.frame * 0.02) * 30)
self.y = int(50 + math.cos(self.frame * 0.015) * 30)
elif self.mode == "trace":
self.trace_camera.update(dt, self.frame)
self.x = self.trace_camera.x
self.y = self.trace_camera.y
def generate_mermaid_graph(frame: int = 0) -> str:
effects = ["NOISE", "FADE", "GLITCH", "FIREHOSE"]
active_effect = effects[(frame // 30) % 4]
cam_modes = ["VERTICAL", "HORIZONTAL", "OMNI", "FLOATING", "TRACE"]
active_cam = cam_modes[(frame // 100) % 5]
return f"""graph LR
subgraph SOURCES
RSS[RSS Feeds]
Poetry[Poetry DB]
Ntfy[Ntfy Msg]
Mic[Microphone]
end
subgraph FETCH
Fetch(fetch_all)
Cache[(Cache)]
end
subgraph SCROLL
Scroll(StreamController)
Camera({active_cam})
end
subgraph EFFECTS
Noise[NOISE]
Fade[FADE]
Glitch[GLITCH]
Fire[FIREHOSE]
Hud[HUD]
end
subgraph DISPLAY
Term[Terminal]
Web[WebSocket]
Pygame[PyGame]
Sixel[Sixel]
end
RSS --> Fetch
Poetry --> Fetch
Ntfy --> Fetch
Fetch --> Cache
Cache --> Scroll
Scroll --> Noise
Scroll --> Fade
Scroll --> Glitch
Scroll --> Fire
Scroll --> Hud
Noise --> Term
Fade --> Web
Glitch --> Pygame
Fire --> Sixel
style {active_effect} fill:#90EE90
style Camera fill:#87CEEB
"""
def generate_pipeline_visualization(width: int = 80, height: int = 24) -> list[str]:
"""Generate ASCII visualization of the pipeline.
def generate_network_pipeline(
width: int = 80, height: int = 24, frame: int = 0
) -> list[str]:
try:
from engine.beautiful_mermaid import render_mermaid_ascii
Args:
width: Width of the visualization in characters
height: Height in lines
mermaid_graph = generate_mermaid_graph(frame)
ascii_output = render_mermaid_ascii(mermaid_graph, padding_x=2, padding_y=1)
Returns:
List of formatted strings representing the pipeline
"""
lines = []
lines = ascii_output.split("\n")
for y in range(height):
result = []
for y in range(height):
if y < len(lines):
line = lines[y]
if len(line) < width:
line = line + " " * (width - len(line))
elif len(line) > width:
line = line[:width]
result.append(line)
else:
result.append(" " * width)
status_y = height - 2
if status_y < height:
fps = 60 - (frame % 15)
cam_modes = ["VERTICAL", "HORIZONTAL", "OMNI", "FLOATING", "TRACE"]
cam = cam_modes[(frame // 100) % 5]
effects = ["NOISE", "FADE", "GLITCH", "FIREHOSE"]
eff = effects[(frame // 30) % 4]
anim = "▓▒░ "[frame % 4]
status = f" FPS:{fps:3.0f}{anim} {eff} │ Cam:{cam}"
status = status[: width - 4].ljust(width - 4)
result[status_y] = "" + status + ""
if height > 0:
result[0] = "" * width
result[height - 1] = "" * width
return result
except Exception as e:
return [
f"Error: {e}" + " " * (width - len(f"Error: {e}")) for _ in range(height)
]
def generate_large_network_viewport(
viewport_w: int = 80, viewport_h: int = 24, frame: int = 0
) -> list[str]:
cam_modes = ["VERTICAL", "HORIZONTAL", "OMNI", "FLOATING", "TRACE"]
camera_mode = cam_modes[(frame // 100) % 5]
camera = CameraLarge(viewport_w, viewport_h, frame)
if camera_mode == "TRACE":
camera.set_trace_mode()
elif camera_mode == "VERTICAL":
camera.set_vertical_mode()
elif camera_mode == "HORIZONTAL":
camera.set_horizontal_mode()
elif camera_mode == "OMNI":
camera.set_omni_mode()
elif camera_mode == "FLOATING":
camera.set_floating_mode()
camera.update(1 / 60)
grid = draw_network_to_grid(frame)
result = []
for vy in range(viewport_h):
line = ""
for vx in range(viewport_w):
gx = camera.x + vx
gy = camera.y + vy
if 0 <= gx < GRID_WIDTH and 0 <= gy < GRID_HEIGHT:
line += grid[gy][gx]
else:
line += " "
result.append(line)
if y == 1:
line = "" + "" * (width - 2) + ""
elif y == 2:
line = "" + " RENDER PIPELINE ".center(width - 2) + ""
elif y == 3:
line = "" + "" * (width - 2) + ""
fps = 60 - (frame % 15)
elif y == 5:
line = "║ SOURCES ══════════════> FETCH ═════════> SCROLL ═══> EFFECTS ═> DISPLAY"
elif y == 6:
line = "║ │ │ │ │"
elif y == 7:
line = "║ RSS Poetry Camera Terminal"
elif y == 8:
line = "║ Ntfy Cache Noise WebSocket"
elif y == 9:
line = "║ Mic Fade Pygame"
elif y == 10:
line = "║ Glitch Sixel"
elif y == 11:
line = "║ Firehose Kitty"
elif y == 12:
line = "║ Hud"
active_path = NETWORK_PATHS[(frame // 60) % len(NETWORK_PATHS)]
active_node = active_path[(frame // 15) % len(active_path)]
elif y == 14:
line = "" + "" * (width - 2) + ""
elif y == 15:
line = "║ CAMERA MODES "
remaining = width - len(line) - 1
line += (
"" * (remaining // 2 - 7)
+ " VERTICAL "
+ "" * (remaining // 2 - 6)
+ ""
)
elif y == 16:
line = (
""
+ "".center(8)
+ " "
+ "".center(8)
+ " "
+ "".center(8)
+ " "
+ "".center(8)
+ " " * 20
+ ""
)
elif y == 17:
line = (
"║ scroll up scroll left diagonal bobbing "
+ " " * 16
+ ""
)
anim = "▓▒░ "[frame % 4]
status = f" FPS:{fps:3.0f}{anim} {camera_mode:9s} │ Node:{active_node}"
status = status[: viewport_w - 4].ljust(viewport_w - 4)
if viewport_h > 2:
result[viewport_h - 2] = "" + status + ""
elif y == 19:
line = "" + "" * (width - 2) + ""
elif y == 20:
fps = "60"
line = (
f"║ FPS: {fps} │ Frame: 16.7ms │ Effects: 5 active │ Camera: VERTICAL "
+ " " * (width - len(line) - 2)
+ ""
)
if viewport_h > 0:
result[0] = "" * viewport_w
result[viewport_h - 1] = "" * viewport_w
elif y == 21:
line = "" + "" * (width - 2) + ""
else:
line = " " * width
lines.append(line)
return lines
def generate_animated_pipeline(width: int = 80, frame: int = 0) -> list[str]:
"""Generate animated ASCII visualization.
Args:
width: Width of the visualization
frame: Animation frame number
Returns:
List of formatted strings
"""
lines = generate_pipeline_visualization(width, 20)
anim_chars = ["", "", "", " ", "", "", ""]
char = anim_chars[frame % len(anim_chars)]
for i, line in enumerate(lines):
if "Effects" in line:
lines[i] = line.replace("" * 5, char * 5)
if "FPS:" in line:
lines[i] = (
f"║ FPS: {60 - frame % 10} │ Frame: {16 + frame % 5:.1f}ms │ Effects: {5 - (frame % 3)} active │ Camera: {['VERTICAL', 'HORIZONTAL', 'OMNI', 'FLOATING'][frame % 4]} "
+ " " * (80 - len(lines[i]) - 2)
+ ""
)
return lines
return result

362
engine/sources_v2.py Normal file
View File

@@ -0,0 +1,362 @@
"""
Data source abstraction - Treat data sources as first-class citizens in the pipeline.
Each data source implements a common interface:
- name: Display name for the source
- fetch(): Fetch fresh data
- stream(): Stream data continuously (optional)
- get_items(): Get current items
"""
from abc import ABC, abstractmethod
from collections.abc import Callable
from dataclasses import dataclass
from typing import Any
@dataclass
class SourceItem:
"""A single item from a data source."""
content: str
source: str
timestamp: str
metadata: dict[str, Any] | None = None
class DataSource(ABC):
"""Abstract base class for data sources.
Static sources: Data fetched once and cached. Safe to call fetch() multiple times.
Dynamic sources: Data changes over time. fetch() should be idempotent.
"""
@property
@abstractmethod
def name(self) -> str:
"""Display name for this source."""
...
@property
def is_dynamic(self) -> bool:
"""Whether this source updates dynamically while the app runs. Default False."""
return False
@abstractmethod
def fetch(self) -> list[SourceItem]:
"""Fetch fresh data from the source. Must be idempotent."""
...
def get_items(self) -> list[SourceItem]:
"""Get current items. Default implementation returns cached fetch results."""
if not hasattr(self, "_items") or self._items is None:
self._items = self.fetch()
return self._items
def refresh(self) -> list[SourceItem]:
"""Force refresh - clear cache and fetch fresh data."""
self._items = self.fetch()
return self._items
def stream(self):
"""Optional: Yield items continuously. Override for streaming sources."""
raise NotImplementedError
def __post_init__(self):
self._items: list[SourceItem] | None = None
class HeadlinesDataSource(DataSource):
"""Data source for RSS feed headlines."""
@property
def name(self) -> str:
return "headlines"
def fetch(self) -> list[SourceItem]:
from engine.fetch import fetch_all
items, _, _ = fetch_all()
return [SourceItem(content=t, source=s, timestamp=ts) for t, s, ts in items]
class PoetryDataSource(DataSource):
"""Data source for Poetry DB."""
@property
def name(self) -> str:
return "poetry"
def fetch(self) -> list[SourceItem]:
from engine.fetch import fetch_poetry
items, _, _ = fetch_poetry()
return [SourceItem(content=t, source=s, timestamp=ts) for t, s, ts in items]
class PipelineDataSource(DataSource):
"""Data source for pipeline visualization (demo mode). Dynamic - updates every frame."""
def __init__(self, viewport_width: int = 80, viewport_height: int = 24):
self.viewport_width = viewport_width
self.viewport_height = viewport_height
self.frame = 0
@property
def name(self) -> str:
return "pipeline"
@property
def is_dynamic(self) -> bool:
return True
def fetch(self) -> list[SourceItem]:
from engine.pipeline_viz import generate_large_network_viewport
buffer = generate_large_network_viewport(
self.viewport_width, self.viewport_height, self.frame
)
self.frame += 1
content = "\n".join(buffer)
return [
SourceItem(content=content, source="pipeline", timestamp=f"f{self.frame}")
]
def get_items(self) -> list[SourceItem]:
return self.fetch()
class MetricsDataSource(DataSource):
"""Data source that renders live pipeline metrics as ASCII art.
Wraps a Pipeline and displays active stages with their average execution
time and approximate FPS impact. Updates lazily when camera is about to
focus on a new node (frame % 15 == 12).
"""
def __init__(
self,
pipeline: Any,
viewport_width: int = 80,
viewport_height: int = 24,
):
self.pipeline = pipeline
self.viewport_width = viewport_width
self.viewport_height = viewport_height
self.frame = 0
self._cached_metrics: dict | None = None
@property
def name(self) -> str:
return "metrics"
@property
def is_dynamic(self) -> bool:
return True
def fetch(self) -> list[SourceItem]:
if self.frame % 15 == 12:
self._cached_metrics = None
if self._cached_metrics is None:
self._cached_metrics = self._fetch_metrics()
buffer = self._render_metrics(self._cached_metrics)
self.frame += 1
content = "\n".join(buffer)
return [
SourceItem(content=content, source="metrics", timestamp=f"f{self.frame}")
]
def _fetch_metrics(self) -> dict:
if hasattr(self.pipeline, "get_metrics_summary"):
metrics = self.pipeline.get_metrics_summary()
if "error" not in metrics:
return metrics
return {"stages": {}, "pipeline": {"avg_ms": 0}}
def _render_metrics(self, metrics: dict) -> list[str]:
stages = metrics.get("stages", {})
if not stages:
return self._render_empty()
active_stages = {
name: stats for name, stats in stages.items() if stats.get("avg_ms", 0) > 0
}
if not active_stages:
return self._render_empty()
total_avg = sum(s["avg_ms"] for s in active_stages.values())
if total_avg == 0:
total_avg = 1
lines: list[str] = []
lines.append("" * self.viewport_width)
lines.append(" PIPELINE METRICS ".center(self.viewport_width, ""))
lines.append("" * self.viewport_width)
header = f"{'STAGE':<20} {'AVG_MS':>8} {'FPS %':>8}"
lines.append(header)
lines.append("" * self.viewport_width)
for name, stats in sorted(active_stages.items()):
avg_ms = stats.get("avg_ms", 0)
fps_impact = (avg_ms / 16.67) * 100 if avg_ms > 0 else 0
row = f"{name:<20} {avg_ms:>7.2f} {fps_impact:>7.1f}%"
lines.append(row[: self.viewport_width])
lines.append("" * self.viewport_width)
total_row = (
f"{'TOTAL':<20} {total_avg:>7.2f} {(total_avg / 16.67) * 100:>7.1f}%"
)
lines.append(total_row[: self.viewport_width])
lines.append("" * self.viewport_width)
lines.append(
f" Frame:{self.frame:04d} Cache:{'HIT' if self._cached_metrics else 'MISS'}"
)
while len(lines) < self.viewport_height:
lines.append(" " * self.viewport_width)
return lines[: self.viewport_height]
def _render_empty(self) -> list[str]:
lines = [" " * self.viewport_width for _ in range(self.viewport_height)]
msg = "No metrics available"
y = self.viewport_height // 2
x = (self.viewport_width - len(msg)) // 2
lines[y] = " " * x + msg + " " * (self.viewport_width - x - len(msg))
return lines
def get_items(self) -> list[SourceItem]:
return self.fetch()
class CachedDataSource(DataSource):
"""Data source that wraps another source with caching."""
def __init__(self, source: DataSource, max_items: int = 100):
self.source = source
self.max_items = max_items
@property
def name(self) -> str:
return f"cached:{self.source.name}"
def fetch(self) -> list[SourceItem]:
items = self.source.fetch()
return items[: self.max_items]
def get_items(self) -> list[SourceItem]:
if not hasattr(self, "_items") or self._items is None:
self._items = self.fetch()
return self._items
class TransformDataSource(DataSource):
"""Data source that transforms items from another source.
Applies optional filter and map functions to each item.
This enables chaining: source → transform → transformed output.
Args:
source: The source to fetch items from
filter_fn: Optional function(item: SourceItem) -> bool
map_fn: Optional function(item: SourceItem) -> SourceItem
"""
def __init__(
self,
source: DataSource,
filter_fn: Callable[[SourceItem], bool] | None = None,
map_fn: Callable[[SourceItem], SourceItem] | None = None,
):
self.source = source
self.filter_fn = filter_fn
self.map_fn = map_fn
@property
def name(self) -> str:
return f"transform:{self.source.name}"
def fetch(self) -> list[SourceItem]:
items = self.source.fetch()
if self.filter_fn:
items = [item for item in items if self.filter_fn(item)]
if self.map_fn:
items = [self.map_fn(item) for item in items]
return items
class CompositeDataSource(DataSource):
"""Data source that combines multiple sources."""
def __init__(self, sources: list[DataSource]):
self.sources = sources
@property
def name(self) -> str:
return "composite"
def fetch(self) -> list[SourceItem]:
items = []
for source in self.sources:
items.extend(source.fetch())
return items
class SourceRegistry:
"""Registry for data sources."""
def __init__(self):
self._sources: dict[str, DataSource] = {}
self._default: str | None = None
def register(self, source: DataSource, default: bool = False) -> None:
self._sources[source.name] = source
if default or self._default is None:
self._default = source.name
def get(self, name: str) -> DataSource | None:
return self._sources.get(name)
def list_all(self) -> dict[str, DataSource]:
return dict(self._sources)
def default(self) -> DataSource | None:
if self._default:
return self._sources.get(self._default)
return None
def create_headlines(self) -> HeadlinesDataSource:
return HeadlinesDataSource()
def create_poetry(self) -> PoetryDataSource:
return PoetryDataSource()
def create_pipeline(self, width: int = 80, height: int = 24) -> PipelineDataSource:
return PipelineDataSource(width, height)
_global_registry: SourceRegistry | None = None
def get_source_registry() -> SourceRegistry:
global _global_registry
if _global_registry is None:
_global_registry = SourceRegistry()
return _global_registry
def init_default_sources() -> SourceRegistry:
"""Initialize the default source registry with standard sources."""
registry = get_source_registry()
registry.register(HeadlinesDataSource(), default=True)
registry.register(PoetryDataSource())
return registry

View File

@@ -38,9 +38,23 @@ run-kitty = { run = "uv run mainline.py --display kitty", depends = ["sync-all"]
run-pygame = { run = "uv run mainline.py --display pygame", depends = ["sync-all"] }
run-both = { run = "uv run mainline.py --display both", depends = ["sync-all"] }
run-client = { run = "mise run run-both & sleep 2 && $(open http://localhost:8766 2>/dev/null || xdg-open http://localhost:8766 2>/dev/null || echo 'Open http://localhost:8766 manually'); wait", depends = ["sync-all"] }
run-demo = { run = "uv run mainline.py --demo --display pygame", depends = ["sync-all"] }
run-pipeline = "uv run mainline.py --pipeline-diagram"
run-pipeline-demo = { run = "uv run mainline.py --pipeline-demo --display pygame", depends = ["sync-all"] }
# =====================
# Pipeline Architecture (unified Stage-based)
# =====================
run-pipeline = { run = "uv run mainline.py --pipeline --display pygame", depends = ["sync-all"] }
run-pipeline-demo = { run = "uv run mainline.py --pipeline --pipeline-preset demo --display pygame", depends = ["sync-all"] }
run-pipeline-poetry = { run = "uv run mainline.py --pipeline --pipeline-preset poetry --display pygame", depends = ["sync-all"] }
run-pipeline-websocket = { run = "uv run mainline.py --pipeline --pipeline-preset websocket", depends = ["sync-all"] }
run-pipeline-firehose = { run = "uv run mainline.py --pipeline --pipeline-preset firehose --display pygame", depends = ["sync-all"] }
# =====================
# Presets (Animation-controlled modes)
# =====================
run-preset-demo = { run = "uv run mainline.py --preset demo --display pygame", depends = ["sync-all"] }
run-preset-pipeline = { run = "uv run mainline.py --preset pipeline --display pygame", depends = ["sync-all"] }
# =====================
# Command & Control

View File

@@ -76,6 +76,8 @@ addopts = [
markers = [
"benchmark: marks tests as performance benchmarks (may be slow)",
"e2e: marks tests as end-to-end tests (require network/display)",
"integration: marks tests as integration tests (require external services)",
"ntfy: marks tests that require ntfy service",
]
filterwarnings = [
"ignore::DeprecationWarning",

36
tests/conftest.py Normal file
View File

@@ -0,0 +1,36 @@
"""
Pytest configuration for mainline.
"""
import pytest
def pytest_configure(config):
"""Configure pytest to skip integration tests by default."""
config.addinivalue_line(
"markers",
"integration: marks tests as integration tests (require external services)",
)
config.addinivalue_line("markers", "ntfy: marks tests that require ntfy service")
def pytest_collection_modifyitems(config, items):
"""Skip integration/e2e tests unless explicitly requested with -m."""
# Get the current marker expression
marker_expr = config.getoption("-m", default="")
# If explicitly running integration or e2e, don't skip them
if marker_expr in ("integration", "e2e", "integration or e2e"):
return
# Skip integration tests
skip_integration = pytest.mark.skip(reason="need -m integration to run")
for item in items:
if "integration" in item.keywords:
item.add_marker(skip_integration)
# Skip e2e tests by default (they require browser/display)
skip_e2e = pytest.mark.skip(reason="need -m e2e to run")
for item in items:
if "e2e" in item.keywords and "integration" not in item.keywords:
item.add_marker(skip_e2e)

View File

@@ -6,7 +6,11 @@ import json
import time
import urllib.request
import pytest
@pytest.mark.integration
@pytest.mark.ntfy
class TestNtfyTopics:
def test_cc_cmd_topic_exists_and_writable(self):
"""Verify C&C CMD topic exists and accepts messages."""

281
tests/test_pipeline.py Normal file
View File

@@ -0,0 +1,281 @@
"""
Tests for the new unified pipeline architecture.
"""
from unittest.mock import MagicMock
from engine.pipeline import (
Pipeline,
PipelineConfig,
PipelineContext,
Stage,
StageRegistry,
create_default_pipeline,
discover_stages,
)
class TestStageRegistry:
"""Tests for StageRegistry."""
def setup_method(self):
"""Reset registry before each test."""
StageRegistry._discovered = False
StageRegistry._categories.clear()
StageRegistry._instances.clear()
def test_discover_stages_registers_sources(self):
"""discover_stages registers source stages."""
discover_stages()
sources = StageRegistry.list("source")
assert "HeadlinesDataSource" in sources
assert "PoetryDataSource" in sources
assert "PipelineDataSource" in sources
def test_discover_stages_registers_displays(self):
"""discover_stages registers display stages."""
discover_stages()
displays = StageRegistry.list("display")
assert "terminal" in displays
assert "pygame" in displays
assert "websocket" in displays
assert "null" in displays
assert "sixel" in displays
def test_create_source_stage(self):
"""StageRegistry.create creates source stages."""
discover_stages()
source = StageRegistry.create("source", "HeadlinesDataSource")
assert source is not None
assert source.name == "headlines"
def test_create_display_stage(self):
"""StageRegistry.create creates display stages."""
discover_stages()
display = StageRegistry.create("display", "terminal")
assert display is not None
assert hasattr(display, "_display")
def test_create_display_stage_pygame(self):
"""StageRegistry.create creates pygame display stage."""
discover_stages()
display = StageRegistry.create("display", "pygame")
assert display is not None
class TestPipeline:
"""Tests for Pipeline class."""
def setup_method(self):
"""Reset registry before each test."""
StageRegistry._discovered = False
StageRegistry._categories.clear()
StageRegistry._instances.clear()
discover_stages()
def test_create_pipeline(self):
"""Pipeline can be created with config."""
config = PipelineConfig(source="headlines", display="terminal")
pipeline = Pipeline(config=config)
assert pipeline.config is not None
assert pipeline.config.source == "headlines"
assert pipeline.config.display == "terminal"
def test_add_stage(self):
"""Pipeline.add_stage adds a stage."""
pipeline = Pipeline()
mock_stage = MagicMock(spec=Stage)
mock_stage.name = "test_stage"
mock_stage.category = "test"
pipeline.add_stage("test", mock_stage)
assert "test" in pipeline.stages
def test_build_resolves_dependencies(self):
"""Pipeline.build resolves execution order."""
pipeline = Pipeline()
mock_source = MagicMock(spec=Stage)
mock_source.name = "source"
mock_source.category = "source"
mock_source.dependencies = set()
mock_display = MagicMock(spec=Stage)
mock_display.name = "display"
mock_display.category = "display"
mock_display.dependencies = {"source"}
pipeline.add_stage("source", mock_source)
pipeline.add_stage("display", mock_display)
pipeline.build()
assert pipeline._initialized is True
assert "source" in pipeline.execution_order
assert "display" in pipeline.execution_order
def test_execute_runs_stages(self):
"""Pipeline.execute runs all stages in order."""
pipeline = Pipeline()
call_order = []
mock_source = MagicMock(spec=Stage)
mock_source.name = "source"
mock_source.category = "source"
mock_source.dependencies = set()
mock_source.process = lambda data, ctx: call_order.append("source") or "data"
mock_effect = MagicMock(spec=Stage)
mock_effect.name = "effect"
mock_effect.category = "effect"
mock_effect.dependencies = {"source"}
mock_effect.process = lambda data, ctx: call_order.append("effect") or data
mock_display = MagicMock(spec=Stage)
mock_display.name = "display"
mock_display.category = "display"
mock_display.dependencies = {"effect"}
mock_display.process = lambda data, ctx: call_order.append("display") or data
pipeline.add_stage("source", mock_source)
pipeline.add_stage("effect", mock_effect)
pipeline.add_stage("display", mock_display)
pipeline.build()
result = pipeline.execute(None)
assert result.success is True
assert call_order == ["source", "effect", "display"]
def test_execute_handles_stage_failure(self):
"""Pipeline.execute handles stage failures."""
pipeline = Pipeline()
mock_source = MagicMock(spec=Stage)
mock_source.name = "source"
mock_source.category = "source"
mock_source.dependencies = set()
mock_source.process = lambda data, ctx: "data"
mock_failing = MagicMock(spec=Stage)
mock_failing.name = "failing"
mock_failing.category = "effect"
mock_failing.dependencies = {"source"}
mock_failing.optional = False
mock_failing.process = lambda data, ctx: (_ for _ in ()).throw(
Exception("fail")
)
pipeline.add_stage("source", mock_source)
pipeline.add_stage("failing", mock_failing)
pipeline.build()
result = pipeline.execute(None)
assert result.success is False
assert result.error is not None
def test_optional_stage_failure_continues(self):
"""Pipeline.execute continues on optional stage failure."""
pipeline = Pipeline()
mock_source = MagicMock(spec=Stage)
mock_source.name = "source"
mock_source.category = "source"
mock_source.dependencies = set()
mock_source.process = lambda data, ctx: "data"
mock_optional = MagicMock(spec=Stage)
mock_optional.name = "optional"
mock_optional.category = "effect"
mock_optional.dependencies = {"source"}
mock_optional.optional = True
mock_optional.process = lambda data, ctx: (_ for _ in ()).throw(
Exception("fail")
)
pipeline.add_stage("source", mock_source)
pipeline.add_stage("optional", mock_optional)
pipeline.build()
result = pipeline.execute(None)
assert result.success is True
class TestPipelineContext:
"""Tests for PipelineContext."""
def test_init_empty(self):
"""PipelineContext initializes with empty services and state."""
ctx = PipelineContext()
assert ctx.services == {}
assert ctx.state == {}
def test_init_with_services(self):
"""PipelineContext accepts initial services."""
ctx = PipelineContext(services={"display": MagicMock()})
assert "display" in ctx.services
def test_init_with_state(self):
"""PipelineContext accepts initial state."""
ctx = PipelineContext(initial_state={"count": 42})
assert ctx.get_state("count") == 42
def test_get_set_services(self):
"""PipelineContext can get/set services."""
ctx = PipelineContext()
mock_service = MagicMock()
ctx.set("test_service", mock_service)
assert ctx.get("test_service") == mock_service
def test_get_set_state(self):
"""PipelineContext can get/set state."""
ctx = PipelineContext()
ctx.set_state("counter", 100)
assert ctx.get_state("counter") == 100
def test_lazy_resolver(self):
"""PipelineContext resolves lazy services."""
ctx = PipelineContext()
config = ctx.get("config")
assert config is not None
def test_has_capability(self):
"""PipelineContext.has_capability checks for services."""
ctx = PipelineContext(services={"display.output": MagicMock()})
assert ctx.has_capability("display.output") is True
assert ctx.has_capability("missing") is False
class TestCreateDefaultPipeline:
"""Tests for create_default_pipeline function."""
def setup_method(self):
"""Reset registry before each test."""
StageRegistry._discovered = False
StageRegistry._categories.clear()
StageRegistry._instances.clear()
discover_stages()
def test_create_default_pipeline(self):
"""create_default_pipeline creates a working pipeline."""
pipeline = create_default_pipeline()
assert pipeline is not None
assert "display" in pipeline.stages