Python multiprocessing queue makes code hang with large data

I'm using Python's multiprocessing to analyze some large texts. After a few days of trying to figure out why my code was hanging (i.e. the process did not terminate), I was able to reproduce the problem with the following simple code:

import multiprocessing as mp

for y in range(65500, 65600):
    print(y)

    def func(output):
        output.put("a" * y)

    if __name__ == "__main__":
        output = mp.Queue()
        process = mp.Process(target=func, args=(output,))
        process.start()
        process.join()

As you can see, if the item put on the queue is too large, the process hangs. It does not freeze: if I write more code after output.put(), that code runs, but the process still never stops.

This starts happening when the string reaches 65500 characters; depending on your interpreter the exact threshold may vary.

I know mp.Queue has a maxsize argument, but after doing some searching I found out that it is about the number of items in the queue, not the size of the items themselves.
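A small sketch illustrating that distinction (my own example, not from the original post): maxsize bounds how many items the queue holds, regardless of how big each item is.

import multiprocessing as mp

q = mp.Queue(maxsize=2)        # bounds the NUMBER of items, not their size
q.put("a" * 10**6)             # a ~1 MB string still counts as just one item
q.put("b")                     # the queue now holds 2 items and is "full"
# q.put("c", block=False)      # would raise queue.Full immediately
print(len(q.get()), q.get())   # drain both items so the feeder thread can exit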

Is there a way to solve this? The data I need to put on the queue in my original code is very, very large...

Your queue is full and there is no consumer to empty it.

From the definition of Queue.put:

If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available.

Assuming there is no deadlock between producers and consumers (and assuming your original code does have a consumer, because your sample doesn't), eventually the producer should be unblocked and terminate. Check the code of your consumer (or add it to the question so we can take a look at it).


Update

This is not the problem, because queue has not been given a maxsize so put should succeed until you run out of memory.

This is not how the queue behaves. As elaborated in this ticket, the part that blocks here is not the queue itself, but the underlying pipe. From the linked resource (insertions between "[]" are mine):

A queue works like this:
- when you call queue.put(data), the data is added to a deque, which can grow and shrink forever
- then a thread pops elements from the deque, and sends them so that the other process can receive them through a pipe or a Unix socket (created via socketpair). But, and that's the important point, both pipes and unix sockets have a limited capacity (used to be 4k - pagesize - on older Linux kernels for pipes, now it's 64k, and between 64k-120k for unix sockets, depending on tunable sysctls).
- when you do queue.get(), you just do a read on the pipe/socket

[..] when size [becomes too big] the writing thread blocks on the write syscall. And since a join is performed before dequeuing the item [note: that's your process.join], you just deadlock, since the join waits for the sending thread to complete, and the write can't complete since the pipe/socket is full! If you dequeue the item before waiting for the submitter process, everything works fine.
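Applied to the code from the question, a minimal sketch of that fix (same structure and func as in the question; the get-before-join ordering is the only change): draining the queue first empties the pipe, so the feeder thread can finish and join can return.

import multiprocessing as mp

for y in range(65500, 65600):
    print(y)

    def func(output):
        output.put("a" * y)

    if __name__ == "__main__":
        output = mp.Queue()
        process = mp.Process(target=func, args=(output,))
        process.start()
        item = output.get()  # dequeue first: this unblocks the child's feeder thread
        process.join()       # now the sender can complete, so join returns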


Update 2

I understand. But I don't actually have a consumer (if it is what I'm thinking it is), I will only get the results from the queue when process has finished putting it into the queue.

Yes, that's exactly the problem. The multiprocessing.Queue is not a storage container. You should use it exclusively to pass data between "producers" (the processes that generate the data that enters the queue) and "consumers" (the processes that use that data). As you now know, leaving the data in there is a bad idea.

How can I get an item from the queue if I cannot even put it there first?

put and get hide away the problem of the data filling up the pipe, so you only need to set up a loop in your "main" process to get the items out of the queue, for example appending them to a list. The list lives in the memory space of the main process and does not clog the pipe.
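A minimal sketch of that pattern, assuming a hypothetical producer that puts several large strings and then a None sentinel to signal it is done (the sentinel convention is my addition):

import multiprocessing as mp

def producer(output):
    for y in range(65500, 65600):
        output.put("a" * y)  # large items are fine: the main loop keeps draining
    output.put(None)         # sentinel: tells the main process we are done

if __name__ == "__main__":
    output = mp.Queue()
    process = mp.Process(target=producer, args=(output,))
    process.start()

    results = []              # lives in the main process's memory space,
    while True:               # so it does not clog the pipe
        item = output.get()   # keep reading: this empties the pipe/socket
        if item is None:
            break
        results.append(item)

    process.join()            # safe now: the queue is empty
    print(len(results), "items received")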