Multiprocessing pool.join() 在某些情况下挂起
Multiprocesing pool.join() hangs under some circumstances
我正在尝试使用 multiprocessing
在 Python 中创建一个简单的生产者/消费者模式。它有效,但挂在 poll.join()
.
from multiprocessing import Pool, Queue
que = Queue()
def consume():
while True:
element = que.get()
if element is None:
print('break')
break
print('Consumer closing')
def produce(nr):
que.put([nr] * 1000000)
print('Producer {} closing'.format(nr))
def main():
p = Pool(5)
p.apply_async(consume)
p.map(produce, range(5))
que.put(None)
print('None')
p.close()
p.join()
if __name__ == '__main__':
main()
示例输出:
~/Python/Examples $ ./multip_prod_cons.py
Producer 1 closing
Producer 3 closing
Producer 0 closing
Producer 2 closing
Producer 4 closing
None
break
Consumer closing
然而,当我改变一行时它完美地工作:
que.put([nr] * 100)
它在 Linux 系统 运行 Python 3.4.3 或 Python 2.7.10 上 100% 可重现。我错过了什么吗?
这里有很多混乱。您正在写的不是 producer/consumer 场景,而是滥用另一种通常称为 "pool of workers".
的模式的混乱
工人池模式是 producer/consumer 模式的一种应用,其中有一个生产者安排工作,许多消费者使用它。在这种模式下,Pool
的所有者最终成为生产者,而工人将成为消费者。
在您的示例中,您有一个混合解决方案,其中一名工人最终成为消费者,而其他工人则充当某种中间件。整个设计非常低效,重复了 Pool
已经提供的大部分逻辑,更重要的是,非常容易出错。你最终遭受的是Deadlock。
将对象放入 multiprocessing.Queue
是一个异步操作。它仅在 Queue
已满并且您的 Queue
具有无限大小时才会阻塞。
这意味着您的 produce
函数会立即 returns 因此对 p.map
的调用不会像您期望的那样阻塞。相反,相关的工作进程会等到实际消息通过 Queue
用作通信通道的 Pipe
。
接下来发生的事情是,当您将 Queue
和 None
"message" 放入 produce
函数之前交付的所有列表之前,您会过早地终止您的消费者create 被正确地推送到 Pipe
.
您在调用 p.join
时注意到了这个问题,但实际情况如下。
p.join
调用正在等待所有工作进程终止。
- 工作进程正在等待大列表通过
Queue
的 Pipe
。
- 由于消费工人早已不复存在,因此没有人排干显然已满的
Pipe
。
如果您的列表足够小,可以在您实际将终止消息发送到 consume
函数之前通过,则不会显示该问题。
我正在尝试使用 multiprocessing
在 Python 中创建一个简单的生产者/消费者模式。它有效,但挂在 poll.join()
.
from multiprocessing import Pool, Queue
que = Queue()
def consume():
while True:
element = que.get()
if element is None:
print('break')
break
print('Consumer closing')
def produce(nr):
que.put([nr] * 1000000)
print('Producer {} closing'.format(nr))
def main():
p = Pool(5)
p.apply_async(consume)
p.map(produce, range(5))
que.put(None)
print('None')
p.close()
p.join()
if __name__ == '__main__':
main()
示例输出:
~/Python/Examples $ ./multip_prod_cons.py
Producer 1 closing
Producer 3 closing
Producer 0 closing
Producer 2 closing
Producer 4 closing
None
break
Consumer closing
然而,当我改变一行时它完美地工作:
que.put([nr] * 100)
它在 Linux 系统 运行 Python 3.4.3 或 Python 2.7.10 上 100% 可重现。我错过了什么吗?
这里有很多混乱。您正在写的不是 producer/consumer 场景,而是滥用另一种通常称为 "pool of workers".
的模式的混乱工人池模式是 producer/consumer 模式的一种应用,其中有一个生产者安排工作,许多消费者使用它。在这种模式下,Pool
的所有者最终成为生产者,而工人将成为消费者。
在您的示例中,您有一个混合解决方案,其中一名工人最终成为消费者,而其他工人则充当某种中间件。整个设计非常低效,重复了 Pool
已经提供的大部分逻辑,更重要的是,非常容易出错。你最终遭受的是Deadlock。
将对象放入 multiprocessing.Queue
是一个异步操作。它仅在 Queue
已满并且您的 Queue
具有无限大小时才会阻塞。
这意味着您的 produce
函数会立即 returns 因此对 p.map
的调用不会像您期望的那样阻塞。相反,相关的工作进程会等到实际消息通过 Queue
用作通信通道的 Pipe
。
接下来发生的事情是,当您将 Queue
和 None
"message" 放入 produce
函数之前交付的所有列表之前,您会过早地终止您的消费者create 被正确地推送到 Pipe
.
您在调用 p.join
时注意到了这个问题,但实际情况如下。
p.join
调用正在等待所有工作进程终止。- 工作进程正在等待大列表通过
Queue
的Pipe
。 - 由于消费工人早已不复存在,因此没有人排干显然已满的
Pipe
。
如果您的列表足够小,可以在您实际将终止消息发送到 consume
函数之前通过,则不会显示该问题。