将大型 ndarrays 快速放置到 multiprocessing.Queue

Put large ndarrays fast to multiprocessing.Queue

在尝试将大 ndarray 放入 Process 中的 Queue 时,我遇到了以下问题:

首先,这是代码:

import numpy
import multiprocessing
from ctypes import c_bool
import time


def run(acquisition_running, data_queue):
    while acquisition_running.value:
        length = 65536
        data = numpy.ndarray(length, dtype='float')

        data_queue.put(data)
        time.sleep(0.1)


if __name__ == '__main__':
    acquisition_running = multiprocessing.Value(c_bool)

    data_queue = multiprocessing.Queue()

    process = multiprocessing.Process(
        target=run, args=(acquisition_running, data_queue))

    acquisition_running.value = True
    process.start()
    time.sleep(1)
    acquisition_running.value = False
    process.join()

    print('Finished')

    number_items = 0

    while not data_queue.empty():
        data_item = data_queue.get()
        number_items += 1

    print(number_items)
  1. 如果我使用 length=10 左右,一切正常。我通过队列传输了 9 个项目。

  2. 如果我增加到 length=1000,在我的计算机上 process.join() 块,尽管函数 run() 已经完成。我可以用 process.join() 注释该行,然后会看到队列中只有 2 个项目,所以显然将数据放入队列非常慢。

我的计划实际上是传输 4 个 ndarray,每个长度为 65536。对于 Thread,这工作得非常快(<1ms)。有没有办法提高进程传输数据的速度?

我在 Windows 机器上使用 Python 3.4,但在 Linux 上使用 Python 3.4 我得到了相同的行为。

如果你有非常大的数组,你可能只想传递它们的 pickled 状态——或者更好的选择可能是使用 multiprocessing.Arraymultiprocessing.sharedctypes.RawArray 来创建一个共享内存数组(对于后者,参见 http://briansimulator.org/sharing-numpy-arrays-between-processes/)。您必须担心冲突,因为您将拥有一个不受 GIL 约束的数组——并且需要锁。但是,您只需发送数组索引即可访问共享数组数据。

"Is there a way to improve speed of transmitting data for processes?"

当然可以,因为要解决正确的问题。目前,您只是在填充缓冲区而不同时清空它。 恭喜,你刚刚给自己建立了一个so-called死锁对应的引用from the documentation是:

Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe.

但是,让我们慢慢来。首先,"speed"不是你的问题!我了解到您只是在试验 Python 的 multiprocessing。阅读您的代码时最重要的见解是 parent 和 child 之间的通信流,尤其是事件处理并没有真正意义。如果您有一个 real-world 问题想要解决,您肯定无法通过 这种 方式解决它。如果您没有 real-world 问题,那么在开始编写代码之前,您首先需要想出一个好的问题 ;-)。最终,您将需要了解操作系统为 inter-process 通信提供的通信原语。

对您观察到的现象的解释:

您的 child 进程生成大约 10 * length * size(float) 字节的数据(考虑到您的 child 进程可以执行大约 10 次迭代,而您的 parent 休眠大约 1 秒在将 acquisition_running 设置为 False 之前)。当您的 parent 进程休眠时,child 将指定数量的数据放入队列中。您需要了解队列是一个复杂的结构。你不需要了解它的每一点。但有一件事真的很重要:inter-process 通信的队列显然使用了某种位于 parent 和 child 之间的缓冲区*。缓冲区的大小通常是有限的。您正在从 child 中写入此缓冲区,而没有 同时从 parent 中读取它。也就是说,缓冲区内容在 parent 刚刚休眠时稳步增长。通过增加 length 你 运行 进入队列缓冲区已满并且 child 进程无法再写入的情况。但是,child 进程在写入所有数据之前不能终止。同时,parent 进程等待 child 终止。

看到了吗?一个实体等待另一个实体。 parent 等待 child 终止,child 等待 parent 进行一些 space。这种情况称为死锁。它无法自行解决。

在细节上,缓冲区的情况比上面描述的要复杂一些。您的 child 进程产生了一个额外的线程,试图通过管道将缓冲数据推送到 parent。实际上,这个管道的缓冲区是限制实体。它由操作系统定义,至少在 Linux 上通常不大于 65536 字节。

重要的部分是,换句话说:parent不从管道读取之前 child 完成 尝试写入 到管道。在使用管道的每个有意义的场景中,读取写入以相当同时的方式发生,因此一个进程可以快速响应另一个进程提供的输入。你做的恰恰相反:你让你的 parent 进入睡眠状态,因此将它 dis-responsive 渲染为来自 child 的输入,导致死锁情况。

(*) “当进程首先将一个项目放入队列时,将启动一个供给线程,它将 objects 从缓冲区传输到管道 ”,来自https://docs.python.org/2/library/multiprocessing.html

要解决该问题,结合 JPG 的出色回答,您可以做的一件事是在每个进程之间卸载您的队列。

所以改为这样做:

process.start()
data_item = data_queue.get()
process.join()

虽然这并没有完全复制代码中的行为(数据计数),但您明白了 ;)

将 array/list 转换为 str(your_array)

q.put(str(your_array))