diff --git a/.opencode/skills/mainline-architecture/SKILL.md b/.opencode/skills/mainline-architecture/SKILL.md
index ad5c3a4..25117c8 100644
--- a/.opencode/skills/mainline-architecture/SKILL.md
+++ b/.opencode/skills/mainline-architecture/SKILL.md
@@ -29,17 +29,28 @@ class Stage(ABC):
return set()
@property
- def dependencies(self) -> list[str]:
- """What this stage needs (e.g., ['source'])"""
- return []
+ def dependencies(self) -> set[str]:
+ """What this stage needs (e.g., {'source'})"""
+ return set()
```
### Capability-Based Dependencies
The Pipeline resolves dependencies using **prefix matching**:
- `"source"` matches `"source.headlines"`, `"source.poetry"`, etc.
+- `"camera.state"` matches the camera state capability
- This allows flexible composition without hardcoding specific stage names
+### Minimum Capabilities
+
+The pipeline requires these minimum capabilities to function:
+- `"source"` - Data source capability
+- `"render.output"` - Rendered content capability
+- `"display.output"` - Display output capability
+- `"camera.state"` - Camera state for viewport filtering
+
+If any of these capabilities are missing, the pipeline injects them automatically.
+
### DataType Enum
PureData-style data types for inlet/outlet validation:
@@ -76,3 +87,11 @@ Canvas tracks dirty regions automatically when content is written via `put_regio
- Use adapters (engine/pipeline/adapters.py) to wrap existing components as stages
- Set `optional=True` for stages that can fail gracefully
- Use `stage_type` and `render_order` for execution ordering
+- Clock stages update state independently of data flow
+
+## Sources
+
+- engine/pipeline/core.py - Stage base class
+- engine/pipeline/controller.py - Pipeline implementation
+- engine/pipeline/adapters/ - Stage adapters
+- docs/PIPELINE.md - Pipeline documentation
diff --git a/AGENTS.md b/AGENTS.md
index 02f30f9..849ebb3 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -267,15 +267,45 @@ The new Stage-based pipeline architecture provides capability-based dependency r
- **Stage** (`engine/pipeline/core.py`): Base class for pipeline stages
- **Pipeline** (`engine/pipeline/controller.py`): Executes stages with capability-based dependency resolution
+- **PipelineConfig** (`engine/pipeline/controller.py`): Configuration for pipeline instance
- **StageRegistry** (`engine/pipeline/registry.py`): Discovers and registers stages
- **Stage Adapters** (`engine/pipeline/adapters.py`): Wraps existing components as stages
+#### Pipeline Configuration
+
+The `PipelineConfig` dataclass configures pipeline behavior:
+
+```python
+@dataclass
+class PipelineConfig:
+ source: str = "headlines" # Data source identifier
+ display: str = "terminal" # Display backend identifier
+ camera: str = "vertical" # Camera mode identifier
+ effects: list[str] = field(default_factory=list) # List of effect names
+ enable_metrics: bool = True # Enable performance metrics
+```
+
+**Available sources**: `headlines`, `poetry`, `empty`, `list`, `image`, `metrics`, `cached`, `transform`, `composite`, `pipeline-inspect`
+**Available displays**: `terminal`, `null`, `replay`, `websocket`, `pygame`, `moderngl`, `multi`
+**Available camera modes**: `FEED`, `SCROLL`, `HORIZONTAL`, `OMNI`, `FLOATING`, `BOUNCE`, `RADIAL`
+
#### Capability-Based Dependencies
Stages declare capabilities (what they provide) and dependencies (what they need). The Pipeline resolves dependencies using prefix matching:
- `"source"` matches `"source.headlines"`, `"source.poetry"`, etc.
+- `"camera.state"` matches the camera state capability
- This allows flexible composition without hardcoding specific stage names
+#### Minimum Capabilities
+
+The pipeline requires these minimum capabilities to function:
+- `"source"` - Data source capability
+- `"render.output"` - Rendered content capability
+- `"display.output"` - Display output capability
+- `"camera.state"` - Camera state for viewport filtering
+
+If any of these capabilities are missing, they are automatically injected by the `ensure_minimum_capabilities()` method.
+
#### Sensor Framework
- **Sensor** (`engine/sensors/__init__.py`): Base class for real-time input sensors
@@ -406,23 +436,23 @@ A skills library MCP server (`skills`) is available for capturing and tracking l
### Workflow
**Before starting work:**
-1. Run `skills_list_skills` to see available skills
-2. Use `skills_peek_skill({name: "skill-name"})` to preview relevant skills
-3. Use `skills_skill_slice({name: "skill-name", query: "your question"})` to get relevant sections
+1. Run `local_skills_list_skills` to see available skills
+2. Use `local_skills_peek_skill({name: "skill-name"})` to preview relevant skills
+3. Use `local_skills_skill_slice({name: "skill-name", query: "your question"})` to get relevant sections
**While working:**
-- If a skill was wrong or incomplete: `skills_update_skill` → `skills_record_assessment` → `skills_report_outcome({quality: 1})`
-- If a skill worked correctly: `skills_report_outcome({quality: 4})` (normal) or `quality: 5` (perfect)
+- If a skill was wrong or incomplete: `local_skills_update_skill` → `local_skills_record_assessment` → `local_skills_report_outcome({quality: 1})`
+- If a skill worked correctly: `local_skills_report_outcome({quality: 4})` (normal) or `quality: 5` (perfect)
**End of session:**
-- Run `skills_reflect_on_session({context_summary: "what you did"})` to identify new skills to capture
-- Use `skills_create_skill` to add new skills
-- Use `skills_record_assessment` to score them
+- Run `local_skills_reflect_on_session({context_summary: "what you did"})` to identify new skills to capture
+- Use `local_skills_create_skill` to add new skills
+- Use `local_skills_record_assessment` to score them
### Useful Tools
-- `skills_review_stale_skills()` - Skills due for review (negative days_until_due)
-- `skills_skills_report()` - Overview of entire collection
-- `skills_validate_skill({name: "skill-name"})` - Load skill for review with sources
+- `local_skills_review_stale_skills()` - Skills due for review (negative days_until_due)
+- `local_skills_skills_report()` - Overview of entire collection
+- `local_skills_validate_skill({name: "skill-name"})` - Load skill for review with sources
### Agent Skills
diff --git a/docs/PIPELINE.md b/docs/PIPELINE.md
index fab35a0..b759289 100644
--- a/docs/PIPELINE.md
+++ b/docs/PIPELINE.md
@@ -2,136 +2,160 @@
## Architecture Overview
+The Mainline pipeline uses a **Stage-based architecture** with **capability-based dependency resolution**. Stages declare capabilities (what they provide) and dependencies (what they need), and the Pipeline resolves dependencies using prefix matching.
+
```
-Sources (static/dynamic) → Fetch → Prepare → Scroll → Effects → Render → Display
- ↓
- NtfyPoller ← MicMonitor (async)
+Source Stage → Render Stage → Effect Stages → Display Stage
+ ↓
+Camera Stage (provides camera.state capability)
```
-### Data Source Abstraction (sources_v2.py)
+### Capability-Based Dependency Resolution
-- **Static sources**: Data fetched once and cached (HeadlinesDataSource, PoetryDataSource)
-- **Dynamic sources**: Idempotent fetch for runtime updates (PipelineDataSource)
-- **SourceRegistry**: Discovery and management of data sources
+Stages declare capabilities and dependencies:
+- **Capabilities**: What the stage provides (e.g., `source`, `render.output`, `display.output`, `camera.state`)
+- **Dependencies**: What the stage needs (e.g., `source`, `render.output`, `camera.state`)
-### Camera Modes
+The Pipeline resolves dependencies using **prefix matching**:
+- `"source"` matches `"source.headlines"`, `"source.poetry"`, etc.
+- `"camera.state"` matches the camera state capability provided by `CameraClockStage`
+- This allows flexible composition without hardcoding specific stage names
-- **Vertical**: Scroll up (default)
-- **Horizontal**: Scroll left
-- **Omni**: Diagonal scroll
-- **Floating**: Sinusoidal bobbing
-- **Trace**: Follow network path node-by-node (for pipeline viz)
+### Minimum Capabilities
-## Content to Display Rendering Pipeline
+The pipeline requires these minimum capabilities to function:
+- `"source"` - Data source capability (provides raw items)
+- `"render.output"` - Rendered content capability
+- `"display.output"` - Display output capability
+- `"camera.state"` - Camera state for viewport filtering
+
+If any of these capabilities are missing, they are automatically injected by the `ensure_minimum_capabilities()` method.
+
+### Stage Registry
+
+The `StageRegistry` discovers and registers stages automatically:
+- Scans `engine/stages/` for stage implementations
+- Registers stages by their declared capabilities
+- Enables runtime stage discovery and composition
+
+## Stage-Based Pipeline Flow
```mermaid
flowchart TD
- subgraph Sources["Data Sources (v2)"]
- Headlines[HeadlinesDataSource]
- Poetry[PoetryDataSource]
- Pipeline[PipelineDataSource]
- Registry[SourceRegistry]
- end
+ subgraph Stages["Stage Pipeline"]
+ subgraph SourceStage["Source Stage (provides: source.*)"]
+ Headlines[HeadlinesSource]
+ Poetry[PoetrySource]
+ Pipeline[PipelineSource]
+ end
- subgraph SourcesLegacy["Data Sources (legacy)"]
- RSS[("RSS Feeds")]
- PoetryFeed[("Poetry Feed")]
- Ntfy[("Ntfy Messages")]
- Mic[("Microphone")]
- end
+ subgraph RenderStage["Render Stage (provides: render.*)"]
+ Render[RenderStage]
+ Canvas[Canvas]
+ Camera[Camera]
+ end
- subgraph Fetch["Fetch Layer"]
- FC[fetch_all]
- FP[fetch_poetry]
- Cache[(Cache)]
- end
-
- subgraph Prepare["Prepare Layer"]
- MB[make_block]
- Strip[strip_tags]
- Trans[translate]
- end
-
- subgraph Scroll["Scroll Engine"]
- SC[StreamController]
- CAM[Camera]
- RTZ[render_ticker_zone]
- Msg[render_message_overlay]
- Grad[lr_gradient]
- VT[vis_trunc / vis_offset]
- end
-
- subgraph Effects["Effect Pipeline"]
- subgraph EffectsPlugins["Effect Plugins"]
+ subgraph EffectStages["Effect Stages (provides: effect.*)"]
Noise[NoiseEffect]
Fade[FadeEffect]
Glitch[GlitchEffect]
Firehose[FirehoseEffect]
Hud[HudEffect]
end
- EC[EffectChain]
- ER[EffectRegistry]
+
+ subgraph DisplayStage["Display Stage (provides: display.*)"]
+ Terminal[TerminalDisplay]
+ Pygame[PygameDisplay]
+ WebSocket[WebSocketDisplay]
+ Null[NullDisplay]
+ end
end
- subgraph Render["Render Layer"]
- BW[big_wrap]
- RL[render_line]
+ subgraph Capabilities["Capability Map"]
+ SourceCaps["source.headlines
source.poetry
source.pipeline"]
+ RenderCaps["render.output
render.canvas"]
+ EffectCaps["effect.noise
effect.fade
effect.glitch"]
+ DisplayCaps["display.output
display.terminal"]
end
- subgraph Display["Display Backends"]
- TD[TerminalDisplay]
- PD[PygameDisplay]
- SD[SixelDisplay]
- KD[KittyDisplay]
- WSD[WebSocketDisplay]
- ND[NullDisplay]
- end
+ SourceStage --> RenderStage
+ RenderStage --> EffectStages
+ EffectStages --> DisplayStage
- subgraph Async["Async Sources"]
- NTFY[NtfyPoller]
- MIC[MicMonitor]
- end
+ SourceStage --> SourceCaps
+ RenderStage --> RenderCaps
+ EffectStages --> EffectCaps
+ DisplayStage --> DisplayCaps
- subgraph Animation["Animation System"]
- AC[AnimationController]
- PR[Preset]
- end
-
- Sources --> Fetch
- RSS --> FC
- PoetryFeed --> FP
- FC --> Cache
- FP --> Cache
- Cache --> MB
- Strip --> MB
- Trans --> MB
- MB --> SC
- NTFY --> SC
- SC --> RTZ
- CAM --> RTZ
- Grad --> RTZ
- VT --> RTZ
- RTZ --> EC
- EC --> ER
- ER --> EffectsPlugins
- EffectsPlugins --> BW
- BW --> RL
- RL --> Display
- Ntfy --> RL
- Mic --> RL
- MIC --> RL
-
- style Sources fill:#f9f,stroke:#333
- style Fetch fill:#bbf,stroke:#333
- style Prepare fill:#bff,stroke:#333
- style Scroll fill:#bfb,stroke:#333
- style Effects fill:#fbf,stroke:#333
- style Render fill:#ffb,stroke:#333
- style Display fill:#bbf,stroke:#333
- style Async fill:#fbb,stroke:#333
- style Animation fill:#bfb,stroke:#333
+ style SourceStage fill:#f9f,stroke:#333
+ style RenderStage fill:#bbf,stroke:#333
+ style EffectStages fill:#fbf,stroke:#333
+ style DisplayStage fill:#bfb,stroke:#333
```
+## Stage Adapters
+
+Existing components are wrapped as Stages via adapters:
+
+### Source Stage Adapter
+- Wraps `HeadlinesDataSource`, `PoetryDataSource`, etc.
+- Provides `source.*` capabilities
+- Fetches data and outputs to pipeline buffer
+
+### Render Stage Adapter
+- Wraps `StreamController`, `Camera`, `render_ticker_zone`
+- Provides `render.output` capability
+- Processes content and renders to canvas
+
+### Effect Stage Adapter
+- Wraps `EffectChain` and individual effect plugins
+- Provides `effect.*` capabilities
+- Applies visual effects to rendered content
+
+### Display Stage Adapter
+- Wraps `TerminalDisplay`, `PygameDisplay`, etc.
+- Provides `display.*` capabilities
+- Outputs final buffer to display backend
+
+## Pipeline Mutation API
+
+The Pipeline supports dynamic mutation during runtime:
+
+### Core Methods
+- `add_stage(name, stage, initialize=True)` - Add a stage
+- `remove_stage(name, cleanup=True)` - Remove a stage and rebuild execution order
+- `replace_stage(name, new_stage, preserve_state=True)` - Replace a stage
+- `swap_stages(name1, name2)` - Swap two stages
+- `move_stage(name, after=None, before=None)` - Move a stage in execution order
+- `enable_stage(name)` / `disable_stage(name)` - Enable/disable stages
+
+### Safety Checks
+- `can_hot_swap(name)` - Check if a stage can be safely hot-swapped
+- `cleanup_stage(name)` - Clean up specific stage without removing it
+
+### WebSocket Commands
+The mutation API is accessible via WebSocket for remote control:
+```json
+{"action": "remove_stage", "stage": "stage_name"}
+{"action": "swap_stages", "stage1": "name1", "stage2": "name2"}
+{"action": "enable_stage", "stage": "stage_name"}
+{"action": "cleanup_stage", "stage": "stage_name"}
+```
+
+## Camera Modes
+
+The Camera supports the following modes:
+
+- **FEED**: Single item view (static or rapid cycling)
+- **SCROLL**: Smooth vertical scrolling (movie credits style)
+- **HORIZONTAL**: Left/right movement
+- **OMNI**: Combination of vertical and horizontal
+- **FLOATING**: Sinusoidal/bobbing motion
+- **BOUNCE**: DVD-style bouncing off edges
+- **RADIAL**: Polar coordinate scanning (radar sweep)
+
+Note: Camera state is provided by `CameraClockStage` (capability: `camera.state`), which updates independently of data flow. The `CameraStage` applies viewport transformations (capability: `camera`).
+
## Animation & Presets
```mermaid
@@ -161,7 +185,7 @@ flowchart LR
Triggers --> Events
```
-## Camera Modes
+## Camera Modes State Diagram
```mermaid
stateDiagram-v2
diff --git a/engine/app/main.py b/engine/app/main.py
index 43f9e37..8ccf5cd 100644
--- a/engine/app/main.py
+++ b/engine/app/main.py
@@ -8,7 +8,7 @@ import time
from engine import config
from engine.display import BorderMode, DisplayRegistry
from engine.effects import get_registry
-from engine.fetch import fetch_all, fetch_poetry, load_cache
+from engine.fetch import fetch_all, fetch_all_fast, fetch_poetry, load_cache, save_cache
from engine.pipeline import (
Pipeline,
PipelineConfig,
@@ -208,7 +208,18 @@ def run_pipeline_mode_direct():
if cached:
source_items = cached
else:
- source_items, _, _ = fetch_all()
+ source_items = fetch_all_fast()
+ if source_items:
+ import threading
+
+ def background_fetch():
+ full_items, _, _ = fetch_all()
+ save_cache(full_items)
+
+ background_thread = threading.Thread(
+ target=background_fetch, daemon=True
+ )
+ background_thread.start()
elif source_name == "fixture":
source_items = load_cache()
if not source_items:
diff --git a/engine/app/pipeline_runner.py b/engine/app/pipeline_runner.py
index e7865f1..95bf161 100644
--- a/engine/app/pipeline_runner.py
+++ b/engine/app/pipeline_runner.py
@@ -8,7 +8,7 @@ from typing import Any
from engine.display import BorderMode, DisplayRegistry
from engine.effects import get_registry
-from engine.fetch import fetch_all, fetch_poetry, load_cache
+from engine.fetch import fetch_all, fetch_all_fast, fetch_poetry, load_cache, save_cache
from engine.pipeline import Pipeline, PipelineConfig, PipelineContext, get_preset
from engine.pipeline.adapters import (
EffectPluginStage,
@@ -138,14 +138,7 @@ def run_pipeline_mode(preset_name: str = "demo"):
print("Error: Invalid viewport format. Use WxH (e.g., 40x15)")
sys.exit(1)
- pipeline = Pipeline(
- config=PipelineConfig(
- source=preset.source,
- display=preset.display,
- camera=preset.camera,
- effects=preset.effects,
- )
- )
+ pipeline = Pipeline(config=preset.to_config())
print(" \033[38;5;245mFetching content...\033[0m")
@@ -167,10 +160,24 @@ def run_pipeline_mode(preset_name: str = "demo"):
cached = load_cache()
if cached:
items = cached
+ print(f" \033[38;5;82mLoaded {len(items)} items from cache\033[0m")
elif preset.source == "poetry":
items, _, _ = fetch_poetry()
else:
- items, _, _ = fetch_all()
+ items = fetch_all_fast()
+ if items:
+ print(
+ f" \033[38;5;82mFast start: {len(items)} items from first 5 sources\033[0m"
+ )
+
+ import threading
+
+ def background_fetch():
+ full_items, _, _ = fetch_all()
+ save_cache(full_items)
+
+ background_thread = threading.Thread(target=background_fetch, daemon=True)
+ background_thread.start()
if not items:
print(" \033[38;5;196mNo content available\033[0m")
diff --git a/engine/camera.py b/engine/camera.py
index d22548e..b443e1b 100644
--- a/engine/camera.py
+++ b/engine/camera.py
@@ -72,6 +72,17 @@ class Camera:
"""Shorthand for viewport_width."""
return self.viewport_width
+ def set_speed(self, speed: float) -> None:
+ """Set the camera scroll speed dynamically.
+
+ This allows camera speed to be modulated during runtime
+ via PipelineParams or directly.
+
+ Args:
+ speed: New speed value (0.0 = stopped, >0 = movement)
+ """
+ self.speed = max(0.0, speed)
+
@property
def h(self) -> int:
"""Shorthand for viewport_height."""
@@ -373,10 +384,11 @@ class Camera:
truncated_line = vis_trunc(offset_line, viewport_width)
# Pad line to full viewport width to prevent ghosting when panning
+ # Skip padding for empty lines to preserve intentional blank lines
import re
visible_len = len(re.sub(r"\x1b\[[0-9;]*m", "", truncated_line))
- if visible_len < viewport_width:
+ if visible_len < viewport_width and visible_len > 0:
truncated_line += " " * (viewport_width - visible_len)
horizontal_slice.append(truncated_line)
diff --git a/engine/fetch.py b/engine/fetch.py
index ace1981..08ba4b1 100644
--- a/engine/fetch.py
+++ b/engine/fetch.py
@@ -7,6 +7,7 @@ import json
import pathlib
import re
import urllib.request
+from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Any
@@ -17,54 +18,98 @@ from engine.filter import skip, strip_tags
from engine.sources import FEEDS, POETRY_SOURCES
from engine.terminal import boot_ln
-# Type alias for headline items
HeadlineTuple = tuple[str, str, str]
+DEFAULT_MAX_WORKERS = 10
+FAST_START_SOURCES = 5
+FAST_START_TIMEOUT = 3
-# ─── SINGLE FEED ──────────────────────────────────────────
-def fetch_feed(url: str) -> Any | None:
- """Fetch and parse a single RSS feed URL."""
+
+def fetch_feed(url: str) -> tuple[str, Any] | tuple[None, None]:
+ """Fetch and parse a single RSS feed URL. Returns (url, feed) tuple."""
try:
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
- resp = urllib.request.urlopen(req, timeout=config.FEED_TIMEOUT)
- return feedparser.parse(resp.read())
+ timeout = FAST_START_TIMEOUT if url in _fast_start_urls else config.FEED_TIMEOUT
+ resp = urllib.request.urlopen(req, timeout=timeout)
+ return (url, feedparser.parse(resp.read()))
except Exception:
- return None
+ return (url, None)
+
+
+def _parse_feed(feed: Any, src: str) -> list[HeadlineTuple]:
+ """Parse a feed and return list of headline tuples."""
+ items = []
+ if feed is None or (feed.bozo and not feed.entries):
+ return items
+
+ for e in feed.entries:
+ t = strip_tags(e.get("title", ""))
+ if not t or skip(t):
+ continue
+ pub = e.get("published_parsed") or e.get("updated_parsed")
+ try:
+ ts = datetime(*pub[:6]).strftime("%H:%M") if pub else "——:——"
+ except Exception:
+ ts = "——:——"
+ items.append((t, src, ts))
+ return items
+
+
+def fetch_all_fast() -> list[HeadlineTuple]:
+ """Fetch only the first N sources for fast startup."""
+ global _fast_start_urls
+ _fast_start_urls = set(list(FEEDS.values())[:FAST_START_SOURCES])
+
+ items: list[HeadlineTuple] = []
+ with ThreadPoolExecutor(max_workers=FAST_START_SOURCES) as executor:
+ futures = {
+ executor.submit(fetch_feed, url): src
+ for src, url in list(FEEDS.items())[:FAST_START_SOURCES]
+ }
+ for future in as_completed(futures):
+ src = futures[future]
+ url, feed = future.result()
+ if feed is None or (feed.bozo and not feed.entries):
+ boot_ln(src, "DARK", False)
+ continue
+ parsed = _parse_feed(feed, src)
+ if parsed:
+ items.extend(parsed)
+ boot_ln(src, f"LINKED [{len(parsed)}]", True)
+ else:
+ boot_ln(src, "EMPTY", False)
+ return items
-# ─── ALL RSS FEEDS ────────────────────────────────────────
def fetch_all() -> tuple[list[HeadlineTuple], int, int]:
- """Fetch all RSS feeds and return items, linked count, failed count."""
+ """Fetch all RSS feeds concurrently and return items, linked count, failed count."""
+ global _fast_start_urls
+ _fast_start_urls = set()
+
items: list[HeadlineTuple] = []
linked = failed = 0
- for src, url in FEEDS.items():
- feed = fetch_feed(url)
- if feed is None or (feed.bozo and not feed.entries):
- boot_ln(src, "DARK", False)
- failed += 1
- continue
- n = 0
- for e in feed.entries:
- t = strip_tags(e.get("title", ""))
- if not t or skip(t):
+
+ with ThreadPoolExecutor(max_workers=DEFAULT_MAX_WORKERS) as executor:
+ futures = {executor.submit(fetch_feed, url): src for src, url in FEEDS.items()}
+ for future in as_completed(futures):
+ src = futures[future]
+ url, feed = future.result()
+ if feed is None or (feed.bozo and not feed.entries):
+ boot_ln(src, "DARK", False)
+ failed += 1
continue
- pub = e.get("published_parsed") or e.get("updated_parsed")
- try:
- ts = datetime(*pub[:6]).strftime("%H:%M") if pub else "——:——"
- except Exception:
- ts = "——:——"
- items.append((t, src, ts))
- n += 1
- if n:
- boot_ln(src, f"LINKED [{n}]", True)
- linked += 1
- else:
- boot_ln(src, "EMPTY", False)
- failed += 1
+ parsed = _parse_feed(feed, src)
+ if parsed:
+ items.extend(parsed)
+ boot_ln(src, f"LINKED [{len(parsed)}]", True)
+ linked += 1
+ else:
+ boot_ln(src, "EMPTY", False)
+ failed += 1
+
return items, linked, failed
-# ─── PROJECT GUTENBERG ────────────────────────────────────
def _fetch_gutenberg(url: str, label: str) -> list[HeadlineTuple]:
"""Download and parse stanzas/passages from a Project Gutenberg text."""
try:
@@ -76,23 +121,21 @@ def _fetch_gutenberg(url: str, label: str) -> list[HeadlineTuple]:
.replace("\r\n", "\n")
.replace("\r", "\n")
)
- # Strip PG boilerplate
m = re.search(r"\*\*\*\s*START OF[^\n]*\n", text)
if m:
text = text[m.end() :]
m = re.search(r"\*\*\*\s*END OF", text)
if m:
text = text[: m.start()]
- # Split on blank lines into stanzas/passages
blocks = re.split(r"\n{2,}", text.strip())
items = []
for blk in blocks:
- blk = " ".join(blk.split()) # flatten to one line
+ blk = " ".join(blk.split())
if len(blk) < 20 or len(blk) > 280:
continue
- if blk.isupper(): # skip all-caps headers
+ if blk.isupper():
continue
- if re.match(r"^[IVXLCDM]+\.?\s*$", blk): # roman numerals
+ if re.match(r"^[IVXLCDM]+\.?\s*$", blk):
continue
items.append((blk, label, ""))
return items
@@ -100,29 +143,35 @@ def _fetch_gutenberg(url: str, label: str) -> list[HeadlineTuple]:
return []
-def fetch_poetry():
- """Fetch all poetry/literature sources."""
+def fetch_poetry() -> tuple[list[HeadlineTuple], int, int]:
+ """Fetch all poetry/literature sources concurrently."""
items = []
linked = failed = 0
- for label, url in POETRY_SOURCES.items():
- stanzas = _fetch_gutenberg(url, label)
- if stanzas:
- boot_ln(label, f"LOADED [{len(stanzas)}]", True)
- items.extend(stanzas)
- linked += 1
- else:
- boot_ln(label, "DARK", False)
- failed += 1
+
+ with ThreadPoolExecutor(max_workers=DEFAULT_MAX_WORKERS) as executor:
+ futures = {
+ executor.submit(_fetch_gutenberg, url, label): label
+ for label, url in POETRY_SOURCES.items()
+ }
+ for future in as_completed(futures):
+ label = futures[future]
+ stanzas = future.result()
+ if stanzas:
+ boot_ln(label, f"LOADED [{len(stanzas)}]", True)
+ items.extend(stanzas)
+ linked += 1
+ else:
+ boot_ln(label, "DARK", False)
+ failed += 1
+
return items, linked, failed
-# ─── CACHE ────────────────────────────────────────────────
-# Cache moved to engine/fixtures/headlines.json
-_CACHE_DIR = pathlib.Path(__file__).resolve().parent / "fixtures"
+_cache_dir = pathlib.Path(__file__).resolve().parent / "fixtures"
def _cache_path():
- return _CACHE_DIR / "headlines.json"
+ return _cache_dir / "headlines.json"
def load_cache():
@@ -144,3 +193,6 @@ def save_cache(items):
_cache_path().write_text(json.dumps({"items": items}))
except Exception:
pass
+
+
+_fast_start_urls: set = set()
diff --git a/engine/fixtures/headlines.json b/engine/fixtures/headlines.json
index 8829c59..4bcab08 100644
--- a/engine/fixtures/headlines.json
+++ b/engine/fixtures/headlines.json
@@ -1,19 +1 @@
-{
- "items": [
- ["Breaking: AI systems achieve breakthrough in natural language understanding", "TechDaily", "14:32"],
- ["Scientists discover new exoplanet in habitable zone", "ScienceNews", "13:15"],
- ["Global markets rally as inflation shows signs of cooling", "FinanceWire", "12:48"],
- ["New study reveals benefits of Mediterranean diet for cognitive health", "HealthJournal", "11:22"],
- ["Tech giants announce collaboration on AI safety standards", "TechDaily", "10:55"],
- ["Archaeologists uncover 3000-year-old city in desert", "HistoryNow", "09:30"],
- ["Renewable energy capacity surpasses fossil fuels for first time", "GreenWorld", "08:15"],
- ["Space agency prepares for next Mars mission launch window", "SpaceNews", "07:42"],
- ["New film breaks box office records on opening weekend", "EntertainmentHub", "06:18"],
- ["Local community raises funds for new library project", "CommunityPost", "05:30"],
- ["Quantum computing breakthrough could revolutionize cryptography", "TechWeekly", "15:20"],
- ["New species of deep-sea creature discovered in Pacific trench", "NatureToday", "14:05"],
- ["Electric vehicle sales surpass traditional cars in Europe", "AutoNews", "12:33"],
- ["Renowned artist unveils interactive AI-generated exhibition", "ArtsMonthly", "11:10"],
- ["Climate summit reaches historic agreement on emissions", "WorldNews", "09:55"]
- ]
-}
+{"items": []}
\ No newline at end of file
diff --git a/engine/pipeline/adapters/camera.py b/engine/pipeline/adapters/camera.py
index 42d33fd..7b25236 100644
--- a/engine/pipeline/adapters/camera.py
+++ b/engine/pipeline/adapters/camera.py
@@ -62,6 +62,16 @@ class CameraClockStage(Stage):
if data is None:
return data
+ # Update camera speed from params if explicitly set (for dynamic modulation)
+ # Only update if camera_speed in params differs from the default (1.0)
+ # This preserves camera speed set during construction
+ if (
+ ctx.params
+ and hasattr(ctx.params, "camera_speed")
+ and ctx.params.camera_speed != 1.0
+ ):
+ self._camera.set_speed(ctx.params.camera_speed)
+
current_time = time.perf_counter()
dt = 0.0
if self._last_frame_time is not None:
diff --git a/engine/pipeline/params.py b/engine/pipeline/params.py
index 46b2c60..4c00641 100644
--- a/engine/pipeline/params.py
+++ b/engine/pipeline/params.py
@@ -32,7 +32,7 @@ class PipelineParams:
# Camera config
camera_mode: str = "vertical"
- camera_speed: float = 1.0
+ camera_speed: float = 1.0 # Default speed
camera_x: int = 0 # For horizontal scrolling
# Effect config
diff --git a/engine/pipeline/presets.py b/engine/pipeline/presets.py
index c1370d2..9d1b3ca 100644
--- a/engine/pipeline/presets.py
+++ b/engine/pipeline/presets.py
@@ -11,11 +11,14 @@ Loading order:
"""
from dataclasses import dataclass, field
-from typing import Any
+from typing import TYPE_CHECKING, Any
from engine.display import BorderMode
from engine.pipeline.params import PipelineParams
+if TYPE_CHECKING:
+ from engine.pipeline.controller import PipelineConfig
+
def _load_toml_presets() -> dict[str, Any]:
"""Load presets from TOML file."""
@@ -55,9 +58,10 @@ class PipelinePreset:
viewport_width: int = 80 # Viewport width in columns
viewport_height: int = 24 # Viewport height in rows
source_items: list[dict[str, Any]] | None = None # For ListDataSource
+ enable_metrics: bool = True # Enable performance metrics collection
def to_params(self) -> PipelineParams:
- """Convert to PipelineParams."""
+ """Convert to PipelineParams (runtime configuration)."""
from engine.display import BorderMode
params = PipelineParams()
@@ -72,10 +76,27 @@ class PipelinePreset:
)
params.camera_mode = self.camera
params.effect_order = self.effects.copy()
- # Note: camera_speed, viewport_width/height are not stored in PipelineParams
- # They are used directly from the preset object in pipeline_runner.py
+ params.camera_speed = self.camera_speed
+ # Note: viewport_width/height are read from PipelinePreset directly
+ # in pipeline_runner.py, not from PipelineParams
return params
+ def to_config(self) -> "PipelineConfig":
+ """Convert to PipelineConfig (static pipeline construction config).
+
+ PipelineConfig is used once at pipeline initialization and contains
+ the core settings that don't change during execution.
+ """
+ from engine.pipeline.controller import PipelineConfig
+
+ return PipelineConfig(
+ source=self.source,
+ display=self.display,
+ camera=self.camera,
+ effects=self.effects.copy(),
+ enable_metrics=self.enable_metrics,
+ )
+
@classmethod
def from_yaml(cls, name: str, data: dict[str, Any]) -> "PipelinePreset":
"""Create a PipelinePreset from YAML data."""
@@ -91,6 +112,7 @@ class PipelinePreset:
viewport_width=data.get("viewport_width", 80),
viewport_height=data.get("viewport_height", 24),
source_items=data.get("source_items"),
+ enable_metrics=data.get("enable_metrics", True),
)
diff --git a/tests/test_fetch.py b/tests/test_fetch.py
index 6038fce..05c1328 100644
--- a/tests/test_fetch.py
+++ b/tests/test_fetch.py
@@ -31,12 +31,12 @@ class TestFetchFeed:
@patch("engine.fetch.urllib.request.urlopen")
def test_fetch_network_error(self, mock_urlopen):
- """Network error returns None."""
+ """Network error returns tuple with None feed."""
mock_urlopen.side_effect = Exception("Network error")
- result = fetch_feed("http://example.com/feed")
+ url, feed = fetch_feed("http://example.com/feed")
- assert result is None
+ assert feed is None
class TestFetchAll:
@@ -54,7 +54,7 @@ class TestFetchAll:
{"title": "Headline 1", "published_parsed": (2024, 1, 1, 12, 0, 0)},
{"title": "Headline 2", "updated_parsed": (2024, 1, 2, 12, 0, 0)},
]
- mock_fetch_feed.return_value = mock_feed
+ mock_fetch_feed.return_value = ("http://example.com", mock_feed)
mock_skip.return_value = False
mock_strip.side_effect = lambda x: x
@@ -67,7 +67,7 @@ class TestFetchAll:
@patch("engine.fetch.boot_ln")
def test_fetch_all_feed_error(self, mock_boot, mock_fetch_feed):
"""Feed error increments failed count."""
- mock_fetch_feed.return_value = None
+ mock_fetch_feed.return_value = ("http://example.com", None)
items, linked, failed = fetch_all()
@@ -87,7 +87,7 @@ class TestFetchAll:
{"title": "Sports scores"},
{"title": "Valid headline"},
]
- mock_fetch_feed.return_value = mock_feed
+ mock_fetch_feed.return_value = ("http://example.com", mock_feed)
mock_skip.side_effect = lambda x: x == "Sports scores"
mock_strip.side_effect = lambda x: x
diff --git a/tests/test_pipeline_e2e.py b/tests/test_pipeline_e2e.py
index 39b2d30..3f3ef2f 100644
--- a/tests/test_pipeline_e2e.py
+++ b/tests/test_pipeline_e2e.py
@@ -218,9 +218,10 @@ class TestPipelineE2EHappyPath:
assert result.success
frame = display.frames.get(timeout=1)
- assert "Line A" in frame
- assert "Line B" in frame
- assert "Line C" in frame
+ # Camera stage pads lines to viewport width, so check for substring match
+ assert any("Line A" in line for line in frame)
+ assert any("Line B" in line for line in frame)
+ assert any("Line C" in line for line in frame)
def test_empty_source_produces_empty_buffer(self):
"""An empty source should produce an empty (or blank) frame."""
@@ -263,7 +264,10 @@ class TestPipelineE2EEffects:
assert result.success
frame = display.frames.get(timeout=1)
- assert "[FX1]" in frame, f"Marker not found in frame: {frame}"
+ # Camera stage pads lines to viewport width, so check for substring match
+ assert any("[FX1]" in line for line in frame), (
+ f"Marker not found in frame: {frame}"
+ )
assert "Original" in "\n".join(frame)
def test_effect_chain_ordering(self):
@@ -387,7 +391,7 @@ class TestPipelineE2EStageOrder:
# All regular (non-overlay) stages should have metrics
assert "source" in stage_names
assert "render" in stage_names
- assert "display" in stage_names
+ assert "queue" in stage_names # Display stage is named "queue" in the test
assert "effect_m" in stage_names