Python ProcessPoolExecutor 无法处理异常
Python ProcessPoolExecutor cannot handle exception
我正在使用 ProcessPoolExecutor
执行 Tornado 服务器以并行处理多个请求。
问题是,在一种特定情况下,当其中一个进程引发异常时,它不会传播,而是进程崩溃并出现此错误:
concurrent.futures.process._RemoteTraceback:
\n'''\nTraceback (most recent call last):
\n File \"C:\Users\ActionICT\anaconda3\lib\concurrent\futures\process.py\", line 367, in _queue_management_worker\n result_item = result_reader.recv()
\n File \"C:\Users\ActionICT\anaconda3\lib\multiprocessing\connection.py\", line 251, in recv
\n return _ForkingPickler.loads(buf.getbuffer())\nTypeError: __init__() missing 1 required positional argument: 'is_local'\n'''\n\nThe above exception was the direct cause of the following exception:
\n
\nTraceback (most recent call last):\n File \"C:\S1\Product\Baseline\PYTHON\lab\controller.py\", line 558, in get\n output = exec_future.result()
\n File \"C:\Users\ActionICT\anaconda3\lib\concurrent\futures\_base.py\", line 428, in result\n return self.__get_result()\n File \"C:\Users\ActionICT\anaconda3\lib\concurrent\futures\_base.py\", line 384, in __get_result
\n raise self._exception\nconcurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.\n
我已经在调试器中试过了,发现问题出在执行这个
def _send_bytes(self, buf):
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
try:
if err == _winapi.ERROR_IO_PENDING:
waitres = _winapi.WaitForMultipleObjects(
[ov.event], False, INFINITE)
assert waitres == WAIT_OBJECT_0
except:
ov.cancel()
raise
finally:
nwritten, err = ov.GetOverlappedResult(True)
assert err == 0
assert nwritten == len(buf)
当进程试图将异常传播到相应的 Future
对象时调用。
在第一行中,当调用 _winapi.WriteFile
时,调试器中的一切都崩溃了,我不明白为什么。有什么想法吗?
我已经解决了一个解决方法:我在 try except 中将函数内部包装在单独的进程中,然后将旧异常复制到新异常中并引发它。我不知道为什么...但它有效。
def _execute_tuning(tune_parameters: TuneParameters):
# function to parallelize todo to be refactored
# execute scenario, then write result or error in output
try:
config.generate_project_config(
project_name=tune_parameters.project_name,
scenario_name=tune_parameters.scenario_name
)
config.generate_session_log_config(project_name=tune_parameters.project_name,
scenario_name=tune_parameters.scenario_name)
tree = DecisionTreeGenerator(tune_parameters.project_name, tune_parameters.scenario_name)
tree.fit(
# todo refactor
auto_tune=True if tune_parameters == 'true' else False,
max_depth=tune_parameters.max_depth,
columns=tune_parameters.columns,
min_samples_leaf=tune_parameters.min_samples_per_leaf,
max_leaf_nodes=tune_parameters.max_leaf_nodes
)
kpi = KPICalculator(tune_parameters.project_name, tune_parameters.scenario_name)
kpi.run(do_optimization_kpi=False)
except Exception as exc:
Loggers.application.exception(exc)
exc_final = Exception(str(exc))
exc_final.__traceback__ = exc.__traceback__
raise exc_final
我正在使用 ProcessPoolExecutor
执行 Tornado 服务器以并行处理多个请求。
问题是,在一种特定情况下,当其中一个进程引发异常时,它不会传播,而是进程崩溃并出现此错误:
concurrent.futures.process._RemoteTraceback:
\n'''\nTraceback (most recent call last):
\n File \"C:\Users\ActionICT\anaconda3\lib\concurrent\futures\process.py\", line 367, in _queue_management_worker\n result_item = result_reader.recv()
\n File \"C:\Users\ActionICT\anaconda3\lib\multiprocessing\connection.py\", line 251, in recv
\n return _ForkingPickler.loads(buf.getbuffer())\nTypeError: __init__() missing 1 required positional argument: 'is_local'\n'''\n\nThe above exception was the direct cause of the following exception:
\n
\nTraceback (most recent call last):\n File \"C:\S1\Product\Baseline\PYTHON\lab\controller.py\", line 558, in get\n output = exec_future.result()
\n File \"C:\Users\ActionICT\anaconda3\lib\concurrent\futures\_base.py\", line 428, in result\n return self.__get_result()\n File \"C:\Users\ActionICT\anaconda3\lib\concurrent\futures\_base.py\", line 384, in __get_result
\n raise self._exception\nconcurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.\n
我已经在调试器中试过了,发现问题出在执行这个
def _send_bytes(self, buf):
ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
try:
if err == _winapi.ERROR_IO_PENDING:
waitres = _winapi.WaitForMultipleObjects(
[ov.event], False, INFINITE)
assert waitres == WAIT_OBJECT_0
except:
ov.cancel()
raise
finally:
nwritten, err = ov.GetOverlappedResult(True)
assert err == 0
assert nwritten == len(buf)
当进程试图将异常传播到相应的 Future
对象时调用。
在第一行中,当调用 _winapi.WriteFile
时,调试器中的一切都崩溃了,我不明白为什么。有什么想法吗?
我已经解决了一个解决方法:我在 try except 中将函数内部包装在单独的进程中,然后将旧异常复制到新异常中并引发它。我不知道为什么...但它有效。
def _execute_tuning(tune_parameters: TuneParameters):
# function to parallelize todo to be refactored
# execute scenario, then write result or error in output
try:
config.generate_project_config(
project_name=tune_parameters.project_name,
scenario_name=tune_parameters.scenario_name
)
config.generate_session_log_config(project_name=tune_parameters.project_name,
scenario_name=tune_parameters.scenario_name)
tree = DecisionTreeGenerator(tune_parameters.project_name, tune_parameters.scenario_name)
tree.fit(
# todo refactor
auto_tune=True if tune_parameters == 'true' else False,
max_depth=tune_parameters.max_depth,
columns=tune_parameters.columns,
min_samples_leaf=tune_parameters.min_samples_per_leaf,
max_leaf_nodes=tune_parameters.max_leaf_nodes
)
kpi = KPICalculator(tune_parameters.project_name, tune_parameters.scenario_name)
kpi.run(do_optimization_kpi=False)
except Exception as exc:
Loggers.application.exception(exc)
exc_final = Exception(str(exc))
exc_final.__traceback__ = exc.__traceback__
raise exc_final