Python 的多处理 SharedMemory 以内存损坏结束
Python's multiprocessing SharedMemory ending in memory corruption
我正在尝试使用队列将 SharedMemory 引用传递给已经 运行ning 的进程。问题是,一旦我在另一个进程上收到(或获取)SharedMemory 对象,对应的内存块似乎根本不匹配,甚至大小太大。
import numpy as np
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory
def f(q):
shared_memory = q.get()
print(f"In Process: {shared_memory=}")
x = np.frombuffer(buffer=shared_memory.buf, dtype=np.float64)
print(f"In Process: {x=}")
if __name__ == '__main__':
temp_array = np.arange(8)
print(f"Main: {temp_array=}")
smh = SharedMemory(create=True, size=temp_array.nbytes)
print(f"Main: {smh=}")
fix_array = np.frombuffer(buffer=smh.buf, dtype=temp_array.dtype)
fix_array[:] = temp_array[:]
print(f"Main: {fix_array=}")
queue = mp.Queue()
proc = mp.Process(target=f, args=(queue,))
proc.start()
queue.put(smh)
如果我 运行 此代码输出以下内容:
Main: temp_array=array([0, 1, 2, 3, 4, 5, 6, 7])
Main: smh=SharedMemory('wnsm_2202c81b', size=32)
Main: fix_array=array([0, 0, 0, 0, 0, 0, 0, 0])
In Process: shared_memory=SharedMemory('wnsm_2202c81b', size=4096)
In Process: x=array([0., (weird very small numbers and many many zeros...), 0.])
我希望能取回原件 temp_array=array([0, 1, 2, 3, 4, 5, 6, 7])
根据文档,内存大小可能不匹配。此外,我用一个包含 1e6 个项目的数组对其进行了测试,仅传递 SharedMemory 的名称并使用 Pipe 而不是 Queue 但仍然相同。
我是不是做错了什么或者这是一个错误?
(我在 Windows 10 Build 19043,Python 3.9.6 64 位)
感谢@Timus
我觉得最好分成两个问题来解决:
问题一,奇怪的数字:
If you adjust the definition of f by x = np.frombuffer(buffer=shared_memory.buf, dtype=np.int32) you'll get your numbers back (that was the initial type).
正如@Timus 所指出的,错误是数据类型不匹配:
np.arange()
returns 带有 dtype=np.int32
的 np.ndarray
但我试图获得带有 dtype=np.float64
的数组,因此结果错误。
修复:
@Timus 的解决方案或将 dtype=np.float64
添加为 np.arange()
的参数,以便它读取:
temp_array = np.arange(8, dtype=np.float)
问题 2, 数组太长:
根据 Python Docs,SharedMemory.size
可能比原来大。因此,数组的长度也可能不同。
修复/解决方法:
Trim 数组到它的原始大小,例如通过使用 numpy.resize()
。为此,还需要将原始 shape
传递给 f()
。虽然这对我来说很好,但以下几点可能对其他人来说是个问题:由于 x
只是缓冲区的视图,因此 np.ndarray.resize()
不可用(它不拥有自己的数据)。使用 numpy.resize()
,将创建一个副本,并且 对调整大小的副本所做的更改不会反映在主进程中 !为此,可以将 x_resized
的值复制回 x
.
固定代码现在如下所示:
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory
import numpy as np
def f(q):
shared_memory, shape = q.get() # the shape is passed here
x = np.frombuffer(buffer=shared_memory.buf, dtype=np.float64) # dtype matches
# x = np.trim_zeros(x, "b"), this doesn't work if there are zeros in the dataset
x_resized = np.resize(x, new_shape=shape) # changes not reflected on main process
###
# make things to x_resized
###
x[:8] = x_resized[:] # copy changes back to x
if __name__ == '__main__':
temp_array = np.arange(8, dtype=np.float64) # dtype is correctly specified
smh = SharedMemory(create=True, size=temp_array.nbytes)
fix_array = np.frombuffer(buffer=smh.buf, dtype=temp_array.dtype)
fix_array[:] = temp_array[:]
queue = mp.Queue()
proc = mp.Process(target=f, args=(queue,))
proc.start()
queue.put((smh, temp_array.shape)) # passing the original shape
奇怪的是,虽然第二个过程中的 x
太长了,但在主过程中 fix_array
仍然保持正确的大小...
我正在尝试使用队列将 SharedMemory 引用传递给已经 运行ning 的进程。问题是,一旦我在另一个进程上收到(或获取)SharedMemory 对象,对应的内存块似乎根本不匹配,甚至大小太大。
import numpy as np
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory
def f(q):
shared_memory = q.get()
print(f"In Process: {shared_memory=}")
x = np.frombuffer(buffer=shared_memory.buf, dtype=np.float64)
print(f"In Process: {x=}")
if __name__ == '__main__':
temp_array = np.arange(8)
print(f"Main: {temp_array=}")
smh = SharedMemory(create=True, size=temp_array.nbytes)
print(f"Main: {smh=}")
fix_array = np.frombuffer(buffer=smh.buf, dtype=temp_array.dtype)
fix_array[:] = temp_array[:]
print(f"Main: {fix_array=}")
queue = mp.Queue()
proc = mp.Process(target=f, args=(queue,))
proc.start()
queue.put(smh)
如果我 运行 此代码输出以下内容:
Main: temp_array=array([0, 1, 2, 3, 4, 5, 6, 7])
Main: smh=SharedMemory('wnsm_2202c81b', size=32)
Main: fix_array=array([0, 0, 0, 0, 0, 0, 0, 0])
In Process: shared_memory=SharedMemory('wnsm_2202c81b', size=4096)
In Process: x=array([0., (weird very small numbers and many many zeros...), 0.])
我希望能取回原件 temp_array=array([0, 1, 2, 3, 4, 5, 6, 7])
根据文档,内存大小可能不匹配。此外,我用一个包含 1e6 个项目的数组对其进行了测试,仅传递 SharedMemory 的名称并使用 Pipe 而不是 Queue 但仍然相同。
我是不是做错了什么或者这是一个错误?
(我在 Windows 10 Build 19043,Python 3.9.6 64 位)
感谢@Timus
我觉得最好分成两个问题来解决:
问题一,奇怪的数字:
If you adjust the definition of f by x = np.frombuffer(buffer=shared_memory.buf, dtype=np.int32) you'll get your numbers back (that was the initial type).
正如@Timus 所指出的,错误是数据类型不匹配:
np.arange()
returns 带有 dtype=np.int32
的 np.ndarray
但我试图获得带有 dtype=np.float64
的数组,因此结果错误。
修复:
@Timus 的解决方案或将 dtype=np.float64
添加为 np.arange()
的参数,以便它读取:
temp_array = np.arange(8, dtype=np.float)
问题 2, 数组太长:
根据 Python Docs,SharedMemory.size
可能比原来大。因此,数组的长度也可能不同。
修复/解决方法:
Trim 数组到它的原始大小,例如通过使用 numpy.resize()
。为此,还需要将原始 shape
传递给 f()
。虽然这对我来说很好,但以下几点可能对其他人来说是个问题:由于 x
只是缓冲区的视图,因此 np.ndarray.resize()
不可用(它不拥有自己的数据)。使用 numpy.resize()
,将创建一个副本,并且 对调整大小的副本所做的更改不会反映在主进程中 !为此,可以将 x_resized
的值复制回 x
.
固定代码现在如下所示:
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory
import numpy as np
def f(q):
shared_memory, shape = q.get() # the shape is passed here
x = np.frombuffer(buffer=shared_memory.buf, dtype=np.float64) # dtype matches
# x = np.trim_zeros(x, "b"), this doesn't work if there are zeros in the dataset
x_resized = np.resize(x, new_shape=shape) # changes not reflected on main process
###
# make things to x_resized
###
x[:8] = x_resized[:] # copy changes back to x
if __name__ == '__main__':
temp_array = np.arange(8, dtype=np.float64) # dtype is correctly specified
smh = SharedMemory(create=True, size=temp_array.nbytes)
fix_array = np.frombuffer(buffer=smh.buf, dtype=temp_array.dtype)
fix_array[:] = temp_array[:]
queue = mp.Queue()
proc = mp.Process(target=f, args=(queue,))
proc.start()
queue.put((smh, temp_array.shape)) # passing the original shape
奇怪的是,虽然第二个过程中的 x
太长了,但在主过程中 fix_array
仍然保持正确的大小...