Manger.Namespace() 内的多处理队列
multiprocessing's Queue inside Manger.Namespace()
我目前正在创建一个 class,它应该使用 multiprocessing
模块以多线程方式执行一些方法。我使用 Pool
的 n
工作人员执行实际计算。现在我想为每个当前 n
活跃的工作人员分配一个介于 0
和 n
之间的索引以用于其他计算。为此,我想使用共享 Queue
以某种方式分配索引,每次都没有两个工作人员具有相同的 ID。为了在不同线程之间的 class 中共享相同的 Queue
,我想将它存储在 Manager.Namespace()
中。但是这样做,Queue
出现了一些问题。因此,我创建了我的问题的一个最小版本,结果是这样的:
from multiprocess import Process, Queue, Manager, Pool, cpu_count
class A(object):
def __init__(self):
manager = Manager()
self.ns = manager.Namespace()
self.ns.q = manager.Queue()
def foo(self):
for i in range(10):
print(i)
self.ns.q.put(i)
print(self.ns.q.get())
print(self.ns.q.qsize())
a = A()
a.foo()
在此代码中,执行在第二个 print 语句之前停止 - 因此,我认为 Queue
中实际上没有写入任何数据。当我删除 namespace
相关内容时,代码可以完美运行。这是 multiprocessing
s 对象的预期行为吗?我做错了什么吗?或者这是某种错误?
是的,你不应该在这里使用 Namespace
。当你把一个Queue
对象放入manager.Namespace()
时,每个进程都会得到一个新的Queue
实例,那些新创建的队列对象的所有writer/reader都与父进程没有联系,因此,工作进程将不会收到任何消息。单独分享一个Queue
。
顺便说一下,您多次提到 "thread",但是在 multiprocess
模块的上下文中,worker 是进程,而不是线程。
我目前正在创建一个 class,它应该使用 multiprocessing
模块以多线程方式执行一些方法。我使用 Pool
的 n
工作人员执行实际计算。现在我想为每个当前 n
活跃的工作人员分配一个介于 0
和 n
之间的索引以用于其他计算。为此,我想使用共享 Queue
以某种方式分配索引,每次都没有两个工作人员具有相同的 ID。为了在不同线程之间的 class 中共享相同的 Queue
,我想将它存储在 Manager.Namespace()
中。但是这样做,Queue
出现了一些问题。因此,我创建了我的问题的一个最小版本,结果是这样的:
from multiprocess import Process, Queue, Manager, Pool, cpu_count
class A(object):
def __init__(self):
manager = Manager()
self.ns = manager.Namespace()
self.ns.q = manager.Queue()
def foo(self):
for i in range(10):
print(i)
self.ns.q.put(i)
print(self.ns.q.get())
print(self.ns.q.qsize())
a = A()
a.foo()
在此代码中,执行在第二个 print 语句之前停止 - 因此,我认为 Queue
中实际上没有写入任何数据。当我删除 namespace
相关内容时,代码可以完美运行。这是 multiprocessing
s 对象的预期行为吗?我做错了什么吗?或者这是某种错误?
是的,你不应该在这里使用 Namespace
。当你把一个Queue
对象放入manager.Namespace()
时,每个进程都会得到一个新的Queue
实例,那些新创建的队列对象的所有writer/reader都与父进程没有联系,因此,工作进程将不会收到任何消息。单独分享一个Queue
。
顺便说一下,您多次提到 "thread",但是在 multiprocess
模块的上下文中,worker 是进程,而不是线程。