如何从 multiprocessing.Pool.map 的 worker_function 内部给数组赋值?
How to assign values to array from inside the worker_function of multiprocessing.Pool.map?
基本上,我想要的是把那些 2 插入 ar,以便 ar 在 worker_function 之外也被更改。
import numpy as np
import multiprocessing as mp
from functools import partial

# NOTE(review): this is the FAILING attempt from the question. Each pool
# worker receives a pickled copy of `ar`, so the writes below are never
# visible in the parent process -- that is the behavior being asked about.
def worker_function(i=None, ar=None):
    # Write a sentinel value into this process's local copy of the array.
    val = 2
    ar[i] = val
    print(ar)  # shows the modified copy inside the worker
def main():
    ar = np.zeros(5)
    # Bind the array as a keyword argument; a copy is re-sent to the
    # worker for every mapped item.
    func_part = partial(worker_function, ar=ar)
    mp.Pool(1).map(func_part, range(2))
    print(ar)  # still all zeros: the parent's array was never touched
if __name__ == '__main__':
    main()
到目前为止,我唯一能做到的就是在 worker_function
内更改 ar
的副本,但不能在函数外更改:
[2. 0. 0. 0. 0.]
[0. 2. 0. 0. 0.]
[0. 0. 0. 0. 0.]
首先,您对 worker_function
的参数定义顺序错误。
如您所见,每个进程都会获得数组的副本。你能做的最好的就是 return 修改后的数组:
import numpy as np
import multiprocessing as mp
from functools import partial

def worker_function(ar, i):
    """Write the sentinel value 2 at position *i* and hand the copy back."""
    ar[i] = 2  # mutates only this process's local copy
    return ar

def main():
    """Print each worker's separately modified copy of the array."""
    zeros = np.zeros(5)
    # Pool of two so the two mapped items can actually run in parallel.
    modified_copies = mp.Pool(2).map(partial(worker_function, zeros), range(2))
    for modified in modified_copies:
        print(modified)

if __name__ == '__main__':
    main()
打印:
[2. 0. 0. 0. 0.]
[0. 2. 0. 0. 0.]
但现在您要处理两个单独修改的数组。您必须添加额外的逻辑才能将这两个数组的结果合并为一个:
import numpy as np
import multiprocessing as mp
from functools import partial

def worker_function(ar, i):
    """Set position *i* of the (process-local) copy of *ar* to 2 and return it."""
    ar[i] = 2
    return ar

def main():
    """Merge the two separately modified copies back into one array."""
    ar = np.zeros(5)
    # Two workers, each returning its own modified copy of the array.
    results = mp.Pool(2).map(partial(worker_function, ar), range(2))
    # Worker k touched exactly index k of its copy; pull those values back.
    for idx, modified in enumerate(results):
        ar[idx] = modified[idx]
    print(ar)

if __name__ == '__main__':
    main()
打印:
[2. 2. 0. 0. 0.]
但更有意义的是 worker_function
只是 return 一个给出被修改元素的索引和新值的元组:
import numpy as np
import multiprocessing as mp
from functools import partial

def worker_function(ar, i):
    """Return an (index, new_value) pair instead of shipping back the whole array."""
    return i, i + 3

def main():
    """Apply each worker's (index, value) result to the parent's array."""
    ar = np.zeros(5)
    pool = mp.Pool(2)
    index_value_pairs = pool.map(partial(worker_function, ar), range(2))
    for index, value in index_value_pairs:
        ar[index] = value
    print(ar)

if __name__ == '__main__':
    main()
打印:
[3. 4. 0. 0. 0.]
当然,如果worker_function
修改了多个值,它会return一个元组的元组。
最后,如果您确实需要将对象传递给 sub-processes,还有另一种使用池初始化程序的方法:
import numpy as np
import multiprocessing as mp

def pool_initializer(ar):
    """Stash the array in a module-level global inside each worker process."""
    global the_array
    the_array = ar

