#!/usr/bin/env python3 """ M A I N L I N E Digital news consciousness stream. Matrix aesthetic · THX-1138 hue. """ import subprocess, sys, os, pathlib # ─── BOOTSTRAP VENV ─────────────────────────────────────── _VENV = pathlib.Path(__file__).resolve().parent / ".mainline_venv" _MARKER = _VENV / ".installed_v3" def _ensure_venv(): """Create a local venv and install deps if needed.""" if _MARKER.exists(): return import venv print("\033[2;38;5;34m > first run — creating environment...\033[0m") venv.create(str(_VENV), with_pip=True, clear=True) pip = str(_VENV / "bin" / "pip") subprocess.check_call( [pip, "install", "feedparser", "Pillow", "-q"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, ) _MARKER.touch() _ensure_venv() # Install sounddevice on first run after v3 _MARKER_SD = _VENV / ".installed_sd" if not _MARKER_SD.exists(): _pip = str(_VENV / "bin" / "pip") subprocess.check_call([_pip, "install", "sounddevice", "numpy", "-q"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) _MARKER_SD.touch() sys.path.insert(0, str(next((_VENV / "lib").glob("python*/site-packages")))) import feedparser # noqa: E402 from PIL import Image, ImageDraw, ImageFont # noqa: E402 import random, time, re, signal, atexit, textwrap, threading # noqa: E402 try: import sounddevice as _sd import numpy as _np _HAS_MIC = True except Exception: _HAS_MIC = False import urllib.request, urllib.parse, json # noqa: E402 from datetime import datetime from html import unescape from html.parser import HTMLParser # ─── CONFIG ─────────────────────────────────────────────── HEADLINE_LIMIT = 1000 FEED_TIMEOUT = 10 MIC_THRESHOLD_DB = 50 # dB above which glitches intensify MODE = 'poetry' if '--poetry' in sys.argv or '-p' in sys.argv else 'news' FIREHOSE = '--firehose' in sys.argv # ntfy message queue NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json?since=20s&poll=1" NTFY_POLL_INTERVAL = 15 # seconds between polls MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen # 
# Poetry/literature sources — public domain via Project Gutenberg
# Maps a display label to the plain-text URL of the work.
POETRY_SOURCES = {
    "Whitman": "https://www.gutenberg.org/cache/epub/1322/pg1322.txt",
    "Dickinson": "https://www.gutenberg.org/cache/epub/12242/pg12242.txt",
    "Thoreau": "https://www.gutenberg.org/cache/epub/205/pg205.txt",
    "Emerson": "https://www.gutenberg.org/cache/epub/2944/pg2944.txt",
    "Whitman II": "https://www.gutenberg.org/cache/epub/8388/pg8388.txt",
}

# ─── ANSI ─────────────────────────────────────────────────
# SGR escape sequences; the "38;5;N" forms select 256-colour foregrounds and
# a leading "2;" adds the dim attribute.
RST = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
# Matrix greens
G_HI = "\033[38;5;46m"
G_MID = "\033[38;5;34m"
G_LO = "\033[38;5;22m"
G_DIM = "\033[2;38;5;34m"
# THX-1138 sterile tones
W_COOL = "\033[38;5;250m"
W_DIM = "\033[2;38;5;245m"
W_GHOST = "\033[2;38;5;238m"
C_DIM = "\033[2;38;5;37m"
# Terminal control
CLR = "\033[2J\033[H"
CURSOR_OFF = "\033[?25l"
CURSOR_ON = "\033[?25h"

# ─── FEEDS ────────────────────────────────────────────────
# Maps a source label to its feed URL. The feeds are fetched in insertion
# order, which is also the order of the boot log lines.
FEEDS = {
    # Science & Technology
    "Nature": "https://www.nature.com/nature.rss",
    "Science Daily": "https://www.sciencedaily.com/rss/all.xml",
    "Phys.org": "https://phys.org/rss-feed/",
    "NASA": "https://www.nasa.gov/news-release/feed/",
    "Ars Technica": "https://feeds.arstechnica.com/arstechnica/index",
    "New Scientist": "https://www.newscientist.com/section/news/feed/",
    "Quanta": "https://api.quantamagazine.org/feed/",
    "BBC Science": "http://feeds.bbci.co.uk/news/science_and_environment/rss.xml",
    "MIT Tech Review": "https://www.technologyreview.com/feed/",
    # Economics & Business
    "BBC Business": "http://feeds.bbci.co.uk/news/business/rss.xml",
    "MarketWatch": "https://feeds.marketwatch.com/marketwatch/topstories/",
    "Economist": "https://www.economist.com/finance-and-economics/rss.xml",
    # World & Politics
    "BBC World": "http://feeds.bbci.co.uk/news/world/rss.xml",
    "NPR": "https://feeds.npr.org/1001/rss.xml",
    "Al Jazeera": "https://www.aljazeera.com/xml/rss/all.xml",
    "Guardian World": "https://www.theguardian.com/world/rss",
    "DW": "https://rss.dw.com/rdf/rss-en-all",
    "France24": "https://www.france24.com/en/rss",
    "ABC Australia": "https://www.abc.net.au/news/feed/2942460/rss.xml",
    "Japan Times": "https://www.japantimes.co.jp/feed/",
    "The Hindu": "https://www.thehindu.com/news/national/feeder/default.rss",
    "SCMP": "https://www.scmp.com/rss/91/feed",
    "Der Spiegel": "https://www.spiegel.de/international/index.rss",
    # Culture & Ideas
    "Guardian Culture": "https://www.theguardian.com/culture/rss",
    "Aeon": "https://aeon.co/feed.rss",
    "Smithsonian": "https://www.smithsonianmag.com/rss/latest_articles/",
    "The Marginalian": "https://www.themarginalian.org/feed/",
    "Nautilus": "https://nautil.us/feed/",
    "Wired": "https://www.wired.com/feed/rss",
    "The Conversation": "https://theconversation.com/us/articles.atom",
    "Longreads": "https://longreads.com/feed/",
    "Literary Hub": "https://lithub.com/feed/",
    "Atlas Obscura": "https://www.atlasobscura.com/feeds/latest",
}

