#!/usr/bin/env python3 """Monitor agent - multiplexed serial monitor for AI interaction""" import os import sys import json import time import threading import serial import select BOARD = sys.argv[1] if len(sys.argv) > 1 else "esp32-32e-4" # Load board config exec(open(f"./boards/{BOARD}/board-config.sh").read().replace("export ", "")) SERIAL_PORT = os.environ.get("PORT", PORT) LOGFILE = f"/tmp/doorbell-{BOARD}.jsonl" STATEFILE = f"/tmp/doorbell-{BOARD}-state.json" CMDFIFO = f"/tmp/doorbell-{BOARD}-cmd.fifo" # Initialize state with open(STATEFILE, "w") as f: json.dump({"screen": "BOOT", "deviceState": "BOOTED", "backlightOn": False}, f) # Open serial port ser = serial.Serial(SERIAL_PORT, 115200, timeout=1) # State parsing def parse_state(line): updates = {} if "[STATE]" in line and "→ OFF" in line: updates["screen"] = "OFF" elif "[STATE]" in line and "→ DASHBOARD" in line: updates["screen"] = "DASHBOARD" elif "[STATE]" in line and "→ ALERT" in line: updates["screen"] = "ALERT" elif "[STATE]" in line and "→ BOOT" in line: updates["screen"] = "BOOT" elif "[ADMIN]" in line and "dashboard" in line: updates["screen"] = "DASHBOARD" elif "[ADMIN]" in line and "off" in line: updates["screen"] = "OFF" return updates print(f"Monitor agent started (BOARD={BOARD}):") print(f" Log: {LOGFILE}") print(f" State: {STATEFILE}") print(f" Cmd: echo 'dashboard' > {CMDFIFO}") print() print("Press Ctrl+C to exit") # Create FIFO for commands if not exists if not os.path.exists(CMDFIFO): os.mkfifo(CMDFIFO) # Command FIFO reader thread cmd_running = True def cmd_reader(): global cmd_running while cmd_running: try: with open(CMDFIFO, "r") as fifo: # Use select to avoid blocking forever ready, _, _ = select.select([fifo], [], [], 1) if ready: cmd = fifo.read().strip() if cmd: ser.write((cmd + "\n").encode()) print(f"[SENT] {cmd}") except Exception as e: if cmd_running: print(f"Cmd reader error: {e}") time.sleep(0.1) cmd_thread = threading.Thread(target=cmd_reader, daemon=True) cmd_thread.start() # Main loop: read serial and log try: while True: if ser.in_waiting: line = ser.readline().decode("utf-8", errors="replace").strip() if line: ts = time.time() # Write JSON log with open(LOGFILE, "a") as f: json.dump({"ts": f"{ts:.3f}", "line": line}, f) f.write("\n") # Update state updates = parse_state(line) if updates: with open(STATEFILE, "w") as f: json.dump(updates, f) time.sleep(0.01) except KeyboardInterrupt: pass finally: cmd_running = False ser.close() print("\nMonitor stopped")