Why does using "fork" work but using "spawn" fail in Python 3.8+ `multiprocessing`?

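I am working on macOS and was recently bitten by the change of the default start method from "fork" to "spawn" in Python 3.8 multiprocessing (see the doc). Below is a simplified working example where using "fork" succeeds but using "spawn" fails. The purpose of the code is to create a custom queue object that supports calling size() under macOS, hence the inheritance from the Queue object and the call to get the multiprocessing context.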

import multiprocessing
from multiprocessing import Process
from multiprocessing.queues import Queue
from time import sleep


class Q(Queue):
    def __init__(self):
        super().__init__(ctx=multiprocessing.get_context())
        self.size = 1

    def call(self):
        return print(self.size)


def foo(q):
    q.call()


if __name__ == '__main__':
    multiprocessing.set_start_method('spawn')  # this would fail
    # multiprocessing.set_start_method('fork')  # this would succeed
    q = Q()
    p = Process(target=foo, args=(q,))
    p.start()
    p.join(timeout=1)

The error output when using "spawn" is shown below.

Process Process-1:
Traceback (most recent call last):
  File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/local/Cellar/python@3.8/3.8.5/Frameworks/Python.framework/Versions/3.8/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "/Users/fanchen/Private/python_work/sandbox.py", line 23, in foo
    q.call()
  File "/Users/fanchen/Private/python_work/sandbox.py", line 19, in call
    return print(self.size)
AttributeError: 'Q' object has no attribute 'size'

It seems that the child process deems self.size unnecessary for running the code, so it is not copied. My question is: why does this happen?

Code snippet tested on macOS Catalina 10.15.6, Python 3.8.5.

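The problem is that spawned processes share no resources with the parent. Under "fork" the child inherits a copy of the parent's memory, so the attribute is simply there. Under "spawn" the queue is pickled in the parent and rebuilt in the child, and Queue's own pickling methods carry only its internal state, so extra attributes such as self.size are dropped. To recreate the queue instance correctly in each process, you need to add serialization and deserialization methods that include them. Here is working code: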

# Portable queue
# Based on Victor Terron's idea from the Lemon project (https://github.com/vterron/lemon/blob/master/util/queue.py).
# Pickling/unpickling methods are added so that the Queue instance is shared between processes correctly.

import multiprocessing
import multiprocessing.queues

class SharedCounter(object):
    """ A synchronized shared counter.

    The locking done by multiprocessing.Value ensures that only a single
    process or thread may read or write the in-memory ctypes object. However,
    in order to do n += 1, Python performs a read followed by a write, so a
    second process may read the old value before the new one is written by the
    first process. The solution is to use a multiprocessing.Lock to guarantee
    the atomicity of the modifications to Value.

    This class comes almost entirely from Eli Bendersky's blog:
    http://eli.thegreenplace.net/2012/01/04/shared-counter-with-pythons-multiprocessing/

    """

    def __init__(self, n = 0):
        self.count = multiprocessing.Value('i', n)

    def __getstate__(self):
        return (self.count,)

    def __setstate__(self, state):
        (self.count,) = state

    def increment(self, n = 1):
        """ Increment the counter by n (default = 1) """
        with self.count.get_lock():
            self.count.value += n

    @property
    def value(self):
        """ Return the value of the counter """
        return self.count.value

class Queue(multiprocessing.queues.Queue):
    """ A portable implementation of multiprocessing.Queue.

    Because of multithreading / multiprocessing semantics, Queue.qsize() may
    raise the NotImplementedError exception on Unix platforms like Mac OS X
    where sem_getvalue() is not implemented. This subclass addresses this
    problem by using a synchronized shared counter (initialized to zero) and
    increasing / decreasing its value every time the put() and get() methods
    are called, respectively. This not only prevents NotImplementedError from
    being raised, but also allows us to implement a reliable version of both
    qsize() and empty().

    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs, ctx=multiprocessing.get_context())
        self._counter = SharedCounter(0)

    def __getstate__(self):
        return super().__getstate__() + (self._counter,)

    def __setstate__(self, state):
        super().__setstate__(state[:-1])
        self._counter = state[-1]

    def put(self, *args, **kwargs):
        super().put(*args, **kwargs)
        self._counter.increment(1)

    def get(self, *args, **kwargs):
        item = super().get(*args, **kwargs)
        self._counter.increment(-1)
        return item

    def qsize(self):
        """ Reliable implementation of multiprocessing.Queue.qsize() """
        return self._counter.value

    def empty(self):
        """ Reliable implementation of multiprocessing.Queue.empty() """
        return not self.qsize()
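
The mechanism behind both the failure and the fix can be sketched without starting any processes. In the sketch below, Base is a hypothetical stand-in for multiprocessing.queues.Queue (it is not the real class, it only imitates the "pickle a fixed tuple" behaviour), Lossy plays the role of Q from the question, and Fixed applies the same __getstate__/__setstate__ pattern as the Queue subclass above. All three names are assumptions made for illustration.

```python
import pickle


class Base:
    """Hypothetical stand-in for multiprocessing.queues.Queue: pickles a fixed tuple."""

    def __init__(self):
        self._maxsize = 0

    def __getstate__(self):
        # Only the fields listed here survive pickling.
        return (self._maxsize,)

    def __setstate__(self, state):
        (self._maxsize,) = state


class Lossy(Base):
    """Adds an attribute but inherits the fixed-tuple pickling, like Q in the question."""

    def __init__(self):
        super().__init__()
        self.size = 1


class Fixed(Lossy):
    """Appends the extra attribute to the parent's state and restores it on unpickling."""

    def __getstate__(self):
        return super().__getstate__() + (self.size,)

    def __setstate__(self, state):
        super().__setstate__(state[:-1])
        self.size = state[-1]


# Unpickling does not call __init__, so only __setstate__ populates the copy.
lossy_copy = pickle.loads(pickle.dumps(Lossy()))
fixed_copy = pickle.loads(pickle.dumps(Fixed()))

print(hasattr(lossy_copy, 'size'))  # False: the attribute was never serialized
print(fixed_copy.size)              # 1: the attribute travelled with the state
```

The real Queue additionally refuses to be pickled outside of process creation, which is why the sketch uses a stand-in rather than the actual class.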

You can also use multiprocessing.Manager().Queue().
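
A minimal sketch of the manager-based alternative, assuming the default start method of the platform. The helper names fill and manager_queue_size are hypothetical. A manager queue is a proxy to a queue.Queue living in a separate server process, so qsize() is answered there and should not depend on sem_getvalue(); the trade-off is that every operation is a round trip to that server, which is typically slower than multiprocessing.Queue.

```python
import multiprocessing


def fill(q, n):
    """Worker (hypothetical helper): put n items on the shared queue."""
    for i in range(n):
        q.put(i)


def manager_queue_size(n=3):
    """Fill a manager-backed queue from a child process and return its size."""
    with multiprocessing.Manager() as manager:
        q = manager.Queue()  # proxy object that can be passed to other processes
        p = multiprocessing.Process(target=fill, args=(q, n))
        p.start()
        p.join()
        return q.qsize()  # evaluated inside the manager's server process


if __name__ == '__main__':
    print(manager_queue_size(3))
```

No custom pickling is needed here because the proxy, not the queue itself, is what gets sent to the child.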