为什么这个 queue.join 调用无限期阻塞?
Why is this queue.join call blocking indefinitely?
我正在研究 python3.6 中的一个个人项目,我 运行 遇到了以下导致 my_queue.join()
调用无限期阻塞的问题。请注意,这不是我的实际代码,而是演示该问题的最小示例。
import threading
import queue
def foo(stop_event, my_queue):
while not stop_event.is_set():
try:
item = my_queue.get(timeout=0.1)
print(item) #Actual logic goes here
except queue.Empty:
pass
print('DONE')
stop_event = threading.Event()
my_queue = queue.Queue()
thread = threading.Thread(target=foo, args=(stop_event, my_queue))
thread.start()
my_queue.put(1)
my_queue.put(2)
my_queue.put(3)
print('ALL PUT')
my_queue.join()
print('ALL PROCESSED')
stop_event.set()
print('ALL COMPLETE')
我得到以下输出(它实际上是一致的,但我知道输出顺序可能因线程而异):
ALL PUT
1
2
3
无论我等待多久,我都看不到 ALL PROCESSED
输出到控制台,那么为什么 my_queue.join()
在所有项目都已处理后无限期阻塞?
来自docs:
The count of unfinished tasks goes up whenever an item is added to the
queue. The count goes down whenever a consumer thread calls
task_done() to indicate that the item was retrieved and all work on it
is complete. When the count of unfinished tasks drops to zero, join()
unblocks.
您永远不会在 foo
函数中调用 q.task_done()
。 foo
函数应该类似于示例:
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()
我正在研究 python3.6 中的一个个人项目,我 运行 遇到了以下导致 my_queue.join()
调用无限期阻塞的问题。请注意,这不是我的实际代码,而是演示该问题的最小示例。
import threading
import queue
def foo(stop_event, my_queue):
while not stop_event.is_set():
try:
item = my_queue.get(timeout=0.1)
print(item) #Actual logic goes here
except queue.Empty:
pass
print('DONE')
stop_event = threading.Event()
my_queue = queue.Queue()
thread = threading.Thread(target=foo, args=(stop_event, my_queue))
thread.start()
my_queue.put(1)
my_queue.put(2)
my_queue.put(3)
print('ALL PUT')
my_queue.join()
print('ALL PROCESSED')
stop_event.set()
print('ALL COMPLETE')
我得到以下输出(它实际上是一致的,但我知道输出顺序可能因线程而异):
ALL PUT
1
2
3
无论我等待多久,我都看不到 ALL PROCESSED
输出到控制台,那么为什么 my_queue.join()
在所有项目都已处理后无限期阻塞?
来自docs:
The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.
您永远不会在 foo
函数中调用 q.task_done()
。 foo
函数应该类似于示例:
def worker():
while True:
item = q.get()
if item is None:
break
do_work(item)
q.task_done()