MPI master process convergence loop

I am trying to write an MPI program that simulates the flow of temperature across a grid until it reaches equilibrium. I have already written a serial version, as well as parallel versions using OpenMP, pthreads, and CUDA.

My goal is to parallelize the for loop that computes the updated temperature values for a one-dimensional array. The code for the part I have to parallelize is below (all other variables are initialized above it):

int nproc, rank,chunksize,leftover,offset,source, tag1=3,tag2=2,tag3=1;

    MPI_Status status;
    MPI_Init(&argc,&argv);
    MPI_Comm_size(MPI_COMM_WORLD,&nproc);
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);
    chunksize = (boxes / (nproc-1));
    leftover = (boxes % (nproc-1));
    if(rank == 0){

        //init dsv
        for(int idx = 0; idx < boxes; idx++){
            temps[idx] = newtemps[idx];
        }

        int stop = 0;
        int iter = 0;
        float max_tmp;
        float min_tmp;

        while(stop != 1){
            offset = 0;
            for (int dest=1; dest<nproc; dest++) {
                int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
                MPI_Send(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD);
                MPI_Send(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD);
                MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD);
                printf("sent %d temps to process: %d\n",chunk, dest);
                offset = offset + chunk;
            }
            for (int dest=1; dest<nproc; dest++) {
                int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
                MPI_Recv(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD, &status);
                MPI_Recv(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD,&status);
                MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD,&status);
                printf("received %d temps from process: %d\n",chunk, dest);
                printf("status: %d\n",status.MPI_TAG);

            }

            max_tmp = -10000;
            min_tmp =  10000;
            for(idx = 0; idx < boxes; idx++){
                temps[idx] = newtemps[idx];
                if(newtemps[idx] > max_tmp){
                    max_tmp = newtemps[idx];
                }
                if(newtemps[idx] < min_tmp){
                    min_tmp = newtemps[idx];
                }
            }
            stop = (max_tmp - min_tmp) <= (max_tmp * epsilon);
            iter += 1;
        }
    }
    if (rank > 0){
        int chunk = (rank <= leftover ? chunksize + 1 : chunksize);
        MPI_Recv(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD, &status);
        MPI_Recv(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD,&status);
        MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD,&status);
        printf("received %d temps from process: 0\n",chunk);
        printf("status: %d\n",status.MPI_TAG);

        for(int j = offset; j < offset+chunk; j++){
            float weightedtmp = 0;
            int perimeter = 0;
            int num_iters = neighbors[j][0];
            for(int i = 1; i <= num_iters; i++){
                weightedtmp += temps[neighbors[j][i]] * mults[j][i];
                perimeter += mults[j][i];
            }
            weightedtmp /= perimeter;
            newtemps[j] = temps[j] + (weightedtmp - temps[j] ) * affect_rate;
        }
        printf("sent %d temps to process: 0\n",chunk);
        MPI_Send(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD);
        MPI_Send(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD);
        MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD);
    }

    MPI_Finalize();

However, my program successfully completes the first iteration of the while loop, finds the maximum value (which matches my serial version), and then sends the temps, newtemps, and offset variables to each process. At that point, though, my program hangs and the processes never print that they received the messages. The console output looks like this:

[radeymichael@owens-login04 ~]$ mpicc -o ci changeInput.c
[radeymichael@owens-login04 ~]$ mpirun -np 3 ./ci .1 .1
sent 101 temps to process: 1
sent 100 temps to process: 2
received 101 temps from process: 1
status: 1
received 101 temps from process: 0
status: 1
sent 101 temps to process: 0
received 100 temps from process: 0
status: 1
sent 100 temps to process: 0
received 100 temps from process: 2
status: 1
max: 900.000000
sent 101 temps to process: 1
sent 100 temps to process: 2

I have spent a lot of time trying to find the mistake, but I think I am missing some fundamental understanding of how MPI works. I would be grateful if someone could help me spot my misunderstanding.

The problem is that rank 0 stays inside its while loop and keeps sending data until stop == 1, while all the other ranks reach MPI_Finalize right after the last MPI_Send in their branch, so when rank 0 starts its second iteration there is no worker left receiving or computing and rank 0 blocks waiting for results that never arrive. One solution (as suggested in @Gilles's comment) is to wrap the worker ranks' code in a while loop driven by stop as well, and have the root broadcast stop to all processes each iteration with:

    MPI_Bcast(&stop,1, MPI_INT, 0, MPI_COMM_WORLD);

See the code below.

int nproc, rank,chunksize,leftover,offset,source, tag1=3,tag2=2,tag3=1;

MPI_Status status;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&nproc);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
chunksize = (boxes / (nproc-1));
leftover = (boxes % (nproc-1));

int stop = 0;

if(rank == 0){

    //init dsv
    for(int idx = 0; idx < boxes; idx++){
        temps[idx] = newtemps[idx];
    }

    int iter = 0;
    float max_tmp;
    float min_tmp;

    while(stop != 1){
        offset = 0;
        for (int dest=1; dest<nproc; dest++) {
            int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
            MPI_Send(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD);
            MPI_Send(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD);
            MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD);
            printf("sent %d temps to process: %d\n",chunk, dest);
            offset = offset + chunk;
        }
        for (int dest=1; dest<nproc; dest++) {
            int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
            MPI_Recv(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD, &status);
            MPI_Recv(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD,&status);
            MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD,&status);
            printf("received %d temps from process: %d\n",chunk, dest);
            printf("status: %d\n",status.MPI_TAG);

        }

        max_tmp = -10000;
        min_tmp =  10000;
        for(idx = 0; idx < boxes; idx++){
            temps[idx] = newtemps[idx];
            if(newtemps[idx] > max_tmp){
                max_tmp = newtemps[idx];
            }
            if(newtemps[idx] < min_tmp){
                min_tmp = newtemps[idx];
            }
        }
        stop = (max_tmp - min_tmp) <= (max_tmp * epsilon);
        iter += 1;
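        // share the root's convergence decision with every rank so they all exit their loops together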
        MPI_Bcast(&stop, 1, MPI_INT, 0, MPI_COMM_WORLD);

    }
}
if (rank > 0){
    while(stop != 1){
        int chunk = (rank <= leftover ? chunksize + 1 : chunksize);
        MPI_Recv(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD, &status);
        MPI_Recv(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD, &status);
        MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD, &status);
        printf("received %d temps from process: 0\n", chunk);
        printf("status: %d\n", status.MPI_TAG);

        for(int j = offset; j < offset+chunk; j++){
            float weightedtmp = 0;
            int perimeter = 0;
            int num_iters = neighbors[j][0];
            for(int i = 1; i <= num_iters; i++){
                weightedtmp += temps[neighbors[j][i]] * mults[j][i];
                perimeter += mults[j][i];
            }
            weightedtmp /= perimeter;
            newtemps[j] = temps[j] + (weightedtmp - temps[j]) * affect_rate;
        }
        printf("sent %d temps to process: 0\n", chunk);
        MPI_Send(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD);
        MPI_Send(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD);
        MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD);
        MPI_Bcast(&stop, 1, MPI_INT, 0, MPI_COMM_WORLD);
    }
}

MPI_Finalize();
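
For clarity, here is a minimal, self-contained sketch of the same termination pattern with the temperature computation stripped out (the convergence test is replaced by a hypothetical iteration counter). Every rank executes the same while loop, and because MPI_Bcast is a collective call that all ranks reach once per iteration, every rank receives the root's value of stop, so they all leave the loop and reach MPI_Finalize together.

    #include <mpi.h>
    #include <stdio.h>

    int main(int argc, char **argv)
    {
        int rank, stop = 0, iter = 0;

        MPI_Init(&argc, &argv);
        MPI_Comm_rank(MPI_COMM_WORLD, &rank);

        while (stop != 1) {
            /* ... root distributes work, workers compute and send results back ... */

            if (rank == 0) {
                iter += 1;
                stop = (iter >= 10);   /* stand-in for the (max - min) <= max * epsilon test */
            }

            /* collective call: every rank must reach it each iteration,
               and afterwards every rank holds the root's value of stop */
            MPI_Bcast(&stop, 1, MPI_INT, 0, MPI_COMM_WORLD);
        }

        if (rank == 0) {
            printf("stopped after %d iterations\n", iter);
        }

        MPI_Finalize();
        return 0;
    }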