How can I use the Remote Memory Access (RMA) functions of MPI to parallelize data aggregation?
I am working on a Monte Carlo code that generates data. More specifically, each generated data point falls into one of num_data_domains separate data domains. In the end, each domain should contain min_sample_size data points. Here is what the non-parallel code looks like:
int num_data_domains = 10;
std::vector<unsigned long int> counters(num_data_domains, 0);
std::vector<std::vector<double>> data_sets(num_data_domains);
unsigned int min_sample_size = 100;
unsigned int smallest_sample_size = 0;
while(smallest_sample_size < min_sample_size)
{
double data_point = Generate_Data_Point();
int data_domain = Identify_Data_Domain(data_point); // returns a number between 0 and num_data_domains-1
data_sets[data_domain].push_back(data_point);
counters[data_domain]++;
smallest_sample_size = *std::min_element(std::begin(counters), std::end(counters));
}
Following the answers to my previous question, I want to parallelize this procedure with MPI using the RMA functions. However, I cannot get it to work.
Here is my parallelized version of the code above.
int num_data_domains = 10;
std::vector<unsigned long int> counters(num_data_domains, 0);
std::vector<std::vector<double>> data_sets(num_data_domains);
MPI_Win mpi_window;
MPI_Win_create(&counters, num_data_domains * sizeof(unsigned long int), sizeof(unsigned long int), MPI_INFO_NULL, MPI_COMM_WORLD, &mpi_window);
int mpi_target_rank = 0;
unsigned long int increment = 1;
unsigned int min_sample_size = 100;
unsigned int smallest_sample_size = 0;
while(smallest_sample_size < min_sample_size)
{
double data_point = Generate_Data_Point();
int data_domain = Identify_Data_Domain(data_point); // returns a number between 0 and num_data_domains-1
data_sets[data_domain].push_back(data_point);
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, mpi_window);
MPI_Accumulate(&increment, 1, MPI_UNSIGNED_LONG, mpi_target_rank, data_domain, 1, MPI_UNSIGNED_LONG, MPI_SUM, mpi_window);
MPI_Win_unlock(0, mpi_window);
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, mpi_window);
MPI_Get( &counters , num_data_domains , MPI_UNSIGNED_LONG , mpi_target_rank , 0 , num_data_domains , MPI_UNSIGNED_LONG , mpi_window);
MPI_Win_unlock(0, mpi_window);
smallest_sample_size = *std::min_element(std::begin(counters), std::end(counters));
}
MPI_Win_free(&mpi_window);
Here, the counters on MPI process 0 (the "master") are supposed to be updated via MPI_Accumulate(). The fifth argument, data_domain, is the displacement in the target buffer, i.e. it should ensure that the correct domain counter gets incremented. Afterwards, each worker updates its own counters from the remote counters.
However, if I set up the code like this, I get a segmentation fault:
[MacBook-Pro:84733] *** Process received signal ***
[MacBook-Pro:84733] Signal: Segmentation fault: 11 (11)
[MacBook-Pro:84733] Signal code: Address not mapped (1)
[MacBook-Pro:84733] Failing at address: 0x9
[MacBook-Pro:84733] [ 0] 0 libsystem_platform.dylib 0x00007fff6fde65fd _sigtramp + 29
[MacBook-Pro:84733] [ 1] 0 ??? 0x0000000000000000 0x0 + 0
[MacBook-Pro:84733] [ 2] 0 executable 0x000000010e53c14b main + 1083
[MacBook-Pro:84733] [ 3] 0 libdyld.dylib 0x00007fff6fbedcc9 start + 1
[MacBook-Pro:84733] [ 4] 0 ??? 0x0000000000000002 0x0 + 2
[MacBook-Pro:84733] *** End of error message ***
--------------------------------------------------------------------------
Primary job terminated normally, but 1 process returned
a non-zero exit code. Per user-direction, the job has been aborted.
--------------------------------------------------------------------------
--------------------------------------------------------------------------
mpirun noticed that process rank 0 with PID 0 on node MacBook-Pro exited on signal 11 (Segmentation fault: 11).
--------------------------------------------------------------------------
I am fairly certain that MPI_Accumulate() causes this error. What am I doing wrong?
It appears that the problem is my use of a std::vector<unsigned long int> instead of a plain array, but I do not know why.
In any case, if I change the code above to
int num_data_domains = 10;
// std::vector<unsigned long int> counters(num_data_domains, 0);
unsigned long int counters[num_data_domains];
for(int i = 0; i < num_data_domains; i++)
counters[i] = 0;
std::vector<std::vector<double>> data_sets(num_data_domains);
MPI_Win mpi_window;
MPI_Win_create(&counters, num_data_domains * sizeof(unsigned long int), sizeof(unsigned long int), MPI_INFO_NULL, MPI_COMM_WORLD, &mpi_window);
int mpi_target_rank = 0;
unsigned long int increment = 1;
unsigned int min_sample_size = 100;
unsigned int smallest_sample_size = 0;
while(smallest_sample_size < min_sample_size)
{
double data_point = Generate_Data_Point();
int data_domain = Identify_Data_Domain(data_point); // returns a number between 0 and num_data_domains-1
data_sets[data_domain].push_back(data_point);
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, mpi_window);
MPI_Accumulate(&increment, 1, MPI_UNSIGNED_LONG, mpi_target_rank, data_domain, 1, MPI_UNSIGNED_LONG, MPI_SUM, mpi_window);
MPI_Win_unlock(0, mpi_window);
MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, mpi_window);
MPI_Get( &counters , num_data_domains , MPI_UNSIGNED_LONG , mpi_target_rank , 0 , num_data_domains , MPI_UNSIGNED_LONG , mpi_window);
MPI_Win_unlock(0, mpi_window);
// smallest_sample_size = *std::min_element(std::begin(counters), std::end(counters));
smallest_sample_size = counters[0];
for(unsigned int i = 1; i < num_data_domains; i++)
if(counters[i] < smallest_sample_size)
smallest_sample_size = counters[i];
}
MPI_Win_free(&mpi_window);
the segmentation fault disappears.
I would be curious if anybody knows why that is. Thanks in advance!
The problem is that the address of the std::vector object is not the address of the data you want to modify. You can see this with the program below.
When you create a window using &counters, you expose the memory starting at the address of the vector object itself, whereas what you want to expose is the memory starting at the address of counters[0], which is equivalent to counters.data().
#include <vector>
#include <iostream>
int main(void)
{
int num_data_domains = 10;
std::vector<unsigned long int> counters(num_data_domains, 0);
std::cout << "address of counters: " << &counters << std::endl;
std::cout << "address of counters[0]: " << &(counters[0]) << std::endl;
std::cout << "counters.data(): " << counters.data() << std::endl;
return 0;
}
$ clang++ stl.cc
$ ./a.out
address of counters: 0x7ffee8604e70
address of counters[0]: 0x7ffcd1c02b30
counters.data(): 0x7ffcd1c02b30
If you change the first argument in your MPI program to one of the following, it should work as expected.
MPI_Win_create(&(counters[0]), num_data_domains * sizeof(unsigned long int),
sizeof(unsigned long int), MPI_INFO_NULL, MPI_COMM_WORLD, &mpi_window);
MPI_Win_create(counters.data(), num_data_domains * sizeof(unsigned long int),
sizeof(unsigned long int), MPI_INFO_NULL, MPI_COMM_WORLD, &mpi_window);
The .data() method was only introduced in C++11, so you may prefer the former if you cannot rely on C++11.
The reason your program works with a plain array is that in that case counters is exactly the same as &(counters[0]).