实施多个生产者和多个工人会导致死锁
Implementing multiple producer and multiple workers results in deadlock
我一直在尝试在 python 中使用多处理实现多生产者和多消费者模型。
生产者从网络上抓取数据,消费者处理数据。
起初我只是实现了两个具有特定功能的函数生产者和消费者,并使用队列在它们之间进行通信,但无法弄清楚如何处理完成事件。
然后我使用信号量 -
实现了模型
def producer(RESP_q, URL_q, SEM):
with SEM:
while True:
url = URL_q.get()
if url == "END":
break
RESP = produce_txns(url)
RESP_q.put(RESP)
def consumer(RESP_q, SEM, NP):
while SEM.get_value() < NP or not RESP_q.empty():
resp = RESP_q.get()
for txn in resp:
_txn = E_Transaction(txn)
print(_txn)
RESP_q.task_done()
class Manager:
def __init__(self):
self.URL_q = Queue()
self.RESP_q = JoinableQueue()
self.max_processes = cpu_count()
self.SEM = Semaphore(self.max_processes // 2)
def start(self):
self.worker = []
for i in range(0, self.max_processes, 2):
self.worker.append(Process(target=producer, args=(self.RESP_q, self.URL_q, self.SEM)))
self.worker.append(Process(target=consumer, args=(self.RESP_q, self.SEM, self.max_processes // 2)))
url_server(self.URL_q, self.max_processes // 2)
#Consider URL_q holds -> [*data, *["END"]*(self.max_processes // 2)]
for worker in self.worker:
worker.start()
self.stop()
def stop(self):
for worker in self.worker:
worker.join()
self.RESP_q.join()
self.RESP_q.close()
self.URL_q.close()
Manager().start()
当(In Consumer)RESP_q 为空且 SEM 接近 max_process 并且当解释器满足 while 条件时,SEM 将具有与 [=21= 相同的值] 并且不会留下任何生产者并且程序在 get 方法处被阻塞。
我无法解决这个问题。
编辑 1.
@Louis Lac 的实现也是正确的。我更正了我的代码以使用 try-except 块消除死锁。
def consumer(RESP_q, SEM, NP):
while SEM.get_value() < NP or not RESP_q.empty():
try:
resp = RESP_q.get(timeout=0.5)
except Exception:
continue
这是一个多消费者多生产者实现的例子。您可以在实例化进程时使用 daemon
标志,以便在程序退出时自动关闭它们。然后,您可以使用 JoinableQueue
并加入它们(而不是加入进程),以便程序在没有要处理的项目时退出。
您应该使用 if __main__ == "__main__
来启动该程序,而不会导致该程序的递归执行。
from multiprocessing import Process, JoinableQueue
from time import sleep
def consumer(in_queue: JoinableQueue, out_queue: JoinableQueue):
while True:
item = in_queue.get()
sleep(0.5)
s = str(item)
out_queue.put(s)
in_queue.task_done()
def producer(in_queue: JoinableQueue):
while True:
item = in_queue.get()
sleep(0.5)
n = int(item)
print(n)
in_queue.task_done()
if __name__ == "__main__":
number_queue = JoinableQueue()
str_queue = JoinableQueue()
for _ in range(4):
Process(target=consumer, args=(number_queue, str_queue), daemon=True).start()
Process(target=producer, args=(str_queue,), daemon=True).start()
for i in range(100):
number_queue.put(i)
number_queue.join()
str_queue.join()
我一直在尝试在 python 中使用多处理实现多生产者和多消费者模型。 生产者从网络上抓取数据,消费者处理数据。 起初我只是实现了两个具有特定功能的函数生产者和消费者,并使用队列在它们之间进行通信,但无法弄清楚如何处理完成事件。 然后我使用信号量 -
实现了模型def producer(RESP_q, URL_q, SEM):
with SEM:
while True:
url = URL_q.get()
if url == "END":
break
RESP = produce_txns(url)
RESP_q.put(RESP)
def consumer(RESP_q, SEM, NP):
while SEM.get_value() < NP or not RESP_q.empty():
resp = RESP_q.get()
for txn in resp:
_txn = E_Transaction(txn)
print(_txn)
RESP_q.task_done()
class Manager:
def __init__(self):
self.URL_q = Queue()
self.RESP_q = JoinableQueue()
self.max_processes = cpu_count()
self.SEM = Semaphore(self.max_processes // 2)
def start(self):
self.worker = []
for i in range(0, self.max_processes, 2):
self.worker.append(Process(target=producer, args=(self.RESP_q, self.URL_q, self.SEM)))
self.worker.append(Process(target=consumer, args=(self.RESP_q, self.SEM, self.max_processes // 2)))
url_server(self.URL_q, self.max_processes // 2)
#Consider URL_q holds -> [*data, *["END"]*(self.max_processes // 2)]
for worker in self.worker:
worker.start()
self.stop()
def stop(self):
for worker in self.worker:
worker.join()
self.RESP_q.join()
self.RESP_q.close()
self.URL_q.close()
Manager().start()
当(In Consumer)RESP_q 为空且 SEM 接近 max_process 并且当解释器满足 while 条件时,SEM 将具有与 [=21= 相同的值] 并且不会留下任何生产者并且程序在 get 方法处被阻塞。 我无法解决这个问题。
编辑 1.
@Louis Lac 的实现也是正确的。我更正了我的代码以使用 try-except 块消除死锁。
def consumer(RESP_q, SEM, NP):
while SEM.get_value() < NP or not RESP_q.empty():
try:
resp = RESP_q.get(timeout=0.5)
except Exception:
continue
这是一个多消费者多生产者实现的例子。您可以在实例化进程时使用 daemon
标志,以便在程序退出时自动关闭它们。然后,您可以使用 JoinableQueue
并加入它们(而不是加入进程),以便程序在没有要处理的项目时退出。
您应该使用 if __main__ == "__main__
来启动该程序,而不会导致该程序的递归执行。
from multiprocessing import Process, JoinableQueue
from time import sleep
def consumer(in_queue: JoinableQueue, out_queue: JoinableQueue):
while True:
item = in_queue.get()
sleep(0.5)
s = str(item)
out_queue.put(s)
in_queue.task_done()
def producer(in_queue: JoinableQueue):
while True:
item = in_queue.get()
sleep(0.5)
n = int(item)
print(n)
in_queue.task_done()
if __name__ == "__main__":
number_queue = JoinableQueue()
str_queue = JoinableQueue()
for _ in range(4):
Process(target=consumer, args=(number_queue, str_queue), daemon=True).start()
Process(target=producer, args=(str_queue,), daemon=True).start()
for i in range(100):
number_queue.put(i)
number_queue.join()
str_queue.join()