Calling a class attribute of a multiprocessing queue worker doesn't work as expected
What I have: a queue, as in the top answer to "python Pool with worker Processes", which looks like this:
from multiprocessing import Worker, Queue
class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue
        self.task_type = ''
    def get_type():
        print(self.task_type)  # prints empty line
        return self.task_type
    def run(self):
        print('Worker started')
        # do some initialization here
        print('Computing things!')
        for data in iter(self.queue.get, None):
            self.task_type = data['type']
            print(self.task_type)  # prints test
            # Use data
request_queue = Queue()
workers = []
for i in range(4):
    workers.append(Worker(request_queue))
for i in workers:
    i.start()
for data in the_real_source:
    request_queue.put(data)
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
    request_queue.put(some_stuff)
Then I do something like this to find the worker with the type I need and terminate it:
for i in workers:
    if i.get_type() == 'test':
        i.terminate()
But if I try to get the types from all the workers, they are all empty, even while the tasks inside them are running, and I don't know how to fix this.
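You haven't posted anything close to a minimal, reproducible example, because important declarations such as the_real_source are missing. So I can't fix and test your code as-is, but I can give you a slightly modified version that you can adapt. Here are a few places where you went wrong: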
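- You are importing the name Worker from multiprocessing. But Worker does not exist in that package; it is the class you define in your own code. What you do need to import from that package is the Process class.
- The statement
for data in iter(self.queue.get, None):
means "iterate over the input queue by calling self.queue.get() until the value None is returned". So None is the sentinel value that signals there is no more data to process.
- Since you have 4 processes reading from the same input queue, and you want all 4 of them to terminate when there is no more data, you need to put 4 instances of the sentinel value None on the queue.
- And since all 4 processes read from the same input queue and you have no control over how the running processes are scheduled, you cannot know which process will read which data from the queue. The only exception is that you can be sure each process reads exactly one sentinel value, because as soon as a process reads a sentinel it breaks out of the loop that gets data from the queue and then terminates.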
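In the following code, the processes send their final task_type attribute values back to the main process on an output queue, which is passed to each Worker instance as an additional initializer argument: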
from multiprocessing import Process, Queue
class Worker(Process):
    def __init__(self, request_queue, output_queue):
        super(Worker, self).__init__()
        self.request_queue = request_queue
        self.output_queue = output_queue
        self.task_type = ''
    def run(self):
        print(f'Worker {self.pid} started')
        # do some initialization here
        print('Computing things!')
        for data in iter(self.request_queue.get, None):
            self.task_type = data['type']
            # Print our ident:
            print('Worker process printing:', 'self id =', self.pid, 'self.task_type =', self.task_type)
            # Use data:
        # After the sentinel: report this worker's final task_type to the main process
        self.output_queue.put((self.pid, self.task_type))
if __name__ == '__main__':
    request_queue = Queue()
    output_queue = Queue()
    workers = []
    for i in range(4):
        workers.append(Worker(request_queue, output_queue))
    for worker in workers:
        worker.start()
    the_real_source = [{'type': i} for i in range(20)]
    for data in the_real_source:
        request_queue.put(data)
    for _ in range(4):
        # put sentinel for clean shutdown, which is None:
        request_queue.put(None)
    # Must read the output queue before joining tasks:
    # We are looking for 4 results:
    results = [output_queue.get() for _ in range(4)]
    for pid, task_type in results:
        print('Main process printing:', 'worker id =', pid, 'worker task_type =', task_type)
    # wait for the tasks to complete:
    for worker in workers:
        worker.join()
Prints:
Worker 16672 started
Computing things!
Worker process printing: self id = 16672 self.task_type = 0
Worker process printing: self id = 16672 self.task_type = 1
Worker process printing: self id = 16672 self.task_type = 2
Worker process printing: self id = 16672 self.task_type = 3
Worker process printing: self id = 16672 self.task_type = 4
Worker process printing: self id = 16672 self.task_type = 5
Worker process printing: self id = 16672 self.task_type = 6
Worker process printing: self id = 16672 self.task_type = 7
Worker process printing: self id = 16672 self.task_type = 8
Worker process printing: self id = 16672 self.task_type = 9
Worker process printing: self id = 16672 self.task_type = 10
Worker process printing: self id = 16672 self.task_type = 11
Worker process printing: self id = 16672 self.task_type = 12
Worker 19620 started
Worker process printing: self id = 16672 self.task_type = 13
Computing things!
Worker process printing: self id = 16672 self.task_type = 14
Worker process printing: self id = 19620 self.task_type = 15
Worker 6728 started
Worker process printing: self id = 16672 self.task_type = 16
Worker process printing: self id = 19620 self.task_type = 17
Computing things!
Worker 17724 started
Worker process printing: self id = 16672 self.task_type = 18
Worker process printing: self id = 19620 self.task_type = 19
Computing things!
Main process printing: worker id = 6728 worker task_type =
Main process printing: worker id = 16672 worker task_type = 18
Main process printing: worker id = 19620 worker task_type = 19
Main process printing: worker id = 17724 worker task_type =
In the run above, all of the input messages were grabbed by 2 of the 4 processes.
Multithreaded version
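Use this version if you want the main thread to be able to access the Worker instances, and read their current values, while the worker threads are still running. Otherwise, if you are using multiprocessing, the best you can do is pass the final values back to the main process on an output queue passed to each worker (@Aaron uses an output queue to pass back the complete Worker instance, but that is probably more information than you need, as the multiprocessing version above shows). In the code below the main thread only accesses the task_type attribute after each thread has finished, but it could do so at any time: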
from threading import Thread
from queue import Queue
class Worker(Thread):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue
        self.task_type = ''
    def run(self):
        print(f'Worker {self.ident} started')
        # do some initialization here
        print('Computing things!')
        for data in iter(self.queue.get, None):
            self.task_type = data['type']
            # Print our ident:
            print('Worker thread printing:', 'self id =', self.ident, 'self.task_type =', self.task_type)
            # Use data
if __name__ == '__main__':
    request_queue = Queue()
    workers = []
    for i in range(4):
        workers.append(Worker(request_queue))
    for worker in workers:
        worker.start()
    the_real_source = [{'type': i} for i in range(20)]
    for data in the_real_source:
        request_queue.put(data)
    for _ in range(4):
        # put sentinel for clean shutdown, which is None:
        request_queue.put(None)
    # wait for the tasks to complete:
    for worker in workers:
        worker.join()
        # Threads share memory, so the main thread sees the worker's own attribute:
        print('Main thread printing:', 'worker id =', worker.ident, 'worker task_type =', worker.task_type)
Prints:
Worker 3980 started
Computing things!
Worker 12084 started
Computing things!
Worker 3552 started
Computing things!
Worker 5296 started
Computing things!
Worker thread printing: self id = 3980 self.task_type = 0
Worker thread printing: self id = 12084 self.task_type = 2
Worker thread printing: self id = 3552 self.task_type = 1
Worker thread printing: self id = 3552 self.task_type = 6
Worker thread printing: self id = 3552 self.task_type = 7
Worker thread printing: self id = 3980 self.task_type = 4
Worker thread printing: self id = 12084 self.task_type = 5
Worker thread printing: self id = 3980 self.task_type = 8
Worker thread printing: self id = 5296 self.task_type = 3
Worker thread printing: self id = 3552 self.task_type = 9
Worker thread printing: self id = 12084 self.task_type = 10
Worker thread printing: self id = 3980 self.task_type = 11
Worker thread printing: self id = 5296 self.task_type = 12
Worker thread printing: self id = 3552 self.task_type = 13
Worker thread printing: self id = 12084 self.task_type = 14
Worker thread printing: self id = 3980 self.task_type = 15
Worker thread printing: self id = 5296 self.task_type = 16
Worker thread printing: self id = 3552 self.task_type = 17
Worker thread printing: self id = 12084 self.task_type = 18
Worker thread printing: self id = 3980 self.task_type = 19
Main thread printing: worker id = 3980 worker task_type = 19
Main thread printing: worker id = 12084 worker task_type = 18
Main thread printing: worker id = 3552 worker task_type = 17
Main thread printing: worker id = 5296 worker task_type = 16
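The code above reads task_type only after join(). To illustrate the "at any time" part, here is a minimal sketch that reads the attribute while the worker thread is still inside a task. The busy and release events and the peek_while_running() helper are hypothetical scaffolding so the timing is deterministic; release.wait() merely stands in for real work.

```python
import threading
from queue import Queue


class Worker(threading.Thread):
    def __init__(self, queue, busy, release):
        super().__init__()
        self.queue = queue
        self.busy = busy          # hypothetical Event: "task_type has been updated"
        self.release = release    # hypothetical Event: lets the simulated task finish
        self.task_type = ''

    def run(self):
        for data in iter(self.queue.get, None):
            self.task_type = data['type']
            self.busy.set()
            self.release.wait()   # stands in for a long-running task


def peek_while_running():
    queue, busy, release = Queue(), threading.Event(), threading.Event()
    worker = Worker(queue, busy, release)
    worker.start()
    queue.put({'type': 'test'})
    queue.put(None)               # sentinel for a clean shutdown
    busy.wait()                   # the worker is now inside its task
    snapshot = (worker.is_alive(), worker.task_type)
    release.set()                 # let the task finish
    worker.join()
    return snapshot


print(peek_while_running())       # (True, 'test')
```

Keep in mind that threading.Thread has no terminate() method, so stopping a selected thread has to be cooperative, for instance by having it check an Event.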
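When you call Worker.start(), a copy of the object is made and handed to the child process to run. From then on you have two independent objects: updating self.task_type in the child process does not update self.task_type in the main process. One of the main tenets of processes vs. threads is that processes do not share memory space, whereas threads do. Any communication between processes has to go through OS file handles (which is at the core of all the multiprocessing shared-value types).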
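The copy semantics can be seen in isolation with a minimal sketch. It assumes a POSIX system where the "fork" start method is available (it is not on Windows); the Worker class, the demo() helper and the 16-byte buffer size are illustrative only. A plain attribute set in run() is still '' in the parent afterwards, while a multiprocessing Array, one of the shared-value types, carries the value across.

```python
import multiprocessing as mp

# Assumption: POSIX, so the "fork" start method exists.
ctx = mp.get_context("fork")


class Worker(ctx.Process):
    def __init__(self, shared_type):
        super().__init__()
        self.task_type = ""              # plain attribute: each process has its own copy
        self.shared_type = shared_type   # shared memory: visible to both processes

    def run(self):
        # Runs only in the child process.
        self.task_type = "test"
        self.shared_type.value = b"test"


def demo():
    shared_type = ctx.Array("c", 16)     # hypothetical 16-byte buffer for the type name
    worker = Worker(shared_type)
    worker.start()
    worker.join()
    return worker.task_type, shared_type.value


if __name__ == "__main__":
    print(demo())
```

A shared buffer like this is one way the main process could see a worker's current type while it runs; each worker would need its own buffer.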
A simple example of getting a modified copy of each worker instance back in the main process (untested: there may be typos):
from multiprocessing import Process, Queue
class Worker(Process):
    def __init__(self, in_queue, out_queue):
        super(Worker, self).__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.task_type = ''
    def get_type(self):
        print(self.task_type)
        return self.task_type
    def run(self):
        print('Worker started')
        # do some initialization here
        print('Computing things!')
        for data in iter(self.in_queue.get, None):
            self.task_type = data['type']
            print(self.task_type)  # prints test
            # Use data
        # after the stop sentinel:
        self.out_queue.put(self)  # send modified Worker instance back to main process
if __name__ == "__main__":  # you should always be using this with multiprocessing...
    request_queue = Queue()
    response_queue = Queue()
    workers = []
    for i in range(4):
        workers.append(Worker(request_queue, response_queue))
    for i in workers:
        i.start()
    for data in the_real_source:
        request_queue.put(data)
    # Sentinel objects to allow clean shutdown: 1 per worker.
    for i in range(4):
        request_queue.put(None)  # stop sentinel
    workers = [response_queue.get() for _ in range(4)]
    for i in workers:
        if i.get_type() == 'test':
            i.terminate()
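The sketch above sends the Worker instance itself through a Queue, which requires pickling a Process object. As far as I know, CPython's Process objects carry an authentication key that refuses to be pickled outside of process start-up, so that put() may fail in the queue's background feeder thread and leave the main process waiting in response_queue.get(). A quick way to check on your own Python version, using a hypothetical try_pickle() helper:

```python
import multiprocessing as mp
import pickle


def try_pickle(obj):
    """Return None if obj pickles cleanly, otherwise the TypeError message."""
    try:
        pickle.dumps(obj)
    except TypeError as exc:
        return str(exc)
    return None


if __name__ == "__main__":
    # A whole Process object: expected to fail because of its authkey.
    print(try_pickle(mp.Process()))
    # A plain snapshot of the attributes you care about pickles fine.
    print(try_pickle({"pid": 1234, "task_type": "test"}))   # None
```

If the first call reports an error, putting a plain snapshot such as (self.pid, self.task_type) on the queue avoids the problem.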