How can I use Remote Memory Access (RMA) functions of MPI to parallelize data aggregation?

I am working on a Monte Carlo code that generates data. More specifically, each generated data point falls into one of num_data_domains separate data domains. In the end, every domain should contain min_sample_size data points. This is what the non-parallel code looks like:

int num_data_domains = 10;
std::vector<unsigned long int> counters(num_data_domains, 0);
std::vector<std::vector<double>> data_sets(num_data_domains);

unsigned int min_sample_size = 100;
unsigned int smallest_sample_size = 0;
while(smallest_sample_size < min_sample_size)
{
    double data_point = Generate_Data_Point();
    int data_domain = Identify_Data_Domain(data_point); // returns a number between 0 and num_data_domains-1
    data_sets[data_domain].push_back(data_point);
    
    counters[data_domain]++;
    
    smallest_sample_size = *std::min_element(std::begin(counters), std::end(counters));

}

Based on the answers to my previous question, I would like to parallelize this procedure with MPI using RMA functions. However, I cannot get it to work.

Here is my parallelized version of the above code.

int num_data_domains = 10;
std::vector<unsigned long int> counters(num_data_domains, 0);
std::vector<std::vector<double>> data_sets(num_data_domains);

MPI_Win mpi_window;
MPI_Win_create(&counters, num_data_domains * sizeof(unsigned long int), sizeof(unsigned long int), MPI_INFO_NULL, MPI_COMM_WORLD, &mpi_window);
int mpi_target_rank         = 0;
unsigned long int increment = 1;

unsigned int min_sample_size = 100;
unsigned int smallest_sample_size = 0;
while(smallest_sample_size < min_sample_size)
{
    double data_point = Generate_Data_Point();
    int data_domain = Identify_Data_Domain(data_point); // returns a number between 0 and num_data_domains-1
    data_sets[data_domain].push_back(data_point);
    
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, mpi_window);
    MPI_Accumulate(&increment, 1, MPI_UNSIGNED_LONG, mpi_target_rank, data_domain, 1, MPI_UNSIGNED_LONG, MPI_SUM, mpi_window);
    MPI_Win_unlock(0, mpi_window);

    
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, mpi_window);
    MPI_Get(&counters, num_data_domains, MPI_UNSIGNED_LONG, mpi_target_rank, 0, num_data_domains, MPI_UNSIGNED_LONG, mpi_window);
    MPI_Win_unlock(0, mpi_window);

    smallest_sample_size = *std::min_element(std::begin(counters), std::end(counters));

}
MPI_Win_free(&mpi_window);

Here, the counters on MPI process 0 (the "master") are supposed to be updated via MPI_Accumulate(). The fifth argument, data_domain, is the displacement into the target buffer, so it should ensure that the correct domain counter gets incremented. Afterwards, each worker updates its own counters with the remote ones via MPI_Get().
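
Just to spell out what I expect these two epochs to do, here is a small serial stand-in (no MPI; remote_counters is a hypothetical name for the array that rank 0 exposes through the window, and the domain index is an arbitrary example value):

#include <algorithm>
#include <iostream>
#include <vector>

int main()
{
    int num_data_domains = 10;
    std::vector<unsigned long int> remote_counters(num_data_domains, 0); // what rank 0 exposes
    std::vector<unsigned long int> counters(num_data_domains, 0);        // a worker's local copy

    unsigned long int increment = 1;
    int data_domain = 3; // arbitrary example; normally returned by Identify_Data_Domain()

    // Intended effect of MPI_Accumulate(..., data_domain, ..., MPI_SUM, ...):
    // add `increment` to the counter at offset data_domain in the exposed array.
    remote_counters[data_domain] += increment;

    // Intended effect of MPI_Get(...): copy the whole exposed array into the local one.
    std::copy(remote_counters.begin(), remote_counters.end(), counters.begin());

    std::cout << *std::min_element(counters.begin(), counters.end()) << std::endl;
    return 0;
}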

However, with the code set up like this, I get a segmentation fault:

[MacBook-Pro:84733] *** Process received signal ***
[MacBook-Pro:84733] Signal: Segmentation fault: 11 (11)
[MacBook-Pro:84733] Signal code: Address not mapped (1)
[MacBook-Pro:84733] Failing at address: 0x9
[MacBook-Pro:84733] [ 0] 0   libsystem_platform.dylib            0x00007fff6fde65fd _sigtramp + 29
[MacBook-Pro:84733] [ 1] 0   ???                                 0x0000000000000000 0x0 + 0
[MacBook-Pro:84733] [ 2] 0   executable                          0x000000010e53c14b main + 1083
[MacBook-Pro:84733] [ 3] 0   libdyld.dylib                       0x00007fff6fbedcc9 start + 1
[MacBook-Pro:84733] [ 4] 0   ???                                 0x0000000000000002 0x0 + 2
[MacBook-Pro:84733] *** End of error message ***
--------------------------------------------------------------------------
Primary job  terminated normally, but 1 process returned
a non-zero exit code. Per user-direction, the job has been aborted.
--------------------------------------------------------------------------
--------------------------------------------------------------------------
mpirun noticed that process rank 0 with PID 0 on node MacBook-Pro exited on signal 11 (Segmentation fault: 11).
--------------------------------------------------------------------------

I am fairly certain that MPI_Accumulate() is causing this error. What am I doing wrong?

It seems the problem is that I used a std::vector<unsigned long int> instead of a plain array, but I do not know why.

In any case, if I change the above code to

int num_data_domains = 10;
// std::vector<unsigned long int> counters(num_data_domains, 0);
unsigned long int counters[num_data_domains];
for(int i = 0; i < num_data_domains; i++)
   counters[i] = 0;
std::vector<std::vector<double>> data_sets(num_data_domains);

MPI_Win mpi_window;
MPI_Win_create(&counters, num_data_domains * sizeof(unsigned long int), sizeof(unsigned long int), MPI_INFO_NULL, MPI_COMM_WORLD, &mpi_window);
int mpi_target_rank         = 0;
unsigned long int increment = 1;

unsigned int min_sample_size = 100;
unsigned int smallest_sample_size = 0;
while(smallest_sample_size < min_sample_size)
{
    double data_point = Generate_Data_Point();
    int data_domain = Identify_Data_Domain(data_point); // returns a number between 0 and num_data_domains-1
    data_sets[data_domain].push_back(data_point);
    
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, mpi_window);
    MPI_Accumulate(&increment, 1, MPI_UNSIGNED_LONG, mpi_target_rank, data_domain, 1, MPI_UNSIGNED_LONG, MPI_SUM, mpi_window);
    MPI_Win_unlock(0, mpi_window);

    
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, mpi_window);
    MPI_Get(&counters, num_data_domains, MPI_UNSIGNED_LONG, mpi_target_rank, 0, num_data_domains, MPI_UNSIGNED_LONG, mpi_window);
    MPI_Win_unlock(0, mpi_window);

//    smallest_sample_size = *std::min_element(std::begin(counters), std::end(counters));
    smallest_sample_size = counters[0];
    for(unsigned int i = 1; i < num_data_domains; i++)
        if(counters[i] < smallest_sample_size)
            smallest_sample_size = counters[i];

}
MPI_Win_free(&mpi_window);

the segmentation fault disappears.

If anyone knows why this is, I would be curious to hear it. Thanks in advance!

The problem is that the address of the std::vector object is not the same as the address of the data you want to modify. You can see this with the program below.

When you create the window with &counters, you are exposing memory that starts at the address of the vector object itself, whereas what you actually want to expose is the memory starting at the address of counters[0], which is equivalent to counters.data().

#include <vector>
#include <iostream>

int main(void)
{
    int num_data_domains = 10;
    std::vector<unsigned long int> counters(num_data_domains, 0);

    std::cout << "address of counters: " << &counters << std::endl;
    std::cout << "address of counters[0]: " << &(counters[0]) << std::endl;
    std::cout << "counters.data(): " << counters.data() << std::endl;

    return 0;
}
$ clang++ stl.cc
$ ./a.out 
address of counters: 0x7ffee8604e70
address of counters[0]: 0x7ffcd1c02b30
counters.data(): 0x7ffcd1c02b30

If you change the first argument of MPI_Win_create() in your MPI program to one of the following, it should work as expected.

MPI_Win_create(&(counters[0]), num_data_domains * sizeof(unsigned long int),
               sizeof(unsigned long int), MPI_INFO_NULL, MPI_COMM_WORLD, &mpi_window);
MPI_Win_create(counters.data(), num_data_domains * sizeof(unsigned long int),
               sizeof(unsigned long int), MPI_INFO_NULL, MPI_COMM_WORLD, &mpi_window);

The .data() method was only introduced in C++11, so you may prefer the former if you cannot rely on C++11.
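
Putting the fix in context, a minimal self-contained sketch could look like the following (this assumes C++11, replaces the data-generation step with a trivial stand-in, and reads the remote counters into a separate local buffer rather than into the window memory itself):

#include <mpi.h>
#include <algorithm>
#include <vector>
#include <cstdio>

int main(int argc, char** argv)
{
    MPI_Init(&argc, &argv);
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    const int num_data_domains = 10;
    std::vector<unsigned long int> counters(num_data_domains, 0);

    // Expose the vector's element storage, not the vector object itself.
    MPI_Win mpi_window;
    MPI_Win_create(counters.data(), num_data_domains * sizeof(unsigned long int),
                   sizeof(unsigned long int), MPI_INFO_NULL, MPI_COMM_WORLD, &mpi_window);

    const int mpi_target_rank   = 0;
    unsigned long int increment = 1;
    int data_domain = rank % num_data_domains; // stand-in for Identify_Data_Domain()

    // Atomically add `increment` to rank 0's counter at offset `data_domain`.
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, mpi_target_rank, 0, mpi_window);
    MPI_Accumulate(&increment, 1, MPI_UNSIGNED_LONG, mpi_target_rank, data_domain,
                   1, MPI_UNSIGNED_LONG, MPI_SUM, mpi_window);
    MPI_Win_unlock(mpi_target_rank, mpi_window);

    MPI_Barrier(MPI_COMM_WORLD); // make sure all increments have landed before reading

    // Read rank 0's counters into a separate local buffer.
    std::vector<unsigned long int> remote_snapshot(num_data_domains, 0);
    MPI_Win_lock(MPI_LOCK_SHARED, mpi_target_rank, 0, mpi_window);
    MPI_Get(remote_snapshot.data(), num_data_domains, MPI_UNSIGNED_LONG, mpi_target_rank,
            0, num_data_domains, MPI_UNSIGNED_LONG, mpi_window);
    MPI_Win_unlock(mpi_target_rank, mpi_window);

    unsigned long int smallest = *std::min_element(remote_snapshot.begin(), remote_snapshot.end());
    std::printf("rank %d sees smallest counter %lu\n", rank, smallest);

    MPI_Win_free(&mpi_window);
    MPI_Finalize();
    return 0;
}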

The reason your program works when you use a plain array is that in that case counters decays to a pointer to its first element, so it (and likewise &counters) is exactly the same address as &(counters[0]).
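
To illustrate that last point, here is the same kind of quick check as the vector program above, but with a plain array (the printed addresses will of course vary between runs):

#include <iostream>

int main(void)
{
    unsigned long int counters[10] = {0};

    // For a plain array, the array name decays to a pointer to its first
    // element, so all three expressions print the same address.
    std::cout << "counters:          " << counters << std::endl;
    std::cout << "&(counters[0]):    " << &(counters[0]) << std::endl;
    std::cout << "(void*)&counters:  " << static_cast<void*>(&counters) << std::endl;

    return 0;
}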