如何立即重新引发任何工作线程中抛出的异常?
How to immediately re-raise an exception thrown in any of the worker threads?
我正在使用 ThreadPoolExecutor
并且需要中止整个计算以防任何工作线程失败。
示例 1. 这将打印 Success 而不管错误,因为 ThreadPoolExecutor
不会自动重新引发异常。
from concurrent.futures import ThreadPoolExecutor
def task():
raise ValueError
with ThreadPoolExecutor() as executor:
executor.submit(task)
print('Success')
示例 2. 这正确地使主线程崩溃,因为 .result()
重新引发了异常。但是它等待第一个任务完成,所以主线程经历了延迟的异常。
import time
from concurrent.futures import ThreadPoolExecutor
def task(should_raise):
time.sleep(1)
if should_raise:
raise ValueError
with ThreadPoolExecutor() as executor:
executor.submit(task, False).result()
executor.submit(task, True).result()
print('Success')
如何在主线程(或多或少)发生后立即注意到主线程中的 worker 异常,以处理故障并中止剩余的 worker?
我想,我会这样实现的:
我是主进程,我创建了 2 个队列:
- 一个来报告异常,
- 一个通知取消。
::
import multiprocessing as mp
error_queue = mp.Queue()
cancel_queue = mp.Queue()
我创建每个 ThreadPoolExecutor
,并将这些队列作为参数传递。
class MyExecutor(concurrent.futures.ThreadPoolExecutor):
def __init__(self, error_queue, cancel_queue):
self.error_queue : error_queue
self.cancel_queue = cancel_queue
每个ThreadPoolExecutor
都有一个主循环。在此循环中,我首先扫描 cancel_queue
以查看 "cancel" 消息是否可用。
在主循环中,我还实现了一个异常管理器。如果发生错误,我会抛出异常:
self.status = "running"
with True: # <- or something else
if not self.cancel_queue.empty():
self.status = "cancelled"
break
try:
# normal processing
...
except Exception as exc:
# you can log the exception here for debug
self.error_queue.put(exc)
self.status = "error"
break
time.sleep(.1)
在主进程中:
运行 所有 MyExecutor
个实例。
扫描error_queue:
while True:
if not error_queue.empty():
cancel_queue.put("cancel")
time.sleep(.1)
首先,我们必须在请求结果之前提交任务。否则,线程甚至运行并行:
futures = []
with ThreadPoolExecutor() as executor:
futures.append(executor.submit(good_task))
futures.append(executor.submit(bad_task))
for future in futures:
future.result()
现在我们可以将异常信息存储在一个变量中,该变量对主线程和工作线程都可用:
exc_info = None
主线程无法真正杀死它的子进程,所以我们让worker检查异常信息设置并停止:
def good_task():
global exc_info
while not exc_info:
time.sleep(0.1)
def bad_task():
global exc_info
time.sleep(0.2)
try:
raise ValueError()
except Exception:
exc_info = sys.exc_info()
所有线程终止后,主线程可以查看保存异常信息的变量。如果已填充,我们将重新引发异常:
if exc_info:
raise exc_info[0].with_traceback(exc_info[1], exc_info[2])
print('Success')
我正在使用 ThreadPoolExecutor
并且需要中止整个计算以防任何工作线程失败。
示例 1. 这将打印 Success 而不管错误,因为 ThreadPoolExecutor
不会自动重新引发异常。
from concurrent.futures import ThreadPoolExecutor
def task():
raise ValueError
with ThreadPoolExecutor() as executor:
executor.submit(task)
print('Success')
示例 2. 这正确地使主线程崩溃,因为 .result()
重新引发了异常。但是它等待第一个任务完成,所以主线程经历了延迟的异常。
import time
from concurrent.futures import ThreadPoolExecutor
def task(should_raise):
time.sleep(1)
if should_raise:
raise ValueError
with ThreadPoolExecutor() as executor:
executor.submit(task, False).result()
executor.submit(task, True).result()
print('Success')
如何在主线程(或多或少)发生后立即注意到主线程中的 worker 异常,以处理故障并中止剩余的 worker?
我想,我会这样实现的:
我是主进程,我创建了 2 个队列:
- 一个来报告异常,
- 一个通知取消。
::
import multiprocessing as mp
error_queue = mp.Queue()
cancel_queue = mp.Queue()
我创建每个 ThreadPoolExecutor
,并将这些队列作为参数传递。
class MyExecutor(concurrent.futures.ThreadPoolExecutor):
def __init__(self, error_queue, cancel_queue):
self.error_queue : error_queue
self.cancel_queue = cancel_queue
每个ThreadPoolExecutor
都有一个主循环。在此循环中,我首先扫描 cancel_queue
以查看 "cancel" 消息是否可用。
在主循环中,我还实现了一个异常管理器。如果发生错误,我会抛出异常:
self.status = "running"
with True: # <- or something else
if not self.cancel_queue.empty():
self.status = "cancelled"
break
try:
# normal processing
...
except Exception as exc:
# you can log the exception here for debug
self.error_queue.put(exc)
self.status = "error"
break
time.sleep(.1)
在主进程中:
运行 所有 MyExecutor
个实例。
扫描error_queue:
while True:
if not error_queue.empty():
cancel_queue.put("cancel")
time.sleep(.1)
首先,我们必须在请求结果之前提交任务。否则,线程甚至运行并行:
futures = []
with ThreadPoolExecutor() as executor:
futures.append(executor.submit(good_task))
futures.append(executor.submit(bad_task))
for future in futures:
future.result()
现在我们可以将异常信息存储在一个变量中,该变量对主线程和工作线程都可用:
exc_info = None
主线程无法真正杀死它的子进程,所以我们让worker检查异常信息设置并停止:
def good_task():
global exc_info
while not exc_info:
time.sleep(0.1)
def bad_task():
global exc_info
time.sleep(0.2)
try:
raise ValueError()
except Exception:
exc_info = sys.exc_info()
所有线程终止后,主线程可以查看保存异常信息的变量。如果已填充,我们将重新引发异常:
if exc_info:
raise exc_info[0].with_traceback(exc_info[1], exc_info[2])
print('Success')