python 与池共享内存
python shared memory with pool
我正在尝试在 python 的多处理中使用 shared_memory with pool。
在Documentation中,关于shared memory
,参数buf
(内存视图)我不是很清楚(可能是因为我不明白内存视图的概念- 它是一个指针吗?)。
我想跨不同的进程使用这个共享内存。以下是我基于文档的示例:
a = np.array([1, 1, 2, 3, 5, 8])
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
# Do I need to create the existing_shm or I can keep using shm?
existing_shm = shared_memory.SharedMemory(name=shm.name)
现在我的第一个问题来了。我定义了将使用共享内存中数组的函数:
def test_function(Input):
c = np.ndarray(a.shape, dtype=np.int64, buffer=existing_shm.buf)
c[1]=100
print(c)
这是不正确的,但我不知道应该如何。
然后是正题。有没有main函数让这个工作的作用?
if __name__=='__main__':
with Pool(os.cpu_count()) as p:
p.map(test_function, range(12))
没用。
我必须在每个进程中定义 c
吗?或者我可以在 main 中定义它并在所有进程中使用它?我假设 c
是一个 python 对象,因此由于 gil-lock?
不能被进程共享
非常感谢!
这行得通。不过,我还没有弄清楚所有的事实。
1- 共享内存对象声明:
shm = shared_memory.SharedMemory(create=True, size=10000000*4)
.
2- 一个(在本例中为 numpy 数组)对象用 buffer 声明如下:
b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf)
.
3- 通过向其中复制数据来填充 numpy 数组。
b[:] = np.random.randint(100, size=10000000, dtype=np.int32)
.
然后,许多cpus需要执行的所有函数都是共享内存对象的名称,函数中提到的第2步是映射共享内存,该共享内存已在前面填充。
您必须 close
访问共享对象并在结束时 unlink
。
import numpy as np
from multiprocessing import shared_memory, Pool
import os
def test_function(args):
Input, shm_name, size = args
existing_shm = shared_memory.SharedMemory(name=shm_name)
d = np.ndarray(size, dtype=np.int32, buffer=existing_shm.buf)
#print(Input, d[Input-1:Input+2])
d[Input]=-20
#print(Input, d[Input-1:Input+2])
existing_shm.close()
print(Input, 'parent process:', os.getppid())
print(Input, 'process id:', os.getpid())
if __name__=='__main__':
shm = shared_memory.SharedMemory(create=True, size=10000000*4)
b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf)
b[:] = np.random.randint(100, size=10000000, dtype=np.int32)
inputs =[[ 1,shm.name,b.shape],
[ 2,shm.name,b.shape],
[ 3,shm.name,b.shape],
[ 4,shm.name,b.shape],
[ 5,shm.name,b.shape],
[ 6,shm.name,b.shape],
[ 7,shm.name,b.shape],
[ 8,shm.name,b.shape],
[ 9,shm.name,b.shape],
[ 10,shm.name,b.shape],
[ 11,shm.name,b.shape],
[ 12,shm.name,b.shape],
[13,shm.name,b.shape]]
with Pool(os.cpu_count()) as p:
p.map(test_function, inputs)
print(b[:20])
# Clean up from within the first Python shell
shm.close()
shm.unlink() # Free and release the shared memory block at the very end
我正在尝试在 python 的多处理中使用 shared_memory with pool。
在Documentation中,关于shared memory
,参数buf
(内存视图)我不是很清楚(可能是因为我不明白内存视图的概念- 它是一个指针吗?)。
我想跨不同的进程使用这个共享内存。以下是我基于文档的示例:
a = np.array([1, 1, 2, 3, 5, 8])
shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
# Do I need to create the existing_shm or I can keep using shm?
existing_shm = shared_memory.SharedMemory(name=shm.name)
现在我的第一个问题来了。我定义了将使用共享内存中数组的函数:
def test_function(Input):
c = np.ndarray(a.shape, dtype=np.int64, buffer=existing_shm.buf)
c[1]=100
print(c)
这是不正确的,但我不知道应该如何。
然后是正题。有没有main函数让这个工作的作用?
if __name__=='__main__':
with Pool(os.cpu_count()) as p:
p.map(test_function, range(12))
没用。
我必须在每个进程中定义 c
吗?或者我可以在 main 中定义它并在所有进程中使用它?我假设 c
是一个 python 对象,因此由于 gil-lock?
非常感谢!
这行得通。不过,我还没有弄清楚所有的事实。
1- 共享内存对象声明:
shm = shared_memory.SharedMemory(create=True, size=10000000*4)
.
2- 一个(在本例中为 numpy 数组)对象用 buffer 声明如下:
b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf)
.
3- 通过向其中复制数据来填充 numpy 数组。
b[:] = np.random.randint(100, size=10000000, dtype=np.int32)
.
然后,许多cpus需要执行的所有函数都是共享内存对象的名称,函数中提到的第2步是映射共享内存,该共享内存已在前面填充。
您必须 close
访问共享对象并在结束时 unlink
。
import numpy as np
from multiprocessing import shared_memory, Pool
import os
def test_function(args):
Input, shm_name, size = args
existing_shm = shared_memory.SharedMemory(name=shm_name)
d = np.ndarray(size, dtype=np.int32, buffer=existing_shm.buf)
#print(Input, d[Input-1:Input+2])
d[Input]=-20
#print(Input, d[Input-1:Input+2])
existing_shm.close()
print(Input, 'parent process:', os.getppid())
print(Input, 'process id:', os.getpid())
if __name__=='__main__':
shm = shared_memory.SharedMemory(create=True, size=10000000*4)
b = np.ndarray((10000000,), dtype=np.int32, buffer=shm.buf)
b[:] = np.random.randint(100, size=10000000, dtype=np.int32)
inputs =[[ 1,shm.name,b.shape],
[ 2,shm.name,b.shape],
[ 3,shm.name,b.shape],
[ 4,shm.name,b.shape],
[ 5,shm.name,b.shape],
[ 6,shm.name,b.shape],
[ 7,shm.name,b.shape],
[ 8,shm.name,b.shape],
[ 9,shm.name,b.shape],
[ 10,shm.name,b.shape],
[ 11,shm.name,b.shape],
[ 12,shm.name,b.shape],
[13,shm.name,b.shape]]
with Pool(os.cpu_count()) as p:
p.map(test_function, inputs)
print(b[:20])
# Clean up from within the first Python shell
shm.close()
shm.unlink() # Free and release the shared memory block at the very end