多处理的 queue.get() return 什么时候完成?
When does multiprocessing's queue.get() return DONE?
我正在学习 Python 多处理模块,我找到了 this 示例:
from multiprocessing import Process, Queue
import time
def reader(queue):
## Read from the queue
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
queue = Queue() # reader() reads from queue
# writer() writes to queue
reader_p = Process(target=reader, args=((queue),))
reader_p.daemon = True
reader_p.start() # Launch reader() as a separate python process
_start = time.time()
writer(count, queue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print "Sending %s numbers to Queue() took %s seconds" % (count,
(time.time() - _start))
我想知道 queue.get()
什么时候会 return DONE
所以我尝试了以下示例:
#!/bin/env python
from multiprocessing import Process, Queue
import time
if __name__=='__main__':
queue = Queue()
print "Before 2x put"
queue.put(10)
queue.put(20)
print "After 2x put"
print "Before 1s get"
print queue.get()
print "After 1st get"
print "Before 2nd get"
print queue.get()
print "After 2nd get"
print "Before 3rd get"
print queue.get()
print "After 3rd get"
此脚本的最后一条消息是 Before 3rd get
,此后脚本卡住了,结束它的唯一方法是终止它。从此示例中,您可以看到 queue.get()
正在阻塞(代码不会继续,直到它结束)。怎么可能在原代码queue.get()
returnsDONE
的时候出现这种情况呢?
编辑
回复@KemyLand,它很好地解释了这里发生的事情,这是没有卡住的版本:
#!/bin/env python
from multiprocessing import Process, Queue
import time
if __name__=='__main__':
queue = Queue()
print "Before 2x put"
queue.put(10)
queue.put(20)
print "After 2x put"
print "Before 1s get"
print queue.get()
print "After 1st get"
print "Before 2nd get"
print queue.get()
print "After 2nd get"
print "Before 3rd get"
try:
print queue.get_nowait()
print "After 3rd get"
except:
pass
这个很简单
在您的第一个代码中,reader
和 writer
之间达成一致的 "protocol" 是 writer
将任意数量的数据发送到 reader
,然后writer
发送'DONE',reader
收到,明白数据传输完成
在您的第二个代码中,reader
和 writer
之间没有达成协议,因为作者的观点是“我发送两个对象,我就完成了!”,而reader的观点是“*我收到三个 个对象,我就完成了!”。
因为无论何时发生协议错误,运行时环境的任何部分都无法检测到,应用程序只是阻塞,等待永远不会到来的数据。唯一可以检测到这种情况的是应用程序本身,因为它是唯一知道它遵守的协议的应用程序。为此,你可以使用Queue.Queue.get_nowait()
(你必须 import Queue
(大写Q),因为multiprocessing.Queue
只是 Queue.Queue
的别名)。如果此类函数无法从 Queue
中直接 提取对象 ,它将抛出 Queue.Empty
异常。 (注意:模块名称的这种混乱已在 Python 3 中修复)。
我正在学习 Python 多处理模块,我找到了 this 示例:
from multiprocessing import Process, Queue
import time
def reader(queue):
## Read from the queue
while True:
msg = queue.get() # Read from the queue and do nothing
if (msg == 'DONE'):
break
def writer(count, queue):
## Write to the queue
for ii in xrange(0, count):
queue.put(ii) # Write 'count' numbers into the queue
queue.put('DONE')
if __name__=='__main__':
for count in [10**4, 10**5, 10**6]:
queue = Queue() # reader() reads from queue
# writer() writes to queue
reader_p = Process(target=reader, args=((queue),))
reader_p.daemon = True
reader_p.start() # Launch reader() as a separate python process
_start = time.time()
writer(count, queue) # Send a lot of stuff to reader()
reader_p.join() # Wait for the reader to finish
print "Sending %s numbers to Queue() took %s seconds" % (count,
(time.time() - _start))
我想知道 queue.get()
什么时候会 return DONE
所以我尝试了以下示例:
#!/bin/env python
from multiprocessing import Process, Queue
import time
if __name__=='__main__':
queue = Queue()
print "Before 2x put"
queue.put(10)
queue.put(20)
print "After 2x put"
print "Before 1s get"
print queue.get()
print "After 1st get"
print "Before 2nd get"
print queue.get()
print "After 2nd get"
print "Before 3rd get"
print queue.get()
print "After 3rd get"
此脚本的最后一条消息是 Before 3rd get
,此后脚本卡住了,结束它的唯一方法是终止它。从此示例中,您可以看到 queue.get()
正在阻塞(代码不会继续,直到它结束)。怎么可能在原代码queue.get()
returnsDONE
的时候出现这种情况呢?
编辑
回复@KemyLand,它很好地解释了这里发生的事情,这是没有卡住的版本:
#!/bin/env python
from multiprocessing import Process, Queue
import time
if __name__=='__main__':
queue = Queue()
print "Before 2x put"
queue.put(10)
queue.put(20)
print "After 2x put"
print "Before 1s get"
print queue.get()
print "After 1st get"
print "Before 2nd get"
print queue.get()
print "After 2nd get"
print "Before 3rd get"
try:
print queue.get_nowait()
print "After 3rd get"
except:
pass
这个很简单
在您的第一个代码中,reader
和 writer
之间达成一致的 "protocol" 是 writer
将任意数量的数据发送到 reader
,然后writer
发送'DONE',reader
收到,明白数据传输完成
在您的第二个代码中,reader
和 writer
之间没有达成协议,因为作者的观点是“我发送两个对象,我就完成了!”,而reader的观点是“*我收到三个 个对象,我就完成了!”。
因为无论何时发生协议错误,运行时环境的任何部分都无法检测到,应用程序只是阻塞,等待永远不会到来的数据。唯一可以检测到这种情况的是应用程序本身,因为它是唯一知道它遵守的协议的应用程序。为此,你可以使用Queue.Queue.get_nowait()
(你必须 import Queue
(大写Q),因为multiprocessing.Queue
只是 Queue.Queue
的别名)。如果此类函数无法从 Queue
中直接 提取对象 ,它将抛出 Queue.Empty
异常。 (注意:模块名称的这种混乱已在 Python 3 中修复)。