# ─── GLYPHS ───────────────────────────────────────────────
# Character pools sampled for background noise and glitch effects.
GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋"
KATA = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ"

# ─── FONT RENDERING (OTF → terminal blocks) ─────────────
# NOTE(review): this is an absolute path into one user's home directory;
# the headline font will fail to load on any other machine.
_FONT_PATH = "/Users/genejohnson/Documents/CS Bishop Drawn/CSBishopDrawn-Italic.otf"
_FONT_OBJ = None  # lazily-loaded font object for _FONT_PATH
_FONT_SZ = 60  # size passed to ImageFont.truetype
_RENDER_H = 8  # terminal rows per rendered text line

# Non-Latin scripts → macOS system fonts
# Keys are the same language codes used as translation targets.
_SCRIPT_FONTS = {
    'zh-cn': '/System/Library/Fonts/STHeiti Medium.ttc',
    'ja': '/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc',
    'ko': '/System/Library/Fonts/AppleSDGothicNeo.ttc',
    'ru': '/System/Library/Fonts/Supplemental/Arial.ttf',
    'uk': '/System/Library/Fonts/Supplemental/Arial.ttf',
    'el': '/System/Library/Fonts/Supplemental/Arial.ttf',
    'he': '/System/Library/Fonts/Supplemental/Arial.ttf',
    'ar': '/System/Library/Fonts/GeezaPro.ttc',
    'fa': '/System/Library/Fonts/GeezaPro.ttc',
    'hi': '/System/Library/Fonts/Kohinoor.ttc',
    'th': '/System/Library/Fonts/ThonburiUI.ttc',
}
_FONT_CACHE = {}  # language code → loaded font (filled on first use)
# Language codes whose headlines are rendered without upper-casing.
_NO_UPPER = {'zh-cn', 'ja', 'ko', 'ar', 'fa', 'hi', 'th', 'he'}
# Left → right gradient: white-hot leading edge fades to near-black
_GRAD_COLS = [
    "\033[1;38;5;231m",  # white
    "\033[1;38;5;195m",  # pale cyan-white
    "\033[38;5;123m",  # bright cyan
    "\033[38;5;118m",  # bright lime
    "\033[38;5;82m",  # lime
    "\033[38;5;46m",  # bright green
    "\033[38;5;40m",  # green
    "\033[38;5;34m",  # medium green
    "\033[38;5;28m",  # dark green
    "\033[38;5;22m",  # deep green
    "\033[2;38;5;22m",  # dim deep green
    "\033[2;38;5;235m",  # near black
]


def _font():
    """Lazy-load the OTF font.

    The font at ``_FONT_PATH`` is loaded once at ``_FONT_SZ`` and memoised in
    the module-level ``_FONT_OBJ``; whatever ``ImageFont.truetype`` raises for
    a missing/unreadable file propagates to the caller.
    """
    global _FONT_OBJ
    if _FONT_OBJ is None:
        _FONT_OBJ = ImageFont.truetype(_FONT_PATH, _FONT_SZ)
    return _FONT_OBJ


def _font_for_lang(lang=None):
    """Get appropriate font for a language.

    Args:
        lang: language code, or None.

    Returns:
        The script-specific font from ``_SCRIPT_FONTS`` when ``lang`` has an
        entry there, otherwise the default headline font. A script font that
        fails to load is replaced by the default font, and that substitution
        is cached so the load is not retried.
    """
    if lang is None or lang not in _SCRIPT_FONTS:
        return _font()
    if lang not in _FONT_CACHE:
        try:
            _FONT_CACHE[lang] = ImageFont.truetype(_SCRIPT_FONTS[lang], _FONT_SZ)
        except Exception:
            _FONT_CACHE[lang] = _font()
    return _FONT_CACHE[lang]


# ─── HELPERS ──────────────────────────────────────────────
class _Strip(HTMLParser):
    """HTML parser that keeps only the text nodes it is fed."""

    def __init__(self):
        super().__init__()
        self._t = []

    def handle_data(self, d):
        self._t.append(d)

    def text(self):
        """Return the collected text, concatenated and stripped."""
        return "".join(self._t).strip()


def strip_tags(html):
    """Return ``html`` with entities unescaped and markup removed.

    Args:
        html: markup string; None or empty yields "".

    Returns:
        The stripped text content.
    """
    s = _Strip()
    s.feed(unescape(html or ""))
    # feed() holds back trailing text that ends in an unterminated "&..." run
    # (e.g. "AT&T") in case more input follows; close() flushes it. Without
    # this, such titles came back empty and were silently dropped.
    s.close()
    return s.text()


def tw():
    """Return the terminal width in columns (80 if it cannot be determined)."""
    try:
        return os.get_terminal_size().columns
    except Exception:
        return 80


def th():
    """Return the terminal height in rows (24 if it cannot be determined)."""
    try:
        return os.get_terminal_size().lines
    except Exception:
        return 24


def noise(w):
    """Return ``w`` cells of sparse, randomly coloured glyph noise.

    One density is drawn per call; each cell is then either a random glyph
    from ``GLITCH + KATA`` wrapped in a random dim colour, or a space.
    """
    d = random.choice([0.15, 0.25, 0.35, 0.12])  # was [0.08, 0.12, 0.2, 0.05], now much denser
    return "".join(
        f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
        f"{random.choice(GLITCH + KATA)}{RST}"
        if random.random() < d else " "
        for _ in range(w)
    )


