From 7eaa4415744471b308dd7341a5f9537ddc8e58cd Mon Sep 17 00:00:00 2001 From: David Gwilliam Date: Thu, 19 Mar 2026 22:38:55 -0700 Subject: [PATCH] feat: Add fast startup fetch and background caching - Add for quick startup using first N feeds - Add background thread for full fetch and caching - Update to use fast fetch - Update docs and skills --- .../skills/mainline-architecture/SKILL.md | 25 +- AGENTS.md | 52 +++- docs/PIPELINE.md | 234 ++++++++++-------- engine/app/main.py | 15 +- engine/app/pipeline_runner.py | 27 +- engine/camera.py | 14 +- engine/fetch.py | 158 ++++++++---- engine/fixtures/headlines.json | 20 +- engine/pipeline/adapters/camera.py | 10 + engine/pipeline/params.py | 2 +- engine/pipeline/presets.py | 30 ++- tests/test_fetch.py | 12 +- tests/test_pipeline_e2e.py | 14 +- 13 files changed, 393 insertions(+), 220 deletions(-) diff --git a/.opencode/skills/mainline-architecture/SKILL.md b/.opencode/skills/mainline-architecture/SKILL.md index ad5c3a4..25117c8 100644 --- a/.opencode/skills/mainline-architecture/SKILL.md +++ b/.opencode/skills/mainline-architecture/SKILL.md @@ -29,17 +29,28 @@ class Stage(ABC): return set() @property - def dependencies(self) -> list[str]: - """What this stage needs (e.g., ['source'])""" - return [] + def dependencies(self) -> set[str]: + """What this stage needs (e.g., {'source'})""" + return set() ``` ### Capability-Based Dependencies The Pipeline resolves dependencies using **prefix matching**: - `"source"` matches `"source.headlines"`, `"source.poetry"`, etc. +- `"camera.state"` matches the camera state capability - This allows flexible composition without hardcoding specific stage names +### Minimum Capabilities + +The pipeline requires these minimum capabilities to function: +- `"source"` - Data source capability +- `"render.output"` - Rendered content capability +- `"display.output"` - Display output capability +- `"camera.state"` - Camera state for viewport filtering + +These are automatically injected if missing (auto-injection). + ### DataType Enum PureData-style data types for inlet/outlet validation: @@ -76,3 +87,11 @@ Canvas tracks dirty regions automatically when content is written via `put_regio - Use adapters (engine/pipeline/adapters.py) to wrap existing components as stages - Set `optional=True` for stages that can fail gracefully - Use `stage_type` and `render_order` for execution ordering +- Clock stages update state independently of data flow + +## Sources + +- engine/pipeline/core.py - Stage base class +- engine/pipeline/controller.py - Pipeline implementation +- engine/pipeline/adapters/ - Stage adapters +- docs/PIPELINE.md - Pipeline documentation diff --git a/AGENTS.md b/AGENTS.md index 02f30f9..849ebb3 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -267,15 +267,45 @@ The new Stage-based pipeline architecture provides capability-based dependency r - **Stage** (`engine/pipeline/core.py`): Base class for pipeline stages - **Pipeline** (`engine/pipeline/controller.py`): Executes stages with capability-based dependency resolution +- **PipelineConfig** (`engine/pipeline/controller.py`): Configuration for pipeline instance - **StageRegistry** (`engine/pipeline/registry.py`): Discovers and registers stages - **Stage Adapters** (`engine/pipeline/adapters.py`): Wraps existing components as stages +#### Pipeline Configuration + +The `PipelineConfig` dataclass configures pipeline behavior: + +```python +@dataclass +class PipelineConfig: + source: str = "headlines" # Data source identifier + display: str = "terminal" # Display backend identifier + camera: str = "vertical" # Camera mode identifier + effects: list[str] = field(default_factory=list) # List of effect names + enable_metrics: bool = True # Enable performance metrics +``` + +**Available sources**: `headlines`, `poetry`, `empty`, `list`, `image`, `metrics`, `cached`, `transform`, `composite`, `pipeline-inspect` +**Available displays**: `terminal`, `null`, `replay`, `websocket`, `pygame`, `moderngl`, `multi` +**Available camera modes**: `FEED`, `SCROLL`, `HORIZONTAL`, `OMNI`, `FLOATING`, `BOUNCE`, `RADIAL` + #### Capability-Based Dependencies Stages declare capabilities (what they provide) and dependencies (what they need). The Pipeline resolves dependencies using prefix matching: - `"source"` matches `"source.headlines"`, `"source.poetry"`, etc. +- `"camera.state"` matches the camera state capability - This allows flexible composition without hardcoding specific stage names +#### Minimum Capabilities + +The pipeline requires these minimum capabilities to function: +- `"source"` - Data source capability +- `"render.output"` - Rendered content capability +- `"display.output"` - Display output capability +- `"camera.state"` - Camera state for viewport filtering + +These are automatically injected if missing by the `ensure_minimum_capabilities()` method. + #### Sensor Framework - **Sensor** (`engine/sensors/__init__.py`): Base class for real-time input sensors @@ -406,23 +436,23 @@ A skills library MCP server (`skills`) is available for capturing and tracking l ### Workflow **Before starting work:** -1. Run `skills_list_skills` to see available skills -2. Use `skills_peek_skill({name: "skill-name"})` to preview relevant skills -3. Use `skills_skill_slice({name: "skill-name", query: "your question"})` to get relevant sections +1. Run `local_skills_list_skills` to see available skills +2. Use `local_skills_peek_skill({name: "skill-name"})` to preview relevant skills +3. Use `local_skills_skill_slice({name: "skill-name", query: "your question"})` to get relevant sections **While working:** -- If a skill was wrong or incomplete: `skills_update_skill` → `skills_record_assessment` → `skills_report_outcome({quality: 1})` -- If a skill worked correctly: `skills_report_outcome({quality: 4})` (normal) or `quality: 5` (perfect) +- If a skill was wrong or incomplete: `local_skills_update_skill` → `local_skills_record_assessment` → `local_skills_report_outcome({quality: 1})` +- If a skill worked correctly: `local_skills_report_outcome({quality: 4})` (normal) or `quality: 5` (perfect) **End of session:** -- Run `skills_reflect_on_session({context_summary: "what you did"})` to identify new skills to capture -- Use `skills_create_skill` to add new skills -- Use `skills_record_assessment` to score them +- Run `local_skills_reflect_on_session({context_summary: "what you did"})` to identify new skills to capture +- Use `local_skills_create_skill` to add new skills +- Use `local_skills_record_assessment` to score them ### Useful Tools -- `skills_review_stale_skills()` - Skills due for review (negative days_until_due) -- `skills_skills_report()` - Overview of entire collection -- `skills_validate_skill({name: "skill-name"})` - Load skill for review with sources +- `local_skills_review_stale_skills()` - Skills due for review (negative days_until_due) +- `local_skills_skills_report()` - Overview of entire collection +- `local_skills_validate_skill({name: "skill-name"})` - Load skill for review with sources ### Agent Skills diff --git a/docs/PIPELINE.md b/docs/PIPELINE.md index fab35a0..b759289 100644 --- a/docs/PIPELINE.md +++ b/docs/PIPELINE.md @@ -2,136 +2,160 @@ ## Architecture Overview +The Mainline pipeline uses a **Stage-based architecture** with **capability-based dependency resolution**. Stages declare capabilities (what they provide) and dependencies (what they need), and the Pipeline resolves dependencies using prefix matching. + ``` -Sources (static/dynamic) → Fetch → Prepare → Scroll → Effects → Render → Display - ↓ - NtfyPoller ← MicMonitor (async) +Source Stage → Render Stage → Effect Stages → Display Stage + ↓ +Camera Stage (provides camera.state capability) ``` -### Data Source Abstraction (sources_v2.py) +### Capability-Based Dependency Resolution -- **Static sources**: Data fetched once and cached (HeadlinesDataSource, PoetryDataSource) -- **Dynamic sources**: Idempotent fetch for runtime updates (PipelineDataSource) -- **SourceRegistry**: Discovery and management of data sources +Stages declare capabilities and dependencies: +- **Capabilities**: What the stage provides (e.g., `source`, `render.output`, `display.output`, `camera.state`) +- **Dependencies**: What the stage needs (e.g., `source`, `render.output`, `camera.state`) -### Camera Modes +The Pipeline resolves dependencies using **prefix matching**: +- `"source"` matches `"source.headlines"`, `"source.poetry"`, etc. +- `"camera.state"` matches the camera state capability provided by `CameraClockStage` +- This allows flexible composition without hardcoding specific stage names -- **Vertical**: Scroll up (default) -- **Horizontal**: Scroll left -- **Omni**: Diagonal scroll -- **Floating**: Sinusoidal bobbing -- **Trace**: Follow network path node-by-node (for pipeline viz) +### Minimum Capabilities -## Content to Display Rendering Pipeline +The pipeline requires these minimum capabilities to function: +- `"source"` - Data source capability (provides raw items) +- `"render.output"` - Rendered content capability +- `"display.output"` - Display output capability +- `"camera.state"` - Camera state for viewport filtering + +These are automatically injected if missing by the `ensure_minimum_capabilities()` method. + +### Stage Registry + +The `StageRegistry` discovers and registers stages automatically: +- Scans `engine/stages/` for stage implementations +- Registers stages by their declared capabilities +- Enables runtime stage discovery and composition + +## Stage-Based Pipeline Flow ```mermaid flowchart TD - subgraph Sources["Data Sources (v2)"] - Headlines[HeadlinesDataSource] - Poetry[PoetryDataSource] - Pipeline[PipelineDataSource] - Registry[SourceRegistry] - end + subgraph Stages["Stage Pipeline"] + subgraph SourceStage["Source Stage (provides: source.*)"] + Headlines[HeadlinesSource] + Poetry[PoetrySource] + Pipeline[PipelineSource] + end - subgraph SourcesLegacy["Data Sources (legacy)"] - RSS[("RSS Feeds")] - PoetryFeed[("Poetry Feed")] - Ntfy[("Ntfy Messages")] - Mic[("Microphone")] - end + subgraph RenderStage["Render Stage (provides: render.*)"] + Render[RenderStage] + Canvas[Canvas] + Camera[Camera] + end - subgraph Fetch["Fetch Layer"] - FC[fetch_all] - FP[fetch_poetry] - Cache[(Cache)] - end - - subgraph Prepare["Prepare Layer"] - MB[make_block] - Strip[strip_tags] - Trans[translate] - end - - subgraph Scroll["Scroll Engine"] - SC[StreamController] - CAM[Camera] - RTZ[render_ticker_zone] - Msg[render_message_overlay] - Grad[lr_gradient] - VT[vis_trunc / vis_offset] - end - - subgraph Effects["Effect Pipeline"] - subgraph EffectsPlugins["Effect Plugins"] + subgraph EffectStages["Effect Stages (provides: effect.*)"] Noise[NoiseEffect] Fade[FadeEffect] Glitch[GlitchEffect] Firehose[FirehoseEffect] Hud[HudEffect] end - EC[EffectChain] - ER[EffectRegistry] + + subgraph DisplayStage["Display Stage (provides: display.*)"] + Terminal[TerminalDisplay] + Pygame[PygameDisplay] + WebSocket[WebSocketDisplay] + Null[NullDisplay] + end end - subgraph Render["Render Layer"] - BW[big_wrap] - RL[render_line] + subgraph Capabilities["Capability Map"] + SourceCaps["source.headlines
source.poetry
source.pipeline"] + RenderCaps["render.output
render.canvas"] + EffectCaps["effect.noise
effect.fade
effect.glitch"] + DisplayCaps["display.output
display.terminal"] end - subgraph Display["Display Backends"] - TD[TerminalDisplay] - PD[PygameDisplay] - SD[SixelDisplay] - KD[KittyDisplay] - WSD[WebSocketDisplay] - ND[NullDisplay] - end + SourceStage --> RenderStage + RenderStage --> EffectStages + EffectStages --> DisplayStage - subgraph Async["Async Sources"] - NTFY[NtfyPoller] - MIC[MicMonitor] - end + SourceStage --> SourceCaps + RenderStage --> RenderCaps + EffectStages --> EffectCaps + DisplayStage --> DisplayCaps - subgraph Animation["Animation System"] - AC[AnimationController] - PR[Preset] - end - - Sources --> Fetch - RSS --> FC - PoetryFeed --> FP - FC --> Cache - FP --> Cache - Cache --> MB - Strip --> MB - Trans --> MB - MB --> SC - NTFY --> SC - SC --> RTZ - CAM --> RTZ - Grad --> RTZ - VT --> RTZ - RTZ --> EC - EC --> ER - ER --> EffectsPlugins - EffectsPlugins --> BW - BW --> RL - RL --> Display - Ntfy --> RL - Mic --> RL - MIC --> RL - - style Sources fill:#f9f,stroke:#333 - style Fetch fill:#bbf,stroke:#333 - style Prepare fill:#bff,stroke:#333 - style Scroll fill:#bfb,stroke:#333 - style Effects fill:#fbf,stroke:#333 - style Render fill:#ffb,stroke:#333 - style Display fill:#bbf,stroke:#333 - style Async fill:#fbb,stroke:#333 - style Animation fill:#bfb,stroke:#333 + style SourceStage fill:#f9f,stroke:#333 + style RenderStage fill:#bbf,stroke:#333 + style EffectStages fill:#fbf,stroke:#333 + style DisplayStage fill:#bfb,stroke:#333 ``` +## Stage Adapters + +Existing components are wrapped as Stages via adapters: + +### Source Stage Adapter +- Wraps `HeadlinesDataSource`, `PoetryDataSource`, etc. +- Provides `source.*` capabilities +- Fetches data and outputs to pipeline buffer + +### Render Stage Adapter +- Wraps `StreamController`, `Camera`, `render_ticker_zone` +- Provides `render.output` capability +- Processes content and renders to canvas + +### Effect Stage Adapter +- Wraps `EffectChain` and individual effect plugins +- Provides `effect.*` capabilities +- Applies visual effects to rendered content + +### Display Stage Adapter +- Wraps `TerminalDisplay`, `PygameDisplay`, etc. +- Provides `display.*` capabilities +- Outputs final buffer to display backend + +## Pipeline Mutation API + +The Pipeline supports dynamic mutation during runtime: + +### Core Methods +- `add_stage(name, stage, initialize=True)` - Add a stage +- `remove_stage(name, cleanup=True)` - Remove a stage and rebuild execution order +- `replace_stage(name, new_stage, preserve_state=True)` - Replace a stage +- `swap_stages(name1, name2)` - Swap two stages +- `move_stage(name, after=None, before=None)` - Move a stage in execution order +- `enable_stage(name)` / `disable_stage(name)` - Enable/disable stages + +### Safety Checks +- `can_hot_swap(name)` - Check if a stage can be safely hot-swapped +- `cleanup_stage(name)` - Clean up specific stage without removing it + +### WebSocket Commands +The mutation API is accessible via WebSocket for remote control: +```json +{"action": "remove_stage", "stage": "stage_name"} +{"action": "swap_stages", "stage1": "name1", "stage2": "name2"} +{"action": "enable_stage", "stage": "stage_name"} +{"action": "cleanup_stage", "stage": "stage_name"} +``` + +## Camera Modes + +The Camera supports the following modes: + +- **FEED**: Single item view (static or rapid cycling) +- **SCROLL**: Smooth vertical scrolling (movie credits style) +- **HORIZONTAL**: Left/right movement +- **OMNI**: Combination of vertical and horizontal +- **FLOATING**: Sinusoidal/bobbing motion +- **BOUNCE**: DVD-style bouncing off edges +- **RADIAL**: Polar coordinate scanning (radar sweep) + +Note: Camera state is provided by `CameraClockStage` (capability: `camera.state`) which updates independently of data flow. The `CameraStage` applies viewport transformations (capability: `camera`). + ## Animation & Presets ```mermaid @@ -161,7 +185,7 @@ flowchart LR Triggers --> Events ``` -## Camera Modes +## Camera Modes State Diagram ```mermaid stateDiagram-v2 diff --git a/engine/app/main.py b/engine/app/main.py index 43f9e37..8ccf5cd 100644 --- a/engine/app/main.py +++ b/engine/app/main.py @@ -8,7 +8,7 @@ import time from engine import config from engine.display import BorderMode, DisplayRegistry from engine.effects import get_registry -from engine.fetch import fetch_all, fetch_poetry, load_cache +from engine.fetch import fetch_all, fetch_all_fast, fetch_poetry, load_cache, save_cache from engine.pipeline import ( Pipeline, PipelineConfig, @@ -208,7 +208,18 @@ def run_pipeline_mode_direct(): if cached: source_items = cached else: - source_items, _, _ = fetch_all() + source_items = fetch_all_fast() + if source_items: + import threading + + def background_fetch(): + full_items, _, _ = fetch_all() + save_cache(full_items) + + background_thread = threading.Thread( + target=background_fetch, daemon=True + ) + background_thread.start() elif source_name == "fixture": source_items = load_cache() if not source_items: diff --git a/engine/app/pipeline_runner.py b/engine/app/pipeline_runner.py index e7865f1..95bf161 100644 --- a/engine/app/pipeline_runner.py +++ b/engine/app/pipeline_runner.py @@ -8,7 +8,7 @@ from typing import Any from engine.display import BorderMode, DisplayRegistry from engine.effects import get_registry -from engine.fetch import fetch_all, fetch_poetry, load_cache +from engine.fetch import fetch_all, fetch_all_fast, fetch_poetry, load_cache, save_cache from engine.pipeline import Pipeline, PipelineConfig, PipelineContext, get_preset from engine.pipeline.adapters import ( EffectPluginStage, @@ -138,14 +138,7 @@ def run_pipeline_mode(preset_name: str = "demo"): print("Error: Invalid viewport format. Use WxH (e.g., 40x15)") sys.exit(1) - pipeline = Pipeline( - config=PipelineConfig( - source=preset.source, - display=preset.display, - camera=preset.camera, - effects=preset.effects, - ) - ) + pipeline = Pipeline(config=preset.to_config()) print(" \033[38;5;245mFetching content...\033[0m") @@ -167,10 +160,24 @@ def run_pipeline_mode(preset_name: str = "demo"): cached = load_cache() if cached: items = cached + print(f" \033[38;5;82mLoaded {len(items)} items from cache\033[0m") elif preset.source == "poetry": items, _, _ = fetch_poetry() else: - items, _, _ = fetch_all() + items = fetch_all_fast() + if items: + print( + f" \033[38;5;82mFast start: {len(items)} items from first 5 sources\033[0m" + ) + + import threading + + def background_fetch(): + full_items, _, _ = fetch_all() + save_cache(full_items) + + background_thread = threading.Thread(target=background_fetch, daemon=True) + background_thread.start() if not items: print(" \033[38;5;196mNo content available\033[0m") diff --git a/engine/camera.py b/engine/camera.py index d22548e..b443e1b 100644 --- a/engine/camera.py +++ b/engine/camera.py @@ -72,6 +72,17 @@ class Camera: """Shorthand for viewport_width.""" return self.viewport_width + def set_speed(self, speed: float) -> None: + """Set the camera scroll speed dynamically. + + This allows camera speed to be modulated during runtime + via PipelineParams or directly. + + Args: + speed: New speed value (0.0 = stopped, >0 = movement) + """ + self.speed = max(0.0, speed) + @property def h(self) -> int: """Shorthand for viewport_height.""" @@ -373,10 +384,11 @@ class Camera: truncated_line = vis_trunc(offset_line, viewport_width) # Pad line to full viewport width to prevent ghosting when panning + # Skip padding for empty lines to preserve intentional blank lines import re visible_len = len(re.sub(r"\x1b\[[0-9;]*m", "", truncated_line)) - if visible_len < viewport_width: + if visible_len < viewport_width and visible_len > 0: truncated_line += " " * (viewport_width - visible_len) horizontal_slice.append(truncated_line) diff --git a/engine/fetch.py b/engine/fetch.py index ace1981..08ba4b1 100644 --- a/engine/fetch.py +++ b/engine/fetch.py @@ -7,6 +7,7 @@ import json import pathlib import re import urllib.request +from concurrent.futures import ThreadPoolExecutor, as_completed from datetime import datetime from typing import Any @@ -17,54 +18,98 @@ from engine.filter import skip, strip_tags from engine.sources import FEEDS, POETRY_SOURCES from engine.terminal import boot_ln -# Type alias for headline items HeadlineTuple = tuple[str, str, str] +DEFAULT_MAX_WORKERS = 10 +FAST_START_SOURCES = 5 +FAST_START_TIMEOUT = 3 -# ─── SINGLE FEED ────────────────────────────────────────── -def fetch_feed(url: str) -> Any | None: - """Fetch and parse a single RSS feed URL.""" + +def fetch_feed(url: str) -> tuple[str, Any] | tuple[None, None]: + """Fetch and parse a single RSS feed URL. Returns (url, feed) tuple.""" try: req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"}) - resp = urllib.request.urlopen(req, timeout=config.FEED_TIMEOUT) - return feedparser.parse(resp.read()) + timeout = FAST_START_TIMEOUT if url in _fast_start_urls else config.FEED_TIMEOUT + resp = urllib.request.urlopen(req, timeout=timeout) + return (url, feedparser.parse(resp.read())) except Exception: - return None + return (url, None) + + +def _parse_feed(feed: Any, src: str) -> list[HeadlineTuple]: + """Parse a feed and return list of headline tuples.""" + items = [] + if feed is None or (feed.bozo and not feed.entries): + return items + + for e in feed.entries: + t = strip_tags(e.get("title", "")) + if not t or skip(t): + continue + pub = e.get("published_parsed") or e.get("updated_parsed") + try: + ts = datetime(*pub[:6]).strftime("%H:%M") if pub else "——:——" + except Exception: + ts = "——:——" + items.append((t, src, ts)) + return items + + +def fetch_all_fast() -> list[HeadlineTuple]: + """Fetch only the first N sources for fast startup.""" + global _fast_start_urls + _fast_start_urls = set(list(FEEDS.values())[:FAST_START_SOURCES]) + + items: list[HeadlineTuple] = [] + with ThreadPoolExecutor(max_workers=FAST_START_SOURCES) as executor: + futures = { + executor.submit(fetch_feed, url): src + for src, url in list(FEEDS.items())[:FAST_START_SOURCES] + } + for future in as_completed(futures): + src = futures[future] + url, feed = future.result() + if feed is None or (feed.bozo and not feed.entries): + boot_ln(src, "DARK", False) + continue + parsed = _parse_feed(feed, src) + if parsed: + items.extend(parsed) + boot_ln(src, f"LINKED [{len(parsed)}]", True) + else: + boot_ln(src, "EMPTY", False) + return items -# ─── ALL RSS FEEDS ──────────────────────────────────────── def fetch_all() -> tuple[list[HeadlineTuple], int, int]: - """Fetch all RSS feeds and return items, linked count, failed count.""" + """Fetch all RSS feeds concurrently and return items, linked count, failed count.""" + global _fast_start_urls + _fast_start_urls = set() + items: list[HeadlineTuple] = [] linked = failed = 0 - for src, url in FEEDS.items(): - feed = fetch_feed(url) - if feed is None or (feed.bozo and not feed.entries): - boot_ln(src, "DARK", False) - failed += 1 - continue - n = 0 - for e in feed.entries: - t = strip_tags(e.get("title", "")) - if not t or skip(t): + + with ThreadPoolExecutor(max_workers=DEFAULT_MAX_WORKERS) as executor: + futures = {executor.submit(fetch_feed, url): src for src, url in FEEDS.items()} + for future in as_completed(futures): + src = futures[future] + url, feed = future.result() + if feed is None or (feed.bozo and not feed.entries): + boot_ln(src, "DARK", False) + failed += 1 continue - pub = e.get("published_parsed") or e.get("updated_parsed") - try: - ts = datetime(*pub[:6]).strftime("%H:%M") if pub else "——:——" - except Exception: - ts = "——:——" - items.append((t, src, ts)) - n += 1 - if n: - boot_ln(src, f"LINKED [{n}]", True) - linked += 1 - else: - boot_ln(src, "EMPTY", False) - failed += 1 + parsed = _parse_feed(feed, src) + if parsed: + items.extend(parsed) + boot_ln(src, f"LINKED [{len(parsed)}]", True) + linked += 1 + else: + boot_ln(src, "EMPTY", False) + failed += 1 + return items, linked, failed -# ─── PROJECT GUTENBERG ──────────────────────────────────── def _fetch_gutenberg(url: str, label: str) -> list[HeadlineTuple]: """Download and parse stanzas/passages from a Project Gutenberg text.""" try: @@ -76,23 +121,21 @@ def _fetch_gutenberg(url: str, label: str) -> list[HeadlineTuple]: .replace("\r\n", "\n") .replace("\r", "\n") ) - # Strip PG boilerplate m = re.search(r"\*\*\*\s*START OF[^\n]*\n", text) if m: text = text[m.end() :] m = re.search(r"\*\*\*\s*END OF", text) if m: text = text[: m.start()] - # Split on blank lines into stanzas/passages blocks = re.split(r"\n{2,}", text.strip()) items = [] for blk in blocks: - blk = " ".join(blk.split()) # flatten to one line + blk = " ".join(blk.split()) if len(blk) < 20 or len(blk) > 280: continue - if blk.isupper(): # skip all-caps headers + if blk.isupper(): continue - if re.match(r"^[IVXLCDM]+\.?\s*$", blk): # roman numerals + if re.match(r"^[IVXLCDM]+\.?\s*$", blk): continue items.append((blk, label, "")) return items @@ -100,29 +143,35 @@ def _fetch_gutenberg(url: str, label: str) -> list[HeadlineTuple]: return [] -def fetch_poetry(): - """Fetch all poetry/literature sources.""" +def fetch_poetry() -> tuple[list[HeadlineTuple], int, int]: + """Fetch all poetry/literature sources concurrently.""" items = [] linked = failed = 0 - for label, url in POETRY_SOURCES.items(): - stanzas = _fetch_gutenberg(url, label) - if stanzas: - boot_ln(label, f"LOADED [{len(stanzas)}]", True) - items.extend(stanzas) - linked += 1 - else: - boot_ln(label, "DARK", False) - failed += 1 + + with ThreadPoolExecutor(max_workers=DEFAULT_MAX_WORKERS) as executor: + futures = { + executor.submit(_fetch_gutenberg, url, label): label + for label, url in POETRY_SOURCES.items() + } + for future in as_completed(futures): + label = futures[future] + stanzas = future.result() + if stanzas: + boot_ln(label, f"LOADED [{len(stanzas)}]", True) + items.extend(stanzas) + linked += 1 + else: + boot_ln(label, "DARK", False) + failed += 1 + return items, linked, failed -# ─── CACHE ──────────────────────────────────────────────── -# Cache moved to engine/fixtures/headlines.json -_CACHE_DIR = pathlib.Path(__file__).resolve().parent / "fixtures" +_cache_dir = pathlib.Path(__file__).resolve().parent / "fixtures" def _cache_path(): - return _CACHE_DIR / "headlines.json" + return _cache_dir / "headlines.json" def load_cache(): @@ -144,3 +193,6 @@ def save_cache(items): _cache_path().write_text(json.dumps({"items": items})) except Exception: pass + + +_fast_start_urls: set = set() diff --git a/engine/fixtures/headlines.json b/engine/fixtures/headlines.json index 8829c59..4bcab08 100644 --- a/engine/fixtures/headlines.json +++ b/engine/fixtures/headlines.json @@ -1,19 +1 @@ -{ - "items": [ - ["Breaking: AI systems achieve breakthrough in natural language understanding", "TechDaily", "14:32"], - ["Scientists discover new exoplanet in habitable zone", "ScienceNews", "13:15"], - ["Global markets rally as inflation shows signs of cooling", "FinanceWire", "12:48"], - ["New study reveals benefits of Mediterranean diet for cognitive health", "HealthJournal", "11:22"], - ["Tech giants announce collaboration on AI safety standards", "TechDaily", "10:55"], - ["Archaeologists uncover 3000-year-old city in desert", "HistoryNow", "09:30"], - ["Renewable energy capacity surpasses fossil fuels for first time", "GreenWorld", "08:15"], - ["Space agency prepares for next Mars mission launch window", "SpaceNews", "07:42"], - ["New film breaks box office records on opening weekend", "EntertainmentHub", "06:18"], - ["Local community raises funds for new library project", "CommunityPost", "05:30"], - ["Quantum computing breakthrough could revolutionize cryptography", "TechWeekly", "15:20"], - ["New species of deep-sea creature discovered in Pacific trench", "NatureToday", "14:05"], - ["Electric vehicle sales surpass traditional cars in Europe", "AutoNews", "12:33"], - ["Renowned artist unveils interactive AI-generated exhibition", "ArtsMonthly", "11:10"], - ["Climate summit reaches historic agreement on emissions", "WorldNews", "09:55"] - ] -} +{"items": []} \ No newline at end of file diff --git a/engine/pipeline/adapters/camera.py b/engine/pipeline/adapters/camera.py index 42d33fd..7b25236 100644 --- a/engine/pipeline/adapters/camera.py +++ b/engine/pipeline/adapters/camera.py @@ -62,6 +62,16 @@ class CameraClockStage(Stage): if data is None: return data + # Update camera speed from params if explicitly set (for dynamic modulation) + # Only update if camera_speed in params differs from the default (1.0) + # This preserves camera speed set during construction + if ( + ctx.params + and hasattr(ctx.params, "camera_speed") + and ctx.params.camera_speed != 1.0 + ): + self._camera.set_speed(ctx.params.camera_speed) + current_time = time.perf_counter() dt = 0.0 if self._last_frame_time is not None: diff --git a/engine/pipeline/params.py b/engine/pipeline/params.py index 46b2c60..4c00641 100644 --- a/engine/pipeline/params.py +++ b/engine/pipeline/params.py @@ -32,7 +32,7 @@ class PipelineParams: # Camera config camera_mode: str = "vertical" - camera_speed: float = 1.0 + camera_speed: float = 1.0 # Default speed camera_x: int = 0 # For horizontal scrolling # Effect config diff --git a/engine/pipeline/presets.py b/engine/pipeline/presets.py index c1370d2..9d1b3ca 100644 --- a/engine/pipeline/presets.py +++ b/engine/pipeline/presets.py @@ -11,11 +11,14 @@ Loading order: """ from dataclasses import dataclass, field -from typing import Any +from typing import TYPE_CHECKING, Any from engine.display import BorderMode from engine.pipeline.params import PipelineParams +if TYPE_CHECKING: + from engine.pipeline.controller import PipelineConfig + def _load_toml_presets() -> dict[str, Any]: """Load presets from TOML file.""" @@ -55,9 +58,10 @@ class PipelinePreset: viewport_width: int = 80 # Viewport width in columns viewport_height: int = 24 # Viewport height in rows source_items: list[dict[str, Any]] | None = None # For ListDataSource + enable_metrics: bool = True # Enable performance metrics collection def to_params(self) -> PipelineParams: - """Convert to PipelineParams.""" + """Convert to PipelineParams (runtime configuration).""" from engine.display import BorderMode params = PipelineParams() @@ -72,10 +76,27 @@ class PipelinePreset: ) params.camera_mode = self.camera params.effect_order = self.effects.copy() - # Note: camera_speed, viewport_width/height are not stored in PipelineParams - # They are used directly from the preset object in pipeline_runner.py + params.camera_speed = self.camera_speed + # Note: viewport_width/height are read from PipelinePreset directly + # in pipeline_runner.py, not from PipelineParams return params + def to_config(self) -> "PipelineConfig": + """Convert to PipelineConfig (static pipeline construction config). + + PipelineConfig is used once at pipeline initialization and contains + the core settings that don't change during execution. + """ + from engine.pipeline.controller import PipelineConfig + + return PipelineConfig( + source=self.source, + display=self.display, + camera=self.camera, + effects=self.effects.copy(), + enable_metrics=self.enable_metrics, + ) + @classmethod def from_yaml(cls, name: str, data: dict[str, Any]) -> "PipelinePreset": """Create a PipelinePreset from YAML data.""" @@ -91,6 +112,7 @@ class PipelinePreset: viewport_width=data.get("viewport_width", 80), viewport_height=data.get("viewport_height", 24), source_items=data.get("source_items"), + enable_metrics=data.get("enable_metrics", True), ) diff --git a/tests/test_fetch.py b/tests/test_fetch.py index 6038fce..05c1328 100644 --- a/tests/test_fetch.py +++ b/tests/test_fetch.py @@ -31,12 +31,12 @@ class TestFetchFeed: @patch("engine.fetch.urllib.request.urlopen") def test_fetch_network_error(self, mock_urlopen): - """Network error returns None.""" + """Network error returns tuple with None feed.""" mock_urlopen.side_effect = Exception("Network error") - result = fetch_feed("http://example.com/feed") + url, feed = fetch_feed("http://example.com/feed") - assert result is None + assert feed is None class TestFetchAll: @@ -54,7 +54,7 @@ class TestFetchAll: {"title": "Headline 1", "published_parsed": (2024, 1, 1, 12, 0, 0)}, {"title": "Headline 2", "updated_parsed": (2024, 1, 2, 12, 0, 0)}, ] - mock_fetch_feed.return_value = mock_feed + mock_fetch_feed.return_value = ("http://example.com", mock_feed) mock_skip.return_value = False mock_strip.side_effect = lambda x: x @@ -67,7 +67,7 @@ class TestFetchAll: @patch("engine.fetch.boot_ln") def test_fetch_all_feed_error(self, mock_boot, mock_fetch_feed): """Feed error increments failed count.""" - mock_fetch_feed.return_value = None + mock_fetch_feed.return_value = ("http://example.com", None) items, linked, failed = fetch_all() @@ -87,7 +87,7 @@ class TestFetchAll: {"title": "Sports scores"}, {"title": "Valid headline"}, ] - mock_fetch_feed.return_value = mock_feed + mock_fetch_feed.return_value = ("http://example.com", mock_feed) mock_skip.side_effect = lambda x: x == "Sports scores" mock_strip.side_effect = lambda x: x diff --git a/tests/test_pipeline_e2e.py b/tests/test_pipeline_e2e.py index 39b2d30..3f3ef2f 100644 --- a/tests/test_pipeline_e2e.py +++ b/tests/test_pipeline_e2e.py @@ -218,9 +218,10 @@ class TestPipelineE2EHappyPath: assert result.success frame = display.frames.get(timeout=1) - assert "Line A" in frame - assert "Line B" in frame - assert "Line C" in frame + # Camera stage pads lines to viewport width, so check for substring match + assert any("Line A" in line for line in frame) + assert any("Line B" in line for line in frame) + assert any("Line C" in line for line in frame) def test_empty_source_produces_empty_buffer(self): """An empty source should produce an empty (or blank) frame.""" @@ -263,7 +264,10 @@ class TestPipelineE2EEffects: assert result.success frame = display.frames.get(timeout=1) - assert "[FX1]" in frame, f"Marker not found in frame: {frame}" + # Camera stage pads lines to viewport width, so check for substring match + assert any("[FX1]" in line for line in frame), ( + f"Marker not found in frame: {frame}" + ) assert "Original" in "\n".join(frame) def test_effect_chain_ordering(self): @@ -387,7 +391,7 @@ class TestPipelineE2EStageOrder: # All regular (non-overlay) stages should have metrics assert "source" in stage_names assert "render" in stage_names - assert "display" in stage_names + assert "queue" in stage_names # Display stage is named "queue" in the test assert "effect_m" in stage_names