Manger.Namespace() 内的多处理队列

multiprocessing's Queue inside Manger.Namespace()

我目前正在创建一个 class,它应该使用 multiprocessing 模块以多线程方式执行一些方法。我使用 Pooln 工作人员执行实际计算。现在我想为每个当前 n 活跃的工作人员分配一个介于 0n 之间的索引以用于其他计算。为此,我想使用共享 Queue 以某种方式分配索引,每次都没有两个工作人员具有相同的 ID。为了在不同线程之间的 class 中共享相同的 Queue,我想将它存储在 Manager.Namespace() 中。但是这样做,Queue 出现了一些问题。因此,我创建了我的问题的一个最小版本,结果是这样的:

from multiprocess import Process, Queue, Manager, Pool, cpu_count

class A(object):
    def __init__(self):
        manager = Manager()
        self.ns = manager.Namespace()
        self.ns.q = manager.Queue()

    def foo(self):
        for i in range(10):
            print(i)
            self.ns.q.put(i)
            print(self.ns.q.get())
            print(self.ns.q.qsize())

a = A()
a.foo()

在此代码中,执行在第二个 print 语句之前停止 - 因此,我认为 Queue 中实际上没有写入任何数据。当我删除 namespace 相关内容时,代码可以完美运行。这是 multiprocessings 对象的预期行为吗?我做错了什么吗?或者这是某种错误?

是的,你不应该在这里使用 Namespace。当你把一个Queue对象放入manager.Namespace()时,每个进程都会得到一个新的Queue实例,那些新创建的队列对象的所有writer/reader都与父进程没有联系,因此,工作进程将不会收到任何消息。单独分享一个Queue

顺便说一下,您多次提到 "thread",但是在 multiprocess 模块的上下文中,worker 是进程,而不是线程。