#!/usr/bin/env python3
"""
ESP32 QEMU Runner

Runs ESP32 firmware in QEMU with logging and validation.
Designed to replace the tobozo/esp32-qemu-sim GitHub Action.
"""

import argparse
import platform
import re
import shutil
import subprocess
import sys
import threading
import time
from pathlib import Path
from typing import List, Optional, TextIO

from ci.util.running_process import RunningProcess


def get_binary_name() -> str:
    """Get platform-specific QEMU binary name."""
    return (
        "qemu-system-xtensa.exe"
        if platform.system().lower() == "windows"
        else "qemu-system-xtensa"
    )


def find_executable(binary_name: str) -> Optional[Path]:
    """Find an executable on the system PATH.

    Args:
        binary_name: File name of the executable to look up.

    Returns:
        Path to the first match, or None when nothing is found.
    """
    # shutil.which is portable and avoids spawning `which` / `where`, whose
    # multi-line CRLF output needed manual parsing on Windows.
    located = shutil.which(binary_name)
    return Path(located) if located else None


def find_qemu_binary() -> Optional[Path]:
    """Find QEMU ESP32 binary in common locations.

    Searches the project-local and ESP-IDF tool directories recursively and
    falls back to the system PATH.

    Returns:
        Path to the binary, or None when it cannot be found.
    """
    binary_name = get_binary_name()

    # ESP-IDF installation paths and portable installation
    esp_paths = [
        Path(".cache") / "qemu",  # Project-local portable installation directory
        Path.home() / ".espressif" / "tools" / "qemu-xtensa",
        Path.home() / ".espressif" / "python_env",
    ]

    for base_path in esp_paths:
        if base_path.exists():
            for qemu_path in base_path.rglob(binary_name):
                if qemu_path.is_file():
                    return qemu_path

    # Check system PATH
    return find_executable(binary_name)


def show_installation_help():
    """Display platform-specific installation instructions."""
    binary_name = get_binary_name()
    print(f"ERROR: QEMU ESP32 emulator ({binary_name}) not found!", file=sys.stderr)
    print()

    if platform.system().lower() == "windows":
        print("To install QEMU on Windows:")
        print("1. Chocolatey: choco install qemu -y")
        print("2. winget: winget install SoftwareFreedomConservancy.QEMU --scope user")
        print("3. Manual: Download from https://qemu.weilnetz.de/w64/")
        print("4. ESP-IDF: uv run ci/install-qemu.py")
    else:
        print("Run: uv run ci/install-qemu.py")


