在 python 多处理中传递共享内存变量

Passing shared memory variables in python multiprocessing

我想使用 Python 的多处理并行读取一堆文件,并将所有数据收集在单个 NumPy 数组中。为此,我想定义一个共享内存 NumPy 数组,并将其切片传递给不同的进程以并行读取。下面的代码给出了我正在尝试做的玩具插图,我正在尝试使用多处理修改 numpy 数组。

示例 1:


import numpy as np
import multiprocessing

def do_stuff(i, arr):
    arr[:]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    # Need to fill this array in parallel
    arr = np.zeros(4)
    p = multiprocessing.Pool(4)
    # Passing slices to arr to modify using multiprocessing
    for i in idx:
        p.apply(do_stuff, args=(i,arr[i:i+1]))
    p.close()
    p.join()
    print(arr)

在此代码中,我希望用 0、1、2、3 填充 arr。然而,这会将 arr 打印为全零。看完答案here,我用multiprocessing.Array定义共享内存变量,修改我的代码如下

示例 2:

import numpy as np
import multiprocessing

def do_stuff(i, arr):
    arr[:]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    p = multiprocessing.Pool(4)
    # Shared memory Array
    shared = multiprocessing.Array('d', 4)
    arr = np.ctypeslib.as_array(shared.get_obj())

    for i in idx:
        p.apply(do_stuff, args=(i,arr[i:i+1]))
    p.close()
    p.join()
    print(arr)

这也会打印 arr 的所有零。但是,当我在 main 外部定义数组并使用 pool.map 时,代码有效。例如,以下代码有效

示例 3:

import numpy as np
import multiprocessing

shared = multiprocessing.Array('d', 4)
arr = np.ctypeslib.as_array(shared.get_obj())

def do_stuff(i):
    arr[i]=i
    return

def print_error(err):
    print(err)

if __name__ == '__main__':
    idx = [0,1,2,3]
    p = multiprocessing.Pool(4)
    shared = multiprocessing.Array('d', 4)
    p.map(do_stuff, idx)
    p.close()
    p.join()
    print(arr)
             

这会打印 [0,1,2,3]。

这一切让我很困惑。我的问题是:

  1. 当我定义arr = np.zeros(4)时,哪个处理器拥有这个变量?当我将这个数组的切片发送到不同的处理器时,如果这些处理器上没有定义这个变量,那么发送的是什么。

  2. 为什么示例 2 不起作用而示例 3 起作用?

我正在研究 Linux 和 Python/3.7/4

When I define arr = np.zeros(4), which processor owns this variable?

只有主进程才能访问它。如果您使用“fork”作为启动方法,child 进程可以访问所有内容,但是一旦有人试图修改它,它将被复制到它自己的私有内存 space 之前已修改(写入时复制)。如果您有大型 read-only 数组,这会减少开销,但对将数据写回这些数组没有多大帮助。

what is being sent if this variable is not defined on those processors.

当参数是 re-constructed 从主进程通过管道和 pickle 发送后,在 child 进程中创建一个新数组。数据被序列化为文本和 re-constructed,因此除了切片中数据的值之外,没有其他信息保留。这是一个全新的 object.

Why doesn't example 2 work while example 3 does?

示例 3 之所以有效,是因为在“分叉”时(您调用 Pool 的那一刻),arr 已经创建,并且将被共享。使用 Array 创建它也很重要,因此当您尝试修改数据时,数据将被共享(具体机制很复杂)。

示例 2 的工作方式与示例 1 的工作方式不同:您将数组的一部分作为参数传递,它会转换为全新的 object,因此 arr 在你的 do_stuff 函数中只是来自主进程的 arr[i:i+1] 的副本。在调用 Pool 之前创建将在进程之间共享的任何内容仍然很重要(如果您依赖“fork”来共享数据),但这不是此示例不起作用的原因。

您应该知道:示例 3 之所以有效,是因为您使用的是 linux,并且默认启动方法是 fork。这不是首选的启动方法,因为在锁定状态下复制锁 object 可能会导致死锁。这在 Windows 上根本不起作用,在 MacOS 3.8 及更高版本上默认情况下也不起作用。

所有这一切的最佳解决方案(最便携)是将 Array 本身作为参数传递,并将 re-construct numpy 数组传递给 child 进程。这有一个复杂的问题,即“共享 objects”只能在创建 child 进程时作为参数传递。如果您使用 Process,这没什么大不了的,但是对于 Pool,您基本上必须将任何共享的 object 作为参数传递给初始化函数,并获得 re-constructed 数组作为 child 作用域的全局变量。例如,在此示例中,您尝试将 buf 作为参数传递给 p.mapp.apply 时会出错,但在将 buf 作为 initargs=(buf,) 传递给Pool()

import numpy as np
from multiprocessing import Pool, Array

def init_child(buf):
    global arr #use global context (for each process) to pass arr to do_stuff
    arr = np.frombuffer(buf.get_obj(), dtype='d')

def do_stuff(i):
    global arr
    arr[i]=i

if __name__ == '__main__':
    idx = [0,1,2,3]
    
    buf = Array('d', 4)
    arr = np.frombuffer(buf.get_obj(), dtype='d')
    arr[:] = 0
    
    #"with" context is easier than writing "close" and "join" all the time
    with Pool(4, initializer=init_child, initargs=(buf,)) as p:
        for i in idx:
            p.apply(do_stuff, args=(i,)) #you could pass more args to get slice indices too
    print(arr)

对于 3.8 及更高版本,有一个新模块比 Array 或任何其他 sharedctypes 类 更好,称为:shared_memory。这使用起来有点复杂,并且有一些额外的 OS 依赖性,但理论上它的开销更低,速度更快。如果你想深入了解我已经写了一篇 answers on the topic of shared_memory,并且最近一直在回答很多关于并发的问题,如果你想看看我上一两个月的回答。