多个进程 - python - 多维数组的简单循环
multiple processes - python - simple loop with multidimensional array
多处理的新手,我无法理解文档。我如何 运行 跨多个进程使用这种形式的功能?这是我的尝试。我想运行一个卷积(of start with other)通过多个进程提高速度。
from multiprocessing import Process, Value, Array
import numpy as np
import scipy
A = np.ones((10,10,4))
B = np.random.rand(10,10)
def loop(i):
C[:,:,i] = scipy.ndimage.convolve(A[:,:,i],B)
if __name__ == '__main__':
C = Array('i',np.zeros((10,10,4)))
arr = Array('i',range(4))
for i in arr:
p = Process(target = loop, args=i)
p.start()
p.join()
当前错误:
Traceback (most recent call last):
File "", line 11, in
y_test = Array('i',np.zeros((10,10,4)))
File "/usr/lib/python2.7/multiprocessing/init.py", line 260, in
Array
return Array(typecode_or_type, size_or_initializer, **kwds)
File "/usr/lib/python2.7/multiprocessing/sharedctypes.py", line 115,
in Array
obj = RawArray(typecode_or_type, size_or_initializer)
File "/usr/lib/python2.7/multiprocessing/sharedctypes.py", line 89,
in RawArray
result.init(*size_or_initializer)
TypeError: only length-1 arrays can be converted to Python scalars
y_test 将是输出
异常
这里出现异常,因为 Array(...) 的第一个参数需要是有效的 ctype
,例如 c_bool
、c_int
、...,或者代表这种类型的单个字符,如 'i'
。 'y_test' 不是有效的 ctype。您似乎输入了变量名,而不是它们的类型。
试试这个,如果你想在这里使用整数:
y_test = Array('i', np.zeros((10,10,4)))
看看the Array documentation for more details on how the Array works. There is also a list of valid type codes可以用
额外的想法,并行执行
此外,请记住,该过程只会 运行 函数 loop
和您提供的参数。您提供了两个数组,但循环函数似乎需要一个数组和一个整数作为参数。当您想 运行 循环函数的多个实例具有不同的 i 值时,您可以创建多个 Process 实例并给它们每个不同的值作为参数,如下所示:
p1 = Process(target = loop, args = (y_test, 1))
p2 = Process(target = loop, args = (y_test, 2))
每个进程都会运行 具有不同参数的循环函数。你也可以把它放在一个循环中,迭代你的 arr
来处理 arr 中的每个值,我认为这就是你想要的。
违规行是
y_test = Array('y_test',np.zeros((10,10,4)))
这是完全错误的。文档说 multiprocessing.Array
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
...
typecode_or_type determines the type of the elements of the returned array: it is either a ctypes type or a one character typecode of the kind used by the array module...
错误只是说 y_test
不是已知类型。
无论如何,你没有把问题放在正确的一边。
因为一个multiprocessing.Array
是存储在共享内存中的原始类型数组。在这里,您可以通过 numpy 数组的 flat
迭代器轻松构建这样的数组:
y_test = Array('d', np.zeros((10,10,4)).flat)
y_test 现在是 400 个双精度的共享内存数组
让我们暂时忘记多处理。你可以这样处理:
for i in range(r):
npi = start[:,:,i] + i
y_test[10*10*i:10*10*(i+1)] = npi.flatten()
您现在可以将其转换回 numpy 数组:
resul = np.array(y_test).reshape((10,10,4), order = 'f')
这里有趣的一点是,每次处理 10x10 数组都会访问共享内存的不同部分:我们可以在不同的进程中构建它
def process(a, i):
npi = start[:,:, i] + i # computes a numpy array
a[10*10*i:10*10*(i+1)] = npi.flatten() # stores it in shared memory
def main():
y_test = Array('d', np.zeros((10,10,4)).flat) # initializes shared memory
processes = []
for i in range(4): # uses 4 processes, one per each 10x10 array
p = Process(target = process, args = (y_test, i))
processes.append(p)
p.start()
print("Start", i, p.pid) # to be sure the processes were started
for i in range(4):
p.join() # join child processes
print("Join", i, p.exitcode)
resul = np.array(y_test).reshape((10,10,4), order='f') # rebuild the final numpy array
print(resul)
if __name__ == '__main__':
main()
多处理的新手,我无法理解文档。我如何 运行 跨多个进程使用这种形式的功能?这是我的尝试。我想运行一个卷积(of start with other)通过多个进程提高速度。
from multiprocessing import Process, Value, Array
import numpy as np
import scipy
A = np.ones((10,10,4))
B = np.random.rand(10,10)
def loop(i):
C[:,:,i] = scipy.ndimage.convolve(A[:,:,i],B)
if __name__ == '__main__':
C = Array('i',np.zeros((10,10,4)))
arr = Array('i',range(4))
for i in arr:
p = Process(target = loop, args=i)
p.start()
p.join()
当前错误:
Traceback (most recent call last):
File "", line 11, in y_test = Array('i',np.zeros((10,10,4)))
File "/usr/lib/python2.7/multiprocessing/init.py", line 260, in Array return Array(typecode_or_type, size_or_initializer, **kwds)
File "/usr/lib/python2.7/multiprocessing/sharedctypes.py", line 115, in Array obj = RawArray(typecode_or_type, size_or_initializer)
File "/usr/lib/python2.7/multiprocessing/sharedctypes.py", line 89, in RawArray result.init(*size_or_initializer)
TypeError: only length-1 arrays can be converted to Python scalars
y_test 将是输出
异常
这里出现异常,因为 Array(...) 的第一个参数需要是有效的 ctype
,例如 c_bool
、c_int
、...,或者代表这种类型的单个字符,如 'i'
。 'y_test' 不是有效的 ctype。您似乎输入了变量名,而不是它们的类型。
试试这个,如果你想在这里使用整数:
y_test = Array('i', np.zeros((10,10,4)))
看看the Array documentation for more details on how the Array works. There is also a list of valid type codes可以用
额外的想法,并行执行
此外,请记住,该过程只会 运行 函数 loop
和您提供的参数。您提供了两个数组,但循环函数似乎需要一个数组和一个整数作为参数。当您想 运行 循环函数的多个实例具有不同的 i 值时,您可以创建多个 Process 实例并给它们每个不同的值作为参数,如下所示:
p1 = Process(target = loop, args = (y_test, 1))
p2 = Process(target = loop, args = (y_test, 2))
每个进程都会运行 具有不同参数的循环函数。你也可以把它放在一个循环中,迭代你的 arr
来处理 arr 中的每个值,我认为这就是你想要的。
违规行是
y_test = Array('y_test',np.zeros((10,10,4)))
这是完全错误的。文档说 multiprocessing.Array
multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)
...
typecode_or_type determines the type of the elements of the returned array: it is either a ctypes type or a one character typecode of the kind used by the array module...
错误只是说 y_test
不是已知类型。
无论如何,你没有把问题放在正确的一边。
因为一个multiprocessing.Array
是存储在共享内存中的原始类型数组。在这里,您可以通过 numpy 数组的 flat
迭代器轻松构建这样的数组:
y_test = Array('d', np.zeros((10,10,4)).flat)
y_test 现在是 400 个双精度的共享内存数组
让我们暂时忘记多处理。你可以这样处理:
for i in range(r):
npi = start[:,:,i] + i
y_test[10*10*i:10*10*(i+1)] = npi.flatten()
您现在可以将其转换回 numpy 数组:
resul = np.array(y_test).reshape((10,10,4), order = 'f')
这里有趣的一点是,每次处理 10x10 数组都会访问共享内存的不同部分:我们可以在不同的进程中构建它
def process(a, i):
npi = start[:,:, i] + i # computes a numpy array
a[10*10*i:10*10*(i+1)] = npi.flatten() # stores it in shared memory
def main():
y_test = Array('d', np.zeros((10,10,4)).flat) # initializes shared memory
processes = []
for i in range(4): # uses 4 processes, one per each 10x10 array
p = Process(target = process, args = (y_test, i))
processes.append(p)
p.start()
print("Start", i, p.pid) # to be sure the processes were started
for i in range(4):
p.join() # join child processes
print("Join", i, p.exitcode)
resul = np.array(y_test).reshape((10,10,4), order='f') # rebuild the final numpy array
print(resul)
if __name__ == '__main__':
main()