Python concurrent.futures: 处理子进程中的异常

Python concurrent.futures: handling exceptions in child processes

我有一个非常普通的 concurrent.futures.ProcessPoolExecutor 实现——类似于(使用 Python 3.6):

files = get_files()
processor = get_processor_instance()
with concurrent.futures.ProcessPoolExecutor() as executor:
    list(executor.map(processor.process, files))

虽然 processor 是许多可用处理器 classes 中的任何一个的实例,但它们都共享 process 方法,大致如下所示:

def process(self, file):
    log.debug(f"Processing source file {file.name}.")
    with DBConnection(self.db_url) as session:
        file = session.merge(file)
        session.refresh(file)
        self._set_file(file)
        timer = perf_counter()
        try:
            self.records = self._get_records()
            self._save_output()
        except Exception as ex:
            log.warning(f"Failed to process source file {file.ORIGINAL_NAME}: {ex}")
            self.error_time = time.time()
            self.records = None
        else:
            process_duration = perf_counter() - timer
            log.info(f'File {file.name} processed in {process_duration:.6f} seconds.')
            file.process_duration = process_duration
        session.commit()

_get_records_save_output 方法的实现因 class 而异,但我的问题是错误处理。我故意测试它,以便这两种方法中的一种 运行 内存不足,但我希望上面的 except 块能够捕获它并移动下一个文件——这正是当我 运行 单个进程中的代码时会发生什么。

如果我如上所述使用 ProcessPoolExecutor,它会引发 BrokenProcessPool 异常并终止所有执行:

Traceback (most recent call last):
  File "/vagrant/myapp/myapp.py", line 94, in _process
    list(executor.map(processor.process, files))
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/process.py", line 366, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 586, in result_iterator
    yield fs.pop().result()
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 432, in result
    return self.__get_result()
  File "/home/ubuntu/.pyenv/versions/3.6.3/lib/python3.6/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.

我当然可以在调用代码中捕获 BrokenProcessPool,但我更愿意在内部处理错误并继续处理下一个文件。

我也试过使用标准 multiprocessing.Pool 对象,像这样:

with multiprocessing.Pool() as pool:
    pool.map(processor.process, files)

在这种情况下,行为甚至更奇怪:在开始处理引发内存不足错误的前两个文件后,它会继续处理后面的文件,这些文件较小,因此会被完全处理。然而,except 块显然永远不会被触发(没有日志消息,没有 error_time),应用程序只是挂起,既没有完成也没有做任何事情,直到被手动杀死。

我希望 try..except 块能让每个进程独立,处理自己的错误而不影响主应用程序。有什么想法可以实现吗?

因此,经过大量调试(并感谢@RomanPerekhrest 关于检查 executor 对象的建议),我找到了原因。如问题中所述,测试数据由许多文件组成,其中两个文件非常大(每个文件超过 100 万行 CSV)。这两个都导致我的测试机器(一个 2GB VM)阻塞,但方式不同——第一个更大,导致常规内存不足错误,该错误将由 except 处理,第二个简直造成了sigkill。在不探索太多的情况下,我怀疑较大的文件根本无法在读取时放入内存(在 _get_records 方法中完成),而较小的文件可以,但随后对其进行操作(在 [=15 中完成) =]) caused the overflow 并终止了进程。

我的解决方案是简单地捕获 BrokenProcessPool 异常并通知用户这个问题;我还添加了一个在一个进程中运行处理任务的选项,在这种情况下,任何太大的文件都被简单地标记为有错误:

files = get_files()
processor = get_processor_instance()
results = []
if args.nonconcurrent:
    results = list(map(processor.process, files))
else:
    with concurrent.futures.ProcessPoolExecutor() as executor:
        try:
            results = list(executor.map(processor.process, files))
        except concurrent.futures.process.BrokenProcessPool as ex:
            raise MyCustomProcessingError(
                f"{ex} This might be caused by limited system resources. "
                "Try increasing system memory or disable concurrent processing "
                "using the --nonconcurrent option."
            )