multiprocessing.pool.imap 是否有允许多个参数的变体(如星图)?
Does multiprocessing.pool.imap has a variant (like starmap) that allows for multiple arguments?
我正在对大量字节进行一些计算。该进程在字节块上运行。我正在尝试使用使用多处理的并行处理来提高性能。最初我尝试使用 pool.map 但它只允许单个参数,然后我发现了 pool.starmap。但是 pool.starmap 只有在所有进程都完成后才会给出结果。我想要结果(某种程度上)。我正在尝试使用 pool.imap ,它在进程完成时提供结果但不允许多个参数(我的函数需要 2 个参数)。此外,结果的顺序很重要。
下面是一些示例代码:
pool = mp.Pool(processes=4)
y = []
for x in pool.starmap(f, zip(da, repeat(db))):
y.append(x)
以上代码有效,但仅在所有过程完成后才给出结果。我看不到任何进展。这就是为什么我尝试使用 pool.imap,效果很好但只有一个参数:
pool = mp.Pool(processes=4)
y = []
for x in pool.imap(f, da)):
y.append(x)
在多个参数上引发以下异常:
TypeError: f() missing 1 required positional argument: 'd'
寻找实现所有 3 个要求的简单方法:
- 使用多个并行处理 parameters/arguments
- 在进程 运行
时设法查看进度
- 订购的结果。
谢谢!
我可以很快回答前两个问题。我想你在理解前两个问题后应该能够处理第三个问题。
1.多参数并行处理
我不确定整个 "starmap" 等效项,但这里有一个替代方案。我过去所做的是将我的论点压缩成一个单一的数据对象,如列表。例如,如果您想将三个参数传递给 map_function
,您可以将这些参数附加到一个列表中,然后将该列表与 .map()
或 .imap()
函数一起使用。
def map_function(combo):
a = combo[0]
b = combo[1]
c = combo[2]
return a + b + c
if '__name__' == '__main__':
combo = []
combo[0] = arg_1
combo[1] = arg_2
combo[2] = arg_3
pool = Pool(processes=4)
pool.map(map_function, combo)
2。跟踪进度
执行此操作的一个好方法是使用 multiprocessing
的共享值。实际上我大约一个月前问过这个。这允许您从 map
函数创建的不同进程中操作相同的变量。为了学习,我将让您自己阅读并找出共享状态解决方案。如果您尝试几次后仍然遇到问题,我会非常乐意为您提供帮助,但我相信自学如何理解某些东西比我给您答案更有价值。
希望对您有所帮助!!
我认为这个解决方案完全符合您的 3 个要求:
总之,p = Pool(); p.imap
会让你看到进步,维护秩序。如果您想要 map
具有多个参数的函数,您可以使用 multiprocessing
的分支,它提供更好的序列化和多个参数。有关示例,请参见 link。
您可以通过 functools.partial()
函数使用 imap
模拟 starmap
:
import functools
import multiprocessing as mp
def my_function(constant, my_list, optional_param=None):
print(locals())
with mp.Pool() as pool:
list(
pool.imap(
functools.partial(
my_function, 2, optional_param=3
),
[1,2,3,4,5]
)
)
输出:
$ python3 foo.py
{'optional_param': 3, 'my_list': 1, 'constant': 2}
{'optional_param': 3, 'my_list': 3, 'constant': 2}
{'optional_param': 3, 'my_list': 2, 'constant': 2}
{'optional_param': 3, 'my_list': 4, 'constant': 2}
{'optional_param': 3, 'my_list': 5, 'constant': 2}
我正在对大量字节进行一些计算。该进程在字节块上运行。我正在尝试使用使用多处理的并行处理来提高性能。最初我尝试使用 pool.map 但它只允许单个参数,然后我发现了 pool.starmap。但是 pool.starmap 只有在所有进程都完成后才会给出结果。我想要结果(某种程度上)。我正在尝试使用 pool.imap ,它在进程完成时提供结果但不允许多个参数(我的函数需要 2 个参数)。此外,结果的顺序很重要。
下面是一些示例代码:
pool = mp.Pool(processes=4)
y = []
for x in pool.starmap(f, zip(da, repeat(db))):
y.append(x)
以上代码有效,但仅在所有过程完成后才给出结果。我看不到任何进展。这就是为什么我尝试使用 pool.imap,效果很好但只有一个参数:
pool = mp.Pool(processes=4)
y = []
for x in pool.imap(f, da)):
y.append(x)
在多个参数上引发以下异常:
TypeError: f() missing 1 required positional argument: 'd'
寻找实现所有 3 个要求的简单方法:
- 使用多个并行处理 parameters/arguments
- 在进程 运行 时设法查看进度
- 订购的结果。
谢谢!
我可以很快回答前两个问题。我想你在理解前两个问题后应该能够处理第三个问题。
1.多参数并行处理
我不确定整个 "starmap" 等效项,但这里有一个替代方案。我过去所做的是将我的论点压缩成一个单一的数据对象,如列表。例如,如果您想将三个参数传递给 map_function
,您可以将这些参数附加到一个列表中,然后将该列表与 .map()
或 .imap()
函数一起使用。
def map_function(combo):
a = combo[0]
b = combo[1]
c = combo[2]
return a + b + c
if '__name__' == '__main__':
combo = []
combo[0] = arg_1
combo[1] = arg_2
combo[2] = arg_3
pool = Pool(processes=4)
pool.map(map_function, combo)
2。跟踪进度
执行此操作的一个好方法是使用 multiprocessing
的共享值。实际上我大约一个月前问过这个map
函数创建的不同进程中操作相同的变量。为了学习,我将让您自己阅读并找出共享状态解决方案。如果您尝试几次后仍然遇到问题,我会非常乐意为您提供帮助,但我相信自学如何理解某些东西比我给您答案更有价值。
希望对您有所帮助!!
我认为这个解决方案完全符合您的 3 个要求:
总之,p = Pool(); p.imap
会让你看到进步,维护秩序。如果您想要 map
具有多个参数的函数,您可以使用 multiprocessing
的分支,它提供更好的序列化和多个参数。有关示例,请参见 link。
您可以通过 functools.partial()
函数使用 imap
模拟 starmap
:
import functools
import multiprocessing as mp
def my_function(constant, my_list, optional_param=None):
print(locals())
with mp.Pool() as pool:
list(
pool.imap(
functools.partial(
my_function, 2, optional_param=3
),
[1,2,3,4,5]
)
)
输出:
$ python3 foo.py
{'optional_param': 3, 'my_list': 1, 'constant': 2}
{'optional_param': 3, 'my_list': 3, 'constant': 2}
{'optional_param': 3, 'my_list': 2, 'constant': 2}
{'optional_param': 3, 'my_list': 4, 'constant': 2}
{'optional_param': 3, 'my_list': 5, 'constant': 2}