907 lines
30 KiB
Python
907 lines
30 KiB
Python
#!/usr/bin/env python3
|
|
# pyright: reportUnknownMemberType=false, reportMissingParameterType=false
|
|
import argparse
|
|
import multiprocessing
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time # Added for timing test execution
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from queue import Empty, PriorityQueue
|
|
from threading import Event, Lock, Thread
|
|
from typing import List
|
|
|
|
import psutil
|
|
|
|
from ci.util.paths import PROJECT_ROOT
|
|
|
|
|
|
def optimize_python_command(cmd: list[str]) -> list[str]:
|
|
"""
|
|
Optimize command list for subprocess execution in uv environment.
|
|
|
|
For python commands, we need to use 'uv run python' to ensure access to
|
|
installed packages like ziglang. Direct sys.executable bypasses uv environment.
|
|
|
|
Args:
|
|
cmd: Command list that may contain 'python' as first element
|
|
|
|
Returns:
|
|
list[str]: Optimized command with 'python' prefixed by 'uv run'
|
|
"""
|
|
if cmd and (cmd[0] == "python" or cmd[0] == "python3"):
|
|
# Use uv run python to ensure access to uv-managed packages
|
|
optimized_cmd = ["uv", "run", "python"] + cmd[1:]
|
|
return optimized_cmd
|
|
return cmd
|
|
|
|
|
|
from ci.util.test_exceptions import (
|
|
CompilationFailedException,
|
|
TestExecutionFailedException,
|
|
TestFailureInfo,
|
|
TestTimeoutException,
|
|
)
|
|
|
|
|
|
class OutputBuffer:
|
|
"""Thread-safe output buffer with ordered output display"""
|
|
|
|
def __init__(self) -> None:
|
|
self.output_queue: PriorityQueue[tuple[int, int, str]] = PriorityQueue()
|
|
self.next_sequence: int = 0
|
|
self.sequence_lock: Lock = Lock()
|
|
self.stop_event: Event = Event()
|
|
self.output_thread: Thread = Thread(target=self._output_worker, daemon=True)
|
|
self.output_thread.start()
|
|
|
|
def write(self, test_index: int, message: str) -> None:
|
|
"""Write a message to the buffer with test index for ordering"""
|
|
with self.sequence_lock:
|
|
sequence = self.next_sequence
|
|
self.next_sequence += 1
|
|
self.output_queue.put((test_index, sequence, message))
|
|
|
|
def _output_worker(self) -> None:
|
|
"""Worker thread that processes output in order"""
|
|
while not self.stop_event.is_set() or not self.output_queue.empty():
|
|
try:
|
|
item: tuple[int, int, str] = self.output_queue.get(timeout=0.1)
|
|
_, _, message = item
|
|
print(message, flush=True)
|
|
self.output_queue.task_done()
|
|
except Empty:
|
|
continue
|
|
except Exception as e:
|
|
print(f"Error in output worker: {e}")
|
|
continue
|
|
|
|
def stop(self) -> None:
|
|
"""Stop the output worker thread"""
|
|
self.stop_event.set()
|
|
if self.output_thread.is_alive():
|
|
self.output_thread.join()
|
|
|
|
|
|
# Configure console for UTF-8 output on Windows
|
|
if os.name == "nt": # Windows
|
|
# Try to set console to UTF-8 mode
|
|
try:
|
|
# Set stdout and stderr to UTF-8 encoding
|
|
# Note: reconfigure() was added in Python 3.7
|
|
if hasattr(sys.stdout, "reconfigure") and callable(
|
|
getattr(sys.stdout, "reconfigure", None)
|
|
):
|
|
sys.stdout.reconfigure(encoding="utf-8", errors="replace") # type: ignore[attr-defined]
|
|
if hasattr(sys.stderr, "reconfigure") and callable(
|
|
getattr(sys.stderr, "reconfigure", None)
|
|
):
|
|
sys.stderr.reconfigure(encoding="utf-8", errors="replace") # type: ignore[attr-defined]
|
|
except (AttributeError, OSError):
|
|
# Fallback for older Python versions or if reconfigure fails
|
|
pass
|
|
|
|
|
|
# Environment flags for backward compatibility
|
|
_SHOW_COMPILE = os.environ.get("FASTLED_TEST_SHOW_COMPILE", "").lower() in (
|
|
"1",
|
|
"true",
|
|
"yes",
|
|
)
|
|
_SHOW_LINK = os.environ.get("FASTLED_TEST_SHOW_LINK", "").lower() in (
|
|
"1",
|
|
"true",
|
|
"yes",
|
|
)
|
|
|
|
|
|
@dataclass
|
|
class FailedTest:
|
|
name: str
|
|
return_code: int
|
|
stdout: str
|
|
|
|
|
|
def check_iwyu_available() -> bool:
|
|
"""Check if include-what-you-use is available in the system"""
|
|
try:
|
|
result = subprocess.run(
|
|
["include-what-you-use", "--version"],
|
|
capture_output=True,
|
|
text=True,
|
|
timeout=10,
|
|
)
|
|
return result.returncode == 0
|
|
except (
|
|
subprocess.CalledProcessError,
|
|
FileNotFoundError,
|
|
subprocess.TimeoutExpired,
|
|
):
|
|
return False
|
|
|
|
|
|
def run_command(
|
|
command: str | list[str],
|
|
use_gdb: bool = False,
|
|
*,
|
|
verbose: bool = False,
|
|
show_compile: bool = False,
|
|
show_link: bool = False,
|
|
) -> tuple[int, str]:
|
|
captured_lines: list[str] = []
|
|
|
|
# Determine command type
|
|
is_test_execution = False
|
|
is_compile = False
|
|
is_link = False
|
|
if isinstance(command, str):
|
|
cmd_lower = command.replace("\\", "/").lower()
|
|
# Check if running test executable
|
|
is_test_execution = (
|
|
"/test_" in cmd_lower
|
|
or ".build/bin/test_" in cmd_lower
|
|
or cmd_lower.endswith(".exe")
|
|
)
|
|
# Check if compiling
|
|
is_compile = "-c" in cmd_lower and (".cpp" in cmd_lower or ".c" in cmd_lower)
|
|
# Check if linking
|
|
is_link = (
|
|
not is_compile
|
|
and not is_test_execution
|
|
and ("-o" in cmd_lower or "lib" in cmd_lower)
|
|
)
|
|
|
|
if use_gdb:
|
|
with tempfile.NamedTemporaryFile(mode="w+", delete=False) as gdb_script:
|
|
gdb_script.write("set pagination off\n")
|
|
gdb_script.write("run\n")
|
|
gdb_script.write("bt full\n")
|
|
gdb_script.write("info registers\n")
|
|
gdb_script.write("x/16i $pc\n")
|
|
gdb_script.write("thread apply all bt full\n")
|
|
gdb_script.write("quit\n")
|
|
|
|
gdb_command = (
|
|
f"gdb -return-child-result -batch -x {gdb_script.name} --args {command}"
|
|
)
|
|
process = subprocess.Popen(
|
|
gdb_command,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT, # Merge stderr into stdout
|
|
shell=True,
|
|
text=False,
|
|
)
|
|
assert process.stdout is not None
|
|
# Stream and capture output
|
|
while True:
|
|
line_bytes = process.stdout.readline()
|
|
line = line_bytes.decode("utf-8", errors="ignore")
|
|
if not line and process.poll() is not None:
|
|
break
|
|
if line:
|
|
captured_lines.append(line.rstrip())
|
|
# Always print GDB output (it's only used for crashes anyway)
|
|
try:
|
|
print(line, end="", flush=True)
|
|
except UnicodeEncodeError:
|
|
# Fallback: replace problematic characters
|
|
print(
|
|
line.encode("utf-8", errors="replace").decode(
|
|
"utf-8", errors="replace"
|
|
),
|
|
end="",
|
|
flush=True,
|
|
)
|
|
|
|
os.unlink(gdb_script.name)
|
|
output = "\n".join(captured_lines)
|
|
return process.returncode, output
|
|
else:
|
|
# Optimize list commands to avoid shell overhead
|
|
if isinstance(command, list):
|
|
# Optimize python commands and use shell=False for better performance
|
|
python_exe = optimize_python_command(command)
|
|
process = subprocess.Popen(
|
|
python_exe,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT, # Merge stderr into stdout
|
|
shell=False, # Use shell=False for better performance with list commands
|
|
text=False,
|
|
)
|
|
else:
|
|
# String commands still need shell=True
|
|
process = subprocess.Popen(
|
|
command,
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT, # Merge stderr into stdout
|
|
shell=True,
|
|
text=False,
|
|
)
|
|
assert process.stdout is not None
|
|
# Stream and capture output
|
|
while True:
|
|
line_bytes = process.stdout.readline()
|
|
line = line_bytes.decode("utf-8", errors="ignore")
|
|
if not line and process.poll() is not None:
|
|
break
|
|
if line:
|
|
captured_lines.append(line.rstrip())
|
|
# Determine if we should print this line
|
|
should_print = (
|
|
verbose # Always print in verbose mode
|
|
or (is_compile and show_compile) # Print compilation if enabled
|
|
or (is_link and show_link) # Print linking if enabled
|
|
or (not is_test_execution) # Print non-test output
|
|
or (
|
|
is_test_execution and process.returncode != 0
|
|
) # Print failed test output
|
|
or (
|
|
is_test_execution
|
|
and any(
|
|
marker in line
|
|
for marker in [
|
|
"Running test:",
|
|
"Test passed",
|
|
"Test FAILED",
|
|
"passed with return code",
|
|
"Test output:",
|
|
]
|
|
)
|
|
) # Print test status
|
|
)
|
|
if should_print:
|
|
try:
|
|
# Add prefix for compile/link commands
|
|
if is_compile and show_compile:
|
|
print("[COMPILE] ", end="", flush=True)
|
|
elif is_link and show_link:
|
|
print("[LINK] ", end="", flush=True)
|
|
print(line, end="", flush=True)
|
|
except UnicodeEncodeError:
|
|
# Fallback: replace problematic characters
|
|
print(
|
|
line.encode("utf-8", errors="replace").decode(
|
|
"utf-8", errors="replace"
|
|
),
|
|
end="",
|
|
flush=True,
|
|
)
|
|
|
|
output = "\n".join(captured_lines)
|
|
return process.returncode, output
|
|
|
|
|
|
def compile_tests(
|
|
clean: bool = False,
|
|
unknown_args: list[str] = [],
|
|
specific_test: str | None = None,
|
|
quick_build: bool = True,
|
|
*,
|
|
verbose: bool = False,
|
|
show_compile: bool = False,
|
|
show_link: bool = False,
|
|
) -> None:
|
|
"""
|
|
Compile C++ tests using the Python build system.
|
|
"""
|
|
os.chdir(str(PROJECT_ROOT))
|
|
print("🔧 Compiling tests using Python build system")
|
|
|
|
_compile_tests_python(
|
|
clean,
|
|
unknown_args,
|
|
specific_test,
|
|
quick_build=quick_build,
|
|
verbose=verbose,
|
|
show_compile=show_compile,
|
|
show_link=show_link,
|
|
)
|
|
|
|
|
|
def _compile_tests_python(
|
|
clean: bool = False,
|
|
unknown_args: list[str] = [],
|
|
specific_test: str | None = None,
|
|
quick_build: bool = True,
|
|
*,
|
|
verbose: bool = False,
|
|
show_compile: bool = False,
|
|
show_link: bool = False,
|
|
) -> None:
|
|
"""Python build system with PCH optimization"""
|
|
# Use the optimized cpp_test_compile system directly
|
|
import subprocess
|
|
|
|
cmd = ["uv", "run", "python", "-m", "ci.compiler.cpp_test_compile"]
|
|
|
|
if specific_test:
|
|
cmd.extend(["--test", specific_test])
|
|
if clean:
|
|
cmd.append("--clean")
|
|
if verbose:
|
|
cmd.append("--verbose")
|
|
if "--check" in unknown_args:
|
|
cmd.append("--check")
|
|
if "--no-pch" in unknown_args:
|
|
cmd.append("--no-pch")
|
|
# Forward debug mode to the compiler when quick_build is disabled
|
|
if not quick_build:
|
|
cmd.append("--debug")
|
|
|
|
print("🚀 Using Python build system with PCH optimization")
|
|
result = subprocess.run(cmd)
|
|
if result.returncode != 0:
|
|
raise RuntimeError(
|
|
f"Unit test compilation failed with return code {result.returncode}"
|
|
)
|
|
|
|
|
|
def run_tests(
|
|
specific_test: str | None = None,
|
|
*,
|
|
verbose: bool = False,
|
|
show_compile: bool = False,
|
|
show_link: bool = False,
|
|
) -> None:
|
|
"""
|
|
Run compiled tests with GDB crash analysis support.
|
|
"""
|
|
_run_tests_python(specific_test)
|
|
|
|
|
|
def _run_tests_python(
|
|
specific_test: str | None = None,
|
|
*,
|
|
verbose: bool = False,
|
|
show_compile: bool = False,
|
|
show_link: bool = False,
|
|
) -> None:
|
|
"""Run tests from Python build system"""
|
|
# Import the test compiler system
|
|
from ci.compiler.test_compiler import FastLEDTestCompiler
|
|
|
|
# Get test executables from Python build system
|
|
test_compiler = FastLEDTestCompiler.get_existing_instance()
|
|
if not test_compiler:
|
|
print("No compiled tests found. Run compilation first.")
|
|
sys.exit(1)
|
|
|
|
test_executables = test_compiler.get_test_executables(specific_test)
|
|
if not test_executables:
|
|
test_name = specific_test or "any tests"
|
|
print(f"No test executables found for: {test_name}")
|
|
sys.exit(1)
|
|
|
|
print(f"Running {len(test_executables)} tests from Python build...")
|
|
|
|
# Print list of tests that will be executed
|
|
print("Tests to execute:")
|
|
for i, test_exec in enumerate(test_executables, 1):
|
|
# Convert absolute path to relative for display
|
|
rel_path = os.path.relpath(test_exec.executable_path)
|
|
print(f" {i}. {test_exec.name} ({rel_path})")
|
|
print("")
|
|
|
|
failed_tests: list[FailedTest] = []
|
|
|
|
# Convert to file list format for compatibility with existing logic
|
|
files: list[str] = []
|
|
test_paths: dict[str, str] = {}
|
|
for test_exec in test_executables:
|
|
file_name = test_exec.name
|
|
if os.name == "nt" and not file_name.endswith(".exe"):
|
|
file_name += ".exe"
|
|
files.append(file_name)
|
|
test_paths[file_name] = str(test_exec.executable_path)
|
|
|
|
print(f"Starting test execution for {len(files)} test files...")
|
|
_execute_test_files(files, "", failed_tests, specific_test, test_paths)
|
|
_handle_test_results(failed_tests)
|
|
|
|
|
|
def _execute_test_files(
|
|
files: list[str],
|
|
test_dir: str,
|
|
failed_tests: list[FailedTest],
|
|
specific_test: str | None,
|
|
test_paths: dict[str, str] | None = None,
|
|
*,
|
|
verbose: bool = False,
|
|
show_compile: bool = False,
|
|
show_link: bool = False,
|
|
) -> None:
|
|
"""
|
|
Execute test files in parallel with full GDB crash analysis.
|
|
|
|
Args:
|
|
files: List of test file names
|
|
test_dir: Directory containing tests (for CMake) or empty string (for Python API)
|
|
failed_tests: List to collect failed tests
|
|
specific_test: Specific test name if filtering
|
|
test_paths: Dict mapping file names to full paths (for Python API)
|
|
"""
|
|
total_tests = len(files)
|
|
successful_tests = 0
|
|
completed_tests = 0
|
|
|
|
# Initialize output buffer for ordered output
|
|
output_buffer = OutputBuffer()
|
|
output_buffer.write(0, f"Executing {total_tests} test files in parallel...")
|
|
|
|
# Determine number of workers based on configuration
|
|
|
|
# Get configuration from args
|
|
args = parse_args()
|
|
|
|
# Force sequential execution if NO_PARALLEL is set
|
|
if os.environ.get("NO_PARALLEL"):
|
|
max_workers = 1
|
|
output_buffer.write(
|
|
0, "NO_PARALLEL environment variable set - forcing sequential execution"
|
|
)
|
|
elif args.sequential:
|
|
max_workers = 1
|
|
elif args.parallel:
|
|
max_workers = args.parallel
|
|
else:
|
|
max_workers = max(1, multiprocessing.cpu_count() - 1) # Leave one core free
|
|
|
|
# Check memory limit
|
|
if args.max_memory:
|
|
memory_limit = args.max_memory * 1024 * 1024 # Convert MB to bytes
|
|
available_memory = psutil.virtual_memory().available
|
|
if memory_limit > available_memory:
|
|
output_buffer.write(
|
|
0,
|
|
f"Warning: Requested memory limit {args.max_memory}MB exceeds available memory {available_memory / (1024 * 1024):.0f}MB",
|
|
)
|
|
output_buffer.write(
|
|
0, "Reducing number of parallel workers to stay within memory limits"
|
|
)
|
|
# Estimate memory per test based on previous runs or default to 100MB
|
|
memory_per_test = 100 * 1024 * 1024 # 100MB per test
|
|
max_parallel_by_memory = max(1, memory_limit // memory_per_test)
|
|
max_workers = min(max_workers, max_parallel_by_memory)
|
|
|
|
output_buffer.write(0, f"Using {max_workers} parallel workers")
|
|
|
|
# Thread-safe counter
|
|
counter_lock = Lock()
|
|
|
|
def run_single_test(
|
|
test_file: str, test_index: int
|
|
) -> tuple[bool, float, str, int]:
|
|
"""Run a single test and return its results"""
|
|
nonlocal completed_tests
|
|
|
|
if test_paths:
|
|
test_path = test_paths[test_file]
|
|
else:
|
|
test_path = os.path.join(test_dir, test_file)
|
|
|
|
# For .cpp files, compile them first
|
|
if test_path.endswith(".cpp"):
|
|
# Create a temporary directory for compilation
|
|
with tempfile.TemporaryDirectory() as temp_dir:
|
|
# Compile the test file
|
|
output_buffer.write(
|
|
test_index,
|
|
f"[{test_index}/{total_tests}] Compiling test: {test_file}",
|
|
)
|
|
compile_cmd = [
|
|
"python",
|
|
"-m",
|
|
"ziglang",
|
|
"c++",
|
|
"-o",
|
|
os.path.join(temp_dir, "test.exe"),
|
|
test_path,
|
|
"-I",
|
|
os.path.join(PROJECT_ROOT, "src"),
|
|
"-I",
|
|
os.path.join(PROJECT_ROOT, "tests"),
|
|
# NOTE: Compiler flags now come from build configuration TOML
|
|
]
|
|
return_code, stdout = run_command(compile_cmd)
|
|
if return_code != 0:
|
|
output_buffer.write(
|
|
test_index,
|
|
f"[{test_index}/{total_tests}] ERROR: Failed to compile test: {test_file}",
|
|
)
|
|
return False, 0.0, f"Failed to compile test: {stdout}", return_code
|
|
|
|
# Update test_path to point to the compiled executable
|
|
test_path = os.path.join(temp_dir, "test.exe")
|
|
|
|
if not (os.path.isfile(test_path) and os.access(test_path, os.X_OK)):
|
|
output_buffer.write(
|
|
test_index,
|
|
f"[{test_index}/{total_tests}] ERROR: Test file not found or not executable: {test_path}",
|
|
)
|
|
return False, 0.0, f"Test file not found or not executable: {test_path}", 1
|
|
|
|
output_buffer.write(
|
|
test_index, f"[{test_index}/{total_tests}] Running test: {test_file}"
|
|
)
|
|
if verbose:
|
|
output_buffer.write(test_index, f" Command: {test_path}")
|
|
|
|
start_time = time.time()
|
|
# Pass --minimal flag to doctest when not in verbose mode to suppress output unless tests fail
|
|
cmd = [test_path]
|
|
if not verbose:
|
|
cmd.append("--minimal")
|
|
return_code, stdout = run_command(cmd)
|
|
elapsed_time = time.time() - start_time
|
|
|
|
output = stdout
|
|
failure_pattern = re.compile(r"Test .+ failed with return code (\d+)")
|
|
failure_match = failure_pattern.search(output)
|
|
is_crash = failure_match is not None
|
|
|
|
# Handle crashes with GDB (must be done synchronously)
|
|
if is_crash:
|
|
output_buffer.write(
|
|
test_index, f"Test crashed. Re-running with GDB to get stack trace..."
|
|
)
|
|
_, gdb_stdout = run_command(test_path, use_gdb=True)
|
|
stdout += "\n--- GDB Output ---\n" + gdb_stdout
|
|
|
|
# Extract crash information
|
|
crash_info = extract_crash_info(gdb_stdout)
|
|
output_buffer.write(
|
|
test_index, f"Crash occurred at: {crash_info.file}:{crash_info.line}"
|
|
)
|
|
output_buffer.write(test_index, f"Cause: {crash_info.cause}")
|
|
output_buffer.write(test_index, f"Stack: {crash_info.stack}")
|
|
|
|
# Print output based on verbosity and status
|
|
if verbose or return_code != 0:
|
|
output_buffer.write(test_index, "Test output:")
|
|
output_buffer.write(test_index, stdout)
|
|
|
|
if return_code == 0:
|
|
output_buffer.write(
|
|
test_index, f" Test {test_file} passed in {elapsed_time:.2f}s"
|
|
)
|
|
else:
|
|
output_buffer.write(
|
|
test_index,
|
|
f" Test {test_file} FAILED with return code {return_code} in {elapsed_time:.2f}s",
|
|
)
|
|
|
|
with counter_lock:
|
|
completed_tests += 1
|
|
|
|
return return_code == 0, elapsed_time, stdout, return_code
|
|
|
|
try:
|
|
# Run tests in parallel using ThreadPoolExecutor
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
with ThreadPoolExecutor(max_workers=max_workers) as executor:
|
|
# Submit all tests
|
|
future_to_test = {
|
|
executor.submit(run_single_test, test_file, i + 1): test_file
|
|
for i, test_file in enumerate(files)
|
|
}
|
|
|
|
# Process results as they complete
|
|
for future in as_completed(future_to_test):
|
|
test_file = future_to_test[future]
|
|
try:
|
|
success, _, stdout, return_code = future.result()
|
|
if success:
|
|
with counter_lock:
|
|
successful_tests += 1
|
|
else:
|
|
failed_tests.append(
|
|
FailedTest(
|
|
name=test_file, return_code=return_code, stdout=stdout
|
|
)
|
|
)
|
|
except Exception as e:
|
|
output_buffer.write(
|
|
0, f"ERROR: Test {test_file} failed with exception: {e}"
|
|
)
|
|
failed_tests.append(
|
|
FailedTest(name=test_file, return_code=1, stdout=str(e))
|
|
)
|
|
|
|
# Print final summary
|
|
output_buffer.write(
|
|
0,
|
|
f"Test execution complete: {successful_tests} passed, {len(failed_tests)} failed",
|
|
)
|
|
if successful_tests == total_tests:
|
|
output_buffer.write(0, "All tests passed successfully!")
|
|
else:
|
|
output_buffer.write(
|
|
0, f"Some tests failed ({len(failed_tests)} of {total_tests})"
|
|
)
|
|
|
|
finally:
|
|
# Ensure output buffer is stopped
|
|
output_buffer.stop()
|
|
|
|
|
|
def _handle_test_results(
|
|
failed_tests: list[FailedTest], *, verbose: bool = False
|
|
) -> None:
|
|
"""Handle test results and exit appropriately (preserving existing logic)"""
|
|
if failed_tests:
|
|
print("Failed tests summary:")
|
|
failures: List[TestFailureInfo] = []
|
|
for failed_test in failed_tests:
|
|
print(
|
|
f"Test {failed_test.name} failed with return code {failed_test.return_code}"
|
|
)
|
|
# Always show output on failure
|
|
print("Output:")
|
|
# Show indented output for better readability
|
|
for line in failed_test.stdout.splitlines():
|
|
print(f" {line}")
|
|
print() # Add spacing between failed tests
|
|
|
|
failures.append(
|
|
TestFailureInfo(
|
|
test_name=failed_test.name,
|
|
command=f"test_{failed_test.name}",
|
|
return_code=failed_test.return_code,
|
|
output=failed_test.stdout,
|
|
error_type="test_execution_failure",
|
|
)
|
|
)
|
|
|
|
tests_failed = len(failed_tests)
|
|
failed_test_names = [test.name for test in failed_tests]
|
|
print(
|
|
f"{tests_failed} test{'s' if tests_failed != 1 else ''} failed: {', '.join(failed_test_names)}"
|
|
)
|
|
raise TestExecutionFailedException(f"{tests_failed} test(s) failed", failures)
|
|
if verbose:
|
|
print("All tests passed.")
|
|
|
|
|
|
@dataclass
|
|
class CrashInfo:
|
|
cause: str = "Unknown"
|
|
stack: str = "Unknown"
|
|
file: str = "Unknown"
|
|
line: str = "Unknown"
|
|
|
|
|
|
def extract_crash_info(gdb_output: str) -> CrashInfo:
|
|
lines = gdb_output.split("\n")
|
|
crash_info = CrashInfo()
|
|
|
|
try:
|
|
for i, line in enumerate(lines):
|
|
if line.startswith("Program received signal"):
|
|
try:
|
|
crash_info.cause = line.split(":", 1)[1].strip()
|
|
except IndexError:
|
|
crash_info.cause = line.strip()
|
|
elif line.startswith("#0"):
|
|
crash_info.stack = line
|
|
for j in range(i, len(lines)):
|
|
if "at" in lines[j]:
|
|
try:
|
|
_, location = lines[j].split("at", 1)
|
|
location = location.strip()
|
|
if ":" in location:
|
|
crash_info.file, crash_info.line = location.rsplit(
|
|
":", 1
|
|
)
|
|
else:
|
|
crash_info.file = location
|
|
except ValueError:
|
|
pass # If split fails, we keep the default values
|
|
break
|
|
break
|
|
except Exception as e:
|
|
print(f"Error parsing GDB output: {e}")
|
|
|
|
return crash_info
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Compile and run C++ tests")
|
|
parser.add_argument(
|
|
"--compile-only",
|
|
action="store_true",
|
|
help="Only compile the tests without running them",
|
|
)
|
|
parser.add_argument(
|
|
"--run-only",
|
|
action="store_true",
|
|
help="Only run the tests without compiling them",
|
|
)
|
|
parser.add_argument(
|
|
"--only-run-failed-test",
|
|
action="store_true",
|
|
help="Only run the tests that failed in the previous run",
|
|
)
|
|
parser.add_argument(
|
|
"--clean", action="store_true", help="Clean build before compiling"
|
|
)
|
|
parser.add_argument(
|
|
"--test",
|
|
help="Specific test to run (without extension)",
|
|
)
|
|
parser.add_argument(
|
|
"--verbose",
|
|
action="store_true",
|
|
help="Enable verbose output",
|
|
)
|
|
parser.add_argument(
|
|
"--show-compile",
|
|
action="store_true",
|
|
help="Show compilation commands and output",
|
|
)
|
|
parser.add_argument(
|
|
"--show-link",
|
|
action="store_true",
|
|
help="Show linking commands and output",
|
|
)
|
|
parser.add_argument(
|
|
"--parallel",
|
|
type=int,
|
|
help="Number of parallel test processes to run (default: CPU count - 1)",
|
|
)
|
|
parser.add_argument(
|
|
"--sequential",
|
|
action="store_true",
|
|
help="Run tests sequentially (disables parallel execution)",
|
|
)
|
|
parser.add_argument(
|
|
"--max-memory",
|
|
type=int,
|
|
help="Maximum memory usage in MB for parallel test execution",
|
|
)
|
|
|
|
# Create mutually exclusive group for compiler selection
|
|
compiler_group = parser.add_mutually_exclusive_group()
|
|
compiler_group.add_argument(
|
|
"--clang",
|
|
help="Use Clang compiler",
|
|
action="store_true",
|
|
)
|
|
compiler_group.add_argument(
|
|
"--gcc",
|
|
help="Use GCC compiler (default on non-Windows)",
|
|
action="store_true",
|
|
)
|
|
parser.add_argument(
|
|
"--check",
|
|
action="store_true",
|
|
help="Enable static analysis (IWYU, clang-tidy)",
|
|
)
|
|
|
|
parser.add_argument(
|
|
"--no-unity",
|
|
action="store_true",
|
|
help="Disable unity builds for cpp tests",
|
|
)
|
|
parser.add_argument(
|
|
"--no-pch",
|
|
action="store_true",
|
|
help="Disable precompiled headers (PCH) for unit tests",
|
|
)
|
|
parser.add_argument(
|
|
"--debug",
|
|
action="store_true",
|
|
help="Use debug build mode with full debug symbols (default is quick mode with -g0)",
|
|
)
|
|
|
|
args, unknown = parser.parse_known_args()
|
|
args.unknown = unknown
|
|
|
|
return args
|
|
|
|
|
|
def main() -> None:
|
|
try:
|
|
args = parse_args()
|
|
|
|
# Get verbosity flags from args
|
|
|
|
run_only = args.run_only
|
|
compile_only = args.compile_only
|
|
specific_test = args.test
|
|
# only_run_failed_test feature to be implemented in future
|
|
_ = args.only_run_failed_test
|
|
use_clang = args.clang
|
|
|
|
no_unity = args.no_unity
|
|
quick_build = (
|
|
not args.debug
|
|
) # Default to quick mode unless --debug is specified
|
|
# use_gcc = args.gcc
|
|
|
|
if not run_only:
|
|
passthrough_args = args.unknown
|
|
if use_clang:
|
|
passthrough_args.append("--use-clang")
|
|
if args.check:
|
|
passthrough_args.append("--check")
|
|
if no_unity:
|
|
passthrough_args.append("--no-unity")
|
|
if args.no_pch:
|
|
passthrough_args.append("--no-pch")
|
|
# Note: --gcc is handled by not passing --use-clang (GCC is the default in compiler/cpp_test_compile.py)
|
|
compile_tests(
|
|
clean=args.clean,
|
|
unknown_args=passthrough_args,
|
|
specific_test=specific_test,
|
|
quick_build=quick_build,
|
|
verbose=args.verbose,
|
|
show_compile=args.show_compile,
|
|
show_link=args.show_link,
|
|
)
|
|
|
|
if not compile_only:
|
|
if specific_test:
|
|
run_tests(
|
|
specific_test,
|
|
verbose=args.verbose,
|
|
show_compile=args.show_compile,
|
|
show_link=args.show_link,
|
|
)
|
|
else:
|
|
# Use our own test runner instead of CTest since CTest integration is broken
|
|
run_tests(
|
|
None,
|
|
verbose=args.verbose,
|
|
show_compile=args.show_compile,
|
|
show_link=args.show_link,
|
|
)
|
|
except (
|
|
CompilationFailedException,
|
|
TestExecutionFailedException,
|
|
TestTimeoutException,
|
|
) as e:
|
|
# Print detailed failure information
|
|
print("\n" + "=" * 60)
|
|
print("FASTLED TEST FAILURE DETAILS")
|
|
print("=" * 60)
|
|
print(e.get_detailed_failure_info())
|
|
print("=" * 60)
|
|
|
|
# Exit with appropriate code
|
|
if e.failures:
|
|
# Use the return code from the first failure, or 1 if none available
|
|
exit_code = (
|
|
e.failures[0].return_code if e.failures[0].return_code != 0 else 1
|
|
)
|
|
else:
|
|
exit_code = 1
|
|
sys.exit(exit_code)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|