503 lines
18 KiB
Python
503 lines
18 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Docker-based QEMU ESP32 Runner
|
|
|
|
Runs ESP32 firmware in QEMU using Docker containers for better isolation
|
|
and consistency across different environments.
|
|
"""
|
|
|
|
import argparse
|
|
import os
|
|
import platform
|
|
import re
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional, Union
|
|
|
|
from ci.dockerfiles.DockerManager import DockerManager
|
|
from ci.util.running_process import RunningProcess
|
|
|
|
|
|
def get_docker_env() -> Dict[str, str]:
|
|
"""Get environment for Docker commands, handling Git Bash/MSYS2 path conversion."""
|
|
env = os.environ.copy()
|
|
# Set UTF-8 encoding environment variables for Windows
|
|
env["PYTHONIOENCODING"] = "utf-8"
|
|
env["PYTHONUTF8"] = "1"
|
|
# Only set MSYS_NO_PATHCONV if we're in a Git Bash/MSYS2 environment
|
|
if (
|
|
"MSYSTEM" in os.environ
|
|
or os.environ.get("TERM") == "xterm"
|
|
or "bash.exe" in os.environ.get("SHELL", "")
|
|
):
|
|
env["MSYS_NO_PATHCONV"] = "1"
|
|
return env
|
|
|
|
|
|
def run_subprocess_safe(
|
|
cmd: List[str], **kwargs: Any
|
|
) -> subprocess.CompletedProcess[Any]:
|
|
"""Run subprocess with safe UTF-8 handling and error replacement."""
|
|
kwargs.setdefault("encoding", "utf-8")
|
|
kwargs.setdefault("errors", "replace")
|
|
kwargs.setdefault("env", get_docker_env())
|
|
return subprocess.run(cmd, **kwargs) # type: ignore[misc]
|
|
|
|
|
|
def run_docker_command_streaming(cmd: List[str]) -> int:
|
|
"""Run docker command with streaming output using RunningProcess."""
|
|
print(f"Executing: {' '.join(cmd)}")
|
|
proc = RunningProcess(cmd, env=get_docker_env(), auto_run=True)
|
|
|
|
# Stream all output to stdout
|
|
with proc.line_iter(timeout=None) as it:
|
|
for line in it:
|
|
print(line)
|
|
|
|
returncode = proc.wait()
|
|
if returncode != 0:
|
|
raise subprocess.CalledProcessError(returncode, cmd)
|
|
return returncode
|
|
|
|
|
|
def run_docker_command_no_fail(cmd: List[str]) -> int:
|
|
"""Run docker command with streaming output, but don't raise exceptions on failure."""
|
|
print(f"Executing: {' '.join(cmd)}")
|
|
proc = RunningProcess(cmd, env=get_docker_env(), auto_run=True)
|
|
|
|
# Stream all output to stdout
|
|
with proc.line_iter(timeout=None) as it:
|
|
for line in it:
|
|
print(line)
|
|
|
|
return proc.wait()
|
|
|
|
|
|
class DockerQEMURunner:
|
|
"""Runner for ESP32 QEMU emulation using Docker containers."""
|
|
|
|
DEFAULT_IMAGE = "mluis/qemu-esp32:latest"
|
|
ALTERNATIVE_IMAGE = "espressif/idf:latest"
|
|
FALLBACK_IMAGE = "espressif/idf:release-v5.2"
|
|
|
|
def __init__(self, docker_image: Optional[str] = None):
|
|
"""Initialize Docker QEMU runner.
|
|
|
|
Args:
|
|
docker_image: Docker image to use, defaults to espressif/qemu
|
|
"""
|
|
self.docker_manager = DockerManager()
|
|
self.docker_image = docker_image or self.DEFAULT_IMAGE
|
|
self.container_name = None
|
|
# Use Linux-style paths for all containers since we're using Ubuntu/Alpine
|
|
self.firmware_mount_path = "/workspace/firmware"
|
|
|
|
def check_docker_available(self) -> bool:
|
|
"""Check if Docker is available and running."""
|
|
try:
|
|
proc = RunningProcess(
|
|
["docker", "version"], env=get_docker_env(), auto_run=True
|
|
)
|
|
returncode = proc.wait(timeout=5)
|
|
return returncode == 0
|
|
except (Exception, FileNotFoundError):
|
|
return False
|
|
|
|
def pull_image(self):
|
|
"""Pull the Docker image if not already available."""
|
|
print(f"Ensuring Docker image {self.docker_image} is available...")
|
|
try:
|
|
# Check if image exists locally
|
|
proc = RunningProcess(
|
|
["docker", "images", "-q", self.docker_image],
|
|
env=get_docker_env(),
|
|
auto_run=True,
|
|
)
|
|
stdout_lines: List[str] = []
|
|
with proc.line_iter(timeout=None) as it:
|
|
for line in it:
|
|
stdout_lines.append(line)
|
|
result_stdout = "\n".join(stdout_lines)
|
|
if not result_stdout.strip():
|
|
# Image doesn't exist, pull it directly using docker command
|
|
print(f"Pulling Docker image: {self.docker_image}")
|
|
run_docker_command_streaming(["docker", "pull", self.docker_image])
|
|
print(f"Successfully pulled {self.docker_image}")
|
|
else:
|
|
print(f"Image {self.docker_image} already available locally")
|
|
except subprocess.CalledProcessError as e:
|
|
print(f"Warning: Failed to pull image {self.docker_image}: {e}")
|
|
if self.docker_image == self.DEFAULT_IMAGE:
|
|
print(f"Trying alternative image: {self.ALTERNATIVE_IMAGE}")
|
|
self.docker_image = self.ALTERNATIVE_IMAGE
|
|
try:
|
|
print(f"Pulling alternative Docker image: {self.docker_image}")
|
|
run_docker_command_streaming(["docker", "pull", self.docker_image])
|
|
print(f"Successfully pulled {self.docker_image}")
|
|
except subprocess.CalledProcessError as e2:
|
|
print(f"Failed to pull alternative image: {e2}")
|
|
print(f"Trying fallback image: {self.FALLBACK_IMAGE}")
|
|
self.docker_image = self.FALLBACK_IMAGE
|
|
try:
|
|
print(f"Pulling fallback Docker image: {self.docker_image}")
|
|
run_docker_command_streaming(
|
|
["docker", "pull", self.docker_image]
|
|
)
|
|
print(f"Successfully pulled {self.docker_image}")
|
|
except subprocess.CalledProcessError as e3:
|
|
print(f"Failed to pull fallback image: {e3}")
|
|
raise e3
|
|
else:
|
|
raise e
|
|
|
|
def prepare_firmware(self, firmware_path: Path) -> Path:
|
|
"""Prepare firmware files for mounting into Docker container.
|
|
|
|
Args:
|
|
firmware_path: Path to firmware.bin or build directory
|
|
|
|
Returns:
|
|
Path to the prepared firmware directory
|
|
"""
|
|
# Create temporary directory for firmware files
|
|
temp_dir = Path(tempfile.mkdtemp(prefix="qemu_firmware_"))
|
|
|
|
try:
|
|
if firmware_path.is_file() and firmware_path.suffix == ".bin":
|
|
# Copy single firmware file
|
|
shutil.copy2(firmware_path, temp_dir / "firmware.bin")
|
|
# Create proper 4MB flash image for QEMU
|
|
self._create_flash_image(firmware_path, temp_dir / "flash.bin")
|
|
else:
|
|
# Copy entire build directory
|
|
if firmware_path.is_dir():
|
|
# Copy relevant files from build directory
|
|
patterns = ["*.bin", "*.elf", "partitions.csv", "flash_args"]
|
|
for pattern in patterns:
|
|
for file in firmware_path.glob(pattern):
|
|
shutil.copy2(file, temp_dir)
|
|
|
|
# Also check for PlatformIO build structure
|
|
pio_build = firmware_path / ".pio" / "build" / "esp32dev"
|
|
if pio_build.exists():
|
|
for pattern in patterns:
|
|
for file in pio_build.glob(pattern):
|
|
shutil.copy2(file, temp_dir)
|
|
|
|
# Create proper flash.bin file from firmware.bin
|
|
firmware_bin = temp_dir / "firmware.bin"
|
|
if firmware_bin.exists():
|
|
# Create proper 4MB flash image for QEMU
|
|
self._create_flash_image(firmware_bin, temp_dir / "flash.bin")
|
|
else:
|
|
# Create a minimal 4MB flash.bin for testing
|
|
flash_bin = temp_dir / "flash.bin"
|
|
flash_bin.write_bytes(
|
|
b"\xff" * (4 * 1024 * 1024)
|
|
) # 4MB of 0xFF
|
|
else:
|
|
raise ValueError(f"Invalid firmware path: {firmware_path}")
|
|
|
|
return temp_dir
|
|
except Exception as e:
|
|
# Clean up on error
|
|
shutil.rmtree(temp_dir, ignore_errors=True)
|
|
raise e
|
|
|
|
def _create_flash_image(
|
|
self, firmware_path: Path, output_path: Path, flash_size_mb: int = 4
|
|
):
|
|
"""Create a proper flash image for QEMU ESP32.
|
|
|
|
Args:
|
|
firmware_path: Path to the firmware.bin file
|
|
output_path: Path where to write the flash.bin
|
|
flash_size_mb: Flash size in MB (must be 2, 4, 8, or 16)
|
|
"""
|
|
if flash_size_mb not in [2, 4, 8, 16]:
|
|
raise ValueError(
|
|
f"Flash size must be 2, 4, 8, or 16 MB, got {flash_size_mb}"
|
|
)
|
|
|
|
flash_size = flash_size_mb * 1024 * 1024
|
|
|
|
# Read firmware content
|
|
firmware_data = firmware_path.read_bytes()
|
|
|
|
# Create flash image: firmware at beginning, rest filled with 0xFF
|
|
flash_data = firmware_data + b"\xff" * (flash_size - len(firmware_data))
|
|
|
|
# Ensure we have exactly the right size
|
|
if len(flash_data) > flash_size:
|
|
raise ValueError(
|
|
f"Firmware size ({len(firmware_data)} bytes) exceeds flash size ({flash_size} bytes)"
|
|
)
|
|
|
|
flash_data = flash_data[:flash_size] # Truncate to exact size
|
|
|
|
output_path.write_bytes(flash_data)
|
|
|
|
def build_qemu_command(
|
|
self,
|
|
firmware_name: str = "firmware.bin",
|
|
flash_size: int = 4,
|
|
machine: str = "esp32",
|
|
) -> List[str]:
|
|
"""Build QEMU command to run inside Docker container.
|
|
|
|
Args:
|
|
firmware_name: Name of firmware file in mounted directory
|
|
flash_size: Flash size in MB
|
|
machine: QEMU machine type (esp32, esp32c3, esp32s3, etc.)
|
|
|
|
Returns:
|
|
List of command arguments for QEMU
|
|
"""
|
|
# For Linux containers, use a workaround to run QEMU through Docker Desktop's Windows integration
|
|
firmware_path = f"{self.firmware_mount_path}/flash.bin"
|
|
|
|
# Determine QEMU system and machine based on target
|
|
if machine == "esp32c3":
|
|
qemu_system = "/usr/local/bin/qemu-system-riscv32"
|
|
qemu_machine = "esp32c3"
|
|
echo_target = "ESP32C3"
|
|
elif machine == "esp32s3":
|
|
qemu_system = "/usr/local/bin/qemu-system-xtensa"
|
|
qemu_machine = "esp32s3"
|
|
echo_target = "ESP32S3"
|
|
else:
|
|
# Default to ESP32 (Xtensa)
|
|
qemu_system = "/usr/local/bin/qemu-system-xtensa"
|
|
qemu_machine = "esp32"
|
|
echo_target = "ESP32"
|
|
|
|
# Use container's QEMU with proper configuration
|
|
wrapper_script = f'''#!/bin/bash
|
|
set -e
|
|
echo "Starting {echo_target} QEMU emulation..."
|
|
echo "Firmware: {firmware_path}"
|
|
echo "Machine: {qemu_machine}"
|
|
echo "QEMU system: {qemu_system}"
|
|
echo "Container: $(cat /etc/os-release | head -1)"
|
|
|
|
# Check if firmware file exists
|
|
if [ ! -f "{firmware_path}" ]; then
|
|
echo "ERROR: Firmware file not found: {firmware_path}"
|
|
exit 1
|
|
fi
|
|
|
|
# Check firmware size
|
|
FIRMWARE_SIZE=$(stat -c%s "{firmware_path}")
|
|
echo "Firmware size: $FIRMWARE_SIZE bytes"
|
|
|
|
# Copy firmware to writable location since QEMU needs write access
|
|
cp "{firmware_path}" /tmp/flash.bin
|
|
echo "Copied firmware to writable location: /tmp/flash.bin"
|
|
|
|
# Try different QEMU configurations depending on machine type
|
|
if [ "{qemu_machine}" = "esp32c3" ]; then
|
|
# ESP32C3 uses RISC-V architecture
|
|
echo "Running {qemu_system} for {qemu_machine}"
|
|
{qemu_system} \\
|
|
-nographic \\
|
|
-machine {qemu_machine} \\
|
|
-drive file="/tmp/flash.bin",if=mtd,format=raw \\
|
|
-monitor none \\
|
|
-serial mon:stdio
|
|
else
|
|
# ESP32 uses Xtensa architecture
|
|
echo "Running {qemu_system} for {qemu_machine}"
|
|
{qemu_system} \\
|
|
-nographic \\
|
|
-machine {qemu_machine} \\
|
|
-drive file="/tmp/flash.bin",if=mtd,format=raw \\
|
|
-global driver=timer.esp32.timg,property=wdt_disable,value=true \\
|
|
-monitor none \\
|
|
-serial mon:stdio
|
|
fi
|
|
|
|
echo "QEMU execution completed"
|
|
exit 0
|
|
'''
|
|
|
|
cmd = ["bash", "-c", wrapper_script]
|
|
|
|
return cmd
|
|
|
|
def run(
|
|
self,
|
|
firmware_path: Path,
|
|
timeout: int = 30,
|
|
flash_size: int = 4,
|
|
interrupt_regex: Optional[str] = None,
|
|
interactive: bool = False,
|
|
output_file: Optional[str] = None,
|
|
machine: str = "esp32",
|
|
) -> int:
|
|
"""Run ESP32/ESP32C3/ESP32S3 firmware in QEMU using Docker.
|
|
|
|
Args:
|
|
firmware_path: Path to firmware.bin or build directory
|
|
timeout: Timeout in seconds (timeout is treated as success)
|
|
flash_size: Flash size in MB
|
|
interrupt_regex: Regex pattern to detect in output (informational only)
|
|
interactive: Run in interactive mode (attach to container)
|
|
output_file: Optional file path to write QEMU output to
|
|
machine: QEMU machine type (esp32, esp32c3, esp32s3, etc.)
|
|
|
|
Returns:
|
|
Exit code: actual QEMU/container exit code, except timeout returns 0
|
|
"""
|
|
if not self.check_docker_available():
|
|
print("ERROR: Docker is not available or not running", file=sys.stderr)
|
|
print("Please install Docker and ensure it's running", file=sys.stderr)
|
|
return 1
|
|
|
|
# Pull image if needed
|
|
self.pull_image()
|
|
|
|
# Prepare firmware files
|
|
print(f"Preparing firmware from: {firmware_path}")
|
|
temp_firmware_dir = None
|
|
|
|
try:
|
|
temp_firmware_dir = self.prepare_firmware(firmware_path)
|
|
|
|
# Generate unique container name
|
|
self.container_name = f"qemu_esp32_{int(time.time())}"
|
|
|
|
# Convert Windows paths to Docker-compatible paths
|
|
def windows_to_docker_path(path_str: Union[str, Path]) -> str:
|
|
"""Convert Windows path to Docker volume mount format."""
|
|
import os
|
|
|
|
# Check if we're in Git Bash/MSYS2 environment
|
|
is_git_bash = (
|
|
"MSYSTEM" in os.environ
|
|
or os.environ.get("TERM") == "xterm"
|
|
or "bash.exe" in os.environ.get("SHELL", "")
|
|
)
|
|
|
|
if os.name == "nt" and is_git_bash:
|
|
# Convert C:\path\to\dir to /c/path/to/dir for Git Bash
|
|
path_str = str(path_str).replace("\\", "/")
|
|
if len(path_str) > 2 and path_str[1:3] == ":/": # Drive letter
|
|
path_str = "/" + path_str[0].lower() + path_str[2:]
|
|
else:
|
|
# For cmd.exe on Windows, keep native Windows paths
|
|
path_str = str(path_str)
|
|
|
|
return path_str
|
|
|
|
docker_firmware_path = windows_to_docker_path(temp_firmware_dir)
|
|
|
|
# Only mount the firmware directory - QEMU is built into the container
|
|
volumes = {
|
|
docker_firmware_path: {
|
|
"bind": self.firmware_mount_path,
|
|
"mode": "ro", # Read-only mount
|
|
},
|
|
}
|
|
|
|
# Build QEMU command
|
|
qemu_cmd = self.build_qemu_command(
|
|
firmware_name="firmware.bin", flash_size=flash_size, machine=machine
|
|
)
|
|
|
|
print(f"Running QEMU in Docker container: {self.container_name}")
|
|
print(f"QEMU command: {' '.join(qemu_cmd)}")
|
|
|
|
if interactive:
|
|
# Run interactively with streaming
|
|
return self.docker_manager.run_container_streaming(
|
|
image_name=self.docker_image,
|
|
command=qemu_cmd,
|
|
volumes=volumes,
|
|
name=self.container_name,
|
|
timeout=timeout,
|
|
interrupt_pattern=interrupt_regex,
|
|
output_file=output_file,
|
|
)
|
|
else:
|
|
# Run with streaming output and return actual exit code
|
|
return self.docker_manager.run_container_streaming(
|
|
image_name=self.docker_image,
|
|
command=qemu_cmd,
|
|
volumes=volumes,
|
|
name=self.container_name,
|
|
timeout=timeout,
|
|
interrupt_pattern=interrupt_regex,
|
|
output_file=output_file,
|
|
)
|
|
|
|
except Exception as e:
|
|
print(f"ERROR: {e}", file=sys.stderr)
|
|
return 1
|
|
finally:
|
|
# Cleanup
|
|
if temp_firmware_dir and temp_firmware_dir.exists():
|
|
shutil.rmtree(temp_firmware_dir, ignore_errors=True)
|
|
|
|
# Ensure container is stopped and removed
|
|
if self.container_name:
|
|
try:
|
|
# Stop and remove container
|
|
run_docker_command_no_fail(["docker", "stop", self.container_name])
|
|
run_docker_command_no_fail(["docker", "rm", self.container_name])
|
|
except:
|
|
pass
|
|
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser(
|
|
description="Run ESP32 firmware in QEMU using Docker"
|
|
)
|
|
parser.add_argument(
|
|
"firmware_path", type=Path, help="Path to firmware.bin file or build directory"
|
|
)
|
|
parser.add_argument(
|
|
"--docker-image",
|
|
type=str,
|
|
help=f"Docker image to use (default: {DockerQEMURunner.DEFAULT_IMAGE})",
|
|
)
|
|
parser.add_argument(
|
|
"--flash-size", type=int, default=4, help="Flash size in MB (default: 4)"
|
|
)
|
|
parser.add_argument(
|
|
"--timeout", type=int, default=30, help="Timeout in seconds (default: 30)"
|
|
)
|
|
parser.add_argument(
|
|
"--interrupt-regex", type=str, help="Regex pattern to interrupt execution"
|
|
)
|
|
parser.add_argument(
|
|
"--interactive", action="store_true", help="Run in interactive mode"
|
|
)
|
|
parser.add_argument("--output-file", type=str, help="File to write QEMU output to")
|
|
|
|
args = parser.parse_args()
|
|
|
|
if not args.firmware_path.exists():
|
|
print(
|
|
f"ERROR: Firmware path does not exist: {args.firmware_path}",
|
|
file=sys.stderr,
|
|
)
|
|
return 1
|
|
|
|
runner = DockerQEMURunner(docker_image=args.docker_image)
|
|
return runner.run(
|
|
firmware_path=args.firmware_path,
|
|
timeout=args.timeout,
|
|
flash_size=args.flash_size,
|
|
interrupt_regex=args.interrupt_regex,
|
|
interactive=args.interactive,
|
|
output_file=args.output_file,
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main())
|