def glitch_bar(w):
    """Return a short dim bar of one repeated glyph at a random offset.

    Args:
        w: available width in columns.
    """
    c = random.choice(["░", "▒", "─", "╌"])
    # Clamp both ranges: on terminals narrower than 6 columns w // 2 drops
    # below 3 and randint() would raise ValueError on an empty range.
    n = random.randint(3, max(3, w // 2))
    o = random.randint(0, max(0, w - n))
    return " " * o + f"{G_LO}{DIM}" + c * n + RST


# ─── SOURCE → LANGUAGE MAPPING ──────────────────────────
# Headlines from these outlets render in their cultural home language
# regardless of content, reflecting the true distribution of sources.
SOURCE_LANGS = {
    "Der Spiegel": "de",
    "DW": "de",
    "France24": "fr",
    "Japan Times": "ja",
    "The Hindu": "hi",
    "SCMP": "zh-cn",
    "Al Jazeera": "ar",
}

# ─── LOCATION → LANGUAGE ─────────────────────────────────
# Regex (matched against the lower-cased headline) → target language code.
# Patterns are tried in insertion order and the first hit wins.
_LOCATION_LANGS = {
    r'\b(?:china|chinese|beijing|shanghai|hong kong|xi jinping)\b': 'zh-cn',
    r'\b(?:japan|japanese|tokyo|osaka|kishida)\b': 'ja',
    r'\b(?:korea|korean|seoul|pyongyang)\b': 'ko',
    r'\b(?:russia|russian|moscow|kremlin|putin)\b': 'ru',
    r'\b(?:saudi|dubai|qatar|egypt|cairo|arabic)\b': 'ar',
    r'\b(?:india|indian|delhi|mumbai|modi)\b': 'hi',
    r'\b(?:germany|german|berlin|munich|scholz)\b': 'de',
    r'\b(?:france|french|paris|lyon|macron)\b': 'fr',
    r'\b(?:spain|spanish|madrid)\b': 'es',
    r'\b(?:italy|italian|rome|milan|meloni)\b': 'it',
    r'\b(?:portugal|portuguese|lisbon)\b': 'pt',
    r'\b(?:brazil|brazilian|são paulo|lula)\b': 'pt',
    r'\b(?:greece|greek|athens)\b': 'el',
    r'\b(?:turkey|turkish|istanbul|ankara|erdogan)\b': 'tr',
    r'\b(?:iran|iranian|tehran)\b': 'fa',
    r'\b(?:thailand|thai|bangkok)\b': 'th',
    r'\b(?:vietnam|vietnamese|hanoi)\b': 'vi',
    r'\b(?:ukraine|ukrainian|kyiv|kiev|zelensky)\b': 'uk',
    r'\b(?:israel|israeli|jerusalem|tel aviv|netanyahu)\b': 'he',
}

# (title, target_lang) → translated text; failed lookups are cached too.
_TRANSLATE_CACHE = {}


def _detect_location_language(title):
    """Detect if headline mentions a location, return target language.

    Returns:
        The language code of the first matching ``_LOCATION_LANGS`` pattern,
        or None when nothing matches.
    """
    title_lower = title.lower()
    for pattern, lang in _LOCATION_LANGS.items():
        if re.search(pattern, title_lower):
            return lang
    return None


def _translate_headline(title, target_lang):
    """Translate headline via Google Translate API (zero dependencies).

    Args:
        title: English headline text.
        target_lang: language code placed in the request's ``tl`` parameter.

    Returns:
        The translated text, or ``title`` unchanged when the request fails or
        yields nothing. Either outcome is cached per (title, target_lang), so
        a failed lookup is not retried (and cannot stall the caller again).
    """
    key = (title, target_lang)
    if key in _TRANSLATE_CACHE:
        return _TRANSLATE_CACHE[key]
    try:
        q = urllib.parse.quote(title)
        url = ("https://translate.googleapis.com/translate_a/single"
               f"?client=gtx&sl=en&tl={target_lang}&dt=t&q={q}")
        req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
        # Context manager closes the connection; previously the response was
        # left open, leaking one socket per translated headline.
        with urllib.request.urlopen(req, timeout=5) as resp:
            data = json.loads(resp.read())
        result = "".join(p[0] for p in data[0] if p[0]) or title
    except Exception:
        # Deliberate best-effort: any network/parse problem falls back to the
        # untranslated headline.
        result = title
    _TRANSLATE_CACHE[key] = result
    return result


# ─── CONTENT FILTER ───────────────────────────────────────
_SKIP_RE = re.compile(
    r'\b(?:'
    # ── sports ──
    r'football|soccer|basketball|baseball|softball|tennis|golf|cricket|rugby|'
    r'hockey|lacrosse|volleyball|badminton|'
    r'nba|nfl|nhl|mlb|mls|fifa|uefa|'
    r'premier league|champions league|la liga|serie a|bundesliga|'
    r'world cup|super bowl|world series|stanley cup|'
    r'playoff|playoffs|touchdown|goalkeeper|striker|quarterback|'
    r'slam dunk|home run|grand slam|offside|halftime|'
    r'batting|wicket|innings|'
    r'formula 1|nascar|motogp|'
    r'boxing|ufc|mma|'
    r'marathon|tour de france|'
    r'transfer window|draft pick|relegation|'
    # ── vapid / insipid ──
    r'kardashian|jenner|reality tv|reality show|'
    r'influencer|viral video|tiktok|instagram|'
    r'best dressed|worst dressed|red carpet|'
    r'horoscope|zodiac|gossip|bikini|selfie|'
    r'you won.t believe|what happened next|'
    r'celebrity couple|celebrity feud|baby bump'
    r')\b',
    re.IGNORECASE
)


def _skip(title):
    """Return True if headline is sports, vapid, or insipid."""
    return bool(_SKIP_RE.search(title))


# ─── DISPLAY ──────────────────────────────────────────────
def type_out(text, color=G_HI):
    """Write ``text`` to stdout in irregular bursts, like erratic typing.

    About 30% of steps emit a 2–5 character chunk, the rest a single
    character; every step is flushed and followed by a short random pause.
    """
    i = 0
    while i < len(text):
        if random.random() < 0.3:
            b = random.randint(2, 5)
            sys.stdout.write(f"{color}{text[i:i+b]}{RST}")
            i += b
        else:
            sys.stdout.write(f"{color}{text[i]}{RST}")
            i += 1
        sys.stdout.flush()
        time.sleep(random.uniform(0.004, 0.018))


def slow_print(text, color=G_DIM, delay=0.015):
    """Write ``text`` one character at a time, pausing ``delay`` seconds each."""
    for ch in text:
        sys.stdout.write(f"{color}{ch}{RST}")
        sys.stdout.flush()
        time.sleep(delay)


def boot_ln(label, status, ok=True):
    """Print one animated boot-log line: ``> label ....... status``.

    Args:
        label: name shown on the left.
        status: result text shown on the right.
        ok: selects the status colour (green when True, dim red otherwise).
    """
    # Dot leader fills the space left on the line, clamped to 3..30 dots.
    dots = max(3, min(30, tw() - len(label) - len(status) - 8))
    sys.stdout.write(f" {G_DIM}>{RST} {W_DIM}{label} ")
    sys.stdout.flush()
    for _ in range(dots):
        sys.stdout.write(f"{G_LO}.")
        sys.stdout.flush()
        time.sleep(random.uniform(0.006, 0.025))
    c = G_MID if ok else "\033[2;38;5;196m"
    print(f" {c}{status}{RST}")
    time.sleep(random.uniform(0.02, 0.1))
# ─── FETCH ────────────────────────────────────────────────
def fetch_feed(url):
    """Download and parse one feed.

    Returns:
        The ``feedparser`` result, or None on any download/parse failure.
    """
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
        # Close the connection as soon as the body is read (it used to leak).
        with urllib.request.urlopen(req, timeout=FEED_TIMEOUT) as resp:
            raw = resp.read()
        return feedparser.parse(raw)
    except Exception:
        return None


def fetch_all():
    """Fetch every feed in ``FEEDS``, logging one boot line per source.

    Returns:
        ``(items, linked, failed)`` where ``items`` is a list of
        ``(title, source, "HH:MM")`` tuples that passed the content filter,
        ``linked`` counts sources that yielded at least one item and
        ``failed`` counts those that were unreachable or empty.
    """
    items = []
    linked = failed = 0
    for src, url in FEEDS.items():
        feed = fetch_feed(url)
        if feed is None or (feed.bozo and not feed.entries):
            boot_ln(src, "DARK", False)
            failed += 1
            continue
        n = 0
        for e in feed.entries:
            t = strip_tags(e.get("title", ""))
            if not t or _skip(t):
                continue
            pub = e.get("published_parsed") or e.get("updated_parsed")
            try:
                ts = datetime(*pub[:6]).strftime("%H:%M") if pub else "——:——"
            except Exception:
                ts = "——:——"
            items.append((t, src, ts))
            n += 1
        if n:
            boot_ln(src, f"LINKED [{n}]", True)
            linked += 1
        else:
            boot_ln(src, "EMPTY", False)
            failed += 1
    return items, linked, failed


def _fetch_gutenberg(url, label):
    """Download and parse stanzas/passages from a Project Gutenberg text.

    Returns:
        A list of ``(passage, label, '')`` tuples, one per blank-line
        separated block of 20–280 characters that is neither all-caps nor a
        bare roman numeral; an empty list on any failure.
    """
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
        with urllib.request.urlopen(req, timeout=15) as resp:
            raw = resp.read()
        text = raw.decode('utf-8', errors='replace').replace('\r\n', '\n').replace('\r', '\n')
        # Strip PG boilerplate
        m = re.search(r'\*\*\*\s*START OF[^\n]*\n', text)
        if m:
            text = text[m.end():]
        m = re.search(r'\*\*\*\s*END OF', text)
        if m:
            text = text[:m.start()]
        # Split on blank lines into stanzas/passages
        blocks = re.split(r'\n{2,}', text.strip())
        items = []
        for blk in blocks:
            blk = ' '.join(blk.split())  # flatten to one line
            if len(blk) < 20 or len(blk) > 280:
                continue
            if blk.isupper():  # skip all-caps headers
                continue
            if re.match(r'^[IVXLCDM]+\.?\s*$', blk):  # roman numerals
                continue
            items.append((blk, label, ''))
        return items
    except Exception:
        return []


def fetch_poetry():
    """Fetch all poetry/literature sources.

    Returns:
        ``(items, linked, failed)`` with the same meaning as ``fetch_all``.
    """
    items = []
    linked = failed = 0
    for label, url in POETRY_SOURCES.items():
        stanzas = _fetch_gutenberg(url, label)
        if stanzas:
            boot_ln(label, f"LOADED [{len(stanzas)}]", True)
            items.extend(stanzas)
            linked += 1
        else:
            boot_ln(label, "DARK", False)
            failed += 1
    return items, linked, failed


# ─── CACHE ────────────────────────────────────────────────
_CACHE_DIR = pathlib.Path(__file__).resolve().parent


def _cache_path():
    """Return the per-mode cache file path next to this script."""
    return _CACHE_DIR / f".mainline_cache_{MODE}.json"


def _load_cache():
    """Load cached items from disk if available.

    Returns:
        A non-empty list of tuples, or None when the cache is missing,
        unreadable, malformed or empty.
    """
    p = _cache_path()
    if not p.exists():
        return None
    try:
        data = json.loads(p.read_text())
        items = [tuple(i) for i in data["items"]]
        return items if items else None
    except Exception:
        return None


def _save_cache(items):
    """Save fetched items to disk for fast subsequent runs."""
    try:
        _cache_path().write_text(json.dumps({"items": items}))
    except Exception:
        # Best-effort: a cache that cannot be written only costs a re-fetch.
        pass


# ─── STREAM ───────────────────────────────────────────────
_SCROLL_DUR = 3.75  # seconds per headline
_FRAME_DT = 0.05  # 50ms base frame rate (20 FPS)
FIREHOSE_H = 12  # firehose zone height (terminal rows)
_mic_db = -99.0  # current mic level, written by background thread
_mic_stream = None


def _start_mic():
    """Start background mic monitoring; silently skipped if unavailable.

    Returns:
        True when the input stream started, False otherwise (including when
        the audio libraries are not importable).
    """
    global _mic_stream
    if not _HAS_MIC:
        # Was a bare ``return`` (None); every other path returns a bool.
        return False

    def _cb(indata, frames, t, status):
        # Audio callback: publish the block's RMS level as 20*log10(rms).
        global _mic_db
        rms = float(_np.sqrt(_np.mean(indata ** 2)))
        _mic_db = 20 * _np.log10(rms) if rms > 0 else -99.0

    try:
        _mic_stream = _sd.InputStream(
            callback=_cb, channels=1, samplerate=44100, blocksize=2048)
        _mic_stream.start()
        atexit.register(lambda: _mic_stream.stop() if _mic_stream else None)
        return True
    except Exception:
        return False


# ─── NTFY MESSAGE QUEUE ───────────────────────────────────
_ntfy_message = None  # (title, body, monotonic_timestamp) or None
_ntfy_lock = threading.Lock()


def _start_ntfy_poller():
    """Start background thread polling ntfy for messages.

    The daemon thread requests ``NTFY_TOPIC`` every ``NTFY_POLL_INTERVAL``
    seconds, parses the body as one JSON object per line and stores the most
    recent ``"message"`` event in ``_ntfy_message`` under ``_ntfy_lock``.

    Returns:
        True (the thread is always started).
    """
    def _poll():
        global _ntfy_message
        while True:
            try:
                req = urllib.request.Request(
                    NTFY_TOPIC, headers={"User-Agent": "mainline/0.1"})
                # This loop runs for the life of the process, so an unclosed
                # response here would leak one socket per poll.
                with urllib.request.urlopen(req, timeout=10) as resp:
                    payload = resp.read().decode('utf-8', errors='replace')
                for line in payload.strip().split('\n'):
                    if not line.strip():
                        continue
                    try:
                        data = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    if data.get("event") == "message":
                        with _ntfy_lock:
                            _ntfy_message = (
                                data.get("title", ""),
                                data.get("message", ""),
                                time.monotonic(),
                            )
            except Exception:
                # Best-effort poller: network errors are retried next cycle.
                pass
            time.sleep(NTFY_POLL_INTERVAL)

    t = threading.Thread(target=_poll, daemon=True)
    t.start()
    return True


def _render_line(text, font=None):
    """Render a line of text as terminal rows using OTF font + half-blocks.

    The text is drawn into a greyscale image, resized to ``_RENDER_H * 2``
    pixels high, thresholded, and each vertical pixel pair is mapped to one
    of "█", "▀", "▄" or a space. Blank rows at the top and bottom are
    trimmed.

    Returns:
        A list of row strings; ``[""]`` when nothing is drawable.
    """
    if font is None:
        font = _font()
    bbox = font.getbbox(text)
    if not bbox or bbox[2] <= bbox[0]:
        return [""]
    pad = 4
    img_w = bbox[2] - bbox[0] + pad * 2
    img_h = bbox[3] - bbox[1] + pad * 2
    img = Image.new('L', (img_w, img_h), 0)
    draw = ImageDraw.Draw(img)
    draw.text((-bbox[0] + pad, -bbox[1] + pad), text, fill=255, font=font)
    pix_h = _RENDER_H * 2
    scale = pix_h / max(img_h, 1)
    new_w = max(1, int(img_w * scale))
    img = img.resize((new_w, pix_h), Image.Resampling.LANCZOS)
    data = img.tobytes()
    thr = 80
    rows = []
    for y in range(0, pix_h, 2):
        row = []
        for x in range(new_w):
            top = data[y * new_w + x] > thr
            bot = data[(y + 1) * new_w + x] > thr if y + 1 < pix_h else False
            if top and bot:
                row.append("█")
            elif top:
                row.append("▀")
            elif bot:
                row.append("▄")
            else:
                row.append(" ")
        rows.append("".join(row))
    while rows and not rows[-1].strip():
        rows.pop()
    while rows and not rows[0].strip():
        rows.pop(0)
    return rows if rows else [""]


def _big_wrap(text, max_w, font=None):
    """Word-wrap text and render with OTF font.

    Words are added to the current line until its estimated rendered width
    (using the same scale factor as ``_render_line``) exceeds ``max_w - 4``.

    Returns:
        The rendered rows of all lines, with one empty row between lines.
    """
    if font is None:
        font = _font()
    words = text.split()
    lines, cur = [], ""
    for word in words:
        test = f"{cur} {word}".strip() if cur else word
        bbox = font.getbbox(test)
        if bbox:
            img_h = bbox[3] - bbox[1] + 8
            pix_h = _RENDER_H * 2
            scale = pix_h / max(img_h, 1)
            term_w = int((bbox[2] - bbox[0] + 8) * scale)
        else:
            term_w = 0
        if term_w > max_w - 4 and cur:
            lines.append(cur)
            cur = word
        else:
            cur = test
    if cur:
        lines.append(cur)
    out = []
    for i, ln in enumerate(lines):
        out.extend(_render_line(ln, font))
        if i < len(lines) - 1:
            out.append("")
    return out


def _lr_gradient(rows):
    """Color each non-space block character with a left-to-right gradient.

    The gradient spans the widest non-blank row; blank rows pass through.
    """
    n = len(_GRAD_COLS)
    max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1)
    out = []
    for row in rows:
        if not row.strip():
            out.append(row)
            continue
        buf = []
        for x, ch in enumerate(row):
            if ch == ' ':
                buf.append(' ')
            else:
                idx = min(round(x / max(max_x - 1, 1) * (n - 1)), n - 1)
                buf.append(f"{_GRAD_COLS[idx]}{ch}\033[0m")
        out.append("".join(buf))
    return out


def _fade_line(s, fade):
    """Dissolve a rendered line by probabilistically dropping characters.

    Args:
        s: line that may contain ANSI CSI sequences (kept intact).
        fade: probability of keeping each visible character; >= 1.0 returns
            ``s`` unchanged and <= 0.0 returns an empty string.
    """
    if fade >= 1.0:
        return s
    if fade <= 0.0:
        return ''
    result = []
    i = 0
    while i < len(s):
        if s[i] == '\033' and i + 1 < len(s) and s[i + 1] == '[':
            # Copy the whole escape sequence up to its final letter.
            j = i + 2
            while j < len(s) and not s[j].isalpha():
                j += 1
            result.append(s[i:j + 1])
            i = j + 1
        elif s[i] == ' ':
            result.append(' ')
            i += 1
        else:
            result.append(s[i] if random.random() < fade else ' ')
            i += 1
    return ''.join(result)


def _vis_trunc(s, w):
    """Truncate string to visual width w, skipping ANSI escape codes."""
    result = []
    vw = 0
    i = 0
    while i < len(s):
        if vw >= w:
            break
        if s[i] == '\033' and i + 1 < len(s) and s[i + 1] == '[':
            j = i + 2
            while j < len(s) and not s[j].isalpha():
                j += 1
            result.append(s[i:j + 1])
            i = j + 1
        else:
            result.append(s[i])
            vw += 1
            i += 1
    return ''.join(result)


def _next_headline(pool, items, seen):
    """Pull the next unique headline from pool, refilling as needed.

    When ``pool`` runs dry it is refilled from ``items``, reshuffled, and
    ``seen`` is cleared. ``items`` must be non-empty.
    """
    while True:
        if not pool:
            pool.extend(items)
            random.shuffle(pool)
            seen.clear()
        title, src, ts = pool.pop()
        sig = title.lower().strip()
        if sig not in seen:
            seen.add(sig)
            return title, src, ts


def _make_block(title, src, ts, w):
    """Render a headline into a content block with color.

    In news mode the headline is translated when its source or a mentioned
    location maps to a language. The text is then normalised, rendered in
    the large font and given the left-to-right gradient.

    Returns:
        ``(rows, color, meta_row_index)``.
    """
    target_lang = (SOURCE_LANGS.get(src) or _detect_location_language(title)) if MODE == 'news' else None
    lang_font = _font_for_lang(target_lang)
    if target_lang:
        title = _translate_headline(title, target_lang)
    # Don't uppercase scripts that have no case (CJK, Arabic, etc.)
    if target_lang and target_lang in _NO_UPPER:
        title_up = re.sub(r"\s+", " ", title)
    else:
        title_up = re.sub(r"\s+", " ", title.upper())
    # Fold typographic quotes and dashes to ASCII equivalents.
    for old, new in [("\u2019", "'"), ("\u2018", "'"), ("\u201c", '"'),
                     ("\u201d", '"'), ("\u2013", "-"), ("\u2014", "-")]:
        title_up = title_up.replace(old, new)
    big_rows = _big_wrap(title_up, w - 4, lang_font)
    big_rows = _lr_gradient(big_rows)
    hc = random.choice([
        "\033[38;5;46m",  # matrix green
        "\033[38;5;34m",  # dark green
        "\033[38;5;82m",  # lime
        "\033[38;5;48m",  # sea green
        "\033[38;5;37m",  # teal
        "\033[38;5;44m",  # cyan
        "\033[38;5;87m",  # sky
        "\033[38;5;117m",  # ice blue
        "\033[38;5;250m",  # cool white
        "\033[38;5;156m",  # pale green
        "\033[38;5;120m",  # mint
        "\033[38;5;80m",  # dark cyan
        "\033[38;5;108m",  # grey-green
        "\033[38;5;115m",  # sage
        "\033[1;38;5;46m",  # bold green
        "\033[1;38;5;250m",  # bold white
    ])
    content = [" " + r for r in big_rows]
    content.append("")
    meta = f"\u2591 {src} \u00b7 {ts}"
    content.append(" " * max(2, w - len(meta) - 2) + meta)
    return content, hc, len(content) - 1  # (rows, color, meta_row_index)


def _firehose_line(items, w):
    """Generate one line of rapidly cycling firehose content.

    One of four line styles is picked at random: a raw headline, dense
    glitch noise, a fake status message, or a headline fragment.
    """
    r = random.random()
    if r < 0.35:
        # Raw headline text
        title, src, ts = random.choice(items)
        text = title[:w - 1]
        color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM])
        return f"{color}{text}{RST}"
    elif r < 0.55:
        # Dense glitch noise
        d = random.choice([0.45, 0.55, 0.65, 0.75])
        return "".join(
            f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
            f"{random.choice(GLITCH + KATA)}{RST}"
            if random.random() < d else " "
            for _ in range(w)
        )
    elif r < 0.78:
        # Status / program output
        sources = FEEDS if MODE == 'news' else POETRY_SOURCES
        src = random.choice(list(sources.keys()))
        msgs = [
            f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}",
            f" ░░ FEED ACTIVE :: {src}",
            f" >> DECODE 0x{random.randint(0x1000, 0xFFFF):04X} :: {src[:24]}",
            f" ▒▒ ACQUIRE :: {random.choice(['TCP', 'UDP', 'RSS', 'ATOM', 'XML'])} :: {src}",
            f" {''.join(random.choice(KATA) for _ in range(3))} STRM "
            f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}",
        ]
        text = random.choice(msgs)[:w - 1]
        color = random.choice([G_LO, G_DIM, W_GHOST])
        return f"{color}{text}{RST}"
    else:
        # Headline fragment with glitch prefix
        title, _, _ = random.choice(items)
        start = random.randint(0, max(0, len(title) - 20))
        frag = title[start:start + random.randint(10, 35)]
        pad = random.randint(0, max(0, w - len(frag) - 8))
        gp = ''.join(random.choice(GLITCH) for _ in range(random.randint(1, 3)))
        text = (' ' * pad + gp + ' ' + frag)[:w - 1]
        color = random.choice([G_LO, C_DIM, W_GHOST])
        return f"{color}{text}{RST}"


