Python:如何使用 MPI 并行化一个简单的循环

Python: how to parallelizing a simple loop with MPI

我需要用 MPI 重写一个简单的 for 循环,因为每一步都很耗时。假设我有一个包含多个 np.array 的列表,我想对每个数组应用一些计算。例如:

def myFun(x):
    return x+2 # simple example, the real one would be complicated

dat = [np.random.rand(3,2), np.random.rand(3,2),np.random.rand(3,2),np.random.rand(3,2)] # real data would be much larger
result = []
for item in dat:
    result.append(myFun(item))

而不是使用上面的简单for循环,我想使用MPI到运行上面代码的'for loop'部分与24并行不同的节点我也希望结果列表中的项目顺序与数据列表相同。

注意数据是从其他文件读取的,每个处理器可以处理'fix'。

我以前没有用过mpi,所以这让我卡住了一段时间。

为简单起见,我们假设 master 进程(带有 rank = 0 的进程)是将​​整个文件从磁盘读入内存的进程。只有了解以下 MPI 例程 Get_size()Get_rank()scattergather.

才能解决此问题

Get_size():

Returns the number of processes in the communicator. It will return the same number to every process.

Get_rank():

Determines the rank of the calling process in the communicator.

在 MPI 中,每个进程都分配了一个等级,从 0 到 N - 1,其中 N 是进程总数 运行。

scatter:

MPI_Scatter involves a designated root process sending data to all processes in a communicator. The primary difference between MPI_Bcast and MPI_Scatter is small but important. MPI_Bcast sends the same piece of data to all processes while MPI_Scatter sends chunks of an array to different processes.

gather:

MPI_Gather is the inverse of MPI_Scatter. Instead of spreading elements from one process to many processes, MPI_Gather takes elements from many processes and gathers them to one single process.

显然,您应该首先按照教程阅读 MPI 文档以了解其并行编程模型及其例程。否则,您会发现很难理解它是如何工作的。也就是说,您的代码可能如下所示:

from mpi4py import MPI

def myFun(x):
    return x+2 # simple example, the real one would be complicated

comm = MPI.COMM_WORLD
rank = comm.Get_rank() # get your process ID
data = # init the data    

if rank == 0: # The master is the only process that reads the file
    data = # something read from file

# Divide the data among processes
data = comm.scatter(data, root=0)

result = []
for item in data:
    result.append(myFun(item))

# Send the results back to the master processes
newData = comm.gather(result,root=0)

 

这样,每个进程将只(并行)处理特定的数据块。完成工作后,每个进程将其数据块(即 comm.gather(result,root=0))发送回 master 进程。这只是一个玩具示例,现在由您根据您的测试环境和代码进行改进。

您可以采用@dreamcrash 的回答中所示的低级 MPI 方式,也可以采用更 Pythonic 的解决方案,该解决方案使用与标准 Python multiprocessing 模块。

首先,您需要将您的代码变成更函数式的代码,注意您实际上是在执行 map 操作,该操作将 myFun 应用于 [=20= 的每个元素]:

def myFun(x):
    return x+2 # simple example, the real one would be complicated

dat = [
    np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
] # real data would be much larger

result = map(myFun, dat)

map 这里 运行s 在一个 Python 解释器进程中顺序。

到与multiprocessing模块并行映射的运行,只需要实例化一个Pool对象,然后调用它的map()方法代替Python map() 函数:

from multiprocessing import Pool

def myFun(x):
    return x+2 # simple example, the real one would be complicated

if __name__ == '__main__':
    dat = [
        np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
    ] # real data would be much larger

    with Pool() as pool:
        result = pool.map(myFun, dat)

此处,Pool() 创建了一个新的执行程序池,其中包含与 OS 所见的逻辑 CPU 一样多的解释器进程。通过将项目发送到池中的不同进程并等待完成,并行调用池的 map() 方法 运行s 映射。由于工作进程将 Python 脚本作为模块导入,因此将之前位于顶层的代码移动到 if __name__ == '__main__': 条件下很重要,这样它就不会 运行工人也是。

使用 multiprocessing.Pool() 非常方便,因为它只需要对原始代码稍作更改,模块会为您处理所有工作调度以及进出工作进程所需的数据移动。 multiprocessing 的问题在于它只能在单个主机上工作。幸运的是,mpi4py 通过 mpi4py.futures.MPIPoolExecutor class:

提供了类似的接口
from mpi4py.futures import MPIPoolExecutor

def myFun(x):
    return x+2 # simple example, the real one would be complicated

if __name__ == '__main__':
    dat = [
        np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2), np.random.rand(3,2)
    ] # real data would be much larger

    with MPIPoolExecutor() as pool:
        result = pool.map(myFun, dat)

与来自 multiprocessing 模块的 Pool 对象一样,MPI 池执行器为您处理所有工作调度和数据移动。

有两种方法可以运行 MPI 程序。第一个将脚本作为 MPI 单例启动,然后使用 MPI 过程控制工具生成一个包含所有池工作者的子 MPI 作业:

mpiexec -n 1 python program.py

您还需要指定 MPI 宇宙大小(主作业和所有子作业中的 MPI 等级总数)。执行此操作的具体方法因实施而异,因此您需要查阅实施手册。

第二个选项是直接启动所需数量的 MPI 等级并让它们以脚本名称作为参数执行 mpi4py.futures 模块本身:

mpiexec -n 24 python -m mpi4py.futures program.py

请记住,无论您以何种方式启动脚本,都会为控制器保留一个 MPI 等级,并且不会 运行ning 映射任务。您的目标是 运行ning 在 24 台主机上,因此您应该有足够的 CPU 核心并且可能有能力预留一个。或者,您可以指示 MPI 超额订阅第一台主机的等级。

multiprocessing.Poolmpi4py.futures.MPIPoolExecutor 需要注意的一点是 map() 方法保证输出数组中项目的顺序,但不保证顺序其中评估了不同的项目。在大多数情况下这应该不是问题。


忠告。如果您的数据实际上是从文件中读取的块,您可能会想做这样的事情:

if __name__ == '__main__':
   data = read_chunks()
   with MPIPoolExecutor() as p:
       result = p.map(myFun, data)

不要那样做。相反,如果可能的话,例如,如果由于存在共享(并且希望是并行的)文件系统而启用,则将读取委托给工作人员:

NUM_CHUNKS = 100

def myFun(chunk_num):
    # You may need to pass the value of NUM_CHUNKS to read_chunk()
    # for it to be able to seek to the right position in the file
    data = read_chunk(NUM_CHUNKS, chunk_num)
    return ...

if __name__ == '__main__':
    chunk_nums = range(NUM_CHUNKS)  # 100 chunks
    with MPIPoolExecutor() as p:
        result = p.map(myFun, chunk_nums)