如何使用多处理包并行化 python 中的 for 循环?
How can I parallelize a for loop in python using multiprocessing package?
注意:我不需要 processes/threads 之间的任何通信,我只对完成信号感兴趣(这就是我将此问题发布为新的,因为我发现所有其他示例都相互通信)。
如何使用 Python 3 中的 multiprocessing
包来并行化以下代码(最终目标是使其 运行 更快):
a = 123
b = 456
for id in ids: # len(ids) = 10'000
# executes a binary with CLI flags
run_binary_with_id(id, a, b)
# i.e. runs "./hello_world_exec --id id --a a --b b" which takes about 30 seconds on average
我尝试了以下方法:
import multiprocessing as mp
def run_binary_with_id(id, a, b):
run_command('./hello_world_exec --id {} --a {} --b {}'.format(id, a, b))
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
a = 123
b = 456
ids = range(10000)
for id in ids:
p = ctx.Process(target=run_binary_with_id, args=(id,a,b))
p.start()
p.join()
# The binary was executed len(ids) number of times, do other stuff assuming everything's completed at this point
或
for id in ids:
map.apply_async(run_binary_with_id, (id,a,b))
在 similar question 中,答案如下:
def consume(iterator):
deque(iterator, max_len=0)
x=pool.imap_unordered(f,((i,j) for i in range(10000) for j in range(10000)))
consume(x)
我完全不明白(为什么我需要这个 consume()
)。
尝试并行生成 10000 个进程到 运行 几乎肯定会使您的系统超载,并且由于涉及的开销,使其 运行 比按顺序 运行 进程慢在 OS 当进程数量远远超过系统拥有的 CPUs/cores 数量时,必须不断地在进程之间执行上下文切换。
您可以改为使用 multiprocessing.Pool
来限制为任务生成的工作进程数。 Pool
构造函数默认将进程数限制为系统的内核数,但如果您愿意,可以使用 processes
参数对其进行微调。然后,您可以使用它的 map
方法轻松地将要应用于给定函数的参数序列并行映射到 运行。但是,它只能将一个参数映射到函数,因此您必须使用 functools.partial
为其他参数提供默认值,在您的情况下不会在调用之间更改:
from functools import partial
if __name__ == '__main__':
_run_binary_with_id = partial(run_binary_with_id, a=123, b=456)
with mp.Pool() as pool:
pool.map(_run_binary_with_id, range(10000))
注意:我不需要 processes/threads 之间的任何通信,我只对完成信号感兴趣(这就是我将此问题发布为新的,因为我发现所有其他示例都相互通信)。
如何使用 Python 3 中的 multiprocessing
包来并行化以下代码(最终目标是使其 运行 更快):
a = 123
b = 456
for id in ids: # len(ids) = 10'000
# executes a binary with CLI flags
run_binary_with_id(id, a, b)
# i.e. runs "./hello_world_exec --id id --a a --b b" which takes about 30 seconds on average
我尝试了以下方法:
import multiprocessing as mp
def run_binary_with_id(id, a, b):
run_command('./hello_world_exec --id {} --a {} --b {}'.format(id, a, b))
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
a = 123
b = 456
ids = range(10000)
for id in ids:
p = ctx.Process(target=run_binary_with_id, args=(id,a,b))
p.start()
p.join()
# The binary was executed len(ids) number of times, do other stuff assuming everything's completed at this point
或
for id in ids:
map.apply_async(run_binary_with_id, (id,a,b))
在 similar question 中,答案如下:
def consume(iterator):
deque(iterator, max_len=0)
x=pool.imap_unordered(f,((i,j) for i in range(10000) for j in range(10000)))
consume(x)
我完全不明白(为什么我需要这个 consume()
)。
尝试并行生成 10000 个进程到 运行 几乎肯定会使您的系统超载,并且由于涉及的开销,使其 运行 比按顺序 运行 进程慢在 OS 当进程数量远远超过系统拥有的 CPUs/cores 数量时,必须不断地在进程之间执行上下文切换。
您可以改为使用 multiprocessing.Pool
来限制为任务生成的工作进程数。 Pool
构造函数默认将进程数限制为系统的内核数,但如果您愿意,可以使用 processes
参数对其进行微调。然后,您可以使用它的 map
方法轻松地将要应用于给定函数的参数序列并行映射到 运行。但是,它只能将一个参数映射到函数,因此您必须使用 functools.partial
为其他参数提供默认值,在您的情况下不会在调用之间更改:
from functools import partial
if __name__ == '__main__':
_run_binary_with_id = partial(run_binary_with_id, a=123, b=456)
with mp.Pool() as pool:
pool.map(_run_binary_with_id, range(10000))