"notify" 处理器不阻塞的正确方法是什么?
What is the right way to "notify" processors without blocking?
假设我有很多东西,我必须对所有这些东西做一些操作。
万一一个元素的操作失败,我想停止整个阵列的工作[这项工作分布在多个处理器上]。
我想实现这一点,同时将 sent/received 消息的数量保持在最低限度。
另外,如果没有必要,我不想阻止处理器。
如何使用 MPI 来实现?
这个问题我问了自己好几次都没有找到任何完全令人满意的答案...我唯一想到的(除了 MPI_Abort()
这样做但有点极端)是创建一个 MPI_Win
存储一个标志,该标志将由面临问题的任何进程引发,并由所有进程定期检查以查看它们是否可以继续处理。这是使用非阻塞调用完成的,与 this answer.
中描述的方式相同
它的主要弱点是:
- 这取决于自愿检查标志状态的进程。通知他们不会立即中断他们的工作。
- 必须手动调整此检查的频率。您必须在浪费处理数据的时间和检查状态所需的时间之间找到权衡...
最后,我们需要的是一种定义由 MPI 调用触发的回调操作的方法,例如 MPI_Abort()
(基本上用其他东西替换中止操作)。我认为这不存在,但也许我忽略了它。
一种以非阻塞方式推导全局停止条件的可能策略是依赖 MPI_Test
。
场景
考虑每个进程使用给定标签将类型为 MPI_INT 的异步接收发布到其左侧等级以构建环。然后开始你的计算。如果等级遇到停止条件,它会将自己的等级发送到正确的等级。同时每个等级使用 MPI_Test
在计算过程中检查 MPI_Irecv
是否完成,如果完成则进入一个分支,首先等待消息,然后向右传递接收到的等级,除非如果正确的等级等于消息的有效负载(不循环)。
完成后,您应该在分支中拥有所有进程,准备好触发任意恢复操作。
复杂性
保留的拓扑结构是一个环,因为它最大限度地减少了消息的数量 (n-1),但是它增加了传播时间。其他拓扑可以保留更多消息但空间复杂度较低,例如具有 n.ln(n) 复杂度的树。
实施
像这样。
int rank, size;
MPI_Init(&argc,&argv);
MPI_Comm_rank( MPI_COMM_WORLD, &rank);
MPI_Comm_size( MPI_COMM_WORLD, &size);
int left_rank = (rank==0)?(size-1):(rank-1);
int right_rank = (rank==(size-1))?0:(rank+1)%size;
int stop_cond_rank;
MPI_Request stop_cond_request;
int stop_cond= 0;
while( 1 )
{
MPI_Irecv( &stop_cond_rank, 1, MPI_INT, left_rank, 123, MPI_COMM_WORLD, &stop_cond_request);
/* Compute Here and set stop condition accordingly */
if( stop_cond )
{
/* Cancel the left recv */
MPI_Cancel( &stop_cond_request );
if( rank != right_rank )
MPI_Send( &rank, 1, MPI_INT, right_rank, 123, MPI_COMM_WORLD );
break;
}
int did_recv = 0;
MPI_Test( &stop_cond_request, &did_recv, MPI_STATUS_IGNORE );
if( did_recv )
{
stop_cond = 1;
MPI_Wait( &stop_cond_request, MPI_STATUS_IGNORE );
if( right_rank != stop_cond_rank )
MPI_Send( &stop_cond_rank, 1, MPI_INT, right_rank, 123, MPI_COMM_WORLD );
break;
}
}
if( stop_cond )
{
/* Handle the stop condition */
}
else
{
/* Cleanup */
MPI_Cancel( &stop_cond_request );
}
这似乎是一个没有简单答案的常见问题。其他两个答案都有可扩展性问题。环形通信方法具有线性通信成本,而在单方面 MPI_Win
解决方案中,单个进程将受到所有工作人员的内存请求的冲击。这对于较低的排名可能没问题,但在增加排名时会出现问题。
非阻塞集合可以提供更具可扩展性的更好解决方案。主要思想是 post a MPI_Ibarrier
在所有等级上,除了一个指定的根。此根将通过 MPI_Irecv
侦听点对点停止消息,并在收到消息后完成 MPI_Ibarrier
。
棘手的部分是需要处理四种不同的情况“{root, non-root} x {found, not-found}”。也可能发生多个等级发送停止消息,需要在根上接收未知数量的匹配。这可以通过计算发送停止请求的等级数量的额外减少来解决。
这是一个例子,它在 C:
中的样子
#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
const int iter_max = 10000;
const int difficulty = 20000;
int find_stuff()
{
int num_iters = rand() % iter_max;
for (int i = 0; i < num_iters; i++) {
if (rand() % difficulty == 0) {
return 1;
}
}
return 0;
}
const int stop_tag = 42;
const int root = 0;
int forward_stop(MPI_Request* root_recv_stop, MPI_Request* all_recv_stop, int found_count)
{
int flag;
MPI_Status status;
if (found_count == 0) {
MPI_Test(root_recv_stop, &flag, &status);
} else {
// If we find something on the root, we actually wait until we receive our own message.
MPI_Wait(root_recv_stop, &status);
flag = 1;
}
if (flag) {
printf("Forwarding stop signal from %d\n", status.MPI_SOURCE);
MPI_Ibarrier(MPI_COMM_WORLD, all_recv_stop);
MPI_Wait(all_recv_stop, MPI_STATUS_IGNORE);
// We must post some additional receives if multiple ranks found something at the same time
MPI_Reduce(MPI_IN_PLACE, &found_count, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
for (found_count--; found_count > 0; found_count--) {
MPI_Recv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &status);
printf("Additional stop from: %d\n", status.MPI_SOURCE);
}
return 1;
}
return 0;
}
int main()
{
MPI_Init(NULL, NULL);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
srand(rank);
MPI_Request root_recv_stop;
MPI_Request all_recv_stop;
if (rank == root) {
MPI_Irecv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &root_recv_stop);
} else {
// You may want to use an extra communicator here, to avoid messing with other barriers
MPI_Ibarrier(MPI_COMM_WORLD, &all_recv_stop);
}
while (1) {
int found = find_stuff();
if (found) {
printf("Rank %d found something.\n", rank);
// Note: We cannot post this as blocking, otherwise there is a deadlock with the reduce
MPI_Request req;
MPI_Isend(NULL, 0, MPI_CHAR, root, stop_tag, MPI_COMM_WORLD, &req);
if (rank != root) {
// We know that we are going to receive our own stop signal.
// This avoids running another useless iteration
MPI_Wait(&all_recv_stop, MPI_STATUS_IGNORE);
MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
MPI_Wait(&req, MPI_STATUS_IGNORE);
break;
}
MPI_Wait(&req, MPI_STATUS_IGNORE);
}
if (rank == root) {
if (forward_stop(&root_recv_stop, &all_recv_stop, found)) {
break;
}
} else {
int stop_signal;
MPI_Test(&all_recv_stop, &stop_signal, MPI_STATUS_IGNORE);
if (stop_signal)
{
MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
printf("Rank %d stopping after receiving signal.\n", rank);
break;
}
}
};
MPI_Finalize();
}
虽然这不是最简单的代码,但它应该:
- 不引入额外的阻塞
- 通过实施障碍进行扩展(通常
O(log N)
)
- 从发现一个到全部停止的最坏情况延迟为 2 * 循环时间 ( + 1 p2p + 1障碍 + 1 减少)。
- 如果 many/all 排名同时找到解决方案,它仍然有效,但效率可能较低。
假设我有很多东西,我必须对所有这些东西做一些操作。 万一一个元素的操作失败,我想停止整个阵列的工作[这项工作分布在多个处理器上]。
我想实现这一点,同时将 sent/received 消息的数量保持在最低限度。 另外,如果没有必要,我不想阻止处理器。
如何使用 MPI 来实现?
这个问题我问了自己好几次都没有找到任何完全令人满意的答案...我唯一想到的(除了 MPI_Abort()
这样做但有点极端)是创建一个 MPI_Win
存储一个标志,该标志将由面临问题的任何进程引发,并由所有进程定期检查以查看它们是否可以继续处理。这是使用非阻塞调用完成的,与 this answer.
它的主要弱点是:
- 这取决于自愿检查标志状态的进程。通知他们不会立即中断他们的工作。
- 必须手动调整此检查的频率。您必须在浪费处理数据的时间和检查状态所需的时间之间找到权衡...
最后,我们需要的是一种定义由 MPI 调用触发的回调操作的方法,例如 MPI_Abort()
(基本上用其他东西替换中止操作)。我认为这不存在,但也许我忽略了它。
一种以非阻塞方式推导全局停止条件的可能策略是依赖 MPI_Test
。
场景
考虑每个进程使用给定标签将类型为 MPI_INT 的异步接收发布到其左侧等级以构建环。然后开始你的计算。如果等级遇到停止条件,它会将自己的等级发送到正确的等级。同时每个等级使用 MPI_Test
在计算过程中检查 MPI_Irecv
是否完成,如果完成则进入一个分支,首先等待消息,然后向右传递接收到的等级,除非如果正确的等级等于消息的有效负载(不循环)。
完成后,您应该在分支中拥有所有进程,准备好触发任意恢复操作。
复杂性
保留的拓扑结构是一个环,因为它最大限度地减少了消息的数量 (n-1),但是它增加了传播时间。其他拓扑可以保留更多消息但空间复杂度较低,例如具有 n.ln(n) 复杂度的树。
实施
像这样。
int rank, size;
MPI_Init(&argc,&argv);
MPI_Comm_rank( MPI_COMM_WORLD, &rank);
MPI_Comm_size( MPI_COMM_WORLD, &size);
int left_rank = (rank==0)?(size-1):(rank-1);
int right_rank = (rank==(size-1))?0:(rank+1)%size;
int stop_cond_rank;
MPI_Request stop_cond_request;
int stop_cond= 0;
while( 1 )
{
MPI_Irecv( &stop_cond_rank, 1, MPI_INT, left_rank, 123, MPI_COMM_WORLD, &stop_cond_request);
/* Compute Here and set stop condition accordingly */
if( stop_cond )
{
/* Cancel the left recv */
MPI_Cancel( &stop_cond_request );
if( rank != right_rank )
MPI_Send( &rank, 1, MPI_INT, right_rank, 123, MPI_COMM_WORLD );
break;
}
int did_recv = 0;
MPI_Test( &stop_cond_request, &did_recv, MPI_STATUS_IGNORE );
if( did_recv )
{
stop_cond = 1;
MPI_Wait( &stop_cond_request, MPI_STATUS_IGNORE );
if( right_rank != stop_cond_rank )
MPI_Send( &stop_cond_rank, 1, MPI_INT, right_rank, 123, MPI_COMM_WORLD );
break;
}
}
if( stop_cond )
{
/* Handle the stop condition */
}
else
{
/* Cleanup */
MPI_Cancel( &stop_cond_request );
}
这似乎是一个没有简单答案的常见问题。其他两个答案都有可扩展性问题。环形通信方法具有线性通信成本,而在单方面 MPI_Win
解决方案中,单个进程将受到所有工作人员的内存请求的冲击。这对于较低的排名可能没问题,但在增加排名时会出现问题。
非阻塞集合可以提供更具可扩展性的更好解决方案。主要思想是 post a MPI_Ibarrier
在所有等级上,除了一个指定的根。此根将通过 MPI_Irecv
侦听点对点停止消息,并在收到消息后完成 MPI_Ibarrier
。
棘手的部分是需要处理四种不同的情况“{root, non-root} x {found, not-found}”。也可能发生多个等级发送停止消息,需要在根上接收未知数量的匹配。这可以通过计算发送停止请求的等级数量的额外减少来解决。
这是一个例子,它在 C:
中的样子#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>
const int iter_max = 10000;
const int difficulty = 20000;
int find_stuff()
{
int num_iters = rand() % iter_max;
for (int i = 0; i < num_iters; i++) {
if (rand() % difficulty == 0) {
return 1;
}
}
return 0;
}
const int stop_tag = 42;
const int root = 0;
int forward_stop(MPI_Request* root_recv_stop, MPI_Request* all_recv_stop, int found_count)
{
int flag;
MPI_Status status;
if (found_count == 0) {
MPI_Test(root_recv_stop, &flag, &status);
} else {
// If we find something on the root, we actually wait until we receive our own message.
MPI_Wait(root_recv_stop, &status);
flag = 1;
}
if (flag) {
printf("Forwarding stop signal from %d\n", status.MPI_SOURCE);
MPI_Ibarrier(MPI_COMM_WORLD, all_recv_stop);
MPI_Wait(all_recv_stop, MPI_STATUS_IGNORE);
// We must post some additional receives if multiple ranks found something at the same time
MPI_Reduce(MPI_IN_PLACE, &found_count, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
for (found_count--; found_count > 0; found_count--) {
MPI_Recv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &status);
printf("Additional stop from: %d\n", status.MPI_SOURCE);
}
return 1;
}
return 0;
}
int main()
{
MPI_Init(NULL, NULL);
int rank;
MPI_Comm_rank(MPI_COMM_WORLD, &rank);
srand(rank);
MPI_Request root_recv_stop;
MPI_Request all_recv_stop;
if (rank == root) {
MPI_Irecv(NULL, 0, MPI_CHAR, MPI_ANY_SOURCE, stop_tag, MPI_COMM_WORLD, &root_recv_stop);
} else {
// You may want to use an extra communicator here, to avoid messing with other barriers
MPI_Ibarrier(MPI_COMM_WORLD, &all_recv_stop);
}
while (1) {
int found = find_stuff();
if (found) {
printf("Rank %d found something.\n", rank);
// Note: We cannot post this as blocking, otherwise there is a deadlock with the reduce
MPI_Request req;
MPI_Isend(NULL, 0, MPI_CHAR, root, stop_tag, MPI_COMM_WORLD, &req);
if (rank != root) {
// We know that we are going to receive our own stop signal.
// This avoids running another useless iteration
MPI_Wait(&all_recv_stop, MPI_STATUS_IGNORE);
MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
MPI_Wait(&req, MPI_STATUS_IGNORE);
break;
}
MPI_Wait(&req, MPI_STATUS_IGNORE);
}
if (rank == root) {
if (forward_stop(&root_recv_stop, &all_recv_stop, found)) {
break;
}
} else {
int stop_signal;
MPI_Test(&all_recv_stop, &stop_signal, MPI_STATUS_IGNORE);
if (stop_signal)
{
MPI_Reduce(&found, NULL, 1, MPI_INT, MPI_SUM, root, MPI_COMM_WORLD);
printf("Rank %d stopping after receiving signal.\n", rank);
break;
}
}
};
MPI_Finalize();
}
虽然这不是最简单的代码,但它应该:
- 不引入额外的阻塞
- 通过实施障碍进行扩展(通常
O(log N)
) - 从发现一个到全部停止的最坏情况延迟为 2 * 循环时间 ( + 1 p2p + 1障碍 + 1 减少)。
- 如果 many/all 排名同时找到解决方案,它仍然有效,但效率可能较低。