这种情况下的多处理最佳实践?
Multiprocessing best practice in this case?
目前我的代码中有一个有效的多处理设置,但我对此并不十分满意,想知道是否有人可以提出更好的方法。
def fnc_to_parallelize(args):
file, arg1, arg2, arg3, arg4, arg5 = args
return
tasks = [(file,arg1, arg2, arg3, arg4, arg5) for file in files_to_run]
pool = mp.Pool(nparallel)
for _ in tqdm.tqdm(pool.imap_unordered(fnc, tasks), total=len(tasks)):
pass
所以它所做的是 运行 一个并行操作文件的函数,具有一组输入参数,其中大多数参数永远不会改变。
我通常有 100k-500k 个文件需要 运行,但是 运行 文件所花费的时间可能非常不同(一些文件需要不到一秒,而其他文件需要 10 分钟)。
我不喜欢我的剧本的地方:
- 我目前必须创建包含 300k-500k 元素的任务列表,每个元素都包含 n 个对所有元素都相同的参数(这并不是什么大问题,只是让我觉得丑陋)。
- my fnc_to_parallelize 只能接受一个参数,然后我必须在函数中解包。这看起来真的很难看,因为我也希望能够以非并行方式调用此函数,我只指定参数,但目前我必须以一种不直观的方式来调用此函数,其中参数存储在元组中或先听写。
有没有人对我如何以更好的方式进行并行化提出建议?我特别想要一个建议,这样我就可以将 fnc_to_parallelize 设为:
def fnc_to_parallelize(file, arg1, arg2, arg3, arg4, arg5):
return
因为如果没有在并行化中使用,我通常会这样保留这个函数,但据我所知,imap_unordered 不允许这样做,而且我不确定我要做什么可以替换成.
改用 concurrent.futures
模块,它提供了更高级别 API 来使用多处理池:
import concurrent.futures
import time
import random
def fnc_to_parallelize(file, arg1, arg2, arg3, arg4, arg5):
time.sleep(random.randint(1, 30))
return file
files_to_run = ["file{}".format(x) for x in range(100000)]
pool = concurrent.futures.ProcessPoolExecutor(max_workers=100)
args = ["arg1", "arg2", "arg3", "arg4", "arg5"]
tasks = [pool.submit(fnc_to_parallelize, file, *args) for file in files_to_run]
for task in concurrent.futures.as_completed(tasks):
print("result:", task.result())
目前我的代码中有一个有效的多处理设置,但我对此并不十分满意,想知道是否有人可以提出更好的方法。
def fnc_to_parallelize(args):
file, arg1, arg2, arg3, arg4, arg5 = args
return
tasks = [(file,arg1, arg2, arg3, arg4, arg5) for file in files_to_run]
pool = mp.Pool(nparallel)
for _ in tqdm.tqdm(pool.imap_unordered(fnc, tasks), total=len(tasks)):
pass
所以它所做的是 运行 一个并行操作文件的函数,具有一组输入参数,其中大多数参数永远不会改变。 我通常有 100k-500k 个文件需要 运行,但是 运行 文件所花费的时间可能非常不同(一些文件需要不到一秒,而其他文件需要 10 分钟)。
我不喜欢我的剧本的地方:
- 我目前必须创建包含 300k-500k 元素的任务列表,每个元素都包含 n 个对所有元素都相同的参数(这并不是什么大问题,只是让我觉得丑陋)。
- my fnc_to_parallelize 只能接受一个参数,然后我必须在函数中解包。这看起来真的很难看,因为我也希望能够以非并行方式调用此函数,我只指定参数,但目前我必须以一种不直观的方式来调用此函数,其中参数存储在元组中或先听写。
有没有人对我如何以更好的方式进行并行化提出建议?我特别想要一个建议,这样我就可以将 fnc_to_parallelize 设为:
def fnc_to_parallelize(file, arg1, arg2, arg3, arg4, arg5):
return
因为如果没有在并行化中使用,我通常会这样保留这个函数,但据我所知,imap_unordered 不允许这样做,而且我不确定我要做什么可以替换成.
改用 concurrent.futures
模块,它提供了更高级别 API 来使用多处理池:
import concurrent.futures
import time
import random
def fnc_to_parallelize(file, arg1, arg2, arg3, arg4, arg5):
time.sleep(random.randint(1, 30))
return file
files_to_run = ["file{}".format(x) for x in range(100000)]
pool = concurrent.futures.ProcessPoolExecutor(max_workers=100)
args = ["arg1", "arg2", "arg3", "arg4", "arg5"]
tasks = [pool.submit(fnc_to_parallelize, file, *args) for file in files_to_run]
for task in concurrent.futures.as_completed(tasks):
print("result:", task.result())