Python multiprocessing queue makes code hang with large data
I'm using Python's multiprocessing to analyze some large texts. After a few days trying to figure out why my code was hanging (i.e. the processes were not ending), I was able to reproduce the problem with the following simple code:
import multiprocessing as mp

for y in range(65500, 65600):
    print(y)

    def func(output):
        output.put("a" * y)

    if __name__ == "__main__":
        output = mp.Queue()
        process = mp.Process(target=func, args=(output,))
        process.start()
        process.join()  # hangs here once the string is large enough
As you can see, if the item being put in the queue gets too big, the process hangs. It does not freeze: if I write more code after output.put(), that code runs, but the process still never stops. This starts happening when the string reaches about 65500 characters, although the exact threshold may vary depending on your interpreter.
I know mp.Queue has a maxsize argument, but after some searching I found out that it refers to the number of items the queue can hold, not the size of the items themselves. Is there a way to solve this? The data I need to put in the queue in my original code is very, very large...
Your queue is full and there is no consumer to empty it. From the definition of Queue.put:
If the optional argument block is True (the default) and timeout is None (the default), block if necessary until a free slot is available.
Assuming there is no deadlock between producers and consumers (and assuming your original code does have a consumer, because your sample doesn't), eventually the producers should be unblocked and terminate. Check the code of your consumers (or add it to the question so we can have a look).
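To make that blocking behaviour concrete, here is a minimal sketch of my own (not from the thread): with a bounded queue and no consumer, a put with a timeout raises queue.Full instead of hanging forever.

import multiprocessing as mp
import queue

if __name__ == "__main__":
    q = mp.Queue(maxsize=1)  # bounded queue with a single slot
    q.put("first")           # fills the only slot
    try:
        # no consumer ever frees the slot, so this blocks, then times out
        q.put("second", timeout=1)
    except queue.Full:
        print("put() gave up: the queue is full and nobody is consuming")

Note that the question's queue is unbounded, so put itself never waits for a free slot; as the update below explains, the hang there comes from the underlying pipe instead.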
Update
This is not the problem, because queue has not been given a maxsize so put should succeed until you run out of memory.
That is not how the queue behaves. As elaborated in this ticket, the part blocking here is not the queue itself but the underlying pipe. From the linked resource (insertions between "[ ]" are mine):
A queue works like this:
- when you call queue.put(data), the data is added to a deque, which can grow and shrink forever
- then a thread pops elements from the deque, and sends them so that the other process can receive them through a pipe or a Unix socket (created via socketpair). But, and that's the important point, both pipes and unix sockets have a limited capacity (used to be 4k - pagesize - on older Linux kernels for pipes, now it's 64k, and between 64k-120k for unix sockets, depending on tunable sysctls).
- when you do queue.get(), you just do a read on the pipe/socket
[..] when size [becomes too big] the writing thread blocks on the write syscall.
And since a join is performed before dequeing the item [note: that's your process.join], you just deadlock, since the join waits for the sending thread to complete, and the write can't complete since the pipe/socket is full!
If you dequeue the item before waiting the submitter process, everything works fine.
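Applied to the question's code, that fix amounts to draining the queue before joining. A minimal sketch of my own, assuming a single large item:

import multiprocessing as mp

def func(output):
    output.put("a" * 100000)  # large enough to overflow the pipe buffer

if __name__ == "__main__":
    output = mp.Queue()
    process = mp.Process(target=func, args=(output,))
    process.start()
    data = output.get()  # dequeue first, so the feeder thread can flush the pipe
    process.join()       # now the join can complete
    print(len(data))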
Update 2
I understand. But I don't actually have a consumer (if it is what I'm thinking it is), I will only get the results from the queue when process has finished putting it into the queue.
Yes, that is exactly the problem. multiprocessing.Queue is not a storage container. You should use it exclusively for passing data between "producers" (the processes that generate the data entering the queue) and "consumers" (the processes that use that data). As you now know, leaving the data in there is a bad idea.
How can I get an item from the queue if I cannot even put it there first?
put and get hide away the problem of putting the data together when it fills the pipe, so you just need to set up a loop in your "main" process to get items out of the queue, for example appending them to a list. That list lives in the memory space of the main process and does not clog the pipe.
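A minimal sketch of that loop, with a None sentinel (my own convention, not from the thread) so the main process knows when the producer is done:

import multiprocessing as mp

def func(output, n_items):
    for _ in range(n_items):
        output.put("a" * 65600)  # each item is bigger than the pipe buffer
    output.put(None)             # sentinel (my convention) signalling completion

if __name__ == "__main__":
    output = mp.Queue()
    process = mp.Process(target=func, args=(output, 10))
    process.start()
    results = []
    while True:
        item = output.get()      # keeps draining the pipe, so put() never stays blocked
        if item is None:
            break
        results.append(item)     # the list lives in the main process's memory
    process.join()               # safe now that the queue has been emptied
    print(len(results), "items received")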