Python:如何使用 MPI 并行化一个简单的循环
Python: how to parallelizing a simple loop with MPI
我需要用 MPI 重写一个简单的 for 循环,因为每一步都很耗时。假设我有一个包含多个 np.array 的列表,我想对每个数组应用一些计算。例如:
def myFun(x):
return x+2 # simple example, the real one would be complicated
dat = [np.random.rand(3,2), np.random.rand(3,2),np.random.rand(3,2),np.random.rand(3,2)] # real data would be much larger
result = []
for item in dat:
result.append(myFun(item))
而不是使用上面的简单for循环,我想使用MPI到运行上面代码的'for loop'部分与24并行不同的节点我也希望结果列表中的项目顺序与数据列表相同。
注意数据是从其他文件读取的,每个处理器可以处理'fix'。
我以前没有用过mpi,所以这让我卡住了一段时间。
为简单起见,我们假设 master 进程(带有 rank = 0
的进程)是将整个文件从磁盘读入内存的进程。只有了解以下 MPI 例程 Get_size()
、Get_rank()
、scatter
和 gather
.
才能解决此问题
Returns the number of processes in the communicator. It will return
the same number to every process.
Determines the rank of the calling process in the communicator.
在 MPI 中,每个进程都分配了一个等级,从 0 到 N - 1,其中 N 是进程总数 运行。
MPI_Scatter involves a designated root process sending data to all
processes in a communicator. The primary difference between MPI_Bcast
and MPI_Scatter is small but important. MPI_Bcast sends the same piece
of data to all processes while MPI_Scatter sends chunks of an array to
different processes.
和 gather:
MPI_Gather is the inverse of MPI_Scatter. Instead of spreading
elements from one process to many processes, MPI_Gather takes elements
from many processes and gathers them to one single process.
显然,您应该首先按照教程阅读 MPI 文档以了解其并行编程模型及其例程。否则,您会发现很难理解它是如何工作的。也就是说,您的代码可能如下所示:
from mpi4py import MPI
def myFun(x):
return x+2 # simple example, the real one would be complicated
comm = MPI.COMM_WORLD
rank = comm.Get_rank() # get your process ID
data = # init the data
if rank == 0: # The master is the only process that reads the file
data = # something read from file
# Divide the data among processes
data = comm.scatter(data, root=0)
result = []
for item in data:
result.append(myFun(item))
# Send the results back to the master processes
newData = comm.gather(result,root=0)
这样,每个进程将只(并行)处理特定的数据块。完成工作后,每个进程将其数据块(即 comm.gather(result,root=0)
)发送回 master 进程。这只是一个玩具示例,现在由您根据您的测试环境和代码进行改进。
您可以采用@dreamcrash 的回答中所示的低级 MPI 方式,也可以采用更 Pythonic 的解决方案,该解决方案使用与标准 Python multiprocessing
模块。
首先,您需要将您的代码变成更函数式的代码,注意您实际上是在执行 map
操作,该操作将 myFun
应用于 [=20= 的每个元素]:
def myFun(x):
return x+2 # simple example, the real one would be complicated
dat = [
np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
] # real data would be much larger
result = map(myFun, dat)
map
这里 运行s 在一个 Python 解释器进程中顺序。
到与multiprocessing
模块并行映射的运行,只需要实例化一个Pool
对象,然后调用它的map()
方法代替Python map()
函数:
from multiprocessing import Pool
def myFun(x):
return x+2 # simple example, the real one would be complicated
if __name__ == '__main__':
dat = [
np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
] # real data would be much larger
with Pool() as pool:
result = pool.map(myFun, dat)
此处,Pool()
创建了一个新的执行程序池,其中包含与 OS 所见的逻辑 CPU 一样多的解释器进程。通过将项目发送到池中的不同进程并等待完成,并行调用池的 map()
方法 运行s 映射。由于工作进程将 Python 脚本作为模块导入,因此将之前位于顶层的代码移动到 if __name__ == '__main__':
条件下很重要,这样它就不会 运行工人也是。
使用 multiprocessing.Pool()
非常方便,因为它只需要对原始代码稍作更改,模块会为您处理所有工作调度以及进出工作进程所需的数据移动。 multiprocessing
的问题在于它只能在单个主机上工作。幸运的是,mpi4py
通过 mpi4py.futures.MPIPoolExecutor
class:
提供了类似的接口
from mpi4py.futures import MPIPoolExecutor
def myFun(x):
return x+2 # simple example, the real one would be complicated
if __name__ == '__main__':
dat = [
np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
] # real data would be much larger
with MPIPoolExecutor() as pool:
result = pool.map(myFun, dat)
与来自 multiprocessing
模块的 Pool
对象一样,MPI 池执行器为您处理所有工作调度和数据移动。
有两种方法可以运行 MPI 程序。第一个将脚本作为 MPI 单例启动,然后使用 MPI 过程控制工具生成一个包含所有池工作者的子 MPI 作业:
mpiexec -n 1 python program.py
您还需要指定 MPI 宇宙大小(主作业和所有子作业中的 MPI 等级总数)。执行此操作的具体方法因实施而异,因此您需要查阅实施手册。
第二个选项是直接启动所需数量的 MPI 等级并让它们以脚本名称作为参数执行 mpi4py.futures
模块本身:
mpiexec -n 24 python -m mpi4py.futures program.py
请记住,无论您以何种方式启动脚本,都会为控制器保留一个 MPI 等级,并且不会 运行ning 映射任务。您的目标是 运行ning 在 24 台主机上,因此您应该有足够的 CPU 核心并且可能有能力预留一个。或者,您可以指示 MPI 超额订阅第一台主机的等级。
multiprocessing.Pool
和 mpi4py.futures.MPIPoolExecutor
需要注意的一点是 map()
方法保证输出数组中项目的顺序,但不保证顺序其中评估了不同的项目。在大多数情况下这应该不是问题。
忠告。如果您的数据实际上是从文件中读取的块,您可能会想做这样的事情:
if __name__ == '__main__':
data = read_chunks()
with MPIPoolExecutor() as p:
result = p.map(myFun, data)
不要那样做。相反,如果可能的话,例如,如果由于存在共享(并且希望是并行的)文件系统而启用,则将读取委托给工作人员:
NUM_CHUNKS = 100
def myFun(chunk_num):
# You may need to pass the value of NUM_CHUNKS to read_chunk()
# for it to be able to seek to the right position in the file
data = read_chunk(NUM_CHUNKS, chunk_num)
return ...
if __name__ == '__main__':
chunk_nums = range(NUM_CHUNKS) # 100 chunks
with MPIPoolExecutor() as p:
result = p.map(myFun, chunk_nums)
我需要用 MPI 重写一个简单的 for 循环,因为每一步都很耗时。假设我有一个包含多个 np.array 的列表,我想对每个数组应用一些计算。例如:
def myFun(x):
return x+2 # simple example, the real one would be complicated
dat = [np.random.rand(3,2), np.random.rand(3,2),np.random.rand(3,2),np.random.rand(3,2)] # real data would be much larger
result = []
for item in dat:
result.append(myFun(item))
而不是使用上面的简单for循环,我想使用MPI到运行上面代码的'for loop'部分与24并行不同的节点我也希望结果列表中的项目顺序与数据列表相同。
注意数据是从其他文件读取的,每个处理器可以处理'fix'。
我以前没有用过mpi,所以这让我卡住了一段时间。
为简单起见,我们假设 master 进程(带有 rank = 0
的进程)是将整个文件从磁盘读入内存的进程。只有了解以下 MPI 例程 Get_size()
、Get_rank()
、scatter
和 gather
.
Returns the number of processes in the communicator. It will return the same number to every process.
Determines the rank of the calling process in the communicator.
在 MPI 中,每个进程都分配了一个等级,从 0 到 N - 1,其中 N 是进程总数 运行。
MPI_Scatter involves a designated root process sending data to all processes in a communicator. The primary difference between MPI_Bcast and MPI_Scatter is small but important. MPI_Bcast sends the same piece of data to all processes while MPI_Scatter sends chunks of an array to different processes.
和 gather:
MPI_Gather is the inverse of MPI_Scatter. Instead of spreading elements from one process to many processes, MPI_Gather takes elements from many processes and gathers them to one single process.
显然,您应该首先按照教程阅读 MPI 文档以了解其并行编程模型及其例程。否则,您会发现很难理解它是如何工作的。也就是说,您的代码可能如下所示:
from mpi4py import MPI
def myFun(x):
return x+2 # simple example, the real one would be complicated
comm = MPI.COMM_WORLD
rank = comm.Get_rank() # get your process ID
data = # init the data
if rank == 0: # The master is the only process that reads the file
data = # something read from file
# Divide the data among processes
data = comm.scatter(data, root=0)
result = []
for item in data:
result.append(myFun(item))
# Send the results back to the master processes
newData = comm.gather(result,root=0)
这样,每个进程将只(并行)处理特定的数据块。完成工作后,每个进程将其数据块(即 comm.gather(result,root=0)
)发送回 master 进程。这只是一个玩具示例,现在由您根据您的测试环境和代码进行改进。
您可以采用@dreamcrash 的回答中所示的低级 MPI 方式,也可以采用更 Pythonic 的解决方案,该解决方案使用与标准 Python multiprocessing
模块。
首先,您需要将您的代码变成更函数式的代码,注意您实际上是在执行 map
操作,该操作将 myFun
应用于 [=20= 的每个元素]:
def myFun(x):
return x+2 # simple example, the real one would be complicated
dat = [
np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
] # real data would be much larger
result = map(myFun, dat)
map
这里 运行s 在一个 Python 解释器进程中顺序。
到与multiprocessing
模块并行映射的运行,只需要实例化一个Pool
对象,然后调用它的map()
方法代替Python map()
函数:
from multiprocessing import Pool
def myFun(x):
return x+2 # simple example, the real one would be complicated
if __name__ == '__main__':
dat = [
np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
] # real data would be much larger
with Pool() as pool:
result = pool.map(myFun, dat)
此处,Pool()
创建了一个新的执行程序池,其中包含与 OS 所见的逻辑 CPU 一样多的解释器进程。通过将项目发送到池中的不同进程并等待完成,并行调用池的 map()
方法 运行s 映射。由于工作进程将 Python 脚本作为模块导入,因此将之前位于顶层的代码移动到 if __name__ == '__main__':
条件下很重要,这样它就不会 运行工人也是。
使用 multiprocessing.Pool()
非常方便,因为它只需要对原始代码稍作更改,模块会为您处理所有工作调度以及进出工作进程所需的数据移动。 multiprocessing
的问题在于它只能在单个主机上工作。幸运的是,mpi4py
通过 mpi4py.futures.MPIPoolExecutor
class:
from mpi4py.futures import MPIPoolExecutor
def myFun(x):
return x+2 # simple example, the real one would be complicated
if __name__ == '__main__':
dat = [
np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
] # real data would be much larger
with MPIPoolExecutor() as pool:
result = pool.map(myFun, dat)
与来自 multiprocessing
模块的 Pool
对象一样,MPI 池执行器为您处理所有工作调度和数据移动。
有两种方法可以运行 MPI 程序。第一个将脚本作为 MPI 单例启动,然后使用 MPI 过程控制工具生成一个包含所有池工作者的子 MPI 作业:
mpiexec -n 1 python program.py
您还需要指定 MPI 宇宙大小(主作业和所有子作业中的 MPI 等级总数)。执行此操作的具体方法因实施而异,因此您需要查阅实施手册。
第二个选项是直接启动所需数量的 MPI 等级并让它们以脚本名称作为参数执行 mpi4py.futures
模块本身:
mpiexec -n 24 python -m mpi4py.futures program.py
请记住,无论您以何种方式启动脚本,都会为控制器保留一个 MPI 等级,并且不会 运行ning 映射任务。您的目标是 运行ning 在 24 台主机上,因此您应该有足够的 CPU 核心并且可能有能力预留一个。或者,您可以指示 MPI 超额订阅第一台主机的等级。
multiprocessing.Pool
和 mpi4py.futures.MPIPoolExecutor
需要注意的一点是 map()
方法保证输出数组中项目的顺序,但不保证顺序其中评估了不同的项目。在大多数情况下这应该不是问题。
忠告。如果您的数据实际上是从文件中读取的块,您可能会想做这样的事情:
if __name__ == '__main__':
data = read_chunks()
with MPIPoolExecutor() as p:
result = p.map(myFun, data)
不要那样做。相反,如果可能的话,例如,如果由于存在共享(并且希望是并行的)文件系统而启用,则将读取委托给工作人员:
NUM_CHUNKS = 100
def myFun(chunk_num):
# You may need to pass the value of NUM_CHUNKS to read_chunk()
# for it to be able to seek to the right position in the file
data = read_chunk(NUM_CHUNKS, chunk_num)
return ...
if __name__ == '__main__':
chunk_nums = range(NUM_CHUNKS) # 100 chunks
with MPIPoolExecutor() as p:
result = p.map(myFun, chunk_nums)