Getting erratic runtime exceptions trying to access persistent data in multiprocessing.Pool worker processes

Inspired by this solution, I am trying to set up a multiprocessing pool of worker processes in Python. The idea is to pass some data to the worker processes before they actually start their work and to reuse it repeatedly later on. This is intended to minimize the amount of data which needs to be packed/unpacked for every call into a worker process (i.e. to reduce inter-process communication overhead). My MCVE looks like this:

import multiprocessing as mp
import numpy as np

def create_worker_context():
    global context # create "global" context in worker process
    context = {}

def init_worker_context(worker_id, some_const_array, DIMS, DTYPE):
    context.update({
        'worker_id': worker_id,
        'some_const_array': some_const_array,
        'tmp': np.zeros((DIMS, DIMS), dtype = DTYPE),
        }) # store context information in global namespace of worker
    return True # return True, verifying that the worker process received its data

class data_analysis:
    def __init__(self):
        self.DTYPE = 'float32'
        self.CPU_LEN = mp.cpu_count()
        self.DIMS = 100
        self.some_const_array = np.zeros((self.DIMS, self.DIMS), dtype = self.DTYPE)
        # Init multiprocessing pool
        self.cpu_pool = mp.Pool(processes = self.CPU_LEN, initializer = create_worker_context) # create pool and context in workers
        pool_results = [
            self.cpu_pool.apply_async(
                init_worker_context,
                args = (core_id, self.some_const_array, self.DIMS, self.DTYPE)
            ) for core_id in range(self.CPU_LEN)
            ] # pass information to workers' context
        result_batches = [result.get() for result in pool_results] # check if they got the information
        if not all(result_batches): # raise an error if things did not work
            raise SyntaxError('Workers could not be initialized ...')

    @staticmethod
    def process_batch(batch_data):
        context['tmp'][:,:] = context['some_const_array'] + batch_data # some fancy computation in worker
        return context['tmp'] # return result

    def process_all(self):
        input_data = np.arange(0, self.DIMS ** 2, dtype = self.DTYPE).reshape(self.DIMS, self.DIMS)
        pool_results = [
            self.cpu_pool.apply_async(
                data_analysis.process_batch,
                args = (input_data,)
            ) for _ in range(self.CPU_LEN)
            ] # let workers actually work
        result_batches = [result.get() for result in pool_results]
        for batch in result_batches[1:]:
            np.add(result_batches[0], batch, out = result_batches[0]) # reduce batches
        print(result_batches[0]) # show result

if __name__ == '__main__':
    data_analysis().process_all()

I am running the above with CPython 3.6.6.

The strange thing is ... sometimes it works, sometimes it does not. If it does not work, the process_batch method throws an exception, because it cannot find some_const_array as a key in context. The full traceback looks like this:

(env) me@box:/path> python so.py 
multiprocessing.pool.RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/path/so.py", line 37, in process_batch
    context['tmp'][:,:] = context['some_const_array'] + batch_data # some fancy computation in worker
KeyError: 'some_const_array'
"""

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/path/so.py", line 54, in <module>
    data_analysis().process_all()
  File "/path/so.py", line 48, in process_all
    result_batches = [result.get() for result in pool_results]
  File "/path/so.py", line 48, in <listcomp>
    result_batches = [result.get() for result in pool_results]
  File "/python3.6/multiprocessing/pool.py", line 644, in get
    raise self._value
KeyError: 'some_const_array'

I am puzzled. What is going on here?

I do not run into this kind of problem if my context dictionary contains objects of "higher type", e.g. database drivers or similar. I can only reproduce it if my context dictionary contains basic Python data types, collections, or numpy arrays.

(Is there a better way of achieving the same thing in a more reliable manner? I know my approach is considered a hack ...)

You need to relocate the content of init_worker_context into your initializer function, create_worker_context.
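
A minimal sketch of that change, keeping your names. Note that the per-worker worker_id from your version cannot travel this way, since initargs hands the same arguments to every worker; something like os.getpid() would have to serve as an identifier instead:

import multiprocessing as mp
import numpy as np

def create_worker_context(some_const_array, DIMS, DTYPE):
    global context # create "global" context in the worker process - runs exactly once per worker
    context = {
        'some_const_array': some_const_array,
        'tmp': np.zeros((DIMS, DIMS), dtype = DTYPE),
        }

... and in data_analysis.__init__ the constant data is passed via initargs instead of apply_async:

self.cpu_pool = mp.Pool(
    processes = self.CPU_LEN,
    initializer = create_worker_context,
    initargs = (self.some_const_array, self.DIMS, self.DTYPE),
    ) # every worker runs the initializer once before it starts accepting tasks

With this, the whole apply_async initialization round and the all(result_batches) check become unnecessary.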

Your assumption that every worker process will run init_worker_context is responsible for your confusion. The tasks you submit to a pool get fed into one internal task queue, which all worker processes read from. What happens in your case is that some worker processes complete their task and then compete again for new tasks. So it can happen that one worker process executes multiple of your init_worker_context tasks while another one does not execute a single one. Keep in mind that the OS schedules runtime for the threads (of the worker processes).
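
You can observe this with a tiny, self-contained demonstration (the which_worker helper is made up for illustration): it submits exactly as many tasks as there are workers and reports which process picked up each task.

import multiprocessing as mp
import os

def which_worker(task_id):
    return task_id, os.getpid() # report which worker process picked this task off the queue

if __name__ == '__main__':
    with mp.Pool(processes = 4) as pool:
        async_results = [pool.apply_async(which_worker, args = (i,)) for i in range(4)]
        print([result.get() for result in async_results])
        # Output is nondeterministic. Whenever the same pid shows up twice, one worker
        # grabbed two of the four tasks and another worker got none - exactly the
        # situation in which your init_worker_context never ran in that worker.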