Files
klubhaus-doorbell/libraries/FastLED/ci/dockerfiles/qemu_esp32_docker.py
2026-02-12 00:45:31 -08:00

503 lines
18 KiB
Python

#!/usr/bin/env python3
"""
Docker-based QEMU ESP32 Runner
Runs ESP32 firmware in QEMU using Docker containers for better isolation
and consistency across different environments.
"""
import argparse
import os
import platform
import re
import shutil
import subprocess
import sys
import tempfile
import time
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
from ci.dockerfiles.DockerManager import DockerManager
from ci.util.running_process import RunningProcess
def get_docker_env() -> Dict[str, str]:
"""Get environment for Docker commands, handling Git Bash/MSYS2 path conversion."""
env = os.environ.copy()
# Set UTF-8 encoding environment variables for Windows
env["PYTHONIOENCODING"] = "utf-8"
env["PYTHONUTF8"] = "1"
# Only set MSYS_NO_PATHCONV if we're in a Git Bash/MSYS2 environment
if (
"MSYSTEM" in os.environ
or os.environ.get("TERM") == "xterm"
or "bash.exe" in os.environ.get("SHELL", "")
):
env["MSYS_NO_PATHCONV"] = "1"
return env
def run_subprocess_safe(
cmd: List[str], **kwargs: Any
) -> subprocess.CompletedProcess[Any]:
"""Run subprocess with safe UTF-8 handling and error replacement."""
kwargs.setdefault("encoding", "utf-8")
kwargs.setdefault("errors", "replace")
kwargs.setdefault("env", get_docker_env())
return subprocess.run(cmd, **kwargs) # type: ignore[misc]
def run_docker_command_streaming(cmd: List[str]) -> int:
"""Run docker command with streaming output using RunningProcess."""
print(f"Executing: {' '.join(cmd)}")
proc = RunningProcess(cmd, env=get_docker_env(), auto_run=True)
# Stream all output to stdout
with proc.line_iter(timeout=None) as it:
for line in it:
print(line)
returncode = proc.wait()
if returncode != 0:
raise subprocess.CalledProcessError(returncode, cmd)
return returncode
def run_docker_command_no_fail(cmd: List[str]) -> int:
"""Run docker command with streaming output, but don't raise exceptions on failure."""
print(f"Executing: {' '.join(cmd)}")
proc = RunningProcess(cmd, env=get_docker_env(), auto_run=True)
# Stream all output to stdout
with proc.line_iter(timeout=None) as it:
for line in it:
print(line)
return proc.wait()
class DockerQEMURunner:
"""Runner for ESP32 QEMU emulation using Docker containers."""
DEFAULT_IMAGE = "mluis/qemu-esp32:latest"
ALTERNATIVE_IMAGE = "espressif/idf:latest"
FALLBACK_IMAGE = "espressif/idf:release-v5.2"
def __init__(self, docker_image: Optional[str] = None):
"""Initialize Docker QEMU runner.
Args:
docker_image: Docker image to use, defaults to espressif/qemu
"""
self.docker_manager = DockerManager()
self.docker_image = docker_image or self.DEFAULT_IMAGE
self.container_name = None
# Use Linux-style paths for all containers since we're using Ubuntu/Alpine
self.firmware_mount_path = "/workspace/firmware"
def check_docker_available(self) -> bool:
"""Check if Docker is available and running."""
try:
proc = RunningProcess(
["docker", "version"], env=get_docker_env(), auto_run=True
)
returncode = proc.wait(timeout=5)
return returncode == 0
except (Exception, FileNotFoundError):
return False
def pull_image(self):
"""Pull the Docker image if not already available."""
print(f"Ensuring Docker image {self.docker_image} is available...")
try:
# Check if image exists locally
proc = RunningProcess(
["docker", "images", "-q", self.docker_image],
env=get_docker_env(),
auto_run=True,
)
stdout_lines: List[str] = []
with proc.line_iter(timeout=None) as it:
for line in it:
stdout_lines.append(line)
result_stdout = "\n".join(stdout_lines)
if not result_stdout.strip():
# Image doesn't exist, pull it directly using docker command
print(f"Pulling Docker image: {self.docker_image}")
run_docker_command_streaming(["docker", "pull", self.docker_image])
print(f"Successfully pulled {self.docker_image}")
else:
print(f"Image {self.docker_image} already available locally")
except subprocess.CalledProcessError as e:
print(f"Warning: Failed to pull image {self.docker_image}: {e}")
if self.docker_image == self.DEFAULT_IMAGE:
print(f"Trying alternative image: {self.ALTERNATIVE_IMAGE}")
self.docker_image = self.ALTERNATIVE_IMAGE
try:
print(f"Pulling alternative Docker image: {self.docker_image}")
run_docker_command_streaming(["docker", "pull", self.docker_image])
print(f"Successfully pulled {self.docker_image}")
except subprocess.CalledProcessError as e2:
print(f"Failed to pull alternative image: {e2}")
print(f"Trying fallback image: {self.FALLBACK_IMAGE}")
self.docker_image = self.FALLBACK_IMAGE
try:
print(f"Pulling fallback Docker image: {self.docker_image}")
run_docker_command_streaming(
["docker", "pull", self.docker_image]
)
print(f"Successfully pulled {self.docker_image}")
except subprocess.CalledProcessError as e3:
print(f"Failed to pull fallback image: {e3}")
raise e3
else:
raise e
def prepare_firmware(self, firmware_path: Path) -> Path:
"""Prepare firmware files for mounting into Docker container.
Args:
firmware_path: Path to firmware.bin or build directory
Returns:
Path to the prepared firmware directory
"""
# Create temporary directory for firmware files
temp_dir = Path(tempfile.mkdtemp(prefix="qemu_firmware_"))
try:
if firmware_path.is_file() and firmware_path.suffix == ".bin":
# Copy single firmware file
shutil.copy2(firmware_path, temp_dir / "firmware.bin")
# Create proper 4MB flash image for QEMU
self._create_flash_image(firmware_path, temp_dir / "flash.bin")
else:
# Copy entire build directory
if firmware_path.is_dir():
# Copy relevant files from build directory
patterns = ["*.bin", "*.elf", "partitions.csv", "flash_args"]
for pattern in patterns:
for file in firmware_path.glob(pattern):
shutil.copy2(file, temp_dir)
# Also check for PlatformIO build structure
pio_build = firmware_path / ".pio" / "build" / "esp32dev"
if pio_build.exists():
for pattern in patterns:
for file in pio_build.glob(pattern):
shutil.copy2(file, temp_dir)
# Create proper flash.bin file from firmware.bin
firmware_bin = temp_dir / "firmware.bin"
if firmware_bin.exists():
# Create proper 4MB flash image for QEMU
self._create_flash_image(firmware_bin, temp_dir / "flash.bin")
else:
# Create a minimal 4MB flash.bin for testing
flash_bin = temp_dir / "flash.bin"
flash_bin.write_bytes(
b"\xff" * (4 * 1024 * 1024)
) # 4MB of 0xFF
else:
raise ValueError(f"Invalid firmware path: {firmware_path}")
return temp_dir
except Exception as e:
# Clean up on error
shutil.rmtree(temp_dir, ignore_errors=True)
raise e
def _create_flash_image(
self, firmware_path: Path, output_path: Path, flash_size_mb: int = 4
):
"""Create a proper flash image for QEMU ESP32.
Args:
firmware_path: Path to the firmware.bin file
output_path: Path where to write the flash.bin
flash_size_mb: Flash size in MB (must be 2, 4, 8, or 16)
"""
if flash_size_mb not in [2, 4, 8, 16]:
raise ValueError(
f"Flash size must be 2, 4, 8, or 16 MB, got {flash_size_mb}"
)
flash_size = flash_size_mb * 1024 * 1024
# Read firmware content
firmware_data = firmware_path.read_bytes()
# Create flash image: firmware at beginning, rest filled with 0xFF
flash_data = firmware_data + b"\xff" * (flash_size - len(firmware_data))
# Ensure we have exactly the right size
if len(flash_data) > flash_size:
raise ValueError(
f"Firmware size ({len(firmware_data)} bytes) exceeds flash size ({flash_size} bytes)"
)
flash_data = flash_data[:flash_size] # Truncate to exact size
output_path.write_bytes(flash_data)
def build_qemu_command(
self,
firmware_name: str = "firmware.bin",
flash_size: int = 4,
machine: str = "esp32",
) -> List[str]:
"""Build QEMU command to run inside Docker container.
Args:
firmware_name: Name of firmware file in mounted directory
flash_size: Flash size in MB
machine: QEMU machine type (esp32, esp32c3, esp32s3, etc.)
Returns:
List of command arguments for QEMU
"""
# For Linux containers, use a workaround to run QEMU through Docker Desktop's Windows integration
firmware_path = f"{self.firmware_mount_path}/flash.bin"
# Determine QEMU system and machine based on target
if machine == "esp32c3":
qemu_system = "/usr/local/bin/qemu-system-riscv32"
qemu_machine = "esp32c3"
echo_target = "ESP32C3"
elif machine == "esp32s3":
qemu_system = "/usr/local/bin/qemu-system-xtensa"
qemu_machine = "esp32s3"
echo_target = "ESP32S3"
else:
# Default to ESP32 (Xtensa)
qemu_system = "/usr/local/bin/qemu-system-xtensa"
qemu_machine = "esp32"
echo_target = "ESP32"
# Use container's QEMU with proper configuration
wrapper_script = f'''#!/bin/bash
set -e
echo "Starting {echo_target} QEMU emulation..."
echo "Firmware: {firmware_path}"
echo "Machine: {qemu_machine}"
echo "QEMU system: {qemu_system}"
echo "Container: $(cat /etc/os-release | head -1)"
# Check if firmware file exists
if [ ! -f "{firmware_path}" ]; then
echo "ERROR: Firmware file not found: {firmware_path}"
exit 1
fi
# Check firmware size
FIRMWARE_SIZE=$(stat -c%s "{firmware_path}")
echo "Firmware size: $FIRMWARE_SIZE bytes"
# Copy firmware to writable location since QEMU needs write access
cp "{firmware_path}" /tmp/flash.bin
echo "Copied firmware to writable location: /tmp/flash.bin"
# Try different QEMU configurations depending on machine type
if [ "{qemu_machine}" = "esp32c3" ]; then
# ESP32C3 uses RISC-V architecture
echo "Running {qemu_system} for {qemu_machine}"
{qemu_system} \\
-nographic \\
-machine {qemu_machine} \\
-drive file="/tmp/flash.bin",if=mtd,format=raw \\
-monitor none \\
-serial mon:stdio
else
# ESP32 uses Xtensa architecture
echo "Running {qemu_system} for {qemu_machine}"
{qemu_system} \\
-nographic \\
-machine {qemu_machine} \\
-drive file="/tmp/flash.bin",if=mtd,format=raw \\
-global driver=timer.esp32.timg,property=wdt_disable,value=true \\
-monitor none \\
-serial mon:stdio
fi
echo "QEMU execution completed"
exit 0
'''
cmd = ["bash", "-c", wrapper_script]
return cmd
def run(
self,
firmware_path: Path,
timeout: int = 30,
flash_size: int = 4,
interrupt_regex: Optional[str] = None,
interactive: bool = False,
output_file: Optional[str] = None,
machine: str = "esp32",
) -> int:
"""Run ESP32/ESP32C3/ESP32S3 firmware in QEMU using Docker.
Args:
firmware_path: Path to firmware.bin or build directory
timeout: Timeout in seconds (timeout is treated as success)
flash_size: Flash size in MB
interrupt_regex: Regex pattern to detect in output (informational only)
interactive: Run in interactive mode (attach to container)
output_file: Optional file path to write QEMU output to
machine: QEMU machine type (esp32, esp32c3, esp32s3, etc.)
Returns:
Exit code: actual QEMU/container exit code, except timeout returns 0
"""
if not self.check_docker_available():
print("ERROR: Docker is not available or not running", file=sys.stderr)
print("Please install Docker and ensure it's running", file=sys.stderr)
return 1
# Pull image if needed
self.pull_image()
# Prepare firmware files
print(f"Preparing firmware from: {firmware_path}")
temp_firmware_dir = None
try:
temp_firmware_dir = self.prepare_firmware(firmware_path)
# Generate unique container name
self.container_name = f"qemu_esp32_{int(time.time())}"
# Convert Windows paths to Docker-compatible paths
def windows_to_docker_path(path_str: Union[str, Path]) -> str:
"""Convert Windows path to Docker volume mount format."""
import os
# Check if we're in Git Bash/MSYS2 environment
is_git_bash = (
"MSYSTEM" in os.environ
or os.environ.get("TERM") == "xterm"
or "bash.exe" in os.environ.get("SHELL", "")
)
if os.name == "nt" and is_git_bash:
# Convert C:\path\to\dir to /c/path/to/dir for Git Bash
path_str = str(path_str).replace("\\", "/")
if len(path_str) > 2 and path_str[1:3] == ":/": # Drive letter
path_str = "/" + path_str[0].lower() + path_str[2:]
else:
# For cmd.exe on Windows, keep native Windows paths
path_str = str(path_str)
return path_str
docker_firmware_path = windows_to_docker_path(temp_firmware_dir)
# Only mount the firmware directory - QEMU is built into the container
volumes = {
docker_firmware_path: {
"bind": self.firmware_mount_path,
"mode": "ro", # Read-only mount
},
}
# Build QEMU command
qemu_cmd = self.build_qemu_command(
firmware_name="firmware.bin", flash_size=flash_size, machine=machine
)
print(f"Running QEMU in Docker container: {self.container_name}")
print(f"QEMU command: {' '.join(qemu_cmd)}")
if interactive:
# Run interactively with streaming
return self.docker_manager.run_container_streaming(
image_name=self.docker_image,
command=qemu_cmd,
volumes=volumes,
name=self.container_name,
timeout=timeout,
interrupt_pattern=interrupt_regex,
output_file=output_file,
)
else:
# Run with streaming output and return actual exit code
return self.docker_manager.run_container_streaming(
image_name=self.docker_image,
command=qemu_cmd,
volumes=volumes,
name=self.container_name,
timeout=timeout,
interrupt_pattern=interrupt_regex,
output_file=output_file,
)
except Exception as e:
print(f"ERROR: {e}", file=sys.stderr)
return 1
finally:
# Cleanup
if temp_firmware_dir and temp_firmware_dir.exists():
shutil.rmtree(temp_firmware_dir, ignore_errors=True)
# Ensure container is stopped and removed
if self.container_name:
try:
# Stop and remove container
run_docker_command_no_fail(["docker", "stop", self.container_name])
run_docker_command_no_fail(["docker", "rm", self.container_name])
except:
pass
def main():
parser = argparse.ArgumentParser(
description="Run ESP32 firmware in QEMU using Docker"
)
parser.add_argument(
"firmware_path", type=Path, help="Path to firmware.bin file or build directory"
)
parser.add_argument(
"--docker-image",
type=str,
help=f"Docker image to use (default: {DockerQEMURunner.DEFAULT_IMAGE})",
)
parser.add_argument(
"--flash-size", type=int, default=4, help="Flash size in MB (default: 4)"
)
parser.add_argument(
"--timeout", type=int, default=30, help="Timeout in seconds (default: 30)"
)
parser.add_argument(
"--interrupt-regex", type=str, help="Regex pattern to interrupt execution"
)
parser.add_argument(
"--interactive", action="store_true", help="Run in interactive mode"
)
parser.add_argument("--output-file", type=str, help="File to write QEMU output to")
args = parser.parse_args()
if not args.firmware_path.exists():
print(
f"ERROR: Firmware path does not exist: {args.firmware_path}",
file=sys.stderr,
)
return 1
runner = DockerQEMURunner(docker_image=args.docker_image)
return runner.run(
firmware_path=args.firmware_path,
timeout=args.timeout,
flash_size=args.flash_size,
interrupt_regex=args.interrupt_regex,
interactive=args.interactive,
output_file=args.output_file,
)
if __name__ == "__main__":
sys.exit(main())