How to assign values to an array from inside the worker_function of multiprocessing.Pool.map?

Basically, what I want is to insert those 2s into ar so that ar is changed outside of worker_function:

import numpy as np
import multiprocessing as mp
from functools import partial


def worker_function(i=None, ar=None):
    val = 2
    ar[i] = val
    print(ar)


def main():
    ar = np.zeros(5)
    func_part = partial(worker_function, ar=ar)
    mp.Pool(1).map(func_part, range(2))
    print(ar)


if __name__ == '__main__':
    main()

So far, the only thing I have managed to do is change a copy of ar inside worker_function, but not outside the function:

[2. 0. 0. 0. 0.]
[0. 2. 0. 0. 0.]
[0. 0. 0. 0. 0.]

First, you have the arguments to worker_function defined in the wrong order.

As you can see, each process gets a copy of the array. The best you can do is to return the modified array:

import numpy as np
import multiprocessing as mp
from functools import partial


def worker_function(ar, i): # put the arguments in the correct order!
    val = 2
    ar[i] = val
    #print(ar)
    return ar # return modified array


def main():
    ar = np.zeros(5)
    func_part = partial(worker_function, ar)
    arrays = mp.Pool(2).map(func_part, range(2)) # pool size of 2, otherwise what is the point?
    for array in arrays:
        print(array)


if __name__ == '__main__':
    main()

Prints:

[2. 0. 0. 0. 0.]
[0. 2. 0. 0. 0.]

But now you are dealing with two separately modified arrays. You have to add extra logic to merge the results of those two arrays into one:

import numpy as np
import multiprocessing as mp
from functools import partial


def worker_function(ar, i): # put the arguments in the correct order!
    val = 2
    ar[i] = val
    #print(ar)
    return ar # return modified array


def main():
    ar = np.zeros(5)
    func_part = partial(worker_function, ar)
    arrays = mp.Pool(2).map(func_part, range(2)) # pool size of 2, otherwise what is the point?
    for i in range(2):
        ar[i] = arrays[i][i]
    print(ar)


if __name__ == '__main__':
    main()

Prints:

[2. 2. 0. 0. 0.]

But it makes more sense to have worker_function simply return a tuple giving the index of the modified element and the new value:

import numpy as np
import multiprocessing as mp
from functools import partial


def worker_function(ar, i): # put the arguments in the correct order!
    return i, i + 3 # index, new value


def main():
    ar = np.zeros(5)
    func_part = partial(worker_function, ar)
    results = mp.Pool(2).map(func_part, range(2))
    for index, value in results:
        ar[index] = value
    print(ar)


if __name__ == '__main__':
    main()

Prints:

[3. 4. 0. 0. 0.]

Of course, if worker_function modifies multiple values, it returns a tuple of tuples.
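A minimal sketch of that tuple-of-tuples variant (the particular indices and values returned here are just illustrative); note that once workers only report results, the array no longer needs to be passed to them at all:

```python
import numpy as np
import multiprocessing as mp


def worker_function(i):
    # return a tuple of (index, new value) pairs, one per element touched
    return ((i, i + 3), (i + 2, 0.5))


def main():
    ar = np.zeros(5)
    results = mp.Pool(2).map(worker_function, range(2))
    # apply every (index, value) pair from every worker to the parent's array
    for pairs in results:
        for index, value in pairs:
            ar[index] = value
    print(ar)


if __name__ == '__main__':
    main()
```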

Finally, if you really do need to pass objects to the sub-processes, there is another way, using a pool initializer:

import numpy as np
import multiprocessing as mp


def pool_initializer(ar):
    global the_array

    the_array = ar


def worker_function(i):
    return i, the_array[i] ** 2 # index, value


def main():
    ar = np.array([1,2,3,4,5])
    with mp.Pool(5, pool_initializer, (ar,)) as pool:
        results = pool.map(worker_function, range(5))
    for index, value in results:
        ar[index] = value
    print(ar)


if __name__ == '__main__':
    main()

Prints:

[ 1  4  9 16 25]

For performance, you should use a shared-memory multiprocessing.Array here to avoid rebuilding and sending the array across the different processes again and again. The array will be the same in all processes, which is not the case in your example, where you send copies. That is also the reason you cannot see the changes made in the parent.

import multiprocessing as mp
import numpy as np


def worker_function(i):
    global arr
    val = 2
    arr[i] = val
    print(mp.current_process().name, arr[:])


def init_arr(arr):
    globals()['arr'] = arr


def main():
    # as long as we don't conditionally modify the same indices 
    # from multiple workers, we don't need the lock ...
    arr = mp.Array('i', np.zeros(5, dtype=int), lock=False)
    mp.Pool(2, initializer=init_arr, initargs=(arr,)).map(worker_function, range(5))
    print(mp.current_process().name, arr[:])


if __name__ == '__main__':
    main()

Output:

ForkPoolWorker-1 [2, 0, 0, 0, 0]
ForkPoolWorker-2 [2, 2, 0, 0, 0]
ForkPoolWorker-1 [2, 2, 2, 0, 0]
ForkPoolWorker-2 [2, 2, 2, 2, 0]
ForkPoolWorker-1 [2, 2, 2, 2, 2]
MainProcess [2, 2, 2, 2, 2]

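If you want numpy semantics on top of that shared buffer, you can wrap the raw (lock=False) array with np.frombuffer in the parent and in every worker; each view then reads and writes the same shared memory. A minimal sketch along those lines:

```python
import multiprocessing as mp
import numpy as np


def init_arr(shared):
    # re-wrap the raw shared buffer as a numpy view in each worker
    global arr
    arr = np.frombuffer(shared, dtype=np.int32)


def worker_function(i):
    arr[i] = 2  # writes through the view straight into shared memory


def main():
    shared = mp.Array('i', 5, lock=False)
    view = np.frombuffer(shared, dtype=np.int32)  # parent's view of the same buffer
    mp.Pool(2, initializer=init_arr, initargs=(shared,)).map(worker_function, range(5))
    print(view)  # reflects the workers' writes


if __name__ == '__main__':
    main()
```

This works because a lock=False multiprocessing.Array is a plain ctypes array supporting the buffer protocol, so np.frombuffer creates a zero-copy view rather than a copy.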