由共享内存支持的 numpy 数组:BufferError

numpy array backed by shared memory: BufferError

我 运行 在使用 multiprocessing.shared_memory 支持 numpy 数组时出错。 这是我的使用模式:

# demo.py
from contextlib import contextmanager
from multiprocessing.managers import SharedMemoryManager
from multiprocessing.shared_memory import SharedMemory
from typing import Iterator
import numpy as np

@contextmanager
def allocate_shared_mem() -> Iterator[SharedMemory]:
    with SharedMemoryManager() as smm:
        shared_mem = smm.SharedMemory(size=80)
        yield shared_mem

with allocate_shared_mem() as shared_mem:
    shared_arr = np.frombuffer(shared_mem.buf)
    assert len(shared_arr) == 10

这是我遇到的问题:

$ python demo.py
Exception ignored in: <function SharedMemory.__del__ at 0x7ff8bf604ee0>
Traceback (most recent call last):
  File "/home/rig1/miniconda3/envs/pysc/lib/python3.10/multiprocessing/shared_memory.py", line 184, in __del__
  File "/home/rig1/miniconda3/envs/pysc/lib/python3.10/multiprocessing/shared_memory.py", line 227, in close
BufferError: cannot close exported pointers exist

我认为问题是当 numpy 数组存在时 SharedMemoryManager 无法解除分配 shared_mem。如果我将 del shared_array 附加到 demo.py 底部的 with 块的末尾,问题就会消失。

如何使用共享内存支持的 numpy 数组,而不必手动考虑数组删除?是否有任何干净的模式或数组无效技巧来处理这种情况?

我担心我的代码的某些其他部分会获取数组的句柄,并且弄清楚应该删除哪个对象以避免“无法关闭导出的指针存在”错误会很麻烦。如果 numpy 数组在 with 块退出后变得无效,这对我来说很好。

official doc一样,您可以直接使用numpy.ndarray初始化器来创建一个没有缓冲区管理的数组。

@contextmanager
def allocate_shared_mem() -> Iterator[SharedMemory]:
    with SharedMemoryManager() as smm:
        shared_mem = smm.SharedMemory(size=80)
        yield shared_mem


def run():
    with allocate_shared_mem() as shared_mem:
        shared_arr = np.ndarray((10,), dtype=np.float64, buffer=shared_mem.buf)
        shared_arr[:] = 1

    print(shared_arr)  # This should not work, but it does.


run()

请注意,您还可以导出数组而不是缓冲区。

@contextmanager
def allocate_shared_mem() -> Iterator[SharedMemory]:
    with SharedMemoryManager() as smm:
        shared_mem = smm.SharedMemory(size=80)
        shared_arr = np.ndarray((10,), dtype=np.float64, buffer=shared_mem.buf)
        yield shared_arr


def run():
    with allocate_shared_mem() as shared_arr:
        shared_arr[:] = 1

    print(shared_arr)  # This will crash without raising any exception.


run()