Writing to shared memory in Python is very slow

I am using multiprocessing.sharedctypes.RawArray to share large numpy arrays between multiple processes, and I have noticed that when the array is big (> 1 or 2 GB) it becomes very slow to initialize, and also much slower to read/write to, with unpredictable read/write times (sometimes quite fast, sometimes very, very slow).

I made a small example script that uses a single process: it initializes a shared array and writes to it several times, measuring the time each of these operations takes.

import argparse
import ctypes
import multiprocessing as mp
import multiprocessing.sharedctypes as mpsc
import numpy as np
import time

def main():
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument('-c', '--block-count', type=int, default=1,
                        help='Number of blocks to write')
    parser.add_argument('-w', '--block-width', type=int, default=20000,
                        help='Block width')
    parser.add_argument('-d', '--block-depth', type=int, default=15000,
                        help='Block depth')
    args = parser.parse_args()
    blocks = args.block_count
    blockwidth = args.block_width
    depth = args.block_depth
    start = time.perf_counter()
    shared_array = mpsc.RawArray(ctypes.c_uint16, blocks*blockwidth*depth)
    finish = time.perf_counter()
    print('Init shared array of size {:.2f} Gb: {:.2f} s'.format(blocks*blockwidth*depth*ctypes.sizeof(ctypes.c_uint16)/1024/1024/1024, (finish-start)))
    numpy_array = np.ctypeslib.as_array(shared_array).reshape(blocks*blockwidth, depth)
    start = time.perf_counter()
    for i in range(blocks):
        begin = time.perf_counter()
        numpy_array[i*blockwidth:(i+1)*blockwidth, :] = np.ones((blockwidth, depth), dtype=np.uint16)
        end = time.perf_counter()
        print('Write = %.2f s' % (end-begin))
    finish = time.perf_counter()
    print('Total time = %.2f s' % (finish-start))

if __name__ == '__main__':
    main()

When I run this code, I get the following on my PC:

$ python shared-minimal.py -c 1
Init shared array of size 0.56 Gb: 0.36 s
Write = 0.13 s
Total time = 0.13 s
$ python shared-minimal.py -c 2
Init shared array of size 1.12 Gb: 0.72 s
Write = 0.12 s
Write = 0.13 s
Total time = 0.25 s
$ python shared-minimal.py -c 4
Init shared array of size 2.24 Gb: 5.40 s
Write = 1.17 s
Write = 1.17 s
Write = 1.17 s
Write = 1.57 s
Total time = 5.08 s

In the last case, when the array size is more than 2 GB, the initialization time grows non-linearly with the array size, and assigning same-size slices to the array runs more than 5 times slower.

I would like to understand why this happens. I am running the script with Python 3.5 on Ubuntu 16.04. By using iotop I also noticed that while the array is being initialized and written to, there is disk-write activity of the same size as the shared array, but I am not sure whether a real file is created or whether it is an in-memory-only operation (I suppose it should be). In general, my system also becomes less responsive in the case of a large shared array. There is no swapping; I checked with top, ipcs -mu, and vmstat.

After more research I found that Python actually creates folders in /tmp whose names start with pymp-, and although no files are visible inside them with a file viewer, it looks very much like /tmp is used by Python for shared memory. Performance seems to drop when the file cache is flushed.
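You can check where these backing files go with a small sketch like the one below (note that multiprocessing.util.get_temp_dir is an internal helper, not a documented API, so it may change between Python versions):

import tempfile
import multiprocessing.util

# Directory the tempfile module resolves to (usually /tmp, or $TMPDIR if set)
print(tempfile.gettempdir())

# The per-process pymp-XXXXXX folder that backs RawArray allocations
print(multiprocessing.util.get_temp_dir())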

The final solution is to mount /tmp as tmpfs:

sudo mount -t tmpfs tmpfs /tmp

and, if using a recent Docker, to pass the --tmpfs /tmp argument to the docker run command.

After doing this, read/write operations are done in RAM, and performance is fast and stable.
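To make the mount survive a reboot, an /etc/fstab entry along these lines can be used (a sketch; the size= limit is an assumption and should be tuned to the available RAM):

tmpfs /tmp tmpfs defaults,noatime,size=8G 0 0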

I still wonder why /tmp is used for shared memory and not /dev/shm, which is already mounted as tmpfs and is supposed to be used exactly for shared memory.
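Since multiprocessing picks this scratch directory through the tempfile module, which honors the TMPDIR environment variable, an alternative worth trying (my assumption, not something verified above) is to point it directly at /dev/shm without remounting /tmp:

$ TMPDIR=/dev/shm python shared-minimal.py -c 4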

You can try np.frombuffer; it was faster in my tests.

Just replace the following line

numpy_array = np.ctypeslib.as_array(shared_array).reshape(blocks*blockwidth, depth)

with

numpy_array = np.frombuffer(shared_array, dtype=np.uint16).reshape(blocks*blockwidth, depth)
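A quick way to compare the two wrappers is a sketch like the one below; both views share the RawArray's buffer without copying, so only the wrapping cost differs (the allocation is ~0.56 GB, matching the script's defaults):

import ctypes
import multiprocessing.sharedctypes as mpsc
import numpy as np
import time

shared_array = mpsc.RawArray(ctypes.c_uint16, 20000 * 15000)

start = time.perf_counter()
a = np.ctypeslib.as_array(shared_array)
print('as_array:   %.4f s' % (time.perf_counter() - start))

start = time.perf_counter()
b = np.frombuffer(shared_array, dtype=np.uint16)
print('frombuffer: %.4f s' % (time.perf_counter() - start))

# Both arrays point at the same memory, so neither made a copy
assert a.__array_interface__['data'][0] == b.__array_interface__['data'][0]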

Starting with Python 3.8 you can use shared_memory, which seems to be more efficient and plays nicely with numpy arrays. In some rough tests, creating an array of shape (5000, 5000) took about 3 s with multiprocessing.Array and about 0.015 s with shared_memory.SharedMemory.
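The basic pattern looks like this (a minimal sketch; the shape, the dtype, and attaching from within the same script are illustrative):

import numpy as np
from multiprocessing import shared_memory

shape, dtype = (5000, 5000), np.uint8
nbytes = int(np.prod(shape)) * np.dtype(dtype).itemsize

# Create the block and view it as a numpy array; writes go straight to shared memory
shm = shared_memory.SharedMemory(create=True, size=nbytes)
arr = np.ndarray(shape, dtype=dtype, buffer=shm.buf)
arr[:] = 1

# In another process, attach to the same block by name
shm2 = shared_memory.SharedMemory(name=shm.name)
arr2 = np.ndarray(shape, dtype=dtype, buffer=shm2.buf)

shm2.close()
shm.close()
shm.unlink()  # release the block once no process needs it any more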

Below is a use case of SharedMemory, where I use multiple processes to create an array for each item coming in on an input queue and read them from a separate process in the same order in which they came in.

import os
import multiprocessing as mp
import numpy as np
import time
from multiprocessing import shared_memory    


class FunctionTimer:
    def __init__(self, name):
        self.name = name

    def __enter__(self):
        self.start = time.time()
        return self

    def __exit__(self, type, value, traceback):
        self.end = time.time()
        self.exec_time = self.end - self.start
        print(f"{self.name} time: {self.exec_time}")


class MpArrayProcessor:
    def __init__(self, in_queue, out_queue):
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.stop_event = mp.Event()
        self.processes = []
        self.cur_id = 0
        self.order_dict = {}

        self.writable_dict = {}
        self.array_locks = {}
        self.array_data_dict = {}

    @staticmethod
    def wrap_func(func, arr_shape, in_queue, out_queue, stop_event, writable, shmem_name):
        pid = os.getpid()

        while True:
            if stop_event.is_set():
                print("Stopping")
                break
            x = in_queue.get(block=True)

            if x is None:
                break
            else:
                res = func(arr_shape, x)
                with FunctionTimer("Wait and write"):
                    writable.wait()
                    shmem = shared_memory.SharedMemory(name=shmem_name, create=False)
                    c = np.ndarray(arr_shape, dtype=np.uint8, buffer=shmem.buf)
                    c[:] = res

                writable.clear()
                out_queue.put((pid, shmem_name, x))

    def start(self, func, arr_shape, n_proc):
        # TODO implement proper closing of SharedMemory
        # one shared memory block and one "writable" event per worker;
        # size is in bytes, which matches the uint8 elements used here
        for i in range(n_proc):
            writable = mp.Event()
            writable.set()

            shmem_name = f"ps_{i}"
            data = shared_memory.SharedMemory(create=True, size=arr_shape[0] * arr_shape[1], name=shmem_name)
            p = mp.Process(target=self.wrap_func,
                           args=(
                               func, arr_shape, self.in_queue, self.out_queue, self.stop_event, writable, shmem_name))

            p.start()
            self.writable_dict[p.pid] = writable
            self.array_data_dict[p.pid] = data
            self.processes.append(p)

    def get(self):
        # Return results in input order: out-of-order results are parked in
        # order_dict until the one matching cur_id arrives.
        while True:
            if self.cur_id in self.order_dict:
                pid, shmem_name, order = self.order_dict[self.cur_id]
                print(f"PID: {pid}, idx: {order}, dict_len: {len(self.order_dict)}")
                shmem = shared_memory.SharedMemory(name=shmem_name, create=False)
                result = np.copy(np.frombuffer(shmem.buf, dtype=np.uint8))
                self.writable_dict[pid].set()
                del self.order_dict[self.cur_id]
                self.cur_id += 1
                return result
            print(self.order_dict)
            pid, shmem_name, order = self.out_queue.get(block=True)
            if order == self.cur_id:
                print(f"PID: {pid}, idx: {order}, dict_len: {len(self.order_dict)}")
                shmem = shared_memory.SharedMemory(name=shmem_name, create=False)
                print(np.frombuffer(shmem.buf, dtype=np.uint8))
                result = np.copy(np.frombuffer(shmem.buf, dtype=np.uint8))
                self.writable_dict[pid].set()

                self.cur_id += 1
                return result
            else:
                self.order_dict[order] = (pid, shmem_name, order)

    def close(self):
        self.stop_event.set()
        print("Event set")
        for p in self.processes:
            self.array_data_dict[p.pid].close()
            self.array_data_dict[p.pid].unlink()
            p.join()
            print("Joined")
            p.close()
            print("Closed")



def create_data(shape, x):
    time.sleep(0.08)
    # np.random.randint(0, 255, shape, dtype=np.uint8)
    return np.ones(shape, dtype=np.uint8) * x


def fill_queue(queue, n_elements, n_processes):
    for i in range(n_elements):
        queue.put(i)
    # one None sentinel per worker so every process shuts down
    for i in range(n_processes):
        queue.put(None)

    print("filling finished")



if __name__ == "__main__":
    print(f"Running: {__file__}")
    print(f"Script dir: {os.path.dirname(os.path.abspath(__file__))}")
    print(f"Working dir: {os.path.abspath(os.getcwd())}")

    n = 100
    n_proc = 4
    input_queue = mp.Queue()
    output_queue = mp.Queue(maxsize=50)
    # shape = (3, 3)
    # shape = (1280, 720)
    shape = (5000, 5000)


    in_proc = mp.Process(target=fill_queue, args=(input_queue, n, n_proc))
    in_proc.start()

    with FunctionTimer("MP processing"):
        arr_processor = MpArrayProcessor(input_queue, output_queue)
        arr_processor.start(create_data, shape, 4)

        results = []

        for i in range(n):
            print(f"Getting: {i}")
            r = arr_processor.get()[:shape[0]*shape[1]].reshape(shape)
            results.append(r)

        arr_processor.close()

    in_proc.join()
    in_proc.close()

    print(results)

    with FunctionTimer("Normal"):
        for i in range(n):
            a = create_data(shape, i)