为什么这个 queue.join 调用无限期阻塞?

Why is this queue.join call blocking indefinitely?

我正在研究 python3.6 中的一个个人项目,我 运行 遇到了以下导致 my_queue.join() 调用无限期阻塞的问题。请注意,这不是我的实际代码,而是演示该问题的最小示例。

import threading
import queue

def foo(stop_event, my_queue):
  while not stop_event.is_set():
    try:
      item = my_queue.get(timeout=0.1)
      print(item) #Actual logic goes here
    except queue.Empty:
      pass
  print('DONE')

stop_event = threading.Event()
my_queue = queue.Queue()
thread = threading.Thread(target=foo, args=(stop_event, my_queue))
thread.start()

my_queue.put(1)
my_queue.put(2)
my_queue.put(3)

print('ALL PUT')

my_queue.join()

print('ALL PROCESSED')

stop_event.set()

print('ALL COMPLETE')

我得到以下输出(它实际上是一致的,但我知道输出顺序可能因线程而异):

ALL PUT
1
2
3

无论我等待多久,我都看不到 ALL PROCESSED 输出到控制台,那么为什么 my_queue.join() 在所有项目都已处理后无限期阻塞?

来自docs

The count of unfinished tasks goes up whenever an item is added to the queue. The count goes down whenever a consumer thread calls task_done() to indicate that the item was retrieved and all work on it is complete. When the count of unfinished tasks drops to zero, join() unblocks.

您永远不会在 foo 函数中调用 q.task_done()foo 函数应该类似于示例:

def worker():
    while True:
        item = q.get()
        if item is None:
            break
        do_work(item)
        q.task_done()