Files
klubhaus-doorbell/libraries/FastLED/ci/util/running_process_group.py
2026-02-12 00:45:31 -08:00

690 lines
26 KiB
Python

#!/usr/bin/env python3
# pyright: reportUnknownMemberType=false, reportUnknownVariableType=false, reportUnknownArgumentType=false
"""RunningProcessGroup - Unified process execution management for FastLED CI."""
import os
import subprocess
import sys
import threading
import time
import traceback
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Dict, List, Optional, cast
from ci.util.running_process import RunningProcess
from ci.util.test_exceptions import (
TestExecutionFailedException,
TestFailureInfo,
TestTimeoutException,
)
from ci.util.test_runner import (
_GLOBAL_TIMEOUT,
MAX_FAILURES_BEFORE_ABORT,
ProcessOutputHandler,
ProcessStuckMonitor,
ProcessTiming,
StuckProcessSignal,
_extract_test_name,
_get_friendly_test_name,
_handle_stuck_processes,
extract_error_snippet,
)
class ExecutionMode(Enum):
"""Execution mode for process groups."""
PARALLEL = "parallel"
SEQUENTIAL = "sequential"
SEQUENTIAL_WITH_DEPENDENCIES = "sequential_with_dependencies"
@dataclass
class ProcessStatus:
"""Real-time status information for a running process."""
name: str
is_alive: bool
is_completed: bool
start_time: datetime
running_duration: timedelta
last_output_line: Optional[str] = None
return_value: Optional[int] = None
@property
def running_time_seconds(self) -> float:
"""Get running duration in seconds for display."""
return self.running_duration.total_seconds()
@dataclass
class GroupStatus:
"""Status information for all processes in a group."""
group_name: str
processes: List[ProcessStatus]
total_processes: int
completed_processes: int
failed_processes: int
@property
def completion_percentage(self) -> float:
"""Percentage of processes completed."""
if self.total_processes == 0:
return 100.0
return (self.completed_processes / self.total_processes) * 100.0
@dataclass
class ProcessExecutionConfig:
"""Configuration for how a process group should execute."""
execution_mode: ExecutionMode = ExecutionMode.PARALLEL
timeout_seconds: Optional[int] = None
max_failures_before_abort: int = MAX_FAILURES_BEFORE_ABORT
verbose: bool = False
enable_stuck_detection: bool = True
stuck_timeout_seconds: int = _GLOBAL_TIMEOUT
# Real-time display options
display_type: str = "auto" # "auto", "rich", "textual", "ascii"
live_updates: bool = True
update_interval: float = 0.1
class RunningProcessGroup:
"""Manages execution of a group of RunningProcess instances."""
def __init__(
self,
processes: Optional[List[RunningProcess]] = None,
config: Optional[ProcessExecutionConfig] = None,
name: str = "ProcessGroup",
):
"""Initialize a new process group.
Args:
processes: List of RunningProcess instances to manage
config: Configuration for execution behavior
name: Name for this process group (used for logging)
"""
self.processes = processes or []
self.config = config or ProcessExecutionConfig()
self.name = name
self._dependencies: Dict[RunningProcess, List[RunningProcess]] = {}
# Status tracking
self._process_start_times: Dict[RunningProcess, datetime] = {}
self._process_last_output: Dict[RunningProcess, str] = {}
self._status_monitoring_active: bool = False
def add_process(self, process: RunningProcess) -> None:
"""Add a process to the group.
Args:
process: RunningProcess instance to add
"""
if process not in self.processes:
self.processes.append(process)
def add_dependency(
self, process: RunningProcess, depends_on: RunningProcess
) -> None:
"""Add a dependency relationship between processes.
Args:
process: The dependent process
depends_on: The process that must complete first
"""
if process not in self.processes:
self.add_process(process)
if depends_on not in self.processes:
self.add_process(depends_on)
if process not in self._dependencies:
self._dependencies[process] = []
self._dependencies[process].append(depends_on)
def add_sequential_chain(self, processes: List[RunningProcess]) -> None:
"""Add processes that must run in sequence.
Args:
processes: List of processes to run sequentially
"""
for process in processes:
self.add_process(process)
# Create dependency chain
for i in range(1, len(processes)):
self.add_dependency(processes[i], processes[i - 1])
def run(self) -> List[ProcessTiming]:
"""Execute all processes according to the configuration.
Returns:
List of ProcessTiming objects with execution results
"""
if not self.processes:
return []
print(f"Running process group: {self.name}")
if self.config.execution_mode == ExecutionMode.PARALLEL:
return self._run_parallel()
elif self.config.execution_mode == ExecutionMode.SEQUENTIAL:
return self._run_sequential()
elif self.config.execution_mode == ExecutionMode.SEQUENTIAL_WITH_DEPENDENCIES:
return self._run_with_dependencies()
else:
raise ValueError(f"Unknown execution mode: {self.config.execution_mode}")
def _run_parallel(self) -> List[ProcessTiming]:
"""Execute processes in parallel (based on test_runner._run_processes_parallel)."""
if not self.processes:
return []
# Create a shared output handler for formatting
output_handler = ProcessOutputHandler(verbose=self.config.verbose)
# Configure Windows console for UTF-8 output if needed
if os.name == "nt": # Windows
if hasattr(sys.stdout, "reconfigure"):
sys.stdout.reconfigure(encoding="utf-8", errors="replace") # type: ignore
if hasattr(sys.stderr, "reconfigure"):
sys.stderr.reconfigure(encoding="utf-8", errors="replace") # type: ignore
# Track start times and enable status monitoring
self._status_monitoring_active = True
for proc in self.processes:
self._track_process_start(proc)
# Start processes that aren't already running
for proc in self.processes:
cmd_str = proc.get_command_str()
if proc.proc is None: # Only start if not already running
proc.run()
print(f"Started: {cmd_str}")
else:
print(f"Process already running: {cmd_str}")
# Monitor all processes for output and completion
active_processes = self.processes.copy()
start_time = time.time()
runner_timeouts: list[int] = [
p.timeout for p in self.processes if p.timeout is not None
]
global_timeout: int | None = self.config.timeout_seconds
if global_timeout is None and runner_timeouts:
global_timeout = max(runner_timeouts) + 60 # Add 1 minute buffer
# Track last activity time for each process to detect stuck processes
last_activity_time = {proc: time.time() for proc in active_processes}
stuck_process_timeout = self.config.stuck_timeout_seconds
# Track failed processes for proper error reporting
failed_processes: list[str] = [] # Processes killed due to timeout/stuck
exit_failed_processes: list[
tuple[RunningProcess, int]
] = [] # Processes that failed with non-zero exit code
# Track completed processes for timing summary
completed_timings: List[ProcessTiming] = []
# Create thread-based stuck process monitor if enabled
stuck_monitor = None
if self.config.enable_stuck_detection:
stuck_monitor = ProcessStuckMonitor(stuck_process_timeout)
try:
# Start monitoring threads for each process
if stuck_monitor:
for proc in active_processes:
stuck_monitor.start_monitoring(proc)
def time_expired() -> bool:
if global_timeout is None:
return False
return time.time() - start_time > global_timeout
while active_processes:
# Check global timeout
if time_expired():
print(f"\nGlobal timeout reached after {global_timeout} seconds")
print("\033[91m###### ERROR ######\033[0m")
print("Tests failed due to global timeout")
failures: list[TestFailureInfo] = []
for p in active_processes:
failed_processes.append(
subprocess.list2cmdline(p.command)
) # Track all active processes as failed
p.kill()
failures.append(
TestFailureInfo(
test_name=_extract_test_name(p.command),
command=str(p.command),
return_code=1,
output="Process killed due to global timeout",
error_type="global_timeout",
)
)
raise TestTimeoutException("Global timeout reached", failures)
# Check for stuck processes (using threaded monitoring)
if stuck_monitor:
stuck_signals = stuck_monitor.check_for_stuck_processes()
if stuck_signals:
self._handle_stuck_processes(
stuck_signals,
active_processes,
failed_processes,
stuck_monitor,
)
# Early abort if failure threshold reached via stuck processes
if (
len(exit_failed_processes) + len(failed_processes)
) >= self.config.max_failures_before_abort:
print(
f"\nExceeded failure threshold ({self.config.max_failures_before_abort}). Aborting remaining tests."
)
# Kill any remaining active processes
for p in active_processes:
p.kill()
# Build detailed failures
failures = self._build_failure_list(
exit_failed_processes, failed_processes
)
raise TestExecutionFailedException(
"Exceeded failure threshold", failures
)
# Process each active test individually
any_activity = self._process_active_tests(
active_processes,
exit_failed_processes,
failed_processes,
completed_timings,
stuck_monitor,
)
# Brief sleep to avoid spinning if no activity
if not any_activity:
time.sleep(0.01)
finally:
# Clean up monitoring
if stuck_monitor:
for proc in self.processes:
stuck_monitor.stop_monitoring(proc)
self._status_monitoring_active = False
# Check for processes that failed with non-zero exit codes
if exit_failed_processes:
print(f"\n\033[91m###### ERROR ######\033[0m")
print(
f"Tests failed due to {len(exit_failed_processes)} process(es) with non-zero exit codes:"
)
for proc, exit_code in exit_failed_processes:
print(f" - {proc.command} (exit code {exit_code})")
failures: list[TestFailureInfo] = []
for proc, exit_code in exit_failed_processes:
# Extract error snippet from process output
error_snippet = extract_error_snippet(proc.accumulated_output)
failures.append(
TestFailureInfo(
test_name=_extract_test_name(proc.command),
command=str(proc.command),
return_code=exit_code,
output=error_snippet,
error_type="exit_failure",
)
)
raise TestExecutionFailedException("Tests failed", failures)
# Check for failed processes (killed due to timeout/stuck)
if failed_processes:
print(f"\n\033[91m###### ERROR ######\033[0m")
print(f"Tests failed due to {len(failed_processes)} killed process(es):")
for cmd in failed_processes:
print(f" - {cmd}")
print("Processes were killed due to timeout/stuck detection")
failures: list[TestFailureInfo] = []
for cmd in failed_processes:
failures.append(
TestFailureInfo(
test_name=_extract_test_name(cmd),
command=str(cmd),
return_code=1,
output="Process was killed due to timeout/stuck detection",
error_type="killed_process",
)
)
raise TestExecutionFailedException("Processes were killed", failures)
return completed_timings
def get_status(self) -> GroupStatus:
"""Get current status of all processes in the group."""
process_statuses = []
completed = 0
failed = 0
for process in self.processes:
start_time = self._process_start_times.get(process)
if start_time is None:
# Process hasn't started yet, use current time
start_time = datetime.now()
running_duration = timedelta(0)
else:
running_duration = datetime.now() - start_time
# Get last output line from process
last_output = self._get_last_output_line(process)
# Get process name - use command friendly name if available
process_name = getattr(process, "name", None)
if not process_name:
if hasattr(process, "command"):
process_name = _get_friendly_test_name(process.command)
else:
process_name = f"Process-{id(process)}"
status = ProcessStatus(
name=process_name,
is_alive=not process.finished,
is_completed=process.finished,
start_time=start_time,
running_duration=running_duration,
last_output_line=last_output,
return_value=process.returncode,
)
process_statuses.append(status)
if status.is_completed:
completed += 1
if status.return_value != 0:
failed += 1
return GroupStatus(
group_name=self.name,
processes=process_statuses,
total_processes=len(self.processes),
completed_processes=completed,
failed_processes=failed,
)
def _get_last_output_line(self, process: RunningProcess) -> Optional[str]:
"""Extract the last line of output from a process."""
# Try to get from cached last output first
cached = self._process_last_output.get(process)
if cached:
return cached
# Fall back to accumulated output
if process.accumulated_output:
last_line = process.accumulated_output[-1].strip()
self._process_last_output[process] = last_line
return last_line
return None
def _track_process_start(self, process: RunningProcess) -> None:
"""Record when a process starts for timing calculations."""
self._process_start_times[process] = datetime.now()
def _update_process_output(self, process: RunningProcess) -> None:
"""Update cached last output line for a process."""
if process.accumulated_output:
last_line = process.accumulated_output[-1].strip()
self._process_last_output[process] = last_line
def _handle_stuck_processes(
self,
stuck_signals: list[StuckProcessSignal],
active_processes: list[RunningProcess],
failed_processes: list[str],
stuck_monitor: ProcessStuckMonitor,
) -> None:
"""Handle processes that are detected as stuck."""
_handle_stuck_processes(
stuck_signals, active_processes, failed_processes, stuck_monitor
)
def _build_failure_list(
self,
exit_failed_processes: list[tuple[RunningProcess, int]],
failed_processes: list[str],
) -> list[TestFailureInfo]:
"""Build a list of TestFailureInfo objects from failed processes."""
failures: list[TestFailureInfo] = []
for proc, exit_code in exit_failed_processes:
error_snippet = extract_error_snippet(proc.accumulated_output)
failures.append(
TestFailureInfo(
test_name=_extract_test_name(proc.command),
command=str(proc.command),
return_code=exit_code,
output=error_snippet,
error_type="exit_failure",
)
)
for cmd in failed_processes:
if isinstance(cmd, list):
cmd_str = subprocess.list2cmdline(cmd)
else:
cmd_str = str(cmd)
failures.append(
TestFailureInfo(
test_name=_extract_test_name(cmd_str),
command=cmd_str,
return_code=1,
output="Process was killed due to timeout/stuck detection",
error_type="killed_process",
)
)
return failures
def _process_active_tests(
self,
active_processes: list[RunningProcess],
exit_failed_processes: list[tuple[RunningProcess, int]],
failed_processes: list[str],
completed_timings: List[ProcessTiming],
stuck_monitor: Optional[ProcessStuckMonitor],
) -> bool:
"""Process active tests, return True if any activity occurred."""
any_activity = False
# Iterate backwards to safely remove processes from the list
for i in range(len(active_processes) - 1, -1, -1):
proc = active_processes[i]
if self.config.verbose:
with proc.line_iter(timeout=60) as line_iter:
for line in line_iter:
print(line)
any_activity = True
# Check if process has finished
if proc.finished:
any_activity = True
# Get the exit code to check for failure
exit_code = proc.wait()
# Process completed, remove from active list
active_processes.remove(proc)
# Stop monitoring this process
if stuck_monitor:
stuck_monitor.stop_monitoring(proc)
# Collect timing data
if proc.duration is not None:
timing = ProcessTiming(
name=_get_friendly_test_name(proc.command),
duration=proc.duration,
command=str(proc.command),
)
completed_timings.append(timing)
# Check for non-zero exit code (failure)
if exit_code != 0:
print(f"Process failed with exit code {exit_code}: {proc.command}")
exit_failed_processes.append((proc, exit_code))
# Early abort if we reached the failure threshold
if (
len(exit_failed_processes) + len(failed_processes)
) >= self.config.max_failures_before_abort:
print(
f"\nExceeded failure threshold ({self.config.max_failures_before_abort}). Aborting remaining tests."
)
# Kill remaining active processes
for p in active_processes:
if p is not proc:
p.kill()
# Prepare failures with snippets
failures = self._build_failure_list(
exit_failed_processes, failed_processes
)
raise TestExecutionFailedException(
"Exceeded failure threshold", failures
)
return any_activity
def _run_sequential(self) -> List[ProcessTiming]:
"""Execute processes in sequence."""
completed_timings: List[ProcessTiming] = []
# Enable status monitoring
self._status_monitoring_active = True
try:
for process in self.processes:
print(f"Running: {process.get_command_str()}")
# Track process start time
self._track_process_start(process)
# Start the process if not already running
if process.proc is None:
process.run()
try:
exit_code = process.wait()
# Collect timing data
if process.duration is not None:
timing = ProcessTiming(
name=_get_friendly_test_name(process.command),
duration=process.duration,
command=str(process.command),
)
completed_timings.append(timing)
# Check for failure
if exit_code != 0:
error_snippet = extract_error_snippet(
process.accumulated_output
)
failure = TestFailureInfo(
test_name=_extract_test_name(process.command),
command=str(process.command),
return_code=exit_code,
output=error_snippet,
error_type="exit_failure",
)
raise TestExecutionFailedException(
f"Process failed with exit code {exit_code}", [failure]
)
except Exception as e:
print(f"Process failed: {process.get_command_str()} - {e}")
raise
finally:
self._status_monitoring_active = False
return completed_timings
def _run_with_dependencies(self) -> List[ProcessTiming]:
"""Execute processes respecting dependency order."""
completed_timings: List[ProcessTiming] = []
completed_processes: set[RunningProcess] = set()
remaining_processes = self.processes.copy()
# Enable status monitoring
self._status_monitoring_active = True
try:
while remaining_processes:
# Find processes that can run (all dependencies completed)
ready_processes = []
for process in remaining_processes:
dependencies = self._dependencies.get(process, [])
if all(dep in completed_processes for dep in dependencies):
ready_processes.append(process)
if not ready_processes:
# No processes can run - circular dependency or missing process
remaining_names = [
_get_friendly_test_name(p.command) for p in remaining_processes
]
raise RuntimeError(
f"Circular dependency or missing dependency detected for: {remaining_names}"
)
# Run the first ready process
process = ready_processes[0]
print(f"Running: {process.get_command_str()}")
# Track process start time
self._track_process_start(process)
# Start the process if not already running
if process.proc is None:
process.run()
try:
exit_code = process.wait()
# Collect timing data
if process.duration is not None:
timing = ProcessTiming(
name=_get_friendly_test_name(process.command),
duration=process.duration,
command=str(process.command),
)
completed_timings.append(timing)
# Mark as completed
completed_processes.add(process)
remaining_processes.remove(process)
# Check for failure
if exit_code != 0:
error_snippet = extract_error_snippet(
process.accumulated_output
)
failure = TestFailureInfo(
test_name=_extract_test_name(process.command),
command=str(process.command),
return_code=exit_code,
output=error_snippet,
error_type="exit_failure",
)
raise TestExecutionFailedException(
f"Process failed with exit code {exit_code}", [failure]
)
except Exception as e:
print(f"Process failed: {process.get_command_str()} - {e}")
raise
finally:
self._status_monitoring_active = False
return completed_timings