Python 使用两个队列作为生产者-消费者模式的多处理可能死锁?
Python multiprocessing possible deadlock with two queue as producer-consumer pattern?
我想知道以下代码中是否存在某种死锁。我必须读取数据库的每个元素(大约 100 万个项目),对其进行处理,然后将结果收集到一个唯一的文件中。
我使用两个队列和三种类型的进程与 multiprocessing 并行执行:
- Reader:读取数据库并将读取的项目添加到task_queue 中的主进程
- Worker:进程池。每个工作人员从 task_queue 获取一个项目,处理该项目,将结果保存在存储在 item_name/item_name.txt 中的中间文件中,并将 item_name 放入 completed_queue
- Writer: Process which gets an item_name from completed_queue, gets the intermediate result from item_name/item_name.txt and write it in results.txt
from multiprocessing import Pool, Process, Queue
class Computation():
def __init__(self,K):
self.task_queue = Queue()
self.completed_queue = Queue()
self.n_cpus = K
def reader(self,):
with open(db, "r") as db:
... # Read an item
self.task_queue.put(item)
def worker(self,):
while True:
item = self.task_queue.get(True)
if item == "STOP":
break
self.process_item(item)
def writer_process(self,):
while True:
f = self.completed_queue.get(True)
if f == "DONE":
break
self.write_f(f)
def run(self,):
pool = Pool(n_cpus, self.worker, args=())
writer = Process(target=self.writer_process, args=())
writer.start()
self.reader()
pool.close()
pool.join()
self.completed_queue.put("DONE")
writer.join()
代码有效,但似乎有时编写器或池停止工作(或者它们非常慢)。在这种情况下是否可能出现死锁?
您的代码有几个问题。首先,通过按原样使用队列,您实际上是在创建自己的进程池,根本不需要使用 multiprocessing.Pool
class。您正在使用池初始化程序作为实际的池工作人员,这是对 class 的一种滥用;你最好只使用常规 Process
实例(无论如何我的意见)。
其次,尽管您将消息 DONE
发送到 writer_process
以发出终止信号是很好的,但是您还没有对 self.n_cpus
worker
个进程,它们正在寻找 'STOP' 条消息,因此 reader
函数需要将 self.n_cpus
STOP
条消息放入任务队列:
from multiprocessing import Process, Queue
class Computation():
def __init__(self, K):
self.task_queue = Queue()
self.completed_queue = Queue()
self.n_cpus = K
def reader(self,):
with open(db, "r") as db:
... # Read an item
self.task_queue.put(item)
# signal to the worker processes to terminate:
for _ in range(self.n_cpus):
self.task_queue.put('STOP')
def worker(self,):
while True:
item = self.task_queue.get(True)
if item == "STOP":
break
self.process_item(item)
def writer_process(self,):
while True:
f = self.completed_queue.get(True)
if f == "DONE":
break
self.write_f(f)
def run(self):
processes = [Process(target=self.worker) for _ in range(self.n_cpus)]
for p in processes:
p.start()
writer = Process(target=self.writer_process, args=())
writer.start()
self.reader()
for p in processes:
p.join()
self.completed_queue.put("DONE")
writer.join()
就个人而言,我不会使用 'STOP' 和 'DONE' 作为 sentinel 消息,而是使用 None
,假设不是有效的实际消息。我已经测试了上面的代码,其中 reader
只是处理了列表中的字符串,而 self.process_item(item)
只是将 'done' 附加到每个字符串并将修改后的字符串放在 completed_queue
上并替换了 self.write_f
在 writer_process
中调用 print
。我没有发现代码有任何问题。
更新以使用托管队列
免责声明: 我没有使用 mpi4py 的经验,也不知道队列代理如何分布在不同的计算机上。上面的代码 可能 不够,正如以下文章 How to share mutliprocessing queue object between multiple computers 所建议的那样。 但是,该代码正在创建 Queue.Queue 的实例(该代码是 Python 2 代码)并且 不是 multiprocessing.SyncManager. 这方面的文档很差。 尝试上面的更改,看看它是否工作得更好(它会更慢)。
因为manager.Queue()
返回的代理,我不得不稍微重新安排一下代码;队列现在作为参数显式传递给流程函数:
from multiprocessing import Process, Manager
class Computation():
def __init__(self, K):
self.n_cpus = K
def reader(self, task_queue):
with open(db, "r") as db:
... # Read an item
# signal to the worker processes to terminate:
for _ in range(self.n_cpus):
task_queue.put('STOP')
def worker(self, task_queue, completed_queue):
while True:
item = task_queue.get(True)
if item == "STOP":
break
self.process_item(item)
def writer_process(self, completed_queue):
while True:
f = completed_queue.get(True)
if f == "DONE":
break
self.write_f(f)
def run(self):
with Manager() as manager:
task_queue = manager.Queue()
completed_queue = manager.Queue()
processes = [Process(target=self.worker, args=(task_queue, completed_queue)) for _ in range(self.n_cpus)]
for p in processes:
p.start()
writer = Process(target=self.writer_process, args=(completed_queue,))
writer.start()
self.reader(task_queue)
for p in processes:
p.join()
completed_queue.put("DONE")
writer.join()