diff --git a/Mainline-Interrogation-Scripts.md b/Mainline-Interrogation-Scripts.md index 1e3dac7..1a61527 100644 --- a/Mainline-Interrogation-Scripts.md +++ b/Mainline-Interrogation-Scripts.md @@ -1,117 +1,4 @@ ``` -===== run_upstream_capture.py ===== -#!/usr/bin/env python3 -"""Run upstream/main with figment mode and capture output.""" - -import sys -import os - -# Add current directory to path for imports -sys.path.insert(0, '/home/david/src/Mainline') - -# Import upstream modules -from engine import config -from engine.scroll import stream -from engine.display import NullDisplay -from engine.fetch import fetch_all -from engine.ntfy import NtfyPoller -from engine.mic import MicMonitor - -# Override config for figment mode -config.FIGMENT = True -config.FIGMENT_INTERVAL = 2 # Show figment every 2 seconds for testing -config.HEADLINE_LIMIT = 5 # Limit headlines for quick test -config.FRAME_DT = 0.05 # 20 FPS - -# Create a capture display -class CaptureDisplay: - def __init__(self, output_file="/tmp/upstream_output.txt"): - self.width = 80 - self.height = 24 - self.output_file = output_file - self.frame_count = 0 - # Clear output file - with open(output_file, 'w') as f: - f.write("=== Upstream Figment Capture ===\n\n") - - def init(self, width, height): - self.width, self.height = width, height - print(f"[Capture] Display init: {width}x{height}", file=sys.stderr) - - def show(self, buffer): - self.frame_count += 1 - - # Write frame to file - with open(self.output_file, 'a') as f: - f.write(f"\n{'='*60}\n") - f.write(f"FRAME {self.frame_count}\n") - f.write(f"{'='*60}\n") - for line in buffer: - f.write(line + '\n') - - # Print to stderr periodically - if self.frame_count % 10 == 0: - print(f"[Capture] Frame {self.frame_count}: {len(buffer)} lines", - file=sys.stderr) - - def clear(self): - pass - - def cleanup(self): - print(f"[Capture] Done. Captured {self.frame_count} frames to {self.output_file}", - file=sys.stderr) - -print("Starting upstream/main with figment mode...", file=sys.stderr) -print(f"FIGMENT={config.FIGMENT}, INTERVAL={config.FIGMENT_INTERVAL}s", file=sys.stderr) -print(f"HEADLINE_LIMIT={config.HEADLINE_LIMIT}", file=sys.stderr) - -# Fetch headlines -print("Fetching headlines...", file=sys.stderr) -items, linked, failed = fetch_all() -print(f"Fetched {len(items)} items ({linked} linked, {failed} failed)", file=sys.stderr) - -# Create display -display = CaptureDisplay("/tmp/upstream_output.txt") - -# Create mock ntfy and mic (or minimal versions) -try: - ntfy = NtfyPoller("test", reconnect_delay=1.0, display_secs=2.0) - ntfy.start() -except Exception as e: - print(f"[Warning] Ntfy setup failed: {e}", file=sys.stderr) - class MockNtfy: - def get_active_message(self): return None - ntfy = MockNtfy() - -try: - mic = MicMonitor(threshold_db=50.0) - mic.start() -except Exception as e: - print(f"[Warning] Mic setup failed: {e}", file=sys.stderr) - class MockMic: - excess = 0.0 - available = False - mic = MockMic() - -print("Starting stream (will run for ~30 seconds or until 3 figment frames)...", file=sys.stderr) -print("Press Ctrl+C to stop early.", file=sys.stderr) - -# Run stream -try: - stream(items, ntfy, mic, display) -except KeyboardInterrupt: - print("\n[Capture] Interrupted by user", file=sys.stderr) -except Exception as e: - print(f"\n[Error] Stream failed: {e}", file=sys.stderr) - import traceback - traceback.print_exc() - -print(f"\n[Capture] Complete. Output saved to {display.output_file}", file=sys.stderr) - - - - - ===== run_upstream_capture2.py ===== #!/usr/bin/env python3 @@ -217,534 +104,3 @@ except Exception as e: print(f"\n[Capture] Complete. Output saved to {display.output_file}", file=sys.stderr) - - - - - -===== upstream_layers.py ===== -""" -Layer compositing — message overlay, ticker zone, firehose, noise. -Depends on: config, render, effects. -""" - -import random -import re -import time -from datetime import datetime - -from engine import config -from engine.effects import ( - EffectChain, - EffectContext, - fade_line, - firehose_line, - glitch_bar, - noise, - vis_trunc, -) -from engine.render import big_wrap, lr_gradient, msg_gradient -from engine.terminal import RST, W_COOL - -MSG_META = "\033[38;5;245m" -MSG_BORDER = "\033[2;38;5;37m" - - -def render_message_overlay( - msg: tuple[str, str, float] | None, - w: int, - h: int, - msg_cache: tuple, -) -> tuple[list[str], tuple]: - """Render ntfy message overlay. - - Args: - msg: (title, body, timestamp) or None - w: terminal width - h: terminal height - msg_cache: (cache_key, rendered_rows) for caching - - Returns: - (list of ANSI strings, updated cache) - """ - overlay = [] - if msg is None: - return overlay, msg_cache - - m_title, m_body, m_ts = msg - display_text = m_body or m_title or "(empty)" - display_text = re.sub(r"\s+", " ", display_text.upper()) - - cache_key = (display_text, w) - if msg_cache[0] != cache_key: - msg_rows = big_wrap(display_text, w - 4) - msg_cache = (cache_key, msg_rows) - else: - msg_rows = msg_cache[1] - - msg_rows = msg_gradient(msg_rows, (time.monotonic() * config.GRAD_SPEED) % 1.0) - - elapsed_s = int(time.monotonic() - m_ts) - remaining = max(0, config.MESSAGE_DISPLAY_SECS - elapsed_s) - ts_str = datetime.now().strftime("%H:%M:%S") - panel_h = len(msg_rows) + 2 - panel_top = max(0, (h - panel_h) // 2) - - row_idx = 0 - for mr in msg_rows: - ln = vis_trunc(mr, w) - overlay.append(f"\033[{panel_top + row_idx + 1};1H {ln}\033[0m\033[K") - row_idx += 1 - - meta_parts = [] - if m_title and m_title != m_body: - meta_parts.append(m_title) - meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s") - meta = ( - " " + " \u00b7 ".join(meta_parts) - if len(meta_parts) > 1 - else " " + meta_parts[0] - ) - overlay.append(f"\033[{panel_top + row_idx + 1};1H{MSG_META}{meta}\033[0m\033[K") - row_idx += 1 - - bar = "\u2500" * (w - 4) - overlay.append(f"\033[{panel_top + row_idx + 1};1H {MSG_BORDER}{bar}\033[0m\033[K") - - return overlay, msg_cache - - -def render_ticker_zone( - active: list, - scroll_cam: int, - ticker_h: int, - w: int, - noise_cache: dict, - grad_offset: float, -) -> tuple[list[str], dict]: - """Render the ticker scroll zone. - - Args: - active: list of (content_rows, color, canvas_y, meta_idx) - scroll_cam: camera position (viewport top) - ticker_h: height of ticker zone - w: terminal width - noise_cache: dict of cy -> noise string - grad_offset: gradient animation offset - - Returns: - (list of ANSI strings, updated noise_cache) - """ - buf = [] - top_zone = max(1, int(ticker_h * 0.25)) - bot_zone = max(1, int(ticker_h * 0.10)) - - def noise_at(cy): - if cy not in noise_cache: - noise_cache[cy] = noise(w) if random.random() < 0.15 else None - return noise_cache[cy] - - for r in range(ticker_h): - scr_row = r + 1 - cy = scroll_cam + r - top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0 - bot_f = min(1.0, (ticker_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0 - row_fade = min(top_f, bot_f) - drawn = False - - for content, hc, by, midx in active: - cr = cy - by - if 0 <= cr < len(content): - raw = content[cr] - if cr != midx: - colored = lr_gradient([raw], grad_offset)[0] - else: - colored = raw - ln = vis_trunc(colored, w) - if row_fade < 1.0: - ln = fade_line(ln, row_fade) - - if cr == midx: - buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K") - elif ln.strip(): - buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K") - else: - buf.append(f"\033[{scr_row};1H\033[K") - drawn = True - break - - if not drawn: - n = noise_at(cy) - if row_fade < 1.0 and n: - n = fade_line(n, row_fade) - if n: - buf.append(f"\033[{scr_row};1H{n}") - else: - buf.append(f"\033[{scr_row};1H\033[K") - - return buf, noise_cache - - -def apply_glitch( - buf: list[str], - ticker_buf_start: int, - mic_excess: float, - w: int, -) -> list[str]: - """Apply glitch effect to ticker buffer. - - Args: - buf: current buffer - ticker_buf_start: index where ticker starts in buffer - mic_excess: mic level above threshold - w: terminal width - - Returns: - Updated buffer with glitches applied - """ - glitch_prob = 0.32 + min(0.9, mic_excess * 0.16) - n_hits = 4 + int(mic_excess / 2) - ticker_buf_len = len(buf) - ticker_buf_start - - if random.random() < glitch_prob and ticker_buf_len > 0: - for _ in range(min(n_hits, ticker_buf_len)): - gi = random.randint(0, ticker_buf_len - 1) - scr_row = gi + 1 - buf[ticker_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}" - - return buf - - -def render_firehose(items: list, w: int, fh: int, h: int) -> list[str]: - """Render firehose strip at bottom of screen.""" - buf = [] - if fh > 0: - for fr in range(fh): - scr_row = h - fh + fr + 1 - fline = firehose_line(items, w) - buf.append(f"\033[{scr_row};1H{fline}\033[K") - return buf - - -_effect_chain = None - - -def init_effects() -> None: - """Initialize effect plugins and chain.""" - global _effect_chain - from engine.effects import EffectChain, get_registry - - registry = get_registry() - - import effects_plugins - - effects_plugins.discover_plugins() - - chain = EffectChain(registry) - chain.set_order(["noise", "fade", "glitch", "firehose"]) - _effect_chain = chain - - -def process_effects( - buf: list[str], - w: int, - h: int, - scroll_cam: int, - ticker_h: int, - mic_excess: float, - grad_offset: float, - frame_number: int, - has_message: bool, - items: list, -) -> list[str]: - """Process buffer through effect chain.""" - if _effect_chain is None: - init_effects() - - ctx = EffectContext( - terminal_width=w, - terminal_height=h, - scroll_cam=scroll_cam, - ticker_height=ticker_h, - mic_excess=mic_excess, - grad_offset=grad_offset, - frame_number=frame_number, - has_message=has_message, - items=items, - ) - return _effect_chain.process(buf, ctx) - - -def get_effect_chain() -> EffectChain | None: - """Get the effect chain instance.""" - global _effect_chain - if _effect_chain is None: - init_effects() - return _effect_chain - - -def render_figment_overlay( - figment_state, - w: int, - h: int, -) -> list[str]: - """Render figment overlay as ANSI cursor-positioning commands. - - Args: - figment_state: FigmentState with phase, progress, rows, gradient, centering. - w: terminal width - h: terminal height - - Returns: - List of ANSI strings to append to display buffer. - """ - from engine.render import _color_codes_to_ansi - - rows = figment_state.rows - if not rows: - return [] - - phase = figment_state.phase - progress = figment_state.progress - gradient = figment_state.gradient - center_row = figment_state.center_row - center_col = figment_state.center_col - - cols = _color_codes_to_ansi(gradient) - - # Build a list of non-space cell positions - cell_positions = [] - for r_idx, row in enumerate(rows): - for c_idx, ch in enumerate(row): - if ch != " ": - cell_positions.append((r_idx, c_idx)) - - n_cells = len(cell_positions) - if n_cells == 0: - return [] - - # Use a deterministic seed so the reveal/dissolve pattern is stable per-figment - rng = random.Random(hash(tuple(rows[0][:10])) if rows[0] else 42) - shuffled = list(cell_positions) - rng.shuffle(shuffled) - - # Phase-dependent visibility - from effects_plugins.figment import FigmentPhase - - if phase == FigmentPhase.REVEAL: - visible_count = int(n_cells * progress) - visible = set(shuffled[:visible_count]) - elif phase == FigmentPhase.HOLD: - visible = set(cell_positions) - # Strobe: dim some cells periodically - if int(progress * 20) % 3 == 0: - dim_count = int(n_cells * 0.3) - visible -= set(shuffled[:dim_count]) - elif phase == FigmentPhase.DISSOLVE: - remaining_count = int(n_cells * (1.0 - progress)) - visible = set(shuffled[:remaining_count]) - else: - visible = set(cell_positions) - - # Build overlay commands - overlay: list[str] = [] - n_cols = len(cols) - max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1) - - for r_idx, row in enumerate(rows): - scr_row = center_row + r_idx + 1 # 1-indexed - if scr_row < 1 or scr_row > h: - continue - - line_buf: list[str] = [] - has_content = False - - for c_idx, ch in enumerate(row): - scr_col = center_col + c_idx + 1 - if scr_col < 1 or scr_col > w: - continue - - if ch != " " and (r_idx, c_idx) in visible: - # Apply gradient color - shifted = (c_idx / max(max_x - 1, 1)) % 1.0 - idx = min(round(shifted * (n_cols - 1)), n_cols - 1) - line_buf.append(f"{cols[idx]}{ch}{RST}") - has_content = True - else: - line_buf.append(" ") - - if has_content: - line_str = "".join(line_buf).rstrip() - if line_str.strip(): - overlay.append(f"\033[{scr_row};{center_col + 1}H{line_str}{RST}") - - return overlay - - - - - - -===== upstream_scroll.py ===== -""" -Render engine — ticker content, scroll motion, message panel, and firehose overlay. -Orchestrates viewport, frame timing, and layers. -""" - -import random -import time - -from engine import config -from engine.display import ( - Display, - TerminalDisplay, -) -from engine.display import ( - get_monitor as _get_display_monitor, -) -from engine.frame import calculate_scroll_step -from engine.layers import ( - apply_glitch, - process_effects, - render_figment_overlay, - render_firehose, - render_message_overlay, - render_ticker_zone, -) -from engine.viewport import th, tw - -USE_EFFECT_CHAIN = True - - -def stream(items, ntfy_poller, mic_monitor, display: Display | None = None): - """Main render loop with four layers: message, ticker, scroll motion, firehose.""" - if display is None: - display = TerminalDisplay() - random.shuffle(items) - pool = list(items) - seen = set() - queued = 0 - - time.sleep(0.5) - w, h = tw(), th() - display.init(w, h) - display.clear() - fh = config.FIREHOSE_H if config.FIREHOSE else 0 - ticker_view_h = h - fh - GAP = 3 - scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h) - - active = [] - scroll_cam = 0 - ticker_next_y = ticker_view_h - noise_cache = {} - scroll_motion_accum = 0.0 - msg_cache = (None, None) - frame_number = 0 - - # Figment overlay (optional — requires cairosvg) - figment = None - if config.FIGMENT: - try: - from effects_plugins.figment import FigmentEffect - - figment = FigmentEffect() - figment.config.enabled = True - figment.config.params["interval_secs"] = config.FIGMENT_INTERVAL - except (ImportError, OSError): - pass - - while True: - if queued >= config.HEADLINE_LIMIT and not active: - break - - t0 = time.monotonic() - w, h = tw(), th() - fh = config.FIREHOSE_H if config.FIREHOSE else 0 - ticker_view_h = h - fh - scroll_step_interval = calculate_scroll_step(config.SCROLL_DUR, ticker_view_h) - - msg = ntfy_poller.get_active_message() - msg_overlay, msg_cache = render_message_overlay(msg, w, h, msg_cache) - - buf = [] - ticker_h = ticker_view_h - - scroll_motion_accum += config.FRAME_DT - while scroll_motion_accum >= scroll_step_interval: - scroll_motion_accum -= scroll_step_interval - scroll_cam += 1 - - while ( - ticker_next_y < scroll_cam + ticker_view_h + 10 - and queued < config.HEADLINE_LIMIT - ): - from engine.effects import next_headline - from engine.render import make_block - - t, src, ts = next_headline(pool, items, seen) - ticker_content, hc, midx = make_block(t, src, ts, w) - active.append((ticker_content, hc, ticker_next_y, midx)) - ticker_next_y += len(ticker_content) + GAP - queued += 1 - - active = [ - (c, hc, by, mi) for c, hc, by, mi in active if by + len(c) > scroll_cam - ] - for k in list(noise_cache): - if k < scroll_cam: - del noise_cache[k] - - grad_offset = (time.monotonic() * config.GRAD_SPEED) % 1.0 - ticker_buf_start = len(buf) - - ticker_buf, noise_cache = render_ticker_zone( - active, scroll_cam, ticker_h, w, noise_cache, grad_offset - ) - buf.extend(ticker_buf) - - mic_excess = mic_monitor.excess - render_start = time.perf_counter() - - if USE_EFFECT_CHAIN: - buf = process_effects( - buf, - w, - h, - scroll_cam, - ticker_h, - mic_excess, - grad_offset, - frame_number, - msg is not None, - items, - ) - else: - buf = apply_glitch(buf, ticker_buf_start, mic_excess, w) - firehose_buf = render_firehose(items, w, fh, h) - buf.extend(firehose_buf) - - # Figment overlay (between effects and ntfy message) - if figment and figment.config.enabled: - figment_state = figment.get_figment_state(frame_number, w, h) - if figment_state is not None: - figment_buf = render_figment_overlay(figment_state, w, h) - buf.extend(figment_buf) - - if msg_overlay: - buf.extend(msg_overlay) - - render_elapsed = (time.perf_counter() - render_start) * 1000 - monitor = _get_display_monitor() - if monitor: - chars = sum(len(line) for line in buf) - monitor.record_effect("render", render_elapsed, chars, chars) - - display.show(buf) - - elapsed = time.monotonic() - t0 - time.sleep(max(0, config.FRAME_DT - elapsed)) - frame_number += 1 - - display.cleanup() -``` \ No newline at end of file