refactor(doorbell-touch): add build harness with monitor agent and board-specific setup

This commit is contained in:
2026-02-18 03:28:38 -08:00
parent 4ea7165148
commit 3b8e54c511
9 changed files with 353 additions and 84 deletions

104
scripts/monitor-agent.py Normal file
View File

@@ -0,0 +1,104 @@
#!/usr/bin/env python3
"""Monitor agent - multiplexed serial monitor for AI interaction"""
import os
import sys
import json
import time
import threading
import serial
import select
BOARD = sys.argv[1] if len(sys.argv) > 1 else "esp32-32e-4"
# Load board config
exec(open(f"./boards/{BOARD}/board-config.sh").read().replace("export ", ""))
SERIAL_PORT = os.environ.get("PORT", PORT)
LOGFILE = f"/tmp/doorbell-{BOARD}.jsonl"
STATEFILE = f"/tmp/doorbell-{BOARD}-state.json"
CMDFIFO = f"/tmp/doorbell-{BOARD}-cmd.fifo"
# Initialize state
with open(STATEFILE, "w") as f:
json.dump({"screen": "BOOT", "deviceState": "BOOTED", "backlightOn": False}, f)
# Open serial port
ser = serial.Serial(SERIAL_PORT, 115200, timeout=1)
# State parsing
def parse_state(line):
updates = {}
if "[STATE]" in line and "→ OFF" in line:
updates["screen"] = "OFF"
elif "[STATE]" in line and "→ DASHBOARD" in line:
updates["screen"] = "DASHBOARD"
elif "[STATE]" in line and "→ ALERT" in line:
updates["screen"] = "ALERT"
elif "[STATE]" in line and "→ BOOT" in line:
updates["screen"] = "BOOT"
elif "[ADMIN]" in line and "dashboard" in line:
updates["screen"] = "DASHBOARD"
elif "[ADMIN]" in line and "off" in line:
updates["screen"] = "OFF"
return updates
print(f"Monitor agent started (BOARD={BOARD}):")
print(f" Log: {LOGFILE}")
print(f" State: {STATEFILE}")
print(f" Cmd: echo 'dashboard' > {CMDFIFO}")
print()
print("Press Ctrl+C to exit")
# Create FIFO for commands if not exists
if not os.path.exists(CMDFIFO):
os.mkfifo(CMDFIFO)
# Command FIFO reader thread
cmd_running = True
def cmd_reader():
global cmd_running
while cmd_running:
try:
with open(CMDFIFO, "r") as fifo:
# Use select to avoid blocking forever
ready, _, _ = select.select([fifo], [], [], 1)
if ready:
cmd = fifo.read().strip()
if cmd:
ser.write((cmd + "\r").encode())
print(f"[SENT] {cmd}")
except Exception as e:
if cmd_running:
print(f"Cmd reader error: {e}")
time.sleep(0.1)
cmd_thread = threading.Thread(target=cmd_reader, daemon=True)
cmd_thread.start()
# Main loop: read serial and log
try:
while True:
if ser.in_waiting:
line = ser.readline().decode("utf-8", errors="replace").strip()
if line:
ts = time.time()
# Write JSON log
with open(LOGFILE, "a") as f:
json.dump({"ts": f"{ts:.3f}", "line": line}, f)
f.write("\n")
# Update state
updates = parse_state(line)
if updates:
with open(STATEFILE, "w") as f:
json.dump(updates, f)
time.sleep(0.01)
except KeyboardInterrupt:
pass
finally:
cmd_running = False
ser.close()
print("\nMonitor stopped")