Python ProcessPoolExecutor 无法处理异常

Python ProcessPoolExecutor cannot handle exception

我正在使用 ProcessPoolExecutor 执行 Tornado 服务器以并行处理多个请求。 问题是,在一种特定情况下,当其中一个进程引发异常时,它不会传播,而是进程崩溃并出现此错误:

concurrent.futures.process._RemoteTraceback:
\n'''\nTraceback (most recent call last):
\n  File \"C:\Users\ActionICT\anaconda3\lib\concurrent\futures\process.py\", line 367, in _queue_management_worker\n    result_item = result_reader.recv()
\n  File \"C:\Users\ActionICT\anaconda3\lib\multiprocessing\connection.py\", line 251, in recv
\n    return _ForkingPickler.loads(buf.getbuffer())\nTypeError: __init__() missing 1 required positional argument: 'is_local'\n'''\n\nThe above exception was the direct cause of the following exception:
\n
\nTraceback (most recent call last):\n  File \"C:\S1\Product\Baseline\PYTHON\lab\controller.py\", line 558, in get\n    output = exec_future.result()
\n  File \"C:\Users\ActionICT\anaconda3\lib\concurrent\futures\_base.py\", line 428, in result\n    return self.__get_result()\n  File \"C:\Users\ActionICT\anaconda3\lib\concurrent\futures\_base.py\", line 384, in __get_result
\n    raise self._exception\nconcurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.\n

我已经在调试器中试过了,发现问题出在执行这个

    def _send_bytes(self, buf):
        ov, err = _winapi.WriteFile(self._handle, buf, overlapped=True)
        try:
            if err == _winapi.ERROR_IO_PENDING:
                waitres = _winapi.WaitForMultipleObjects(
                    [ov.event], False, INFINITE)
                assert waitres == WAIT_OBJECT_0
        except:
            ov.cancel()
            raise
        finally:
            nwritten, err = ov.GetOverlappedResult(True)
        assert err == 0
        assert nwritten == len(buf)

当进程试图将异常传播到相应的 Future 对象时调用。 在第一行中,当调用 _winapi.WriteFile 时,调试器中的一切都崩溃了,我不明白为什么。有什么想法吗?

我已经解决了一个解决方法:我在 try except 中将函数内部包装在单独的进程中,然后将旧异常复制到新异常中并引发它。我不知道为什么...但它有效。

def _execute_tuning(tune_parameters: TuneParameters):
# function to parallelize  todo to be refactored

# execute scenario, then write result or error in output

try:
    config.generate_project_config(
        project_name=tune_parameters.project_name,
        scenario_name=tune_parameters.scenario_name
    )

    config.generate_session_log_config(project_name=tune_parameters.project_name,
                                                    scenario_name=tune_parameters.scenario_name)

    tree = DecisionTreeGenerator(tune_parameters.project_name, tune_parameters.scenario_name)

    tree.fit(
        # todo refactor
        auto_tune=True if tune_parameters == 'true' else False,
        max_depth=tune_parameters.max_depth,
        columns=tune_parameters.columns,
        min_samples_leaf=tune_parameters.min_samples_per_leaf,
        max_leaf_nodes=tune_parameters.max_leaf_nodes
    )

    kpi = KPICalculator(tune_parameters.project_name, tune_parameters.scenario_name)
    kpi.run(do_optimization_kpi=False)
except Exception as exc:
    Loggers.application.exception(exc)
    exc_final = Exception(str(exc))
    exc_final.__traceback__ = exc.__traceback__
    raise exc_final