如何将二维数组作为 multiprocessing.Array 传递给 multiprocessing.Pool?
How to pass 2d array as multiprocessing.Array to multiprocessing.Pool?
我的目标是将父数组传递给 mp.Pool
并用 2
填充它,同时将其分配给不同的进程。这适用于一维数组:
import numpy as np
import multiprocessing as mp
import itertools
def worker_function(i=None):
global arr
val = 2
arr[i] = val
print(arr[:])
def init_arr(arr=None):
globals()['arr'] = arr
def main():
arr = mp.Array('i', np.zeros(5, dtype=int), lock=False)
mp.Pool(1, initializer=init_arr, initargs=(arr,)).starmap(worker_function, zip(range(5)))
print(arr[:])
if __name__ == '__main__':
main()
输出:
[2, 0, 0, 0, 0]
[2, 2, 0, 0, 0]
[2, 2, 2, 0, 0]
[2, 2, 2, 2, 0]
[2, 2, 2, 2, 2]
[2, 2, 2, 2, 2]
但是我怎样才能对 x 维数组做同样的事情呢?向 arr
添加维度:
arr = mp.Array('i', np.zeros((5, 5), dtype=int), lock=False)
产生错误:
Traceback (most recent call last):
File "C:/Users/Artur/Desktop/RL_framework/test2.py", line 23, in <module>
main()
File "C:/Users/Artur/Desktop/RL_framework/test2.py", line 17, in main
arr = mp.Array('i', np.zeros((5, 5), dtype=int), lock=False)
File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\context.py", line 141, in Array
ctx=self.get_context())
File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\sharedctypes.py", line 88, in Array
obj = RawArray(typecode_or_type, size_or_initializer)
File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\sharedctypes.py", line 67, in RawArray
result.__init__(*size_or_initializer)
TypeError: only size-1 arrays can be converted to Python scalars
更改 arr
的 dtype
也无济于事。
你不能直接把multiprocessing.Array
当成一个二维数组,但是在one-dimensional内存中,反正二维只是个幻觉:).
幸运的是,numpy 允许从 buffer 读取数组并重塑它,而无需复制它。在下面的演示中,我只使用了一个单独的锁,这样我们就可以逐步观察所做的更改,目前它正在做的事情没有竞争条件。
import multiprocessing as mp
import numpy as np
def worker_function(i):
global arr, arr_lock
val = 2
with arr_lock:
arr[i, :i+1] = val
print(f"{mp.current_process().name}\n{arr[:]}")
def init_arr(arr, arr_lock=None):
globals()['arr'] = np.frombuffer(arr, dtype='int32').reshape(5, 5)
globals()['arr_lock'] = arr_lock
def main():
arr = mp.Array('i', np.zeros(5 * 5, dtype='int32'), lock=False)
arr_lock = mp.Lock()
mp.Pool(2, initializer=init_arr, initargs=(arr, arr_lock)).map(
worker_function, range(5)
)
arr = np.frombuffer(arr, dtype='int32').reshape(5, 5)
print(f"{mp.current_process().name}\n{arr}")
if __name__ == '__main__':
main()
输出:
ForkPoolWorker-1
[[2 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]]
ForkPoolWorker-2
[[2 0 0 0 0]
[2 2 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]]
ForkPoolWorker-1
[[2 0 0 0 0]
[2 2 0 0 0]
[2 2 2 0 0]
[0 0 0 0 0]
[0 0 0 0 0]]
ForkPoolWorker-2
[[2 0 0 0 0]
[2 2 0 0 0]
[2 2 2 0 0]
[2 2 2 2 0]
[0 0 0 0 0]]
ForkPoolWorker-1
[[2 0 0 0 0]
[2 2 0 0 0]
[2 2 2 0 0]
[2 2 2 2 0]
[2 2 2 2 2]]
MainProcess
[[2 0 0 0 0]
[2 2 0 0 0]
[2 2 2 0 0]
[2 2 2 2 0]
[2 2 2 2 2]]
Process finished with exit code 0
我的目标是将父数组传递给 mp.Pool
并用 2
填充它,同时将其分配给不同的进程。这适用于一维数组:
import numpy as np
import multiprocessing as mp
import itertools
def worker_function(i=None):
global arr
val = 2
arr[i] = val
print(arr[:])
def init_arr(arr=None):
globals()['arr'] = arr
def main():
arr = mp.Array('i', np.zeros(5, dtype=int), lock=False)
mp.Pool(1, initializer=init_arr, initargs=(arr,)).starmap(worker_function, zip(range(5)))
print(arr[:])
if __name__ == '__main__':
main()
输出:
[2, 0, 0, 0, 0]
[2, 2, 0, 0, 0]
[2, 2, 2, 0, 0]
[2, 2, 2, 2, 0]
[2, 2, 2, 2, 2]
[2, 2, 2, 2, 2]
但是我怎样才能对 x 维数组做同样的事情呢?向 arr
添加维度:
arr = mp.Array('i', np.zeros((5, 5), dtype=int), lock=False)
产生错误:
Traceback (most recent call last):
File "C:/Users/Artur/Desktop/RL_framework/test2.py", line 23, in <module>
main()
File "C:/Users/Artur/Desktop/RL_framework/test2.py", line 17, in main
arr = mp.Array('i', np.zeros((5, 5), dtype=int), lock=False)
File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\context.py", line 141, in Array
ctx=self.get_context())
File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\sharedctypes.py", line 88, in Array
obj = RawArray(typecode_or_type, size_or_initializer)
File "C:\Users\Artur\anaconda3\envs\RL_framework\lib\multiprocessing\sharedctypes.py", line 67, in RawArray
result.__init__(*size_or_initializer)
TypeError: only size-1 arrays can be converted to Python scalars
更改 arr
的 dtype
也无济于事。
你不能直接把multiprocessing.Array
当成一个二维数组,但是在one-dimensional内存中,反正二维只是个幻觉:).
幸运的是,numpy 允许从 buffer 读取数组并重塑它,而无需复制它。在下面的演示中,我只使用了一个单独的锁,这样我们就可以逐步观察所做的更改,目前它正在做的事情没有竞争条件。
import multiprocessing as mp
import numpy as np
def worker_function(i):
global arr, arr_lock
val = 2
with arr_lock:
arr[i, :i+1] = val
print(f"{mp.current_process().name}\n{arr[:]}")
def init_arr(arr, arr_lock=None):
globals()['arr'] = np.frombuffer(arr, dtype='int32').reshape(5, 5)
globals()['arr_lock'] = arr_lock
def main():
arr = mp.Array('i', np.zeros(5 * 5, dtype='int32'), lock=False)
arr_lock = mp.Lock()
mp.Pool(2, initializer=init_arr, initargs=(arr, arr_lock)).map(
worker_function, range(5)
)
arr = np.frombuffer(arr, dtype='int32').reshape(5, 5)
print(f"{mp.current_process().name}\n{arr}")
if __name__ == '__main__':
main()
输出:
ForkPoolWorker-1
[[2 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]]
ForkPoolWorker-2
[[2 0 0 0 0]
[2 2 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]]
ForkPoolWorker-1
[[2 0 0 0 0]
[2 2 0 0 0]
[2 2 2 0 0]
[0 0 0 0 0]
[0 0 0 0 0]]
ForkPoolWorker-2
[[2 0 0 0 0]
[2 2 0 0 0]
[2 2 2 0 0]
[2 2 2 2 0]
[0 0 0 0 0]]
ForkPoolWorker-1
[[2 0 0 0 0]
[2 2 0 0 0]
[2 2 2 0 0]
[2 2 2 2 0]
[2 2 2 2 2]]
MainProcess
[[2 0 0 0 0]
[2 2 0 0 0]
[2 2 2 0 0]
[2 2 2 2 0]
[2 2 2 2 2]]
Process finished with exit code 0