# NOTE(review): the following lines are web file-viewer chrome (file listing
# UI text) accidentally captured along with the source; commented out so the
# module parses.  They are not part of the program.
# Files
# Mainline/mainline.py
#
# 1096 lines
# 40 KiB
# Python
# Executable File
# Raw Blame History
#
# This file contains ambiguous Unicode characters
# This file contains Unicode characters that might be confused with other
# characters. If you think that this is intentional, you can safely ignore
# this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
M A I N L I N E
Digital news consciousness stream.
Matrix aesthetic · THX-1138 hue.
"""
import subprocess, sys, os, pathlib
# ─── BOOTSTRAP VENV ───────────────────────────────────────
_VENV = pathlib.Path(__file__).resolve().parent / ".mainline_venv"
_MARKER = _VENV / ".installed_v3"


def _ensure_venv():
    """Create a local venv beside this script and install deps on first run.

    The marker file short-circuits every subsequent run.  Install output is
    suppressed; a pip failure raises CalledProcessError and aborts startup.
    """
    if _MARKER.exists():
        return
    import venv
    print("\033[2;38;5;34m > first run — creating environment...\033[0m")
    venv.create(str(_VENV), with_pip=True, clear=True)
    pip = str(_VENV / "bin" / "pip")  # POSIX layout — assumes macOS/Linux
    subprocess.check_call(
        [pip, "install", "feedparser", "Pillow", "-q"],
        stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
    )
    _MARKER.touch()


_ensure_venv()
# Install sounddevice/numpy on first run after the v3 marker scheme was added.
_MARKER_SD = _VENV / ".installed_sd"
if not _MARKER_SD.exists():
    _pip = str(_VENV / "bin" / "pip")
    subprocess.check_call([_pip, "install", "sounddevice", "numpy", "-q"],
                          stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    _MARKER_SD.touch()
# Make the venv's site-packages importable from the system interpreter.
sys.path.insert(0, str(next((_VENV / "lib").glob("python*/site-packages"))))
import feedparser # noqa: E402
from PIL import Image, ImageDraw, ImageFont # noqa: E402
import random, time, re, signal, atexit, textwrap, threading # noqa: E402
try:
import sounddevice as _sd
import numpy as _np
_HAS_MIC = True
except Exception:
_HAS_MIC = False
import urllib.request, urllib.parse, json # noqa: E402
from datetime import datetime
from html import unescape
from html.parser import HTMLParser
# ─── CONFIG ───────────────────────────────────────────────
HEADLINE_LIMIT = 1000
FEED_TIMEOUT = 10
MIC_THRESHOLD_DB = 50 # dB above which glitches intensify
# Mode selected from argv at import time: poetry corpus or news feeds.
MODE = 'poetry' if '--poetry' in sys.argv or '-p' in sys.argv else 'news'
FIREHOSE = '--firehose' in sys.argv
# ntfy message queue
NTFY_TOPIC = "https://ntfy.sh/klubhaus_terminal_mainline/json?since=20s&poll=1"
NTFY_POLL_INTERVAL = 15 # seconds between polls
MESSAGE_DISPLAY_SECS = 30 # how long a message holds the screen
# Poetry/literature sources — public domain via Project Gutenberg
POETRY_SOURCES = {
"Whitman": "https://www.gutenberg.org/cache/epub/1322/pg1322.txt",
"Dickinson": "https://www.gutenberg.org/cache/epub/12242/pg12242.txt",
"Whitman II": "https://www.gutenberg.org/cache/epub/8388/pg8388.txt",
"Rilke": "https://www.gutenberg.org/cache/epub/38594/pg38594.txt",
"Pound": "https://www.gutenberg.org/cache/epub/41162/pg41162.txt",
"Pound II": "https://www.gutenberg.org/cache/epub/51992/pg51992.txt",
"Eliot": "https://www.gutenberg.org/cache/epub/1567/pg1567.txt",
"Yeats": "https://www.gutenberg.org/cache/epub/38877/pg38877.txt",
"Masters": "https://www.gutenberg.org/cache/epub/1280/pg1280.txt",
"Baudelaire": "https://www.gutenberg.org/cache/epub/36098/pg36098.txt",
"Crane": "https://www.gutenberg.org/cache/epub/40786/pg40786.txt",
"Poe": "https://www.gutenberg.org/cache/epub/10031/pg10031.txt",
}
# ─── ANSI ─────────────────────────────────────────────────
RST = "\033[0m"
BOLD = "\033[1m"
DIM = "\033[2m"
# Matrix greens
G_HI = "\033[38;5;46m"
G_MID = "\033[38;5;34m"
G_LO = "\033[38;5;22m"
G_DIM = "\033[2;38;5;34m"
# THX-1138 sterile tones
W_COOL = "\033[38;5;250m"
W_DIM = "\033[2;38;5;245m"
W_GHOST = "\033[2;38;5;238m"
C_DIM = "\033[2;38;5;37m"
# Terminal control
CLR = "\033[2J\033[H"
CURSOR_OFF = "\033[?25l"
CURSOR_ON = "\033[?25h"
# ─── FEEDS ────────────────────────────────────────────────
# RSS/Atom sources for news mode, keyed by display label.  Dict order is the
# order feeds are fetched and reported at boot.
FEEDS = {
# Science & Technology
"Nature": "https://www.nature.com/nature.rss",
"Science Daily": "https://www.sciencedaily.com/rss/all.xml",
"Phys.org": "https://phys.org/rss-feed/",
"NASA": "https://www.nasa.gov/news-release/feed/",
"Ars Technica": "https://feeds.arstechnica.com/arstechnica/index",
"New Scientist": "https://www.newscientist.com/section/news/feed/",
"Quanta": "https://api.quantamagazine.org/feed/",
"BBC Science": "http://feeds.bbci.co.uk/news/science_and_environment/rss.xml",
"MIT Tech Review": "https://www.technologyreview.com/feed/",
# Economics & Business
"BBC Business": "http://feeds.bbci.co.uk/news/business/rss.xml",
"MarketWatch": "https://feeds.marketwatch.com/marketwatch/topstories/",
"Economist": "https://www.economist.com/finance-and-economics/rss.xml",
# World & Politics
"BBC World": "http://feeds.bbci.co.uk/news/world/rss.xml",
"NPR": "https://feeds.npr.org/1001/rss.xml",
"Al Jazeera": "https://www.aljazeera.com/xml/rss/all.xml",
"Guardian World": "https://www.theguardian.com/world/rss",
"DW": "https://rss.dw.com/rdf/rss-en-all",
"France24": "https://www.france24.com/en/rss",
"ABC Australia": "https://www.abc.net.au/news/feed/2942460/rss.xml",
"Japan Times": "https://www.japantimes.co.jp/feed/",
"The Hindu": "https://www.thehindu.com/news/national/feeder/default.rss",
"SCMP": "https://www.scmp.com/rss/91/feed",
"Der Spiegel": "https://www.spiegel.de/international/index.rss",
# Culture & Ideas
"Guardian Culture": "https://www.theguardian.com/culture/rss",
"Aeon": "https://aeon.co/feed.rss",
"Smithsonian": "https://www.smithsonianmag.com/rss/latest_articles/",
"The Marginalian": "https://www.themarginalian.org/feed/",
"Nautilus": "https://nautil.us/feed/",
"Wired": "https://www.wired.com/feed/rss",
"The Conversation": "https://theconversation.com/us/articles.atom",
"Longreads": "https://longreads.com/feed/",
"Literary Hub": "https://lithub.com/feed/",
"Atlas Obscura": "https://www.atlasobscura.com/feeds/latest",
}
# ─── GLYPHS ───────────────────────────────────────────────
# Block/shade glyphs and half-width katakana used for background static.
GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋"
KATA = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ"
# ─── FONT RENDERING (OTF → terminal blocks) ─────────────
# NOTE(review): hard-coded per-user path — breaks on any other machine;
# _render_line will raise if the font file is absent.  TODO: make configurable.
_FONT_PATH = "/Users/genejohnson/Documents/CS Bishop Drawn/CSBishopDrawn-Italic.otf"
_FONT_OBJ = None
_FONT_SZ = 60
_RENDER_H = 8 # terminal rows per rendered text line
_SSAA = 4 # super-sampling factor: render at _SSAA× then downsample
# Non-Latin scripts → macOS system fonts
_SCRIPT_FONTS = {
'zh-cn': '/System/Library/Fonts/STHeiti Medium.ttc',
'ja': '/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc',
'ko': '/System/Library/Fonts/AppleSDGothicNeo.ttc',
'ru': '/System/Library/Fonts/Supplemental/Arial.ttf',
'uk': '/System/Library/Fonts/Supplemental/Arial.ttf',
'el': '/System/Library/Fonts/Supplemental/Arial.ttf',
'he': '/System/Library/Fonts/Supplemental/Arial.ttf',
'ar': '/System/Library/Fonts/GeezaPro.ttc',
'fa': '/System/Library/Fonts/GeezaPro.ttc',
'hi': '/System/Library/Fonts/Kohinoor.ttc',
'th': '/System/Library/Fonts/ThonburiUI.ttc',
}
_FONT_CACHE = {}
# Language codes whose scripts have no upper/lower case distinction.
_NO_UPPER = {'zh-cn', 'ja', 'ko', 'ar', 'fa', 'hi', 'th', 'he'}
# Left → right gradient: white-hot leading edge fades to near-black
_GRAD_COLS = [
"\033[1;38;5;231m", # white
"\033[1;38;5;195m", # pale cyan-white
"\033[38;5;123m", # bright cyan
"\033[38;5;118m", # bright lime
"\033[38;5;82m", # lime
"\033[38;5;46m", # bright green
"\033[38;5;40m", # green
"\033[38;5;34m", # medium green
"\033[38;5;28m", # dark green
"\033[38;5;22m", # deep green
"\033[2;38;5;22m", # dim deep green
"\033[2;38;5;235m", # near black
]
def _font():
    """Lazily load and cache the primary display OTF font.

    Raises OSError from PIL if _FONT_PATH does not exist.
    """
    global _FONT_OBJ
    if _FONT_OBJ is None:
        _FONT_OBJ = ImageFont.truetype(_FONT_PATH, _FONT_SZ)
    return _FONT_OBJ
def _font_for_lang(lang=None):
    """Return the font for *lang*, falling back to the default display font.

    Per-language fonts are cached; a load failure silently falls back.
    """
    if lang is None or lang not in _SCRIPT_FONTS:
        return _font()
    if lang not in _FONT_CACHE:
        try:
            _FONT_CACHE[lang] = ImageFont.truetype(_SCRIPT_FONTS[lang], _FONT_SZ)
        except Exception:
            _FONT_CACHE[lang] = _font()
    return _FONT_CACHE[lang]
# ─── HELPERS ──────────────────────────────────────────────
class _Strip(HTMLParser):
def __init__(self):
super().__init__()
self._t = []
def handle_data(self, d):
self._t.append(d)
def text(self):
return "".join(self._t).strip()
def strip_tags(html):
    """Strip HTML tags and entities from *html*; None/empty → ''."""
    s = _Strip()
    s.feed(unescape(html or ""))
    return s.text()
def tw():
    """Terminal width in columns; 80 when there is no controlling tty."""
    try:
        return os.get_terminal_size().columns
    except Exception:
        return 80
def th():
    """Terminal height in rows; 24 when there is no controlling tty."""
    try:
        return os.get_terminal_size().lines
    except Exception:
        return 24
def noise(w):
    """Return one terminal row (*w* cells) of sparse colored static."""
    # Glyph density per cell; was [0.08, 0.12, 0.2, 0.05], now much denser.
    d = random.choice([0.15, 0.25, 0.35, 0.12])
    return "".join(
        f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
        f"{random.choice(GLITCH + KATA)}{RST}"
        if random.random() < d
        else " "
        for _ in range(w)
    )
def glitch_bar(w):
    """Return a short horizontal run of one glitch glyph at a random offset."""
    # NOTE(review): this choice list arrived as four EMPTY strings — almost
    # certainly shade-block glyphs stripped by a copy/paste encoding pass
    # (see the "ambiguous Unicode" viewer warning).  Restored to the shade
    # blocks used elsewhere in this file — confirm against the original.
    c = random.choice(["░", "▒", "▓", "█"])
    n = random.randint(3, w // 2)
    o = random.randint(0, w - n)
    return " " * o + f"{G_LO}{DIM}" + c * n + RST
# ─── SOURCE → LANGUAGE MAPPING ──────────────────────────
# Headlines from these outlets render in their cultural home language
# regardless of content, reflecting the true distribution of sources.
SOURCE_LANGS = {
"Der Spiegel": "de",
"DW": "de",
"France24": "fr",
"Japan Times": "ja",
"The Hindu": "hi",
"SCMP": "zh-cn",
"Al Jazeera": "ar",
}
# ─── LOCATION → LANGUAGE ─────────────────────────────────
# Keyword regex → language code; first match wins (dict order matters).
_LOCATION_LANGS = {
r'\b(?:china|chinese|beijing|shanghai|hong kong|xi jinping)\b': 'zh-cn',
r'\b(?:japan|japanese|tokyo|osaka|kishida)\b': 'ja',
r'\b(?:korea|korean|seoul|pyongyang)\b': 'ko',
r'\b(?:russia|russian|moscow|kremlin|putin)\b': 'ru',
r'\b(?:saudi|dubai|qatar|egypt|cairo|arabic)\b': 'ar',
r'\b(?:india|indian|delhi|mumbai|modi)\b': 'hi',
r'\b(?:germany|german|berlin|munich|scholz)\b': 'de',
r'\b(?:france|french|paris|lyon|macron)\b': 'fr',
r'\b(?:spain|spanish|madrid)\b': 'es',
r'\b(?:italy|italian|rome|milan|meloni)\b': 'it',
r'\b(?:portugal|portuguese|lisbon)\b': 'pt',
r'\b(?:brazil|brazilian|são paulo|lula)\b': 'pt',
r'\b(?:greece|greek|athens)\b': 'el',
r'\b(?:turkey|turkish|istanbul|ankara|erdogan)\b': 'tr',
r'\b(?:iran|iranian|tehran)\b': 'fa',
r'\b(?:thailand|thai|bangkok)\b': 'th',
r'\b(?:vietnam|vietnamese|hanoi)\b': 'vi',
r'\b(?:ukraine|ukrainian|kyiv|kiev|zelensky)\b': 'uk',
r'\b(?:israel|israeli|jerusalem|tel aviv|netanyahu)\b': 'he',
}
# (title, target_lang) → translated title; unbounded, grows per session.
_TRANSLATE_CACHE = {}
def _detect_location_language(title):
    """Return a language code if *title* mentions a known locale, else None.

    Patterns from _LOCATION_LANGS are tried in dict order; first match wins.
    """
    title_lower = title.lower()
    for pattern, lang in _LOCATION_LANGS.items():
        if re.search(pattern, title_lower):
            return lang
    return None
def _translate_headline(title, target_lang):
    """Translate *title* via the unofficial Google Translate endpoint.

    Results (including failures, which fall back to the original title)
    are memoized in _TRANSLATE_CACHE keyed by (title, target_lang).
    """
    key = (title, target_lang)
    if key in _TRANSLATE_CACHE:
        return _TRANSLATE_CACHE[key]
    try:
        q = urllib.parse.quote(title)
        url = ("https://translate.googleapis.com/translate_a/single"
               f"?client=gtx&sl=en&tl={target_lang}&dt=t&q={q}")
        req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
        resp = urllib.request.urlopen(req, timeout=5)
        data = json.loads(resp.read())
        result = "".join(p[0] for p in data[0] if p[0]) or title
    except Exception:
        result = title  # best-effort: untranslated beats crashed
    _TRANSLATE_CACHE[key] = result
    return result
# ─── CONTENT FILTER ───────────────────────────────────────
# Case-insensitive word-boundary filter; any match drops the headline.
_SKIP_RE = re.compile(
r'\b(?:'
# ── sports ──
r'football|soccer|basketball|baseball|softball|tennis|golf|cricket|rugby|'
r'hockey|lacrosse|volleyball|badminton|'
r'nba|nfl|nhl|mlb|mls|fifa|uefa|'
r'premier league|champions league|la liga|serie a|bundesliga|'
r'world cup|super bowl|world series|stanley cup|'
r'playoff|playoffs|touchdown|goalkeeper|striker|quarterback|'
r'slam dunk|home run|grand slam|offside|halftime|'
r'batting|wicket|innings|'
r'formula 1|nascar|motogp|'
r'boxing|ufc|mma|'
r'marathon|tour de france|'
r'transfer window|draft pick|relegation|'
# ── vapid / insipid ──
r'kardashian|jenner|reality tv|reality show|'
r'influencer|viral video|tiktok|instagram|'
r'best dressed|worst dressed|red carpet|'
r'horoscope|zodiac|gossip|bikini|selfie|'
r'you won.t believe|what happened next|'
r'celebrity couple|celebrity feud|baby bump'
r')\b',
re.IGNORECASE
)
def _skip(title):
    """Return True if headline is sports, vapid, or insipid (see _SKIP_RE)."""
    return bool(_SKIP_RE.search(title))
# ─── DISPLAY ──────────────────────────────────────────────
def type_out(text, color=G_HI):
    """Print *text* with a jittery burst-typing effect (2-5 char bursts)."""
    i = 0
    while i < len(text):
        if random.random() < 0.3:
            b = random.randint(2, 5)  # emit a small burst at once
            sys.stdout.write(f"{color}{text[i:i+b]}{RST}")
            i += b
        else:
            sys.stdout.write(f"{color}{text[i]}{RST}")
            i += 1
        # NOTE(review): flush/sleep placed inside the loop (per-chunk pacing);
        # the pasted source lost indentation — confirm against the original.
        sys.stdout.flush()
        time.sleep(random.uniform(0.004, 0.018))
def slow_print(text, color=G_DIM, delay=0.015):
    """Print *text* one character at a time with a fixed per-char delay."""
    for ch in text:
        sys.stdout.write(f"{color}{ch}{RST}")
        sys.stdout.flush()
        time.sleep(delay)
def boot_ln(label, status, ok=True):
    """Print one boot-sequence line: '> label ....... STATUS'.

    Dot count adapts to terminal width; *ok* selects green vs dim red status.
    """
    dots = max(3, min(30, tw() - len(label) - len(status) - 8))
    sys.stdout.write(f" {G_DIM}>{RST} {W_DIM}{label} ")
    sys.stdout.flush()
    for _ in range(dots):
        sys.stdout.write(f"{G_LO}.")
        sys.stdout.flush()
        time.sleep(random.uniform(0.006, 0.025))
    c = G_MID if ok else "\033[2;38;5;196m"
    print(f" {c}{status}{RST}")
    time.sleep(random.uniform(0.02, 0.1))
# ─── FETCH ────────────────────────────────────────────────
def fetch_feed(url):
    """Fetch and parse one RSS/Atom feed; return None on any failure."""
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
        resp = urllib.request.urlopen(req, timeout=FEED_TIMEOUT)
        return feedparser.parse(resp.read())
    except Exception:
        return None  # caller reports the feed as DARK
def fetch_all():
    """Fetch every configured feed, printing a boot line per source.

    Returns (items, linked, failed) where items is a list of
    (title, source, "HH:MM") tuples with filtered/empty titles dropped.
    """
    items = []
    linked = failed = 0
    for src, url in FEEDS.items():
        feed = fetch_feed(url)
        if feed is None or (feed.bozo and not feed.entries):
            boot_ln(src, "DARK", False)
            failed += 1
            continue
        n = 0
        for e in feed.entries:
            t = strip_tags(e.get("title", ""))
            if not t or _skip(t):
                continue
            pub = e.get("published_parsed") or e.get("updated_parsed")
            try:
                ts = datetime(*pub[:6]).strftime("%H:%M") if pub else "——:——"
            except Exception:
                ts = "——:——"  # malformed date → placeholder
            items.append((t, src, ts))
            n += 1
        if n:
            boot_ln(src, f"LINKED [{n}]", True)
            linked += 1
        else:
            boot_ln(src, "EMPTY", False)
            failed += 1
    return items, linked, failed
def _fetch_gutenberg(url, label):
    """Download and parse stanzas/passages from a Project Gutenberg text.

    Strips the PG boilerplate, splits on blank lines, flattens each block
    to one line, and keeps blocks of 20-280 chars that are not all-caps
    headers or bare roman numerals.  Returns [] on any failure.
    """
    try:
        req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
        resp = urllib.request.urlopen(req, timeout=15)
        text = resp.read().decode('utf-8', errors='replace').replace('\r\n', '\n').replace('\r', '\n')
        # Strip PG boilerplate
        m = re.search(r'\*\*\*\s*START OF[^\n]*\n', text)
        if m:
            text = text[m.end():]
        m = re.search(r'\*\*\*\s*END OF', text)
        if m:
            text = text[:m.start()]
        # Split on blank lines into stanzas/passages
        blocks = re.split(r'\n{2,}', text.strip())
        items = []
        for blk in blocks:
            blk = ' '.join(blk.split())  # flatten to one line
            if len(blk) < 20 or len(blk) > 280:
                continue
            if blk.isupper():  # skip all-caps headers
                continue
            if re.match(r'^[IVXLCDM]+\.?\s*$', blk):  # roman numerals
                continue
            items.append((blk, label, ''))
        return items
    except Exception:
        return []
def fetch_poetry():
    """Fetch all poetry/literature sources; returns (items, linked, failed)."""
    items = []
    linked = failed = 0
    for label, url in POETRY_SOURCES.items():
        stanzas = _fetch_gutenberg(url, label)
        if stanzas:
            boot_ln(label, f"LOADED [{len(stanzas)}]", True)
            items.extend(stanzas)
            linked += 1
        else:
            boot_ln(label, "DARK", False)
            failed += 1
    return items, linked, failed
# ─── CACHE ────────────────────────────────────────────────
_CACHE_DIR = pathlib.Path(__file__).resolve().parent


def _cache_path():
    """Path of the per-mode JSON cache file stored next to this script."""
    return _CACHE_DIR / f".mainline_cache_{MODE}.json"
def _load_cache():
    """Load cached items from disk; None when missing, corrupt, or empty."""
    p = _cache_path()
    if not p.exists():
        return None
    try:
        data = json.loads(p.read_text())
        items = [tuple(i) for i in data["items"]]  # JSON lists → tuples
        return items if items else None
    except Exception:
        return None
def _save_cache(items):
    """Best-effort save of fetched items for fast subsequent runs."""
    try:
        _cache_path().write_text(json.dumps({"items": items}))
    except Exception:
        pass  # cache is an optimization; never fatal
# ─── STREAM ───────────────────────────────────────────────
_SCROLL_DUR = 5.625 # seconds per headline (2/3 original speed)
_FRAME_DT = 0.05 # 50ms base frame rate (20 FPS)
FIREHOSE_H = 12 # firehose zone height (terminal rows)
GRAD_SPEED = 0.08 # gradient traversal speed (cycles/sec, ~12s full sweep)
_mic_db = -99.0 # current mic level, written by background thread
_mic_stream = None # sounddevice InputStream handle, set by _start_mic()
def _start_mic():
    """Start background mic monitoring; silently skipped if unavailable.

    The audio callback writes a rolling dB level into the module global
    _mic_db.  Returns True on success, False otherwise (previously fell
    through with None when no mic — normalized to False, still falsy).
    """
    global _mic_db, _mic_stream
    if not _HAS_MIC:
        return False

    def _cb(indata, frames, t, status):
        global _mic_db
        rms = float(_np.sqrt(_np.mean(indata ** 2)))
        _mic_db = 20 * _np.log10(rms) if rms > 0 else -99.0

    try:
        _mic_stream = _sd.InputStream(
            callback=_cb, channels=1, samplerate=44100, blocksize=2048)
        _mic_stream.start()
        atexit.register(lambda: _mic_stream.stop() if _mic_stream else None)
        return True
    except Exception:
        return False  # no mic permission / no device — feature disabled
# ─── NTFY MESSAGE QUEUE ───────────────────────────────────
_ntfy_message = None  # (title, body, monotonic_timestamp) or None
_ntfy_lock = threading.Lock()


def _start_ntfy_poller():
    """Start a daemon thread polling ntfy.sh for display messages.

    Newest message wins; stream() expires it after MESSAGE_DISPLAY_SECS.
    Always returns True — thread start itself is assumed to succeed.
    """
    def _poll():
        global _ntfy_message
        while True:
            try:
                req = urllib.request.Request(
                    NTFY_TOPIC, headers={"User-Agent": "mainline/0.1"})
                resp = urllib.request.urlopen(req, timeout=10)
                for line in resp.read().decode('utf-8', errors='replace').strip().split('\n'):
                    if not line.strip():
                        continue
                    try:
                        data = json.loads(line)
                    except json.JSONDecodeError:
                        continue  # partial/garbled line — skip
                    if data.get("event") == "message":
                        with _ntfy_lock:
                            _ntfy_message = (
                                data.get("title", ""),
                                data.get("message", ""),
                                time.monotonic(),
                            )
            except Exception:
                pass  # network hiccups expected; keep polling
            time.sleep(NTFY_POLL_INTERVAL)

    t = threading.Thread(target=_poll, daemon=True)
    t.start()
    return True
def _render_line(text, font=None):
    """Render *text* as terminal rows of half-block glyphs via an OTF font.

    Rasterizes with PIL at _SSAA× supersampling, downsamples to a
    _RENDER_H*2-pixel-high bitmap, then maps each 1×2 pixel column pair
    to full/upper/lower half-block or space.  Blank leading/trailing rows
    are trimmed.  Returns at least [""].
    """
    if font is None:
        font = _font()
    bbox = font.getbbox(text)
    if not bbox or bbox[2] <= bbox[0]:
        return [""]
    pad = 4
    img_w = bbox[2] - bbox[0] + pad * 2
    img_h = bbox[3] - bbox[1] + pad * 2
    img = Image.new('L', (img_w, img_h), 0)
    draw = ImageDraw.Draw(img)
    draw.text((-bbox[0] + pad, -bbox[1] + pad), text, fill=255, font=font)
    pix_h = _RENDER_H * 2  # two pixel rows per terminal row (half-blocks)
    hi_h = pix_h * _SSAA
    scale = hi_h / max(img_h, 1)
    new_w_hi = max(1, int(img_w * scale))
    img = img.resize((new_w_hi, hi_h), Image.Resampling.LANCZOS)
    new_w = max(1, int(new_w_hi / _SSAA))
    img = img.resize((new_w, pix_h), Image.Resampling.LANCZOS)
    data = img.tobytes()
    thr = 80  # luminance threshold for "pixel on"
    rows = []
    for y in range(0, pix_h, 2):
        row = []
        for x in range(new_w):
            top = data[y * new_w + x] > thr
            bot = data[(y + 1) * new_w + x] > thr if y + 1 < pix_h else False
            # NOTE(review): these glyphs arrived as EMPTY strings in the
            # pasted source; restored to the standard half-block mapping
            # implied by the top/bot branches — confirm against original.
            if top and bot:
                row.append("█")
            elif top:
                row.append("▀")
            elif bot:
                row.append("▄")
            else:
                row.append(" ")
        rows.append("".join(row))
    while rows and not rows[-1].strip():
        rows.pop()
    while rows and not rows[0].strip():
        rows.pop(0)
    return rows if rows else [""]
def _big_wrap(text, max_w, font=None):
    """Word-wrap *text* to ~max_w terminal cells and render each line big.

    Terminal width per candidate line is estimated by scaling the font
    bbox down to the render height.  Returns the concatenated rendered
    rows with one blank row between wrapped lines.
    """
    if font is None:
        font = _font()
    words = text.split()
    lines, cur = [], ""
    for word in words:
        test = f"{cur} {word}".strip() if cur else word
        bbox = font.getbbox(test)
        if bbox:
            img_h = bbox[3] - bbox[1] + 8
            pix_h = _RENDER_H * 2
            scale = pix_h / max(img_h, 1)
            term_w = int((bbox[2] - bbox[0] + 8) * scale)
        else:
            term_w = 0
        if term_w > max_w - 4 and cur:
            lines.append(cur)  # line full — start a new one with this word
            cur = word
        else:
            cur = test
    if cur:
        lines.append(cur)
    out = []
    for i, ln in enumerate(lines):
        out.extend(_render_line(ln, font))
        if i < len(lines) - 1:
            out.append("")
    return out
def _lr_gradient(rows, offset=0.0):
    """Color each non-space character with a shifting left-to-right gradient.

    *offset* (0..1) rotates the palette so repeated calls animate a sweep.
    Blank rows pass through untouched.
    """
    n = len(_GRAD_COLS)
    # Widest visible row defines the gradient span across all rows.
    max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1)
    out = []
    for row in rows:
        if not row.strip():
            out.append(row)
            continue
        buf = []
        for x, ch in enumerate(row):
            if ch == ' ':
                buf.append(' ')
            else:
                shifted = (x / max(max_x - 1, 1) + offset) % 1.0
                idx = min(round(shifted * (n - 1)), n - 1)
                buf.append(f"{_GRAD_COLS[idx]}{ch}\033[0m")
        out.append("".join(buf))
    return out
def _fade_line(s, fade):
"""Dissolve a rendered line by probabilistically dropping characters."""
if fade >= 1.0:
return s
if fade <= 0.0:
return ''
result = []
i = 0
while i < len(s):
if s[i] == '\033' and i + 1 < len(s) and s[i + 1] == '[':
j = i + 2
while j < len(s) and not s[j].isalpha():
j += 1
result.append(s[i:j + 1])
i = j + 1
elif s[i] == ' ':
result.append(' ')
i += 1
else:
result.append(s[i] if random.random() < fade else ' ')
i += 1
return ''.join(result)
def _vis_trunc(s, w):
"""Truncate string to visual width w, skipping ANSI escape codes."""
result = []
vw = 0
i = 0
while i < len(s):
if vw >= w:
break
if s[i] == '\033' and i + 1 < len(s) and s[i + 1] == '[':
j = i + 2
while j < len(s) and not s[j].isalpha():
j += 1
result.append(s[i:j + 1])
i = j + 1
else:
result.append(s[i])
vw += 1
i += 1
return ''.join(result)
def _next_headline(pool, items, seen):
"""Pull the next unique headline from pool, refilling as needed."""
while True:
if not pool:
pool.extend(items)
random.shuffle(pool)
seen.clear()
title, src, ts = pool.pop()
sig = title.lower().strip()
if sig not in seen:
seen.add(sig)
return title, src, ts
def _make_block(title, src, ts, w):
    """Render one headline into (content_rows, color, meta_row_index).

    In news mode the headline may be translated into a language implied
    by its source outlet or mentioned locations, and rendered with the
    matching script font.
    """
    target_lang = (SOURCE_LANGS.get(src) or _detect_location_language(title)) if MODE == 'news' else None
    lang_font = _font_for_lang(target_lang)
    if target_lang:
        title = _translate_headline(title, target_lang)
    # Don't uppercase scripts that have no case (CJK, Arabic, etc.)
    if target_lang and target_lang in _NO_UPPER:
        title_up = re.sub(r"\s+", " ", title)
    else:
        title_up = re.sub(r"\s+", " ", title.upper())
    # Normalize typographic quotes/dashes the display font may not cover.
    for old, new in [("\u2019", "'"), ("\u2018", "'"), ("\u201c", '"'),
                     ("\u201d", '"'), ("\u2013", "-"), ("\u2014", "-")]:
        title_up = title_up.replace(old, new)
    big_rows = _big_wrap(title_up, w - 4, lang_font)
    hc = random.choice([
        "\033[38;5;46m",   # matrix green
        "\033[38;5;34m",   # dark green
        "\033[38;5;82m",   # lime
        "\033[38;5;48m",   # sea green
        "\033[38;5;37m",   # teal
        "\033[38;5;44m",   # cyan
        "\033[38;5;87m",   # sky
        "\033[38;5;117m",  # ice blue
        "\033[38;5;250m",  # cool white
        "\033[38;5;156m",  # pale green
        "\033[38;5;120m",  # mint
        "\033[38;5;80m",   # dark cyan
        "\033[38;5;108m",  # grey-green
        "\033[38;5;115m",  # sage
        "\033[1;38;5;46m",  # bold green
        "\033[1;38;5;250m", # bold white
    ])
    content = [" " + r for r in big_rows]
    content.append("")
    meta = f"\u2591 {src} \u00b7 {ts}"
    content.append(" " * max(2, w - len(meta) - 2) + meta)  # right-aligned
    return content, hc, len(content) - 1  # (rows, color, meta_row_index)
def _firehose_line(items, w):
    """Generate one line of rapidly cycling firehose content.

    Randomly picks one of four flavors: raw headline (35%), dense glitch
    noise (20%), fake status output (23%), or a glitch-prefixed headline
    fragment (22%).
    """
    r = random.random()
    if r < 0.35:
        # Raw headline text
        title, src, ts = random.choice(items)
        text = title[:w - 1]
        color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM])
        return f"{color}{text}{RST}"
    elif r < 0.55:
        # Dense glitch noise
        d = random.choice([0.45, 0.55, 0.65, 0.75])
        return "".join(
            f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
            f"{random.choice(GLITCH + KATA)}{RST}"
            if random.random() < d else " "
            for _ in range(w)
        )
    elif r < 0.78:
        # Status / program output
        sources = FEEDS if MODE == 'news' else POETRY_SOURCES
        src = random.choice(list(sources.keys()))
        msgs = [
            f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}",
            f" ░░ FEED ACTIVE :: {src}",
            f" >> DECODE 0x{random.randint(0x1000, 0xFFFF):04X} :: {src[:24]}",
            f" ▒▒ ACQUIRE :: {random.choice(['TCP', 'UDP', 'RSS', 'ATOM', 'XML'])} :: {src}",
            f" {''.join(random.choice(KATA) for _ in range(3))} STRM "
            f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}",
        ]
        text = random.choice(msgs)[:w - 1]
        color = random.choice([G_LO, G_DIM, W_GHOST])
        return f"{color}{text}{RST}"
    else:
        # Headline fragment with glitch prefix
        title, _, _ = random.choice(items)
        start = random.randint(0, max(0, len(title) - 20))
        frag = title[start:start + random.randint(10, 35)]
        pad = random.randint(0, max(0, w - len(frag) - 8))
        gp = ''.join(random.choice(GLITCH) for _ in range(random.randint(1, 3)))
        text = (' ' * pad + gp + ' ' + frag)[:w - 1]
        color = random.choice([G_LO, C_DIM, W_GHOST])
        return f"{color}{text}{RST}"
def stream(items):
    """Main display loop: scroll rendered headlines until HEADLINE_LIMIT.

    Maintains a virtual canvas of headline blocks scrolling past a
    camera; an incoming ntfy message freezes the scroll and takes over
    the screen for MESSAGE_DISPLAY_SECS.  The optional firehose zone at
    the bottom cycles every frame regardless of state.
    """
    global _ntfy_message
    random.shuffle(items)
    pool = list(items)
    seen = set()
    queued = 0
    time.sleep(0.5)
    sys.stdout.write(CLR)
    sys.stdout.flush()
    w, h = tw(), th()
    fh = FIREHOSE_H if FIREHOSE else 0
    sh = h - fh  # scroll zone height
    GAP = 3  # blank rows between headlines
    scroll_interval = _SCROLL_DUR / (sh + 15) * 2
    # active blocks: (content_rows, color, canvas_y, meta_idx)
    active = []
    cam = 0  # viewport top in virtual canvas coords
    next_y = sh  # canvas-y where next block starts (off-screen bottom)
    noise_cache = {}  # canvas-y -> noise row (or None), stable per row
    scroll_accum = 0.0

    def _noise_at(cy):
        if cy not in noise_cache:
            noise_cache[cy] = noise(w) if random.random() < 0.15 else None
        return noise_cache[cy]

    # Message colors: bright cyan/white — distinct from headline greens.
    MSG_COLOR = "\033[1;38;5;87m"   # sky cyan (currently unused)
    MSG_META = "\033[38;5;245m"     # cool grey
    MSG_BORDER = "\033[2;38;5;37m"  # dim teal
    _msg_cache = (None, None)  # (cache_key, rendered_rows)
    while queued < HEADLINE_LIMIT or active:
        t0 = time.monotonic()
        w, h = tw(), th()  # re-read every frame: terminal may be resized
        fh = FIREHOSE_H if FIREHOSE else 0
        sh = h - fh
        # ── Check for ntfy message ────────────────────────
        msg_active = False
        with _ntfy_lock:
            if _ntfy_message is not None:
                m_title, m_body, m_ts = _ntfy_message
                if time.monotonic() - m_ts < MESSAGE_DISPLAY_SECS:
                    msg_active = True
                else:
                    _ntfy_message = None  # expired
        if msg_active:
            # ── MESSAGE state: freeze scroll, render message ──
            buf = []
            # Render message text with OTF font (cached across frames)
            display_text = m_body or m_title or "(empty)"
            display_text = re.sub(r"\s+", " ", display_text.upper())
            cache_key = (display_text, w)
            if _msg_cache[0] != cache_key:
                msg_rows = _big_wrap(display_text, w - 4)
                _msg_cache = (cache_key, msg_rows)
            else:
                msg_rows = _msg_cache[1]
            msg_rows = _lr_gradient(msg_rows, (time.monotonic() * GRAD_SPEED) % 1.0)
            # Center vertically in scroll zone
            total_h = len(msg_rows) + 4  # +4 for border + meta + padding
            y_off = max(0, (sh - total_h) // 2)
            for r in range(sh):
                ri = r - y_off
                if ri == 0 or ri == total_h - 1:
                    # Border lines.  NOTE(review): the glyph arrived as an
                    # empty string in the pasted source; restored to a box
                    # rule — confirm against the original.
                    bar = "─" * (w - 4)
                    buf.append(f"\033[{r+1};1H {MSG_BORDER}{bar}{RST}\033[K")
                elif 1 <= ri <= len(msg_rows):
                    ln = _vis_trunc(msg_rows[ri - 1], w)
                    buf.append(f"\033[{r+1};1H {ln}{RST}\033[K")
                elif ri == len(msg_rows) + 1:
                    # Title line (if present and different from body)
                    if m_title and m_title != m_body:
                        meta = f" {MSG_META}\u2591 {m_title}{RST}"
                    else:
                        meta = ""
                    buf.append(f"\033[{r+1};1H{meta}\033[K")
                elif ri == len(msg_rows) + 2:
                    # Source + timestamp + remaining display time
                    elapsed_s = int(time.monotonic() - m_ts)
                    remaining = max(0, MESSAGE_DISPLAY_SECS - elapsed_s)
                    ts_str = datetime.now().strftime("%H:%M:%S")
                    meta = f" {MSG_META}\u2591 ntfy \u00b7 {ts_str} \u00b7 {remaining}s{RST}"
                    buf.append(f"\033[{r+1};1H{meta}\033[K")
                else:
                    # Sparse noise outside the message
                    if random.random() < 0.06:
                        buf.append(f"\033[{r+1};1H{noise(w)}")
                    else:
                        buf.append(f"\033[{r+1};1H\033[K")
            # Firehose keeps running during messages
            if FIREHOSE and fh > 0:
                for fr in range(fh):
                    fline = _firehose_line(items, w)
                    buf.append(f"\033[{sh + fr + 1};1H{fline}\033[K")
            sys.stdout.buffer.write("".join(buf).encode())
            sys.stdout.flush()
            elapsed = time.monotonic() - t0
            time.sleep(max(0, _FRAME_DT - elapsed))
            continue
        # ── SCROLL state: normal headline rendering ───────
        # Advance scroll on schedule
        scroll_accum += _FRAME_DT
        while scroll_accum >= scroll_interval:
            scroll_accum -= scroll_interval
            cam += 1
        # Enqueue new headlines when room at the bottom
        while next_y < cam + sh + 10 and queued < HEADLINE_LIMIT:
            t, src, ts = _next_headline(pool, items, seen)
            content, hc, midx = _make_block(t, src, ts, w)
            active.append((content, hc, next_y, midx))
            next_y += len(content) + GAP
            queued += 1
        # Prune off-screen blocks and stale noise
        active = [(c, hc, by, mi) for c, hc, by, mi in active
                  if by + len(c) > cam]
        for k in list(noise_cache):
            if k < cam:
                del noise_cache[k]
        # Draw scroll zone: rows fade in near the bottom, out near the top.
        top_zone = max(1, int(sh * 0.25))
        bot_zone = max(1, int(sh * 0.10))
        grad_offset = (time.monotonic() * GRAD_SPEED) % 1.0
        buf = []
        for r in range(sh):
            cy = cam + r
            top_f = min(1.0, r / top_zone)
            bot_f = min(1.0, (sh - 1 - r) / bot_zone)
            row_fade = min(top_f, bot_f)
            drawn = False
            for content, hc, by, midx in active:
                cr = cy - by
                if 0 <= cr < len(content):
                    raw = content[cr]
                    if cr != midx:
                        colored = _lr_gradient([raw], grad_offset)[0]
                    else:
                        colored = raw  # meta row keeps its flat color
                    ln = _vis_trunc(colored, w)
                    if row_fade < 1.0:
                        ln = _fade_line(ln, row_fade)
                    if cr == midx:
                        buf.append(f"\033[{r+1};1H{W_COOL}{ln}{RST}\033[K")
                    elif ln.strip():
                        buf.append(f"\033[{r+1};1H{ln}{RST}\033[K")
                    else:
                        buf.append(f"\033[{r+1};1H\033[K")
                    drawn = True
                    break
            if not drawn:
                n = _noise_at(cy)
                if row_fade < 1.0 and n:
                    n = _fade_line(n, row_fade)
                if n:
                    buf.append(f"\033[{r+1};1H{n}")
                else:
                    buf.append(f"\033[{r+1};1H\033[K")
        # Draw firehose zone
        if FIREHOSE and fh > 0:
            for fr in range(fh):
                fline = _firehose_line(items, w)
                buf.append(f"\033[{sh + fr + 1};1H{fline}\033[K")
        # Glitch — base rate + mic-reactive spikes (scroll zone only)
        mic_excess = max(0.0, _mic_db - MIC_THRESHOLD_DB)
        glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
        n_hits = 4 + int(mic_excess / 2)
        g_limit = sh if FIREHOSE else len(buf)
        if random.random() < glitch_prob and g_limit > 0:
            for _ in range(min(n_hits, g_limit)):
                gi = random.randint(0, g_limit - 1)
                buf[gi] = f"\033[{gi+1};1H{glitch_bar(w)}"
        sys.stdout.buffer.write("".join(buf).encode())
        sys.stdout.flush()
        # Precise frame timing
        elapsed = time.monotonic() - t0
        time.sleep(max(0, _FRAME_DT - elapsed))
    sys.stdout.write(CLR)
    sys.stdout.flush()
# ─── MAIN ─────────────────────────────────────────────────
# Block-glyph banner printed line-by-line at boot.
TITLE = [
" ███╗ ███╗ █████╗ ██╗███╗ ██╗██╗ ██╗███╗ ██╗███████╗",
" ████╗ ████║██╔══██╗██║████╗ ██║██║ ██║████╗ ██║██╔════╝",
" ██╔████╔██║███████║██║██╔██╗ ██║██║ ██║██╔██╗ ██║█████╗ ",
" ██║╚██╔╝██║██╔══██║██║██║╚██╗██║██║ ██║██║╚██╗██║██╔══╝ ",
" ██║ ╚═╝ ██║██║ ██║██║██║ ╚████║███████╗██║██║ ╚████║███████╗",
" ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝╚═╝ ╚═══╝╚══════╝╚═╝╚═╝ ╚═══╝╚══════╝",
]
def main():
    """Boot sequence: banner, feed/corpus load, peripherals, then stream()."""
    atexit.register(lambda: print(CURSOR_ON, end="", flush=True))

    def handle_sigint(*_):
        # Ctrl-C exits cleanly with a themed sign-off.
        print(f"\n\n {G_DIM}> SIGNAL LOST{RST}")
        print(f" {W_GHOST}> connection terminated{RST}\n")
        sys.exit(0)

    signal.signal(signal.SIGINT, handle_sigint)
    w = tw()
    print(CLR, end="")
    print(CURSOR_OFF, end="")
    print()
    time.sleep(0.4)
    for ln in TITLE:
        print(f"{G_HI}{ln}{RST}")
        time.sleep(0.07)  # staggered banner reveal (indent inferred)
    print()
    _subtitle = "literary consciousness stream" if MODE == 'poetry' else "digital consciousness stream"
    print(f" {W_DIM}v0.1 · {_subtitle}{RST}")
    # NOTE(review): rule glyph arrived as an empty string in the pasted
    # source; restored to a box rule — confirm against the original.
    print(f" {W_GHOST}{'─' * (w - 4)}{RST}")
    print()
    time.sleep(0.4)
    cached = _load_cache() if '--refresh' not in sys.argv else None
    if cached:
        items = cached
        boot_ln("Cache", f"LOADED [{len(items)} SIGNALS]", True)
    elif MODE == 'poetry':
        slow_print(" > INITIALIZING LITERARY CORPUS...\n")
        time.sleep(0.2)
        print()
        items, linked, failed = fetch_poetry()
        print()
        print(f" {G_DIM}>{RST} {G_MID}{linked} TEXTS LOADED{RST} {W_GHOST}· {failed} DARK{RST}")
        print(f" {G_DIM}>{RST} {G_MID}{len(items)} STANZAS ACQUIRED{RST}")
        _save_cache(items)
    else:
        slow_print(" > INITIALIZING FEED ARRAY...\n")
        time.sleep(0.2)
        print()
        items, linked, failed = fetch_all()
        print()
        print(f" {G_DIM}>{RST} {G_MID}{linked} SOURCES LINKED{RST} {W_GHOST}· {failed} DARK{RST}")
        print(f" {G_DIM}>{RST} {G_MID}{len(items)} SIGNALS ACQUIRED{RST}")
        _save_cache(items)
    if not items:
        print(f"\n {W_DIM}> NO SIGNAL — check network{RST}")
        sys.exit(1)
    print()
    mic_ok = _start_mic()
    if _HAS_MIC:
        boot_ln("Microphone", "ACTIVE" if mic_ok else "OFFLINE · check System Settings → Privacy → Microphone", mic_ok)
    ntfy_ok = _start_ntfy_poller()
    boot_ln("ntfy", "LISTENING" if ntfy_ok else "OFFLINE", ntfy_ok)
    if FIREHOSE:
        boot_ln("Firehose", "ENGAGED", True)
    time.sleep(0.4)
    slow_print(" > STREAMING...\n")
    time.sleep(0.2)
    print(f" {W_GHOST}{'─' * (w - 4)}{RST}")  # glyph restored, see above
    print()
    time.sleep(0.4)
    stream(items)
    print()
    print(f" {W_GHOST}{'─' * (tw() - 4)}{RST}")  # glyph restored, see above
    print(f" {G_DIM}> {HEADLINE_LIMIT} SIGNALS PROCESSED{RST}")
    print(f" {W_GHOST}> end of stream{RST}")
    print()


if __name__ == "__main__":
    main()