Python 使用两个队列作为生产者-消费者模式的多处理可能死锁?

Python multiprocessing possible deadlock with two queue as producer-consumer pattern?

我想知道以下代码中是否存在某种死锁。我必须读取数据库的每个元素(大约 100 万个项目),对其进行处理,然后将结果收集到一个唯一的文件中。

我使用两个队列和三种类型的进程与 multiprocessing 并行执行:

from multiprocessing import Pool, Process, Queue
class Computation():

    def __init__(self,K):
        self.task_queue = Queue()
        self.completed_queue = Queue()
        self.n_cpus = K

    def reader(self,):
        with open(db, "r") as db:
            ... # Read an item
            self.task_queue.put(item)
            
    def worker(self,):
        while True:
            item = self.task_queue.get(True)
            if item == "STOP":
                break
            self.process_item(item)

    def writer_process(self,):
        while True:
            f = self.completed_queue.get(True)
            if f == "DONE":
               break
            self.write_f(f)

    def run(self,):
        pool = Pool(n_cpus, self.worker, args=())
        
        writer = Process(target=self.writer_process, args=())
        writer.start()

        self.reader()

        pool.close()
        pool.join()

        self.completed_queue.put("DONE")
        writer.join()

代码有效,但似乎有时编写器或池停止工作(或者它们非常慢)。在这种情况下是否可能出现死锁?

您的代码有几个问题。首先,通过按原样使用队列,您实际上是在创建自己的进程池,根本不需要使用 multiprocessing.Pool class。您正在使用池初始化程序作为实际的池工作人员,这是对 class 的一种滥用;你最好只使用常规 Process 实例(无论如何我的意见)。

其次,尽管您将消息 DONE 发送到 writer_process 以发出终止信号是很好的,但是您还没有对 self.n_cpus worker 个进程,它们正在寻找 'STOP' 条消息,因此 reader 函数需要将 self.n_cpus STOP 条消息放入任务队列:

from multiprocessing import Process, Queue


class Computation():

    def __init__(self, K):
        self.task_queue = Queue()
        self.completed_queue = Queue()
        self.n_cpus = K

    def reader(self,):
        with open(db, "r") as db:
            ... # Read an item
            self.task_queue.put(item)
        # signal to the worker processes to terminate:
        for _ in range(self.n_cpus):
            self.task_queue.put('STOP')
            
    def worker(self,):
        while True:
            item = self.task_queue.get(True)
            if item == "STOP":
                break
            self.process_item(item)

    def writer_process(self,):
        while True:
            f = self.completed_queue.get(True)
            if f == "DONE":
               break
            self.write_f(f)

    def run(self):
        processes = [Process(target=self.worker) for _ in range(self.n_cpus)]
        for p in processes:
            p.start()
        
        writer = Process(target=self.writer_process, args=())
        writer.start()

        self.reader()

        for p in processes:
            p.join()

        self.completed_queue.put("DONE")
        writer.join()

就个人而言,我不会使用 'STOP' 和 'DONE' 作为 sentinel 消息,而是使用 None,假设不是有效的实际消息。我已经测试了上面的代码,其中 reader 只是处理了列表中的字符串,而 self.process_item(item) 只是将 'done' 附加到每个字符串并将修改后的字符串放在 completed_queue 上并替换了 self.write_fwriter_process 中调用 print。我没有发现代码有任何问题。

更新以使用托管队列

免责声明: 我没有使用 mpi4py 的经验,也不知道队列代理如何分布在不同的计算机上。上面的代码 可能 不够,正如以下文章 How to share mutliprocessing queue object between multiple computers 所建议的那样。 但是,该代码正在创建 Queue.Queue 的实例(该代码是 Python 2 代码)并且 不是 multiprocessing.SyncManager. 这方面的文档很差。 尝试上面的更改,看看它是否工作得更好(它会更慢)。

因为manager.Queue()返回的代理,我不得不稍微重新安排一下代码;队列现在作为参数显式传递给流程函数:

from multiprocessing import Process, Manager


class Computation():

    def __init__(self, K):
        self.n_cpus = K

    def reader(self, task_queue):
        with open(db, "r") as db:
            ... # Read an item
        # signal to the worker processes to terminate:
        for _ in range(self.n_cpus):
            task_queue.put('STOP')

    def worker(self, task_queue, completed_queue):
        while True:
            item = task_queue.get(True)
            if item == "STOP":
                break
            self.process_item(item)

    def writer_process(self, completed_queue):
        while True:
            f = completed_queue.get(True)
            if f == "DONE":
               break
            self.write_f(f)

    def run(self):
        with Manager() as manager:
            task_queue = manager.Queue()
            completed_queue = manager.Queue()
            processes = [Process(target=self.worker, args=(task_queue, completed_queue)) for _ in range(self.n_cpus)]
            for p in processes:
                p.start()

            writer = Process(target=self.writer_process, args=(completed_queue,))
            writer.start()

            self.reader(task_queue)

            for p in processes:
                p.join()

            completed_queue.put("DONE")
            writer.join()