Python 多处理队列使代码在大数据时挂起

Python multiprocessing queue makes code hang with large data

我正在使用 python 的多处理来分析一些大文本。几天后试图弄清楚为什么我的代码挂起(即进程没有结束),我能够使用以下简单代码重现问题:

import multiprocessing as mp

for y in range(65500, 65600):

    def func(output):


    if __name__ == "__main__":

        output = mp.Queue()

        process = mp.Process(target = func, args = (output,))



如您所见,如果放入队列的项目太大,进程就会挂起。 它不会冻结,如果我在 output.put() 之后写更多的代码,它会 运行,但是,这个过程仍然不会停止。

当字符串达到 65500 个字符时开始发生这种情况,具体取决于您的解释器,它可能会有所不同。

我知道 mp.Queue 有一个 maxsize 参数,但进行一些搜索后我发现它是关于队列的项目数量大小,而不是项目本身的大小。

有办法解决这个问题吗? 我在原始代码中需要放入队列中的数据非常非常大...



If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available.



This is not the problem, because queue has not been given a maxsize so put should succeed until you run out of memory.


A queue works like this: - when you call queue.put(data), the data is added to a deque, which can grow and shrink forever - then a thread pops elements from the deque, and sends them so that the other process can receive them through a pipe or a Unix socket (created via socketpair). But, and that's the important point, both pipes and unix sockets have a limited capacity (used to be 4k - pagesize - on older Linux kernels for pipes, now it's 64k, and between 64k-120k for unix sockets, depending on tunable systcls). - when you do queue.get(), you just do a read on the pipe/socket

[..] when size [becomes too big] the writing thread blocks on the write syscall. And since a join is performed before dequeing the item [note: that's your process.join], you just deadlock, since the join waits for the sending thread to complete, and the write can't complete since the pipe/socket is full! If you dequeue the item before waiting the submitter process, everything works fine.

更新 2

I understand. But I don't actually have a consumer (if it is what I'm thinking it is), I will only get the results from the queue when process has finished putting it into the queue.

是的,这就是问题所在。 multiprocessing.Queue 不是存储容器。您应该专门使用它在 "producers"(生成进入队列的数据的进程)和 "consumers (the processes that " 使用该数据之间传递数据。如您现在所知,将数据留在那里是个坏主意.

How can I get an item from the queue if I cannot even put it there first?

putget 隐藏了数据填满管道时将数据放在一起的问题,因此您只需要在 "main" 进程中设置一个循环即可get 个项目出队列,例如,将它们附加到列表中。该列表在主进程的内存space中,不会堵塞管道。