def worker_function(i):
    """Square the i-th element of the process-global array; return (index, value)."""
    squared = the_array[i] ** 2
    return i, squared

def main():
    """Initialize each worker with the array once, then apply the results."""
    ar = np.array([1, 2, 3, 4, 5])
    with mp.Pool(5, pool_initializer, (ar,)) as pool:
        results = pool.map(worker_function, range(5))
        for index, value in results:
            ar[index] = value
        print(ar)

if __name__ == '__main__':
    main()
打印:
[ 1 4 9 16 25]
为了性能,您应该在此处使用 shared-memory multiprocessing.Array
以避免一次又一次地跨不同进程重建和发送数组。该数组在所有进程中都是相同的,在您发送副本的示例中情况并非如此。这也是您看不到父级所做更改的原因。
import multiprocessing as mp
import numpy as np

def worker_function(i):
    """Write 2 into slot *i* of the shared array and show the current state."""
    # Element assignment only -- no rebinding, so a `global` statement is
    # not required for correctness here.
    arr[i] = 2
    print(mp.current_process().name, arr[:])

def init_arr(arr):
    """Expose the shared array as a module global in each worker process."""
    globals()['arr'] = arr

def main():
    # Shared-memory integer array; as long as no two workers write the
    # SAME index, a lock is unnecessary.
    arr = mp.Array('i', np.zeros(5, dtype=int), lock=False)
    mp.Pool(2, initializer=init_arr, initargs=(arr,)).map(worker_function, range(5))
    print(mp.current_process().name, arr[:])

if __name__ == '__main__':
    main()
输出:
ForkPoolWorker-1 [2, 0, 0, 0, 0]
ForkPoolWorker-2 [2, 2, 0, 0, 0]
ForkPoolWorker-1 [2, 2, 2, 0, 0]
ForkPoolWorker-2 [2, 2, 2, 2, 0]
ForkPoolWorker-1 [2, 2, 2, 2, 2]
MainProcess [2, 2, 2, 2, 2]
Process finished with exit code 0
基本上,我想要的是把那些 2 插入 ar,以便 ar 在 worker_function 之外也被更改。
import numpy as np
import multiprocessing as mp
from functools import partial

# NOTE(review): this is the FAILING attempt from the question. Each pool
# worker receives a pickled copy of `ar`, so the writes below are never
# visible in the parent process -- that is the behavior being asked about.
def worker_function(i=None, ar=None):
    # Write a sentinel value into this process's local copy of the array.
    val = 2
    ar[i] = val
    print(ar)  # shows the modified copy inside the worker
def main():
    ar = np.zeros(5)
    # Bind the array as a keyword argument; a copy is re-sent to the
    # worker for every mapped item.
    func_part = partial(worker_function, ar=ar)
    mp.Pool(1).map(func_part, range(2))
    print(ar)  # still all zeros: the parent's array was never touched
if __name__ == '__main__':
    main()
到目前为止,我唯一能做到的就是在 worker_function
内更改 ar
的副本,但不能在函数外更改:
[2. 0. 0. 0. 0.]
[0. 2. 0. 0. 0.]
[0. 0. 0. 0. 0.]
首先,您对 worker_function
的参数定义顺序错误。
如您所见,每个进程都会获得数组的副本。你能做的最好的就是 return 修改后的数组:
import numpy as np
import multiprocessing as mp
from functools import partial

def worker_function(ar, i):
    """Write the sentinel value 2 at position *i* and hand the copy back."""
    ar[i] = 2  # mutates only this process's local copy
    return ar

def main():
    """Print each worker's separately modified copy of the array."""
    zeros = np.zeros(5)
    # Pool of two so the two mapped items can actually run in parallel.
    modified_copies = mp.Pool(2).map(partial(worker_function, zeros), range(2))
    for modified in modified_copies:
        print(modified)

if __name__ == '__main__':
    main()
