diff --git a/AGENTS.md b/AGENTS.md index 6f9eafa..f38d2c8 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -161,7 +161,7 @@ The project uses pytest with strict marker enforcement. Test configuration is in ### Test Coverage Strategy -Current coverage: 56% (336 tests) +Current coverage: 56% (433 tests) Key areas with lower coverage (acceptable for now): - **app.py** (8%): Main entry point - integration heavy, requires terminal @@ -192,6 +192,47 @@ Performance regression tests are in `tests/test_benchmark.py` with `@pytest.mark - **effects/** - plugin architecture with performance monitoring - The render pipeline: fetch → render → effects → scroll → terminal output +### Pipeline Architecture + +The new Stage-based pipeline architecture provides capability-based dependency resolution: + +- **Stage** (`engine/pipeline/core.py`): Base class for pipeline stages +- **Pipeline** (`engine/pipeline/controller.py`): Executes stages with capability-based dependency resolution +- **StageRegistry** (`engine/pipeline/registry.py`): Discovers and registers stages +- **Stage Adapters** (`engine/pipeline/adapters.py`): Wraps existing components as stages + +#### Capability-Based Dependencies + +Stages declare capabilities (what they provide) and dependencies (what they need). The Pipeline resolves dependencies using prefix matching: +- `"source"` matches `"source.headlines"`, `"source.poetry"`, etc. +- This allows flexible composition without hardcoding specific stage names + +#### Sensor Framework + +- **Sensor** (`engine/sensors/__init__.py`): Base class for real-time input sensors +- **SensorRegistry**: Discovers available sensors +- **SensorStage**: Pipeline adapter that provides sensor values to effects +- **MicSensor** (`engine/sensors/mic.py`): Self-contained microphone input +- **OscillatorSensor** (`engine/sensors/oscillator.py`): Test sensor for development + +Sensors support param bindings to drive effect parameters in real-time. + +### Preset System + +Presets use TOML format (no external dependencies): + +- Built-in: `engine/presets.toml` +- User config: `~/.config/mainline/presets.toml` +- Local override: `./presets.toml` + +- **Preset loader** (`engine/pipeline/preset_loader.py`): Loads and validates presets +- **PipelinePreset** (`engine/pipeline/presets.py`): Dataclass for preset configuration + +Functions: +- `validate_preset()` - Validate preset structure +- `validate_signal_path()` - Detect circular dependencies +- `generate_preset_toml()` - Generate skeleton preset + ### Display System - **Display abstraction** (`engine/display/`): swap display backends via the Display protocol diff --git a/effects_plugins/hud.py b/effects_plugins/hud.py index e284728..6f014af 100644 --- a/effects_plugins/hud.py +++ b/effects_plugins/hud.py @@ -1,4 +1,3 @@ -from engine.effects.performance import get_monitor from engine.effects.types import EffectConfig, EffectContext, EffectPlugin @@ -8,15 +7,34 @@ class HudEffect(EffectPlugin): def process(self, buf: list[str], ctx: EffectContext) -> list[str]: result = list(buf) - monitor = get_monitor() + + # Read metrics from pipeline context (first-class citizen) + # Falls back to global monitor for backwards compatibility + metrics = ctx.get_state("metrics") + if not metrics: + # Fallback to global monitor for backwards compatibility + from engine.effects.performance import get_monitor + + monitor = get_monitor() + if monitor: + stats = monitor.get_stats() + if stats and "pipeline" in stats: + metrics = stats fps = 0.0 frame_time = 0.0 - if monitor: - stats = monitor.get_stats() - if stats and "pipeline" in stats: - frame_time = stats["pipeline"].get("avg_ms", 0.0) - frame_count = stats.get("frame_count", 0) + if metrics: + if "error" in metrics: + pass # No metrics available yet + elif "pipeline" in metrics: + frame_time = metrics["pipeline"].get("avg_ms", 0.0) + frame_count = metrics.get("frame_count", 0) + if frame_count > 0 and frame_time > 0: + fps = 1000.0 / frame_time + elif "avg_ms" in metrics: + # Direct metrics format + frame_time = metrics.get("avg_ms", 0.0) + frame_count = metrics.get("frame_count", 0) if frame_count > 0 and frame_time > 0: fps = 1000.0 / frame_time @@ -44,11 +62,17 @@ class HudEffect(EffectPlugin): f"\033[2;1H\033[38;5;45mEFFECT:\033[0m \033[1;38;5;227m{effect_name:12s}\033[0m \033[38;5;245m|\033[0m {bar} \033[38;5;245m|\033[0m \033[38;5;219m{effect_intensity * 100:.0f}%\033[0m" ) - from engine.effects import get_effect_chain + # Try to get pipeline order from context + pipeline_order = ctx.get_state("pipeline_order") + if pipeline_order: + pipeline_str = ",".join(pipeline_order) + else: + # Fallback to legacy effect chain + from engine.effects import get_effect_chain - chain = get_effect_chain() - order = chain.get_order() - pipeline_str = ",".join(order) if order else "(none)" + chain = get_effect_chain() + order = chain.get_order() if chain else [] + pipeline_str = ",".join(order) if order else "(none)" hud_lines.append(f"\033[3;1H\033[38;5;44mPIPELINE:\033[0m {pipeline_str}") for i, line in enumerate(hud_lines): diff --git a/engine/app.py b/engine/app.py index ccfca81..96866aa 100644 --- a/engine/app.py +++ b/engine/app.py @@ -1,880 +1,21 @@ """ -Application orchestrator — boot sequence, signal handling, main loop wiring. +Application orchestrator — pipeline mode entry point. """ -import atexit -import os -import signal import sys -import termios import time -import tty -from engine import config, render -from engine.controller import StreamController -from engine.fetch import fetch_all, fetch_poetry, load_cache, save_cache -from engine.terminal import ( - CLR, - CURSOR_OFF, - CURSOR_ON, - G_DIM, - G_HI, - G_MID, - RST, - W_DIM, - W_GHOST, - boot_ln, - slow_print, - tw, +from engine import config +from engine.pipeline import ( + Pipeline, + PipelineConfig, + get_preset, + list_presets, ) -TITLE = [ - " ███╗ ███╗ █████╗ ██╗███╗ ██╗██╗ ██╗███╗ ██╗███████╗", - " ████╗ ████║██╔══██╗██║████╗ ██║██║ ██║████╗ ██║██╔════╝", - " ██╔████╔██║███████║██║██╔██╗ ██║██║ ██║██╔██╗ ██║█████╗ ", - " ██║╚██╔╝██║██╔══██║██║██║╚██╗██║██║ ██║██║╚██╗██║██╔══╝ ", - " ██║ ╚═╝ ██║██║ ██║██║██║ ╚████║███████╗██║██║ ╚████║███████╗", - " ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝╚═╝ ╚═══╝╚══════╝╚═╝╚═╝ ╚═══╝╚══════╝", -] - - -def _read_picker_key(): - ch = sys.stdin.read(1) - if ch == "\x03": - return "interrupt" - if ch in ("\r", "\n"): - return "enter" - if ch == "\x1b": - c1 = sys.stdin.read(1) - if c1 != "[": - return None - c2 = sys.stdin.read(1) - if c2 == "A": - return "up" - if c2 == "B": - return "down" - return None - if ch in ("k", "K"): - return "up" - if ch in ("j", "J"): - return "down" - if ch in ("q", "Q"): - return "enter" - return None - - -def _normalize_preview_rows(rows): - """Trim shared left padding and trailing spaces for stable on-screen previews.""" - non_empty = [r for r in rows if r.strip()] - if not non_empty: - return [""] - left_pad = min(len(r) - len(r.lstrip(" ")) for r in non_empty) - out = [] - for row in rows: - if left_pad < len(row): - out.append(row[left_pad:].rstrip()) - else: - out.append(row.rstrip()) - return out - - -def _draw_font_picker(faces, selected): - w = tw() - h = 24 - try: - h = os.get_terminal_size().lines - except Exception: - pass - - max_preview_w = max(24, w - 8) - header_h = 6 - footer_h = 3 - preview_h = max(4, min(config.RENDER_H + 2, max(4, h // 2))) - visible = max(1, h - header_h - preview_h - footer_h) - top = max(0, selected - (visible // 2)) - bottom = min(len(faces), top + visible) - top = max(0, bottom - visible) - - print(CLR, end="") - print(CURSOR_OFF, end="") - print() - print(f" {G_HI}FONT PICKER{RST}") - print(f" {W_GHOST}{'─' * (w - 4)}{RST}") - print(f" {W_DIM}{config.FONT_DIR[:max_preview_w]}{RST}") - print(f" {W_GHOST}↑/↓ move · Enter select · q accept current{RST}") - print() - - for pos in range(top, bottom): - face = faces[pos] - active = pos == selected - pointer = "▶" if active else " " - color = G_HI if active else W_DIM - print( - f" {color}{pointer} {face['name']}{RST}{W_GHOST} · {face['file_name']}{RST}" - ) - - if top > 0: - print(f" {W_GHOST}… {top} above{RST}") - if bottom < len(faces): - print(f" {W_GHOST}… {len(faces) - bottom} below{RST}") - - print() - print(f" {W_GHOST}{'─' * (w - 4)}{RST}") - print( - f" {W_DIM}Preview: {faces[selected]['name']} · {faces[selected]['file_name']}{RST}" - ) - preview_rows = faces[selected]["preview_rows"][:preview_h] - for row in preview_rows: - shown = row[:max_preview_w] - print(f" {shown}") - - -def pick_font_face(): - """Interactive startup picker for selecting a face from repo OTF files.""" - if not config.FONT_PICKER: - return - - font_files = config.list_repo_font_files() - if not font_files: - print(CLR, end="") - print(CURSOR_OFF, end="") - print() - print(f" {G_HI}FONT PICKER{RST}") - print(f" {W_GHOST}{'─' * (tw() - 4)}{RST}") - print(f" {G_DIM}> no .otf/.ttf/.ttc files found in: {config.FONT_DIR}{RST}") - print(f" {W_GHOST}> add font files to the fonts folder, then rerun{RST}") - time.sleep(1.8) - sys.exit(1) - - prepared = [] - for font_path in font_files: - try: - faces = render.list_font_faces(font_path, max_faces=64) - except Exception: - fallback = os.path.splitext(os.path.basename(font_path))[0] - faces = [{"index": 0, "name": fallback}] - for face in faces: - idx = face["index"] - name = face["name"] - file_name = os.path.basename(font_path) - try: - fnt = render.load_font_face(font_path, idx) - rows = _normalize_preview_rows(render.render_line(name, fnt)) - except Exception: - rows = ["(preview unavailable)"] - prepared.append( - { - "font_path": font_path, - "font_index": idx, - "name": name, - "file_name": file_name, - "preview_rows": rows, - } - ) - - if not prepared: - print(CLR, end="") - print(CURSOR_OFF, end="") - print() - print(f" {G_HI}FONT PICKER{RST}") - print(f" {W_GHOST}{'─' * (tw() - 4)}{RST}") - print(f" {G_DIM}> no readable font faces found in: {config.FONT_DIR}{RST}") - time.sleep(1.8) - sys.exit(1) - - def _same_path(a, b): - try: - return os.path.samefile(a, b) - except Exception: - return os.path.abspath(a) == os.path.abspath(b) - - selected = next( - ( - i - for i, f in enumerate(prepared) - if _same_path(f["font_path"], config.FONT_PATH) - and f["font_index"] == config.FONT_INDEX - ), - 0, - ) - - if not sys.stdin.isatty(): - selected_font = prepared[selected] - config.set_font_selection( - font_path=selected_font["font_path"], - font_index=selected_font["font_index"], - ) - render.clear_font_cache() - print( - f" {G_DIM}> using {selected_font['name']} ({selected_font['file_name']}){RST}" - ) - time.sleep(0.8) - print(CLR, end="") - print(CURSOR_OFF, end="") - print() - return - - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - try: - tty.setcbreak(fd) - while True: - _draw_font_picker(prepared, selected) - key = _read_picker_key() - if key == "up": - selected = max(0, selected - 1) - elif key == "down": - selected = min(len(prepared) - 1, selected + 1) - elif key == "enter": - break - elif key == "interrupt": - raise KeyboardInterrupt - finally: - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - - selected_font = prepared[selected] - config.set_font_selection( - font_path=selected_font["font_path"], - font_index=selected_font["font_index"], - ) - render.clear_font_cache() - print( - f" {G_DIM}> using {selected_font['name']} ({selected_font['file_name']}){RST}" - ) - time.sleep(0.8) - print(CLR, end="") - print(CURSOR_OFF, end="") - print() - - -def pick_effects_config(): - """Interactive picker for configuring effects pipeline.""" - import effects_plugins - from engine.effects import get_effect_chain, get_registry - - effects_plugins.discover_plugins() - - registry = get_registry() - chain = get_effect_chain() - chain.set_order(["noise", "fade", "glitch", "firehose"]) - - effects = list(registry.list_all().values()) - if not effects: - return - - selected = 0 - editing_intensity = False - intensity_value = 1.0 - - def _draw_effects_picker(): - w = tw() - print(CLR, end="") - print("\033[1;1H", end="") - print(" \033[1;38;5;231mEFFECTS CONFIG\033[0m") - print(f" \033[2;38;5;37m{'─' * (w - 4)}\033[0m") - print() - - for i, effect in enumerate(effects): - prefix = " > " if i == selected else " " - marker = "[*]" if effect.config.enabled else "[ ]" - if editing_intensity and i == selected: - print( - f"{prefix}{marker} \033[1;38;5;82m{effect.name}\033[0m: intensity={intensity_value:.2f} (use +/- to adjust, Enter to confirm)" - ) - else: - print( - f"{prefix}{marker} {effect.name}: intensity={effect.config.intensity:.2f}" - ) - - print() - print(f" \033[2;38;5;37m{'─' * (w - 4)}\033[0m") - print( - " \033[38;5;245mControls: space=toggle on/off | +/-=adjust intensity | arrows=move | Enter=next effect | q=done\033[0m" - ) - - def _read_effects_key(): - ch = sys.stdin.read(1) - if ch == "\x03": - return "interrupt" - if ch in ("\r", "\n"): - return "enter" - if ch == " ": - return "toggle" - if ch == "q": - return "quit" - if ch == "+" or ch == "=": - return "up" - if ch == "-" or ch == "_": - return "down" - if ch == "\x1b": - c1 = sys.stdin.read(1) - if c1 != "[": - return None - c2 = sys.stdin.read(1) - if c2 == "A": - return "up" - if c2 == "B": - return "down" - return None - return None - - if not sys.stdin.isatty(): - return - - fd = sys.stdin.fileno() - old_settings = termios.tcgetattr(fd) - try: - tty.setcbreak(fd) - while True: - _draw_effects_picker() - key = _read_effects_key() - - if key == "quit" or key == "enter": - break - elif key == "up" and editing_intensity: - intensity_value = min(1.0, intensity_value + 0.1) - effects[selected].config.intensity = intensity_value - elif key == "down" and editing_intensity: - intensity_value = max(0.0, intensity_value - 0.1) - effects[selected].config.intensity = intensity_value - elif key == "up": - selected = max(0, selected - 1) - intensity_value = effects[selected].config.intensity - elif key == "down": - selected = min(len(effects) - 1, selected + 1) - intensity_value = effects[selected].config.intensity - elif key == "toggle": - effects[selected].config.enabled = not effects[selected].config.enabled - elif key == "interrupt": - raise KeyboardInterrupt - finally: - termios.tcsetattr(fd, termios.TCSADRAIN, old_settings) - - -def run_demo_mode(): - """Run demo mode - showcases effects and camera modes with real content. - - .. deprecated:: - This is legacy code. Use run_pipeline_mode() instead. - """ - import warnings - - warnings.warn( - "run_demo_mode is deprecated. Use run_pipeline_mode() instead.", - DeprecationWarning, - stacklevel=2, - ) - import random - - from engine import config - from engine.camera import Camera, CameraMode - from engine.display import DisplayRegistry - from engine.effects import ( - EffectContext, - PerformanceMonitor, - get_effect_chain, - get_registry, - set_monitor, - ) - from engine.fetch import fetch_all, fetch_poetry, load_cache - from engine.scroll import calculate_scroll_step - - print(" \033[1;38;5;46mMAINLINE DEMO MODE\033[0m") - print(" \033[38;5;245mInitializing...\033[0m") - - import effects_plugins - - effects_plugins.discover_plugins() - - registry = get_registry() - chain = get_effect_chain() - chain.set_order(["noise", "fade", "glitch", "firehose", "hud"]) - - monitor = PerformanceMonitor() - set_monitor(monitor) - chain._monitor = monitor - - display = DisplayRegistry.create("pygame") - if not display: - print(" \033[38;5;196mFailed to create pygame display\033[0m") - sys.exit(1) - - w, h = 80, 24 - display.init(w, h) - display.clear() - - print(" \033[38;5;245mFetching content...\033[0m") - - cached = load_cache() - if cached: - items = cached - elif config.MODE == "poetry": - items, _, _ = fetch_poetry() - else: - items, _, _ = fetch_all() - - if not items: - print(" \033[38;5;196mNo content available\033[0m") - sys.exit(1) - - random.shuffle(items) - pool = list(items) - seen = set() - active = [] - ticker_next_y = 0 - noise_cache = {} - scroll_motion_accum = 0.0 - frame_number = 0 - - GAP = 3 - scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, h) - - camera = Camera.vertical(speed=1.0) - - effects_to_demo = ["noise", "fade", "glitch", "firehose"] - effect_idx = 0 - effect_name = effects_to_demo[effect_idx] - effect_start_time = time.time() - current_intensity = 0.0 - ramping_up = True - - camera_modes = [ - (CameraMode.VERTICAL, "vertical"), - (CameraMode.HORIZONTAL, "horizontal"), - (CameraMode.OMNI, "omni"), - (CameraMode.FLOATING, "floating"), - ] - camera_mode_idx = 0 - camera_start_time = time.time() - - print(" \033[38;5;82mStarting effect & camera demo...\033[0m") - print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n") - - try: - while True: - elapsed = time.time() - effect_start_time - camera_elapsed = time.time() - camera_start_time - duration = config.DEMO_EFFECT_DURATION - - if elapsed >= duration: - effect_idx = (effect_idx + 1) % len(effects_to_demo) - effect_name = effects_to_demo[effect_idx] - effect_start_time = time.time() - elapsed = 0 - current_intensity = 0.0 - ramping_up = True - - if camera_elapsed >= duration * 2: - camera_mode_idx = (camera_mode_idx + 1) % len(camera_modes) - mode, mode_name = camera_modes[camera_mode_idx] - camera = Camera(mode=mode, speed=1.0) - camera_start_time = time.time() - camera_elapsed = 0 - - progress = elapsed / duration - if ramping_up: - current_intensity = progress - if progress >= 1.0: - ramping_up = False - else: - current_intensity = 1.0 - progress - - for effect in registry.list_all().values(): - if effect.name == effect_name: - effect.config.enabled = True - effect.config.intensity = current_intensity - elif effect.name not in ("hud",): - effect.config.enabled = False - - hud_effect = registry.get("hud") - if hud_effect: - mode_name = camera_modes[camera_mode_idx][1] - hud_effect.config.params["display_effect"] = ( - f"{effect_name} / {mode_name}" - ) - hud_effect.config.params["display_intensity"] = current_intensity - - scroll_motion_accum += config.FRAME_DT - while scroll_motion_accum >= scroll_step_interval: - scroll_motion_accum -= scroll_step_interval - camera.update(config.FRAME_DT) - - while ticker_next_y < camera.y + h + 10 and len(active) < 50: - from engine.effects import next_headline - from engine.render import make_block - - t, src, ts = next_headline(pool, items, seen) - ticker_content, hc, midx = make_block(t, src, ts, w) - active.append((ticker_content, hc, ticker_next_y, midx)) - ticker_next_y += len(ticker_content) + GAP - - active = [ - (c, hc, by, mi) - for c, hc, by, mi in active - if by + len(c) > camera.y - ] - for k in list(noise_cache): - if k < camera.y: - del noise_cache[k] - - grad_offset = (time.time() * config.GRAD_SPEED) % 1.0 - - from engine.layers import render_ticker_zone - - buf, noise_cache = render_ticker_zone( - active, - scroll_cam=camera.y, - camera_x=camera.x, - ticker_h=h, - w=w, - noise_cache=noise_cache, - grad_offset=grad_offset, - ) - - from engine.layers import render_firehose - - firehose_buf = render_firehose(items, w, 0, h) - buf.extend(firehose_buf) - - ctx = EffectContext( - terminal_width=w, - terminal_height=h, - scroll_cam=camera.y, - ticker_height=h, - camera_x=camera.x, - mic_excess=0.0, - grad_offset=grad_offset, - frame_number=frame_number, - has_message=False, - items=items, - ) - - result = chain.process(buf, ctx) - display.show(result) - - new_w, new_h = display.get_dimensions() - if new_w != w or new_h != h: - w, h = new_w, new_h - scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, h) - active = [] - noise_cache = {} - - frame_number += 1 - time.sleep(1 / 60) - - except KeyboardInterrupt: - pass - finally: - display.cleanup() - print("\n \033[38;5;245mDemo ended\033[0m") - - -def run_pipeline_demo(): - """Run pipeline visualization demo mode - shows ASCII pipeline animation. - - .. deprecated:: - This demo mode uses legacy rendering. Use run_pipeline_mode() instead. - """ - import warnings - - warnings.warn( - "run_pipeline_demo is deprecated. Use run_pipeline_mode() instead.", - DeprecationWarning, - stacklevel=2, - ) - import time - - from engine import config - from engine.camera import Camera, CameraMode - from engine.display import DisplayRegistry - from engine.effects import ( - EffectContext, - PerformanceMonitor, - get_effect_chain, - get_registry, - set_monitor, - ) - from engine.pipeline_viz import generate_large_network_viewport - - print(" \033[1;38;5;46mMAINLINE PIPELINE DEMO\033[0m") - print(" \033[38;5;245mInitializing...\033[0m") - - import effects_plugins - - effects_plugins.discover_plugins() - - registry = get_registry() - chain = get_effect_chain() - chain.set_order(["noise", "fade", "glitch", "firehose", "hud"]) - - monitor = PerformanceMonitor() - set_monitor(monitor) - chain._monitor = monitor - - display = DisplayRegistry.create("pygame") - if not display: - print(" \033[38;5;196mFailed to create pygame display\033[0m") - sys.exit(1) - - w, h = 80, 24 - display.init(w, h) - display.clear() - - camera = Camera.vertical(speed=1.0) - - effects_to_demo = ["noise", "fade", "glitch", "firehose"] - effect_idx = 0 - effect_name = effects_to_demo[effect_idx] - effect_start_time = time.time() - current_intensity = 0.0 - ramping_up = True - - camera_modes = [ - (CameraMode.VERTICAL, "vertical"), - (CameraMode.HORIZONTAL, "horizontal"), - (CameraMode.OMNI, "omni"), - (CameraMode.FLOATING, "floating"), - ] - camera_mode_idx = 0 - camera_start_time = time.time() - - frame_number = 0 - - print(" \033[38;5;82mStarting pipeline visualization...\033[0m") - print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n") - - try: - while True: - elapsed = time.time() - effect_start_time - camera_elapsed = time.time() - camera_start_time - duration = config.DEMO_EFFECT_DURATION - - if elapsed >= duration: - effect_idx = (effect_idx + 1) % len(effects_to_demo) - effect_name = effects_to_demo[effect_idx] - effect_start_time = time.time() - elapsed = 0 - current_intensity = 0.0 - ramping_up = True - - if camera_elapsed >= duration * 2: - camera_mode_idx = (camera_mode_idx + 1) % len(camera_modes) - mode, mode_name = camera_modes[camera_mode_idx] - camera = Camera(mode=mode, speed=1.0) - camera_start_time = time.time() - camera_elapsed = 0 - - progress = elapsed / duration - if ramping_up: - current_intensity = progress - if progress >= 1.0: - ramping_up = False - else: - current_intensity = 1.0 - progress - - for effect in registry.list_all().values(): - if effect.name == effect_name: - effect.config.enabled = True - effect.config.intensity = current_intensity - elif effect.name not in ("hud",): - effect.config.enabled = False - - hud_effect = registry.get("hud") - if hud_effect: - mode_name = camera_modes[camera_mode_idx][1] - hud_effect.config.params["display_effect"] = ( - f"{effect_name} / {mode_name}" - ) - hud_effect.config.params["display_intensity"] = current_intensity - - camera.update(config.FRAME_DT) - - buf = generate_large_network_viewport(w, h, frame_number) - - ctx = EffectContext( - terminal_width=w, - terminal_height=h, - scroll_cam=camera.y, - ticker_height=h, - camera_x=camera.x, - mic_excess=0.0, - grad_offset=0.0, - frame_number=frame_number, - has_message=False, - items=[], - ) - - result = chain.process(buf, ctx) - display.show(result) - - new_w, new_h = display.get_dimensions() - if new_w != w or new_h != h: - w, h = new_w, new_h - - frame_number += 1 - time.sleep(1 / 60) - - except KeyboardInterrupt: - pass - finally: - display.cleanup() - print("\n \033[38;5;245mPipeline demo ended\033[0m") - - -def run_preset_mode(preset_name: str): - """Run mode using animation presets. - - .. deprecated:: - Use run_pipeline_mode() with preset parameter instead. - """ - import warnings - - warnings.warn( - "run_preset_mode is deprecated. Use run_pipeline_mode() instead.", - DeprecationWarning, - stacklevel=2, - ) - from engine import config - from engine.animation import ( - create_demo_preset, - create_pipeline_preset, - ) - from engine.camera import Camera - from engine.display import DisplayRegistry - from engine.effects import ( - EffectContext, - PerformanceMonitor, - get_effect_chain, - get_registry, - set_monitor, - ) - from engine.sources_v2 import ( - PipelineDataSource, - get_source_registry, - init_default_sources, - ) - - w, h = 80, 24 - - if preset_name == "demo": - preset = create_demo_preset() - init_default_sources() - source = get_source_registry().default() - elif preset_name == "pipeline": - preset = create_pipeline_preset() - source = PipelineDataSource(w, h) - else: - print(f" \033[38;5;196mUnknown preset: {preset_name}\033[0m") - print(" Available: demo, pipeline") - sys.exit(1) - - print(f" \033[1;38;5;46mMAINLINE PRESET: {preset.name}\033[0m") - print(f" \033[38;5;245m{preset.description}\033[0m") - print(" \033[38;5;245mInitializing...\033[0m") - - import effects_plugins - - effects_plugins.discover_plugins() - - registry = get_registry() - chain = get_effect_chain() - chain.set_order(["noise", "fade", "glitch", "firehose", "hud"]) - - monitor = PerformanceMonitor() - set_monitor(monitor) - chain._monitor = monitor - - display = DisplayRegistry.create(preset.initial_params.display_backend) - if not display: - print( - f" \033[38;5;196mFailed to create {preset.initial_params.display_backend} display\033[0m" - ) - sys.exit(1) - - display.init(w, h) - display.clear() - - camera = Camera.vertical() - - print(" \033[38;5;82mStarting preset animation...\033[0m") - print(" \033[38;5;245mPress Ctrl+C to exit\033[0m\n") - - controller = preset.create_controller() - frame_number = 0 - - try: - while True: - params = controller.update() - - effect_name = params.get("current_effect", "none") - intensity = params.get("effect_intensity", 0.0) - camera_mode = params.get("camera_mode", "vertical") - - if camera_mode == "vertical": - camera = Camera.vertical(speed=params.get("camera_speed", 1.0)) - elif camera_mode == "horizontal": - camera = Camera.horizontal(speed=params.get("camera_speed", 1.0)) - elif camera_mode == "omni": - camera = Camera.omni(speed=params.get("camera_speed", 1.0)) - elif camera_mode == "floating": - camera = Camera.floating(speed=params.get("camera_speed", 1.0)) - - camera.update(config.FRAME_DT) - - for eff in registry.list_all().values(): - if eff.name == effect_name: - eff.config.enabled = True - eff.config.intensity = intensity - elif eff.name not in ("hud",): - eff.config.enabled = False - - hud_effect = registry.get("hud") - if hud_effect: - hud_effect.config.params["display_effect"] = ( - f"{effect_name} / {camera_mode}" - ) - hud_effect.config.params["display_intensity"] = intensity - - source.viewport_width = w - source.viewport_height = h - items = source.get_items() - buffer = items[0].content.split("\n") if items else [""] * h - - ctx = EffectContext( - terminal_width=w, - terminal_height=h, - scroll_cam=camera.y, - ticker_height=h, - camera_x=camera.x, - mic_excess=0.0, - grad_offset=0.0, - frame_number=frame_number, - has_message=False, - items=[], - ) - - result = chain.process(buffer, ctx) - display.show(result) - - new_w, new_h = display.get_dimensions() - if new_w != w or new_h != h: - w, h = new_w, new_h - - frame_number += 1 - time.sleep(1 / 60) - - except KeyboardInterrupt: - pass - finally: - display.cleanup() - print("\n \033[38;5;245mPreset ended\033[0m") - def main(): - from engine import config - from engine.pipeline import list_presets - - # Show pipeline diagram if requested + """Main entry point - all modes now use presets.""" if config.PIPELINE_DIAGRAM: try: from engine.pipeline import generate_pipeline_diagram @@ -884,129 +25,23 @@ def main(): print(generate_pipeline_diagram()) return - # Unified preset-based entry point - # All modes are now just presets preset_name = None - # Check for --preset flag first if config.PRESET: preset_name = config.PRESET - # Check for legacy --pipeline flag (mapped to demo preset) elif config.PIPELINE_MODE: preset_name = config.PIPELINE_PRESET - # Default to demo if no preset specified else: preset_name = "demo" - # Validate preset exists available = list_presets() if preset_name not in available: print(f"Error: Unknown preset '{preset_name}'") print(f"Available presets: {', '.join(available)}") sys.exit(1) - # Run with the selected preset run_pipeline_mode(preset_name) - atexit.register(lambda: print(CURSOR_ON, end="", flush=True)) - - def handle_sigint(*_): - print(f"\n\n {G_DIM}> SIGNAL LOST{RST}") - print(f" {W_GHOST}> connection terminated{RST}\n") - sys.exit(0) - - signal.signal(signal.SIGINT, handle_sigint) - - StreamController.warmup_topics() - - w = tw() - print(CLR, end="") - print(CURSOR_OFF, end="") - pick_font_face() - pick_effects_config() - w = tw() - print() - time.sleep(0.4) - - for ln in TITLE: - print(f"{G_HI}{ln}{RST}") - time.sleep(0.07) - - print() - _subtitle = ( - "literary consciousness stream" - if config.MODE == "poetry" - else "digital consciousness stream" - ) - print(f" {W_DIM}v0.1 · {_subtitle}{RST}") - print(f" {W_GHOST}{'─' * (w - 4)}{RST}") - print() - time.sleep(0.4) - - cached = load_cache() if "--refresh" not in sys.argv else None - if cached: - items = cached - boot_ln("Cache", f"LOADED [{len(items)} SIGNALS]", True) - elif config.MODE == "poetry": - slow_print(" > INITIALIZING LITERARY CORPUS...\n") - time.sleep(0.2) - print() - items, linked, failed = fetch_poetry() - print() - print( - f" {G_DIM}>{RST} {G_MID}{linked} TEXTS LOADED{RST} {W_GHOST}· {failed} DARK{RST}" - ) - print(f" {G_DIM}>{RST} {G_MID}{len(items)} STANZAS ACQUIRED{RST}") - save_cache(items) - else: - slow_print(" > INITIALIZING FEED ARRAY...\n") - time.sleep(0.2) - print() - items, linked, failed = fetch_all() - print() - print( - f" {G_DIM}>{RST} {G_MID}{linked} SOURCES LINKED{RST} {W_GHOST}· {failed} DARK{RST}" - ) - print(f" {G_DIM}>{RST} {G_MID}{len(items)} SIGNALS ACQUIRED{RST}") - save_cache(items) - - if not items: - print(f"\n {W_DIM}> NO SIGNAL — check network{RST}") - sys.exit(1) - - print() - controller = StreamController() - mic_ok, ntfy_ok = controller.initialize_sources() - - if controller.mic and controller.mic.available: - boot_ln( - "Microphone", - "ACTIVE" - if mic_ok - else "OFFLINE · check System Settings → Privacy → Microphone", - bool(mic_ok), - ) - - boot_ln("ntfy", "LISTENING" if ntfy_ok else "OFFLINE", ntfy_ok) - - if config.FIREHOSE: - boot_ln("Firehose", "ENGAGED", True) - - time.sleep(0.4) - slow_print(" > STREAMING...\n") - time.sleep(0.2) - print(f" {W_GHOST}{'─' * (w - 4)}{RST}") - print() - time.sleep(0.4) - - controller.run(items) - - print() - print(f" {W_GHOST}{'─' * (tw() - 4)}{RST}") - print(f" {G_DIM}> {config.HEADLINE_LIMIT} SIGNALS PROCESSED{RST}") - print(f" {W_GHOST}> end of stream{RST}") - print() - def run_pipeline_mode(preset_name: str = "demo"): """Run using the new unified pipeline architecture.""" @@ -1014,11 +49,6 @@ def run_pipeline_mode(preset_name: str = "demo"): from engine.display import DisplayRegistry from engine.effects import PerformanceMonitor, get_registry, set_monitor from engine.fetch import fetch_all, fetch_poetry, load_cache - from engine.pipeline import ( - Pipeline, - PipelineConfig, - get_preset, - ) from engine.pipeline.adapters import ( RenderStage, create_items_stage, @@ -1114,6 +144,15 @@ def run_pipeline_mode(preset_name: str = "demo"): ctx.set("display", display) ctx.set("items", items) ctx.set("pipeline", pipeline) + ctx.set("pipeline_order", pipeline.execution_order) + + current_width = 80 + current_height = 24 + + if hasattr(display, "get_dimensions"): + current_width, current_height = display.get_dimensions() + params.viewport_width = current_width + params.viewport_height = current_height try: frame = 0 @@ -1130,6 +169,13 @@ def run_pipeline_mode(preset_name: str = "demo"): display.clear_quit_request() raise KeyboardInterrupt() + if hasattr(display, "get_dimensions"): + new_w, new_h = display.get_dimensions() + if new_w != current_width or new_h != current_height: + current_width, current_height = new_w, new_h + params.viewport_width = current_width + params.viewport_height = current_height + time.sleep(1 / 60) frame += 1 @@ -1137,8 +183,12 @@ def run_pipeline_mode(preset_name: str = "demo"): pipeline.cleanup() display.cleanup() print("\n \033[38;5;245mPipeline stopped\033[0m") - return # Exit pipeline mode, not font picker + return pipeline.cleanup() display.cleanup() print("\n \033[38;5;245mPipeline stopped\033[0m") + + +if __name__ == "__main__": + main() diff --git a/engine/mic.py b/engine/mic.py index a1e9e21..cec5db5 100644 --- a/engine/mic.py +++ b/engine/mic.py @@ -4,7 +4,7 @@ Gracefully degrades if sounddevice/numpy are unavailable. .. deprecated:: For pipeline integration, use :class:`engine.sensors.mic.MicSensor` instead. - MicMonitor is still used as the backend for MicSensor. + MicSensor is a self-contained implementation and does not use MicMonitor. """ import atexit diff --git a/engine/pipeline/preset_loader.py b/engine/pipeline/preset_loader.py index cf9467c..067eac7 100644 --- a/engine/pipeline/preset_loader.py +++ b/engine/pipeline/preset_loader.py @@ -152,6 +152,64 @@ def validate_preset(preset: dict[str, Any]) -> list[str]: return errors +def validate_signal_flow(stages: list[dict]) -> list[str]: + """Validate signal flow based on inlet/outlet types. + + This validates that the preset's stage configuration produces valid + data flow using the PureData-style type system. + + Args: + stages: List of stage configs with 'name', 'category', 'inlet_types', 'outlet_types' + + Returns: + List of errors (empty if valid) + """ + errors: list[str] = [] + + if not stages: + errors.append("Signal flow is empty") + return errors + + # Define expected types for each category + type_map = { + "source": {"inlet": "NONE", "outlet": "SOURCE_ITEMS"}, + "data": {"inlet": "ANY", "outlet": "SOURCE_ITEMS"}, + "transform": {"inlet": "SOURCE_ITEMS", "outlet": "TEXT_BUFFER"}, + "effect": {"inlet": "TEXT_BUFFER", "outlet": "TEXT_BUFFER"}, + "overlay": {"inlet": "TEXT_BUFFER", "outlet": "TEXT_BUFFER"}, + "camera": {"inlet": "TEXT_BUFFER", "outlet": "TEXT_BUFFER"}, + "display": {"inlet": "TEXT_BUFFER", "outlet": "NONE"}, + "render": {"inlet": "SOURCE_ITEMS", "outlet": "TEXT_BUFFER"}, + } + + # Check stage order and type compatibility + for i, stage in enumerate(stages): + category = stage.get("category", "unknown") + name = stage.get("name", f"stage_{i}") + + if category not in type_map: + continue # Skip unknown categories + + expected = type_map[category] + + # Check against previous stage + if i > 0: + prev = stages[i - 1] + prev_category = prev.get("category", "unknown") + if prev_category in type_map: + prev_outlet = type_map[prev_category]["outlet"] + inlet = expected["inlet"] + + # Validate type compatibility + if inlet != "ANY" and prev_outlet != "ANY" and inlet != prev_outlet: + errors.append( + f"Type mismatch at '{name}': " + f"expects {inlet} but previous stage outputs {prev_outlet}" + ) + + return errors + + def validate_signal_path(stages: list[str]) -> list[str]: """Validate signal path for circular dependencies and connectivity. diff --git a/tests/test_app.py b/tests/test_app.py deleted file mode 100644 index c5ccb9a..0000000 --- a/tests/test_app.py +++ /dev/null @@ -1,55 +0,0 @@ -""" -Tests for engine.app module. -""" - -from engine.app import _normalize_preview_rows - - -class TestNormalizePreviewRows: - """Tests for _normalize_preview_rows function.""" - - def test_empty_rows(self): - """Empty input returns empty list.""" - result = _normalize_preview_rows([]) - assert result == [""] - - def test_strips_left_padding(self): - """Left padding is stripped.""" - result = _normalize_preview_rows([" content", " more"]) - assert all(not r.startswith(" ") for r in result) - - def test_preserves_content(self): - """Content is preserved.""" - result = _normalize_preview_rows([" hello world "]) - assert "hello world" in result[0] - - def test_handles_all_empty_rows(self): - """All empty rows returns single empty string.""" - result = _normalize_preview_rows(["", " ", ""]) - assert result == [""] - - -class TestAppConstants: - """Tests for app module constants.""" - - def test_title_defined(self): - """TITLE is defined.""" - from engine.app import TITLE - - assert len(TITLE) > 0 - - def test_title_lines_are_strings(self): - """TITLE contains string lines.""" - from engine.app import TITLE - - assert all(isinstance(line, str) for line in TITLE) - - -class TestAppImports: - """Tests for app module imports.""" - - def test_app_imports_without_error(self): - """Module imports without error.""" - from engine import app - - assert app is not None