更新 mpi4py 中的字典值

updating dictionary values in mpi4py

我们如何在不同处理器之间更新 MPI(特别是 mpi4py)中的一个全局字典。 我现在广播后遇到的问题是不同的处理器看不到其他处理器对字典的更改(更新)。

例如输入数据如下:

   col1  col2
   -----------
    a      1
    a      1
    b      2
    c      3
    c      1

输出字典应该如下:

  {'a': 2, 'b': 2, 'c': 4}

这意味着输入中的 col2 相加并为键 (col1) 创建值。字典最初是空的,并在所有处理器的并行处理过程中得到更新(至少这是我们正在尝试做的)。

How can we update one global dictionary in MPI (specifically mpi4py) across different processors. The issue that i am encountering now after broadcasting is that different processors cannot see the changes (update) on the dictionary by the other processors.

首先你要明白在MPI中,每个MPI进程运行的都是程序的完整副本。因此,分配给该程序的所有数据对每个进程都是私有的。

让我们看下面的例子:

from mpi4py import MPI
import numpy


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

if rank == 0:
    dictionary = {'a': 1, 'c': 3}
    for i in range(1, size, 1):
        data = comm.recv(source=i, tag=11)
        for key in data:
            if key in dictionary:
               dictionary[key] = dictionary[key] + data[key]
            else:
               dictionary[key] = data[key] 
    print(dictionary)
else:
    data = {'a': 1, 'b': 2, 'c': 1}
    comm.send(data, dest=0, tag=11)

在这段代码中,具有 rank=0 的进程分配了一个 dictionary,它对该进程是私有的,同样地,data = {'a': 1, 'b': 2, 'c': 1} 对其他进程也是私有的过程。如果(例如)一个进程更改了变量 size,则其他进程将看不到该更改。

在此代码中,所有进程都发送其字典副本:

    data = {'a': 1, 'b': 2, 'c': 1}
    comm.send(data, dest=0, tag=11)

到进程 0,它为每个其他进程调用 comm.recv

for i in range(1, size, 1):
    data = comm.recv(source=i, tag=11)

并将(从其他进程)接收到的数据合并到自己的字典中:

    for key in data:
        if key in dictionary:
           dictionary[key] = dictionary[key] + data[key]
        else:
           dictionary[key] = data[key] 

最后,只有进程0有完整的dictionary。当你做广播时,同样的事情发生在你身上。尽管如此,MPI 确实有例程( comm.Allgather),可以让您在所有进程中拥有整个 dictionary

这样的代码示例(你只需要适应字典):

from mpi4py import MPI
import numpy


comm = MPI.COMM_WORLD
size = comm.Get_size()
rank = comm.Get_rank()

sendBuffer = numpy.ones(1, dtype=bool)
recvBuffer = numpy.zeros(size, dtype=bool)

print("Before Allgather => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
comm.Allgather([sendBuffer,  MPI.BOOL],[recvBuffer, MPI.BOOL])
print("After Allgather  => Process %s | sendBuffer %s | recvBuffer %s" % (rank, sendBuffer, recvBuffer))
MacBook-Pro-de-Bruno:Python dreamcrash$ 

The dictionary initially is empty and is getting updated during the parallel processing by all the processors (at least this is what we’re trying to do).

使用上述模型(分布式内存范例),每次其中一个进程更改字典时,您都需要与所有进程显式通信。这意味着您必须事先知道代码中应该进行这些通信的点。

但是,根据您的文字,您似乎想要一种共享内存方法,其中一个进程将更新字典,例如如下:

    if key in dictionary:
       dictionary[key] = dictionary[key] + data[key]
    else:
       dictionary[key] = data[key] 

并且这些更改将立即对所有进程可见。就像在多线程代码中发生的一样。

MPI 3.0 引入了共享内存的概念,人们可以在其中实际实现这一点。

下面是一个使用数组的例子:

from mpi4py import MPI 
import numpy as np 

comm = MPI.COMM_WORLD 

size = 1000 
itemsize = MPI.DOUBLE.Get_size() 
if comm.Get_rank() == 0: 
   nbytes = size * itemsize 
else: 
   nbytes = 0 

win = MPI.Win.Allocate_shared(nbytes, itemsize, comm=comm) 

buf, itemsize = win.Shared_query(0) 
assert itemsize == MPI.DOUBLE.Get_size() 
buf = np.array(buf, dtype='B', copy=False) 
ary = np.ndarray(buffer=buf, dtype='d', shape=(size,)) 

if comm.rank == 1: 
  ary[:5] = np.arange(5) 
 
comm.Barrier() 
if comm.rank == 0: 
  print(ary[:10])

代码不是我的,它来自here