多个进程 - python - 多维数组的简单循环

multiple processes - python - simple loop with multidimensional array

多处理的新手,我无法理解文档。我如何 运行 跨多个进程使用这种形式的功能?这是我的尝试。我想运行一个卷积(of start with other)通过多个进程提高速度。

from multiprocessing import Process, Value, Array 
import numpy as np
import scipy

A = np.ones((10,10,4))
B = np.random.rand(10,10)

def loop(i):
     C[:,:,i] = scipy.ndimage.convolve(A[:,:,i],B)

if __name__ == '__main__':
     C = Array('i',np.zeros((10,10,4)))
     arr    = Array('i',range(4))
     for i in arr:
        p = Process(target = loop, args=i)
        p.start()
        p.join()

当前错误:

Traceback (most recent call last):

File "", line 11, in y_test = Array('i',np.zeros((10,10,4)))

File "/usr/lib/python2.7/multiprocessing/init.py", line 260, in Array return Array(typecode_or_type, size_or_initializer, **kwds)

File "/usr/lib/python2.7/multiprocessing/sharedctypes.py", line 115, in Array obj = RawArray(typecode_or_type, size_or_initializer)

File "/usr/lib/python2.7/multiprocessing/sharedctypes.py", line 89, in RawArray result.init(*size_or_initializer)

TypeError: only length-1 arrays can be converted to Python scalars

y_test 将是输出

异常

这里出现异常,因为 Array(...) 的第一个参数需要是有效的 ctype,例如 c_boolc_int、...,或者代表这种类型的单个字符,如 'i'。 'y_test' 不是有效的 ctype。您似乎输入了变量名,而不是它们的类型。

试试这个,如果你想在这里使用整数:

y_test = Array('i', np.zeros((10,10,4)))

看看the Array documentation for more details on how the Array works. There is also a list of valid type codes可以用

额外的想法,并行执行

此外,请记住,该过程只会 运行 函数 loop 和您提供的参数。您提供了两个数组,但循环函数似乎需要一个数组和一个整数作为参数。当您想 运行 循环函数的多个实例具有不同的 i 值时,您可以创建多个 Process 实例并给它们每个不同的值作为参数,如下所示:

p1 = Process(target = loop, args = (y_test, 1))
p2 = Process(target = loop, args = (y_test, 2))

每个进程都会运行 具有不同参数的循环函数。你也可以把它放在一个循环中,迭代你的 arr 来处理 arr 中的每个值,我认为这就是你想要的。

违规行是

y_test = Array('y_test',np.zeros((10,10,4)))

这是完全错误的。文档说 multiprocessing.Array

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
...
typecode_or_type determines the type of the elements of the returned array: it is either a ctypes type or a one character typecode of the kind used by the array module...

错误只是说 y_test 不是已知类型。

无论如何,你没有把问题放在正确的一边。

因为一个multiprocessing.Array是存储在共享内存中的原始类型数组。在这里,您可以通过 numpy 数组的 flat 迭代器轻松构建这样的数组:

y_test = Array('d', np.zeros((10,10,4)).flat)

y_test 现在是 400 个双精度的共享内存数组

让我们暂时忘记多处理。你可以这样处理:

for i in range(r):
    npi = start[:,:,i] + i
    y_test[10*10*i:10*10*(i+1)] = npi.flatten()

您现在可以将其转换回 numpy 数组:

resul = np.array(y_test).reshape((10,10,4), order = 'f')

这里有趣的一点是,每次处理 10x10 数组都会访问共享内存的不同部分:我们可以在不同的进程中构建它

def process(a, i):
    npi = start[:,:, i] + i    # computes a numpy array
    a[10*10*i:10*10*(i+1)] = npi.flatten()  # stores it in shared memory

def main():
    y_test = Array('d', np.zeros((10,10,4)).flat) # initializes shared memory
    processes = []
    for i in range(4):  # uses 4 processes, one per each 10x10 array
        p = Process(target = process, args = (y_test, i))
        processes.append(p)
        p.start()
        print("Start", i, p.pid)  # to be sure the processes were started
    for i in range(4):
        p.join()        # join child processes
        print("Join", i, p.exitcode)
    resul = np.array(y_test).reshape((10,10,4), order='f') # rebuild the final numpy array
    print(resul)

if __name__ == '__main__':
    main()