From 45a44f5b9a49b8763177a0f27728f384a999c9b4 Mon Sep 17 00:00:00 2001 From: david Date: Sat, 21 Mar 2026 06:42:18 +0000 Subject: [PATCH] Add Mainline Interrogation Scripts --- Mainline-Interrogation-Scripts.md | 750 ++++++++++++++++++++++++++++++ 1 file changed, 750 insertions(+) create mode 100644 Mainline-Interrogation-Scripts.md diff --git a/Mainline-Interrogation-Scripts.md b/Mainline-Interrogation-Scripts.md new file mode 100644 index 0000000..1e3dac7 --- /dev/null +++ b/Mainline-Interrogation-Scripts.md @@ -0,0 +1,750 @@ +``` +===== run_upstream_capture.py ===== +#!/usr/bin/env python3 +"""Run upstream/main with figment mode and capture output.""" + +import sys +import os + +# Add current directory to path for imports +sys.path.insert(0, '/home/david/src/Mainline') + +# Import upstream modules +from engine import config +from engine.scroll import stream +from engine.display import NullDisplay +from engine.fetch import fetch_all +from engine.ntfy import NtfyPoller +from engine.mic import MicMonitor + +# Override config for figment mode +config.FIGMENT = True +config.FIGMENT_INTERVAL = 2 # Show figment every 2 seconds for testing +config.HEADLINE_LIMIT = 5 # Limit headlines for quick test +config.FRAME_DT = 0.05 # 20 FPS + +# Create a capture display +class CaptureDisplay: + def __init__(self, output_file="/tmp/upstream_output.txt"): + self.width = 80 + self.height = 24 + self.output_file = output_file + self.frame_count = 0 + # Clear output file + with open(output_file, 'w') as f: + f.write("=== Upstream Figment Capture ===\n\n") + + def init(self, width, height): + self.width, self.height = width, height + print(f"[Capture] Display init: {width}x{height}", file=sys.stderr) + + def show(self, buffer): + self.frame_count += 1 + + # Write frame to file + with open(self.output_file, 'a') as f: + f.write(f"\n{'='*60}\n") + f.write(f"FRAME {self.frame_count}\n") + f.write(f"{'='*60}\n") + for line in buffer: + f.write(line + '\n') + + # Print to stderr periodically + if self.frame_count % 10 == 0: + print(f"[Capture] Frame {self.frame_count}: {len(buffer)} lines", + file=sys.stderr) + + def clear(self): + pass + + def cleanup(self): + print(f"[Capture] Done. Captured {self.frame_count} frames to {self.output_file}", + file=sys.stderr) + +print("Starting upstream/main with figment mode...", file=sys.stderr) +print(f"FIGMENT={config.FIGMENT}, INTERVAL={config.FIGMENT_INTERVAL}s", file=sys.stderr) +print(f"HEADLINE_LIMIT={config.HEADLINE_LIMIT}", file=sys.stderr) + +# Fetch headlines +print("Fetching headlines...", file=sys.stderr) +items, linked, failed = fetch_all() +print(f"Fetched {len(items)} items ({linked} linked, {failed} failed)", file=sys.stderr) + +# Create display +display = CaptureDisplay("/tmp/upstream_output.txt") + +# Create mock ntfy and mic (or minimal versions) +try: + ntfy = NtfyPoller("test", reconnect_delay=1.0, display_secs=2.0) + ntfy.start() +except Exception as e: + print(f"[Warning] Ntfy setup failed: {e}", file=sys.stderr) + class MockNtfy: + def get_active_message(self): return None + ntfy = MockNtfy() + +try: + mic = MicMonitor(threshold_db=50.0) + mic.start() +except Exception as e: + print(f"[Warning] Mic setup failed: {e}", file=sys.stderr) + class MockMic: + excess = 0.0 + available = False + mic = MockMic() + +print("Starting stream (will run for ~30 seconds or until 3 figment frames)...", file=sys.stderr) +print("Press Ctrl+C to stop early.", file=sys.stderr) + +# Run stream +try: + stream(items, ntfy, mic, display) +except KeyboardInterrupt: + print("\n[Capture] Interrupted by user", file=sys.stderr) +except Exception as e: + print(f"\n[Error] Stream failed: {e}", file=sys.stderr) + import traceback + traceback.print_exc() + +print(f"\n[Capture] Complete. Output saved to {display.output_file}", file=sys.stderr) + + + + + + +===== run_upstream_capture2.py ===== +#!/usr/bin/env python3 +"""Run upstream/main with figment mode and capture output.""" + +import sys +import os + +# Set environment variables BEFORE importing +os.environ['FIGMENT'] = 'True' +os.environ['FIGMENT_INTERVAL'] = '2' + +# Add current directory to path for imports +sys.path.insert(0, '/home/david/src/Mainline') + +# Import upstream modules +from engine import config + +# Override config directly after import +config.FIGMENT = True +config.FIGMENT_INTERVAL = 2 # Show figment every 2 seconds for testing +config.HEADLINE_LIMIT = 5 # Limit headlines for quick test +config.FRAME_DT = 0.05 # 20 FPS + +print(f"[Config] FIGMENT={config.FIGMENT}, INTERVAL={config.FIGMENT_INTERVAL}s", file=sys.stderr) + +from engine.scroll import stream +from engine.display import NullDisplay +from engine.fetch import fetch_all +from engine.ntfy import NtfyPoller +from engine.mic import MicMonitor + +# Create a capture display +class CaptureDisplay: + def __init__(self, output_file="/tmp/upstream_output2.txt"): + self.width = 80 + self.height = 24 + self.output_file = output_file + self.frame_count = 0 + # Clear output file + with open(output_file, 'w') as f: + f.write("=== Upstream Figment Capture ===\n\n") + + def init(self, width, height): + self.width, self.height = width, height + print(f"[Capture] Display init: {width}x{height}", file=sys.stderr) + + def show(self, buffer): + self.frame_count += 1 + + # Write frame to file + with open(self.output_file, 'a') as f: + f.write(f"\n{'='*60}\n") + f.write(f"FRAME {self.frame_count}\n") + f.write(f"{'='*60}\n") + for line in buffer: + f.write(line + '\n') + + # Print to stderr periodically + if self.frame_count % 10 == 0: + print(f"[Capture] Frame {self.frame_count}: {len(buffer)} lines", + file=sys.stderr) + + def clear(self): + pass + + def cleanup(self): + print(f"[Capture] Done. Captured {self.frame_count} frames to {self.output_file}", + file=sys.stderr) + +print("Starting upstream/main with figment mode...", file=sys.stderr) + +# Fetch headlines +print("Fetching headlines...", file=sys.stderr) +items, linked, failed = fetch_all() +print(f"Fetched {len(items)} items ({linked} linked, {failed} failed)", file=sys.stderr) + +# Create display +display = CaptureDisplay("/tmp/upstream_output2.txt") + +# Create mock ntfy and mic +class MockNtfy: + def get_active_message(self): return None + +class MockMic: + excess = 0.0 + available = False + +ntfy = MockNtfy() +mic = MockMic() + +print("Starting stream with figment enabled...", file=sys.stderr) + +# Run stream +try: + stream(items, ntfy, mic, display) +except KeyboardInterrupt: + print("\n[Capture] Interrupted by user", file=sys.stderr) +except Exception as e: + print(f"\n[Error] Stream failed: {e}", file=sys.stderr) + import traceback + traceback.print_exc() + +print(f"\n[Capture] Complete. Output saved to {display.output_file}", file=sys.stderr) + + + + + + +===== upstream_layers.py ===== +""" +Layer compositing — message overlay, ticker zone, firehose, noise. +Depends on: config, render, effects. +""" + +import random +import re +import time +from datetime import datetime + +from engine import config +from engine.effects import ( + EffectChain, + EffectContext, + fade_line, + firehose_line, + glitch_bar, + noise, + vis_trunc, +) +from engine.render import big_wrap, lr_gradient, msg_gradient +from engine.terminal import RST, W_COOL + +MSG_META = "\033[38;5;245m" +MSG_BORDER = "\033[2;38;5;37m" + + +def render_message_overlay( + msg: tuple[str, str, float] | None, + w: int, + h: int, + msg_cache: tuple, +) -> tuple[list[str], tuple]: + """Render ntfy message overlay. + + Args: + msg: (title, body, timestamp) or None + w: terminal width + h: terminal height + msg_cache: (cache_key, rendered_rows) for caching + + Returns: + (list of ANSI strings, updated cache) + """ + overlay = [] + if msg is None: + return overlay, msg_cache + + m_title, m_body, m_ts = msg + display_text = m_body or m_title or "(empty)" + display_text = re.sub(r"\s+", " ", display_text.upper()) + + cache_key = (display_text, w) + if msg_cache[0] != cache_key: + msg_rows = big_wrap(display_text, w - 4) + msg_cache = (cache_key, msg_rows) + else: + msg_rows = msg_cache[1] + + msg_rows = msg_gradient(msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0) + + elapsed_s = int(time.monotonic() - m_ts) + remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s) + ts_str = datetime.now().strftime("%H:%M:%S") + panel_h = len(msg_rows) + 2 + panel_top = max(0, (h - panel_h) // 2) + + row_idx = 0 + for mr in msg_rows: + ln = vis_trunc(mr, w) + overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K") + row_idx += 1 + + meta_parts = [] + if m_title and m_title != m_body: + meta_parts.append(m_title) + meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s") + meta = ( + " " + " \u00b7 ".join(meta_parts) + if len(meta_parts) > 1 + else " " + meta_parts[0] + ) + overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}\033[0m\033[K") + row_idx += 1 + + bar = "\u2500" * (w - 4) + overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}\033[0m\033[K") + + return overlay, msg_cache + + +def render_ticker_zone( + active: list, + scroll_cam: int, + ticker_h: int, + w: int, + noise_cache: dict, + grad_offset: float, +) -> tuple[list[str], dict]: + """Render the ticker scroll zone. + + Args: + active: list of (content_rows, color, canvas_y, meta_idx) + scroll_cam: camera position (viewport top) + ticker_h: height of ticker zone + w: terminal width + noise_cache: dict of cy -> noise string + grad_offset: gradient animation offset + + Returns: + (list of ANSI strings, updated noise_cache) + """ + buf = [] + top_zone = max(1, int(ticker_h * 0.25)) + bot_zone = max(1, int(ticker_h * 0.10)) + + def noise_at(cy): + if cy not in noise_cache: + noise_cache[cy] = noise(w) if random.random() < 0.15 else None + return noise_cache[cy] + + for r in range(ticker_h): + scr_row = r + 1 + cy = scroll_cam + r + top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0 + bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0 + row_fade = min(top_f, bot_f) + drawn = False + + for content, hc, by, midx in active: + cr = cy - by + if 0 <= cr < len(content): + raw = content[cr] + if cr != midx: + colored = lr_gradient([raw], grad_offset)[0] + else: + colored = raw + ln = vis_trunc(colored, w) + if row_fade < 1.0: + ln = fade_line(ln, row_fade) + + if cr == midx: + buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K") + elif ln.strip(): + buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K") + else: + buf.append(f"\033[{scr_row};1H\033[K") + drawn = True + break + + if not drawn: + n = noise_at(cy) + if row_fade < 1.0 and n: + n = fade_line(n, row_fade) + if n: + buf.append(f"\033[{scr_row};1H{n}") + else: + buf.append(f"\033[{scr_row};1H\033[K") + + return buf, noise_cache + + +def apply_glitch( + buf: list[str], + ticker_buf_start: int, + mic_excess: float, + w: int, +) -> list[str]: + """Apply glitch effect to ticker buffer. + + Args: + buf: current buffer + ticker_buf_start: index where ticker starts in buffer + mic_excess: mic level above threshold + w: terminal width + + Returns: + Updated buffer with glitches applied + """ + glitch_prob = 0.32 + min(0.9, mic_excess * 0.16) + n_hits = 4 + int(mic_excess / 2) + ticker_buf_len = len(buf) - ticker_buf_start + + if random.random() < glitch_prob and ticker_buf_len > 0: + for _ in range(min(n_hits, ticker_buf_len)): + gi = random.randint(0, ticker_buf_len - 1) + scr_row = gi + 1 + buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}" + + return buf + + +def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]: + """Render firehose strip at bottom of screen.""" + buf = [] + if fh > 0: + for fr in range(fh): + scr_row = h - fh + fr + 1 + fline = firehose_line(items, w) + buf.append(f"\033[{scr_row};1H{fline}\033[K") + return buf + + +_effect_chain = None + + +def init_effects() -> None: + """Initialize effect plugins and chain.""" + global _effect_chain + from engine.effects import EffectChain, get_registry + + registry = get_registry() + + import effects_plugins + + effects_plugins.discover_plugins() + + chain = EffectChain(registry) + chain.set_order(["noise", "fade", "glitch", "firehose"]) + _effect_chain = chain + + +def process_effects( + buf: list[str], + w: int, + h: int, + scroll_cam: int, + ticker_h: int, + mic_excess: float, + grad_offset: float, + frame_number: int, + has_message: bool, + items: list, +) -> list[str]: + """Process buffer through effect chain.""" + if _effect_chain is None: + init_effects() + + ctx = EffectContext( + terminal_width=w, + terminal_height=h, + scroll_cam=scroll_cam, + ticker_height=ticker_h, + mic_excess=mic_excess, + grad_offset=grad_offset, + frame_number=frame_number, + has_message=has_message, + items=items, + ) + return _effect_chain.process(buf, ctx) + + +def get_effect_chain() -> EffectChain | None: + """Get the effect chain instance.""" + global _effect_chain + if _effect_chain is None: + init_effects() + return _effect_chain + + +def render_figment_overlay( + figment_state, + w: int, + h: int, +) -> list[str]: + """Render figment overlay as ANSI cursor-positioning commands. + + Args: + figment_state: FigmentState with phase, progress, rows, gradient, centering. + w: terminal width + h: terminal height + + Returns: + List of ANSI strings to append to display buffer. + """ + from engine.render import _color_codes_to_ansi + + rows = figment_state.rows + if not rows: + return [] + + phase = figment_state.phase + progress = figment_state.progress + gradient = figment_state.gradient + center_row = figment_state.center_row + center_col = figment_state.center_col + + cols = _color_codes_to_ansi(gradient) + + # Build a list of non-space cell positions + cell_positions = [] + for r_idx, row in enumerate(rows): + for c_idx, ch in enumerate(row): + if ch != " ": + cell_positions.append((r_idx, c_idx)) + + n_cells = len(cell_positions) + if n_cells == 0: + return [] + + # Use a deterministic seed so the reveal/dissolve pattern is stable per-figment + rng = random.Random(hash(tuple(rows[0][:10])) if rows[0] else 42) + shuffled = list(cell_positions) + rng.shuffle(shuffled) + + # Phase-dependent visibility + from effects_plugins.figment import FigmentPhase + + if phase == FigmentPhase.REVEAL: + visible_count = int(n_cells * progress) + visible = set(shuffled[:visible_count]) + elif phase == FigmentPhase.HOLD: + visible = set(cell_positions) + # Strobe: dim some cells periodically + if int(progress * 20) % 3 == 0: + dim_count = int(n_cells * 0.3) + visible -= set(shuffled[:dim_count]) + elif phase == FigmentPhase.DISSOLVE: + remaining_count = int(n_cells * (1.0 - progress)) + visible = set(shuffled[:remaining_count]) + else: + visible = set(cell_positions) + + # Build overlay commands + overlay: list[str] = [] + n_cols = len(cols) + max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1) + + for r_idx, row in enumerate(rows): + scr_row = center_row + r_idx + 1 # 1-indexed + if scr_row < 1 or scr_row > h: + continue + + line_buf: list[str] = [] + has_content = False + + for c_idx, ch in enumerate(row): + scr_col = center_col + c_idx + 1 + if scr_col < 1 or scr_col > w: + continue + + if ch != " " and (r_idx, c_idx) in visible: + # Apply gradient color + shifted = (c_idx / max(max_x - 1, 1)) % 1.0 + idx = min(round(shifted * (n_cols - 1)), n_cols - 1) + line_buf.append(f"{cols[idx]}{ch}{RST}") + has_content = True + else: + line_buf.append(" ") + + if has_content: + line_str = "".join(line_buf).rstrip() + if line_str.strip(): + overlay.append(f"\033[{scr_row};{center_col + 1}H{line_str}{RST}") + + return overlay + + + + + + +===== upstream_scroll.py ===== +""" +Render engine — ticker content, scroll motion, message panel, and firehose overlay. +Orchestrates viewport, frame timing, and layers. +""" + +import random +import time + +from engine import config +from engine.display import ( + Display, + TerminalDisplay, +) +from engine.display import ( + get_monitor as _get_display_monitor, +) +from engine.frame import calculate_scroll_step +from engine.layers import ( + apply_glitch, + process_effects, + render_figment_overlay, + render_firehose, + render_message_overlay, + render_ticker_zone, +) +from engine.viewport import th, tw + +USE_EFFECT_CHAIN = True + + +def stream(items, ntfy_poller, mic_monitor, display: Display | None = None): + """Main render loop with four layers: message, ticker, scroll motion, firehose.""" + if display is None: + display = TerminalDisplay() + random.shuffle(items) + pool = list(items) + seen = set() + queued = 0 + + time.sleep(0.5) + w, h = tw(), th() + display.init(w, h) + display.clear() + fh = config.FIREHOSE_H if config.FIREHOSE else 0 + ticker_view_h = h - fh + GAP = 3 + scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h) + + active = [] + scroll_cam = 0 + ticker_next_y = ticker_view_h + noise_cache = {} + scroll_motion_accum = 0.0 + msg_cache = (None, None) + frame_number = 0 + + # Figment overlay (optional — requires cairosvg) + figment = None + if config.FIGMENT: + try: + from effects_plugins.figment import FigmentEffect + + figment = FigmentEffect() + figment.config.enabled = True + figment.config.params["interval_secs"] = config.FIGMENT_INTERVAL + except (ImportError, OSError): + pass + + while True: + if queued >= config.HEADLINE_LIMIT and not active: + break + + t0 = time.monotonic() + w, h = tw(), th() + fh = config.FIREHOSE_H if config.FIREHOSE else 0 + ticker_view_h = h - fh + scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h) + + msg = ntfy_poller.get_active_message() + msg_overlay, msg_cache = render_message_overlay(msg, w, h, msg_cache) + + buf = [] + ticker_h = ticker_view_h + + scroll_motion_accum += config.FRAME_DT + while scroll_motion_accum >= scroll_step_interval: + scroll_motion_accum -= scroll_step_interval + scroll_cam += 1 + + while ( + ticker_next_y < scroll_cam + ticker_view_h + 10 + and queued < config.HEADLINE_LIMIT + ): + from engine.effects import next_headline + from engine.render import make_block + + t, src, ts = next_headline(pool, items, seen) + ticker_content, hc, midx = make_block(t, src, ts, w) + active.append((ticker_content, hc, ticker_next_y, midx)) + ticker_next_y += len(ticker_content) + GAP + queued += 1 + + active = [ + (c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > scroll_cam + ] + for k in list(noise_cache): + if k < scroll_cam: + del noise_cache[k] + + grad_offset = (time.monotonic() * config.GRAD_SPEED) % 1.0 + ticker_buf_start = len(buf) + + ticker_buf, noise_cache = render_ticker_zone( + active, scroll_cam, ticker_h, w, noise_cache, grad_offset + ) + buf.extend(ticker_buf) + + mic_excess = mic_monitor.excess + render_start = time.perf_counter() + + if USE_EFFECT_CHAIN: + buf = process_effects( + buf, + w, + h, + scroll_cam, + ticker_h, + mic_excess, + grad_offset, + frame_number, + msg is not None, + items, + ) + else: + buf = apply_glitch(buf, ticker_buf_start, mic_excess, w) + firehose_buf = render_firehose(items, w, fh, h) + buf.extend(firehose_buf) + + # Figment overlay (between effects and ntfy message) + if figment and figment.config.enabled: + figment_state = figment.get_figment_state(frame_number, w, h) + if figment_state is not None: + figment_buf = render_figment_overlay(figment_state, w, h) + buf.extend(figment_buf) + + if msg_overlay: + buf.extend(msg_overlay) + + render_elapsed = (time.perf_counter() - render_start) * 1000 + monitor = _get_display_monitor() + if monitor: + chars = sum(len(line) for line in buf) + monitor.record_effect("render", render_elapsed, chars, chars) + + display.show(buf) + + elapsed = time.monotonic() - t0 + time.sleep(max(0, config.FRAME_DT - elapsed)) + frame_number += 1 + + display.cleanup() +``` \ No newline at end of file