MPI_Get not working properly in Parent/Children context
Recently in class, we have been learning about a new way to use MPI, the Parent/Children approach. We were tasked with implementing a really simple matrix/vector multiplication in C/C++ and benchmarking it on a cluster. We are using OpenMPI 4.0.3.
I tried to implement a "pooling" system (each child picks a certain amount of work, does it, puts the result back on the master, then checks whether there is more work to do). To do so, I simply created an infinite loop, and the first thing a child does is fetch the current offset. While the offset is lower than the total number of vectors to process, it updates the offset on the parent, fetches the vectors, processes them, ...
To fetch the offset, I created a dedicated MPI_Win that the children can use to fetch/update the value. The problem is that the MPI_Get call does not seem to update the value of the offset on the children.
Here is a simplified version of the code I wrote (mine contains a lot of logging, writes the results to a file, ...).
parent.cpp:
int main(int argc, char **argv) {
    // Init MPI
    int pid = -1, nprocs = -1;
    MPI_Init(&argc, &argv);
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    assert(nprocs == 1);
    MPI_Comm_rank(MPI_COMM_WORLD, &pid);
    assert(pid == 0);
    // Read CLI arguments
    const unsigned int n = atoi(argv[1]);
    const unsigned int m = atoi(argv[2]);
    const unsigned int root = atoi(argv[4]);
    assert(root < nprocs);
    const unsigned int nslave = atoi(argv[5]);
    const std::string name = argv[6];
    const std::string slave_name = argv[7];
    // Define size constants
    const size_t nn = n * n;
    const size_t mn = m * n;
    // Spawning slaves & merging Comm
    int intrapid = -1;
    MPI_Comm intercom = nullptr, intracom = nullptr;
    MPI_Comm_spawn(slave_name.c_str(), argv, nslave,
                   MPI_INFO_NULL, root, MPI_COMM_WORLD,
                   &intercom, MPI_ERRCODES_IGNORE);
    MPI_Intercomm_merge(intercom, 0, &intracom);
    MPI_Comm_rank(intracom, &intrapid);
    // Initialize & broadcast matrix
    int *matrix = new int[nn];
    srand(time(nullptr));
    for (size_t i = 0; i < nn; i++) matrix[i] = rand() % MATRIX_MAX;
    MPI_Bcast(matrix, nn, MPI_INT, root, intracom);
    // Initialize result and offset
    int offset = 0;
    int *results = new int[mn];
    // Initialize and generate vectors
    int *vectors = new int[mn];
    for (size_t i = 0; i < m; i++) generate_vector(n, vectors + (i * n), rand() % (n / 2));
    // Allocate windows
    MPI_Win vectors_win = nullptr, results_win = nullptr, offset_win = nullptr;
    MPI_Win_create(vectors, mn, sizeof(int), MPI_INFO_NULL, intracom, &vectors_win);
    MPI_Win_create(results, mn, sizeof(int), MPI_INFO_NULL, intracom, &results_win);
    MPI_Win_create(&offset, 1, sizeof(int), MPI_INFO_NULL, intracom, &offset_win);
    // Fence to wait for windows initialization
    MPI_Win_fence(MPI_MODE_NOPRECEDE, vectors_win);
    // Start chrono while slaves fetch & compute
    Time debut = NOW;
    // Fence to wait for all vectors to be computed
    MPI_Win_fence(MPI_MODE_NOSUCCEED, results_win);
    // Write results to file, free memory, finalize
    // ...
    return EXIT_SUCCESS;
}
child.cpp:
int main(int argc, char **argv) {
    MPI_Init(&argc, &argv);
    int pid = -1, intraprid = -1, nprocs = -1;
    MPI_Comm intercom = nullptr, intracom = nullptr;
    MPI_Comm_size(MPI_COMM_WORLD, &nprocs);
    assert(nprocs >= 1);
    MPI_Comm_rank(MPI_COMM_WORLD, &pid);
    assert(pid >= 0 && pid < nprocs);
    // Get communicator for intra-process communication through merge
    MPI_Comm_get_parent(&intercom);
    MPI_Intercomm_merge(intercom, 1, &intracom);
    MPI_Comm_rank(intracom, &intraprid);
    assert(intraprid >= 0);
    // Read CLI arguments
    const unsigned int n = atoi(argv[2]);
    const unsigned int m = atoi(argv[3]);
    const unsigned int batch_sz = atoi(argv[4]);
    const unsigned int root = atoi(argv[5]);
    assert(root < nprocs);
    // Define size constant
    const size_t nn = n * n;
    // Allocate matrix memory & fetch from master
    int *matrix = new int[nn];
    MPI_Bcast(matrix, nn, MPI_INT, root, intracom);
    // Allocate batch memory
    int *batch = new int[batch_sz * n];
    // Initialize dull windows (to match master initialization)
    MPI_Win vectors_win = nullptr, results_win = nullptr, offset_win = nullptr;
    MPI_Win_create(nullptr, 0, 1, MPI_INFO_NULL, intracom, &vectors_win);
    MPI_Win_create(nullptr, 0, 1, MPI_INFO_NULL, intracom, &results_win);
    MPI_Win_create(nullptr, 0, 1, MPI_INFO_NULL, intracom, &offset_win);
    // Fence to wait for windows initialization
    MPI_Win_fence(MPI_MODE_NOPRECEDE, vectors_win);
    int offset = -1, new_offset = -1;
    // Infinite loop (break on first condition when no more vectors to process)
    while (true) {
        // Get offset from master
        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, root, 0, offset_win);
        MPI_Get(&offset, 1, MPI_INT, root, 0, 1, MPI_INT, offset_win);
        // If offset is -1, something went wrong with the previous MPI_Get, but MPI_SUCCESS was returned
        assert(offset >= 0);
        // Break if no more vectors to process
        if (new_offset >= m - 1 || offset >= m - 1) {
            MPI_Win_unlock(root, offset_win);
            break;
        }
        // Get quantity of vectors to process (if not enough, get all remaining)
        const size_t sz = (offset + batch_sz > m) ? m - offset : batch_sz;
        // if sz > batch_sz, the received buffer will be overflown
        assert(sz <= batch_sz);
        // Compute the new vector offset for the other slaves
        new_offset = offset + sz;
        // Update the offset on master
        MPI_Put(&new_offset, 1, MPI_INT, root, 0, 1, MPI_INT, offset_win);
        MPI_Win_unlock(root, offset_win);
        // Fetch the batch of vectors to process
        MPI_Win_lock(MPI_LOCK_SHARED, root, 0, vectors_win);
        MPI_Get(batch, sz * n, MPI_INT, root, offset * n, sz * n, MPI_INT, vectors_win);
        MPI_Win_unlock(root, vectors_win);
        // Process the batch
        for (size_t i = 0; i < sz; ++i) {
            // ... matrix multiplication
        }
        // Put the result in the results window of the master
        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, root, 0, results_win);
        MPI_Put(&batch, sz * n, MPI_INT, root, offset, sz * n, MPI_INT, results_win);
        MPI_Win_unlock(root, results_win);
    }
    // Fence to wait for all vectors to be computed
    MPI_Win_fence(MPI_MODE_NOSUCCEED, results_win);
    // Free memory, finalize
    // ...
    return EXIT_SUCCESS;
}
The problem is that the assertion assert(offset >= 0) at the beginning of the child's while loop is triggered (and the logs show that offset is still -1, or whatever it was initialized to). Given that the offset starts at 0 on the parent, this means the variable was never updated, yet the call to MPI_Get returned MPI_SUCCESS. I thought there might be a concurrency issue, but the lock seems to work correctly, since each child waits for the previous one before entering the lock, and the crash still happens.
I have tried to fix this, but for lack of clear documentation I have not succeeded. Either I made a silly typo I haven't spotted, or there is something specific to this approach that I am not aware of.
If anyone has an idea of what I did wrong, I will gladly take it. Please excuse any mistakes in my English, I am quite tired.
Edit: As requested, I changed the names to "Parent/Children" instead of the old terminology.
Your big problem is that you use the variable retrieved via MPI_Get right away. You cannot do that: the variable only holds its value after you release the lock or make a synchronization call. Since you release the lock only conditionally, I would insert an MPI_Win_flush_local right after the MPI_Get call to ensure consistency between the target and the result at the origin.
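A minimal sketch of the affected part of the child loop with that change applied (identifiers taken from the child code above; the rest of the loop is unchanged):

    // Get offset from master
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, root, 0, offset_win);
    MPI_Get(&offset, 1, MPI_INT, root, 0, 1, MPI_INT, offset_win);
    // Complete the outstanding MPI_Get locally before reading `offset`
    MPI_Win_flush_local(root, offset_win);
    // Only now does `offset` hold the value fetched from the master
    assert(offset >= 0);

MPI_Win_flush_local only completes, at the origin, the RMA operations this process has issued to that target; the exclusive lock is still released later with MPI_Win_unlock exactly as in the original code.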
Edit: Another thing is that you are mixing active-target (fence) and passive-target (lock) synchronization. In your code the fences do nothing, so remove them. A fence would be appropriate if all children performed the same number of get or put operations: the fence closing the epoch would then ensure consistency of the data between origin and target.
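For reference, a minimal sketch of a pure active-target (fence) epoch, assuming every process attached to the window participates and issues its RMA calls between the two fences (an illustration of the pattern, not a drop-in replacement for the lock-based loop above):

    // Opening fence: starts an access/exposure epoch on every process in the window's group
    MPI_Win_fence(0, vectors_win);
    // Each child issues its RMA calls inside the epoch
    MPI_Get(batch, sz * n, MPI_INT, root, offset * n, sz * n, MPI_INT, vectors_win);
    // Closing fence: all RMA operations issued during the epoch are complete after this call
    MPI_Win_fence(0, vectors_win);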