From 339510dd60f8db0a9477546202dd5fe552779688 Mon Sep 17 00:00:00 2001 From: Gene Johnson Date: Sat, 14 Mar 2026 23:46:31 -0700 Subject: [PATCH] Please provide the diff for `/Users/genejohnson/Dev/mainline/mainline.py` to generate an accurate commit message. --- mainline.py | 1042 +-------------------------------------------------- 1 file changed, 3 insertions(+), 1039 deletions(-) diff --git a/mainline.py b/mainline.py index b862d95..842674e 100755 --- a/mainline.py +++ b/mainline.py @@ -5,7 +5,7 @@ Digital news consciousness stream. Matrix aesthetic · THX-1138 hue. """ -import subprocess, sys, os, pathlib +import subprocess, sys, pathlib # ─── BOOTSTRAP VENV ─────────────────────────────────────── _VENV = pathlib.Path(__file__).resolve().parent / ".mainline_venv" @@ -37,1044 +37,8 @@ if not _MARKER_SD.exists(): sys.path.insert(0, str(next((_VENV / "lib").glob("python*/site-packages")))) -import feedparser # noqa: E402 -from PIL import Image, ImageDraw, ImageFont # noqa: E402 -import random, time, re, signal, atexit, textwrap, threading # noqa: E402 -try: - import sounddevice as _sd - import numpy as _np - _HAS_MIC = True -except Exception: - _HAS_MIC = False -import urllib.request, urllib.parse, json # noqa: E402 -from datetime import datetime -from html import unescape -from html.parser import HTMLParser - -# ─── CONFIG ─────────────────────────────────────────────── -HEADLINE_LIMIT = 1000 -FEED_TIMEOUT = 10 -MIC_THRESHOLD_DB = 50 # dB above which glitches intensify -MODE = 'poetry' if '--poetry' in sys.argv or '-p' in sys.argv else 'news' -FIREHOSE = '--firehose' in sys.argv - -# ntfy message queue -NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json?since=20s&poll=1" -NTFY_POLL_INTERVAL = 15 # seconds between polls -MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen - -# Poetry/literature sources — public domain via Project Gutenberg -POETRY_SOURCES = { - "Whitman": "https://www.gutenberg.org/cache/epub/1322/pg1322.txt", - "Dickinson": "https://www.gutenberg.org/cache/epub/12242/pg12242.txt", - "Whitman II": "https://www.gutenberg.org/cache/epub/8388/pg8388.txt", - "Rilke": "https://www.gutenberg.org/cache/epub/38594/pg38594.txt", - "Pound": "https://www.gutenberg.org/cache/epub/41162/pg41162.txt", - "Pound II": "https://www.gutenberg.org/cache/epub/51992/pg51992.txt", - "Eliot": "https://www.gutenberg.org/cache/epub/1567/pg1567.txt", - "Yeats": "https://www.gutenberg.org/cache/epub/38877/pg38877.txt", - "Masters": "https://www.gutenberg.org/cache/epub/1280/pg1280.txt", - "Baudelaire": "https://www.gutenberg.org/cache/epub/36098/pg36098.txt", - "Crane": "https://www.gutenberg.org/cache/epub/40786/pg40786.txt", - "Poe": "https://www.gutenberg.org/cache/epub/10031/pg10031.txt", -} - -# ─── ANSI ───────────────────────────────────────────────── -RST = "\033[0m" -BOLD = "\033[1m" -DIM = "\033[2m" -# Matrix greens -G_HI = "\033[38;5;46m" -G_MID = "\033[38;5;34m" -G_LO = "\033[38;5;22m" -G_DIM = "\033[2;38;5;34m" -# THX-1138 sterile tones -W_COOL = "\033[38;5;250m" -W_DIM = "\033[2;38;5;245m" -W_GHOST = "\033[2;38;5;238m" -C_DIM = "\033[2;38;5;37m" -# Terminal control -CLR = "\033[2J\033[H" -CURSOR_OFF = "\033[?25l" -CURSOR_ON = "\033[?25h" - -# ─── FEEDS ──────────────────────────────────────────────── -FEEDS = { - # Science & Technology - "Nature": "https://www.nature.com/nature.rss", - "Science Daily": "https://www.sciencedaily.com/rss/all.xml", - "Phys.org": "https://phys.org/rss-feed/", - "NASA": "https://www.nasa.gov/news-release/feed/", - "Ars Technica": "https://feeds.arstechnica.com/arstechnica/index", - "New Scientist": "https://www.newscientist.com/section/news/feed/", - "Quanta": "https://api.quantamagazine.org/feed/", - "BBC Science": "http://feeds.bbci.co.uk/news/science_and_environment/rss.xml", - "MIT Tech Review": "https://www.technologyreview.com/feed/", - # Economics & Business - "BBC Business": "http://feeds.bbci.co.uk/news/business/rss.xml", - "MarketWatch": "https://feeds.marketwatch.com/marketwatch/topstories/", - "Economist": "https://www.economist.com/finance-and-economics/rss.xml", - # World & Politics - "BBC World": "http://feeds.bbci.co.uk/news/world/rss.xml", - "NPR": "https://feeds.npr.org/1001/rss.xml", - "Al Jazeera": "https://www.aljazeera.com/xml/rss/all.xml", - "Guardian World": "https://www.theguardian.com/world/rss", - "DW": "https://rss.dw.com/rdf/rss-en-all", - "France24": "https://www.france24.com/en/rss", - "ABC Australia": "https://www.abc.net.au/news/feed/2942460/rss.xml", - "Japan Times": "https://www.japantimes.co.jp/feed/", - "The Hindu": "https://www.thehindu.com/news/national/feeder/default.rss", - "SCMP": "https://www.scmp.com/rss/91/feed", - "Der Spiegel": "https://www.spiegel.de/international/index.rss", - # Culture & Ideas - "Guardian Culture": "https://www.theguardian.com/culture/rss", - "Aeon": "https://aeon.co/feed.rss", - "Smithsonian": "https://www.smithsonianmag.com/rss/latest_articles/", - "The Marginalian": "https://www.themarginalian.org/feed/", - "Nautilus": "https://nautil.us/feed/", - "Wired": "https://www.wired.com/feed/rss", - "The Conversation": "https://theconversation.com/us/articles.atom", - "Longreads": "https://longreads.com/feed/", - "Literary Hub": "https://lithub.com/feed/", - "Atlas Obscura": "https://www.atlasobscura.com/feeds/latest", -} - -# ─── GLYPHS ─────────────────────────────────────────────── -GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋" -KATA = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ" - -# ─── FONT RENDERING (OTF → terminal blocks) ───────────── -_FONT_PATH = "/Users/genejohnson/Documents/CS Bishop Drawn/CSBishopDrawn-Italic.otf" -_FONT_OBJ = None -_FONT_SZ = 60 -_RENDER_H = 8 # terminal rows per rendered text line -_SSAA = 4 # super-sampling factor: render at _SSAA× then downsample - -# Non-Latin scripts → macOS system fonts -_SCRIPT_FONTS = { - 'zh-cn': '/System/Library/Fonts/STHeiti Medium.ttc', - 'ja': '/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc', - 'ko': '/System/Library/Fonts/AppleSDGothicNeo.ttc', - 'ru': '/System/Library/Fonts/Supplemental/Arial.ttf', - 'uk': '/System/Library/Fonts/Supplemental/Arial.ttf', - 'el': '/System/Library/Fonts/Supplemental/Arial.ttf', - 'he': '/System/Library/Fonts/Supplemental/Arial.ttf', - 'ar': '/System/Library/Fonts/GeezaPro.ttc', - 'fa': '/System/Library/Fonts/GeezaPro.ttc', - 'hi': '/System/Library/Fonts/Kohinoor.ttc', - 'th': '/System/Library/Fonts/ThonburiUI.ttc', -} -_FONT_CACHE = {} -_NO_UPPER = {'zh-cn', 'ja', 'ko', 'ar', 'fa', 'hi', 'th', 'he'} -# Left → right gradient: white-hot leading edge fades to near-black -_GRAD_COLS = [ - "\033[1;38;5;231m", # white - "\033[1;38;5;195m", # pale cyan-white - "\033[38;5;123m", # bright cyan - "\033[38;5;118m", # bright lime - "\033[38;5;82m", # lime - "\033[38;5;46m", # bright green - "\033[38;5;40m", # green - "\033[38;5;34m", # medium green - "\033[38;5;28m", # dark green - "\033[38;5;22m", # deep green - "\033[2;38;5;22m", # dim deep green - "\033[2;38;5;235m", # near black -] - - -def _font(): - """Lazy-load the OTF font.""" - global _FONT_OBJ - if _FONT_OBJ is None: - _FONT_OBJ = ImageFont.truetype(_FONT_PATH, _FONT_SZ) - return _FONT_OBJ - - -def _font_for_lang(lang=None): - """Get appropriate font for a language.""" - if lang is None or lang not in _SCRIPT_FONTS: - return _font() - if lang not in _FONT_CACHE: - try: - _FONT_CACHE[lang] = ImageFont.truetype(_SCRIPT_FONTS[lang], _FONT_SZ) - except Exception: - _FONT_CACHE[lang] = _font() - return _FONT_CACHE[lang] - -# ─── HELPERS ────────────────────────────────────────────── -class _Strip(HTMLParser): - def __init__(self): - super().__init__() - self._t = [] - - def handle_data(self, d): - self._t.append(d) - - def text(self): - return "".join(self._t).strip() - - -def strip_tags(html): - s = _Strip() - s.feed(unescape(html or "")) - return s.text() - - -def tw(): - try: - return os.get_terminal_size().columns - except Exception: - return 80 - - -def th(): - try: - return os.get_terminal_size().lines - except Exception: - return 24 - - -def noise(w): - d = random.choice([0.15, 0.25, 0.35, 0.12]) # was [0.08, 0.12, 0.2, 0.05], now much denser - return "".join( - f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}" - f"{random.choice(GLITCH + KATA)}{RST}" - if random.random() < d - else " " - for _ in range(w) - ) - - -def glitch_bar(w): - c = random.choice(["░", "▒", "─", "╌"]) - n = random.randint(3, w // 2) - o = random.randint(0, w - n) - return " " * o + f"{G_LO}{DIM}" + c * n + RST - - -# ─── SOURCE → LANGUAGE MAPPING ────────────────────────── -# Headlines from these outlets render in their cultural home language -# regardless of content, reflecting the true distribution of sources. -SOURCE_LANGS = { - "Der Spiegel": "de", - "DW": "de", - "France24": "fr", - "Japan Times": "ja", - "The Hindu": "hi", - "SCMP": "zh-cn", - "Al Jazeera": "ar", -} - -# ─── LOCATION → LANGUAGE ───────────────────────────────── -_LOCATION_LANGS = { - r'\b(?:china|chinese|beijing|shanghai|hong kong|xi jinping)\b': 'zh-cn', - r'\b(?:japan|japanese|tokyo|osaka|kishida)\b': 'ja', - r'\b(?:korea|korean|seoul|pyongyang)\b': 'ko', - r'\b(?:russia|russian|moscow|kremlin|putin)\b': 'ru', - r'\b(?:saudi|dubai|qatar|egypt|cairo|arabic)\b': 'ar', - r'\b(?:india|indian|delhi|mumbai|modi)\b': 'hi', - r'\b(?:germany|german|berlin|munich|scholz)\b': 'de', - r'\b(?:france|french|paris|lyon|macron)\b': 'fr', - r'\b(?:spain|spanish|madrid)\b': 'es', - r'\b(?:italy|italian|rome|milan|meloni)\b': 'it', - r'\b(?:portugal|portuguese|lisbon)\b': 'pt', - r'\b(?:brazil|brazilian|são paulo|lula)\b': 'pt', - r'\b(?:greece|greek|athens)\b': 'el', - r'\b(?:turkey|turkish|istanbul|ankara|erdogan)\b': 'tr', - r'\b(?:iran|iranian|tehran)\b': 'fa', - r'\b(?:thailand|thai|bangkok)\b': 'th', - r'\b(?:vietnam|vietnamese|hanoi)\b': 'vi', - r'\b(?:ukraine|ukrainian|kyiv|kiev|zelensky)\b': 'uk', - r'\b(?:israel|israeli|jerusalem|tel aviv|netanyahu)\b': 'he', -} - -_TRANSLATE_CACHE = {} - - -def _detect_location_language(title): - """Detect if headline mentions a location, return target language.""" - title_lower = title.lower() - for pattern, lang in _LOCATION_LANGS.items(): - if re.search(pattern, title_lower): - return lang - return None - - -def _translate_headline(title, target_lang): - """Translate headline via Google Translate API (zero dependencies).""" - key = (title, target_lang) - if key in _TRANSLATE_CACHE: - return _TRANSLATE_CACHE[key] - try: - q = urllib.parse.quote(title) - url = ("https://translate.googleapis.com/translate_a/single" - f"?client=gtx&sl=en&tl={target_lang}&dt=t&q={q}") - req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"}) - resp = urllib.request.urlopen(req, timeout=5) - data = json.loads(resp.read()) - result = "".join(p[0] for p in data[0] if p[0]) or title - except Exception: - result = title - _TRANSLATE_CACHE[key] = result - return result - -# ─── CONTENT FILTER ─────────────────────────────────────── -_SKIP_RE = re.compile( - r'\b(?:' - # ── sports ── - r'football|soccer|basketball|baseball|softball|tennis|golf|cricket|rugby|' - r'hockey|lacrosse|volleyball|badminton|' - r'nba|nfl|nhl|mlb|mls|fifa|uefa|' - r'premier league|champions league|la liga|serie a|bundesliga|' - r'world cup|super bowl|world series|stanley cup|' - r'playoff|playoffs|touchdown|goalkeeper|striker|quarterback|' - r'slam dunk|home run|grand slam|offside|halftime|' - r'batting|wicket|innings|' - r'formula 1|nascar|motogp|' - r'boxing|ufc|mma|' - r'marathon|tour de france|' - r'transfer window|draft pick|relegation|' - # ── vapid / insipid ── - r'kardashian|jenner|reality tv|reality show|' - r'influencer|viral video|tiktok|instagram|' - r'best dressed|worst dressed|red carpet|' - r'horoscope|zodiac|gossip|bikini|selfie|' - r'you won.t believe|what happened next|' - r'celebrity couple|celebrity feud|baby bump' - r')\b', - re.IGNORECASE -) - - -def _skip(title): - """Return True if headline is sports, vapid, or insipid.""" - return bool(_SKIP_RE.search(title)) - - -# ─── DISPLAY ────────────────────────────────────────────── -def type_out(text, color=G_HI): - i = 0 - while i < len(text): - if random.random() < 0.3: - b = random.randint(2, 5) - sys.stdout.write(f"{color}{text[i:i+b]}{RST}") - i += b - else: - sys.stdout.write(f"{color}{text[i]}{RST}") - i += 1 - sys.stdout.flush() - time.sleep(random.uniform(0.004, 0.018)) - - -def slow_print(text, color=G_DIM, delay=0.015): - for ch in text: - sys.stdout.write(f"{color}{ch}{RST}") - sys.stdout.flush() - time.sleep(delay) - - -def boot_ln(label, status, ok=True): - dots = max(3, min(30, tw() - len(label) - len(status) - 8)) - sys.stdout.write(f" {G_DIM}>{RST} {W_DIM}{label} ") - sys.stdout.flush() - for _ in range(dots): - sys.stdout.write(f"{G_LO}.") - sys.stdout.flush() - time.sleep(random.uniform(0.006, 0.025)) - c = G_MID if ok else "\033[2;38;5;196m" - print(f" {c}{status}{RST}") - time.sleep(random.uniform(0.02, 0.1)) - - -# ─── FETCH ──────────────────────────────────────────────── -def fetch_feed(url): - try: - req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"}) - resp = urllib.request.urlopen(req, timeout=FEED_TIMEOUT) - return feedparser.parse(resp.read()) - except Exception: - return None - - -def fetch_all(): - items = [] - linked = failed = 0 - for src, url in FEEDS.items(): - feed = fetch_feed(url) - if feed is None or (feed.bozo and not feed.entries): - boot_ln(src, "DARK", False) - failed += 1 - continue - n = 0 - for e in feed.entries: - t = strip_tags(e.get("title", "")) - if not t or _skip(t): - continue - pub = e.get("published_parsed") or e.get("updated_parsed") - try: - ts = datetime(*pub[:6]).strftime("%H:%M") if pub else "——:——" - except Exception: - ts = "——:——" - items.append((t, src, ts)) - n += 1 - if n: - boot_ln(src, f"LINKED [{n}]", True) - linked += 1 - else: - boot_ln(src, "EMPTY", False) - failed += 1 - return items, linked, failed - - -def _fetch_gutenberg(url, label): - """Download and parse stanzas/passages from a Project Gutenberg text.""" - try: - req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"}) - resp = urllib.request.urlopen(req, timeout=15) - text = resp.read().decode('utf-8', errors='replace').replace('\r\n', '\n').replace('\r', '\n') - # Strip PG boilerplate - m = re.search(r'\*\*\*\s*START OF[^\n]*\n', text) - if m: - text = text[m.end():] - m = re.search(r'\*\*\*\s*END OF', text) - if m: - text = text[:m.start()] - # Split on blank lines into stanzas/passages - blocks = re.split(r'\n{2,}', text.strip()) - items = [] - for blk in blocks: - blk = ' '.join(blk.split()) # flatten to one line - if len(blk) < 20 or len(blk) > 280: - continue - if blk.isupper(): # skip all-caps headers - continue - if re.match(r'^[IVXLCDM]+\.?\s*$', blk): # roman numerals - continue - items.append((blk, label, '')) - return items - except Exception: - return [] - - -def fetch_poetry(): - """Fetch all poetry/literature sources.""" - items = [] - linked = failed = 0 - for label, url in POETRY_SOURCES.items(): - stanzas = _fetch_gutenberg(url, label) - if stanzas: - boot_ln(label, f"LOADED [{len(stanzas)}]", True) - items.extend(stanzas) - linked += 1 - else: - boot_ln(label, "DARK", False) - failed += 1 - return items, linked, failed - - -# ─── CACHE ──────────────────────────────────────────────── -_CACHE_DIR = pathlib.Path(__file__).resolve().parent - - -def _cache_path(): - return _CACHE_DIR / f".mainline_cache_{MODE}.json" - - -def _load_cache(): - """Load cached items from disk if available.""" - p = _cache_path() - if not p.exists(): - return None - try: - data = json.loads(p.read_text()) - items = [tuple(i) for i in data["items"]] - return items if items else None - except Exception: - return None - - -def _save_cache(items): - """Save fetched items to disk for fast subsequent runs.""" - try: - _cache_path().write_text(json.dumps({"items": items})) - except Exception: - pass - - -# ─── STREAM ─────────────────────────────────────────────── -_SCROLL_DUR = 5.625 # seconds per headline (2/3 original speed) -_FRAME_DT = 0.05 # 50ms base frame rate (20 FPS) -FIREHOSE_H = 12 # firehose zone height (terminal rows) -GRAD_SPEED = 0.08 # gradient traversal speed (cycles/sec, ~12s full sweep) -_mic_db = -99.0 # current mic level, written by background thread -_mic_stream = None - - -def _start_mic(): - """Start background mic monitoring; silently skipped if unavailable.""" - global _mic_db, _mic_stream - if not _HAS_MIC: - return - def _cb(indata, frames, t, status): - global _mic_db - rms = float(_np.sqrt(_np.mean(indata ** 2))) - _mic_db = 20 * _np.log10(rms) if rms > 0 else -99.0 - try: - _mic_stream = _sd.InputStream( - callback=_cb, channels=1, samplerate=44100, blocksize=2048) - _mic_stream.start() - atexit.register(lambda: _mic_stream.stop() if _mic_stream else None) - return True - except Exception: - return False - - -# ─── NTFY MESSAGE QUEUE ─────────────────────────────────── -_ntfy_message = None # (title, body, monotonic_timestamp) or None -_ntfy_lock = threading.Lock() - - -def _start_ntfy_poller(): - """Start background thread polling ntfy for messages.""" - def _poll(): - global _ntfy_message - while True: - try: - req = urllib.request.Request( - NTFY_TOPIC, headers={"User-Agent": "mainline/0.1"}) - resp = urllib.request.urlopen(req, timeout=10) - for line in resp.read().decode('utf-8', errors='replace').strip().split('\n'): - if not line.strip(): - continue - try: - data = json.loads(line) - except json.JSONDecodeError: - continue - if data.get("event") == "message": - with _ntfy_lock: - _ntfy_message = ( - data.get("title", ""), - data.get("message", ""), - time.monotonic(), - ) - except Exception: - pass - time.sleep(NTFY_POLL_INTERVAL) - t = threading.Thread(target=_poll, daemon=True) - t.start() - return True - - -def _render_line(text, font=None): - """Render a line of text as terminal rows using OTF font + half-blocks.""" - if font is None: - font = _font() - bbox = font.getbbox(text) - if not bbox or bbox[2] <= bbox[0]: - return [""] - pad = 4 - img_w = bbox[2] - bbox[0] + pad * 2 - img_h = bbox[3] - bbox[1] + pad * 2 - img = Image.new('L', (img_w, img_h), 0) - draw = ImageDraw.Draw(img) - draw.text((-bbox[0] + pad, -bbox[1] + pad), text, fill=255, font=font) - pix_h = _RENDER_H * 2 - hi_h = pix_h * _SSAA - scale = hi_h / max(img_h, 1) - new_w_hi = max(1, int(img_w * scale)) - img = img.resize((new_w_hi, hi_h), Image.Resampling.LANCZOS) - new_w = max(1, int(new_w_hi / _SSAA)) - img = img.resize((new_w, pix_h), Image.Resampling.LANCZOS) - data = img.tobytes() - thr = 80 - rows = [] - for y in range(0, pix_h, 2): - row = [] - for x in range(new_w): - top = data[y * new_w + x] > thr - bot = data[(y + 1) * new_w + x] > thr if y + 1 < pix_h else False - if top and bot: - row.append("█") - elif top: - row.append("▀") - elif bot: - row.append("▄") - else: - row.append(" ") - rows.append("".join(row)) - while rows and not rows[-1].strip(): - rows.pop() - while rows and not rows[0].strip(): - rows.pop(0) - return rows if rows else [""] - - -def _big_wrap(text, max_w, font=None): - """Word-wrap text and render with OTF font.""" - if font is None: - font = _font() - words = text.split() - lines, cur = [], "" - for word in words: - test = f"{cur} {word}".strip() if cur else word - bbox = font.getbbox(test) - if bbox: - img_h = bbox[3] - bbox[1] + 8 - pix_h = _RENDER_H * 2 - scale = pix_h / max(img_h, 1) - term_w = int((bbox[2] - bbox[0] + 8) * scale) - else: - term_w = 0 - if term_w > max_w - 4 and cur: - lines.append(cur) - cur = word - else: - cur = test - if cur: - lines.append(cur) - out = [] - for i, ln in enumerate(lines): - out.extend(_render_line(ln, font)) - if i < len(lines) - 1: - out.append("") - return out - - -def _lr_gradient(rows, offset=0.0): - """Color each non-space block character with a shifting left-to-right gradient.""" - n = len(_GRAD_COLS) - max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1) - out = [] - for row in rows: - if not row.strip(): - out.append(row) - continue - buf = [] - for x, ch in enumerate(row): - if ch == ' ': - buf.append(' ') - else: - shifted = (x / max(max_x - 1, 1) + offset) % 1.0 - idx = min(round(shifted * (n - 1)), n - 1) - buf.append(f"{_GRAD_COLS[idx]}{ch}\033[0m") - out.append("".join(buf)) - return out - - -def _fade_line(s, fade): - """Dissolve a rendered line by probabilistically dropping characters.""" - if fade >= 1.0: - return s - if fade <= 0.0: - return '' - result = [] - i = 0 - while i < len(s): - if s[i] == '\033' and i + 1 < len(s) and s[i + 1] == '[': - j = i + 2 - while j < len(s) and not s[j].isalpha(): - j += 1 - result.append(s[i:j + 1]) - i = j + 1 - elif s[i] == ' ': - result.append(' ') - i += 1 - else: - result.append(s[i] if random.random() < fade else ' ') - i += 1 - return ''.join(result) - - -def _vis_trunc(s, w): - """Truncate string to visual width w, skipping ANSI escape codes.""" - result = [] - vw = 0 - i = 0 - while i < len(s): - if vw >= w: - break - if s[i] == '\033' and i + 1 < len(s) and s[i + 1] == '[': - j = i + 2 - while j < len(s) and not s[j].isalpha(): - j += 1 - result.append(s[i:j + 1]) - i = j + 1 - else: - result.append(s[i]) - vw += 1 - i += 1 - return ''.join(result) - - -def _next_headline(pool, items, seen): - """Pull the next unique headline from pool, refilling as needed.""" - while True: - if not pool: - pool.extend(items) - random.shuffle(pool) - seen.clear() - title, src, ts = pool.pop() - sig = title.lower().strip() - if sig not in seen: - seen.add(sig) - return title, src, ts - - -def _make_block(title, src, ts, w): - """Render a headline into a content block with color.""" - target_lang = (SOURCE_LANGS.get(src) or _detect_location_language(title)) if MODE == 'news' else None - lang_font = _font_for_lang(target_lang) - if target_lang: - title = _translate_headline(title, target_lang) - # Don't uppercase scripts that have no case (CJK, Arabic, etc.) - if target_lang and target_lang in _NO_UPPER: - title_up = re.sub(r"\s+", " ", title) - else: - title_up = re.sub(r"\s+", " ", title.upper()) - for old, new in [("\u2019","'"), ("\u2018","'"), ("\u201c",'"'), - ("\u201d",'"'), ("\u2013","-"), ("\u2014","-")]: - title_up = title_up.replace(old, new) - big_rows = _big_wrap(title_up, w - 4, lang_font) - hc = random.choice([ - "\033[38;5;46m", # matrix green - "\033[38;5;34m", # dark green - "\033[38;5;82m", # lime - "\033[38;5;48m", # sea green - "\033[38;5;37m", # teal - "\033[38;5;44m", # cyan - "\033[38;5;87m", # sky - "\033[38;5;117m", # ice blue - "\033[38;5;250m", # cool white - "\033[38;5;156m", # pale green - "\033[38;5;120m", # mint - "\033[38;5;80m", # dark cyan - "\033[38;5;108m", # grey-green - "\033[38;5;115m", # sage - "\033[1;38;5;46m", # bold green - "\033[1;38;5;250m",# bold white - ]) - content = [" " + r for r in big_rows] - content.append("") - meta = f"\u2591 {src} \u00b7 {ts}" - content.append(" " * max(2, w - len(meta) - 2) + meta) - return content, hc, len(content) - 1 # (rows, color, meta_row_index) - - -def _firehose_line(items, w): - """Generate one line of rapidly cycling firehose content.""" - r = random.random() - if r < 0.35: - # Raw headline text - title, src, ts = random.choice(items) - text = title[:w - 1] - color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM]) - return f"{color}{text}{RST}" - elif r < 0.55: - # Dense glitch noise - d = random.choice([0.45, 0.55, 0.65, 0.75]) - return "".join( - f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}" - f"{random.choice(GLITCH + KATA)}{RST}" - if random.random() < d else " " - for _ in range(w) - ) - elif r < 0.78: - # Status / program output - sources = FEEDS if MODE == 'news' else POETRY_SOURCES - src = random.choice(list(sources.keys())) - msgs = [ - f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}", - f" ░░ FEED ACTIVE :: {src}", - f" >> DECODE 0x{random.randint(0x1000, 0xFFFF):04X} :: {src[:24]}", - f" ▒▒ ACQUIRE :: {random.choice(['TCP', 'UDP', 'RSS', 'ATOM', 'XML'])} :: {src}", - f" {''.join(random.choice(KATA) for _ in range(3))} STRM " - f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}", - ] - text = random.choice(msgs)[:w - 1] - color = random.choice([G_LO, G_DIM, W_GHOST]) - return f"{color}{text}{RST}" - else: - # Headline fragment with glitch prefix - title, _, _ = random.choice(items) - start = random.randint(0, max(0, len(title) - 20)) - frag = title[start:start + random.randint(10, 35)] - pad = random.randint(0, max(0, w - len(frag) - 8)) - gp = ''.join(random.choice(GLITCH) for _ in range(random.randint(1, 3))) - text = (' ' * pad + gp + ' ' + frag)[:w - 1] - color = random.choice([G_LO, C_DIM, W_GHOST]) - return f"{color}{text}{RST}" - - -def stream(items): - global _ntfy_message - random.shuffle(items) - pool = list(items) - seen = set() - queued = 0 - - time.sleep(0.5) - sys.stdout.write(CLR) - sys.stdout.flush() - - w, h = tw(), th() - fh = FIREHOSE_H if FIREHOSE else 0 - sh = h - fh # scroll zone height - GAP = 3 # blank rows between headlines - scroll_interval = _SCROLL_DUR / (sh + 15) * 2 - - # active blocks: (content_rows, color, canvas_y, meta_idx) - active = [] - cam = 0 # viewport top in virtual canvas coords - next_y = sh # canvas-y where next block starts (off-screen bottom) - noise_cache = {} - scroll_accum = 0.0 - - def _noise_at(cy): - if cy not in noise_cache: - noise_cache[cy] = noise(w) if random.random() < 0.15 else None - return noise_cache[cy] - - # Message color: bright cyan/white — distinct from headline greens - MSG_COLOR = "\033[1;38;5;87m" # sky cyan - MSG_META = "\033[38;5;245m" # cool grey - MSG_BORDER = "\033[2;38;5;37m" # dim teal - _msg_cache = (None, None) # (cache_key, rendered_rows) - - while queued < HEADLINE_LIMIT or active: - t0 = time.monotonic() - w, h = tw(), th() - fh = FIREHOSE_H if FIREHOSE else 0 - sh = h - fh - - # ── Check for ntfy message ──────────────────────── - msg_h = 0 # rows consumed by message zone at top - msg_active = False - with _ntfy_lock: - if _ntfy_message is not None: - m_title, m_body, m_ts = _ntfy_message - if time.monotonic() - m_ts < MESSAGE_DISPLAY_SECS: - msg_active = True - else: - _ntfy_message = None # expired - - buf = [] - if msg_active: - # ── Message zone: pinned to top, scroll continues below ── - display_text = m_body or m_title or "(empty)" - display_text = re.sub(r"\s+", " ", display_text.upper()) - cache_key = (display_text, w) - if _msg_cache[0] != cache_key: - msg_rows = _big_wrap(display_text, w - 4) - _msg_cache = (cache_key, msg_rows) - else: - msg_rows = _msg_cache[1] - msg_rows = _lr_gradient(msg_rows, (time.monotonic() * GRAD_SPEED) % 1.0) - # Layout: rendered text + meta + border - elapsed_s = int(time.monotonic() - m_ts) - remaining = max(0, MESSAGE_DISPLAY_SECS - elapsed_s) - ts_str = datetime.now().strftime("%H:%M:%S") - row_idx = 0 - for mr in msg_rows: - ln = _vis_trunc(mr, w) - buf.append(f"\033[{row_idx+1};1H {ln}{RST}\033[K") - row_idx += 1 - # Meta line: title (if distinct) + source + countdown - meta_parts = [] - if m_title and m_title != m_body: - meta_parts.append(m_title) - meta_parts.append(f"ntfy \u00b7 {ts_str} \u00b7 {remaining}s") - meta = " " + " \u00b7 ".join(meta_parts) if len(meta_parts) > 1 else " " + meta_parts[0] - buf.append(f"\033[{row_idx+1};1H{MSG_META}{meta}{RST}\033[K") - row_idx += 1 - # Border — constant boundary between message and scroll - bar = "\u2500" * (w - 4) - buf.append(f"\033[{row_idx+1};1H {MSG_BORDER}{bar}{RST}\033[K") - row_idx += 1 - msg_h = row_idx - - # Effective scroll zone: below message, above firehose - scroll_h = sh - msg_h - - # ── Scroll: headline rendering (always runs) ────── - # Advance scroll on schedule - scroll_accum += _FRAME_DT - while scroll_accum >= scroll_interval: - scroll_accum -= scroll_interval - cam += 1 - - # Enqueue new headlines when room at the bottom - while next_y < cam + sh + 10 and queued < HEADLINE_LIMIT: - t, src, ts = _next_headline(pool, items, seen) - content, hc, midx = _make_block(t, src, ts, w) - active.append((content, hc, next_y, midx)) - next_y += len(content) + GAP - queued += 1 - - # Prune off-screen blocks and stale noise - active = [(c, hc, by, mi) for c, hc, by, mi in active - if by + len(c) > cam] - for k in list(noise_cache): - if k < cam: - del noise_cache[k] - - # Draw scroll zone (below message zone, above firehose) - top_zone = max(1, int(scroll_h * 0.25)) - bot_zone = max(1, int(scroll_h * 0.10)) - grad_offset = (time.monotonic() * GRAD_SPEED) % 1.0 - scroll_buf_start = len(buf) # track where scroll rows start in buf - for r in range(scroll_h): - scr_row = msg_h + r + 1 # 1-indexed ANSI screen row - cy = cam + r - top_f = min(1.0, r / top_zone) if top_zone > 0 else 1.0 - bot_f = min(1.0, (scroll_h - 1 - r) / bot_zone) if bot_zone > 0 else 1.0 - row_fade = min(top_f, bot_f) - drawn = False - for content, hc, by, midx in active: - cr = cy - by - if 0 <= cr < len(content): - raw = content[cr] - if cr != midx: - colored = _lr_gradient([raw], grad_offset)[0] - else: - colored = raw - ln = _vis_trunc(colored, w) - if row_fade < 1.0: - ln = _fade_line(ln, row_fade) - if cr == midx: - buf.append(f"\033[{scr_row};1H{W_COOL}{ln}{RST}\033[K") - elif ln.strip(): - buf.append(f"\033[{scr_row};1H{ln}{RST}\033[K") - else: - buf.append(f"\033[{scr_row};1H\033[K") - drawn = True - break - if not drawn: - n = _noise_at(cy) - if row_fade < 1.0 and n: - n = _fade_line(n, row_fade) - if n: - buf.append(f"\033[{scr_row};1H{n}") - else: - buf.append(f"\033[{scr_row};1H\033[K") - - # Draw firehose zone - if FIREHOSE and fh > 0: - for fr in range(fh): - fline = _firehose_line(items, w) - buf.append(f"\033[{sh + fr + 1};1H{fline}\033[K") - - # Glitch — base rate + mic-reactive spikes (scroll zone only) - mic_excess = max(0.0, _mic_db - MIC_THRESHOLD_DB) - glitch_prob = 0.32 + min(0.9, mic_excess * 0.16) - n_hits = 4 + int(mic_excess / 2) - scroll_buf_len = len(buf) - scroll_buf_start - if random.random() < glitch_prob and scroll_buf_len > 0: - for _ in range(min(n_hits, scroll_buf_len)): - gi = random.randint(0, scroll_buf_len - 1) - scr_row = msg_h + gi + 1 - buf[scroll_buf_start + gi] = f"\033[{scr_row};1H{glitch_bar(w)}" - - sys.stdout.buffer.write("".join(buf).encode()) - sys.stdout.flush() - - # Precise frame timing - elapsed = time.monotonic() - t0 - time.sleep(max(0, _FRAME_DT - elapsed)) - - sys.stdout.write(CLR) - sys.stdout.flush() - - -# ─── MAIN ───────────────────────────────────────────────── -TITLE = [ - " ███╗ ███╗ █████╗ ██╗███╗ ██╗██╗ ██╗███╗ ██╗███████╗", - " ████╗ ████║██╔══██╗██║████╗ ██║██║ ██║████╗ ██║██╔════╝", - " ██╔████╔██║███████║██║██╔██╗ ██║██║ ██║██╔██╗ ██║█████╗ ", - " ██║╚██╔╝██║██╔══██║██║██║╚██╗██║██║ ██║██║╚██╗██║██╔══╝ ", - " ██║ ╚═╝ ██║██║ ██║██║██║ ╚████║███████╗██║██║ ╚████║███████╗", - " ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝╚═╝ ╚═══╝╚══════╝╚═╝╚═╝ ╚═══╝╚══════╝", -] - - -def main(): - atexit.register(lambda: print(CURSOR_ON, end="", flush=True)) - - def handle_sigint(*_): - print(f"\n\n {G_DIM}> SIGNAL LOST{RST}") - print(f" {W_GHOST}> connection terminated{RST}\n") - sys.exit(0) - - signal.signal(signal.SIGINT, handle_sigint) - - w = tw() - print(CLR, end="") - print(CURSOR_OFF, end="") - print() - time.sleep(0.4) - - for ln in TITLE: - print(f"{G_HI}{ln}{RST}") - time.sleep(0.07) - - print() - _subtitle = "literary consciousness stream" if MODE == 'poetry' else "digital consciousness stream" - print(f" {W_DIM}v0.1 · {_subtitle}{RST}") - print(f" {W_GHOST}{'─' * (w - 4)}{RST}") - print() - time.sleep(0.4) - - cached = _load_cache() if '--refresh' not in sys.argv else None - if cached: - items = cached - boot_ln("Cache", f"LOADED [{len(items)} SIGNALS]", True) - elif MODE == 'poetry': - slow_print(" > INITIALIZING LITERARY CORPUS...\n") - time.sleep(0.2) - print() - items, linked, failed = fetch_poetry() - print() - print(f" {G_DIM}>{RST} {G_MID}{linked} TEXTS LOADED{RST} {W_GHOST}· {failed} DARK{RST}") - print(f" {G_DIM}>{RST} {G_MID}{len(items)} STANZAS ACQUIRED{RST}") - _save_cache(items) - else: - slow_print(" > INITIALIZING FEED ARRAY...\n") - time.sleep(0.2) - print() - items, linked, failed = fetch_all() - print() - print(f" {G_DIM}>{RST} {G_MID}{linked} SOURCES LINKED{RST} {W_GHOST}· {failed} DARK{RST}") - print(f" {G_DIM}>{RST} {G_MID}{len(items)} SIGNALS ACQUIRED{RST}") - _save_cache(items) - - if not items: - print(f"\n {W_DIM}> NO SIGNAL — check network{RST}") - sys.exit(1) - - print() - mic_ok = _start_mic() - if _HAS_MIC: - boot_ln("Microphone", "ACTIVE" if mic_ok else "OFFLINE · check System Settings → Privacy → Microphone", mic_ok) - ntfy_ok = _start_ntfy_poller() - boot_ln("ntfy", "LISTENING" if ntfy_ok else "OFFLINE", ntfy_ok) - if FIREHOSE: - boot_ln("Firehose", "ENGAGED", True) - - time.sleep(0.4) - slow_print(" > STREAMING...\n") - time.sleep(0.2) - print(f" {W_GHOST}{'─' * (w - 4)}{RST}") - print() - time.sleep(0.4) - - stream(items) - - print() - print(f" {W_GHOST}{'─' * (tw() - 4)}{RST}") - print(f" {G_DIM}> {HEADLINE_LIMIT} SIGNALS PROCESSED{RST}") - print(f" {W_GHOST}> end of stream{RST}") - print() - +# ─── DELEGATE TO ENGINE ─────────────────────────────────── +from engine.app import main # noqa: E402 if __name__ == "__main__": main()