在一个进程中生成多线程时多处理队列死锁
multiprocessing Queue deadlock when spawn multi threads in one process
我创建了两个进程,一个生成多线程的进程是响应将数据写入队列,另一个是从队列读取数据。它总是死锁在高频率,更少不是。特别是当你在 write 模块的 运行 方法中添加 sleep 时(在代码中注释)。让我把我的代码放在下面:
环境: python2.7
main.py
from multiprocessing import Process,Queue
from write import write
from read import read
if __name__ == "__main__":
record_queue = Queue()
table_queue = Queue()
pw = Process(target=write,args=[record_queue, table_queue])
pr = Process(target=read,args=[record_queue, table_queue])
pw.start()
pr.start()
pw.join()
pr.join()
write.py
from concurrent.futures import ThreadPoolExecutor, as_completed
def write(record_queue, table_queue):
thread_num = 3
pool = ThreadPoolExecutor(thread_num)
futures = [pool.submit(run, record_queue, table_queue) for _ in range (thread_num)]
results = [r.result() for r in as_completed(futures)]
def run(record_queue, table_queue):
while True:
if table_queue.empty():
break
table = table_queue.get()
# adding this code below reduce deadlock opportunity.
#import time
#import random
#time.sleep(random.randint(1, 3))
process_with_table(record_queue, table_queue, table)
def process_with_table(record_queue, table_queue, table):
#for short
for item in [x for x in range(1000)]:
record_queue.put(item)
read.py
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import Queue
def read(record_queue, table_queue):
count = 0
while True:
item = record_queue.get()
count += 1
print ("item: ", item)
if count == 4:
break
我用谷歌搜索了一下,SO 上也有同样的问题,但我看不出与我的代码有何相似之处,所以谁能帮助我的代码,谢谢...
我似乎找到了解决方案,将写入模块中的 运行 方法更改为:
def run(record_queue, table_queue):
while True:
try:
if table_queue.empty():
break
table = table_queue.get(timeout=3)
process_with_table(record_queue, table_queue, table)
except multiprocessing.queues.Empty:
import time
time.sleep(0.1)
并且永远不会在 get 方法上出现死锁或阻塞。
我创建了两个进程,一个生成多线程的进程是响应将数据写入队列,另一个是从队列读取数据。它总是死锁在高频率,更少不是。特别是当你在 write 模块的 运行 方法中添加 sleep 时(在代码中注释)。让我把我的代码放在下面:
环境: python2.7
main.py
from multiprocessing import Process,Queue
from write import write
from read import read
if __name__ == "__main__":
record_queue = Queue()
table_queue = Queue()
pw = Process(target=write,args=[record_queue, table_queue])
pr = Process(target=read,args=[record_queue, table_queue])
pw.start()
pr.start()
pw.join()
pr.join()
write.py
from concurrent.futures import ThreadPoolExecutor, as_completed
def write(record_queue, table_queue):
thread_num = 3
pool = ThreadPoolExecutor(thread_num)
futures = [pool.submit(run, record_queue, table_queue) for _ in range (thread_num)]
results = [r.result() for r in as_completed(futures)]
def run(record_queue, table_queue):
while True:
if table_queue.empty():
break
table = table_queue.get()
# adding this code below reduce deadlock opportunity.
#import time
#import random
#time.sleep(random.randint(1, 3))
process_with_table(record_queue, table_queue, table)
def process_with_table(record_queue, table_queue, table):
#for short
for item in [x for x in range(1000)]:
record_queue.put(item)
read.py
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
import Queue
def read(record_queue, table_queue):
count = 0
while True:
item = record_queue.get()
count += 1
print ("item: ", item)
if count == 4:
break
我用谷歌搜索了一下,SO 上也有同样的问题,但我看不出与我的代码有何相似之处,所以谁能帮助我的代码,谢谢...
我似乎找到了解决方案,将写入模块中的 运行 方法更改为:
def run(record_queue, table_queue):
while True:
try:
if table_queue.empty():
break
table = table_queue.get(timeout=3)
process_with_table(record_queue, table_queue, table)
except multiprocessing.queues.Empty:
import time
time.sleep(0.1)
并且永远不会在 get 方法上出现死锁或阻塞。