Files
klubhaus-doorbell/libraries/FastLED/ci/util/running_process.py
2026-02-12 00:45:31 -08:00

993 lines
36 KiB
Python

# pyright: reportUnknownMemberType=false, reportMissingParameterType=false
import _thread
import os
import queue
import shlex
import subprocess
import sys
import tempfile
import threading
import time
import traceback
import warnings
from pathlib import Path
from queue import Queue
from typing import Any, Callable, ContextManager, Iterator
class EndOfStream(Exception):
"""Sentinel used to indicate end-of-stream from the reader."""
from ci.util.output_formatter import NullOutputFormatter, OutputFormatter
# Console UTF-8 configuration is now handled globally in ci/__init__.py
class ProcessOutputReader:
"""Dedicated reader that drains a process's stdout and enqueues lines.
This keeps the stdout pipe drained to prevent blocking and forwards
transformed, non-empty lines to the provided output queue. It also invokes
lifecycle callbacks for timing/unregister behaviors.
"""
def __init__(
self,
proc: subprocess.Popen[Any],
shutdown: threading.Event,
output_formatter: OutputFormatter | None,
on_output: Callable[[str | EndOfStream], None],
on_end: Callable[[], None],
) -> None:
output_formatter = output_formatter or NullOutputFormatter()
self._proc = proc
self._shutdown = shutdown
self._output_formatter = output_formatter
self._on_output = on_output
self._on_end = on_end
self.last_stdout_ts: float | None = None
self._eos_emitted: bool = False
def _emit_eos_once(self) -> None:
"""Ensure EndOfStream is only forwarded a single time."""
if not self._eos_emitted:
self._eos_emitted = True
self._on_output(EndOfStream())
def run(self) -> None:
"""Continuously read stdout lines and forward them until EOF or shutdown."""
try:
# Begin formatter lifecycle within the reader context
try:
self._output_formatter.begin()
except Exception as e:
warnings.warn(f"Output formatter begin() failed: {e}")
assert self._proc.stdout is not None
try:
for line in self._proc.stdout:
self.last_stdout_ts = time.time()
if self._shutdown.is_set():
break
line_stripped = line.rstrip()
if not line_stripped:
continue
transformed_line = self._output_formatter.transform(line_stripped)
self._on_output(transformed_line)
except KeyboardInterrupt:
# Per project rules, handle interrupts in threads explicitly
thread_id = threading.current_thread().ident
thread_name = threading.current_thread().name
print(f"Thread {thread_id} ({thread_name}) caught KeyboardInterrupt")
print(f"Stack trace for thread {thread_id}:")
traceback.print_exc()
# Try to ensure child process is terminated promptly
try:
self._proc.kill()
except Exception:
pass
# Propagate to main thread and re-raise
_thread.interrupt_main()
# EOF
self._emit_eos_once()
raise
except (ValueError, OSError) as e:
# Normal shutdown scenarios include closed file descriptors.
if "closed file" in str(e) or "Bad file descriptor" in str(e):
warnings.warn(f"Output reader encountered closed file: {e}")
pass
else:
print(f"Warning: Output reader encountered error: {e}")
finally:
# Signal end-of-stream to consumers exactly once
self._emit_eos_once()
finally:
# Cleanup stream and invoke completion callback
if self._proc.stdout and not self._proc.stdout.closed:
try:
self._proc.stdout.close()
except (ValueError, OSError) as err:
warnings.warn(f"Output reader encountered error: {err}")
pass
# Notify parent for timing/unregistration
try:
self._on_end()
finally:
# End formatter lifecycle within the reader context
try:
self._output_formatter.end()
except Exception as e:
warnings.warn(f"Output formatter end() failed: {e}")
class ProcessWatcher:
"""Background watcher that polls a process until it terminates."""
def __init__(self, running_process: "RunningProcess") -> None:
self._rp = running_process
self._thread: threading.Thread | None = None
def start(self) -> None:
name: str = "RPWatcher"
try:
if self._rp.proc is not None and self._rp.proc.pid is not None:
name = f"RPWatcher-{self._rp.proc.pid}"
except Exception:
pass
self._thread = threading.Thread(target=self._run, name=name, daemon=True)
self._thread.start()
def _run(self) -> None:
thread_id = threading.current_thread().ident
thread_name = threading.current_thread().name
try:
while not self._rp.shutdown.is_set():
# Enforce per-process timeout independently of wait()
if (
self._rp.timeout is not None
and self._rp.start_time is not None
and (time.time() - self._rp.start_time) > self._rp.timeout
):
print(
f"Process timeout after {self._rp.timeout} seconds (watcher), killing: {self._rp.command}"
)
if self._rp.enable_stack_trace:
try:
print("\n" + "=" * 80)
print("STACK TRACE DUMP (GDB Output)")
print("=" * 80)
print(self._rp._dump_stack_trace())
print("=" * 80)
except Exception as e:
print(f"Watcher stack trace dump failed: {e}")
self._rp.kill()
break
rc: int | None = self._rp.poll()
if rc is not None:
break
time.sleep(0.1)
except KeyboardInterrupt:
print(f"Thread {thread_id} ({thread_name}) caught KeyboardInterrupt")
print(f"Stack trace for thread {thread_id}:")
traceback.print_exc()
_thread.interrupt_main()
raise
except Exception as e:
# Surface unexpected errors and keep behavior consistent
print(f"Watcher thread error in {thread_name}: {e}")
traceback.print_exc()
@property
def thread(self) -> threading.Thread | None:
return self._thread
class _RunningProcessLineIterator(ContextManager[Iterator[str]], Iterator[str]):
"""Context-managed iterator over a RunningProcess's output lines.
Yields only strings (never None). Stops on EndOfStream or when a per-line
timeout elapses.
"""
def __init__(self, rp: "RunningProcess", timeout: float | None) -> None:
self._rp = rp
self._timeout = timeout
# Context manager protocol
def __enter__(self) -> "_RunningProcessLineIterator":
return self
def __exit__(
self,
exc_type: type[BaseException] | None,
exc: BaseException | None,
tb: Any | None,
) -> bool:
# Do not suppress exceptions
return False
# Iterator protocol
def __iter__(self) -> Iterator[str]:
return self
def __next__(self) -> str:
next_item: str | EndOfStream = self._rp.get_next_line(timeout=self._timeout)
if isinstance(next_item, EndOfStream):
raise StopIteration
# Must be a string by contract
return next_item
class RunningProcess:
"""
A class to manage and stream output from a running subprocess.
This class provides functionality to execute shell commands, stream their output
in real-time via a queue, and control the subprocess execution. It merges stderr
into stdout and provides thread-safe access to process output.
Key features:
- Real-time output streaming via queue
- Thread-safe output consumption
- Timeout protection with optional stack traces
- Echo mode for immediate output printing
- Process tree termination support
"""
def __init__(
self,
command: str | list[str],
cwd: Path | None = None,
check: bool = False,
auto_run: bool = True,
shell: bool | None = None,
timeout: int | None = None, # None means no global timeout
enable_stack_trace: bool = False, # Enable stack trace dumping on timeout
on_complete: Callable[[], None]
| None = None, # Callback to execute when process completes
output_formatter: OutputFormatter | None = None,
env: dict[str, str] | None = None, # Environment variables
):
"""
Initialize the RunningProcess instance.
Note: stderr is automatically merged into stdout for unified output handling.
Args:
command: The command to execute as string or list of arguments.
cwd: Working directory to execute the command in.
check: If True, raise CalledProcessError if command returns non-zero exit code.
auto_run: If True, automatically start the command when instance is created.
shell: Shell execution mode. None auto-detects based on command type.
timeout: Global timeout in seconds for process execution. None disables timeout.
enable_stack_trace: If True, dump GDB stack trace when process times out.
on_complete: Callback function executed when process completes normally.
output_formatter: Optional formatter for transforming output lines.
env: Environment variables to pass to the subprocess. None uses current environment.
"""
if shell is None:
# Default: use shell only when given a string, or when a list includes shell metachars
if isinstance(command, str):
shell = True
elif isinstance(command, list):
shell_meta = {"&&", "||", "|", ";", ">", "<", "2>", "&"}
shell = any(part in shell_meta for part in command)
else:
shell = False
self.command = command
self.shell: bool = shell
self.cwd = str(cwd) if cwd is not None else None
self.env = env
self.output_queue: Queue[str | EndOfStream] = Queue()
self.accumulated_output: list[str] = [] # Store all output for later retrieval
self.proc: subprocess.Popen[Any] | None = None
self.check = check
# Force auto_run to False if NO_PARALLEL is set
self.auto_run = False if os.environ.get("NO_PARALLEL") else auto_run
self.timeout = timeout
self.enable_stack_trace = enable_stack_trace
self.on_complete = on_complete
# Always keep a non-None formatter
self.output_formatter = (
output_formatter if output_formatter is not None else NullOutputFormatter()
)
self.reader_thread: threading.Thread | None = None
self.watcher_thread: threading.Thread | None = None
self.shutdown: threading.Event = threading.Event()
self._start_time: float | None = None
self._end_time: float | None = None
self._time_last_stdout_line: float | None = None
self._termination_notified: bool = False
if auto_run:
self.run()
def get_command_str(self) -> str:
return (
subprocess.list2cmdline(self.command)
if isinstance(self.command, list)
else self.command
)
def _dump_stack_trace(self) -> str:
"""
Dump stack trace of the running process using GDB.
Returns:
str: GDB output containing stack trace information.
"""
if self.proc is None:
return "No process to dump stack trace for."
try:
# Get the process ID
pid = self.proc.pid
# Create GDB script for attaching to running process
with tempfile.NamedTemporaryFile(mode="w+", delete=False) as gdb_script:
gdb_script.write("set pagination off\n")
gdb_script.write(f"attach {pid}\n")
gdb_script.write("bt full\n")
gdb_script.write("info registers\n")
gdb_script.write("x/16i $pc\n")
gdb_script.write("thread apply all bt full\n")
gdb_script.write("detach\n")
gdb_script.write("quit\n")
# Run GDB to get stack trace
gdb_command = f"gdb -batch -x {gdb_script.name}"
gdb_process = subprocess.Popen(
gdb_command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
text=True,
)
gdb_output, _ = gdb_process.communicate(
timeout=30
) # 30 second timeout for GDB
# Clean up GDB script
os.unlink(gdb_script.name)
return gdb_output
except Exception as e:
return f"Failed to dump stack trace: {e}"
def time_last_stdout_line(self) -> float | None:
return self._time_last_stdout_line
def _handle_timeout(self, timeout: float, echo: bool = False) -> None:
"""Handle process timeout with optional stack trace and cleanup."""
cmd_str = self.get_command_str()
# Drain any remaining output before killing if echo is enabled
if echo:
remaining_lines = self.drain_stdout()
for line in remaining_lines:
print(
line, flush=(os.name == "nt")
) # Force flush only on Windows per-line
if remaining_lines:
print(
f"[Drained {len(remaining_lines)} final lines before timeout]",
flush=(os.name == "nt"),
)
if self.enable_stack_trace:
print(f"\nProcess timeout after {timeout} seconds, dumping stack trace...")
print(f"Command: {cmd_str}")
print(f"Process ID: {self.proc.pid}")
try:
stack_trace = self._dump_stack_trace()
print("\n" + "=" * 80)
print("STACK TRACE DUMP (GDB Output)")
print("=" * 80)
print(stack_trace)
print("=" * 80)
except Exception as e:
print(f"Failed to dump stack trace: {e}")
print(f"Killing timed out process: {cmd_str}")
self.kill()
raise TimeoutError(f"Process timed out after {timeout} seconds: {cmd_str}")
def drain_stdout(self) -> list[str]:
"""
Drain all currently pending stdout lines without blocking.
Consumes all available lines from the output queue until either the queue
is empty or EndOfStream is encountered. The EndOfStream sentinel is preserved
by get_next_line() for other callers.
Returns:
List of output lines that were available. Empty list if no output pending.
"""
lines: list[str] = []
while True:
try:
line = self.get_next_line(timeout=0)
if isinstance(line, EndOfStream):
break # get_next_line already handled EndOfStream preservation
lines.append(line)
except TimeoutError:
break # Queue is empty
return lines
def has_pending_output(self) -> bool:
"""
Check if there are pending output lines without consuming them.
Returns:
True if output lines are available in the queue, False otherwise.
Returns False if only EndOfStream sentinel is present.
"""
try:
with self.output_queue.mutex:
if len(self.output_queue.queue) == 0:
return False
# If the only item is EndOfStream, no actual output is pending
if len(self.output_queue.queue) == 1 and isinstance(
self.output_queue.queue[0], EndOfStream
):
return False
return True
except Exception:
return False
def run(self) -> None:
"""
Execute the command and stream its output to the queue.
Raises:
subprocess.CalledProcessError: If the command returns a non-zero exit code.
"""
assert self.proc is None
shell = self.shell
popen_command: str | list[str]
if shell and isinstance(self.command, list):
# Convert list to a single shell string with proper quoting
popen_command = subprocess.list2cmdline(self.command)
else:
popen_command = self.command
self.proc = subprocess.Popen(
popen_command,
shell=shell,
cwd=self.cwd,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, # Merge stderr into stdout
text=True, # Use text mode
encoding="utf-8", # Explicitly use UTF-8
errors="replace", # Replace invalid chars instead of failing
env=self.env, # Use provided environment variables
)
# Track start time after process is successfully created
# This excludes process creation overhead from timing measurements
self._start_time = time.time()
# Register with global process manager
try:
from ci.util.running_process_manager import (
RunningProcessManagerSingleton,
)
RunningProcessManagerSingleton.register(self)
except Exception as e:
warnings.warn(f"RunningProcessManager.register failed: {e}")
# Output formatter lifecycle is managed by ProcessOutputReader
# Prepare output reader helper
assert self.proc is not None
def _on_reader_end() -> None:
# Set end time when stdout pumper finishes; captures completion time of useful output
if self._end_time is None:
self._end_time = time.time()
# Unregister when stdout is fully drained
try:
self._notify_terminated()
except Exception as e:
warnings.warn(f"RunningProcess termination notify (drain) failed: {e}")
def _on_output(item: str | EndOfStream) -> None:
# Forward to queue and capture text lines for accumulated output
if isinstance(item, EndOfStream):
self.output_queue.put(item)
else:
# Track time of last stdout line observed
self._time_last_stdout_line = time.time()
self.output_queue.put(item)
self.accumulated_output.append(item)
reader = ProcessOutputReader(
proc=self.proc,
shutdown=self.shutdown,
output_formatter=self.output_formatter,
on_output=_on_output,
on_end=_on_reader_end,
)
# Start output reader thread
self.reader_thread = threading.Thread(target=reader.run, daemon=True)
self.reader_thread.start()
# Start watcher thread via helper class and expose thread for compatibility
self._watcher = ProcessWatcher(self)
self._watcher.start()
self.watcher_thread = self._watcher.thread
def get_next_line(self, timeout: float | None = None) -> str | EndOfStream:
"""
Get the next line of output from the process.
Args:
timeout: How long to wait for the next line in seconds.
None means wait forever, 0 means don't wait.
Returns:
str: The next line of output if available.
EndOfStream: Process has finished, no more output will be available.
Raises:
TimeoutError: If timeout is reached before a line becomes available.
"""
assert self.proc is not None
# Fast non-blocking path: honor timeout==0 by peeking before raising
if timeout == 0:
# Peek EOS without consuming
with self.output_queue.mutex:
if len(self.output_queue.queue) > 0:
head = self.output_queue.queue[0]
if isinstance(head, EndOfStream):
return EndOfStream()
# Try immediate get
try:
item_nb: str | EndOfStream = self.output_queue.get_nowait()
if isinstance(item_nb, EndOfStream):
with self.output_queue.mutex:
self.output_queue.queue.appendleft(item_nb)
return EndOfStream()
return item_nb
except queue.Empty:
if self.finished:
return EndOfStream()
raise TimeoutError("Timeout after 0 seconds")
expired_time = time.time() + timeout if timeout is not None else None
while True:
if expired_time is not None and time.time() > expired_time:
raise TimeoutError(f"Timeout after {timeout} seconds")
# Peek without popping if EndOfStream is at the front
with self.output_queue.mutex:
if len(self.output_queue.queue) > 0:
head = self.output_queue.queue[0]
if isinstance(head, EndOfStream):
return EndOfStream()
# Nothing available yet; wait briefly in blocking mode
if self.output_queue.empty():
time.sleep(0.01)
if self.finished and self.output_queue.empty():
return EndOfStream()
continue
try:
# Safe to pop now; head is not EndOfStream
item: str | EndOfStream = self.output_queue.get(timeout=0.1)
if isinstance(item, EndOfStream):
# In rare race conditions, EndOfStream could appear after peek; put back for other callers
with self.output_queue.mutex:
self.output_queue.queue.appendleft(item)
return EndOfStream()
return item
except queue.Empty:
if self.finished:
return EndOfStream()
continue
def get_next_line_non_blocking(self) -> str | None | EndOfStream:
"""
Get the next line of output from the process without blocking.
Returns:
str: Next line of output if available
None: No output available right now (should continue polling)
EndOfStream: Process has finished, no more output will be available
"""
try:
line: str | EndOfStream = self.get_next_line(timeout=0)
return line # get_next_line already handled EndOfStream preservation
except TimeoutError:
# Check if process finished while we were waiting
if self.finished:
return EndOfStream()
return None
def poll(self) -> int | None:
"""
Check the return code of the process.
"""
if self.proc is None:
return None
rc = self.proc.poll()
if rc is not None:
# Ensure unregistration only happens once
try:
self._notify_terminated()
except Exception as e:
warnings.warn(f"RunningProcess termination notify (poll) failed: {e}")
return rc
@property
def finished(self) -> bool:
return self.poll() is not None
def wait(self, echo: bool = False, timeout: float | None = None) -> int:
"""
Wait for the process to complete with timeout protection.
When echo=True, continuously drains and prints stdout lines while waiting.
Performs final output drain after process completion and thread cleanup.
Args:
echo: If True, continuously print stdout lines as they become available.
timeout: Overall timeout in seconds. If None, uses instance timeout.
If both are None, waits indefinitely.
Returns:
Process exit code.
Raises:
ValueError: If the process hasn't been started.
TimeoutError: If the process exceeds the timeout duration.
"""
if self.proc is None:
raise ValueError("Process is not running.")
# Determine effective timeout: parameter > instance > none
effective_timeout = timeout if timeout is not None else self.timeout
# Use a timeout to prevent hanging
start_time = time.time()
while self.poll() is None:
# Check overall timeout
if (
effective_timeout is not None
and (time.time() - start_time) > effective_timeout
):
self._handle_timeout(effective_timeout, echo=echo)
# Echo: drain all available output, then sleep
if echo:
lines = self.drain_stdout()
if lines:
for line in lines:
# Use print flush=True for Windows compatibility, avoid separate flush calls
print(
line, flush=(os.name == "nt")
) # Force flush only on Windows per-line
# Additional flush for Unix systems for better performance
if os.name != "nt":
sys.stdout.flush()
continue # Check for more output immediately
time.sleep(0.01) # Check every 10ms
# Process completed - drain any remaining output if echo is enabled
if echo:
remaining_lines = self.drain_stdout()
for line in remaining_lines:
print(
line, flush=(os.name == "nt")
) # Force flush only on Windows per-line
if remaining_lines:
print(
f"[Drained {len(remaining_lines)} final lines after completion]",
flush=(os.name == "nt"),
)
# Process has completed, get return code
assert self.proc is not None # For type checker
rtn = self.proc.returncode
assert rtn is not None # Process has completed, so returncode exists
is_keyboard_interrupt = (rtn == -11) or (rtn == 3221225786)
if is_keyboard_interrupt:
import _thread
print("Keyboard interrupt detected, interrupting main thread")
_thread.interrupt_main()
return 1
# Record end time only if not already set by output reader
# The output reader sets end time when stdout pumper finishes, which is more accurate
if self._end_time is None:
self._end_time = time.time()
# Wait for reader thread to finish and cleanup
if self.reader_thread is not None:
self.reader_thread.join(
timeout=0.05
) # 50ms should be plenty for thread cleanup
if self.reader_thread.is_alive():
# Reader thread didn't finish, force shutdown
self.shutdown.set()
self.reader_thread.join(timeout=0.05) # 50ms for forced shutdown
# Final drain after reader threads shut down - catch any remaining queued output
if echo:
final_lines = self.drain_stdout()
for line in final_lines:
print(
line, flush=(os.name == "nt")
) # Force flush only on Windows per-line
# Execute completion callback if provided
if self.on_complete is not None:
try:
self.on_complete()
except Exception as e:
print(f"Warning: on_complete callback failed: {e}")
# Output formatter end is handled by ProcessOutputReader
# Unregister from global process manager on normal completion
try:
self._notify_terminated()
except Exception as e:
warnings.warn(f"RunningProcess termination notify (wait) failed: {e}")
return rtn
def kill(self) -> None:
"""
Immediately terminate the process and all child processes.
Signals reader threads to shutdown, kills the entire process tree to prevent
orphaned processes, and waits for thread cleanup. Safe to call multiple times.
Note: Does not raise if process is already terminated or was never started.
"""
if self.proc is None:
return
# Record end time when killed (only if not already set by output reader)
if self._end_time is None:
self._end_time = time.time()
# Signal reader thread to stop
self.shutdown.set()
# Kill the entire process tree (parent + all children)
# This prevents orphaned clang++ processes from hanging the system
try:
from ci.util.test_env import kill_process_tree
kill_process_tree(self.proc.pid)
except KeyboardInterrupt:
print("Keyboard interrupt detected, interrupting main thread")
_thread.interrupt_main()
try:
self.proc.kill()
except (ProcessLookupError, PermissionError, OSError, ValueError) as e:
print(f"Warning: Failed to kill process tree for {self.proc.pid}: {e}")
pass
raise
except Exception as e:
# Fallback to simple kill if tree kill fails
print(f"Warning: Failed to kill process tree for {self.proc.pid}: {e}")
try:
self.proc.kill()
except (ProcessLookupError, PermissionError, OSError, ValueError):
pass # Process might already be dead
# Wait for reader thread to finish
if self.reader_thread is not None:
self.reader_thread.join(timeout=0.05) # 50ms should be plenty for cleanup
# # Drain any remaining output
# while True:
# try:
# line = self.output_queue.get_nowait()
# if line is None: # End of output marker
# break
# except queue.Empty:
# break
# Ensure unregistration even on forced kill
try:
from ci.util.running_process_manager import (
RunningProcessManagerSingleton,
)
RunningProcessManagerSingleton.unregister(self)
except Exception as e:
warnings.warn(f"RunningProcessManager.unregister (kill) failed: {e}")
def _notify_terminated(self) -> None:
"""Idempotent notification that the process has terminated.
Ensures unregister is called only once across multiple termination paths
(poll, wait, stdout drain, watcher thread) and records end time when
available.
"""
if self._termination_notified:
return
self._termination_notified = True
# Record end time only if not already set
if self._end_time is None:
self._end_time = time.time()
try:
from ci.util.running_process_manager import RunningProcessManagerSingleton
RunningProcessManagerSingleton.unregister(self)
except Exception as e:
warnings.warn(f"RunningProcessManager.unregister notify failed: {e}")
def terminate(self) -> None:
"""
Gracefully terminate the process with SIGTERM.
Raises:
ValueError: If the process hasn't been started.
"""
if self.proc is None:
raise ValueError("Process is not running.")
self.shutdown.set()
self.proc.terminate()
@property
def returncode(self) -> int | None:
if self.proc is None:
return None
return self.proc.returncode
@property
def start_time(self) -> float | None:
"""Get the process start time"""
return self._start_time
@property
def end_time(self) -> float | None:
"""Get the process end time"""
return self._end_time
@property
def duration(self) -> float | None:
"""Get the process duration in seconds, or None if not completed"""
if self._start_time is None or self._end_time is None:
return None
return self._end_time - self._start_time
@property
def stdout(self) -> str:
"""
Get the complete stdout output accumulated so far.
Returns all output lines that have been processed by the reader thread,
joined with newlines. Available even while process is still running.
Returns:
Complete stdout output as a string. Empty string if no output yet.
"""
# Return accumulated output (available even if process is still running)
return "\n".join(self.accumulated_output)
def line_iter(self, timeout: float | None) -> _RunningProcessLineIterator:
"""Return a context-managed iterator over output lines.
Args:
timeout: Per-line timeout in seconds. None waits indefinitely for each line.
Returns:
A context-managed iterator yielding non-empty, transformed stdout lines.
"""
return _RunningProcessLineIterator(self, timeout)
# NOTE: RunningProcessManager and its singleton live in ci/util/running_process_manager.py
def subprocess_run(
command: str | list[str],
cwd: Path | None,
check: bool,
timeout: int,
enable_stack_trace: bool,
) -> subprocess.CompletedProcess[str]:
"""
Execute a command with robust stdout handling, emulating subprocess.run().
Uses RunningProcess as the backend to provide:
- Continuous stdout streaming to prevent pipe blocking
- Merged stderr into stdout for unified output
- Timeout protection with optional stack trace dumping
- Standard subprocess.CompletedProcess return value
Args:
command: Command to execute as string or list of arguments.
cwd: Working directory for command execution. Required parameter.
check: If True, raise CalledProcessError for non-zero exit codes.
timeout: Maximum execution time in seconds.
enable_stack_trace: Enable GDB stack trace dumping on timeout.
Returns:
CompletedProcess with combined stdout and process return code.
stderr field is None since it's merged into stdout.
Raises:
RuntimeError: If process times out (wraps TimeoutError).
CalledProcessError: If check=True and process exits with non-zero code.
"""
# Use RunningProcess for robust stdout pumping with merged stderr
proc = RunningProcess(
command=command,
cwd=cwd,
check=False,
auto_run=True,
timeout=timeout,
enable_stack_trace=enable_stack_trace,
on_complete=None,
output_formatter=None,
)
try:
return_code: int = proc.wait()
except KeyboardInterrupt:
# Propagate interrupt behavior consistent with subprocess.run
raise
except TimeoutError as e:
# Align with subprocess.TimeoutExpired semantics by raising a CalledProcessError-like
# error with available output. Using TimeoutError here is consistent with internal RP.
completed_output: str = proc.stdout
raise RuntimeError(
f"CRITICAL: Process timed out after {timeout} seconds: {command}"
) from e
combined_stdout: str = proc.stdout
# Construct CompletedProcess (stderr is merged into stdout by design)
completed = subprocess.CompletedProcess(
args=command,
returncode=return_code,
stdout=combined_stdout,
stderr=None,
)
if check and return_code != 0:
# Raise the standard exception with captured output
raise subprocess.CalledProcessError(
returncode=return_code,
cmd=command,
output=combined_stdout,
stderr=None,
)
return completed