multiprocessing.Queue 进程结束时挂起

multiprocessing.Queue hanging when Process dies

我有一个通过 multiprocessing.Process 的子进程和一个通过 multiprocessing.Queue 的队列。

主要进程正在使用 multiprocessing.Queue.get() 获取一些新数据。我不想在那里超时,我希望它被阻塞。

然而,当子进程因任何原因终止时(用户通过 kill 或段错误等手动终止),Queue.get() 将永远挂起。

我怎样才能避免这种情况?

Queue 无法知道何时不再有任何可能的作者。您可以将 object 传递给任意数量的子进程,它不知道您是否将它传递给任何给定的子进程。所以它必须等待,即使一个子进程死亡。队列 不是 文件描述符,它会在 child 结束时自动关闭。

您正在寻找的是 parent 过程中的某种主管,它会注意到 children 意外死亡并以您认为合适的任何方式处理这种情况。您可以通过捕获 SIGCHLD 进程、检查 Process.is_alive 或在线程中使用 Process.join 来做到这一点。一个简单的实现将在 Queue.get 调用中使用 timeout 参数,并在 returns.

时执行 Process.is_alive 检查

如果你对 child 进程的结束有更多的控制,它应该发送一个 "EOF" 类型的 object (None, 或者某种已完成的标记)添加到队列中,以便您的 parent 进程可以正确处理它。

我觉得multiprocessing.Queue不是我想要的

我现在在用

parent_conn, child_conn = multiprocessing.Pipe(duplex=True)

得到两个multiprocessing.Connectionobjects。然后我 os.fork() 或使用 multiprocessing.Process。在 child 中,我做:

parent_conn.close()
# read/write on child_conn

在parent(分叉之后),我做:

child_conn.close()
# read/write on parent_conn

这样,当我在连接上调用 recv() 时,它会在 child/parent 同时死亡时引发异常 (EOFError)。

请注意,这仅适用于单个 child。我想 Queue 的意思是当你想要多个 child 时。在那种情况下,您可能无论如何都会有一些管理器来监视所有 child 是否还活着并相应地重新启动它们。