为什么使用 ThreadingPoolExecutor 会使 "main" 线程永不恢复?
Why employing ThreadingPoolExecutor makes "main" thread to never resume?
使用 python,我试图演示 producer/consumer 多线程场景如何在消费者线程最终等待一个空队列时导致死锁,该队列将在其余部分保持为空执行的过程,直到结束,以及如何解决这个问题,避免程序饿死或突然的“脏中断”。
所以我从producer/consumer threading using a queue on this nice RealPython article中获取了代码,这里是原始代码摘录:
def consumer(queue, event):
"""Pretend we're saving a number in the database."""
while not event.is_set() or not queue.empty():
message = queue.get()
logging.info(
"Consumer storing message: %s (size=%d)", message, queue.qsize()
)
logging.info("Consumer received event. Exiting")
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
pipeline = queue.Queue(maxsize=10)
event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event)
time.sleep(0.1)
logging.info("Main: about to set event")
event.set()
我注意到,尽管不太可能发生,但代码会导致我在 'main' 线程设置 'event' 和 'producer' 案例中描述的情况在 'consumer' 仍在等待从队列中获取消息时结束。
要解决单个 'consumer' 的情况,在 'get' 指令调用之前简单检查队列是否为空就足够了(例如:if (not q.empty()): message = q.get()
)。但是,这个问题仍然存在于多消费者场景中,因为线程可以在队列空检查后立即交换,而另一个消费者(第二个消费者)可以获得消息,使队列为空,以便交换回前一个消费者(第一个)它会在空队列上调用 get 并且......就是这样。
我想寻求一种即使在假设的多消费者场景中也可能有效的解决方案。所以我以这种方式修改了 'consumer' 代码,实质上在队列获取指令上添加了超时并管理异常:
def consumer(q, event, n):
while not event.is_set() or not q.empty():
print("Consumer"+n+": Q-get")
try:
time.sleep(0.1) #(I don't really need this, I just want to force a consumer-thread swap at this precise point :=> showing that, just as expected, things will work well in a multi-consumer scenario also)
message = q.get(True,1)
except queue.Empty:
print("Consumer"+n+": looping on empty queue")
time.sleep(0.1) #(I don't really need this at all... just hoping -unfortunately without success- _main_ to swap on ThreadPoolExecutor)
continue
logging.info("Consumer%s storing message: %s (size=%d)", n,message,q.qsize())
print("Consumer"+n+": ended")
并且还修改了“主要”部分,使其将消息放入队列并生成第二个消费者而不是生产者...
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")
pipeline = queue.Queue(maxsize=10)
event = threading.Event()
pipeline.put("XxXxX")
print("Let's start (ThreadPoolExecutor)")
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
#executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event, '1')
executor.submit(consumer, pipeline, event, '2')
print("(_main_ won't even get this far... Why?)") #!#
time.sleep(2)
logging.info("Main: about to set event")
event.set()
(请注意我在这里的目的是阻止消费者死锁风险并表明它实际上是无效的,在这个阶段我真的不需要生产者,这就是为什么我让代码不产生它)
现在,问题是,我不明白为什么,如果线程是用
threading.Thread(...).start()
,例如:
print("Let's start (simple Thread)")
for i in range(1,3): threading.Thread(target=consumer, args=(pipeline, event, str(i))).start()
而 使用 concurrent.futures.ThreadPoolExecutor
因为它似乎使 'main' 线程永远不会恢复 (似乎它甚至没有进入睡眠调用),所以执行永远不会结束导致无限的消费者循环......
你能帮我理解 为什么会有这种“差异”吗? 知道这对我很重要,我想几乎肯定会帮助我理解它是否可以以某种方式解决,或者我是否必须被迫不使用 ThreadPoolExecutor,所以...预先感谢您在此方面的宝贵帮助!
问题是您将 event.set()
放在管理 ThreadPoolExecutor
的 with
块之外。当与 with
一起使用时,在退出 the with
, ThreadPoolExecutor
performs the equivalent of .shutdown(wait=True)
时。所以你在等待工人完成,他们不会,因为你还没有设置 event
.
如果您希望能够告诉它在可以关闭时关闭,而不是立即等待,您可以这样做:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
try:
#executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event, '1')
executor.submit(consumer, pipeline, event, '2')
executor.shutdown(wait=False) # No new work can be submitted after this point, and
# workers will opportunistically exit if no work left
time.sleep(2)
logging.info("Main: about to set event")
finally:
event.set() # Set the event *before* we block awaiting shutdown
# Done in a finally block to ensure its set even on exception
# so the with doesn't try to block in a case where
# it will never finish
# Blocks for full shutdown here; after this, the pool is definitely finished and cleaned up
使用 python,我试图演示 producer/consumer 多线程场景如何在消费者线程最终等待一个空队列时导致死锁,该队列将在其余部分保持为空执行的过程,直到结束,以及如何解决这个问题,避免程序饿死或突然的“脏中断”。
所以我从producer/consumer threading using a queue on this nice RealPython article中获取了代码,这里是原始代码摘录:
def consumer(queue, event):
"""Pretend we're saving a number in the database."""
while not event.is_set() or not queue.empty():
message = queue.get()
logging.info(
"Consumer storing message: %s (size=%d)", message, queue.qsize()
)
logging.info("Consumer received event. Exiting")
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,
datefmt="%H:%M:%S")
pipeline = queue.Queue(maxsize=10)
event = threading.Event()
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event)
time.sleep(0.1)
logging.info("Main: about to set event")
event.set()
我注意到,尽管不太可能发生,但代码会导致我在 'main' 线程设置 'event' 和 'producer' 案例中描述的情况在 'consumer' 仍在等待从队列中获取消息时结束。
要解决单个 'consumer' 的情况,在 'get' 指令调用之前简单检查队列是否为空就足够了(例如:
if (not q.empty()): message = q.get()
)。但是,这个问题仍然存在于多消费者场景中,因为线程可以在队列空检查后立即交换,而另一个消费者(第二个消费者)可以获得消息,使队列为空,以便交换回前一个消费者(第一个)它会在空队列上调用 get 并且......就是这样。
我想寻求一种即使在假设的多消费者场景中也可能有效的解决方案。所以我以这种方式修改了 'consumer' 代码,实质上在队列获取指令上添加了超时并管理异常:
def consumer(q, event, n):
while not event.is_set() or not q.empty():
print("Consumer"+n+": Q-get")
try:
time.sleep(0.1) #(I don't really need this, I just want to force a consumer-thread swap at this precise point :=> showing that, just as expected, things will work well in a multi-consumer scenario also)
message = q.get(True,1)
except queue.Empty:
print("Consumer"+n+": looping on empty queue")
time.sleep(0.1) #(I don't really need this at all... just hoping -unfortunately without success- _main_ to swap on ThreadPoolExecutor)
continue
logging.info("Consumer%s storing message: %s (size=%d)", n,message,q.qsize())
print("Consumer"+n+": ended")
并且还修改了“主要”部分,使其将消息放入队列并生成第二个消费者而不是生产者...
if __name__ == "__main__":
format = "%(asctime)s: %(message)s"
logging.basicConfig(format=format, level=logging.INFO,datefmt="%H:%M:%S")
pipeline = queue.Queue(maxsize=10)
event = threading.Event()
pipeline.put("XxXxX")
print("Let's start (ThreadPoolExecutor)")
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
#executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event, '1')
executor.submit(consumer, pipeline, event, '2')
print("(_main_ won't even get this far... Why?)") #!#
time.sleep(2)
logging.info("Main: about to set event")
event.set()
(请注意我在这里的目的是阻止消费者死锁风险并表明它实际上是无效的,在这个阶段我真的不需要生产者,这就是为什么我让代码不产生它)
现在,问题是,我不明白为什么,如果线程是用
threading.Thread(...).start()
,例如:
print("Let's start (simple Thread)")
for i in range(1,3): threading.Thread(target=consumer, args=(pipeline, event, str(i))).start()
而 使用 concurrent.futures.ThreadPoolExecutor
因为它似乎使 'main' 线程永远不会恢复 (似乎它甚至没有进入睡眠调用),所以执行永远不会结束导致无限的消费者循环......
你能帮我理解 为什么会有这种“差异”吗? 知道这对我很重要,我想几乎肯定会帮助我理解它是否可以以某种方式解决,或者我是否必须被迫不使用 ThreadPoolExecutor,所以...预先感谢您在此方面的宝贵帮助!
问题是您将 event.set()
放在管理 ThreadPoolExecutor
的 with
块之外。当与 with
一起使用时,在退出 the with
, ThreadPoolExecutor
performs the equivalent of .shutdown(wait=True)
时。所以你在等待工人完成,他们不会,因为你还没有设置 event
.
如果您希望能够告诉它在可以关闭时关闭,而不是立即等待,您可以这样做:
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
try:
#executor.submit(producer, pipeline, event)
executor.submit(consumer, pipeline, event, '1')
executor.submit(consumer, pipeline, event, '2')
executor.shutdown(wait=False) # No new work can be submitted after this point, and
# workers will opportunistically exit if no work left
time.sleep(2)
logging.info("Main: about to set event")
finally:
event.set() # Set the event *before* we block awaiting shutdown
# Done in a finally block to ensure its set even on exception
# so the with doesn't try to block in a case where
# it will never finish
# Blocks for full shutdown here; after this, the pool is definitely finished and cleaned up