MPI - 使用多线程来监听传入的消息
MPI - Use multiple threads to listen for incoming messages
我正在从事一个使用 MPI 例程和多线程发送和接收消息的项目。我希望每个接收线程都专注于不同的传入消息,而不是让两个或更多线程尝试接收同一个消息。有办法实现吗?
我不知道这是否有帮助,但我目前正在使用 Iprobe() 检查传入消息,并使用 Irecv() 和 Test() 检查线程是否已收到整个消息。
如果我对你的理解正确的话,重要的不是你如何接收信息而是你如何发送信息。正如您在下面看到的 MPI_Send
函数有 destination
参数,它定义了这条消息将被发送到哪个线程。
MPI_Send(
void* data,
int count,
MPI_Datatype datatype,
int destination,
int tag,
MPI_Comm communicator)
因此,如果您想让某些线程接收某些消息,您必须只将此消息发送到该线程。
从标准的第 3 版开始,MPI 允许从消息队列中删除匹配的消息,以便后续 probes/receives 不再可见它们。这是使用所谓的 匹配探针 完成的。将MPI_Iprobe
换成MPI_Improbe
即可,即非阻塞匹配探测操作:
int flag;
MPI_Status status;
MPI_Message msg;
MPI_Improbe(source, tag, comm, &flag, &msg, &status);
一旦 MPI_Improbe
returns 1 in flag
,匹配 (source, tag, comm)
的消息已到达。消息的句柄存储到 msg
中,消息从队列中删除。随后的探测或接收具有匹配的(源、标签、通信)三元组 - 通过相同的线程或在另一个线程中 - 将不会再次看到相同的消息,因此不会干扰最初匹配它的线程的接收。
要接收匹配的消息,请使用 MPI_Imrecv
(或阻止 MPI_Mrecv
):
MPI_Request req;
MPI_Imrecv(buffer, count, dtype, &msg, &req);
do
{
...
MPI_Test(&req, &flag, &status);
}
while (!flag);
3.0 之前的 MPI 版本不提供类似的功能。但是,如果我理解正确的话,你只需要保证在 MPI_Irecv
有机会从队列中删除消息之前不会发布任何匹配的探测(这是匹配探测+接收的目的是为了防止) .如果您在主线程中进行探测,然后将消息分派给不同的线程,那么您可以使用信号量来延迟主线程执行下一个探测,直到工作程序发出 MPI_Irecv
。如果您有多个线程执行探测+接收,那么您可以简单地在同一关键部分发出 MPI_Irecv
调用(或您使用的任何同步原语来实现 MPI_THREAD_SERIALIZED
要求的 MPI 调用序列化) ) 作为 MPI_Iprobe
一旦探测成功:
// Worker thread
CRITICAL(mpi)
{
MPI_Iprobe(source, tag, comm, &flag, &status);
if (flag)
MPI_Irecv(buffer, count, dtype, status.MPI_SOURCE, status.MPI_TAG, comm, &req);
}
用您的编程环境提供的任何原语替换 CRITICAL(name) { ... }
符号。
我正在从事一个使用 MPI 例程和多线程发送和接收消息的项目。我希望每个接收线程都专注于不同的传入消息,而不是让两个或更多线程尝试接收同一个消息。有办法实现吗?
我不知道这是否有帮助,但我目前正在使用 Iprobe() 检查传入消息,并使用 Irecv() 和 Test() 检查线程是否已收到整个消息。
如果我对你的理解正确的话,重要的不是你如何接收信息而是你如何发送信息。正如您在下面看到的 MPI_Send
函数有 destination
参数,它定义了这条消息将被发送到哪个线程。
MPI_Send(
void* data,
int count,
MPI_Datatype datatype,
int destination,
int tag,
MPI_Comm communicator)
因此,如果您想让某些线程接收某些消息,您必须只将此消息发送到该线程。
从标准的第 3 版开始,MPI 允许从消息队列中删除匹配的消息,以便后续 probes/receives 不再可见它们。这是使用所谓的 匹配探针 完成的。将MPI_Iprobe
换成MPI_Improbe
即可,即非阻塞匹配探测操作:
int flag;
MPI_Status status;
MPI_Message msg;
MPI_Improbe(source, tag, comm, &flag, &msg, &status);
一旦 MPI_Improbe
returns 1 in flag
,匹配 (source, tag, comm)
的消息已到达。消息的句柄存储到 msg
中,消息从队列中删除。随后的探测或接收具有匹配的(源、标签、通信)三元组 - 通过相同的线程或在另一个线程中 - 将不会再次看到相同的消息,因此不会干扰最初匹配它的线程的接收。
要接收匹配的消息,请使用 MPI_Imrecv
(或阻止 MPI_Mrecv
):
MPI_Request req;
MPI_Imrecv(buffer, count, dtype, &msg, &req);
do
{
...
MPI_Test(&req, &flag, &status);
}
while (!flag);
3.0 之前的 MPI 版本不提供类似的功能。但是,如果我理解正确的话,你只需要保证在 MPI_Irecv
有机会从队列中删除消息之前不会发布任何匹配的探测(这是匹配探测+接收的目的是为了防止) .如果您在主线程中进行探测,然后将消息分派给不同的线程,那么您可以使用信号量来延迟主线程执行下一个探测,直到工作程序发出 MPI_Irecv
。如果您有多个线程执行探测+接收,那么您可以简单地在同一关键部分发出 MPI_Irecv
调用(或您使用的任何同步原语来实现 MPI_THREAD_SERIALIZED
要求的 MPI 调用序列化) ) 作为 MPI_Iprobe
一旦探测成功:
// Worker thread
CRITICAL(mpi)
{
MPI_Iprobe(source, tag, comm, &flag, &status);
if (flag)
MPI_Irecv(buffer, count, dtype, status.MPI_SOURCE, status.MPI_TAG, comm, &req);
}
用您的编程环境提供的任何原语替换 CRITICAL(name) { ... }
符号。