class QEMURunner:
    """Builds a flash image for ESP32 firmware and runs it under QEMU."""

    def __init__(self, qemu_binary: Optional[Path] = None):
        """Create a runner.

        Args:
            qemu_binary: Path to the QEMU executable. When omitted, the
                binary is searched for with find_qemu_binary().

        Raises:
            FileNotFoundError: If no binary is given and none can be found.
        """
        self.qemu_binary = qemu_binary or self._get_qemu_binary()
        # Lines printed by QEMU during the most recent run().
        self.output_lines: List[str] = []
        # True once a line matching the interrupt regex has been seen.
        self.interrupt_met = False

    def _get_qemu_binary(self) -> Path:
        """Get QEMU binary path or raise error."""
        qemu_path = find_qemu_binary()
        if not qemu_path:
            raise FileNotFoundError(
                f"Could not find {get_binary_name()}. Please run install-qemu.py first."
            )
        return qemu_path

    def build_qemu_command(self, firmware_path: Path, flash_size: int = 4) -> List[str]:
        """Build QEMU command line arguments.

        Creates flash.bin next to the firmware (or inside the build
        directory) if it does not exist yet, then assembles the QEMU argv.

        Args:
            firmware_path: Path to firmware.bin or directory containing build artifacts
            flash_size: Flash size in MB

        Returns:
            The argument list to launch QEMU with.

        Raises:
            ValueError: If flash_size is not a positive integer, or a script
                option leaked into the QEMU arguments.
            FileNotFoundError: If no flash image exists after creation.
        """
        # Validate that flash_size is used correctly internally only
        if not isinstance(flash_size, int) or flash_size <= 0:
            raise ValueError(
                f"Invalid flash_size: {flash_size}. Must be a positive integer."
            )

        # Determine if we have a direct firmware.bin or a build directory
        if firmware_path.is_file() and firmware_path.suffix == ".bin":
            # Direct firmware.bin path
            flash_image_path = firmware_path.parent / "flash.bin"
            print(f"Creating flash image for firmware: {firmware_path}")
            self._create_flash_image_from_firmware(
                firmware_path, flash_image_path, flash_size
            )
        else:
            # Build directory path
            flash_image_path = firmware_path / "flash.bin"
            print(
                f"Creating combined flash image from build directory: {firmware_path}"
            )
            self._create_flash_image_from_build_dir(
                firmware_path, flash_image_path, flash_size
            )

        # Check for ROM file in multiple locations
        rom_candidates = [
            Path(".cache") / "qemu" / "esp32-v3-rom.bin",
            Path(".cache") / "qemu" / "qemu" / "share" / "qemu" / "esp32-v3-rom.bin",
            Path(".cache") / "qemu" / "extracted" / "share" / "esp32-v3-rom.bin",
        ]
        rom_path = next((path for path in rom_candidates if path.exists()), None)

        # Ensure flash image path is absolute and exists
        flash_image_path = flash_image_path.resolve()
        if not flash_image_path.exists():
            raise FileNotFoundError(f"Flash image not found: {flash_image_path}")

        # QEMU splits -drive options on commas; a literal comma inside the
        # file name has to be doubled.
        drive_file = str(flash_image_path).replace(",", ",,")

        cmd = [
            str(self.qemu_binary),
            "-nographic",
            "-machine",
            "esp32",
            "-drive",
            f"file={drive_file},if=mtd,format=raw",
            "-global",
            "driver=timer.esp32.timg,property=wdt_disable,value=true",
        ]

        if rom_path is not None:
            # Set QEMU data directory if ROM files are available
            qemu_data_dir = rom_path.parent
            cmd.extend(["-L", str(qemu_data_dir)])
            print(f"Setting QEMU data directory: {qemu_data_dir}")
            # Note: ESP32 ROM is built into QEMU machine, no need to load external ROM
            print(
                f"ROM file available at: {rom_path} (but not needed for ESP32 machine)"
            )
        else:
            print("Note: ROM file not found, but ESP32 machine has integrated ROM")

        # Validate that no custom script arguments are accidentally included
        custom_args = ["--flash-size", "--timeout", "--interrupt-regex"]
        for arg in cmd:
            if any(custom_arg in str(arg) for custom_arg in custom_args):
                raise ValueError(
                    f"Custom script argument {arg} should not be passed to QEMU"
                )

        return cmd

    def _create_flash_image_from_firmware(
        self, firmware_bin: Path, flash_bin: Path, flash_size: int
    ):
        """Create flash image from a direct firmware.bin file.

        An existing flash_bin is reused as-is; delete it to force
        regeneration. Merge failures are reported as a warning only.

        Args:
            firmware_bin: Application binary to place at offset 0x10000.
            flash_bin: Flash image file to create.
            flash_size: Flash size in MB.
        """
        if flash_bin.exists():
            print(f"Using existing flash image: {flash_bin}")
            return

        print(f"Creating flash image from firmware: {firmware_bin}")

        # Create empty flash image (erased flash reads back as 0xFF)
        flash_size_bytes = flash_size * 1024 * 1024
        flash_bin.write_bytes(b"\xff" * flash_size_bytes)

        try:
            esptool_cmd = self._find_esptool()
            if esptool_cmd:
                # Use esptool to create proper flash image
                self._merge_with_esptool_firmware(
                    esptool_cmd, firmware_bin, flash_bin, flash_size
                )
            else:
                # Simple manual merge - just write firmware at 0x10000
                self._merge_manually_firmware(firmware_bin, flash_bin)
        except Exception as e:
            print(f"Warning: Could not create proper flash image: {e}")

    def _create_flash_image_from_build_dir(
        self, build_folder: Path, flash_bin: Path, flash_size: int
    ):
        """Create combined flash image from ESP32 build artifacts in a directory.

        An existing flash_bin is reused as-is; delete it to force
        regeneration. Merge failures are reported as a warning only.

        Args:
            build_folder: Directory holding bootloader/partitions/firmware.
            flash_bin: Flash image file to create.
            flash_size: Flash size in MB.
        """
        if flash_bin.exists():
            print(f"Using existing flash image: {flash_bin}")
            return

        print("Creating combined flash image from build directory...")

        # Create empty flash image (erased flash reads back as 0xFF)
        flash_size_bytes = flash_size * 1024 * 1024
        flash_bin.write_bytes(b"\xff" * flash_size_bytes)

        try:
            esptool_cmd = self._find_esptool()
            if esptool_cmd:
                self._merge_with_esptool(
                    esptool_cmd, build_folder, flash_bin, flash_size
                )
            else:
                self._merge_manually(build_folder, flash_bin)
        except Exception as e:
            print(f"Warning: Could not create proper flash image: {e}")

    def _find_esptool(self) -> Optional[str]:
        """Find esptool.py executable.

        Returns:
            "esptool.py" when it is on PATH, "<python> -m esptool" when the
            module can be run with the current interpreter, otherwise None.
        """
        # Try direct esptool.py
        if find_executable("esptool.py"):
            return "esptool.py"

        # Try python -m esptool
        try:
            result = subprocess.run(
                [sys.executable, "-m", "esptool", "--help"],
                capture_output=True,
                timeout=5,
            )
            if result.returncode == 0:
                return f"{sys.executable} -m esptool"
        except subprocess.SubprocessError:  # includes TimeoutExpired
            pass

        return None

    @staticmethod
    def _esptool_argv(esptool_cmd: str) -> List[str]:
        """Turn the command string from _find_esptool() into an argv list."""
        # A plain split() would break an interpreter path containing spaces
        # (e.g. under "C:\Program Files"), so rebuild that form explicitly.
        if esptool_cmd == f"{sys.executable} -m esptool":
            return [sys.executable, "-m", "esptool"]
        return esptool_cmd.split()

    @staticmethod
    def _locate_firmware(build_folder: Path) -> Path:
        """Return the firmware.bin for build_folder, exiting if it is missing."""
        # Check if we're in the actual build directory
        firmware_path = build_folder / "firmware.bin"
        if not firmware_path.exists():
            firmware_path = (
                build_folder / ".pio" / "build" / "esp32dev" / "firmware.bin"
            )
        if not firmware_path.exists():
            print(
                f"FATAL: ESP32dev firmware not found at {firmware_path}",
                file=sys.stderr,
            )
            print("Please build for esp32dev target first.", file=sys.stderr)
            sys.exit(1)
        return firmware_path

    def _run_esptool(self, cmd: List[str]) -> None:
        """Run an esptool command, echoing its output.

        Raises:
            subprocess.CalledProcessError: If esptool exits with non-zero status.
        """
        esptool_proc = RunningProcess(cmd, timeout=60, auto_run=True)

        # Stream esptool output and check for errors
        has_errors = False
        with esptool_proc.line_iter(timeout=None) as it:
            for line in it:
                print(f"[esptool] {line}")
                # Check for common esptool error patterns
                lowered = line.lower()
                if any(
                    error_pattern in lowered
                    for error_pattern in ("error", "failed", "exception")
                ):
                    has_errors = True

        returncode = esptool_proc.wait()
        if returncode != 0:
            raise subprocess.CalledProcessError(returncode, cmd)
        if has_errors:
            print("Warning: Errors detected in esptool output")

    def _merge_with_esptool(
        self,
        esptool_cmd: str,
        build_folder: Path,
        flash_bin: Path,
        flash_size: int = 4,
    ):
        """Merge using esptool.py merge_bin command.

        Args:
            esptool_cmd: Command string as returned by _find_esptool().
            build_folder: Directory holding the build artifacts.
            flash_bin: Output flash image.
            flash_size: Flash size in MB used for --flash-size and padding.

        Raises:
            subprocess.CalledProcessError: If esptool exits with non-zero status.
        """
        size_arg = f"{flash_size}MB"
        cmd = self._esptool_argv(esptool_cmd) + [
            "--chip",
            "esp32",  # Use esp32 chip type for esp32dev machine
            "merge-bin",  # Updated command name
            "-o",
            str(flash_bin),
            "--flash-mode",  # Updated option name
            "dio",
            "--flash-freq",  # Updated option name
            "40m",  # Standard frequency for esp32
            "--flash-size",  # Updated option name
            size_arg,
        ]

        firmware_path = self._locate_firmware(build_folder)

        # Add binaries at their offsets; missing optional parts are skipped
        binaries = [
            ("0x1000", build_folder / "bootloader.bin"),  # ESP32 bootloader offset
            ("0x8000", build_folder / "partitions.bin"),
            ("0x10000", firmware_path),
        ]
        for offset, bin_path in binaries:
            if bin_path.exists():
                cmd.extend([offset, str(bin_path)])

        # Add padding to the full flash size at the end
        cmd.extend(["--pad-to-size", size_arg])

        print(f"Running esptool command: {' '.join(cmd)}")
        self._run_esptool(cmd)
        print("Flash image created with esptool")

    def _merge_with_esptool_firmware(
        self,
        esptool_cmd: str,
        firmware_bin: Path,
        flash_bin: Path,
        flash_size: int = 4,
    ):
        """Merge firmware.bin using esptool with minimal bootloader.

        Args:
            esptool_cmd: Command string as returned by _find_esptool().
            firmware_bin: Application binary placed at offset 0x10000.
            flash_bin: Output flash image.
            flash_size: Flash size in MB used for --flash-size and padding.

        Raises:
            subprocess.CalledProcessError: If esptool exits with non-zero status.
        """
        size_arg = f"{flash_size}MB"
        cmd = self._esptool_argv(esptool_cmd) + [
            "--chip",
            "esp32",
            "merge-bin",  # Updated command name
            "-o",
            str(flash_bin),
            "--flash-mode",  # Updated option name
            "dio",
            "--flash-freq",  # Updated option name
            "40m",
            "--flash-size",  # Updated option name
            size_arg,
            # Pad so the image has the full flash size, matching
            # _merge_with_esptool and the manual merge path.
            "--pad-to-size",
            size_arg,
            "0x10000",  # Application offset
            str(firmware_bin),
        ]

        # Note: Without bootloader and partition table, QEMU may not boot properly
        # but we'll try with just the app for now
        print(f"Running esptool firmware command: {' '.join(cmd)}")
        self._run_esptool(cmd)
        print("Flash image created with esptool (firmware only)")

    def _merge_manually_firmware(self, firmware_bin: Path, flash_bin: Path):
        """Manually write firmware.bin to flash image.

        flash_bin must already exist; it is opened for in-place update.
        """
        print("WARNING: Manually merging firmware (esptool not found)")
        print("Note: Without bootloader, QEMU may not boot properly")

        with open(flash_bin, "r+b") as flash:
            # Write firmware at application offset
            flash.seek(0x10000)
            flash.write(firmware_bin.read_bytes())

        print("Firmware written to flash image at offset 0x10000")

    def _merge_manually(self, build_folder: Path, flash_bin: Path):
        """Manually merge binary files at correct offsets.

        flash_bin must already exist; it is opened for in-place update.
        Bootloader and partition table are skipped when absent.
        """
        print("WARNING: Manually merging binaries (esptool not found)")

        firmware_path = self._locate_firmware(build_folder)

        # Every entry is a complete path. firmware_path already includes
        # build_folder, so joining it onto build_folder again would point at
        # a non-existent file whenever build_folder is relative.
        binaries = [
            (build_folder / "bootloader.bin", 0x1000),  # ESP32 bootloader offset
            (build_folder / "partitions.bin", 0x8000),
            (firmware_path, 0x10000),
        ]

        with open(flash_bin, "r+b") as flash:
            for bin_path, offset in binaries:
                if bin_path.exists():
                    flash.seek(offset)
                    flash.write(bin_path.read_bytes())

    def _save_output_log(self) -> None:
        """Write the captured QEMU output to qemu_output.log."""
        log_file = Path("qemu_output.log")
        # Explicit encoding: firmware output is not guaranteed to fit the
        # platform default code page.
        log_file.write_text("\n".join(self.output_lines), encoding="utf-8")
        print(f"QEMU output saved to: {log_file}")
        print(f"Total output lines: {len(self.output_lines)}")

    def run(
        self,
        firmware_path: Path,
        timeout: int = 30,
        interrupt_regex: Optional[str] = None,
        flash_size: int = 4,
    ) -> int:
        """Run ESP32 firmware in QEMU.

        Args:
            firmware_path: Path to firmware.bin file or build directory
            timeout: Timeout in seconds
            interrupt_regex: Regex pattern to interrupt execution
            flash_size: Flash size in MB

        Returns:
            0 when the interrupt pattern was matched, QEMU's exit code when
            it terminated by itself, and 1 on timeout or any other error.
        """
        print("=== Running ESP32 firmware in QEMU ===")
        print(f"Firmware path: {firmware_path}")
        print(f"Timeout: {timeout}s")
        print(f"Flash size: {flash_size}MB")
        if interrupt_regex:
            print(f"Interrupt pattern: {interrupt_regex}")

        if not firmware_path.exists():
            print(f"ERROR: Path does not exist: {firmware_path}", file=sys.stderr)
            return 1

        try:
            cmd = self.build_qemu_command(firmware_path, flash_size)
            print(f"QEMU command: {' '.join(cmd)}")
        except Exception as e:
            print(f"ERROR: Failed to build QEMU command: {e}", file=sys.stderr)
            return 1

        try:
            # Compile before launching so an invalid pattern cannot leave an
            # orphaned QEMU process behind.
            interrupt_pattern = re.compile(interrupt_regex) if interrupt_regex else None
            self.interrupt_met = False
            self.output_lines = []

            qemu_proc = RunningProcess(
                cmd,
                timeout=timeout,
                auto_run=True,
            )

            try:
                with qemu_proc.line_iter(timeout=timeout) as it:
                    for line in it:
                        print(line, flush=True)
                        self.output_lines.append(line)
                        if interrupt_pattern and interrupt_pattern.search(line):
                            print(f"SUCCESS: Interrupt condition met: {line}")
                            self.interrupt_met = True
                            # Stop reading: firmware normally never exits on
                            # its own, so waiting would end in a timeout.
                            break
            except TimeoutError:
                print(
                    f"ERROR: QEMU timed out after {timeout} seconds of no output.",
                    file=sys.stderr,
                )
                qemu_proc.kill()
                # Keep the log on failure too; that is when it is needed.
                self._save_output_log()
                return 1

            if self.interrupt_met:
                # The outcome is already decided; a failing cleanup must not
                # turn the success into an error.
                try:
                    qemu_proc.kill()
                except Exception as e:
                    print(f"Warning: Could not stop QEMU: {e}", file=sys.stderr)
                return_code = 0
            else:
                return_code = qemu_proc.wait()

            self._save_output_log()
            return return_code

        except Exception as e:
            print(f"ERROR: Error running QEMU: {e}", file=sys.stderr)
            return 1