打印:
[2. 0. 0. 0. 0.]
[0. 2. 0. 0. 0.]
但现在您要处理两个单独修改的数组。您必须添加额外的逻辑才能将这两个数组的结果合并为一个:
import numpy as np
import multiprocessing as mp
from functools import partial

def worker_function(ar, i):
    """Set position *i* of the (process-local) copy of *ar* to 2 and return it."""
    ar[i] = 2
    return ar

def main():
    """Merge the two separately modified copies back into one array."""
    ar = np.zeros(5)
    # Two workers, each returning its own modified copy of the array.
    results = mp.Pool(2).map(partial(worker_function, ar), range(2))
    # Worker k touched exactly index k of its copy; pull those values back.
    for idx, modified in enumerate(results):
        ar[idx] = modified[idx]
    print(ar)

if __name__ == '__main__':
    main()
打印:
[2. 2. 0. 0. 0.]
但更有意义的是 worker_function
只是 return 一个给出被修改元素的索引和新值的元组:
import numpy as np
import multiprocessing as mp
from functools import partial

def worker_function(ar, i):
    """Return an (index, new_value) pair instead of shipping back the whole array."""
    return i, i + 3

def main():
    """Apply each worker's (index, value) result to the parent's array."""
    ar = np.zeros(5)
    pool = mp.Pool(2)
    index_value_pairs = pool.map(partial(worker_function, ar), range(2))
    for index, value in index_value_pairs:
        ar[index] = value
    print(ar)

if __name__ == '__main__':
    main()
打印:
[3. 4. 0. 0. 0.]
当然,如果worker_function
修改了多个值,它会return一个元组的元组。
最后,如果您确实需要将对象传递给 sub-processes,还有另一种使用池初始化程序的方法:
import numpy as np
import multiprocessing as mp

def pool_initializer(ar):
    """Stash the array in a module-level global inside each worker process."""
    global the_array
    the_array = ar

def worker_function(i):
    """Square the i-th element of the process-global array; return (index, value)."""
    squared = the_array[i] ** 2
    return i, squared

def main():
    """Initialize each worker with the array once, then apply the results."""
    ar = np.array([1, 2, 3, 4, 5])
    with mp.Pool(5, pool_initializer, (ar,)) as pool:
        results = pool.map(worker_function, range(5))
        for index, value in results:
            ar[index] = value
        print(ar)

if __name__ == '__main__':
    main()
打印:
[ 1 4 9 16 25]
为了性能,您应该在此处使用 shared-memory multiprocessing.Array
以避免一次又一次地跨不同进程重建和发送数组。该数组在所有进程中都是相同的,在您发送副本的示例中情况并非如此。这也是您看不到父级所做更改的原因。
import multiprocessing as mp
import numpy as np

def worker_function(i):
    """Write 2 into slot *i* of the shared array and show the current state."""
    # Element assignment only -- no rebinding, so a `global` statement is
    # not required for correctness here.
    arr[i] = 2
    print(mp.current_process().name, arr[:])

def init_arr(arr):
    """Expose the shared array as a module global in each worker process."""
    globals()['arr'] = arr

def main():
    # Shared-memory integer array; as long as no two workers write the
    # SAME index, a lock is unnecessary.
    arr = mp.Array('i', np.zeros(5, dtype=int), lock=False)
    mp.Pool(2, initializer=init_arr, initargs=(arr,)).map(worker_function, range(5))
    print(mp.current_process().name, arr[:])

if __name__ == '__main__':
    main()
输出:
ForkPoolWorker-1 [2, 0, 0, 0, 0]
ForkPoolWorker-2 [2, 2, 0, 0, 0]
ForkPoolWorker-1 [2, 2, 2, 0, 0]
ForkPoolWorker-2 [2, 2, 2, 2, 0]
ForkPoolWorker-1 [2, 2, 2, 2, 2]
MainProcess [2, 2, 2, 2, 2]
Process finished with exit code 0