Pythons Multiprocessing Queue buffer dead lock的解决方案?如何在多处理队列满时从多处理队列 "get" 继续多处理?

Solutions to Pythons Multiprocessing Queue buffer dead lock? How to "get" from multiprocessing Queue when its full and continue multiprocessing?

这个问题是关于 Python 和 Python 由我的计算机 OS 管道呈现的多处理队列缓冲区限制的多处理。基本上,我达到了 Pythons 多处理队列缓冲区的限制。

这是我目前的简单实现

import os
from multiprocessing import Queue,Lock,Manager
def threaded_results(q,*args):
    """do something"""
    q.put(*args)

def main():
    manager = Manager()
    return_dict = manager.dict()

    cpu = os.cpu_count()
    q = Queue()
    processes = []
    for i in range(cpu):
        p = Process(target=threaded_results,args=(q,*args))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    results = [q.get() for proc in processes]

我读到我必须先清空队列,然后再添加回由某种称为信号量的东西编排的队列。我正在考虑使用我自己定义的数据结构或重构我的代码设计。问题是,是否有任何常规解决方案可以绕过 OS 级别的队列缓冲区限制,以便使用 Python 将内容存储在高速缓存中?如何在多处理队列已满并继续多处理时“获取”多处理队列?

在使用多处理库一段时间后,我发现实现健壮的多处理队列的最简单方法是使用 multiprocessing.Manager 对象。来自文档:

Create a shared queue.Queue object and return a proxy for it.

Manager 对象创建和管理标准多线程队列,而不是分配一个单独的线程通过管道刷新数据,而不必通过 Pipe(没有查看源代码,所以我不能肯定地说)。这意味着您的代码几乎可以无限期地运行。

None 这是免费的,而且我发现托管队列的运行速度比 (几乎 20 倍)慢于 multiprocessing 队列一个简单的测试,尽管由于其他瓶颈,当队列集成到一个完整的系统中时,差异并不那么明显。

使用托管队列可以使您的 IPC 更加健壮,并且在性能权衡下可能是个好主意,除非您可以找到一种方法来忍受普通 multiprocessing 队列的不可靠性。