Python 的多处理 SharedMemory 以内存损坏结束

Python's multiprocessing SharedMemory ending in memory corruption

我正在尝试使用队列将 SharedMemory 引用传递给已经 运行ning 的进程。问题是,一旦我在另一个进程上收到(或获取)SharedMemory 对象,对应的内存块似乎根本不匹配,甚至大小太大。

import numpy as np
import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory


def f(q):
    shared_memory = q.get()
    print(f"In Process: {shared_memory=}")
    x = np.frombuffer(buffer=shared_memory.buf, dtype=np.float64)
    print(f"In Process: {x=}")


if __name__ == '__main__':
    temp_array = np.arange(8)
    print(f"Main: {temp_array=}")
    smh = SharedMemory(create=True, size=temp_array.nbytes)
    print(f"Main: {smh=}")
    fix_array = np.frombuffer(buffer=smh.buf, dtype=temp_array.dtype)
    fix_array[:] = temp_array[:]
    print(f"Main: {fix_array=}")

    queue = mp.Queue()
    proc = mp.Process(target=f, args=(queue,))
    proc.start()

    queue.put(smh)

如果我 运行 此代码输出以下内容:

Main: temp_array=array([0, 1, 2, 3, 4, 5, 6, 7])
Main: smh=SharedMemory('wnsm_2202c81b', size=32)
Main: fix_array=array([0, 0, 0, 0, 0, 0, 0, 0])
In Process: shared_memory=SharedMemory('wnsm_2202c81b', size=4096)
In Process: x=array([0., (weird very small numbers and many many zeros...), 0.])

我希望能取回原件 temp_array=array([0, 1, 2, 3, 4, 5, 6, 7])

根据文档,内存大小可能不匹配。此外,我用一个包含 1e6 个项目的数组对其进行了测试,仅传递 SharedMemory 的名称并使用 Pipe 而不是 Queue 但仍然相同。

我是不是做错了什么或者这是一个错误?

(我在 Windows 10 Build 19043,Python 3.9.6 64 位)

感谢@Timus

我觉得最好分成两个问题来解决:

问题一,奇怪的数字:

If you adjust the definition of f by x = np.frombuffer(buffer=shared_memory.buf, dtype=np.int32) you'll get your numbers back (that was the initial type).

正如@Timus 所指出的,错误是数据类型不匹配: np.arange() returns 带有 dtype=np.int32np.ndarray 但我试图获得带有 dtype=np.float64 的数组,因此结果错误。

修复:

@Timus 的解决方案或将 dtype=np.float64 添加为 np.arange() 的参数,以便它读取: temp_array = np.arange(8, dtype=np.float)


问题 2, 数组太长:

根据 Python DocsSharedMemory.size 可能比原来大。因此,数组的长度也可能不同。

修复/解决方法:

Trim 数组到它的原始大小,例如通过使用 numpy.resize()。为此,还需要将原始 shape 传递给 f()。虽然这对我来说很好,但以下几点可能对其他人来说是个问题:由于 x 只是缓冲区的视图,因此 np.ndarray.resize() 不可用(它不拥有自己的数据)。使用 numpy.resize(),将创建一个副本,并且 对调整大小的副本所做的更改不会反映在主进程中 !为此,可以将 x_resized 的值复制回 x.


固定代码现在如下所示:

import multiprocessing as mp
from multiprocessing.shared_memory import SharedMemory

import numpy as np

def f(q):
    shared_memory, shape = q.get()  # the shape is passed here
    x = np.frombuffer(buffer=shared_memory.buf, dtype=np.float64)  # dtype matches
    # x = np.trim_zeros(x, "b"), this doesn't work if there are zeros in the dataset
    x_resized = np.resize(x, new_shape=shape)  # changes not reflected on main process

    ###
    # make things to x_resized
    ###

    x[:8] = x_resized[:] # copy changes back to x

if __name__ == '__main__':
    temp_array = np.arange(8, dtype=np.float64) # dtype is correctly specified
    
    smh = SharedMemory(create=True, size=temp_array.nbytes)
    fix_array = np.frombuffer(buffer=smh.buf, dtype=temp_array.dtype)
    fix_array[:] = temp_array[:]

    queue = mp.Queue()
    proc = mp.Process(target=f, args=(queue,))
    proc.start()

    queue.put((smh, temp_array.shape)) # passing the original shape

奇怪的是,虽然第二个过程中的 x 太长了,但在主过程中 fix_array 仍然保持正确的大小...