Source code for pymead.utils.pymead_mp

import multiprocessing
import multiprocessing.pool
import os
import signal
import threading
import typing

import psutil


[docs] def kill_child_processes(parent_pid: int, sig: int = signal.SIGTERM): """ Kills all child processes (using ``SIGTERM``) of a process with a given PID. Parameters ---------- parent_pid: int Process ID of the parent sig: int Signal to send (``SIGTERM`` by default) """ children = collect_child_processes(parent_pid) kill_all_processes_in_list(children, sig)
[docs] def collect_child_processes(parent_pid: int) -> typing.List[psutil.Process]: """ Recursively gathers all the child processes of a parent process (up to the default Python recursion depth) and casts them to a list. Parameters ---------- parent_pid: int Process ID of the parent Returns ------- typing.List[psutil.Process] List of all child processes of the parent process """ try: parent = psutil.Process(parent_pid) except psutil.NoSuchProcess: return [] return parent.children(recursive=True)
[docs] def kill_all_processes_in_list(processes: typing.List[psutil.Process], sig: int = signal.SIGTERM): """ Kills all ``psutil`` processes in the input list using the specified termination signal. Parameters ---------- processes: typing.List[psutil.Process] List of processes to kill sig: int Termination signal to send to the processes. Default: ``signal.SIGTERM``. """ for process in processes: try: process.send_signal(sig) except psutil.NoSuchProcess: continue
[docs] def kill_xfoil_mses_processes(sig: int = signal.SIGTERM): # In the rare case that an instance of XFOIL or MSES does not get shut down, force-close any processes with those # names xfoil_mses_processes = ["xfoil", "mset", "mses", "mplot", "mpolar"] xfoil_mses_processes.extend([name + ".exe" for name in xfoil_mses_processes]) matching_processes = [proc for proc in psutil.process_iter(["name"]) if proc.info["name"] in xfoil_mses_processes] kill_all_processes_in_list(matching_processes, sig)
[docs] def pool_terminate_multi_tiered(pool: multiprocessing.Pool): """ Multi-tiered multiprocessing pool termination function that tries several pool termination methods in sequence from least forceful to most forceful to guarantee that the pool is closed, especially when a significant portion of the CPU and RAM are being used. This function is intended for use with asynchronous parallel function calls like ``multiprocessing.Pool.apply_async()``, ``multiprocessing.Pool.map_async()``, ``multiprocessing.Pool.imap()``, ``multiprocessing.Pool.imap_unordered()``, and ``multiprocessing.Pool.starmap_async()``. First, the ``terminate`` and ``join`` methods of ``Pool`` are attempted within a daemon thread with a 5-second timeout. If the timeout is reached, each process in the pool is sent a ``CTRL_C_EVENT`` to shut it down somewhat forcefully. If the process is still alive, a ``CTRL_BREAK_EVENT`` is sent to shut it down more forcefully. After the signals are sent to the processes, the ``terminate`` method is called again. .. important:: The ``multiprocessing.Pool`` context manager must still be used to ensure that the ``close`` method is still called after the pool is terminated. """ def pool_term_primary(): """Primary pool termination. This function is attempted first.""" pool.terminate() pool.join() def pool_term_last_resort(): """ Last-resort pool termination, adapted from https://stackoverflow.com/a/47580796. """ pool._state = multiprocessing.pool.TERMINATE pool._worker_handler._state = multiprocessing.pool.TERMINATE for p in pool._pool: os.kill(p.pid, signal.CTRL_C_EVENT) if p.is_alive(): print(f"Process {p.pid} still alive after sending CTRL+C, sending CTRL+BREAK event...") os.kill(p.pid, signal.CTRL_BREAK_EVENT) while any(p.is_alive() for p in pool._pool): pass pool.terminate() term_thread = threading.Thread(target=pool_term_primary) term_thread.daemon = True term_thread.start() term_thread.join(timeout=5) if term_thread.is_alive(): pool_term_last_resort()