多处理 Pool.apply 执行 n-1 次
Multiprocessing Pool.apply executing n-1 times
我遇到了 multiprocessing.Pool.apply
的问题。
我的 objective 是有 5 个进程,每个进程用 100 个元素填充一个数组(此测试为 100 个),然后将数组合并为一个长度为 500 的数组。问题是,它最终只有 400 个元素任何我无法理解的原因。
我曾尝试更改池创建的进程数量,但除了执行时间外,根本没有任何改变。
import torch.multiprocessing as mp
import itertools
pool = mp.Pool(processes=5)
split = int(500/5)
lst = pool.apply(RampedGraph, (split,[])) #each foo returns a list of 100 elements
lst = list(itertools.chain.from_iterable(lst)) #merging the lists into one
len(lst)
>>>400
len(lst)
的期望输出应该是 500
。
谁能告诉我我做错了什么?
EDIT Foo 方法解释:
def RampedGraph(popsize, graph_lst):
cyclic_size = int(math.ceil(popsize/2))
acyclic_size = popsize - full_size
append = graph_lst.append
for _ in range(cyclic_size):
t = c.Node().cyclic()
nn = c.number_of_nodes()
c = c.calculate(0, False)
append((t,nn,c))
for _ in range(acyclic_size):
t = c.Node().acyclic()
nn = c.number_of_nodes()
c = c.calculate(0, False)
append((t,nn,c))
return graph_lst
import torch.multiprocessing as mp
# import multiprocessing as mp
import itertools
def RampedGraph(popsize, graph_lst):
print(mp.current_process().name)
return list(range(100))
num_workers = 5
pool = mp.Pool(processes=num_workers)
split = int(500/num_workers)
lst = pool.starmap(RampedGraph, [(split,[])]*num_workers)
lst = list(itertools.chain.from_iterable(lst))
print(len(lst))
# 500
pool.starmap(RampedGraph, [(split,[])]*5)
发送5个任务到任务池。
它导致 RampedGraph(split, [])
被并发调用 5 次。
RampedGraph
返回的 5 个结果被收集到一个列表中,lst
.
请注意,同时调用 RampedGraph
5 次并不能保证运行使用所有 5 个处理器。例如,如果 RampedGraph
很快完成,则一个处理器可能处理多个任务,而另一个处理器可能根本没有被使用。
但是,如果 RampedGraph
花费的时间很长,通常您可以预期使用所有 5 个工作进程。
注意:我运行上面的代码用的是import multiprocessing as mp
而不是import torch.multiprocessing as mp
。但由于 torch.multiprocessing
应该是 multiprocessing
的直接替代品,所以应该没有什么区别。
使用 multiprocessing
既有成本也有收益。
当然,好处是能够同时使用多个处理器。
成本包括启动额外进程所需的时间,以及进程间通信的成本。 multiprocessing
使用 Queues
将工作进程 运行 传递给函数 运行 的参数,并将返回值传递回主进程 运行 .为了通过队列传输返回值,对象通过 pickling 序列化为字节。如果通过队列发送的 pickled 对象很大,这会在使用多处理时增加显着的开销成本。请注意,所有这些成本都不是由相同代码的等效顺序版本引起的。
特别是当工作进程的函数 运行 快速完成时,开销成本可能会占据程序的总 运行 时间,使得使用多处理的代码比相同的顺序版本慢代码。
因此,使用多处理时提高速度的关键是尽量减少进程间通信,并确保工作进程做大量工作,以便开销成本成为总成本中相对较小的一部分 运行时间。
我遇到了 multiprocessing.Pool.apply
的问题。
我的 objective 是有 5 个进程,每个进程用 100 个元素填充一个数组(此测试为 100 个),然后将数组合并为一个长度为 500 的数组。问题是,它最终只有 400 个元素任何我无法理解的原因。
我曾尝试更改池创建的进程数量,但除了执行时间外,根本没有任何改变。
import torch.multiprocessing as mp
import itertools
pool = mp.Pool(processes=5)
split = int(500/5)
lst = pool.apply(RampedGraph, (split,[])) #each foo returns a list of 100 elements
lst = list(itertools.chain.from_iterable(lst)) #merging the lists into one
len(lst)
>>>400
len(lst)
的期望输出应该是 500
。
谁能告诉我我做错了什么?
EDIT Foo 方法解释:
def RampedGraph(popsize, graph_lst):
cyclic_size = int(math.ceil(popsize/2))
acyclic_size = popsize - full_size
append = graph_lst.append
for _ in range(cyclic_size):
t = c.Node().cyclic()
nn = c.number_of_nodes()
c = c.calculate(0, False)
append((t,nn,c))
for _ in range(acyclic_size):
t = c.Node().acyclic()
nn = c.number_of_nodes()
c = c.calculate(0, False)
append((t,nn,c))
return graph_lst
import torch.multiprocessing as mp
# import multiprocessing as mp
import itertools
def RampedGraph(popsize, graph_lst):
print(mp.current_process().name)
return list(range(100))
num_workers = 5
pool = mp.Pool(processes=num_workers)
split = int(500/num_workers)
lst = pool.starmap(RampedGraph, [(split,[])]*num_workers)
lst = list(itertools.chain.from_iterable(lst))
print(len(lst))
# 500
pool.starmap(RampedGraph, [(split,[])]*5)
发送5个任务到任务池。
它导致 RampedGraph(split, [])
被并发调用 5 次。
RampedGraph
返回的 5 个结果被收集到一个列表中,lst
.
请注意,同时调用 RampedGraph
5 次并不能保证运行使用所有 5 个处理器。例如,如果 RampedGraph
很快完成,则一个处理器可能处理多个任务,而另一个处理器可能根本没有被使用。
但是,如果 RampedGraph
花费的时间很长,通常您可以预期使用所有 5 个工作进程。
注意:我运行上面的代码用的是import multiprocessing as mp
而不是import torch.multiprocessing as mp
。但由于 torch.multiprocessing
应该是 multiprocessing
的直接替代品,所以应该没有什么区别。
使用 multiprocessing
既有成本也有收益。
当然,好处是能够同时使用多个处理器。
成本包括启动额外进程所需的时间,以及进程间通信的成本。 multiprocessing
使用 Queues
将工作进程 运行 传递给函数 运行 的参数,并将返回值传递回主进程 运行 .为了通过队列传输返回值,对象通过 pickling 序列化为字节。如果通过队列发送的 pickled 对象很大,这会在使用多处理时增加显着的开销成本。请注意,所有这些成本都不是由相同代码的等效顺序版本引起的。
特别是当工作进程的函数 运行 快速完成时,开销成本可能会占据程序的总 运行 时间,使得使用多处理的代码比相同的顺序版本慢代码。
因此,使用多处理时提高速度的关键是尽量减少进程间通信,并确保工作进程做大量工作,以便开销成本成为总成本中相对较小的一部分 运行时间。