Files
2026-02-12 00:45:31 -08:00

1458 lines
52 KiB
Python

#!/usr/bin/env python3
"""
WARNING: sys.stdout.flush() causes blocking issues on Windows with QEMU/subprocess processes!
Use conditional flushing: `if os.name != 'nt': sys.stdout.flush()` to avoid Windows blocking
while maintaining real-time output visibility on Unix systems.
"""
import _thread
import os
import queue
import re
import subprocess
import sys
import threading
import time
import traceback
from dataclasses import dataclass
from pathlib import Path
from queue import Queue
from typing import Any, Callable, List, Optional, Pattern, Protocol, cast
from typeguard import typechecked
from ci.util.running_process import EndOfStream, RunningProcess
from ci.util.test_exceptions import (
TestExecutionFailedException,
TestFailureInfo,
TestTimeoutException,
)
from ci.util.test_types import (
TestArgs,
TestCategories,
TestResult,
TestResultType,
TestSuiteResult,
determine_test_categories,
)
_IS_GITHUB_ACTIONS = os.getenv("GITHUB_ACTIONS") == "true"
_TIMEOUT = 600 if _IS_GITHUB_ACTIONS else 240
_GLOBAL_TIMEOUT = 600 if _IS_GITHUB_ACTIONS else 300
# Abort threshold for total failures across all processes (unit + examples)
MAX_FAILURES_BEFORE_ABORT = 3
def extract_error_snippet(accumulated_output: list[str], context_lines: int = 5) -> str:
"""
Extract relevant error snippets from process output.
Searches for lines containing "error" (case insensitive) and extracts
a small context window around the first few error occurrences.
Args:
accumulated_output: List of output lines from the process
context_lines: Number of lines to capture before/after each error line (default: 5)
Returns:
Formatted string containing error snippets with minimal context
"""
if not accumulated_output:
return "No output captured"
error_snippets: list[str] = []
error_pattern = re.compile(r"error", re.IGNORECASE)
# Find all lines that contain "error" (case insensitive)
error_line_indices: list[int] = []
for i, line in enumerate(accumulated_output):
if error_pattern.search(line):
error_line_indices.append(i)
if not error_line_indices:
# No specific errors found, return last 10 lines which might contain useful info
max_lines = min(10, len(accumulated_output))
return (
"No 'error' keyword found. Last "
+ str(max_lines)
+ " lines:\n"
+ "\n".join(accumulated_output[-max_lines:])
)
# Extract context around first 5 errors (increased from 2 for better visibility)
max_errors_to_show = 5
for i, error_idx in enumerate(error_line_indices[:max_errors_to_show]):
# Calculate context window (5 lines before to 5 lines after the error)
start_idx = max(0, error_idx - context_lines)
end_idx = min(len(accumulated_output), error_idx + context_lines + 1)
snippet_lines: list[str] = []
for j in range(start_idx, end_idx):
line_marker = "" if j == error_idx else " " # Mark the actual error line
snippet_lines.append(f"{line_marker}{accumulated_output[j]}")
error_snippets.append("\n".join(snippet_lines))
# Add summary if there are more errors - but show the actual error lines
if len(error_line_indices) > max_errors_to_show:
remaining_errors = error_line_indices[max_errors_to_show:]
additional_error_lines: list[str] = []
for error_idx in remaining_errors[:3]: # Show up to 3 more error lines
additional_error_lines.append(f"{accumulated_output[error_idx]}")
if additional_error_lines:
error_snippets.append("Additional errors found:")
error_snippets.append("\n".join(additional_error_lines))
if len(remaining_errors) > 3:
error_snippets.append(
f"... and {len(remaining_errors) - 3} more error(s) found"
)
return "\n\n".join(error_snippets)
@dataclass
class ProcessTiming:
"""Information about a completed process execution for timing summary"""
name: str
duration: float
command: str
@dataclass
class ProcessState:
"""Tracks the state of an individual process during parallel execution"""
process: RunningProcess
last_activity_time: float
command: str
@typechecked
class TestOutputFormatter:
"""Formats test output in a consistent way"""
def __init__(self, verbose: bool = False):
self.verbose = verbose
self.current_suite: Optional[TestSuiteResult] = None
self._start_time = time.time()
def start_suite(self, name: str) -> None:
"""Start a new test suite"""
self.current_suite = TestSuiteResult(
name=name, results=[], start_time=time.time()
)
def end_suite(self, passed: bool = True) -> None:
"""End the current test suite"""
if self.current_suite:
self.current_suite.end_time = time.time()
self.current_suite.passed = passed
def add_result(self, result: TestResult) -> None:
"""Add a test result to the current suite"""
if self.current_suite:
self.current_suite.results.append(result)
self._format_result(result)
def _format_result(self, result: TestResult) -> None:
"""Format and display a test result"""
# Only show if verbose or important
if not (self.verbose or self._should_display(result)):
return
# Format timestamp
delta = result.timestamp - self._start_time
# Format message based on type
if result.type == TestResultType.SUCCESS:
color = "\033[92m" # Green
elif result.type == TestResultType.ERROR:
color = "\033[91m" # Red
elif result.type == TestResultType.WARNING:
color = "\033[93m" # Yellow
else:
color = "\033[0m" # Reset
# Build message - only include timestamp, not command name
msg = f"{delta:.2f} "
msg += f"{color}{result.message}\033[0m"
# Print with indentation
print(f" {msg}")
def _should_display(self, result: TestResult) -> bool:
"""Determine if a result should be displayed"""
# Always show errors
if result.type == TestResultType.ERROR:
return True
# Always show final success/completion messages
if any(
marker in result.message
for marker in ["### SUCCESS", "### ERROR", "Test execution complete:"]
):
return True
# Show warnings in verbose mode
if result.type == TestResultType.WARNING and self.verbose:
return True
# Show info/debug only in verbose mode
return self.verbose and result.type in [
TestResultType.INFO,
TestResultType.DEBUG,
]
@typechecked
@dataclass
class TestProcessConfig:
"""Configuration for a test process"""
command: str | list[str]
echo: bool = True
auto_run: bool = False
timeout: int | None = None
enable_stack_trace: bool = True
description: str = ""
parallel_safe: bool = True # Whether this process can run in parallel with others
output_filter: Callable[[str], bool] | None = (
None # Filter function for output lines
)
class ProcessOutputHandler:
"""Handles capturing and displaying process output with structured results"""
def __init__(self, verbose: bool = False):
self.formatter = TestOutputFormatter(verbose=verbose)
self.current_command: str | None = None
self.header_printed = False
def handle_output_line(self, line: str, process_name: str) -> None:
"""Process a line of output and convert it to structured test results"""
# Start new test suite if command changes
if process_name != self.current_command:
if self.current_command:
self.formatter.end_suite()
self.formatter.start_suite(process_name)
self.current_command = process_name
self.header_printed = True
self.formatter.add_result(
TestResult(
type=TestResultType.INFO,
message=f"=== [{process_name}] ===",
test_name=process_name,
)
)
# Convert line to appropriate test result
result = self._parse_line_to_result(line, process_name)
if result:
self.formatter.add_result(result)
def _parse_line_to_result(
self, line: str, process_name: str
) -> Optional[TestResult]:
"""Parse a line of output into a structured test result"""
# Skip empty lines
if not line.strip():
return None
# Skip test output noise
if any(
noise in line
for noise in [
"doctest version is",
'run with "--help"',
"assertions:",
"test cases:",
"MESSAGE:",
"TEST CASE:",
"Test passed",
"Test execution",
"Test completed",
"Running test:",
"Process completed:",
"Command completed:",
"Command output:",
"Exit code:",
"All parallel tests",
"JSON parsing failed",
"readFrameAt failed",
"ByteStreamMemory",
"C:\\Users\\",
"\\dev\\fastled\\",
"\\tests\\",
"\\src\\",
"test line_simplification.exe",
"test noise_hires.exe",
"test mutex.exe",
"test rbtree.exe",
"test priority_queue.exe",
"test json_roundtrip.exe",
"test malloc_hooks.exe",
"test rectangular_buffer.exe",
"test ostream.exe",
"test active_strip_data_json.exe",
"test noise_range.exe",
"test json.exe",
"test point.exe",
"test screenmap.exe",
"test task.exe",
"test queue.exe",
"test promise.exe",
"test shared_ptr.exe",
"test screenmap_serialization.exe",
"test strstream.exe",
"test slice.exe",
"test hsv_conversion_accuracy.exe",
"test tile2x2.exe",
"test tuple.exe",
"test raster.exe",
"test transform.exe",
"test transition_ramp.exe",
"test thread_local.exe",
"test set_inlined.exe",
"test vector.exe",
"test strip_id_map.exe",
"test weak_ptr.exe",
"test variant.exe",
"test type_traits.exe",
"test splat.exe",
"test hsv16.exe",
"test ui_help.exe",
"test unordered_set.exe",
"test traverse_grid.exe",
"test ui_title_bug.exe",
"test videofx_wrapper.exe",
"test ui.exe",
"test video.exe",
"test xypath.exe",
]
):
return None
# Success messages
if "### SUCCESS" in line:
return TestResult(
type=TestResultType.SUCCESS, message=line, test_name=process_name
)
# Error messages
if any(
marker in line
for marker in [
"### ERROR",
"FAILED",
"ERROR",
"Crash",
"Test FAILED",
"Compilation failed",
"Build failed",
]
):
return TestResult(
type=TestResultType.ERROR, message=line, test_name=process_name
)
# Warning messages
if any(marker in line.lower() for marker in ["warning:", "note:"]):
return TestResult(
type=TestResultType.WARNING, message=line, test_name=process_name
)
# Test completion messages
if "Test execution complete:" in line:
return TestResult(
type=TestResultType.INFO, message=line, test_name=process_name
)
# Default to info level for other lines
return TestResult(
type=TestResultType.INFO, message=line, test_name=process_name
)
class ReconfigurableIO(Protocol):
def reconfigure(self, *, encoding: str, errors: str) -> None: ...
def create_namespace_check_process(enable_stack_trace: bool) -> RunningProcess:
"""Create a namespace check process without starting it"""
return RunningProcess(
"uv run python ci/tests/no_using_namespace_fl_in_headers.py",
shell=True,
auto_run=False, # Don't auto-start - will be started in parallel later
enable_stack_trace=enable_stack_trace,
)
def create_unit_test_process(
args: TestArgs, enable_stack_trace: bool
) -> RunningProcess:
"""Create a unit test process without starting it"""
# First compile the tests
compile_cmd: list[str] = [
"uv",
"run",
"python",
"-m",
"ci.compiler.cpp_test_run",
"--compile-only",
]
if args.test:
compile_cmd.extend(["--test", args.test])
if args.clean:
compile_cmd.append("--clean")
if args.verbose:
compile_cmd.append("--verbose")
if args.show_compile:
compile_cmd.append("--show-compile")
if args.show_link:
compile_cmd.append("--show-link")
if args.check:
compile_cmd.append("--check")
if args.clang:
compile_cmd.append("--clang")
if args.gcc:
compile_cmd.append("--gcc")
if args.no_unity:
compile_cmd.append("--no-unity")
if args.no_pch:
compile_cmd.append("--no-pch")
if args.debug:
compile_cmd.append("--debug")
# subprocess.run(compile_cmd, check=True)
# Then run the tests using our new test runner
test_cmd = ["uv", "run", "python", "-m", "ci.run_tests"]
if args.test:
test_cmd.extend(["--test", str(args.test)])
if args.verbose:
test_cmd.append("--verbose")
both_cmds: list[str] = []
both_cmds.extend(compile_cmd)
both_cmds.extend(["&&"])
both_cmds.extend(test_cmd)
cmd_str = subprocess.list2cmdline(both_cmds)
return RunningProcess(
cmd_str,
enable_stack_trace=enable_stack_trace,
timeout=_TIMEOUT, # 2 minutes timeout
auto_run=True,
)
def create_examples_test_process(
args: TestArgs, enable_stack_trace: bool
) -> RunningProcess:
"""Create an examples test process without starting it"""
cmd = ["uv", "run", "python", "-u", "ci/compiler/test_example_compilation.py"]
if args.examples is not None:
cmd.extend(args.examples)
if args.clean:
cmd.append("--clean")
if args.no_pch:
cmd.append("--no-pch")
if args.unity:
cmd.append("--unity")
if args.full and args.examples is not None:
cmd.append("--full")
elif args.examples is not None:
# Auto-enable full mode for examples to include execution
cmd.append("--full")
if args.no_parallel:
cmd.append("--no-parallel")
if args.verbose:
cmd.append("--verbose")
# Use longer timeout for no-parallel mode since sequential compilation takes much longer
timeout = (
1800 if args.no_parallel else 600
) # 30 minutes for sequential, 10 minutes for parallel
cmd_str = subprocess.list2cmdline(cmd)
return RunningProcess(
cmd_str, auto_run=False, enable_stack_trace=enable_stack_trace, timeout=timeout
)
def create_python_test_process(
enable_stack_trace: bool, full_tests: bool = False
) -> RunningProcess:
"""Create a Python test process without starting it"""
# Use list format for better environment handling
cmd = [
"uv",
"run",
"pytest",
"-s", # Don't capture stdout/stderr
"-v", # Verbose output
"--tb=short", # Shorter traceback format
"--durations=0", # Show all durations
"ci/tests", # Test directory
]
# If not running full tests, exclude tests marked with @pytest.mark.full
if not full_tests:
cmd.extend(["-m", "not full"])
cmd_str = subprocess.list2cmdline(cmd)
return RunningProcess(
cmd_str,
auto_run=False, # Don't auto-start - will be started in parallel later
enable_stack_trace=False, # Always disable stack traces for Python tests
timeout=_TIMEOUT, # 2 minute timeout for Python tests
)
def create_integration_test_process(
args: TestArgs, enable_stack_trace: bool
) -> RunningProcess:
"""Create an integration test process without starting it"""
cmd = ["uv", "run", "pytest", "-s", "ci/test_integration", "-xvs", "--durations=0"]
if args.examples is not None:
# When --examples --full is specified, only run example-related integration tests
cmd.extend(["-k", "TestFullProgramLinking"])
if args.verbose:
cmd.append("-v")
return RunningProcess(cmd, auto_run=False, enable_stack_trace=enable_stack_trace)
def create_compile_uno_test_process(enable_stack_trace: bool = True) -> RunningProcess:
"""Create a process to compile the uno tests without starting it"""
cmd = [
"uv",
"run",
"python",
"-m",
"ci.ci-compile",
"uno",
"--examples",
"Blink",
"--no-interactive",
]
return RunningProcess(cmd, auto_run=False, enable_stack_trace=enable_stack_trace)
def get_cpp_test_processes(
args: TestArgs, test_categories: TestCategories, enable_stack_trace: bool
) -> list[RunningProcess]:
"""Return all processes needed for C++ tests"""
processes: list[RunningProcess] = []
# Always include namespace check
processes.append(create_namespace_check_process(enable_stack_trace))
if test_categories.unit:
processes.append(create_unit_test_process(args, enable_stack_trace))
if test_categories.examples:
processes.append(create_examples_test_process(args, enable_stack_trace))
return processes
def get_python_test_processes(
enable_stack_trace: bool, full_tests: bool = False
) -> list[RunningProcess]:
"""Return all processes needed for Python tests"""
return [
create_python_test_process(False, full_tests)
] # Disable stack trace for Python tests
def get_integration_test_processes(
args: TestArgs, enable_stack_trace: bool
) -> list[RunningProcess]:
"""Return all processes needed for integration tests"""
return [create_integration_test_process(args, enable_stack_trace)]
def get_all_test_processes(
args: TestArgs,
test_categories: TestCategories,
enable_stack_trace: bool,
src_code_change: bool,
) -> list[RunningProcess]:
"""Return all processes needed for all tests"""
processes: list[RunningProcess] = []
# Always include namespace check
processes.append(create_namespace_check_process(enable_stack_trace))
# Add test processes based on categories
if test_categories.unit:
processes.append(create_unit_test_process(args, enable_stack_trace))
if test_categories.examples:
processes.append(create_examples_test_process(args, enable_stack_trace))
if test_categories.py:
processes.append(
create_python_test_process(False)
) # Disable stack trace for Python tests
if test_categories.integration:
processes.append(create_integration_test_process(args, enable_stack_trace))
# Add uno test process if source code changed
if src_code_change:
processes.append(create_compile_uno_test_process(enable_stack_trace))
return processes
def _extract_test_name(command: str | list[str]) -> str:
"""Extract a human-readable test name from a command"""
if isinstance(command, list):
command = " ".join(command)
# Extract test name patterns
if "--test " in command:
# Extract specific test name after --test flag
parts = command.split("--test ")
if len(parts) > 1:
test_name = parts[1].split()[0]
return test_name
elif ".exe" in command:
# Extract from executable name
for part in command.split():
if part.endswith(".exe"):
return part.replace(".exe", "")
elif "python" in command and "-m " in command:
# Extract module name
parts = command.split("-m ")
if len(parts) > 1:
module = parts[1].split()[0]
return module.replace("ci.compiler.", "").replace("ci.", "")
elif "python" in command and command.endswith(".py"):
# Extract script name
for part in command.split():
if part.endswith(".py"):
return part.split("/")[-1].replace(".py", "")
# Fallback to first meaningful part of command
parts = command.split()
for part in parts[1:]: # Skip 'uv' or 'python'
if not part.startswith("-") and "python" not in part:
return part
return "unknown_test"
def _get_friendly_test_name(command: str | list[str]) -> str:
"""Extract a user-friendly test name for display in summary table"""
if isinstance(command, list):
command = " ".join(command)
# Simplify common command patterns to friendly names
if "cpp_test_run" in command and "ci.run_tests" in command:
return "unit_tests"
elif "test_example_compilation.py" in command:
# Show script name plus example targets, e.g. "test_example_compilation.py Luminova"
try:
import os
tokens = command.split()
# Find the script token and collect following non-flag args as examples
for i, tok in enumerate(tokens):
normalized = tok.strip('"')
if normalized.endswith("test_example_compilation.py"):
script_name = os.path.basename(normalized)
example_parts: list[str] = []
for t in tokens[i + 1 :]:
if t.startswith("-"):
break
example_parts.append(t.strip('"'))
if example_parts:
return f"{script_name} {' '.join(example_parts)}"
return script_name
except Exception:
# Fall back to generic extraction on any unexpected parsing issue
pass
return _extract_test_name(command)
elif "pytest" in command and "ci/tests" in command:
return "python_tests"
elif "pytest" in command and "ci/test_integration" in command:
return "integration_tests"
elif "ci-compile" in command and "uno" in command:
return "uno_compilation"
else:
# Fallback to the existing extraction logic
return _extract_test_name(command)
def _format_timing_summary(process_timings: List[ProcessTiming]) -> str:
"""Format a summary table of process execution times"""
if not process_timings:
return ""
# Sort by duration (longest first)
sorted_timings = sorted(process_timings, key=lambda x: x.duration, reverse=True)
# Calculate column widths
max_name_width = max(len(timing.name) for timing in sorted_timings)
max_name_width = max(max_name_width, len("Test Name")) # Ensure header fits
# Create header
header = f"{'Test Name':<{max_name_width}} | {'Duration':>10}"
separator = f"{'-' * max_name_width}-+-{'-' * 10}"
# Create rows
rows: list[str] = []
for timing in sorted_timings:
duration_str = f"{timing.duration:.2f}s"
row = f"{timing.name:<{max_name_width}} | {duration_str:>10}"
rows.append(row)
# Combine all parts
table_lines = (
[
"\nTest Execution Summary:",
separator,
header,
separator,
]
+ rows
+ [separator]
)
return "\n".join(table_lines)
def _handle_process_completion(
proc_state: ProcessState,
active_processes: list[RunningProcess],
completed_timings: List[ProcessTiming],
last_activity_time: dict[RunningProcess, float],
) -> None:
"""
Handle completion of a single test process
Args:
proc_state: State information for the completed process
active_processes: List of currently active processes
completed_timings: List to collect timing data
last_activity_time: Dictionary tracking activity times
Raises:
TestExecutionFailedException: If process failed
"""
proc = proc_state.process
cmd = proc_state.command
if isinstance(cmd, list):
cmd = subprocess.list2cmdline(cmd)
try:
returncode = proc.wait()
if returncode != 0:
test_name = _extract_test_name(cmd)
print(f"\nCommand failed: {cmd} with return code {returncode}")
print(f"\033[91m###### ERROR ######\033[0m")
print(f"Test failed: {test_name}")
# Capture the actual output from the failed process
try:
actual_output = proc.stdout
if actual_output.strip():
print(f"\n=== ACTUAL OUTPUT FROM FAILED PROCESS ===")
print(actual_output)
print(f"=== END OF OUTPUT ===")
else:
actual_output = "No output captured from failed process"
except Exception as e:
actual_output = f"Error capturing output: {e}"
# Flush output for real-time visibility (but avoid on Windows due to blocking issues)
if os.name != "nt": # Only flush on non-Windows systems
sys.stdout.flush()
for p in active_processes:
if p != proc:
p.kill()
failure = TestFailureInfo(
test_name=test_name,
command=str(cmd),
return_code=returncode,
output=actual_output,
error_type="command_failure",
)
raise TestExecutionFailedException("Test command failed", [failure])
active_processes.remove(proc)
if proc in last_activity_time:
del last_activity_time[proc] # Clean up tracking
print(f"Process completed: {cmd}")
# Collect timing data for summary
if proc.duration is not None:
timing = ProcessTiming(
name=_get_friendly_test_name(cmd),
duration=proc.duration,
command=str(cmd),
)
completed_timings.append(timing)
# Flush output for real-time visibility (but avoid on Windows due to blocking issues)
if os.name != "nt": # Only flush on non-Windows systems
sys.stdout.flush()
except Exception as e:
test_name = _extract_test_name(cmd)
print(f"\nError waiting for process: {cmd}")
print(f"Error: {e}")
print(f"\033[91m###### ERROR ######\033[0m")
print(f"Test error: {test_name}")
# Try to capture any available output
try:
actual_output = proc.stdout
if actual_output.strip():
print(f"\n=== PROCESS OUTPUT BEFORE ERROR ===")
print(actual_output)
print(f"=== END OF OUTPUT ===")
except Exception as output_error:
print(f"Could not capture process output: {output_error}")
failures: list[TestFailureInfo] = []
for p in active_processes:
p.kill()
# Try to capture output from this process too
try:
process_output = p.stdout if hasattr(p, "stdout") else str(e)
except Exception:
process_output = str(e)
failures.append(
TestFailureInfo(
test_name=_extract_test_name(p.command),
command=str(p.command),
return_code=1,
output=process_output,
error_type="process_wait_error",
)
)
raise TestExecutionFailedException("Error waiting for process", failures)
@dataclass
class StuckProcessSignal:
"""Signal from monitoring thread that a process is stuck"""
process: RunningProcess
timeout_duration: float
class ProcessStuckMonitor:
"""Manages individual monitoring threads for stuck process detection"""
def __init__(self, stuck_process_timeout: float):
self.stuck_process_timeout = stuck_process_timeout
self.stuck_signals: Queue[StuckProcessSignal] = Queue()
self.monitoring_threads: dict[RunningProcess, threading.Thread] = {}
self.shutdown_event = threading.Event()
def start_monitoring(self, process: RunningProcess) -> None:
"""Start a monitoring thread for the given process"""
if process in self.monitoring_threads:
return # Already monitoring this process
monitor_thread = threading.Thread(
target=self._monitor_process,
args=(process,),
name=f"StuckMonitor-{_extract_test_name(process.command)}",
daemon=True,
)
self.monitoring_threads[process] = monitor_thread
monitor_thread.start()
def stop_monitoring(self, process: RunningProcess) -> None:
"""Stop monitoring a specific process"""
if process in self.monitoring_threads:
del self.monitoring_threads[process]
def shutdown(self) -> None:
"""Shutdown all monitoring threads"""
self.shutdown_event.set()
self.monitoring_threads.clear()
def check_for_stuck_processes(self) -> list[StuckProcessSignal]:
"""Check for any stuck process signals from monitoring threads"""
stuck_processes: list[StuckProcessSignal] = []
try:
while True:
signal = self.stuck_signals.get_nowait()
stuck_processes.append(signal)
except queue.Empty:
pass
return stuck_processes
def _monitor_process(self, process: RunningProcess) -> None:
"""Monitor a single process for being stuck (runs in separate thread)"""
thread_id = threading.current_thread().ident
thread_name = threading.current_thread().name
try:
last_activity_time = time.time()
while not self.shutdown_event.is_set():
if process.finished:
# Process completed normally, stop monitoring
return
# Check if we have recent stdout activity
stdout_time = process.time_last_stdout_line()
if stdout_time is not None:
last_activity_time = stdout_time
# Check if process is stuck
current_time = time.time()
if current_time - last_activity_time > self.stuck_process_timeout:
# Process is stuck, signal the main thread
signal = StuckProcessSignal(
process=process, timeout_duration=self.stuck_process_timeout
)
self.stuck_signals.put(signal)
return
# Sleep briefly before next check
time.sleep(1.0) # Check every second
except KeyboardInterrupt:
print(f"🛑 Thread {thread_id} ({thread_name}) caught KeyboardInterrupt")
print(f"📍 Stack trace for thread {thread_id}:")
traceback.print_exc()
_thread.interrupt_main()
raise
except Exception as e:
print(f"❌ Thread {thread_id} ({thread_name}) unexpected error: {e}")
traceback.print_exc()
_thread.interrupt_main()
raise
def _handle_stuck_processes(
stuck_signals: list[StuckProcessSignal],
active_processes: list[RunningProcess],
failed_processes: list[str],
monitor: ProcessStuckMonitor,
) -> None:
"""
Handle stuck processes reported by monitoring threads
Args:
stuck_signals: List of stuck process signals from monitoring threads
active_processes: List of currently active processes (modified in place)
failed_processes: List to track failed process commands
monitor: The process stuck monitor instance
"""
for signal in stuck_signals:
proc = signal.process
if proc in active_processes:
print(
f"\nProcess appears stuck (no output for {signal.timeout_duration}s): {proc.command}"
)
print("Killing stuck process and its children...")
proc.kill() # This now kills the entire process tree
# Track this as a failure
failed_processes.append(subprocess.list2cmdline(proc.command))
active_processes.remove(proc)
monitor.stop_monitoring(proc)
print(f"Killed stuck process: {proc.command}")
def _run_processes_parallel(
processes: list[RunningProcess], verbose: bool = False
) -> List[ProcessTiming]:
"""
DEPRECATED: Use RunningProcessGroup instead.
This function has been replaced by RunningProcessGroup.run()
for better maintainability and consistency.
Run multiple test processes in parallel and handle their output
Args:
processes: List of RunningProcess objects to execute
Returns:
List of ProcessTiming objects with execution times
"""
if not processes:
return []
# Create a shared output handler for formatting
output_handler = ProcessOutputHandler(verbose=verbose)
# Configure Windows console for UTF-8 output if needed
if os.name == "nt": # Windows
if hasattr(sys.stdout, "reconfigure"):
cast(ReconfigurableIO, sys.stdout).reconfigure(
encoding="utf-8", errors="replace"
)
if hasattr(sys.stderr, "reconfigure"):
cast(ReconfigurableIO, sys.stderr).reconfigure(
encoding="utf-8", errors="replace"
)
# Start processes that aren't already running
for proc in processes:
cmd_str = proc.get_command_str()
if proc.proc is None: # Only start if not already running
proc.run()
print(f"Started: {cmd_str}")
else:
print(f"Process already running: {cmd_str}")
# Monitor all processes for output and completion
active_processes = processes.copy()
start_time = time.time()
runner_timeouts: list[int] = [p.timeout for p in processes if p.timeout is not None]
global_timeout: int | None = None
if runner_timeouts:
global_timeout = max(runner_timeouts) + 60 # Add 1 minute buffer
# Track last activity time for each process to detect stuck processes
last_activity_time = {proc: time.time() for proc in active_processes}
stuck_process_timeout = _GLOBAL_TIMEOUT
# Track failed processes for proper error reporting
failed_processes: list[str] = [] # Processes killed due to timeout/stuck
exit_failed_processes: list[
tuple[RunningProcess, int]
] = [] # Processes that failed with non-zero exit code
# Track completed processes for timing summary
completed_timings: List[ProcessTiming] = []
# Create thread-based stuck process monitor
stuck_monitor = ProcessStuckMonitor(stuck_process_timeout)
try:
# Start monitoring threads for each process
for proc in active_processes:
stuck_monitor.start_monitoring(proc)
def time_expired() -> bool:
if global_timeout is None:
return False
assert global_timeout is not None
return time.time() - start_time > global_timeout
while active_processes:
# Check global timeout
if time_expired():
assert global_timeout is not None
print(f"\nGlobal timeout reached after {global_timeout} seconds")
print("\033[91m###### ERROR ######\033[0m")
print("Tests failed due to global timeout")
failures: list[TestFailureInfo] = []
for p in active_processes:
failed_processes.append(
subprocess.list2cmdline(p.command)
) # Track all active processes as failed
p.kill()
failures.append(
TestFailureInfo(
test_name=_extract_test_name(p.command),
command=str(p.command),
return_code=1,
output="Process killed due to global timeout",
error_type="global_timeout",
)
)
raise TestTimeoutException("Global timeout reached", failures)
# Check for stuck processes (using threaded monitoring)
stuck_signals = stuck_monitor.check_for_stuck_processes()
if stuck_signals:
_handle_stuck_processes(
stuck_signals, active_processes, failed_processes, stuck_monitor
)
# Early abort if failure threshold reached via stuck processes
if (
len(exit_failed_processes) + len(failed_processes)
) >= MAX_FAILURES_BEFORE_ABORT:
print(
f"\nExceeded failure threshold ({MAX_FAILURES_BEFORE_ABORT}). Aborting remaining tests."
)
# Kill any remaining active processes
for p in active_processes:
p.kill()
# Build detailed failures
failures: list[TestFailureInfo] = []
for proc, exit_code in exit_failed_processes:
error_snippet = extract_error_snippet(proc.accumulated_output)
failures.append(
TestFailureInfo(
test_name=_extract_test_name(proc.command),
command=str(proc.command),
return_code=exit_code,
output=error_snippet,
error_type="exit_failure",
)
)
for cmd in failed_processes:
cmd = subprocess.list2cmdline(cmd)
failures.append(
TestFailureInfo(
test_name=_extract_test_name(cmd),
command=str(cmd),
return_code=1,
output="Process was killed due to timeout/stuck detection",
error_type="killed_process",
)
)
raise TestExecutionFailedException(
"Exceeded failure threshold", failures
)
# Process each active test individually
# Iterate backwards to safely remove processes from the list
any_activity = False
for i in range(len(active_processes) - 1, -1, -1):
proc = active_processes[i]
if verbose:
with proc.line_iter(timeout=60) as line_iter:
for line in line_iter:
print(line)
# Check if process has finished
if proc.finished:
# Get the exit code to check for failure
exit_code = proc.wait()
# Process completed, remove from active list
active_processes.remove(proc)
# Stop monitoring this process
stuck_monitor.stop_monitoring(proc)
# Check for non-zero exit code (failure)
if exit_code != 0:
print(
f"Process failed with exit code {exit_code}: {proc.command}"
)
exit_failed_processes.append((proc, exit_code))
# Early abort if we reached the failure threshold
if (
len(exit_failed_processes) + len(failed_processes)
) >= MAX_FAILURES_BEFORE_ABORT:
print(
f"\nExceeded failure threshold ({MAX_FAILURES_BEFORE_ABORT}). Aborting remaining tests."
)
# Kill remaining active processes
for p in active_processes:
if p is not proc:
p.kill()
# Prepare failures with snippets
failures: list[TestFailureInfo] = []
for p, code in exit_failed_processes:
error_snippet = extract_error_snippet(
p.accumulated_output
)
failures.append(
TestFailureInfo(
test_name=_extract_test_name(p.command),
command=str(p.command),
return_code=code,
output=error_snippet,
error_type="exit_failure",
)
)
for cmd in failed_processes:
cmd_str = subprocess.list2cmdline(cmd)
failures.append(
TestFailureInfo(
test_name=_extract_test_name(cmd_str),
command=cmd_str,
return_code=1,
output="Process was killed due to timeout/stuck detection",
error_type="killed_process",
)
)
raise TestExecutionFailedException(
"Exceeded failure threshold", failures
)
any_activity = True
continue
# Update timing information
# Calculate duration properly - if process duration is None, calculate it manually
if proc.duration is not None:
duration = proc.duration
elif proc.start_time is not None:
# Calculate duration from start time to now
duration = time.time() - proc.start_time
else:
duration = 0.0
timing = ProcessTiming(
name=_get_friendly_test_name(proc.command),
command=subprocess.list2cmdline(proc.command),
duration=duration,
)
completed_timings.append(timing)
print(f"Process completed: {proc.get_command_str()}")
any_activity = True
continue
# Update last activity time if we have stdout activity
stdout_time = proc.time_last_stdout_line()
if stdout_time is not None:
last_activity_time[proc] = stdout_time
any_activity = True
# Only sleep if no activity was detected, and use a shorter sleep
# This prevents excessive CPU usage while maintaining responsiveness
if not any_activity:
time.sleep(0.01) # 10ms sleep only when no activity
# Check for processes that failed with non-zero exit codes
if exit_failed_processes:
print(f"\n\033[91m###### ERROR ######\033[0m")
print(
f"Tests failed due to {len(exit_failed_processes)} process(es) with non-zero exit codes:"
)
for proc, exit_code in exit_failed_processes:
print(f" - {proc.command} (exit code {exit_code})")
failures: list[TestFailureInfo] = []
for proc, exit_code in exit_failed_processes:
# Extract error snippet from process output
error_snippet = extract_error_snippet(proc.accumulated_output)
failures.append(
TestFailureInfo(
test_name=_extract_test_name(proc.command),
command=str(proc.command),
return_code=exit_code,
output=error_snippet,
error_type="exit_failure",
)
)
raise TestExecutionFailedException("Tests failed", failures)
# Check for failed processes - CRITICAL FIX
if failed_processes:
print(f"\n\033[91m###### ERROR ######\033[0m")
print(f"Tests failed due to {len(failed_processes)} killed process(es):")
for cmd in failed_processes:
print(f" - {cmd}")
print("Processes were killed due to timeout/stuck detection")
failures: list[TestFailureInfo] = []
for cmd in failed_processes:
failures.append(
TestFailureInfo(
test_name=_extract_test_name(cmd),
command=str(cmd),
return_code=1,
output="Process was killed due to timeout/stuck detection",
error_type="killed_process",
)
)
raise TestExecutionFailedException("Processes were killed", failures)
print("\nAll parallel tests completed successfully")
return completed_timings
finally:
# Always shutdown stuck monitoring threads, even on exception
stuck_monitor.shutdown()
def run_test_processes(
processes: list[RunningProcess], parallel: bool = True, verbose: bool = False
) -> List[ProcessTiming]:
"""
Run multiple test processes using RunningProcessGroup
Args:
processes: List of RunningProcess objects to execute
parallel: Whether to run processes in parallel or sequentially (ignored if NO_PARALLEL is set)
verbose: Whether to show all output
Returns:
List of ProcessTiming objects with execution times
"""
from ci.util.running_process_group import (
ExecutionMode,
ProcessExecutionConfig,
RunningProcessGroup,
)
start_time = time.time()
# Force sequential execution if NO_PARALLEL is set
if os.environ.get("NO_PARALLEL"):
parallel = False
if not processes:
print("\033[92m###### SUCCESS ######\033[0m")
print("No tests to run")
return []
try:
# Configure execution mode
execution_mode = (
ExecutionMode.PARALLEL if parallel else ExecutionMode.SEQUENTIAL
)
config = ProcessExecutionConfig(
execution_mode=execution_mode,
verbose=verbose,
max_failures_before_abort=MAX_FAILURES_BEFORE_ABORT,
enable_stuck_detection=True,
stuck_timeout_seconds=_GLOBAL_TIMEOUT,
live_updates=True, # Enable real-time display
display_type="auto", # Auto-detect best display format
)
# Create and run process group
group = RunningProcessGroup(
processes=processes, config=config, name="TestProcesses"
)
# Start real-time display if we have processes and live updates enabled
display_thread = None
if len(processes) > 0 and config.live_updates and not verbose:
# Only show live display if not in verbose mode (verbose already shows all output)
try:
from ci.util.process_status_display import display_process_status
display_thread = display_process_status(
group,
display_type=config.display_type,
update_interval=config.update_interval,
)
except ImportError:
pass # Fall back to normal execution if display not available
except Exception:
pass # Fall back to normal execution on any display error
timings = group.run()
# Stop display thread if it was started
if display_thread:
# Give it a moment to show final status
time.sleep(0.5)
# Success message for sequential execution
if not parallel:
elapsed = time.time() - start_time
print(f"\033[92m### SUCCESS ({elapsed:.2f}s) ###\033[0m")
return timings
except (TestExecutionFailedException, TestTimeoutException) as e:
# Tests failed - print detailed info and re-raise for proper handling
print("\n" + "=" * 60)
print("FASTLED TEST RUNNER FAILURE DETAILS")
print("=" * 60)
print(e.get_detailed_failure_info())
print("=" * 60)
raise
except SystemExit as e:
# Tests failed - extract command name from the error
if e.code != 0:
print("\033[91m###### ERROR ######\033[0m")
print(f"Tests failed with exit code {e.code}")
raise
def runner(args: TestArgs, src_code_change: bool = True) -> None:
"""
Main test runner function that determines what to run and executes tests
Args:
args: Parsed command line arguments
src_code_change: Whether source code has changed since last run
"""
print(f"[TEST_RUNNER] Starting runner function")
print(f"[TEST_RUNNER] Args: {args}")
print(f"[TEST_RUNNER] Source code changed: {src_code_change}")
try:
# Determine test categories
test_categories = determine_test_categories(args)
enable_stack_trace = not args.no_stack_trace
# Build up unified list of all processes to run
processes: list[RunningProcess] = []
# Always start with namespace check
processes.append(create_namespace_check_process(enable_stack_trace))
# Add unit tests if needed
if test_categories.unit or test_categories.unit_only:
processes.append(create_unit_test_process(args, enable_stack_trace))
# Add integration tests if needed
if test_categories.integration or test_categories.integration_only:
processes.append(create_integration_test_process(args, enable_stack_trace))
# Add uno compilation test if source changed
if src_code_change and not test_categories.py_only:
processes.append(create_compile_uno_test_process(enable_stack_trace))
# Add Python tests if needed
if test_categories.py or test_categories.py_only:
# Pass full_tests=True if we're running integration tests or any form of full tests
full_tests = (
test_categories.integration
or test_categories.integration_only
or args.full
)
processes.append(
create_python_test_process(False, full_tests)
) # Disable stack trace for Python tests
# Add example tests if needed
if test_categories.examples or test_categories.examples_only:
processes.append(create_examples_test_process(args, enable_stack_trace))
# Determine if we'll run in parallel
will_run_parallel = not bool(os.environ.get("NO_PARALLEL"))
# Print summary of what we're about to run
execution_mode = "in parallel" if will_run_parallel else "sequentially"
# Run processes (parallel unless NO_PARALLEL is set)
timings = run_test_processes(
processes,
parallel=will_run_parallel,
verbose=args.verbose,
)
# Display timing summary
if timings:
summary = _format_timing_summary(timings)
print(summary)
except (TestExecutionFailedException, TestTimeoutException) as e:
# Print summary and exit with proper code
print(f"\n\033[91m###### ERROR ######\033[0m")
print(f"Tests failed with {len(e.failures)} failure(s)")
# Exit with appropriate code
if e.failures:
# Use the return code from the first failure, or 1 if none available
exit_code = (
e.failures[0].return_code if e.failures[0].return_code != 0 else 1
)
else:
exit_code = 1
sys.exit(exit_code)