Calling a class attribute of a multiprocessing queue worker does not work as expected

What I have: a queue in Python, consumed by a pool of worker processes.

It looks like this:

from multiprocessing import Worker, Queue


class Worker(Process):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue
        self.task_type = ''
    
    def get_type():
        print(self.task_type)  # prints empty line
        return self.task_type

    def run(self):
        print('Worker started')
        # do some initialization here

        print('Computing things!')
        for data in iter(self.queue.get, None):
            self.task_type = data['type']
            print(self.task_type)  # prints test
            # Use data

request_queue = Queue()
workers = []
for i in range(4):
    workers.append(Worker(request_queue))

for i in workers:
    i.start()

for data in the_real_source:
    request_queue.put(data)
# Sentinel objects to allow clean shutdown: 1 per worker.
for i in range(4):
    request_queue.put(some_stuff)

Then I try to find the worker that has the type I need and terminate it, like this:

for i in workers:
    if i.get_type() == 'test':
        i.terminate()

But if we try to get the type from every worker, they are all empty, even while a task is running inside the worker.

And I don't know how to solve this.

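You have not posted anything close to a minimal, reproducible example, since important declarations such as the_real_source are missing. So I cannot fix and test your code, but I can give you slightly modified code that you can adapt. Here are a few places where you went wrong: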

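  1. You are importing the name Worker from multiprocessing. But Worker does not exist in that package; it is the class you define in your own code. What you do need to import from that package is the Process class.
  2. The statement for data in iter(self.queue.get, None): means "iterate over the input queue by calling self.queue.get() until the value None is returned". None is therefore a sentinel value, used to signal that there is no more data to process.
  3. Since you have 4 processes reading from the same input queue, and you want all 4 to terminate when there is no more data, you need to put 4 instances of the sentinel value None on the queue.
  4. And since all 4 processes read from the same input queue and you have no control over how the running processes are scheduled, you cannot know which process will read which item from the queue. The one exception is that you can be sure each process will read exactly one sentinel value, because a process that reads a sentinel immediately breaks out of its loop of getting data from the queue and then terminates.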
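Points 2 to 4 can be tried out in isolation. The following is only a minimal sketch: it uses threads and queue.Queue instead of processes to stay short (the sentinel logic is the same), and the names consume and run_workers are made up for this illustration.

```python
import threading
from collections import Counter
from queue import Queue


def consume(q, name, handled, lock):
    # iter(q.get, None) keeps calling q.get() until it returns the sentinel None.
    for _item in iter(q.get, None):
        with lock:
            handled[name] += 1


def run_workers(n_workers=4, n_items=20):
    q = Queue()
    handled = Counter()
    lock = threading.Lock()
    threads = [
        threading.Thread(target=consume, args=(q, f'worker-{i}', handled, lock))
        for i in range(n_workers)
    ]
    for t in threads:
        t.start()
    for i in range(n_items):
        q.put({'type': i})
    # One sentinel per worker: each worker consumes exactly one and then exits.
    for _ in range(n_workers):
        q.put(None)
    for t in threads:
        t.join()
    return handled


if __name__ == '__main__':
    counts = run_workers()
    print(dict(counts))          # how the items are split between workers varies per run
    print(sum(counts.values()))  # total number of items handled
```

Every item is handled exactly once and every worker shuts down, but which worker handles which item is up to the scheduler.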

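In the following code, each process returns its final task_type attribute value to the main process on an output queue, which is passed to the Worker instance as an additional initializer argument: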

from multiprocessing import Process, Queue

class Worker(Process):
    def __init__(self, request_queue, output_queue):
        super(Worker, self).__init__()
        self.request_queue = request_queue
        self.output_queue = output_queue
        self.task_type = ''

    def run(self):
        print(f'Worker {self.pid} started')
        # do some initialization here

        print('Computing things!')
        for data in iter(self.request_queue.get, None):
            self.task_type = data['type']
            # Print our ident:
            print('Worker process printing:', 'self id =', self.pid, 'self.task_type =', self.task_type)
            # Use data:
        self.output_queue.put((self.pid, self.task_type))


if __name__ == '__main__':
    request_queue = Queue()
    output_queue = Queue()
    workers = []
    for i in range(4):
        workers.append(Worker(request_queue, output_queue))

    for worker in workers:
        worker.start()

    the_real_source = [{'type': i} for i in range(20)]
    for data in the_real_source:
        request_queue.put(data)

    for _ in range(4):
        # put sentinel for clean shutdown, which is None:
        request_queue.put(None)

    # Must read the output queue before joining tasks:
    # We are looking for 4 results:
    results = [output_queue.get() for _ in range(4)]
    for pid, task_type in results:
        print('Main process printing:', 'worker id =', pid, 'worker task_type =', task_type)
    # wait for the tasks to complete:
    for worker in workers:
        worker.join()

Prints:

Worker 16672 started
Computing things!
Worker process printing: self id = 16672 self.task_type = 0
Worker process printing: self id = 16672 self.task_type = 1
Worker process printing: self id = 16672 self.task_type = 2
Worker process printing: self id = 16672 self.task_type = 3
Worker process printing: self id = 16672 self.task_type = 4
Worker process printing: self id = 16672 self.task_type = 5
Worker process printing: self id = 16672 self.task_type = 6
Worker process printing: self id = 16672 self.task_type = 7
Worker process printing: self id = 16672 self.task_type = 8
Worker process printing: self id = 16672 self.task_type = 9
Worker process printing: self id = 16672 self.task_type = 10
Worker process printing: self id = 16672 self.task_type = 11
Worker process printing: self id = 16672 self.task_type = 12
Worker 19620 started
Worker process printing: self id = 16672 self.task_type = 13
Computing things!
Worker process printing: self id = 16672 self.task_type = 14
Worker process printing: self id = 19620 self.task_type = 15
Worker 6728 started
Worker process printing: self id = 16672 self.task_type = 16
Worker process printing: self id = 19620 self.task_type = 17
Computing things!
Worker 17724 started
Worker process printing: self id = 16672 self.task_type = 18
Worker process printing: self id = 19620 self.task_type = 19
Computing things!
Main process printing: worker id = 6728 worker task_type =
Main process printing: worker id = 16672 worker task_type = 18
Main process printing: worker id = 19620 worker task_type = 19
Main process printing: worker id = 17724 worker task_type =

In the run above, all of the input messages were grabbed by just 2 of the 4 processes.

Multithreading version

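Use multithreading if you want the main thread to be able to access the Worker instances while the worker threads are still running and read their current values. Otherwise, if you are using multiprocessing, the best you can do is return the final value to the main process on the output queue that was passed in (@Aaron uses the output queue to pass back the complete Worker instance, but that may be more information than you need, as the multiprocessing version above shows). In the code below, the main thread only accesses the task_type attribute once each thread has completed, but it could do so at any time: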

from threading import Thread
from queue import Queue

class Worker(Thread):
    def __init__(self, queue):
        super(Worker, self).__init__()
        self.queue = queue
        self.task_type = ''

    def run(self):
        print(f'Worker {self.ident} started')
        # do some initialization here

        print('Computing things!')
        for data in iter(self.queue.get, None):
            self.task_type = data['type']
            # Print our ident:
            print('Worker thread printing:', 'self id =', self.ident, 'self.task_type =', self.task_type)
            # Use data


if __name__ == '__main__':
    request_queue = Queue()
    workers = []
    for i in range(4):
        workers.append(Worker(request_queue))

    for worker in workers:
        worker.start()

    the_real_source = [{'type': i} for i in range(20)]
    for data in the_real_source:
        request_queue.put(data)

    for _ in range(4):
        # put sentinel for clean shutdown, which is None:
        request_queue.put(None)

    # wait for the tasks to complete:
    for worker in workers:
        worker.join()
        print('Main thread printing:', 'worker id =', worker.ident, 'worker task_type =', worker.task_type)

Prints:

Worker 3980 started
Computing things!
Worker 12084 started
Computing things!
Worker 3552 started
Computing things!
Worker 5296 started
Computing things!
Worker thread printing: self id = 3980 self.task_type = 0
Worker thread printing: self id = 12084 self.task_type = 2
Worker thread printing: self id = 3552 self.task_type = 1
Worker thread printing: self id = 3552 self.task_type = 6
Worker thread printing: self id = 3552 self.task_type = 7
Worker thread printing: self id = 3980 self.task_type = 4
Worker thread printing: self id = 12084 self.task_type = 5
Worker thread printing: self id = 3980 self.task_type = 8
Worker thread printing: self id = 5296 self.task_type = 3
Worker thread printing: self id = 3552 self.task_type = 9
Worker thread printing: self id = 12084 self.task_type = 10
Worker thread printing: self id = 3980 self.task_type = 11
Worker thread printing: self id = 5296 self.task_type = 12
Worker thread printing: self id = 3552 self.task_type = 13
Worker thread printing: self id = 12084 self.task_type = 14
Worker thread printing: self id = 3980 self.task_type = 15
Worker thread printing: self id = 5296 self.task_type = 16
Worker thread printing: self id = 3552 self.task_type = 17
Worker thread printing: self id = 12084 self.task_type = 18
Worker thread printing: self id = 3980 self.task_type = 19
Main thread printing: worker id = 3980 worker task_type = 19
Main thread printing: worker id = 12084 worker task_type = 18
Main thread printing: worker id = 3552 worker task_type = 17
Main thread printing: worker id = 5296 worker task_type = 16
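The threaded code above only reads task_type after join(). As a rough sketch of reading it while a worker is still in the middle of a task, something like the following could work. The two Event attributes (picked_up and release) and the function peek_while_running are hypothetical helpers added only to make the timing deterministic; they are not part of the code above. Note that threads cannot be terminated the way processes can, so this only shows the read.

```python
import threading
from queue import Queue


class Worker(threading.Thread):
    def __init__(self, queue):
        super().__init__()
        self.queue = queue
        self.task_type = ''
        self.picked_up = threading.Event()  # hypothetical: set once a task has been taken
        self.release = threading.Event()    # hypothetical: lets the pretend task finish

    def run(self):
        for data in iter(self.queue.get, None):
            self.task_type = data['type']
            self.picked_up.set()
            self.release.wait()  # stand-in for real work on the task


def peek_while_running():
    q = Queue()
    worker = Worker(q)
    worker.start()
    q.put({'type': 'test'})
    worker.picked_up.wait()
    # The thread is blocked mid-task here, and the main thread sees its attribute.
    seen = (worker.is_alive(), worker.task_type)
    worker.release.set()
    q.put(None)  # sentinel for clean shutdown
    worker.join()
    return seen


if __name__ == '__main__':
    print(peek_while_running())
```

Because threads share one memory space, the main thread reads the same Worker object that the worker thread is updating.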

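When you call Worker.start(), a copy of the object is made and sent to the child process for execution. At that point you have two independent objects, and updating self.task_type in the child process does not update self.task_type in the main process. One of the main tenets of processes versus threads is that processes do not share memory space, while threads do. Any communication between processes must go through OS file handles (this is at the core of all the multiprocessing shared value types).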
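To see the two independent copies side by side, and one way the parent could still observe a worker's current state while it runs, here is a minimal sketch using multiprocessing.Value. The integer TEST_CODE, the Event named ready, and the function demo are assumptions made for this illustration; a real task type would need some mapping to an integer (or another shared type).

```python
import time
from multiprocessing import Event, Process, Value

TEST_CODE = 7  # hypothetical integer code standing in for the task type 'test'


class Worker(Process):
    def __init__(self, current_type, ready):
        super().__init__()
        self.current_type = current_type  # shared integer, visible to both processes
        self.ready = ready                # set by the child once it has picked up a task
        self.task_type = ''               # plain attribute: each process has its own copy

    def run(self):
        self.task_type = 'test'              # changes the child's copy only
        self.current_type.value = TEST_CODE  # written to shared memory
        self.ready.set()
        time.sleep(30)                       # stand-in for a long-running task


def demo():
    current_type = Value('i', -1)  # -1 means "no task yet"
    ready = Event()
    worker = Worker(current_type, ready)
    worker.start()
    ready.wait(timeout=10)
    seen_attribute = worker.task_type  # the parent's copy, never touched by the child
    seen_shared = current_type.value   # the value the child wrote
    if seen_shared == TEST_CODE:
        worker.terminate()             # stop the worker while its task is still running
    worker.join()
    return seen_attribute, seen_shared, worker.is_alive()


if __name__ == '__main__':
    print(demo())
```

The parent's task_type stays empty, while the shared value carries the child's update, which is what makes the "find the worker with this type and terminate it" check possible from the main process.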

A simple example of getting the worker's modified state back in the main process (untested: may have typos..)

from multiprocessing import Process, Queue


class Worker(Process):
    def __init__(self, in_queue, out_queue):
        super(Worker, self).__init__()
        self.in_queue = in_queue
        self.out_queue = out_queue
        self.task_type = ''

    def run(self):
        print('Worker started')
        # do some initialization here

        print('Computing things!')
        for data in iter(self.in_queue.get, None):
            self.task_type = data['type']
            print(self.task_type)  # prints the type of the task just received
            # Use data
        # After the stop sentinel: send the modified state back to the main process.
        # The Worker holds Queue objects, which can only be pickled while a child
        # process is being spawned, so send plain data rather than self.
        self.out_queue.put((self.pid, self.task_type))


if __name__ == "__main__":  # you should always be using this with multiprocessing...
    request_queue = Queue()
    response_queue = Queue()
    workers = []
    for i in range(4):
        workers.append(Worker(request_queue, response_queue))

    for i in workers:
        i.start()

    # Placeholder input: the question does not show the_real_source.
    the_real_source = [{'type': 'test'} for _ in range(8)]
    for data in the_real_source:
        request_queue.put(data)
    # Sentinel objects to allow clean shutdown: 1 per worker.
    for i in range(4):
        request_queue.put(None)  # stop sentinel

    # Read the results before joining, then map pid -> final task_type.
    final_types = dict(response_queue.get() for _ in range(4))

    for i in workers:
        i.join()
        if final_types[i.pid] == 'test':
            print('Worker', i.pid, 'last handled a task of type test')