How to start and stop multiple child processes from a class?

Python program:

import multiprocessing
import time


class Application:

    def __init__(self):
        self._event = multiprocessing.Event()
        self._processes = [
            multiprocessing.Process(target=self._worker)
            for _ in range(multiprocessing.cpu_count())]

    def _worker(self):
        while not self._event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):
        self._event.set()
        for process in self._processes:
            process.join()


if __name__ == '__main__':
    application = Application()
    application.start()
    time.sleep(3)
    application.stop()

Output:

starting
starting
Traceback (most recent call last):
  File "/Users/maggyero/Desktop/application.py", line 31, in <module>
    application.start()
  File "/Users/maggyero/Desktop/application.py", line 21, in start
    process.start()
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/process.py", line 121, in start
    self._popen = self._Popen(self)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 224, in _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/context.py", line 284, in _Popen
    return Popen(process_obj)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 32, in __init__
    super().__init__(process_obj)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_fork.py", line 19, in __init__
    self._launch(process_obj)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/popen_spawn_posix.py", line 47, in _launch
    reduction.dump(process_obj, fp)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/reduction.py", line 60, in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle 'weakref' object
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 116, in spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/spawn.py", line 126, in _main
    self = reduction.pickle.load(from_parent)
  File "/usr/local/Cellar/python@3.9/3.9.10/Frameworks/Python.framework/Versions/3.9/lib/python3.9/multiprocessing/synchronize.py", line 110, in __setstate__
    self._semlock = _multiprocessing.SemLock._rebuild(*state)
FileNotFoundError: [Errno 2] No such file or directory

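In the function Application.__init__, each call multiprocessing.Process(target=self._worker) initialises a multiprocessing.Process instance with the instance method self._worker as its target argument. self._worker is bound to self, which has the instance attribute self._processes.

In the function Application.start, each call process.start() serialises the target argument and therefore self._processes. self._processes is a list of multiprocessing.Process instances, initially not started. The first call process.start() starts the first multiprocessing.Process instance in that list without issue, but the second call process.start() fails.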

So a started multiprocessing.Process instance cannot be serialised. How can this problem be solved?
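The claim above, that serialising the bound method self._worker also serialises self._processes, can be checked with plain pickle. A minimal sketch, where the class body and the placeholder list are hypothetical stand-ins for the question's Application (no process is started):

```python
import pickle


class Application:
    """Hypothetical stand-in for the question's Application; no process is started."""

    def __init__(self):
        # Placeholder strings instead of multiprocessing.Process instances.
        self._processes = ['placeholder-1', 'placeholder-2']

    def _worker(self):
        return len(self._processes)


# A bound method is pickled as getattr(instance, '_worker'), so the instance
# and all of its attributes, including _processes, are serialised with it.
restored = pickle.loads(pickle.dumps(Application()._worker))
print(restored.__self__._processes)  # ['placeholder-1', 'placeholder-2']
print(restored())                    # 2
```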

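The root of the problem is that the start method of a multiprocessing.Process instance sets its _popen instance attribute to a multiprocessing.popen_*.Popen instance. The initialisation of that instance performs these two steps (among others):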

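  1. For a multiprocessing.popen_spawn_posix.Popen instance, a multiprocessing.popen_spawn_win32.Popen instance or a multiprocessing.popen_forkserver.Popen instance, but not a multiprocessing.popen_fork.Popen instance (i.e. for the start method 'spawn' or 'forkserver' but not 'fork'), it serialises the multiprocessing.Process instance in order to write it to the end of the pipe that the parent process uses to communicate with the child process, so that the child process can execute the run method of the multiprocessing.Process instance.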

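  2. It sets its finalizer instance attribute to a multiprocessing.util.Finalize instance, which itself sets its _weakref instance attribute to a weakref.ref instance, used to close the end of the pipe that the parent process uses to communicate with the child process once the Popen instance is finalised. In other words, it makes the multiprocessing.Process instance hold a weak reference.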
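Step 2 can be observed directly on a started process. This sketch reads private CPython attributes (_popen, finalizer, _weakref), so it is an illustration that may break across Python versions, not a supported API. It deliberately uses the POSIX-only 'fork' start method, because 'fork' pickles nothing and therefore isolates step 2; the Popen classes for 'spawn' and 'forkserver' create the same kind of finalizer. finalizer_weakref is a hypothetical helper name:

```python
import multiprocessing
import pickle
import weakref


def finalizer_weakref():
    """Start a no-op child process and return the weak reference held by
    process._popen.finalizer (private CPython attributes)."""
    # 'fork' is POSIX-only; it is used here because it pickles nothing.
    ctx = multiprocessing.get_context('fork')
    process = ctx.Process()  # no target: run() does nothing
    process.start()          # sets process._popen and its finalizer (step 2)
    ref = process._popen.finalizer._weakref
    process.join()
    return ref


if __name__ == '__main__':
    ref = finalizer_weakref()
    print(isinstance(ref, weakref.ref))  # True
    try:
        pickle.dumps(ref)
    except TypeError as error:
        # The exact message wording depends on the Python version.
        print('TypeError:', error)
```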

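Therefore, if a multiprocessing.Process instance holds a reference to a started multiprocessing.Process instance, it holds a weak reference (point 2), so starting it fails because it serialises (point 1) the weak reference, and weak references are not serialisable: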

import multiprocessing

if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')  # or 'forkserver' but not 'fork'
    process_a = multiprocessing.Process()
    process_b = multiprocessing.Process()
    process_b.foo = process_a
    process_a.start()  # creates process_a._popen.finalizer._weakref
    process_b.start()  # TypeError: cannot pickle 'weakref' object

Minimal Python program showing the serialisation issue:

import pickle
import weakref

pickle.dumps(weakref.ref(int))  # TypeError: cannot pickle 'weakref' object

Two workarounds for the serialisation issue:

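  • Either make the target a classmethod and pass it self._event through args, so that it is bound to the class rather than to self (and in particular not to the instance attribute self._processes):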
import multiprocessing
import time


class Application:

    def __init__(self):
        self._event = multiprocessing.Event()
        self._processes = [
            multiprocessing.Process(target=self._worker, args=(self._event,))
            for _ in range(multiprocessing.cpu_count())]

    @classmethod
    def _worker(cls, event):
        # Bound to the class, so pickling it does not pickle self._processes.
        while not event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):
        self._event.set()
        for process in self._processes:
            process.join()
  • Or explicitly exclude the instance attribute self._processes from the serialisation of the target argument with __getstate__:
import multiprocessing
import time


class Application:

    def __init__(self):
        self._event = multiprocessing.Event()
        self._processes = [
            multiprocessing.Process(target=self._worker)
            for _ in range(multiprocessing.cpu_count())]

    def _worker(self):
        while not self._event.is_set():
            print(multiprocessing.current_process().name)
            time.sleep(1)

    def start(self):
        for process in self._processes:
            print('starting')
            process.start()

    def stop(self):
        self._event.set()
        for process in self._processes:
            process.join()

    def __getstate__(self):
        state = {}
        for key, value in vars(self).items():
            if key != '_processes':
                state[key] = value
        return state
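Both workarounds can be checked without starting any process. In this minimal sketch, Holder, FilteredHolder and can_pickle are hypothetical names, and a weakref.ref stored in _processes stands in for a list that contains an already started multiprocessing.Process (which, by point 2, holds such a weak reference):

```python
import pickle
import weakref


class Holder:
    """Hypothetical stand-in for Application: _processes holds a weak reference,
    mimicking a list that contains an already started multiprocessing.Process."""

    def __init__(self):
        self._event = 'picklable state'
        self._processes = [weakref.ref(Holder)]

    def instance_worker(self):
        return self._event

    @classmethod
    def class_worker(cls, event):
        return event


class FilteredHolder(Holder):
    def __getstate__(self):
        # Same filtering as Application.__getstate__ above.
        return {key: value for key, value in vars(self).items()
                if key != '_processes'}


def can_pickle(obj):
    """Return True if pickle.dumps(obj) succeeds, False on TypeError."""
    try:
        pickle.dumps(obj)
    except TypeError:
        return False
    return True


print(can_pickle(Holder().instance_worker))          # False: drags in the weakref
print(can_pickle(Holder().class_worker))             # True: only a reference to Holder
print(can_pickle(FilteredHolder().instance_worker))  # True: _processes is skipped
```

The first call fails for the same reason as the second process.start() call in the question; the classmethod only pickles a reference to its class, and __getstate__ leaves _processes out of the pickled state.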