将大型 ndarrays 快速放置到 multiprocessing.Queue
Put large ndarrays fast to multiprocessing.Queue
在尝试将大 ndarray
放入 Process
中的 Queue
时,我遇到了以下问题:
首先,这是代码:
import numpy
import multiprocessing
from ctypes import c_bool
import time
def run(acquisition_running, data_queue):
while acquisition_running.value:
length = 65536
data = numpy.ndarray(length, dtype='float')
data_queue.put(data)
time.sleep(0.1)
if __name__ == '__main__':
acquisition_running = multiprocessing.Value(c_bool)
data_queue = multiprocessing.Queue()
process = multiprocessing.Process(
target=run, args=(acquisition_running, data_queue))
acquisition_running.value = True
process.start()
time.sleep(1)
acquisition_running.value = False
process.join()
print('Finished')
number_items = 0
while not data_queue.empty():
data_item = data_queue.get()
number_items += 1
print(number_items)
如果我使用 length=10
左右,一切正常。我通过队列传输了 9 个项目。
如果我增加到 length=1000
,在我的计算机上 process.join()
块,尽管函数 run()
已经完成。我可以用 process.join()
注释该行,然后会看到队列中只有 2 个项目,所以显然将数据放入队列非常慢。
我的计划实际上是传输 4 个 ndarray,每个长度为 65536。对于 Thread
,这工作得非常快(<1ms)。有没有办法提高进程传输数据的速度?
我在 Windows 机器上使用 Python 3.4,但在 Linux 上使用 Python 3.4 我得到了相同的行为。
如果你有非常大的数组,你可能只想传递它们的 pickled 状态——或者更好的选择可能是使用 multiprocessing.Array
或 multiprocessing.sharedctypes.RawArray
来创建一个共享内存数组(对于后者,参见 http://briansimulator.org/sharing-numpy-arrays-between-processes/)。您必须担心冲突,因为您将拥有一个不受 GIL 约束的数组——并且需要锁。但是,您只需发送数组索引即可访问共享数组数据。
"Is there a way to improve speed of transmitting data for processes?"
当然可以,因为要解决正确的问题。目前,您只是在填充缓冲区而不同时清空它。 恭喜,你刚刚给自己建立了一个so-called死锁。对应的引用from the documentation是:
Bear in mind that a process that has put items in a queue will wait
before terminating until all the buffered items are fed by the
“feeder” thread to the underlying pipe.
但是,让我们慢慢来。首先,"speed"不是你的问题!我了解到您只是在试验 Python 的 multiprocessing
。阅读您的代码时最重要的见解是 parent 和 child 之间的通信流,尤其是事件处理并没有真正意义。如果您有一个 real-world 问题想要解决,您肯定无法通过 这种 方式解决它。如果您没有 real-world 问题,那么在开始编写代码之前,您首先需要想出一个好的问题 ;-)。最终,您将需要了解操作系统为 inter-process 通信提供的通信原语。
对您观察到的现象的解释:
您的 child 进程生成大约 10 * length * size(float)
字节的数据(考虑到您的 child 进程可以执行大约 10 次迭代,而您的 parent 休眠大约 1 秒在将 acquisition_running
设置为 False
之前)。当您的 parent 进程休眠时,child 将指定数量的数据放入队列中。您需要了解队列是一个复杂的结构。你不需要了解它的每一点。但有一件事真的很重要:inter-process 通信的队列显然使用了某种位于 parent 和 child 之间的缓冲区*。缓冲区的大小通常是有限的。您正在从 child 中写入此缓冲区,而没有 同时从 parent 中读取它。也就是说,缓冲区内容在 parent 刚刚休眠时稳步增长。通过增加 length
你 运行 进入队列缓冲区已满并且 child 进程无法再写入的情况。但是,child 进程在写入所有数据之前不能终止。同时,parent 进程等待 child 终止。
看到了吗?一个实体等待另一个实体。 parent 等待 child 终止,child 等待 parent 进行一些 space。这种情况称为死锁。它无法自行解决。
在细节上,缓冲区的情况比上面描述的要复杂一些。您的 child 进程产生了一个额外的线程,试图通过管道将缓冲数据推送到 parent。实际上,这个管道的缓冲区是限制实体。它由操作系统定义,至少在 Linux 上通常不大于 65536 字节。
重要的部分是,换句话说:parent不从管道读取之前 child 完成 尝试写入 到管道。在使用管道的每个有意义的场景中,读取和写入以相当同时的方式发生,因此一个进程可以快速响应另一个进程提供的输入。你做的恰恰相反:你让你的 parent 进入睡眠状态,因此将它 dis-responsive 渲染为来自 child 的输入,导致死锁情况。
(*) “当进程首先将一个项目放入队列时,将启动一个供给线程,它将 objects 从缓冲区传输到管道 ”,来自https://docs.python.org/2/library/multiprocessing.html
要解决该问题,结合 JPG 的出色回答,您可以做的一件事是在每个进程之间卸载您的队列。
所以改为这样做:
process.start()
data_item = data_queue.get()
process.join()
虽然这并没有完全复制代码中的行为(数据计数),但您明白了 ;)
将 array/list 转换为 str(your_array)
q.put(str(your_array))
在尝试将大 ndarray
放入 Process
中的 Queue
时,我遇到了以下问题:
首先,这是代码:
import numpy
import multiprocessing
from ctypes import c_bool
import time
def run(acquisition_running, data_queue):
while acquisition_running.value:
length = 65536
data = numpy.ndarray(length, dtype='float')
data_queue.put(data)
time.sleep(0.1)
if __name__ == '__main__':
acquisition_running = multiprocessing.Value(c_bool)
data_queue = multiprocessing.Queue()
process = multiprocessing.Process(
target=run, args=(acquisition_running, data_queue))
acquisition_running.value = True
process.start()
time.sleep(1)
acquisition_running.value = False
process.join()
print('Finished')
number_items = 0
while not data_queue.empty():
data_item = data_queue.get()
number_items += 1
print(number_items)
如果我使用
length=10
左右,一切正常。我通过队列传输了 9 个项目。如果我增加到
length=1000
,在我的计算机上process.join()
块,尽管函数run()
已经完成。我可以用process.join()
注释该行,然后会看到队列中只有 2 个项目,所以显然将数据放入队列非常慢。
我的计划实际上是传输 4 个 ndarray,每个长度为 65536。对于 Thread
,这工作得非常快(<1ms)。有没有办法提高进程传输数据的速度?
我在 Windows 机器上使用 Python 3.4,但在 Linux 上使用 Python 3.4 我得到了相同的行为。
如果你有非常大的数组,你可能只想传递它们的 pickled 状态——或者更好的选择可能是使用 multiprocessing.Array
或 multiprocessing.sharedctypes.RawArray
来创建一个共享内存数组(对于后者,参见 http://briansimulator.org/sharing-numpy-arrays-between-processes/)。您必须担心冲突,因为您将拥有一个不受 GIL 约束的数组——并且需要锁。但是,您只需发送数组索引即可访问共享数组数据。
"Is there a way to improve speed of transmitting data for processes?"
当然可以,因为要解决正确的问题。目前,您只是在填充缓冲区而不同时清空它。 恭喜,你刚刚给自己建立了一个so-called死锁。对应的引用from the documentation是:
Bear in mind that a process that has put items in a queue will wait before terminating until all the buffered items are fed by the “feeder” thread to the underlying pipe.
但是,让我们慢慢来。首先,"speed"不是你的问题!我了解到您只是在试验 Python 的 multiprocessing
。阅读您的代码时最重要的见解是 parent 和 child 之间的通信流,尤其是事件处理并没有真正意义。如果您有一个 real-world 问题想要解决,您肯定无法通过 这种 方式解决它。如果您没有 real-world 问题,那么在开始编写代码之前,您首先需要想出一个好的问题 ;-)。最终,您将需要了解操作系统为 inter-process 通信提供的通信原语。
对您观察到的现象的解释:
您的 child 进程生成大约 10 * length * size(float)
字节的数据(考虑到您的 child 进程可以执行大约 10 次迭代,而您的 parent 休眠大约 1 秒在将 acquisition_running
设置为 False
之前)。当您的 parent 进程休眠时,child 将指定数量的数据放入队列中。您需要了解队列是一个复杂的结构。你不需要了解它的每一点。但有一件事真的很重要:inter-process 通信的队列显然使用了某种位于 parent 和 child 之间的缓冲区*。缓冲区的大小通常是有限的。您正在从 child 中写入此缓冲区,而没有 同时从 parent 中读取它。也就是说,缓冲区内容在 parent 刚刚休眠时稳步增长。通过增加 length
你 运行 进入队列缓冲区已满并且 child 进程无法再写入的情况。但是,child 进程在写入所有数据之前不能终止。同时,parent 进程等待 child 终止。
看到了吗?一个实体等待另一个实体。 parent 等待 child 终止,child 等待 parent 进行一些 space。这种情况称为死锁。它无法自行解决。
在细节上,缓冲区的情况比上面描述的要复杂一些。您的 child 进程产生了一个额外的线程,试图通过管道将缓冲数据推送到 parent。实际上,这个管道的缓冲区是限制实体。它由操作系统定义,至少在 Linux 上通常不大于 65536 字节。
重要的部分是,换句话说:parent不从管道读取之前 child 完成 尝试写入 到管道。在使用管道的每个有意义的场景中,读取和写入以相当同时的方式发生,因此一个进程可以快速响应另一个进程提供的输入。你做的恰恰相反:你让你的 parent 进入睡眠状态,因此将它 dis-responsive 渲染为来自 child 的输入,导致死锁情况。
(*) “当进程首先将一个项目放入队列时,将启动一个供给线程,它将 objects 从缓冲区传输到管道 ”,来自https://docs.python.org/2/library/multiprocessing.html
要解决该问题,结合 JPG 的出色回答,您可以做的一件事是在每个进程之间卸载您的队列。
所以改为这样做:
process.start()
data_item = data_queue.get()
process.join()
虽然这并没有完全复制代码中的行为(数据计数),但您明白了 ;)
将 array/list 转换为 str(your_array)
q.put(str(your_array))