Multiprocessing pool.join() 在某些情况下挂起

Multiprocesing pool.join() hangs under some circumstances

我正在尝试使用 multiprocessing 在 Python 中创建一个简单的生产者/消费者模式。它有效,但挂在 poll.join().

from multiprocessing import Pool, Queue

que = Queue()


def consume():
    while True:
        element = que.get()
        if element is None:
            print('break')
            break
    print('Consumer closing')


def produce(nr):
    que.put([nr] * 1000000)
    print('Producer {} closing'.format(nr))


def main():
    p = Pool(5)
    p.apply_async(consume)
    p.map(produce, range(5))
    que.put(None)
    print('None')
    p.close()
    p.join()


if __name__ == '__main__':
    main()

示例输出:

~/Python/Examples $ ./multip_prod_cons.py 
Producer 1 closing
Producer 3 closing
Producer 0 closing
Producer 2 closing
Producer 4 closing
None
break
Consumer closing

然而,当我改变一行时它完美地工作:

que.put([nr] * 100)

它在 Linux 系统 运行 Python 3.4.3 或 Python 2.7.10 上 100% 可重现。我错过了什么吗?

这里有很多混乱。您正在写的不是 producer/consumer 场景,而是滥用另一种通常称为 "pool of workers".

的模式的混乱

工人池模式是 producer/consumer 模式的一种应用,其中有一个生产者安排工作,许多消费者使用它。在这种模式下,Pool 的所有者最终成为生产者,而工人将成为消费者。

在您的示例中,您有一个混合解决方案,其中一名工人最终成为消费者,而其他工人则充当某种中间件。整个设计非常低效,重复了 Pool 已经提供的大部分逻辑,更重要的是,非常容易出错。你最终遭受的是Deadlock

将对象放入 multiprocessing.Queue 是一个异步操作。它仅在 Queue 已满并且您的 Queue 具有无限大小时才会阻塞。

这意味着您的 produce 函数会立即 returns 因此对 p.map 的调用不会像您期望的那样阻塞。相反,相关的工作进程会等到实际消息通过 Queue 用作通信通道的 Pipe

接下来发生的事情是,当您将 QueueNone "message" 放入 produce 函数之前交付的所有列表之前,您会过早地终止您的消费者create 被正确地推送到 Pipe.

您在调用 p.join 时注意到了这个问题,但实际情况如下。

  • p.join 调用正在等待所有工作进程终止。
  • 工作进程正在等待大列表通过 QueuePipe
  • 由于消费工人早已不复存在,因此没有人排干显然已满的 Pipe

如果您的列表足够小,可以在您实际将终止消息发送到 consume 函数之前通过,则不会显示该问题。