Python 多处理 return 结果在 chunksize 中设置
Python Multiprocessing return results as set in chunksize
我想使用名为 get_scores_dataframe
的函数处理存储在 file_list
中的大量 csv 文件。此函数采用存储在另一个列表中的第二个参数 phenotypes
。然后该函数将结果写回 csv 文件。我设法使用 ProcessPoolExecutor()
并行化了这个任务,因此它起作用了。
with concurrent.futures.ProcessPoolExecutor() as executor:
phenotypes = [phenotype for i in range(len(file_list))]
futures = executor.map(get_scores_dataframe, file_list, phenotypes,
chunksize=25)
filenames = executor.map(os.path.basename, file_list)
for future, filename in zip(futures, filenames):
futures.to_csv(os.path.join(f'{output_path}',f'{filename}.csv'),
index = False)
如您所见,我为此使用了上下文管理器,在上下文管理器中,我可以在方法 map()
中设置选项 chunksize
。但是,我希望程序在完成处理每个数据帧时写入 csv 文件。似乎上下文管理器一直等到所有作业都完成,然后将结果写入 csv 文件。
你知道我怎样才能做到这一点吗?
首先,executor.map
没有 return Future
个实例,因此您的变量 futures
命名不当。它执行 return 一个迭代器,该迭代器依次将 get_scores_dataframe
应用于 file_list
的每个元素,从而产生 return 值。其次,看看接下来如何使用它,这些 return 值似乎是输入文件(与输入参数可能是也可能不是同一个文件——由于缺少显示的代码,无法确定).此外,使用进程池 map
函数而不是内置 map
函数来获取文件名参数的基本名称似乎有些矫枉过正。最后,在您的代码中,它不会是 futures.to_csv
,而是 future.to_csv
。所以我很困惑你的代码是如何工作的。
如果您将函数 get_scores_dataframe
修改为 return 由数据帧和原始传递的文件名参数组成的元组,那么我们可以使用 as_competed
按完成顺序处理结果:
from concurrent.futures import as_completed
import multiprocessing
with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count() - 1) as executor:
futures = [executor.submit(get_scores_dataframe, file, phenotype) for file in file_list]
for future in as_completed(futures):
# it is assumed return value is tuple: (data frame, original filename argument):
df, file = future.result()
csv_filename = os.path.basename(file)
df.to_csv(os.path.join(f'{output_path}', f'{csv_filename}.csv'), index = False)
现在,通过使用 submit
,您将失去“分块”工作提交的能力。我们可以切换到使用 multiprocessing.Pool
和 imap_unordered
。但是 imap_unordered
只能将单个参数传递给辅助函数。因此,如果您能够修改您的 worker 以更改参数的顺序,我们可以将 phenotype
作为第一个并使用 partial
(see manual):
import multiprocessing
from functools import partial
POOL_SIZE = multiprocessing.cpu_count() - 1 # leave 1 for main process
def compute_chunksize(iterable_size):
if iterable_size == 0:
return 0
chunksize, extra = divmod(iterable_size, POOL_SIZE * 4)
if extra:
chunksize += 1
return chunksize
with multiprocessing.Pool(POOL_SIZE) as pool:
chunksize = compute_chunksize(len(file_list))
worker = partial(get_scores_dataframe, phenotype)
# it is assumed that start_processing returns a tuple: (data frame, original filename argument)
for df, file in pool.imap_unordered(worker, file_list, chunksize):
csv_filename = os.path.basename(file)
df.to_csv(os.path.join(f'{output_path}', f'{csv_filename}.csv'), index = False)
我想使用名为 get_scores_dataframe
的函数处理存储在 file_list
中的大量 csv 文件。此函数采用存储在另一个列表中的第二个参数 phenotypes
。然后该函数将结果写回 csv 文件。我设法使用 ProcessPoolExecutor()
并行化了这个任务,因此它起作用了。
with concurrent.futures.ProcessPoolExecutor() as executor:
phenotypes = [phenotype for i in range(len(file_list))]
futures = executor.map(get_scores_dataframe, file_list, phenotypes,
chunksize=25)
filenames = executor.map(os.path.basename, file_list)
for future, filename in zip(futures, filenames):
futures.to_csv(os.path.join(f'{output_path}',f'{filename}.csv'),
index = False)
如您所见,我为此使用了上下文管理器,在上下文管理器中,我可以在方法 map()
中设置选项 chunksize
。但是,我希望程序在完成处理每个数据帧时写入 csv 文件。似乎上下文管理器一直等到所有作业都完成,然后将结果写入 csv 文件。
你知道我怎样才能做到这一点吗?
首先,executor.map
没有 return Future
个实例,因此您的变量 futures
命名不当。它执行 return 一个迭代器,该迭代器依次将 get_scores_dataframe
应用于 file_list
的每个元素,从而产生 return 值。其次,看看接下来如何使用它,这些 return 值似乎是输入文件(与输入参数可能是也可能不是同一个文件——由于缺少显示的代码,无法确定).此外,使用进程池 map
函数而不是内置 map
函数来获取文件名参数的基本名称似乎有些矫枉过正。最后,在您的代码中,它不会是 futures.to_csv
,而是 future.to_csv
。所以我很困惑你的代码是如何工作的。
如果您将函数 get_scores_dataframe
修改为 return 由数据帧和原始传递的文件名参数组成的元组,那么我们可以使用 as_competed
按完成顺序处理结果:
from concurrent.futures import as_completed
import multiprocessing
with concurrent.futures.ProcessPoolExecutor(multiprocessing.cpu_count() - 1) as executor:
futures = [executor.submit(get_scores_dataframe, file, phenotype) for file in file_list]
for future in as_completed(futures):
# it is assumed return value is tuple: (data frame, original filename argument):
df, file = future.result()
csv_filename = os.path.basename(file)
df.to_csv(os.path.join(f'{output_path}', f'{csv_filename}.csv'), index = False)
现在,通过使用 submit
,您将失去“分块”工作提交的能力。我们可以切换到使用 multiprocessing.Pool
和 imap_unordered
。但是 imap_unordered
只能将单个参数传递给辅助函数。因此,如果您能够修改您的 worker 以更改参数的顺序,我们可以将 phenotype
作为第一个并使用 partial
(see manual):
import multiprocessing
from functools import partial
POOL_SIZE = multiprocessing.cpu_count() - 1 # leave 1 for main process
def compute_chunksize(iterable_size):
if iterable_size == 0:
return 0
chunksize, extra = divmod(iterable_size, POOL_SIZE * 4)
if extra:
chunksize += 1
return chunksize
with multiprocessing.Pool(POOL_SIZE) as pool:
chunksize = compute_chunksize(len(file_list))
worker = partial(get_scores_dataframe, phenotype)
# it is assumed that start_processing returns a tuple: (data frame, original filename argument)
for df, file in pool.imap_unordered(worker, file_list, chunksize):
csv_filename = os.path.basename(file)
df.to_csv(os.path.join(f'{output_path}', f'{csv_filename}.csv'), index = False)