使用多处理将数组添加到全局数组

Adding arrays to global array using multiprocessing

我有一个全局 NumPy 数组 ys_final 并定义了一个生成数组 ys 的函数。 ys 数组将根据输入参数生成,我想将这些 ys 数组添加到全局数组中,即 ys_final = ys_final + ys .
添加的顺序无关紧要所以我想使用多处理库中的 Pool.apply_async() 但我不能写到全局数组。参考代码为:

import multiprocessing as mp

ys_final = np.zeros(len)
def ys_genrator(i):
    #code to generate ys array
    return ys
pool = mp.Pool(mp.cpu_count())
for i in range(3954):
    ys_final = ys_final + pool.apply_async(ys_genrator, args=(i)).get()
pool.close()
pool.join()

上面的代码块一直在 运行 并且什么也没有发生。我也试过 *mp.Process 但我仍然面临同样的问题。我在那里定义了一个直接添加到全局数组的目标函数,但它也不起作用,因为该块永远保持 运行 。参考:

def func(i):
    #code to generate ys
    global ys_final
    ys_final = ys_final + ys

for i in range(3954):
    p = mp.Process(target=func, args=(i,))
    p.start()
    p.join()

任何建议都会很有帮助。

编辑: 我的ys_genrator是线性插值的函数。基于参数 i(二维图像中的行索引),该函数创建一个插值幅度数组,该数组将与图像中的所有插值幅度叠加,因此 ys 需要添加到 ys_final 变量 len 是插值数组的长度,所有行都相同。

作为参考,更简单的ys_genrator(i)版本如下:

def ys_genrator(i):
    ys = np.ones(10)*i
    return ys

几点:

  1. pool.apply_async(ys_genrator, args=(i)) 需要 pool.apply_async(ys_genrator, args=(i,))。注意 i.
  2. 后面的逗号
  3. pool.apply_async(ys_genrator, args=(i,)).get() 完全等同于 pool.apply(ys.genrator, args=(i,))。也就是说,您会因为立即调用 get 而阻塞,并且您将完全没有并行性。您需要对 pool.apply_async 进行所有调用并保存返回的 AsyncResult 个实例,然后才对这些实例调用 get
  4. 如果你在Windows下运行,你就会有问题。创建新进程的代码必须在 if __name__ == '__main__':
  5. 控制的块中
  6. 如果您 运行 在 Jupyter Notebook 或 iPython 之类的东西下,您将遇到问题。工作函数 ys_genrator 需要在外部文件中导入。
  7. 使用apply_async提交大量任务效率低下。您最好使用 imapimap_unordered,其中任务以“块”形式提交,您可以在结果可用时一个一个地处理它们。但是你必须选择一个“合适的”chunksize参数。
  8. 如果您在 Windows 下的 运行,那么您在全局级别的任何代码,例如 ys_final = np.zeros(len) 将由每个子进程执行,如果子流程不需要“看到”这个变量。如果他们确实需要查看此变量,请注意池中的每个进程都将使用自己的变量副本,因此最好是只读用法。即使这样,如果变量很大,也会非常浪费存储空间。有多种方法可以跨进程共享这样的变量,但您是否需要这样做并不完全清楚(您甚至还没有定义变量 len)。所以很难给你改进的代码。但是,您的 worker 函数似乎不需要“查看”ys_final,因此我将试一试改进的解决方案。
  9. 但请注意,如果您的函数 ys_genrator 非常微不足道,那么使用多处理将一无所获,因为创建处理池和将参数从一个进程传递到另一个进程都会产生开销。此外,如果 ys_genrator 使用 numpy,这也可能是问题的根源,因为 numpy 对其自身的某些功能使用多处理,最好不要混合使用 numpy使用您自己的多重处理。
import multiprocessing as mp
import numpy as np

SIZE = 3

def ys_genrator(i):
    #code to generate ys array
    # for this dummy example all SIZE entries will end up with the same result:
    ys = [i] * SIZE # for example: [1, 1, 1]
    return ys

def compute_chunksize(poolsize, iterable_size):
    chunksize, remainder = divmod(iterable_size, 4 * poolsize)
    if remainder:
        chunksize += 1
    return chunksize

if __name__ == '__main__':
    ys_final = np.zeros(SIZE)
    n_iterations = 3954
    poolsize = min(mp.cpu_count(), n_iterations)
    chunksize = compute_chunksize(poolsize, n_iterations)
    print('poolsize =', poolsize, 'chunksize =', chunksize)
    pool = mp.Pool(poolsize)
    for result in pool.imap_unordered(ys_genrator, range(n_iterations), chunksize):
        ys_final += result
    print(ys_final)

打印:

poolsize = 8 chunksize = 124
[7815081. 7815081. 7815081.]

更新

您也可以只使用:

    for result in pool.map(ys_genrator, range(n_iterations)):
        ys_final += result

问题是,当您使用方法 map 时,该方法想要根据可迭代参数的大小计算一个有效的 chunksize 参数(参见上面我的 compute_chunksize 函数,这基本上就是 pool.map 将使用的)。但要做到这一点,必须先将可迭代对象转换为列表以获得其大小。如果 n_iterations 非常大,这不是很有效,尽管对于 3954 的大小这可能不是主要问题。不过,在这种情况下你最好使用我的 compute_chunksize 函数,因为你知道iterable 的大小,然后将 chunksize 参数显式传递给 map,就像我在代码中使用 imap_unordered.

所做的那样