def stream(items):
    """Run the main render loop until ``HEADLINE_LIMIT`` headlines have scrolled.

    Each frame is either the MESSAGE state (a fresh ntfy message is rendered
    centred and scrolling is frozen) or the SCROLL state (headline blocks
    move up through a virtual canvas with fade zones, background noise and
    glitch bars). The optional firehose zone occupies the bottom rows in
    both states.

    Args:
        items: non-empty list of ``(title, source, timestamp)`` tuples;
            shuffled in place.
    """
    # This function clears the expired message below, so the name must be
    # declared global; without the declaration it is compiled as a local and
    # the first read raises UnboundLocalError.
    global _ntfy_message

    random.shuffle(items)
    pool = list(items)
    seen = set()
    queued = 0
    time.sleep(0.5)
    sys.stdout.write(CLR)
    sys.stdout.flush()
    w, h = tw(), th()
    fh = FIREHOSE_H if FIREHOSE else 0
    sh = h - fh  # scroll zone height
    GAP = 3  # blank rows between headlines
    scroll_interval = _SCROLL_DUR / (sh + 15) * 2
    # active blocks: (content_rows, color, canvas_y, meta_idx)
    active = []
    cam = 0  # viewport top in virtual canvas coords
    next_y = sh  # canvas-y where next block starts (off-screen bottom)
    noise_cache = {}
    scroll_accum = 0.0

    def _noise_at(cy):
        # Decide once per canvas row whether it carries noise, so the noise
        # scrolls with the content instead of flickering every frame.
        if cy not in noise_cache:
            noise_cache[cy] = noise(w) if random.random() < 0.15 else None
        return noise_cache[cy]

    # Message color: bright cyan/white — distinct from headline greens
    MSG_COLOR = "\033[1;38;5;87m"  # sky cyan
    MSG_META = "\033[38;5;245m"  # cool grey
    MSG_BORDER = "\033[2;38;5;37m"  # dim teal
    _msg_cache = (None, None)  # (cache_key, rendered_rows)

    while queued < HEADLINE_LIMIT or active:
        t0 = time.monotonic()
        w, h = tw(), th()
        fh = FIREHOSE_H if FIREHOSE else 0
        sh = h - fh

        # ── Check for ntfy message ────────────────────────
        msg_active = False
        with _ntfy_lock:
            if _ntfy_message is not None:
                m_title, m_body, m_ts = _ntfy_message
                if time.monotonic() - m_ts < MESSAGE_DISPLAY_SECS:
                    msg_active = True
                else:
                    _ntfy_message = None  # expired

        if msg_active:
            # ── MESSAGE state: freeze scroll, render message ──
            buf = []
            # Render message text with OTF font (cached across frames)
            display_text = m_body or m_title or "(empty)"
            display_text = re.sub(r"\s+", " ", display_text.upper())
            cache_key = (display_text, w)
            if _msg_cache[0] != cache_key:
                msg_rows = _big_wrap(display_text, w - 4)
                msg_rows = _lr_gradient(msg_rows)
                _msg_cache = (cache_key, msg_rows)
            else:
                msg_rows = _msg_cache[1]
            # Center vertically in scroll zone
            total_h = len(msg_rows) + 4  # +4 for border + meta + padding
            y_off = max(0, (sh - total_h) // 2)
            for r in range(sh):
                ri = r - y_off
                if ri == 0 or ri == total_h - 1:
                    # Border lines
                    bar = "─" * (w - 4)
                    buf.append(f"\033[{r+1};1H {MSG_BORDER}{bar}{RST}\033[K")
                elif 1 <= ri <= len(msg_rows):
                    ln = _vis_trunc(msg_rows[ri - 1], w)
                    buf.append(f"\033[{r+1};1H {ln}{RST}\033[K")
                elif ri == len(msg_rows) + 1:
                    # Title line (if present and different from body)
                    if m_title and m_title != m_body:
                        meta = f" {MSG_META}\u2591 {m_title}{RST}"
                    else:
                        meta = ""
                    buf.append(f"\033[{r+1};1H{meta}\033[K")
                elif ri == len(msg_rows) + 2:
                    # Source + timestamp
                    elapsed_s = int(time.monotonic() - m_ts)
                    remaining = max(0, MESSAGE_DISPLAY_SECS - elapsed_s)
                    ts_str = datetime.now().strftime("%H:%M:%S")
                    meta = f" {MSG_META}\u2591 ntfy \u00b7 {ts_str} \u00b7 {remaining}s{RST}"
                    buf.append(f"\033[{r+1};1H{meta}\033[K")
                else:
                    # Sparse noise outside the message
                    if random.random() < 0.06:
                        buf.append(f"\033[{r+1};1H{noise(w)}")
                    else:
                        buf.append(f"\033[{r+1};1H\033[K")
            # Firehose keeps running during messages
            if FIREHOSE and fh > 0:
                for fr in range(fh):
                    fline = _firehose_line(items, w)
                    buf.append(f"\033[{sh + fr + 1};1H{fline}\033[K")
            sys.stdout.buffer.write("".join(buf).encode())
            sys.stdout.flush()
            elapsed = time.monotonic() - t0
            time.sleep(max(0, _FRAME_DT - elapsed))
            continue

        # ── SCROLL state: normal headline rendering ───────
        # Advance scroll on schedule
        scroll_accum += _FRAME_DT
        while scroll_accum >= scroll_interval:
            scroll_accum -= scroll_interval
            cam += 1

        # Enqueue new headlines when room at the bottom
        while next_y < cam + sh + 10 and queued < HEADLINE_LIMIT:
            t, src, ts = _next_headline(pool, items, seen)
            content, hc, midx = _make_block(t, src, ts, w)
            active.append((content, hc, next_y, midx))
            next_y += len(content) + GAP
            queued += 1

        # Prune off-screen blocks and stale noise
        active = [(c, hc, by, mi) for c, hc, by, mi in active
                  if by + len(c) > cam]
        for k in list(noise_cache):
            if k < cam:
                del noise_cache[k]

        # Draw scroll zone
        top_zone = max(1, int(sh * 0.25))
        bot_zone = max(1, int(sh * 0.10))
        buf = []
        for r in range(sh):
            cy = cam + r
            top_f = min(1.0, r / top_zone)
            bot_f = min(1.0, (sh - 1 - r) / bot_zone)
            row_fade = min(top_f, bot_f)
            drawn = False
            for content, hc, by, midx in active:
                cr = cy - by
                if 0 <= cr < len(content):
                    ln = _vis_trunc(content[cr], w)
                    if row_fade < 1.0:
                        ln = _fade_line(ln, row_fade)
                    if cr == midx:
                        buf.append(f"\033[{r+1};1H{W_COOL}{ln}{RST}\033[K")
                    elif ln.strip():
                        buf.append(f"\033[{r+1};1H{hc}{ln}{RST}\033[K")
                    else:
                        buf.append(f"\033[{r+1};1H\033[K")
                    drawn = True
                    break
            if not drawn:
                n = _noise_at(cy)
                if row_fade < 1.0 and n:
                    n = _fade_line(n, row_fade)
                if n:
                    buf.append(f"\033[{r+1};1H{n}")
                else:
                    buf.append(f"\033[{r+1};1H\033[K")

        # Draw firehose zone
        if FIREHOSE and fh > 0:
            for fr in range(fh):
                fline = _firehose_line(items, w)
                buf.append(f"\033[{sh + fr + 1};1H{fline}\033[K")

        # Glitch — base rate + mic-reactive spikes (scroll zone only)
        mic_excess = max(0.0, _mic_db - MIC_THRESHOLD_DB)
        glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
        n_hits = 4 + int(mic_excess / 2)
        g_limit = sh if FIREHOSE else len(buf)
        if random.random() < glitch_prob and g_limit > 0:
            for _ in range(min(n_hits, g_limit)):
                gi = random.randint(0, g_limit - 1)
                buf[gi] = f"\033[{gi+1};1H{glitch_bar(w)}"

        sys.stdout.buffer.write("".join(buf).encode())
        sys.stdout.flush()

        # Precise frame timing
        elapsed = time.monotonic() - t0
        time.sleep(max(0, _FRAME_DT - elapsed))

    sys.stdout.write(CLR)
    sys.stdout.flush()


# ─── MAIN ─────────────────────────────────────────────────
# Banner glyphs are fixed-width: every row must have the same length or the
# letters shear apart when printed.
TITLE = [
    " ███╗   ███╗ █████╗ ██╗███╗   ██╗██╗     ██╗███╗   ██╗███████╗",
    " ████╗ ████║██╔══██╗██║████╗  ██║██║     ██║████╗  ██║██╔════╝",
    " ██╔████╔██║███████║██║██╔██╗ ██║██║     ██║██╔██╗ ██║█████╗  ",
    " ██║╚██╔╝██║██╔══██║██║██║╚██╗██║██║     ██║██║╚██╗██║██╔══╝  ",
    " ██║ ╚═╝ ██║██║  ██║██║██║ ╚████║███████╗██║██║ ╚████║███████╗",
    " ╚═╝     ╚═╝╚═╝  ╚═╝╚═╝╚═╝  ╚═══╝╚══════╝╚═╝╚═╝  ╚═══╝╚══════╝",
]


def main():
    """Show the boot sequence, load content, then run the stream.

    Content comes from the on-disk cache unless ``--refresh`` is given or
    the cache is empty; otherwise it is fetched (poetry or news, per
    ``MODE``) and cached. Exits with status 1 when no items are available
    and with status 0 on Ctrl-C.
    """
    # Always restore the cursor, whichever way the process exits.
    atexit.register(lambda: print(CURSOR_ON, end="", flush=True))

    def handle_sigint(*_):
        print(f"\n\n {G_DIM}> SIGNAL LOST{RST}")
        print(f" {W_GHOST}> connection terminated{RST}\n")
        sys.exit(0)

    signal.signal(signal.SIGINT, handle_sigint)

    w = tw()
    print(CLR, end="")
    print(CURSOR_OFF, end="")
    print()
    time.sleep(0.4)
    for ln in TITLE:
        print(f"{G_HI}{ln}{RST}")
        time.sleep(0.07)
    print()
    _subtitle = "literary consciousness stream" if MODE == 'poetry' else "digital consciousness stream"
    print(f" {W_DIM}v0.1 · {_subtitle}{RST}")
    print(f" {W_GHOST}{'─' * (w - 4)}{RST}")
    print()
    time.sleep(0.4)

    cached = _load_cache() if '--refresh' not in sys.argv else None
    if cached:
        items = cached
        boot_ln("Cache", f"LOADED [{len(items)} SIGNALS]", True)
    elif MODE == 'poetry':
        slow_print(" > INITIALIZING LITERARY CORPUS...\n")
        time.sleep(0.2)
        print()
        items, linked, failed = fetch_poetry()
        print()
        print(f" {G_DIM}>{RST} {G_MID}{linked} TEXTS LOADED{RST} {W_GHOST}· {failed} DARK{RST}")
        print(f" {G_DIM}>{RST} {G_MID}{len(items)} STANZAS ACQUIRED{RST}")
        _save_cache(items)
    else:
        slow_print(" > INITIALIZING FEED ARRAY...\n")
        time.sleep(0.2)
        print()
        items, linked, failed = fetch_all()
        print()
        print(f" {G_DIM}>{RST} {G_MID}{linked} SOURCES LINKED{RST} {W_GHOST}· {failed} DARK{RST}")
        print(f" {G_DIM}>{RST} {G_MID}{len(items)} SIGNALS ACQUIRED{RST}")
        _save_cache(items)

    if not items:
        print(f"\n {W_DIM}> NO SIGNAL — check network{RST}")
        sys.exit(1)

    print()
    mic_ok = _start_mic()
    if _HAS_MIC:
        boot_ln("Microphone", "ACTIVE" if mic_ok else "OFFLINE · check System Settings → Privacy → Microphone", mic_ok)
    ntfy_ok = _start_ntfy_poller()
    boot_ln("ntfy", "LISTENING" if ntfy_ok else "OFFLINE", ntfy_ok)
    if FIREHOSE:
        boot_ln("Firehose", "ENGAGED", True)
    time.sleep(0.4)
    slow_print(" > STREAMING...\n")
    time.sleep(0.2)
    print(f" {W_GHOST}{'─' * (w - 4)}{RST}")
    print()
    time.sleep(0.4)

    stream(items)

    print()
    print(f" {W_GHOST}{'─' * (tw() - 4)}{RST}")
    print(f" {G_DIM}> {HEADLINE_LIMIT} SIGNALS PROCESSED{RST}")
    print(f" {W_GHOST}> end of stream{RST}")
    print()


if __name__ == "__main__":
    main()