Writing to shared memory in Python is very slow
I use Python's multiprocessing.sharedctypes.RawArray to share large numpy arrays between several processes. I have noticed that when the array is large (> 1 or 2 GB) it becomes very slow to initialize and also much slower to read from and write to (and the read/write times are unpredictable: sometimes quite fast, sometimes very, very slow).

I made a small example script that uses just one process, initializes a shared array, writes to it several times, and measures the time these operations take.
import argparse
import ctypes
import multiprocessing as mp
import multiprocessing.sharedctypes as mpsc
import numpy as np
import time


def main():
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--block-count', type=int, default=1,
                        help='Number of blocks to write')
    parser.add_argument('-w', '--block-width', type=int, default=20000,
                        help='Block width')
    parser.add_argument('-d', '--block-depth', type=int, default=15000,
                        help='Block depth')
    args = parser.parse_args()
    blocks = args.block_count
    blockwidth = args.block_width
    depth = args.block_depth

    start = time.perf_counter()
    shared_array = mpsc.RawArray(ctypes.c_uint16, blocks*blockwidth*depth)
    finish = time.perf_counter()
    print('Init shared array of size {:.2f} Gb: {:.2f} s'.format(blocks*blockwidth*depth*ctypes.sizeof(ctypes.c_uint16)/1024/1024/1024, (finish-start)))

    numpy_array = np.ctypeslib.as_array(shared_array).reshape(blocks*blockwidth, depth)
    start = time.perf_counter()
    for i in range(blocks):
        begin = time.perf_counter()
        numpy_array[i*blockwidth:(i+1)*blockwidth, :] = np.ones((blockwidth, depth), dtype=np.uint16)
        end = time.perf_counter()
        print('Write = %.2f s' % (end-begin))
    finish = time.perf_counter()
    print('Total time = %.2f s' % (finish-start))


if __name__ == '__main__':
    main()
When I run this code I get the following on my PC:
$ python shared-minimal.py -c 1
Init shared array of size 0.56 Gb: 0.36 s
Write = 0.13 s
Total time = 0.13 s
$ python shared-minimal.py -c 2
Init shared array of size 1.12 Gb: 0.72 s
Write = 0.12 s
Write = 0.13 s
Total time = 0.25 s
$ python shared-minimal.py -c 4
Init shared array of size 2.24 Gb: 5.40 s
Write = 1.17 s
Write = 1.17 s
Write = 1.17 s
Write = 1.57 s
Total time = 5.08 s
In the last case, when the array size is more than 2 GB, the initialization time no longer scales linearly with the array size, and assigning a slice of the same size to the array is more than 5 times slower.
I wonder why this happens. I am running the script on Ubuntu 16.04 with Python 3.5. I also noticed with iotop that while the array is being initialized and written to, there is disk-write activity of the same size as the shared array, but I am not sure whether a real file is created or whether it is only an in-memory operation (I suppose it should be). In general my system also becomes less responsive when a large shared array is involved. There is no swapping; I checked with top, ipcs -mu and vmstat.
After some more research I found that Python actually creates folders starting with pymp- in /tmp, and although no files are visible inside them with a file viewer, it looks very much like /tmp/ is used by Python for the shared memory. Performance seems to degrade when the file cache is flushed.
The working solution in the end was to mount /tmp as tmpfs:
sudo mount -t tmpfs tmpfs /tmp
and, when using a recent docker, by passing the --tmpfs /tmp argument to the docker run command.
After doing this, the read/write operations take place in RAM, and performance is fast and stable.
I still wonder why /tmp is used for shared memory and not /dev/shm, which is already mounted as tmpfs and is supposed to be used for shared memory.
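If you want to check, or redirect, where Python puts those pymp- folders without remounting /tmp, one option is to point the standard temp directory at /dev/shm via TMPDIR before multiprocessing first needs it; multiprocessing.util.get_temp_dir() shows the directory actually in use. This is only a sketch based on the assumption that tempfile (which multiprocessing uses internally) honours TMPDIR here, and newer Python versions may already prefer /dev/shm on their own:

import os

# Assumption: must be set before multiprocessing creates its pymp-* directory.
os.environ['TMPDIR'] = '/dev/shm'

import ctypes
import multiprocessing.util as mp_util
import multiprocessing.sharedctypes as mpsc

print(mp_util.get_temp_dir())  # directory used for the pymp-* folder

# The file backing this RawArray should now live under /dev/shm.
shared_array = mpsc.RawArray(ctypes.c_uint16, 10)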
You can try np.frombuffer, which was faster in my tests. Simply replace the line
numpy_array = np.ctypeslib.as_array(shared_array).reshape(blocks*blockwidth, depth)
with
numpy_array = np.frombuffer(shared_array, dtype=np.uint16).reshape(blocks*blockwidth, depth)
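For reference, a tiny self-contained check (with a toy size, not the multi-gigabyte array) showing that both wrappers expose the same shared buffer without copying, so the replacement only changes how the view is constructed:

import ctypes
import multiprocessing.sharedctypes as mpsc
import numpy as np

shared_array = mpsc.RawArray(ctypes.c_uint16, 6)

# Both calls produce views over the same shared buffer; no data is copied.
a = np.ctypeslib.as_array(shared_array).reshape(2, 3)
b = np.frombuffer(shared_array, dtype=np.uint16).reshape(2, 3)

b[0, 0] = 42
print(a[0, 0])                 # 42 -- a write through one view is visible in the other
print(np.shares_memory(a, b))  # True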
Starting with Python 3.8 you can use shared_memory, which seems to be more efficient and works well with numpy arrays. In some rough tests, creating an array of shape (5000, 5000) took roughly 3 s with multiprocessing.Array and roughly 0.015 s with shared_memory.SharedMemory.
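A minimal sketch of what backing such an array with SharedMemory looks like (the uint8 dtype and the names here are illustrative assumptions, and the timings above are rough numbers from one machine):

import numpy as np
from multiprocessing import shared_memory

shape = (5000, 5000)

# Allocate a shared block large enough for a uint8 array of that shape.
shm = shared_memory.SharedMemory(create=True, size=int(np.prod(shape)))
arr = np.ndarray(shape, dtype=np.uint8, buffer=shm.buf)
arr[:] = 1

# Another process can attach to the same block by name:
#   other = shared_memory.SharedMemory(name=shm.name, create=False)
#   view = np.ndarray(shape, dtype=np.uint8, buffer=other.buf)

del arr       # drop the numpy view before closing (recommended)
shm.close()   # every process closes its own handle
shm.unlink()  # only the creating process unlinks, exactly once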
Below is a use case for SharedMemory where several processes create an array for every item coming in through an input queue, and a separate process reads the results back in the same order in which the items arrived.
import os
import multiprocessing as mp
import numpy as np
import time
from multiprocessing import shared_memory


class FunctionTimer:
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, type, value, traceback):
        self.end = time.time()
        self.exec_time = self.end - self.start
        print(f"{self.name} time: {self.exec_time}")


class MpArrayProcessor:
    # Each worker owns one SharedMemory block and a "writable" Event that acts
    # as a handshake: the worker writes only after its block has been consumed.
    def __init__(self, in_queue, out_queue):
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.stop_event = mp.Event()
        self.processes = []
        self.cur_id = 0
        self.order_dict = {}
        self.writable_dict = {}
        self.array_locks = {}
        self.array_data_dict = {}

    @staticmethod
    def wrap_func(func, arr_shape, in_queue, out_queue, stop_event, writable, shmem_name):
        pid = os.getpid()
        while True:
            if stop_event.is_set():
                print("Stopping")
                break
            x = in_queue.get(block=True)
            if x is None:
                break
            else:
                res = func(arr_shape, x)
                with FunctionTimer("Wait and write"):
                    # Wait until the consumer has read the previous result,
                    # then overwrite this worker's shared block with the new one.
                    writable.wait()
                    shmem = shared_memory.SharedMemory(name=shmem_name, create=False)
                    c = np.ndarray(arr_shape, dtype=np.uint8, buffer=shmem.buf)
                    c[:] = res
                    writable.clear()
                out_queue.put((pid, shmem_name, x))

    def start(self, func, arr_shape, n_proc):
        # TODO implement proper closing of SharedMemory
        for i in range(n_proc):
            writable = mp.Event()
            writable.set()
            shmem_name = f"ps_{i}"
            data = shared_memory.SharedMemory(create=True, size=arr_shape[0] * arr_shape[1], name=shmem_name)
            p = mp.Process(target=self.wrap_func,
                           args=(
                               func, arr_shape, self.in_queue, self.out_queue, self.stop_event, writable, shmem_name))
            p.start()
            self.writable_dict[p.pid] = writable
            self.array_data_dict[p.pid] = data
            self.processes.append(p)

    def get(self):
        # Results may arrive out of order; buffer them in order_dict until the
        # one with index cur_id shows up.
        while True:
            if self.cur_id in self.order_dict:
                pid, shmem_name, order = self.order_dict[self.cur_id]
                print(f"PID: {pid}, idx: {order}, dict_len: {len(self.order_dict)}")
                shmem = shared_memory.SharedMemory(name=shmem_name, create=False)
                result = np.copy(np.frombuffer(shmem.buf, dtype=np.uint8))
                self.writable_dict[pid].set()
                del self.order_dict[self.cur_id]
                self.cur_id += 1
                return result

            print(self.order_dict)
            pid, shmem_name, order = self.out_queue.get(block=True)
            if order == self.cur_id:
                print(f"PID: {pid}, idx: {order}, dict_len: {len(self.order_dict)}")
                shmem = shared_memory.SharedMemory(name=shmem_name, create=False)
                print(np.frombuffer(shmem.buf, dtype=np.uint8))
                result = np.copy(np.frombuffer(shmem.buf, dtype=np.uint8))
                self.writable_dict[pid].set()
                self.cur_id += 1
                return result
            else:
                self.order_dict[order] = (pid, shmem_name, order)

    def close(self):
        self.stop_event.set()
        print("Event set")
        for p in self.processes:
            self.array_data_dict[p.pid].close()
            self.array_data_dict[p.pid].unlink()
            p.join()
            print("Joined")
            p.close()
            print("Closed")


def create_data(shape, x):
    time.sleep(0.08)
    # np.random.randint(0, 255, shape, dtype=np.uint8)
    return np.ones(shape, dtype=np.uint8) * x


def fill_queue(queue, n_elements, n_processes):
    l = [x for x in range(n_elements)]
    for i in l:
        queue.put(i)
    # One None sentinel per worker process tells it to stop.
    for i in range(n_processes):
        queue.put(None)
    print("filling finished")


if __name__ == "__main__":
    print(f"Running: {__file__}")
    print(f"Script dir: {os.path.dirname(os.path.abspath(__file__))}")
    print(f"Working dir: {os.path.abspath(os.getcwd())}")

    n = 100
    n_proc = 4
    input_queue = mp.Queue()
    output_queue = mp.Queue(maxsize=50)

    # shape = (3, 3)
    # shape = (1280, 720)
    shape = (5000, 5000)

    in_proc = mp.Process(target=fill_queue, args=(input_queue, n, n_proc))
    in_proc.start()

    with FunctionTimer("MP processing"):
        arr_processor = MpArrayProcessor(input_queue, output_queue)
        arr_processor.start(create_data, shape, 4)

        results = []
        for i in range(n):
            print(f"Getting: {i}")
            # SharedMemory may round the block size up, so trim to the expected
            # number of elements before reshaping.
            r = arr_processor.get()[:shape[0]*shape[1]].reshape(shape)
            results.append(r)

        arr_processor.close()
        in_proc.join()
        in_proc.close()

    print(results)

    with FunctionTimer("Normal"):
        for i in range(n):
            a = create_data(shape, i)
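One way to deal with the # TODO about closing the SharedMemory blocks is to make sure every process that attaches to a block also closes its own handle, while only the creating process unlinks it. A small self-contained sketch of that pattern, independent of the class above (the block name is illustrative):

import numpy as np
from multiprocessing import shared_memory

shape = (4, 4)

# Creator: allocates the block and is responsible for unlink() at the very end.
owner = shared_memory.SharedMemory(create=True, size=int(np.prod(shape)), name="demo_block")

# Attaching side (e.g. a worker): open by name, write, then close its own handle.
attached = shared_memory.SharedMemory(name="demo_block", create=False)
try:
    c = np.ndarray(shape, dtype=np.uint8, buffer=attached.buf)
    c[:] = 7
    del c                # drop the numpy view before closing the handle
finally:
    attached.close()     # closes this handle only; the block itself survives

# Creator can still read the data, then tear everything down.
print(np.ndarray(shape, dtype=np.uint8, buffer=owner.buf).copy())
owner.close()
owner.unlink()           # actually frees the block (call exactly once)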