917 lines
32 KiB
Python
Executable File
917 lines
32 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
M A I N L I N E
|
|
Digital news consciousness stream.
|
|
Matrix aesthetic · THX-1138 hue.
|
|
"""
|
|
|
|
import subprocess, sys, os, pathlib
|
|
|
|
# ─── BOOTSTRAP VENV ───────────────────────────────────────
|
|
_VENV = pathlib.Path(__file__).resolve().parent / ".mainline_venv"
|
|
_MARKER = _VENV / ".installed_v3"
|
|
|
|
def _ensure_venv():
|
|
"""Create a local venv and install deps if needed."""
|
|
if _MARKER.exists():
|
|
return
|
|
import venv
|
|
print("\033[2;38;5;34m > first run — creating environment...\033[0m")
|
|
venv.create(str(_VENV), with_pip=True, clear=True)
|
|
pip = str(_VENV / "bin" / "pip")
|
|
subprocess.check_call(
|
|
[pip, "install", "feedparser", "Pillow", "-q"],
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL,
|
|
)
|
|
_MARKER.touch()
|
|
|
|
_ensure_venv()
|
|
|
|
# Install sounddevice on first run after v3
|
|
_MARKER_SD = _VENV / ".installed_sd"
|
|
if not _MARKER_SD.exists():
|
|
_pip = str(_VENV / "bin" / "pip")
|
|
subprocess.check_call([_pip, "install", "sounddevice", "numpy", "-q"],
|
|
stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
|
|
_MARKER_SD.touch()
|
|
|
|
sys.path.insert(0, str(next((_VENV / "lib").glob("python*/site-packages"))))
|
|
|
|
import feedparser # noqa: E402
|
|
from PIL import Image, ImageDraw, ImageFont # noqa: E402
|
|
import random, time, re, signal, atexit, textwrap # noqa: E402
|
|
try:
|
|
import sounddevice as _sd
|
|
import numpy as _np
|
|
_HAS_MIC = True
|
|
except Exception:
|
|
_HAS_MIC = False
|
|
import urllib.request, urllib.parse, json # noqa: E402
|
|
from datetime import datetime
|
|
from html import unescape
|
|
from html.parser import HTMLParser
|
|
|
|
# ─── CONFIG ───────────────────────────────────────────────
|
|
HEADLINE_LIMIT = 1000
|
|
FEED_TIMEOUT = 10
|
|
MIC_THRESHOLD_DB = 50 # dB above which glitches intensify
|
|
MODE = 'poetry' if '--poetry' in sys.argv or '-p' in sys.argv else 'news'
|
|
FIREHOSE = '--firehose' in sys.argv
|
|
|
|
# Poetry/literature sources — public domain via Project Gutenberg
|
|
POETRY_SOURCES = {
|
|
"Whitman": "https://www.gutenberg.org/cache/epub/1322/pg1322.txt",
|
|
"Dickinson": "https://www.gutenberg.org/cache/epub/12242/pg12242.txt",
|
|
"Thoreau": "https://www.gutenberg.org/cache/epub/205/pg205.txt",
|
|
"Emerson": "https://www.gutenberg.org/cache/epub/2944/pg2944.txt",
|
|
"Whitman II": "https://www.gutenberg.org/cache/epub/8388/pg8388.txt",
|
|
}
|
|
|
|
# ─── ANSI ─────────────────────────────────────────────────
|
|
RST = "\033[0m"
|
|
BOLD = "\033[1m"
|
|
DIM = "\033[2m"
|
|
# Matrix greens
|
|
G_HI = "\033[38;5;46m"
|
|
G_MID = "\033[38;5;34m"
|
|
G_LO = "\033[38;5;22m"
|
|
G_DIM = "\033[2;38;5;34m"
|
|
# THX-1138 sterile tones
|
|
W_COOL = "\033[38;5;250m"
|
|
W_DIM = "\033[2;38;5;245m"
|
|
W_GHOST = "\033[2;38;5;238m"
|
|
C_DIM = "\033[2;38;5;37m"
|
|
# Terminal control
|
|
CLR = "\033[2J\033[H"
|
|
CURSOR_OFF = "\033[?25l"
|
|
CURSOR_ON = "\033[?25h"
|
|
|
|
# ─── FEEDS ────────────────────────────────────────────────
|
|
FEEDS = {
|
|
# Science & Technology
|
|
"Nature": "https://www.nature.com/nature.rss",
|
|
"Science Daily": "https://www.sciencedaily.com/rss/all.xml",
|
|
"Phys.org": "https://phys.org/rss-feed/",
|
|
"NASA": "https://www.nasa.gov/news-release/feed/",
|
|
"Ars Technica": "https://feeds.arstechnica.com/arstechnica/index",
|
|
"New Scientist": "https://www.newscientist.com/section/news/feed/",
|
|
"Quanta": "https://api.quantamagazine.org/feed/",
|
|
"BBC Science": "http://feeds.bbci.co.uk/news/science_and_environment/rss.xml",
|
|
"MIT Tech Review": "https://www.technologyreview.com/feed/",
|
|
# Economics & Business
|
|
"BBC Business": "http://feeds.bbci.co.uk/news/business/rss.xml",
|
|
"MarketWatch": "https://feeds.marketwatch.com/marketwatch/topstories/",
|
|
"Economist": "https://www.economist.com/finance-and-economics/rss.xml",
|
|
# World & Politics
|
|
"BBC World": "http://feeds.bbci.co.uk/news/world/rss.xml",
|
|
"NPR": "https://feeds.npr.org/1001/rss.xml",
|
|
"Al Jazeera": "https://www.aljazeera.com/xml/rss/all.xml",
|
|
"Guardian World": "https://www.theguardian.com/world/rss",
|
|
"DW": "https://rss.dw.com/rdf/rss-en-all",
|
|
"France24": "https://www.france24.com/en/rss",
|
|
"ABC Australia": "https://www.abc.net.au/news/feed/2942460/rss.xml",
|
|
"Japan Times": "https://www.japantimes.co.jp/feed/",
|
|
"The Hindu": "https://www.thehindu.com/news/national/feeder/default.rss",
|
|
"SCMP": "https://www.scmp.com/rss/91/feed",
|
|
"Der Spiegel": "https://www.spiegel.de/international/index.rss",
|
|
# Culture & Ideas
|
|
"Guardian Culture": "https://www.theguardian.com/culture/rss",
|
|
"Aeon": "https://aeon.co/feed.rss",
|
|
"Smithsonian": "https://www.smithsonianmag.com/rss/latest_articles/",
|
|
"The Marginalian": "https://www.themarginalian.org/feed/",
|
|
"Nautilus": "https://nautil.us/feed/",
|
|
"Wired": "https://www.wired.com/feed/rss",
|
|
"The Conversation": "https://theconversation.com/us/articles.atom",
|
|
"Longreads": "https://longreads.com/feed/",
|
|
"Literary Hub": "https://lithub.com/feed/",
|
|
"Atlas Obscura": "https://www.atlasobscura.com/feeds/latest",
|
|
}
|
|
|
|
# ─── GLYPHS ───────────────────────────────────────────────
|
|
GLITCH = "░▒▓█▌▐╌╍╎╏┃┆┇┊┋"
|
|
KATA = "ハミヒーウシナモニサワツオリアホテマケメエカキムユラセネスタヌヘ"
|
|
|
|
# ─── FONT RENDERING (OTF → terminal blocks) ─────────────
|
|
_FONT_PATH = "/Users/genejohnson/Documents/CS Bishop Drawn/CSBishopDrawn-Italic.otf"
|
|
_FONT_OBJ = None
|
|
_FONT_SZ = 60
|
|
_RENDER_H = 8 # terminal rows per rendered text line
|
|
|
|
# Non-Latin scripts → macOS system fonts
|
|
_SCRIPT_FONTS = {
|
|
'zh-cn': '/System/Library/Fonts/STHeiti Medium.ttc',
|
|
'ja': '/System/Library/Fonts/ヒラギノ角ゴシック W9.ttc',
|
|
'ko': '/System/Library/Fonts/AppleSDGothicNeo.ttc',
|
|
'ru': '/System/Library/Fonts/Supplemental/Arial.ttf',
|
|
'uk': '/System/Library/Fonts/Supplemental/Arial.ttf',
|
|
'el': '/System/Library/Fonts/Supplemental/Arial.ttf',
|
|
'he': '/System/Library/Fonts/Supplemental/Arial.ttf',
|
|
'ar': '/System/Library/Fonts/GeezaPro.ttc',
|
|
'fa': '/System/Library/Fonts/GeezaPro.ttc',
|
|
'hi': '/System/Library/Fonts/Kohinoor.ttc',
|
|
'th': '/System/Library/Fonts/ThonburiUI.ttc',
|
|
}
|
|
_FONT_CACHE = {}
|
|
_NO_UPPER = {'zh-cn', 'ja', 'ko', 'ar', 'fa', 'hi', 'th', 'he'}
|
|
# Left → right gradient: white-hot leading edge fades to near-black
|
|
_GRAD_COLS = [
|
|
"\033[1;38;5;231m", # white
|
|
"\033[1;38;5;195m", # pale cyan-white
|
|
"\033[38;5;123m", # bright cyan
|
|
"\033[38;5;118m", # bright lime
|
|
"\033[38;5;82m", # lime
|
|
"\033[38;5;46m", # bright green
|
|
"\033[38;5;40m", # green
|
|
"\033[38;5;34m", # medium green
|
|
"\033[38;5;28m", # dark green
|
|
"\033[38;5;22m", # deep green
|
|
"\033[2;38;5;22m", # dim deep green
|
|
"\033[2;38;5;235m", # near black
|
|
]
|
|
|
|
|
|
def _font():
|
|
"""Lazy-load the OTF font."""
|
|
global _FONT_OBJ
|
|
if _FONT_OBJ is None:
|
|
_FONT_OBJ = ImageFont.truetype(_FONT_PATH, _FONT_SZ)
|
|
return _FONT_OBJ
|
|
|
|
|
|
def _font_for_lang(lang=None):
|
|
"""Get appropriate font for a language."""
|
|
if lang is None or lang not in _SCRIPT_FONTS:
|
|
return _font()
|
|
if lang not in _FONT_CACHE:
|
|
try:
|
|
_FONT_CACHE[lang] = ImageFont.truetype(_SCRIPT_FONTS[lang], _FONT_SZ)
|
|
except Exception:
|
|
_FONT_CACHE[lang] = _font()
|
|
return _FONT_CACHE[lang]
|
|
|
|
# ─── HELPERS ──────────────────────────────────────────────
|
|
class _Strip(HTMLParser):
|
|
def __init__(self):
|
|
super().__init__()
|
|
self._t = []
|
|
|
|
def handle_data(self, d):
|
|
self._t.append(d)
|
|
|
|
def text(self):
|
|
return "".join(self._t).strip()
|
|
|
|
|
|
def strip_tags(html):
|
|
s = _Strip()
|
|
s.feed(unescape(html or ""))
|
|
return s.text()
|
|
|
|
|
|
def tw():
|
|
try:
|
|
return os.get_terminal_size().columns
|
|
except Exception:
|
|
return 80
|
|
|
|
|
|
def th():
|
|
try:
|
|
return os.get_terminal_size().lines
|
|
except Exception:
|
|
return 24
|
|
|
|
|
|
def noise(w):
|
|
d = random.choice([0.15, 0.25, 0.35, 0.12]) # was [0.08, 0.12, 0.2, 0.05], now much denser
|
|
return "".join(
|
|
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
|
|
f"{random.choice(GLITCH + KATA)}{RST}"
|
|
if random.random() < d
|
|
else " "
|
|
for _ in range(w)
|
|
)
|
|
|
|
|
|
def glitch_bar(w):
|
|
c = random.choice(["░", "▒", "─", "╌"])
|
|
n = random.randint(3, w // 2)
|
|
o = random.randint(0, w - n)
|
|
return " " * o + f"{G_LO}{DIM}" + c * n + RST
|
|
|
|
|
|
# ─── SOURCE → LANGUAGE MAPPING ──────────────────────────
|
|
# Headlines from these outlets render in their cultural home language
|
|
# regardless of content, reflecting the true distribution of sources.
|
|
SOURCE_LANGS = {
|
|
"Der Spiegel": "de",
|
|
"DW": "de",
|
|
"France24": "fr",
|
|
"Japan Times": "ja",
|
|
"The Hindu": "hi",
|
|
"SCMP": "zh-cn",
|
|
"Al Jazeera": "ar",
|
|
}
|
|
|
|
# ─── LOCATION → LANGUAGE ─────────────────────────────────
|
|
_LOCATION_LANGS = {
|
|
r'\b(?:china|chinese|beijing|shanghai|hong kong|xi jinping)\b': 'zh-cn',
|
|
r'\b(?:japan|japanese|tokyo|osaka|kishida)\b': 'ja',
|
|
r'\b(?:korea|korean|seoul|pyongyang)\b': 'ko',
|
|
r'\b(?:russia|russian|moscow|kremlin|putin)\b': 'ru',
|
|
r'\b(?:saudi|dubai|qatar|egypt|cairo|arabic)\b': 'ar',
|
|
r'\b(?:india|indian|delhi|mumbai|modi)\b': 'hi',
|
|
r'\b(?:germany|german|berlin|munich|scholz)\b': 'de',
|
|
r'\b(?:france|french|paris|lyon|macron)\b': 'fr',
|
|
r'\b(?:spain|spanish|madrid)\b': 'es',
|
|
r'\b(?:italy|italian|rome|milan|meloni)\b': 'it',
|
|
r'\b(?:portugal|portuguese|lisbon)\b': 'pt',
|
|
r'\b(?:brazil|brazilian|são paulo|lula)\b': 'pt',
|
|
r'\b(?:greece|greek|athens)\b': 'el',
|
|
r'\b(?:turkey|turkish|istanbul|ankara|erdogan)\b': 'tr',
|
|
r'\b(?:iran|iranian|tehran)\b': 'fa',
|
|
r'\b(?:thailand|thai|bangkok)\b': 'th',
|
|
r'\b(?:vietnam|vietnamese|hanoi)\b': 'vi',
|
|
r'\b(?:ukraine|ukrainian|kyiv|kiev|zelensky)\b': 'uk',
|
|
r'\b(?:israel|israeli|jerusalem|tel aviv|netanyahu)\b': 'he',
|
|
}
|
|
|
|
_TRANSLATE_CACHE = {}
|
|
|
|
|
|
def _detect_location_language(title):
|
|
"""Detect if headline mentions a location, return target language."""
|
|
title_lower = title.lower()
|
|
for pattern, lang in _LOCATION_LANGS.items():
|
|
if re.search(pattern, title_lower):
|
|
return lang
|
|
return None
|
|
|
|
|
|
def _translate_headline(title, target_lang):
|
|
"""Translate headline via Google Translate API (zero dependencies)."""
|
|
key = (title, target_lang)
|
|
if key in _TRANSLATE_CACHE:
|
|
return _TRANSLATE_CACHE[key]
|
|
try:
|
|
q = urllib.parse.quote(title)
|
|
url = ("https://translate.googleapis.com/translate_a/single"
|
|
f"?client=gtx&sl=en&tl={target_lang}&dt=t&q={q}")
|
|
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
|
|
resp = urllib.request.urlopen(req, timeout=5)
|
|
data = json.loads(resp.read())
|
|
result = "".join(p[0] for p in data[0] if p[0]) or title
|
|
except Exception:
|
|
result = title
|
|
_TRANSLATE_CACHE[key] = result
|
|
return result
|
|
|
|
# ─── CONTENT FILTER ───────────────────────────────────────
|
|
_SKIP_RE = re.compile(
|
|
r'\b(?:'
|
|
# ── sports ──
|
|
r'football|soccer|basketball|baseball|softball|tennis|golf|cricket|rugby|'
|
|
r'hockey|lacrosse|volleyball|badminton|'
|
|
r'nba|nfl|nhl|mlb|mls|fifa|uefa|'
|
|
r'premier league|champions league|la liga|serie a|bundesliga|'
|
|
r'world cup|super bowl|world series|stanley cup|'
|
|
r'playoff|playoffs|touchdown|goalkeeper|striker|quarterback|'
|
|
r'slam dunk|home run|grand slam|offside|halftime|'
|
|
r'batting|wicket|innings|'
|
|
r'formula 1|nascar|motogp|'
|
|
r'boxing|ufc|mma|'
|
|
r'marathon|tour de france|'
|
|
r'transfer window|draft pick|relegation|'
|
|
# ── vapid / insipid ──
|
|
r'kardashian|jenner|reality tv|reality show|'
|
|
r'influencer|viral video|tiktok|instagram|'
|
|
r'best dressed|worst dressed|red carpet|'
|
|
r'horoscope|zodiac|gossip|bikini|selfie|'
|
|
r'you won.t believe|what happened next|'
|
|
r'celebrity couple|celebrity feud|baby bump'
|
|
r')\b',
|
|
re.IGNORECASE
|
|
)
|
|
|
|
|
|
def _skip(title):
|
|
"""Return True if headline is sports, vapid, or insipid."""
|
|
return bool(_SKIP_RE.search(title))
|
|
|
|
|
|
# ─── DISPLAY ──────────────────────────────────────────────
|
|
def type_out(text, color=G_HI):
|
|
i = 0
|
|
while i < len(text):
|
|
if random.random() < 0.3:
|
|
b = random.randint(2, 5)
|
|
sys.stdout.write(f"{color}{text[i:i+b]}{RST}")
|
|
i += b
|
|
else:
|
|
sys.stdout.write(f"{color}{text[i]}{RST}")
|
|
i += 1
|
|
sys.stdout.flush()
|
|
time.sleep(random.uniform(0.004, 0.018))
|
|
|
|
|
|
def slow_print(text, color=G_DIM, delay=0.015):
|
|
for ch in text:
|
|
sys.stdout.write(f"{color}{ch}{RST}")
|
|
sys.stdout.flush()
|
|
time.sleep(delay)
|
|
|
|
|
|
def boot_ln(label, status, ok=True):
|
|
dots = max(3, min(30, tw() - len(label) - len(status) - 8))
|
|
sys.stdout.write(f" {G_DIM}>{RST} {W_DIM}{label} ")
|
|
sys.stdout.flush()
|
|
for _ in range(dots):
|
|
sys.stdout.write(f"{G_LO}.")
|
|
sys.stdout.flush()
|
|
time.sleep(random.uniform(0.006, 0.025))
|
|
c = G_MID if ok else "\033[2;38;5;196m"
|
|
print(f" {c}{status}{RST}")
|
|
time.sleep(random.uniform(0.02, 0.1))
|
|
|
|
|
|
# ─── FETCH ────────────────────────────────────────────────
|
|
def fetch_feed(url):
|
|
try:
|
|
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
|
|
resp = urllib.request.urlopen(req, timeout=FEED_TIMEOUT)
|
|
return feedparser.parse(resp.read())
|
|
except Exception:
|
|
return None
|
|
|
|
|
|
def fetch_all():
|
|
items = []
|
|
linked = failed = 0
|
|
for src, url in FEEDS.items():
|
|
feed = fetch_feed(url)
|
|
if feed is None or (feed.bozo and not feed.entries):
|
|
boot_ln(src, "DARK", False)
|
|
failed += 1
|
|
continue
|
|
n = 0
|
|
for e in feed.entries:
|
|
t = strip_tags(e.get("title", ""))
|
|
if not t or _skip(t):
|
|
continue
|
|
pub = e.get("published_parsed") or e.get("updated_parsed")
|
|
try:
|
|
ts = datetime(*pub[:6]).strftime("%H:%M") if pub else "——:——"
|
|
except Exception:
|
|
ts = "——:——"
|
|
items.append((t, src, ts))
|
|
n += 1
|
|
if n:
|
|
boot_ln(src, f"LINKED [{n}]", True)
|
|
linked += 1
|
|
else:
|
|
boot_ln(src, "EMPTY", False)
|
|
failed += 1
|
|
return items, linked, failed
|
|
|
|
|
|
def _fetch_gutenberg(url, label):
|
|
"""Download and parse stanzas/passages from a Project Gutenberg text."""
|
|
try:
|
|
req = urllib.request.Request(url, headers={"User-Agent": "mainline/0.1"})
|
|
resp = urllib.request.urlopen(req, timeout=15)
|
|
text = resp.read().decode('utf-8', errors='replace').replace('\r\n', '\n').replace('\r', '\n')
|
|
# Strip PG boilerplate
|
|
m = re.search(r'\*\*\*\s*START OF[^\n]*\n', text)
|
|
if m:
|
|
text = text[m.end():]
|
|
m = re.search(r'\*\*\*\s*END OF', text)
|
|
if m:
|
|
text = text[:m.start()]
|
|
# Split on blank lines into stanzas/passages
|
|
blocks = re.split(r'\n{2,}', text.strip())
|
|
items = []
|
|
for blk in blocks:
|
|
blk = ' '.join(blk.split()) # flatten to one line
|
|
if len(blk) < 20 or len(blk) > 280:
|
|
continue
|
|
if blk.isupper(): # skip all-caps headers
|
|
continue
|
|
if re.match(r'^[IVXLCDM]+\.?\s*$', blk): # roman numerals
|
|
continue
|
|
items.append((blk, label, ''))
|
|
return items
|
|
except Exception:
|
|
return []
|
|
|
|
|
|
def fetch_poetry():
|
|
"""Fetch all poetry/literature sources."""
|
|
items = []
|
|
linked = failed = 0
|
|
for label, url in POETRY_SOURCES.items():
|
|
stanzas = _fetch_gutenberg(url, label)
|
|
if stanzas:
|
|
boot_ln(label, f"LOADED [{len(stanzas)}]", True)
|
|
items.extend(stanzas)
|
|
linked += 1
|
|
else:
|
|
boot_ln(label, "DARK", False)
|
|
failed += 1
|
|
return items, linked, failed
|
|
|
|
|
|
# ─── STREAM ───────────────────────────────────────────────
|
|
_SCROLL_DUR = 3.75 # seconds per headline
|
|
FIREHOSE_H = 6 # firehose zone height (terminal rows)
|
|
_mic_db = -99.0 # current mic level, written by background thread
|
|
_mic_stream = None
|
|
|
|
|
|
def _start_mic():
|
|
"""Start background mic monitoring; silently skipped if unavailable."""
|
|
global _mic_db, _mic_stream
|
|
if not _HAS_MIC:
|
|
return
|
|
def _cb(indata, frames, t, status):
|
|
global _mic_db
|
|
rms = float(_np.sqrt(_np.mean(indata ** 2)))
|
|
_mic_db = 20 * _np.log10(rms) if rms > 0 else -99.0
|
|
try:
|
|
_mic_stream = _sd.InputStream(
|
|
callback=_cb, channels=1, samplerate=44100, blocksize=2048)
|
|
_mic_stream.start()
|
|
atexit.register(lambda: _mic_stream.stop() if _mic_stream else None)
|
|
return True
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def _render_line(text, font=None):
|
|
"""Render a line of text as terminal rows using OTF font + half-blocks."""
|
|
if font is None:
|
|
font = _font()
|
|
bbox = font.getbbox(text)
|
|
if not bbox or bbox[2] <= bbox[0]:
|
|
return [""]
|
|
pad = 4
|
|
img_w = bbox[2] - bbox[0] + pad * 2
|
|
img_h = bbox[3] - bbox[1] + pad * 2
|
|
img = Image.new('L', (img_w, img_h), 0)
|
|
draw = ImageDraw.Draw(img)
|
|
draw.text((-bbox[0] + pad, -bbox[1] + pad), text, fill=255, font=font)
|
|
pix_h = _RENDER_H * 2
|
|
scale = pix_h / max(img_h, 1)
|
|
new_w = max(1, int(img_w * scale))
|
|
img = img.resize((new_w, pix_h), Image.Resampling.LANCZOS)
|
|
data = img.tobytes()
|
|
thr = 80
|
|
rows = []
|
|
for y in range(0, pix_h, 2):
|
|
row = []
|
|
for x in range(new_w):
|
|
top = data[y * new_w + x] > thr
|
|
bot = data[(y + 1) * new_w + x] > thr if y + 1 < pix_h else False
|
|
if top and bot:
|
|
row.append("█")
|
|
elif top:
|
|
row.append("▀")
|
|
elif bot:
|
|
row.append("▄")
|
|
else:
|
|
row.append(" ")
|
|
rows.append("".join(row))
|
|
while rows and not rows[-1].strip():
|
|
rows.pop()
|
|
while rows and not rows[0].strip():
|
|
rows.pop(0)
|
|
return rows if rows else [""]
|
|
|
|
|
|
def _big_wrap(text, max_w, font=None):
|
|
"""Word-wrap text and render with OTF font."""
|
|
if font is None:
|
|
font = _font()
|
|
words = text.split()
|
|
lines, cur = [], ""
|
|
for word in words:
|
|
test = f"{cur} {word}".strip() if cur else word
|
|
bbox = font.getbbox(test)
|
|
if bbox:
|
|
img_h = bbox[3] - bbox[1] + 8
|
|
pix_h = _RENDER_H * 2
|
|
scale = pix_h / max(img_h, 1)
|
|
term_w = int((bbox[2] - bbox[0] + 8) * scale)
|
|
else:
|
|
term_w = 0
|
|
if term_w > max_w - 4 and cur:
|
|
lines.append(cur)
|
|
cur = word
|
|
else:
|
|
cur = test
|
|
if cur:
|
|
lines.append(cur)
|
|
out = []
|
|
for i, ln in enumerate(lines):
|
|
out.extend(_render_line(ln, font))
|
|
if i < len(lines) - 1:
|
|
out.append("")
|
|
return out
|
|
|
|
|
|
def _lr_gradient(rows):
|
|
"""Color each non-space block character with a left-to-right gradient."""
|
|
n = len(_GRAD_COLS)
|
|
max_x = max((len(r.rstrip()) for r in rows if r.strip()), default=1)
|
|
out = []
|
|
for row in rows:
|
|
if not row.strip():
|
|
out.append(row)
|
|
continue
|
|
buf = []
|
|
for x, ch in enumerate(row):
|
|
if ch == ' ':
|
|
buf.append(' ')
|
|
else:
|
|
idx = min(round(x / max(max_x - 1, 1) * (n - 1)), n - 1)
|
|
buf.append(f"{_GRAD_COLS[idx]}{ch}\033[0m")
|
|
out.append("".join(buf))
|
|
return out
|
|
|
|
|
|
def _fade_line(s, fade):
|
|
"""Dissolve a rendered line by probabilistically dropping characters."""
|
|
if fade >= 1.0:
|
|
return s
|
|
if fade <= 0.0:
|
|
return ''
|
|
result = []
|
|
i = 0
|
|
while i < len(s):
|
|
if s[i] == '\033' and i + 1 < len(s) and s[i + 1] == '[':
|
|
j = i + 2
|
|
while j < len(s) and not s[j].isalpha():
|
|
j += 1
|
|
result.append(s[i:j + 1])
|
|
i = j + 1
|
|
elif s[i] == ' ':
|
|
result.append(' ')
|
|
i += 1
|
|
else:
|
|
result.append(s[i] if random.random() < fade else ' ')
|
|
i += 1
|
|
return ''.join(result)
|
|
|
|
|
|
def _vis_trunc(s, w):
|
|
"""Truncate string to visual width w, skipping ANSI escape codes."""
|
|
result = []
|
|
vw = 0
|
|
i = 0
|
|
while i < len(s):
|
|
if vw >= w:
|
|
break
|
|
if s[i] == '\033' and i + 1 < len(s) and s[i + 1] == '[':
|
|
j = i + 2
|
|
while j < len(s) and not s[j].isalpha():
|
|
j += 1
|
|
result.append(s[i:j + 1])
|
|
i = j + 1
|
|
else:
|
|
result.append(s[i])
|
|
vw += 1
|
|
i += 1
|
|
return ''.join(result)
|
|
|
|
|
|
def _next_headline(pool, items, seen):
|
|
"""Pull the next unique headline from pool, refilling as needed."""
|
|
while True:
|
|
if not pool:
|
|
pool.extend(items)
|
|
random.shuffle(pool)
|
|
seen.clear()
|
|
title, src, ts = pool.pop()
|
|
sig = title.lower().strip()
|
|
if sig not in seen:
|
|
seen.add(sig)
|
|
return title, src, ts
|
|
|
|
|
|
def _make_block(title, src, ts, w):
|
|
"""Render a headline into a content block with color."""
|
|
target_lang = (SOURCE_LANGS.get(src) or _detect_location_language(title)) if MODE == 'news' else None
|
|
lang_font = _font_for_lang(target_lang)
|
|
if target_lang:
|
|
title = _translate_headline(title, target_lang)
|
|
# Don't uppercase scripts that have no case (CJK, Arabic, etc.)
|
|
if target_lang and target_lang in _NO_UPPER:
|
|
title_up = re.sub(r"\s+", " ", title)
|
|
else:
|
|
title_up = re.sub(r"\s+", " ", title.upper())
|
|
for old, new in [("\u2019","'"), ("\u2018","'"), ("\u201c",'"'),
|
|
("\u201d",'"'), ("\u2013","-"), ("\u2014","-")]:
|
|
title_up = title_up.replace(old, new)
|
|
big_rows = _big_wrap(title_up, w - 4, lang_font)
|
|
big_rows = _lr_gradient(big_rows)
|
|
hc = random.choice([
|
|
"\033[38;5;46m", # matrix green
|
|
"\033[38;5;34m", # dark green
|
|
"\033[38;5;82m", # lime
|
|
"\033[38;5;48m", # sea green
|
|
"\033[38;5;37m", # teal
|
|
"\033[38;5;44m", # cyan
|
|
"\033[38;5;87m", # sky
|
|
"\033[38;5;117m", # ice blue
|
|
"\033[38;5;250m", # cool white
|
|
"\033[38;5;156m", # pale green
|
|
"\033[38;5;120m", # mint
|
|
"\033[38;5;80m", # dark cyan
|
|
"\033[38;5;108m", # grey-green
|
|
"\033[38;5;115m", # sage
|
|
"\033[1;38;5;46m", # bold green
|
|
"\033[1;38;5;250m",# bold white
|
|
])
|
|
content = [" " + r for r in big_rows]
|
|
content.append("")
|
|
meta = f"\u2591 {src} \u00b7 {ts}"
|
|
content.append(" " * max(2, w - len(meta) - 2) + meta)
|
|
return content, hc, len(content) - 1 # (rows, color, meta_row_index)
|
|
|
|
|
|
def _firehose_line(items, w):
|
|
"""Generate one line of rapidly cycling firehose content."""
|
|
r = random.random()
|
|
if r < 0.35:
|
|
# Raw headline text
|
|
title, src, ts = random.choice(items)
|
|
text = title[:w - 1]
|
|
color = random.choice([G_LO, G_DIM, W_GHOST, C_DIM])
|
|
return f"{color}{text}{RST}"
|
|
elif r < 0.55:
|
|
# Dense glitch noise
|
|
d = random.choice([0.45, 0.55, 0.65, 0.75])
|
|
return "".join(
|
|
f"{random.choice([G_LO, G_DIM, C_DIM, W_GHOST])}"
|
|
f"{random.choice(GLITCH + KATA)}{RST}"
|
|
if random.random() < d else " "
|
|
for _ in range(w)
|
|
)
|
|
elif r < 0.78:
|
|
# Status / program output
|
|
sources = FEEDS if MODE == 'news' else POETRY_SOURCES
|
|
src = random.choice(list(sources.keys()))
|
|
msgs = [
|
|
f" SIGNAL :: {src} :: {datetime.now().strftime('%H:%M:%S.%f')[:-3]}",
|
|
f" ░░ FEED ACTIVE :: {src}",
|
|
f" >> DECODE 0x{random.randint(0x1000, 0xFFFF):04X} :: {src[:24]}",
|
|
f" ▒▒ ACQUIRE :: {random.choice(['TCP', 'UDP', 'RSS', 'ATOM', 'XML'])} :: {src}",
|
|
f" {''.join(random.choice(KATA) for _ in range(3))} STRM "
|
|
f"{random.randint(0, 255):02X}:{random.randint(0, 255):02X}",
|
|
]
|
|
text = random.choice(msgs)[:w - 1]
|
|
color = random.choice([G_LO, G_DIM, W_GHOST])
|
|
return f"{color}{text}{RST}"
|
|
else:
|
|
# Headline fragment with glitch prefix
|
|
title, _, _ = random.choice(items)
|
|
start = random.randint(0, max(0, len(title) - 20))
|
|
frag = title[start:start + random.randint(10, 35)]
|
|
pad = random.randint(0, max(0, w - len(frag) - 8))
|
|
gp = ''.join(random.choice(GLITCH) for _ in range(random.randint(1, 3)))
|
|
text = (' ' * pad + gp + ' ' + frag)[:w - 1]
|
|
color = random.choice([G_LO, C_DIM, W_GHOST])
|
|
return f"{color}{text}{RST}"
|
|
|
|
|
|
def stream(items):
|
|
random.shuffle(items)
|
|
pool = list(items)
|
|
seen = set()
|
|
queued = 0
|
|
|
|
time.sleep(0.5)
|
|
sys.stdout.write(CLR)
|
|
sys.stdout.flush()
|
|
|
|
w, h = tw(), th()
|
|
fh = FIREHOSE_H if FIREHOSE else 0
|
|
sh = h - fh # scroll zone height
|
|
GAP = 3 # blank rows between headlines
|
|
dt = _SCROLL_DUR / (sh + 15) * 2
|
|
|
|
# active blocks: (content_rows, color, canvas_y, meta_idx)
|
|
active = []
|
|
cam = 0 # viewport top in virtual canvas coords
|
|
next_y = sh # canvas-y where next block starts (off-screen bottom)
|
|
noise_cache = {}
|
|
|
|
def _noise_at(cy):
|
|
if cy not in noise_cache:
|
|
noise_cache[cy] = noise(w) if random.random() < 0.15 else None
|
|
return noise_cache[cy]
|
|
|
|
while queued < HEADLINE_LIMIT or active:
|
|
w, h = tw(), th()
|
|
fh = FIREHOSE_H if FIREHOSE else 0
|
|
sh = h - fh
|
|
|
|
# Enqueue new headlines when room at the bottom
|
|
while next_y < cam + sh + 10 and queued < HEADLINE_LIMIT:
|
|
t, src, ts = _next_headline(pool, items, seen)
|
|
content, hc, midx = _make_block(t, src, ts, w)
|
|
active.append((content, hc, next_y, midx))
|
|
next_y += len(content) + GAP
|
|
queued += 1
|
|
|
|
# Draw scroll zone
|
|
top_zone = max(1, int(sh * 0.25))
|
|
bot_zone = max(1, int(sh * 0.10))
|
|
buf = []
|
|
for r in range(sh):
|
|
cy = cam + r
|
|
top_f = min(1.0, r / top_zone)
|
|
bot_f = 1.0 if FIREHOSE else min(1.0, (sh - 1 - r) / bot_zone)
|
|
row_fade = min(top_f, bot_f)
|
|
drawn = False
|
|
for content, hc, by, midx in active:
|
|
cr = cy - by
|
|
if 0 <= cr < len(content):
|
|
ln = _vis_trunc(content[cr], w)
|
|
if row_fade < 1.0:
|
|
ln = _fade_line(ln, row_fade)
|
|
if cr == midx:
|
|
buf.append(f"\033[{r+1};1H{W_COOL}{ln}{RST}\033[K")
|
|
elif ln.strip():
|
|
buf.append(f"\033[{r+1};1H{hc}{ln}{RST}\033[K")
|
|
else:
|
|
buf.append(f"\033[{r+1};1H\033[K")
|
|
drawn = True
|
|
break
|
|
if not drawn:
|
|
n = _noise_at(cy)
|
|
if row_fade < 1.0 and n:
|
|
n = _fade_line(n, row_fade)
|
|
if n:
|
|
buf.append(f"\033[{r+1};1H{n}")
|
|
else:
|
|
buf.append(f"\033[{r+1};1H\033[K")
|
|
|
|
# Draw firehose zone
|
|
if FIREHOSE and fh > 0:
|
|
for fr in range(fh):
|
|
fline = _firehose_line(items, w)
|
|
buf.append(f"\033[{sh + fr + 1};1H{fline}\033[K")
|
|
|
|
# Glitch — base rate + mic-reactive spikes (scroll zone only)
|
|
mic_excess = max(0.0, _mic_db - MIC_THRESHOLD_DB)
|
|
glitch_prob = 0.32 + min(0.9, mic_excess * 0.16)
|
|
n_hits = 4 + int(mic_excess / 2)
|
|
g_limit = sh if FIREHOSE else len(buf)
|
|
if random.random() < glitch_prob and g_limit > 0:
|
|
for _ in range(min(n_hits, g_limit)):
|
|
gi = random.randint(0, g_limit - 1)
|
|
buf[gi] = f"\033[{gi+1};1H{glitch_bar(w)}"
|
|
|
|
sys.stdout.write("".join(buf))
|
|
sys.stdout.flush()
|
|
time.sleep(dt)
|
|
|
|
# Advance viewport
|
|
cam += 1
|
|
|
|
# Prune off-screen blocks and stale noise
|
|
active = [(c, hc, by, mi) for c, hc, by, mi in active
|
|
if by + len(c) > cam]
|
|
for k in list(noise_cache):
|
|
if k < cam:
|
|
del noise_cache[k]
|
|
|
|
sys.stdout.write(CLR)
|
|
sys.stdout.flush()
|
|
|
|
|
|
# ─── MAIN ─────────────────────────────────────────────────
|
|
TITLE = [
|
|
" ███╗ ███╗ █████╗ ██╗███╗ ██╗██╗ ██╗███╗ ██╗███████╗",
|
|
" ████╗ ████║██╔══██╗██║████╗ ██║██║ ██║████╗ ██║██╔════╝",
|
|
" ██╔████╔██║███████║██║██╔██╗ ██║██║ ██║██╔██╗ ██║█████╗ ",
|
|
" ██║╚██╔╝██║██╔══██║██║██║╚██╗██║██║ ██║██║╚██╗██║██╔══╝ ",
|
|
" ██║ ╚═╝ ██║██║ ██║██║██║ ╚████║███████╗██║██║ ╚████║███████╗",
|
|
" ╚═╝ ╚═╝╚═╝ ╚═╝╚═╝╚═╝ ╚═══╝╚══════╝╚═╝╚═╝ ╚═══╝╚══════╝",
|
|
]
|
|
|
|
|
|
def main():
|
|
atexit.register(lambda: print(CURSOR_ON, end="", flush=True))
|
|
|
|
def handle_sigint(*_):
|
|
print(f"\n\n {G_DIM}> SIGNAL LOST{RST}")
|
|
print(f" {W_GHOST}> connection terminated{RST}\n")
|
|
sys.exit(0)
|
|
|
|
signal.signal(signal.SIGINT, handle_sigint)
|
|
|
|
w = tw()
|
|
print(CLR, end="")
|
|
print(CURSOR_OFF, end="")
|
|
print()
|
|
time.sleep(0.4)
|
|
|
|
for ln in TITLE:
|
|
print(f"{G_HI}{ln}{RST}")
|
|
time.sleep(0.07)
|
|
|
|
print()
|
|
_subtitle = "literary consciousness stream" if MODE == 'poetry' else "digital consciousness stream"
|
|
print(f" {W_DIM}v0.1 · {_subtitle}{RST}")
|
|
print(f" {W_GHOST}{'─' * (w - 4)}{RST}")
|
|
print()
|
|
time.sleep(0.4)
|
|
|
|
if MODE == 'poetry':
|
|
slow_print(" > INITIALIZING LITERARY CORPUS...\n")
|
|
time.sleep(0.2)
|
|
print()
|
|
items, linked, failed = fetch_poetry()
|
|
print()
|
|
print(f" {G_DIM}>{RST} {G_MID}{linked} TEXTS LOADED{RST} {W_GHOST}· {failed} DARK{RST}")
|
|
print(f" {G_DIM}>{RST} {G_MID}{len(items)} STANZAS ACQUIRED{RST}")
|
|
else:
|
|
slow_print(" > INITIALIZING FEED ARRAY...\n")
|
|
time.sleep(0.2)
|
|
print()
|
|
items, linked, failed = fetch_all()
|
|
print()
|
|
print(f" {G_DIM}>{RST} {G_MID}{linked} SOURCES LINKED{RST} {W_GHOST}· {failed} DARK{RST}")
|
|
print(f" {G_DIM}>{RST} {G_MID}{len(items)} SIGNALS ACQUIRED{RST}")
|
|
|
|
if not items:
|
|
print(f"\n {W_DIM}> NO SIGNAL — check network{RST}")
|
|
sys.exit(1)
|
|
|
|
print()
|
|
mic_ok = _start_mic()
|
|
if _HAS_MIC:
|
|
boot_ln("Microphone", "ACTIVE" if mic_ok else "OFFLINE · check System Settings → Privacy → Microphone", mic_ok)
|
|
if FIREHOSE:
|
|
boot_ln("Firehose", "ENGAGED", True)
|
|
|
|
time.sleep(0.4)
|
|
slow_print(" > STREAMING...\n")
|
|
time.sleep(0.2)
|
|
print(f" {W_GHOST}{'─' * (w - 4)}{RST}")
|
|
print()
|
|
time.sleep(0.4)
|
|
|
|
stream(items)
|
|
|
|
print()
|
|
print(f" {W_GHOST}{'─' * (tw() - 4)}{RST}")
|
|
print(f" {G_DIM}> {HEADLINE_LIMIT} SIGNALS PROCESSED{RST}")
|
|
print(f" {W_GHOST}> end of stream{RST}")
|
|
print()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|