ThreadPoolExecutor 超时后如何退出脚本
How to exit a script after ThreadPoolExecutor has timed out
我试图在一些工作超时后完全退出,因为某个线程被某些东西阻塞了。我正在使用这样的 ThreadPoolExecutor
:
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
# submit some work
workers = [executor.submit(...) for x in work]
# wait for completion
try:
for f in concurrent.futures.as_completed(workers, timeout=60):
f.result()
except concurrent.futures.TimeoutError:
raise TimeoutError()
except TimeoutError:
# cleanup
此代码到达 # cleanup
没问题,但脚本永远不会退出,因为它正在等待阻塞的线程最终完成。我还不知道是什么导致工作人员永远阻塞,这是另一个需要解决的问题,但我需要有一种方法至少在我们 运行 进入这种情况时退出。
我查看了 ThreadPoolExecutor 中的线程是如何创建的,它们被设置为 daemon = True
所以我很困惑为什么这些线程会阻止应用程序退出。
奇怪的是,这是有意为之的行为。来自 concurrent/futures/thread.py
(版本 3.6.3):
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.
"This problem" 正是您想要的行为 - 在工作线程仍在 运行 时退出。退出处理程序提到在所有工作线程上调用 join()
,如果它们被卡住,它将永远阻塞:
def _python_exit():
global _shutdown
_shutdown = True
items = list(_threads_queues.items())
for t, q in items:
q.put(None)
for t, q in items:
t.join()
atexit.register(_python_exit)
还有 TaskThreadExecutor
本身的 __exit__
方法:
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False
self.shutdown
,与 wait=True
,也加入了所有工作线程。
要强制退出,我们需要覆盖这两个。如果你修改你的代码如下:
except concurrent.futures.TimeoutError:
import atexit
atexit.unregister(concurrent.futures.thread._python_exit))
executor.shutdown = lambda wait:None
raise TimeoutError()
然后您的脚本将根据需要退出。
我试图在一些工作超时后完全退出,因为某个线程被某些东西阻塞了。我正在使用这样的 ThreadPoolExecutor
:
try:
with concurrent.futures.ThreadPoolExecutor(max_workers=32) as executor:
# submit some work
workers = [executor.submit(...) for x in work]
# wait for completion
try:
for f in concurrent.futures.as_completed(workers, timeout=60):
f.result()
except concurrent.futures.TimeoutError:
raise TimeoutError()
except TimeoutError:
# cleanup
此代码到达 # cleanup
没问题,但脚本永远不会退出,因为它正在等待阻塞的线程最终完成。我还不知道是什么导致工作人员永远阻塞,这是另一个需要解决的问题,但我需要有一种方法至少在我们 运行 进入这种情况时退出。
我查看了 ThreadPoolExecutor 中的线程是如何创建的,它们被设置为 daemon = True
所以我很困惑为什么这些线程会阻止应用程序退出。
奇怪的是,这是有意为之的行为。来自 concurrent/futures/thread.py
(版本 3.6.3):
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.
"This problem" 正是您想要的行为 - 在工作线程仍在 运行 时退出。退出处理程序提到在所有工作线程上调用 join()
,如果它们被卡住,它将永远阻塞:
def _python_exit():
global _shutdown
_shutdown = True
items = list(_threads_queues.items())
for t, q in items:
q.put(None)
for t, q in items:
t.join()
atexit.register(_python_exit)
还有 TaskThreadExecutor
本身的 __exit__
方法:
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False
self.shutdown
,与 wait=True
,也加入了所有工作线程。
要强制退出,我们需要覆盖这两个。如果你修改你的代码如下:
except concurrent.futures.TimeoutError:
import atexit
atexit.unregister(concurrent.futures.thread._python_exit))
executor.shutdown = lambda wait:None
raise TimeoutError()
然后您的脚本将根据需要退出。