def main() -> int:
    """Parse the command line and run the firmware; returns the exit code."""
    parser = argparse.ArgumentParser(description="Run ESP32 firmware in QEMU")
    parser.add_argument(
        "firmware_path",
        type=Path,
        help="Path to firmware.bin file or build directory containing firmware files",
    )
    parser.add_argument(
        "--flash-size", type=int, default=4, help="Flash size in MB (default: 4)"
    )
    parser.add_argument(
        "--timeout", type=int, default=30, help="Timeout in seconds (default: 30)"
    )
    parser.add_argument(
        "--interrupt-regex", type=str, help="Regex pattern to interrupt execution"
    )

    args = parser.parse_args()

    # Validate arguments to prevent issues
    if args.flash_size <= 0:
        print(
            f"ERROR: Invalid flash size: {args.flash_size}. Must be positive.",
            file=sys.stderr,
        )
        return 1

    if args.timeout <= 0:
        print(
            f"ERROR: Invalid timeout: {args.timeout}. Must be positive.",
            file=sys.stderr,
        )
        return 1

    # Ensure we're not accidentally being called as a wrapper for qemu-system-xtensa
    if len(sys.argv) > 1 and any("qemu-system-xtensa" in arg for arg in sys.argv):
        print(
            "ERROR: This script should not be used as a wrapper for qemu-system-xtensa",
            file=sys.stderr,
        )
        print(
            "Use this script directly: python qemu-esp32.py [options]",
            file=sys.stderr,
        )
        return 1

    # Check if QEMU is available
    qemu_binary = find_qemu_binary()
    if not qemu_binary:
        show_installation_help()
        return 1

    # Hand over the located binary so the search is not repeated.
    runner = QEMURunner(qemu_binary)
    return runner.run(
        firmware_path=args.firmware_path,
        timeout=args.timeout,
        interrupt_regex=args.interrupt_regex,
        flash_size=args.flash_size,
    )


if __name__ == "__main__":
    sys.exit(main())