""" Pipeline visualization - Large animated network visualization with camera modes. """ import math NODE_NETWORK = { "sources": [ {"id": "RSS", "label": "RSS FEEDS", "x": 20, "y": 20}, {"id": "POETRY", "label": "POETRY DB", "x": 100, "y": 20}, {"id": "NTFY", "label": "NTFY MSG", "x": 180, "y": 20}, {"id": "MIC", "label": "MICROPHONE", "x": 260, "y": 20}, ], "fetch": [ {"id": "FETCH", "label": "FETCH LAYER", "x": 140, "y": 100}, {"id": "CACHE", "label": "CACHE", "x": 220, "y": 100}, ], "scroll": [ {"id": "STREAM", "label": "STREAM CTRL", "x": 60, "y": 180}, {"id": "CAMERA", "label": "CAMERA", "x": 140, "y": 180}, {"id": "RENDER", "label": "RENDER", "x": 220, "y": 180}, ], "effects": [ {"id": "NOISE", "label": "NOISE", "x": 20, "y": 260}, {"id": "FADE", "label": "FADE", "x": 80, "y": 260}, {"id": "GLITCH", "label": "GLITCH", "x": 140, "y": 260}, {"id": "FIRE", "label": "FIREHOSE", "x": 200, "y": 260}, {"id": "HUD", "label": "HUD", "x": 260, "y": 260}, ], "display": [ {"id": "TERM", "label": "TERMINAL", "x": 20, "y": 340}, {"id": "WEB", "label": "WEBSOCKET", "x": 80, "y": 340}, {"id": "PYGAME", "label": "PYGAME", "x": 140, "y": 340}, {"id": "SIXEL", "label": "SIXEL", "x": 200, "y": 340}, {"id": "KITTY", "label": "KITTY", "x": 260, "y": 340}, ], } ALL_NODES = [] for group_nodes in NODE_NETWORK.values(): ALL_NODES.extend(group_nodes) NETWORK_PATHS = [ ["RSS", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "NOISE", "TERM"], ["POETRY", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "FADE", "WEB"], ["NTFY", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "GLITCH", "PYGAME"], ["MIC", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "FIRE", "SIXEL"], ["RSS", "FETCH", "CACHE", "STREAM", "CAMERA", "RENDER", "HUD", "KITTY"], ] GRID_WIDTH = 300 GRID_HEIGHT = 400 def get_node_by_id(node_id: str): for node in ALL_NODES: if node["id"] == node_id: return node return None def draw_network_to_grid(frame: int = 0) -> list[list[str]]: grid = [[" " for _ in range(GRID_WIDTH)] for _ in range(GRID_HEIGHT)] active_path_idx = (frame // 60) % len(NETWORK_PATHS) active_path = NETWORK_PATHS[active_path_idx] for node in ALL_NODES: x, y = node["x"], node["y"] label = node["label"] is_active = node["id"] in active_path is_highlight = node["id"] == active_path[(frame // 15) % len(active_path)] node_w, node_h = 20, 7 for dy in range(node_h): for dx in range(node_w): gx, gy = x + dx, y + dy if 0 <= gx < GRID_WIDTH and 0 <= gy < GRID_HEIGHT: if dy == 0: char = "┌" if dx == 0 else ("┐" if dx == node_w - 1 else "─") elif dy == node_h - 1: char = "└" if dx == 0 else ("┘" if dx == node_w - 1 else "─") elif dy == node_h // 2: if dx == 0 or dx == node_w - 1: char = "│" else: pad = (node_w - 2 - len(label)) // 2 if dx - 1 == pad and len(label) <= node_w - 2: char = ( label[dx - 1 - pad] if dx - 1 - pad < len(label) else " " ) else: char = " " else: char = "│" if dx == 0 or dx == node_w - 1 else " " if char.strip(): if is_highlight: grid[gy][gx] = "\033[1;38;5;46m" + char + "\033[0m" elif is_active: grid[gy][gx] = "\033[1;38;5;220m" + char + "\033[0m" else: grid[gy][gx] = "\033[38;5;240m" + char + "\033[0m" for i, node_id in enumerate(active_path[:-1]): curr = get_node_by_id(node_id) next_id = active_path[i + 1] next_node = get_node_by_id(next_id) if curr and next_node: x1, y1 = curr["x"] + 7, curr["y"] + 2 x2, y2 = next_node["x"] + 7, next_node["y"] + 2 step = 1 if x2 >= x1 else -1 for x in range(x1, x2 + step, step): if 0 <= x < GRID_WIDTH and 0 <= y1 < GRID_HEIGHT: grid[y1][x] = "\033[38;5;45m─\033[0m" step = 1 if y2 >= y1 else -1 for y in range(y1, y2 + step, step): if 0 <= x2 < GRID_WIDTH and 0 <= y < GRID_HEIGHT: grid[y][x2] = "\033[38;5;45m│\033[0m" return grid class TraceCamera: def __init__(self): self.x = 0 self.y = 0 self.target_x = 0 self.target_y = 0 self.current_node_idx = 0 self.path = [] self.frame = 0 def update(self, dt: float, frame: int = 0) -> None: self.frame = frame active_path = NETWORK_PATHS[(frame // 60) % len(NETWORK_PATHS)] if self.path != active_path: self.path = active_path self.current_node_idx = 0 if self.current_node_idx < len(self.path): node_id = self.path[self.current_node_idx] node = get_node_by_id(node_id) if node: self.target_x = max(0, node["x"] - 40) self.target_y = max(0, node["y"] - 10) self.current_node_idx += 1 self.x += int((self.target_x - self.x) * 0.1) self.y += int((self.target_y - self.y) * 0.1) class CameraLarge: def __init__(self, viewport_w: int, viewport_h: int, frame: int): self.viewport_w = viewport_w self.viewport_h = viewport_h self.frame = frame self.x = 0 self.y = 0 self.mode = "trace" self.trace_camera = TraceCamera() def set_vertical_mode(self): self.mode = "vertical" def set_horizontal_mode(self): self.mode = "horizontal" def set_omni_mode(self): self.mode = "omni" def set_floating_mode(self): self.mode = "floating" def set_trace_mode(self): self.mode = "trace" def update(self, dt: float): self.frame += 1 if self.mode == "vertical": self.y = int((self.frame * 0.5) % (GRID_HEIGHT - self.viewport_h)) elif self.mode == "horizontal": self.x = int((self.frame * 0.5) % (GRID_WIDTH - self.viewport_w)) elif self.mode == "omni": self.x = int((self.frame * 0.3) % (GRID_WIDTH - self.viewport_w)) self.y = int((self.frame * 0.5) % (GRID_HEIGHT - self.viewport_h)) elif self.mode == "floating": self.x = int(50 + math.sin(self.frame * 0.02) * 30) self.y = int(50 + math.cos(self.frame * 0.015) * 30) elif self.mode == "trace": self.trace_camera.update(dt, self.frame) self.x = self.trace_camera.x self.y = self.trace_camera.y def generate_mermaid_graph(frame: int = 0) -> str: effects = ["NOISE", "FADE", "GLITCH", "FIREHOSE"] active_effect = effects[(frame // 30) % 4] cam_modes = ["VERTICAL", "HORIZONTAL", "OMNI", "FLOATING", "TRACE"] active_cam = cam_modes[(frame // 100) % 5] return f"""graph LR subgraph SOURCES RSS[RSS Feeds] Poetry[Poetry DB] Ntfy[Ntfy Msg] Mic[Microphone] end subgraph FETCH Fetch(fetch_all) Cache[(Cache)] end subgraph SCROLL Scroll(StreamController) Camera({active_cam}) end subgraph EFFECTS Noise[NOISE] Fade[FADE] Glitch[GLITCH] Fire[FIREHOSE] Hud[HUD] end subgraph DISPLAY Term[Terminal] Web[WebSocket] Pygame[PyGame] Sixel[Sixel] end RSS --> Fetch Poetry --> Fetch Ntfy --> Fetch Fetch --> Cache Cache --> Scroll Scroll --> Noise Scroll --> Fade Scroll --> Glitch Scroll --> Fire Scroll --> Hud Noise --> Term Fade --> Web Glitch --> Pygame Fire --> Sixel style {active_effect} fill:#90EE90 style Camera fill:#87CEEB """ def generate_network_pipeline( width: int = 80, height: int = 24, frame: int = 0 ) -> list[str]: try: from engine.beautiful_mermaid import render_mermaid_ascii mermaid_graph = generate_mermaid_graph(frame) ascii_output = render_mermaid_ascii(mermaid_graph, padding_x=2, padding_y=1) lines = ascii_output.split("\n") result = [] for y in range(height): if y < len(lines): line = lines[y] if len(line) < width: line = line + " " * (width - len(line)) elif len(line) > width: line = line[:width] result.append(line) else: result.append(" " * width) status_y = height - 2 if status_y < height: fps = 60 - (frame % 15) cam_modes = ["VERTICAL", "HORIZONTAL", "OMNI", "FLOATING", "TRACE"] cam = cam_modes[(frame // 100) % 5] effects = ["NOISE", "FADE", "GLITCH", "FIREHOSE"] eff = effects[(frame // 30) % 4] anim = "▓▒░ "[frame % 4] status = f" FPS:{fps:3.0f} │ {anim} {eff} │ Cam:{cam}" status = status[: width - 4].ljust(width - 4) result[status_y] = "║ " + status + " ║" if height > 0: result[0] = "═" * width result[height - 1] = "═" * width return result except Exception as e: return [ f"Error: {e}" + " " * (width - len(f"Error: {e}")) for _ in range(height) ] def generate_large_network_viewport( viewport_w: int = 80, viewport_h: int = 24, frame: int = 0 ) -> list[str]: cam_modes = ["VERTICAL", "HORIZONTAL", "OMNI", "FLOATING", "TRACE"] camera_mode = cam_modes[(frame // 100) % 5] camera = CameraLarge(viewport_w, viewport_h, frame) if camera_mode == "TRACE": camera.set_trace_mode() elif camera_mode == "VERTICAL": camera.set_vertical_mode() elif camera_mode == "HORIZONTAL": camera.set_horizontal_mode() elif camera_mode == "OMNI": camera.set_omni_mode() elif camera_mode == "FLOATING": camera.set_floating_mode() camera.update(1 / 60) grid = draw_network_to_grid(frame) result = [] for vy in range(viewport_h): line = "" for vx in range(viewport_w): gx = camera.x + vx gy = camera.y + vy if 0 <= gx < GRID_WIDTH and 0 <= gy < GRID_HEIGHT: line += grid[gy][gx] else: line += " " result.append(line) fps = 60 - (frame % 15) active_path = NETWORK_PATHS[(frame // 60) % len(NETWORK_PATHS)] active_node = active_path[(frame // 15) % len(active_path)] anim = "▓▒░ "[frame % 4] status = f" FPS:{fps:3.0f} │ {anim} {camera_mode:9s} │ Node:{active_node}" status = status[: viewport_w - 4].ljust(viewport_w - 4) if viewport_h > 2: result[viewport_h - 2] = "║ " + status + " ║" if viewport_h > 0: result[0] = "═" * viewport_w result[viewport_h - 1] = "═" * viewport_w return result