Load balancing MPI multithreading for variable-complexity tasks or variable-speed nodes?

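I have written an MPI code that is currently multithreaded. It does its work by sending an equal number of elements from each array to the different processes (so with 6 workers, the array is split into 6 equal parts). What I would like to do instead is send small chunks only when a worker is ready to receive them, and receive completed chunks without blocking future sends. That way, if one chunk takes 10 seconds and the others take 1 second, the other data can be processed while waiting for the long chunk to finish.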

Here is some skeleton code I have put together:

#include <mpi.h>
#include <iostream>
#include <vector>
#include <cmath>
#include <cstdio>

struct crazytaxi
{
    double a = 10.0;
    double b = 25.2;
    double c = 222.222;
};

int main(int argc, char** argv)
{
    //Initial and temp kanno vectors
    std::vector<crazytaxi> kanno;
    std::vector<crazytaxi> kanno_tmp;

    //init MPI
    MPI_Init(NULL,NULL);

    //allocate vector
    int SZ = 4200;
    kanno.resize(SZ);

    int world_size;
    MPI_Comm_size(MPI_COMM_WORLD,&world_size);

    int world_rank;
    MPI_Comm_rank(MPI_COMM_WORLD,&world_rank);

    if (world_rank == 0)
    {
        for (int i = 0; i < SZ; i++) {
            kanno[i].a = 1.0*i;
            kanno[i].b = 10.0/(i+1);
        }
    }

    for (int j = 0; j < 10; j++) {

        //Make sure all processes have same kanno vector;
        if (world_rank == 0) {
            for (int i = 1; i < world_size; i++)
                MPI_Send(&kanno[0],sizeof(crazytaxi)*kanno.size(),MPI_BYTE,i,3,MPI_COMM_WORLD);
        } else {
            MPI_Recv(&kanno[0],sizeof(crazytaxi)*kanno.size(),MPI_BYTE,0,3,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
        }

        //copy to tmp vector
        kanno_tmp = kanno;
        MPI_Barrier(MPI_COMM_WORLD);

        //the sender
        if (world_rank == 0) {
            unsigned p1 = 0;
            unsigned segment = 10;
            unsigned p2 = segment;
            while (p1 < SZ) {
                for (int i = 0; i < world_size; i++) {
                    //if (process #i is ready to receive)
                        //Send data in chunks of 10 to i
                    //else
                        //continue
                }
            }
        }
        if (world_rank != 0) {
            //Receive data to be processed
            //do some math
            for (unsigned i = p1; i < p2; i++)
                kanno_tmp[i].a = std::sqrt(kanno[i].a)/((double)i+1.0);

            //Send processed data to 0 and wait to receive new data.
        }

        //copy temp vector to kanno
        kanno = kanno_tmp;
    }

    //print some of the results;
    if (world_rank == 0)
    {
        for (int i = 0; i < SZ; i += 40)
            printf("Line %d: %lg,%lg\n",i,kanno[i].a,kanno[i].b);
    }

    MPI_Finalize();
}

The catch is that either my MPI_Send and MPI_Recv calls block, or the 'master' process has no way of knowing that a 'slave' process is ready to receive data.

Is there a way in MPI to do something like this:

unsigned Datapointer = [some_array_index];
while (Datapointer < array_size) {
    if (world_rank == 0) {
        for (int i = 1; i < world_size; i++)
        {
            if (<process i is ready to receive>) {
                MPI_Send([...]);
                Datapointer += 10;
            }
            if (<process i has sent data>)
                MPI_Recv([...]);
            if (Datapointer > array_size) {
                MPI_Bcast([killswitch]);
                break;
            }
        }
    }
}
MPI_Barrier();

Or is there a more efficient way to structure this for variable-complexity chunks or variable-speed nodes?

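As @Gilles Gouaillardet pointed out, the keyword in this situation is MPI_ANY_SOURCE. With it, a process can receive a message from any source. To find out which process sent the message, you can read status.MPI_SOURCE from the status filled in by the receive call.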

MPI_Status status;
if (rank == 0) {
  // send initial work to all processes
  while (true) {
    // wait for a result from whichever worker finishes first
    MPI_Recv(buf, 32, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
    // do the distribution logic
    // hand the next piece of work to the worker that just reported back
    MPI_Send(buf, 32, MPI_INT, status.MPI_SOURCE, tag, MPI_COMM_WORLD);
    // break out of the loop once the work is over and send all the processes
    // a message telling them to stop waiting for work
  }
}
else {
  while (true) {
    // receive work from rank 0
    MPI_Recv(buf, 32, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
    // perform the computation and send back the result
    MPI_Send(buf, 32, MPI_INT, 0, tag, MPI_COMM_WORLD);
    // keep looping until rank 0 asks this process to stop via some kind of special message
  }
}