python 共享 numpy 数组时进行多处理
python multiprocessing when share a numpy array
我想通过利用多处理来部分更改大型 numpy 数组中的值。
也就是我要最后得到[[100, 100, 100], [100, 100, 100]]
但是下面的代码是错误的,它说“RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance”
我该怎么办?谢谢
import numpy as np
import multiprocessing
from multiprocessing import RawArray, Array
def change_array(array, i, j):
X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
X_np[i, j] = 100
print(np.frombuffer(array.get_obj()))
if __name__ == '__main__':
X_shape = (2, 3)
data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
X = Array('d', X_shape[0] * X_shape[1])
# Wrap X as an numpy array so we can easily manipulates its data.
X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
# Copy data to our shared array.
np.copyto(X_np, data)
pool = multiprocessing.Pool(processes=3)
result = []
for i in range(2):
for j in range(3):
result.append(pool.apply_async(change_array, (X, i, j,)))
result = [r.get() for r in result]
pool.close()
pool.join()
print(np.frombuffer(X.get_obj()).reshape(2, 3))
多进程最重要的规则。如果可能的话,您不希望在子流程中修改共享对象。您希望您的工作程序是:
def change_array(i, j):
value = ..... whatever value goes here
return i, j, value
然后您的主进程将读取返回的值 i,j,value
并将数组的元素设置为正确的值。
您需要进行两项更改:
- 使用带锁定的
multiprocessing.Array
实例(实际上是默认值)而不是“普通”实例 Array
。
- 不要将数组实例作为参数传递给辅助函数。相反,您应该使用数组作为全局值初始化池中的每个处理器。
import numpy as np
import multiprocessing
from multiprocessing import RawArray, Array
def initpool(arr):
global array
array = arr
def change_array(i, j):
X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
X_np[i, j] = 100
print(np.frombuffer(array.get_obj()))
if __name__ == '__main__':
X_shape = (2, 3)
data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
X = multiprocessing.Array('d', X_shape[0] * X_shape[1], lock=True)
# Wrap X as an numpy array so we can easily manipulates its data.
X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
# Copy data to our shared array.
np.copyto(X_np, data)
pool = multiprocessing.Pool(processes=3, initializer=initpool, initargs=(X,))
result = []
for i in range(2):
for j in range(3):
result.append(pool.apply_async(change_array, (i, j,)))
result = [r.get() for r in result]
pool.close()
pool.join()
print(np.frombuffer(X.get_obj()).reshape(2, 3))
打印:
[100. 2.2 3.3 4.4 5.5 6.6]
[100. 100. 3.3 4.4 5.5 6.6]
[100. 100. 100. 4.4 5.5 6.6]
[100. 100. 100. 100. 5.5 6.6]
[100. 100. 100. 100. 100. 6.6]
[100. 100. 100. 100. 100. 100.]
[[100. 100. 100.]
[100. 100. 100.]]
更新
因为在这种情况下,data
数组中更改的值不依赖于该数组中的现有值,因此函数 change_array
不需要访问该数组,并且它可以代替,正如 Frank Yellin 所建议的那样,只是 return 一个索引的元组要用新值更改。但我确实想向您展示如何在函数确实需要 access/modify 数组的情况下传递数组。然而,在这种情况下,以下代码就是您所需要的(我做了一些简化):
import numpy as np
import multiprocessing
def change_array(i, j):
return i, j, 100
if __name__ == '__main__':
data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
with multiprocessing.Pool(processes=3) as pool:
result = [pool.apply_async(change_array, (i, j)) for i in range(2) for j in range(3)]
for r in result:
i, j, value = r.get()
data[i, j] = value
print(data)
或:
import numpy as np
import multiprocessing
import itertools
def change_array(t):
i, j = t
return i, j, 100
if __name__ == '__main__':
data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
with multiprocessing.Pool(processes=3) as pool:
for i, j, value in pool.map(change_array, itertools.product(range(2), range(3))):
data[i, j] = value
print(data)
我想通过利用多处理来部分更改大型 numpy 数组中的值。
也就是我要最后得到[[100, 100, 100], [100, 100, 100]]
但是下面的代码是错误的,它说“RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance”
我该怎么办?谢谢
import numpy as np
import multiprocessing
from multiprocessing import RawArray, Array
def change_array(array, i, j):
X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
X_np[i, j] = 100
print(np.frombuffer(array.get_obj()))
if __name__ == '__main__':
X_shape = (2, 3)
data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
X = Array('d', X_shape[0] * X_shape[1])
# Wrap X as an numpy array so we can easily manipulates its data.
X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
# Copy data to our shared array.
np.copyto(X_np, data)
pool = multiprocessing.Pool(processes=3)
result = []
for i in range(2):
for j in range(3):
result.append(pool.apply_async(change_array, (X, i, j,)))
result = [r.get() for r in result]
pool.close()
pool.join()
print(np.frombuffer(X.get_obj()).reshape(2, 3))
多进程最重要的规则。如果可能的话,您不希望在子流程中修改共享对象。您希望您的工作程序是:
def change_array(i, j):
value = ..... whatever value goes here
return i, j, value
然后您的主进程将读取返回的值 i,j,value
并将数组的元素设置为正确的值。
您需要进行两项更改:
- 使用带锁定的
multiprocessing.Array
实例(实际上是默认值)而不是“普通”实例Array
。 - 不要将数组实例作为参数传递给辅助函数。相反,您应该使用数组作为全局值初始化池中的每个处理器。
import numpy as np
import multiprocessing
from multiprocessing import RawArray, Array
def initpool(arr):
global array
array = arr
def change_array(i, j):
X_np = np.frombuffer(array.get_obj(), dtype=np.float64).reshape(2, 3)
X_np[i, j] = 100
print(np.frombuffer(array.get_obj()))
if __name__ == '__main__':
X_shape = (2, 3)
data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
X = multiprocessing.Array('d', X_shape[0] * X_shape[1], lock=True)
# Wrap X as an numpy array so we can easily manipulates its data.
X_np = np.frombuffer(X.get_obj()).reshape(X_shape)
# Copy data to our shared array.
np.copyto(X_np, data)
pool = multiprocessing.Pool(processes=3, initializer=initpool, initargs=(X,))
result = []
for i in range(2):
for j in range(3):
result.append(pool.apply_async(change_array, (i, j,)))
result = [r.get() for r in result]
pool.close()
pool.join()
print(np.frombuffer(X.get_obj()).reshape(2, 3))
打印:
[100. 2.2 3.3 4.4 5.5 6.6]
[100. 100. 3.3 4.4 5.5 6.6]
[100. 100. 100. 4.4 5.5 6.6]
[100. 100. 100. 100. 5.5 6.6]
[100. 100. 100. 100. 100. 6.6]
[100. 100. 100. 100. 100. 100.]
[[100. 100. 100.]
[100. 100. 100.]]
更新
因为在这种情况下,data
数组中更改的值不依赖于该数组中的现有值,因此函数 change_array
不需要访问该数组,并且它可以代替,正如 Frank Yellin 所建议的那样,只是 return 一个索引的元组要用新值更改。但我确实想向您展示如何在函数确实需要 access/modify 数组的情况下传递数组。然而,在这种情况下,以下代码就是您所需要的(我做了一些简化):
import numpy as np
import multiprocessing
def change_array(i, j):
return i, j, 100
if __name__ == '__main__':
data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
with multiprocessing.Pool(processes=3) as pool:
result = [pool.apply_async(change_array, (i, j)) for i in range(2) for j in range(3)]
for r in result:
i, j, value = r.get()
data[i, j] = value
print(data)
或:
import numpy as np
import multiprocessing
import itertools
def change_array(t):
i, j = t
return i, j, 100
if __name__ == '__main__':
data = np.array([[1.1, 2.2, 3.3], [4.4, 5.5, 6.6]])
with multiprocessing.Pool(processes=3) as pool:
for i, j, value in pool.map(change_array, itertools.product(range(2), range(3))):
data[i, j] = value
print(data)