与 Python 多处理共享对象数组
Sharing array of objects with Python multiprocessing
对于这个问题,我参考了 example in Python docs 讨论“使用 SharedMemory
class 和 NumPy
数组,访问相同的 numpy.ndarray
来自两个不同的 Python shells".
我想实现的一个主要变化是操纵 class 对象数组而不是整数值,如下所示。
import numpy as np
from multiprocessing import shared_memory
# a simplistic class example
class A():
def __init__(self, x):
self.x = x
# numpy array of class objects
a = np.array([A(1), A(2), A(3)])
# create a shared memory instance
shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name='psm_test0')
# numpy array backed by shared memory
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
# copy the original data into shared memory
b[:] = a[:]
print(b)
# array([<__main__.Foo object at 0x7fac56cd1190>,
# <__main__.Foo object at 0x7fac56cd1970>,
# <__main__.Foo object at 0x7fac56cd19a0>], dtype=object)
现在,在另一个 shell 中,我们附加到共享内存 space 并尝试操作数组的内容。
import numpy as np
from multiprocessing import shared_memory
# attach to the existing shared space
existing_shm = shared_memory.SharedMemory(name='psm_test0')
c = np.ndarray((3,), dtype=object, buffer=existing_shm.buf)
甚至在我们能够操作 c
之前,打印它就会导致分段错误。事实上,我无法期望观察到尚未写入模块的行为,所以我的问题是我可以做什么来使用共享对象数组?
我目前正在挑选列表,但受保护 read/writes 增加了相当多的开销。我也尝试过使用 Namespace
, which was quite slow because indexed writes are not allowed. Another idea could be to use share Ctypes Structure in a ShareableList
,但我不知道从哪里开始。
此外还有一个设计方面:shared_memory
中似乎有一个 open bug 可能会影响我的实现,其中我有多个进程处理数组的不同元素。
是否有更具可扩展性的方式在多个进程之间共享大量对象列表,以便在任何给定时间所有 运行 进程都与列表中的唯一 object/element 交互?
更新: 在这一点上,我也将接受部分答案,讨论是否可以通过 Python 实现这一点。
所以,我做了一些研究 (Shared Memory Objects in Multiprocessing) 并提出了一些想法:
传递 numpy 字节数组
序列化对象,然后将它们作为字节字符串保存到 numpy 数组中。这里的问题是
一个人需要将数据类型从 'psm_test0'
的创建者传递给 'psm_test0'
的任何消费者。不过,这可以通过另一个共享内存来完成。
pickle
和 unpickle
本质上类似于 deepcopy
,即它实际上复制了基础数据。
'main' 进程的代码如下:
import pickle
from multiprocessing import shared_memory
import numpy as np
# a simplistic class example
class A():
def __init__(self, x):
self.x = x
def pickle(self):
return pickle.dumps(self)
@classmethod
def unpickle(self, bts):
return pickle.loads(bts)
if __name__ == '__main__':
# Test pickling procedure
a = A(1)
print(A.unpickle(a.pickle()).x)
# >>> 1
# numpy array of byte strings
a_arr = np.array([A(1).pickle(), A(2).pickle(), A('This is a really long test string which should exceed 42 bytes').pickle()])
# create a shared memory instance
shm = shared_memory.SharedMemory(
create=True,
size=a_arr.nbytes,
name='psm_test0'
)
# numpy array backed by shared memory
b_arr = np.ndarray(a_arr.shape, dtype=a_arr.dtype, buffer=shm.buf)
# copy the original data into shared memory
b_arr[:] = a_arr[:]
print(b_arr.dtype)
# 'S105'
并为消费者
import numpy as np
from multiprocessing import shared_memory
from test import A
# attach to the existing shared space
existing_shm = shared_memory.SharedMemory(name='psm_test0')
c = np.ndarray((3,), dtype='S105', buffer=existing_shm.buf)
# Test data transfer
arr = [a.x for a in list(map(A.unpickle, c))]
print(arr)
# [1, 2, ...]
我想说你有几种前进的方式:
对于这个问题,我参考了 example in Python docs 讨论“使用 SharedMemory
class 和 NumPy
数组,访问相同的 numpy.ndarray
来自两个不同的 Python shells".
我想实现的一个主要变化是操纵 class 对象数组而不是整数值,如下所示。
import numpy as np
from multiprocessing import shared_memory
# a simplistic class example
class A():
def __init__(self, x):
self.x = x
# numpy array of class objects
a = np.array([A(1), A(2), A(3)])
# create a shared memory instance
shm = shared_memory.SharedMemory(create=True, size=a.nbytes, name='psm_test0')
# numpy array backed by shared memory
b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
# copy the original data into shared memory
b[:] = a[:]
print(b)
# array([<__main__.Foo object at 0x7fac56cd1190>,
# <__main__.Foo object at 0x7fac56cd1970>,
# <__main__.Foo object at 0x7fac56cd19a0>], dtype=object)
现在,在另一个 shell 中,我们附加到共享内存 space 并尝试操作数组的内容。
import numpy as np
from multiprocessing import shared_memory
# attach to the existing shared space
existing_shm = shared_memory.SharedMemory(name='psm_test0')
c = np.ndarray((3,), dtype=object, buffer=existing_shm.buf)
甚至在我们能够操作 c
之前,打印它就会导致分段错误。事实上,我无法期望观察到尚未写入模块的行为,所以我的问题是我可以做什么来使用共享对象数组?
我目前正在挑选列表,但受保护 read/writes 增加了相当多的开销。我也尝试过使用 Namespace
, which was quite slow because indexed writes are not allowed. Another idea could be to use share Ctypes Structure in a ShareableList
,但我不知道从哪里开始。
此外还有一个设计方面:shared_memory
中似乎有一个 open bug 可能会影响我的实现,其中我有多个进程处理数组的不同元素。
是否有更具可扩展性的方式在多个进程之间共享大量对象列表,以便在任何给定时间所有 运行 进程都与列表中的唯一 object/element 交互?
更新: 在这一点上,我也将接受部分答案,讨论是否可以通过 Python 实现这一点。
所以,我做了一些研究 (Shared Memory Objects in Multiprocessing) 并提出了一些想法:
传递 numpy 字节数组
序列化对象,然后将它们作为字节字符串保存到 numpy 数组中。这里的问题是
一个人需要将数据类型从
'psm_test0'
的创建者传递给'psm_test0'
的任何消费者。不过,这可以通过另一个共享内存来完成。pickle
和unpickle
本质上类似于deepcopy
,即它实际上复制了基础数据。
'main' 进程的代码如下:
import pickle
from multiprocessing import shared_memory
import numpy as np
# a simplistic class example
class A():
def __init__(self, x):
self.x = x
def pickle(self):
return pickle.dumps(self)
@classmethod
def unpickle(self, bts):
return pickle.loads(bts)
if __name__ == '__main__':
# Test pickling procedure
a = A(1)
print(A.unpickle(a.pickle()).x)
# >>> 1
# numpy array of byte strings
a_arr = np.array([A(1).pickle(), A(2).pickle(), A('This is a really long test string which should exceed 42 bytes').pickle()])
# create a shared memory instance
shm = shared_memory.SharedMemory(
create=True,
size=a_arr.nbytes,
name='psm_test0'
)
# numpy array backed by shared memory
b_arr = np.ndarray(a_arr.shape, dtype=a_arr.dtype, buffer=shm.buf)
# copy the original data into shared memory
b_arr[:] = a_arr[:]
print(b_arr.dtype)
# 'S105'
并为消费者
import numpy as np
from multiprocessing import shared_memory
from test import A
# attach to the existing shared space
existing_shm = shared_memory.SharedMemory(name='psm_test0')
c = np.ndarray((3,), dtype='S105', buffer=existing_shm.buf)
# Test data transfer
arr = [a.x for a in list(map(A.unpickle, c))]
print(arr)
# [1, 2, ...]
我想说你有几种前进的方式: