如何将MPI阻塞发送和接收修改为非阻塞
How to modify MPI blocking send and receive to non-blocking
我试图了解在使用 MPI 的并行处理中阻塞和非阻塞消息传递机制之间的区别。假设我们有以下阻塞代码:
#include <stdio.h>
#include <string.h>
#include "mpi.h"
int main (int argc, char* argv[]) {
const int maximum_message_length = 100;
const int rank_0= 0;
char message[maximum_message_length+1];
MPI_Status status; /* Info about receive status */
int my_rank; /* This process ID */
int num_procs; /* Number of processes in run */
int source; /* Process ID to receive from */
int destination; /* Process ID to send to */
int tag = 0; /* Message ID */
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
/* clients processes */
if (my_rank != server_rank) {
sprintf(message, "Hello world from process# %d", my_rank);
MPI_Send(message, strlen(message) + 1, MPI_CHAR, rank_0, tag, MPI_COMM_WORLD);
} else {
/* rank 0 process */
for (source = 0; source < num_procs; source++) {
if (source != rank_0) {
MPI_Recv(message, maximum_message_length + 1, MPI_CHAR, source, tag,
MPI_COMM_WORLD,&status);
fprintf(stderr, "%s\n", message);
}
}
}
MPI_Finalize();
}
每个处理器执行其任务并将其发送回 rank_0(接收方)。 rank_0 将 运行 循环从 1 到 n-1 进程并按顺序打印它们(如果当前客户端尚未发送其任务,则循环中的第 i 步可能不会继续)。如何修改此代码以实现使用 MPI_Isend
和 MPI_Irecv
的非阻塞机制?我是否需要删除接收器部分 (rank_0) 中的循环并为每个客户端明确声明 MPI_Irecv(..),即
MPI_Irecv(message, maximum_message_length + 1, MPI_CHAR, source, tag,
MPI_COMM_WORLD,&status);
谢谢。
您使用非阻塞通信所做的是 post 通信,然后立即继续您的程序做其他事情,这又可能是 post 进行更多通信。尤其是,您可以 post 一次接收所有,并等待它们稍后完成。
这是您在此处的场景中通常会执行的操作。
但是请注意,这个特定的设置是一个不好的例子,因为它基本上只是重新实现了一个 MPI_Gather!
以下是您通常在设置中进行非阻塞通信的方式。首先,您需要一些存储空间来存储所有消息,还需要一个请求句柄列表来跟踪非阻塞通信请求,因此您的代码的第一部分需要相应地更改:
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "mpi.h"
int main (int argc, char* argv[]) {
const int maximum_message_length = 100;
const int server_rank = 0;
char message[maximum_message_length+1];
char *allmessages;
MPI_Status *status; /* Info about receive status */
MPI_Request *req; /* Non-Blocking Requests */
int my_rank; /* This process ID */
int num_procs; /* Number of processes in run */
int source; /* Process ID to receive from */
int tag = 0; /* Message ID */
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
/* clients processes */
if (my_rank != server_rank) {
sprintf(message, "Hello world from process# %d", my_rank);
MPI_Send(message, maximum_message_length + 1, MPI_CHAR, server_rank,
tag, MPI_COMM_WORLD);
} else {
这里不需要非阻塞发送。现在我们继续并在 server_rank 上收到所有这些消息。我们需要遍历所有这些并为每个存储一个请求句柄:
/* rank 0 process */
allmessages = malloc((maximum_message_length+1)*num_procs);
status = malloc(sizeof(MPI_Status)*num_procs);
req = malloc(sizeof(MPI_Request)*num_procs);
for (source = 0; source < num_procs; source++) {
req[source] = MPI_REQUEST_NULL;
if (source != server_rank) {
/* Post non-blocking receive for source */
MPI_Irecv(allmessages+(source*(maximum_message_length+1)),
maximum_message_length + 1, MPI_CHAR, source, tag,
MPI_COMM_WORLD, req+source);
/* Proceed without waiting on the receive */
/* (posting further receives */
}
}
/* Wait on all communications to complete */
MPI_Waitall(num_procs, req, status);
/* Print the messages in order to the screen */
for (source = 0; source < num_procs; source++) {
if (source != server_rank) {
fprintf(stderr, "%s\n",
allmessages+(source*(maximum_message_length+1)));
}
}
}
MPI_Finalize();
}
在 posting 非阻塞接收之后,我们需要等待它们全部完成,以正确的顺序打印消息。为此,使用 MPI_Waitall,这允许我们阻塞直到所有请求句柄都得到满足。请注意,为了简单起见,我在此处包含 server_rank,但最初将其请求设置为 MPI_REQUEST_NULL,因此它将被忽略。
如果您不关心顺序,您可以在通信可用时立即处理通信,方法是遍历请求并使用 MPI_Waitany。这将 return 一旦任何通信完成,您就可以对相应的数据采取行动。
使用 MPI_Gather 代码将如下所示:
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "mpi.h"
int main (int argc, char* argv[]) {
const int maximum_message_length = 100;
const int server_rank = 0;
char message[maximum_message_length+1];
char *allmessages;
int my_rank; /* This process ID */
int num_procs; /* Number of processes in run */
int source; /* Process ID to receive from */
int tag = 0; /* Message ID */
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
if (my_rank == server_rank) {
allmessages = malloc((maximum_message_length+1)*num_procs);
}
sprintf(message, "Hello world from process# %d", my_rank);
MPI_Gather(message, (maximum_message_length+1), MPI_CHAR,
allmessages, (maximum_message_length+1), MPI_CHAR,
server_rank, MPI_COMM_WORLD);
if (my_rank == server_rank) {
/* Print the messages in order to the screen */
for (source = 0; source < num_procs; source++) {
if (source != server_rank) {
fprintf(stderr, "%s\n",
allmessages+(source*(maximum_message_length+1)));
}
}
}
MPI_Finalize();
}
使用 MPI-3,您甚至可以使用非阻塞 MPI_Igather。
如果您不关心顺序,最后一部分(从 MPI_Waitall 开始)可以用 MPI_Waitany 完成,如下所示:
for (i = 0; i < num_procs-1; i++) {
/* Wait on any next communication to complete */
MPI_Waitany(num_procs, req, &source, status);
fprintf(stderr, "%s\n",
allmessages+(source*(maximum_message_length+1)));
}
我试图了解在使用 MPI 的并行处理中阻塞和非阻塞消息传递机制之间的区别。假设我们有以下阻塞代码:
#include <stdio.h>
#include <string.h>
#include "mpi.h"
int main (int argc, char* argv[]) {
const int maximum_message_length = 100;
const int rank_0= 0;
char message[maximum_message_length+1];
MPI_Status status; /* Info about receive status */
int my_rank; /* This process ID */
int num_procs; /* Number of processes in run */
int source; /* Process ID to receive from */
int destination; /* Process ID to send to */
int tag = 0; /* Message ID */
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
/* clients processes */
if (my_rank != server_rank) {
sprintf(message, "Hello world from process# %d", my_rank);
MPI_Send(message, strlen(message) + 1, MPI_CHAR, rank_0, tag, MPI_COMM_WORLD);
} else {
/* rank 0 process */
for (source = 0; source < num_procs; source++) {
if (source != rank_0) {
MPI_Recv(message, maximum_message_length + 1, MPI_CHAR, source, tag,
MPI_COMM_WORLD,&status);
fprintf(stderr, "%s\n", message);
}
}
}
MPI_Finalize();
}
每个处理器执行其任务并将其发送回 rank_0(接收方)。 rank_0 将 运行 循环从 1 到 n-1 进程并按顺序打印它们(如果当前客户端尚未发送其任务,则循环中的第 i 步可能不会继续)。如何修改此代码以实现使用 MPI_Isend
和 MPI_Irecv
的非阻塞机制?我是否需要删除接收器部分 (rank_0) 中的循环并为每个客户端明确声明 MPI_Irecv(..),即
MPI_Irecv(message, maximum_message_length + 1, MPI_CHAR, source, tag,
MPI_COMM_WORLD,&status);
谢谢。
您使用非阻塞通信所做的是 post 通信,然后立即继续您的程序做其他事情,这又可能是 post 进行更多通信。尤其是,您可以 post 一次接收所有,并等待它们稍后完成。 这是您在此处的场景中通常会执行的操作。
但是请注意,这个特定的设置是一个不好的例子,因为它基本上只是重新实现了一个 MPI_Gather!
以下是您通常在设置中进行非阻塞通信的方式。首先,您需要一些存储空间来存储所有消息,还需要一个请求句柄列表来跟踪非阻塞通信请求,因此您的代码的第一部分需要相应地更改:
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "mpi.h"
int main (int argc, char* argv[]) {
const int maximum_message_length = 100;
const int server_rank = 0;
char message[maximum_message_length+1];
char *allmessages;
MPI_Status *status; /* Info about receive status */
MPI_Request *req; /* Non-Blocking Requests */
int my_rank; /* This process ID */
int num_procs; /* Number of processes in run */
int source; /* Process ID to receive from */
int tag = 0; /* Message ID */
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
/* clients processes */
if (my_rank != server_rank) {
sprintf(message, "Hello world from process# %d", my_rank);
MPI_Send(message, maximum_message_length + 1, MPI_CHAR, server_rank,
tag, MPI_COMM_WORLD);
} else {
这里不需要非阻塞发送。现在我们继续并在 server_rank 上收到所有这些消息。我们需要遍历所有这些并为每个存储一个请求句柄:
/* rank 0 process */
allmessages = malloc((maximum_message_length+1)*num_procs);
status = malloc(sizeof(MPI_Status)*num_procs);
req = malloc(sizeof(MPI_Request)*num_procs);
for (source = 0; source < num_procs; source++) {
req[source] = MPI_REQUEST_NULL;
if (source != server_rank) {
/* Post non-blocking receive for source */
MPI_Irecv(allmessages+(source*(maximum_message_length+1)),
maximum_message_length + 1, MPI_CHAR, source, tag,
MPI_COMM_WORLD, req+source);
/* Proceed without waiting on the receive */
/* (posting further receives */
}
}
/* Wait on all communications to complete */
MPI_Waitall(num_procs, req, status);
/* Print the messages in order to the screen */
for (source = 0; source < num_procs; source++) {
if (source != server_rank) {
fprintf(stderr, "%s\n",
allmessages+(source*(maximum_message_length+1)));
}
}
}
MPI_Finalize();
}
在 posting 非阻塞接收之后,我们需要等待它们全部完成,以正确的顺序打印消息。为此,使用 MPI_Waitall,这允许我们阻塞直到所有请求句柄都得到满足。请注意,为了简单起见,我在此处包含 server_rank,但最初将其请求设置为 MPI_REQUEST_NULL,因此它将被忽略。 如果您不关心顺序,您可以在通信可用时立即处理通信,方法是遍历请求并使用 MPI_Waitany。这将 return 一旦任何通信完成,您就可以对相应的数据采取行动。
使用 MPI_Gather 代码将如下所示:
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include "mpi.h"
int main (int argc, char* argv[]) {
const int maximum_message_length = 100;
const int server_rank = 0;
char message[maximum_message_length+1];
char *allmessages;
int my_rank; /* This process ID */
int num_procs; /* Number of processes in run */
int source; /* Process ID to receive from */
int tag = 0; /* Message ID */
MPI_Init(&argc, &argv);
MPI_Comm_rank(MPI_COMM_WORLD, &my_rank);
MPI_Comm_size(MPI_COMM_WORLD, &num_procs);
if (my_rank == server_rank) {
allmessages = malloc((maximum_message_length+1)*num_procs);
}
sprintf(message, "Hello world from process# %d", my_rank);
MPI_Gather(message, (maximum_message_length+1), MPI_CHAR,
allmessages, (maximum_message_length+1), MPI_CHAR,
server_rank, MPI_COMM_WORLD);
if (my_rank == server_rank) {
/* Print the messages in order to the screen */
for (source = 0; source < num_procs; source++) {
if (source != server_rank) {
fprintf(stderr, "%s\n",
allmessages+(source*(maximum_message_length+1)));
}
}
}
MPI_Finalize();
}
使用 MPI-3,您甚至可以使用非阻塞 MPI_Igather。
如果您不关心顺序,最后一部分(从 MPI_Waitall 开始)可以用 MPI_Waitany 完成,如下所示:
for (i = 0; i < num_procs-1; i++) {
/* Wait on any next communication to complete */
MPI_Waitany(num_procs, req, &source, status);
fprintf(stderr, "%s\n",
allmessages+(source*(maximum_message_length+1)));
}