Python 多处理 return 结果在 chunksize 中设置

Python Multiprocessing return results as set in chunksize

我想使用名为 get_scores_dataframe 的函数处理存储在 file_list 中的大量 csv 文件。此函数采用存储在另一个列表中的第二个参数 phenotypes。然后该函数将结果写回 csv 文件。我设法使用 ProcessPoolExecutor() 并行化了这个任务,因此它起作用了。

   with concurrent.futures.ProcessPoolExecutor() as executor:
        phenotypes = [phenotype for i in range(len(file_list))]
        futures = executor.map(get_scores_dataframe, file_list, phenotypes,
                                    chunksize=25)
        filenames = executor.map(os.path.basename, file_list)
        for future, filename  in zip(futures, filenames):
                futures.to_csv(os.path.join(f'{output_path}',f'{filename}.csv'),
                              index = False)

如您所见,我为此使用了上下文管理器,在上下文管理器中,我可以在方法 map() 中设置选项 chunksize。但是,我希望程序在完成处理每个数据帧时写入 csv 文件。似乎上下文管理器一直等到所有作业都完成,然后将结果写入 csv 文件。

你知道我怎样才能做到这一点吗?

首先,executor.map 没有 return Future 个实例,因此您的变量 futures 命名不当。它执行 return 一个迭代器,该迭代器依次将 get_scores_dataframe 应用于 file_list 的每个元素,从而产生 return 值。其次,看看接下来如何使用它,这些 return 值似乎是输入文件(与输入参数可能是也可能不是同一个文件——由于缺少显示的代码,无法确定).此外,使用进程池 map 函数而不是内置 map 函数来获取文件名参数的基本名称似乎有些矫枉过正。最后,在您的代码中,它不会是 futures.to_csv,而是 future.to_csv。所以我很困惑你的代码是如何工作的。

如果您将函数 get_scores_dataframe 修改为 return 由数据帧和原始传递的文件名参数组成的元组,那么我们可以使用 as_competed 按完成顺序处理结果:

from concurrent.futures import as_completed
import multiprocessing

with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count() - 1) as executor:
    futures = [executor.submit(get_scores_dataframe, file, phenotype) for file in file_list]
    for future in as_completed(futures):
        # it is assumed return value is tuple: (data frame, original filename argument):
        df, file = future.result()
        csv_filename = os.path.basename(file)
        df.to_csv(os.path.join(f'{output_path}', f'{csv_filename}.csv'), index = False)

现在,通过使用 submit,您将失去“分块”工作提交的能力。我们可以切换到使用 multiprocessing.Poolimap_unordered。但是 imap_unordered 只能将单个参数传递给辅助函数。因此,如果您能够修改您的 worker 以更改参数的顺序,我们可以将 phenotype 作为第一个并使用 partial (see manual):

import multiprocessing
from functools import partial


POOL_SIZE = multiprocessing.cpu_count() - 1 # leave 1 for main process


def compute_chunksize(iterable_size):
    if iterable_size == 0:
        return 0
    chunksize, extra = divmod(iterable_size, POOL_SIZE * 4)
    if extra:
        chunksize += 1
    return chunksize


with multiprocessing.Pool(POOL_SIZE) as pool:
    chunksize = compute_chunksize(len(file_list))
    worker = partial(get_scores_dataframe, phenotype)
    # it is assumed that start_processing returns a tuple: (data frame, original filename argument)
    for df, file in pool.imap_unordered(worker, file_list, chunksize):
        csv_filename = os.path.basename(file)
        df.to_csv(os.path.join(f'{output_path}', f'{csv_filename}.csv'), index = False)