Inconsistent data received with respect to MPI_Isend

I am passing arrays of objects between a master and workers using the MPI library. Since there are many objects, I use MPI_Isend in a for loop and then MPI_Recv. Everything works well except for the last MPI_Recv, which only ever receives the last element posted with MPI_Isend; moreover, the code terminates without any error.

The input sent is:

1 2 3 0.4 0 4 5 1 
2 4 7 0.678 0 8 1
....
5 8 7 0.56 0 6 1

The data received is:

5 8 7 0.56 0 6 1
5 8 7 0.56 0 6 1
5 8 7 0.56 0 6 1
.... (always the same) 

My question is simple: why? I have the feeling that something is being overwritten during the MPI transfer, but I cannot figure out where or why. The first part of my code seems fine; my problem lies in the last part.

EDIT

Following the comments, the problem is the lack of an MPI_Waitall() to verify completion. I must admit I am unable to understand where to insert MPI_Wait / MPI_Waitall.
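For reference, the canonical non-blocking pattern gives every message its own live buffer and its own request, and completes all of them with a single MPI_Waitall before any buffer is reused or destroyed. A minimal sketch, with illustrative names that are not from my code:

#include <mpi.h>
#include <vector>

//post one MPI_Isend per element, each with its own buffer and request,
//then complete all of them before the buffers go out of scope
void send_all(const std::vector<int>& payloads, int dest, MPI_Comm comm){
  int n = (int)payloads.size();
  std::vector<int> bufs(payloads);       //one live buffer per message
  std::vector<MPI_Request> reqs(n);

  for(int i = 0; i < n; i++){
    MPI_Isend(&bufs[i], 1, MPI_INT, dest, i, comm, &reqs[i]);
  }

  //MPI_Waitall guarantees the library has finished reading every buffer
  MPI_Waitall(n, reqs.data(), MPI_STATUSES_IGNORE);
}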

main.cpp

MPI_Datatype mpi_gene = MPI_Send_Genes();
MPI_Request reqs_6[size_req], reqs_7[size_req]; 
MPI_Status status_7[size_req];

if(rank==0){

    for(int r=1; r < com; r++){
      
      MPI_Isend(&num_orgs, 1, MPI_INT, r, 0, parallel_0, &reqs_0[r]);

      for(int org = 0; org < NEAT::pop_size; org++){

        //send some genome information
        genome_info g_info;
        g_info.generation = pop->organisms[org]->generation;
        MPI_Isend(&g_info, 1, mpi_genome_info, r, org, parallel_1, &reqs_1[org]);

        //define the genome
        NEAT::Genome* sent_genome = pop->organisms[org]->gnome;

        //third the genes
        std::vector<NEAT::Gene*> gen_genes = sent_genome->genes;
        int num_genes = gen_genes.size();
        struct genes sent_genes[num_genes];

        for(int id = 0; id < num_genes; id++){
          if(gen_genes[id]->lnk->linktrait==0){
            sent_genes[id].trait_id = 0;
          }else{
            sent_genes[id].trait_id = gen_genes[id]->lnk->linktrait->trait_id;
          }
          sent_genes[id].in_node_id = gen_genes[id]->lnk->in_node->node_id;
          sent_genes[id].out_node_id = gen_genes[id]->lnk->out_node->node_id;
          sent_genes[id].weight = gen_genes[id]->lnk->weight;
          sent_genes[id].recurrent = gen_genes[id]->lnk->is_recurrent;
          sent_genes[id].innovation_num = gen_genes[id]->innovation_num;
          sent_genes[id].mutation_num = gen_genes[id]->mutation_num;
          sent_genes[id].enable = gen_genes[id]->enable;

          if(id==3){
            std::cout <<
              sent_genes[id].in_node_id << " " <<
              sent_genes[id].out_node_id << " " <<
              sent_genes[id].weight << " " <<
              sent_genes[id].recurrent << " " <<
              sent_genes[id].innovation_num << " " <<
              sent_genes[id].mutation_num << " " <<
              sent_genes[id].enable << " " <<
              std::endl;
          }
        }

        MPI_Isend(&num_genes, 1, MPI_INT, r, org, parallel_6, &reqs_6[org]);
        MPI_Isend(&sent_genes, num_genes, mpi_gene, r, org, parallel_7, &reqs_7[org]); 
      }
    }
  }
  
  std::cout << "--------------" << std::endl; 

    if(rank!=0){

      std::vector<NEAT::Organism*> local_pop;
      int generation;
      MPI_Recv(&num_orgs, 1, MPI_INT, 0, 0, parallel_0, &status);
      
      for(int org = 0; org < num_orgs ; org++){

        //receive genome information
        genome_info rcvd_genome_info;
        MPI_Recv(&rcvd_genome_info, 1, mpi_genome_info, 0, org, parallel_1, &status);
        generation = rcvd_genome_info.generation; 

        //receive genes 
        int num_rcvd_genes;
        MPI_Recv(&num_rcvd_genes, 1, MPI_INT, 0, org, parallel_6, &status);
        genes rcvd_genes[num_rcvd_genes];
        MPI_Recv(&rcvd_genes, num_rcvd_genes, mpi_gene, 0, org, parallel_7, &status);
        //attempted fix suggested in the comments; note that &reqs_7[size_req]
        //points one past the end of the array (reqs_7 was meant), and these
        //requests were never initialized on the receiving ranks
        MPI_Waitall(size_req, &reqs_7[size_req], MPI_STATUSES_IGNORE);
        std::cout << num_rcvd_genes << std::endl; //this is ok
        std::vector<NEAT::Gene*> gen_genes;
        for(int id = 0; id < num_rcvd_genes; id++){
          genes p_gene = rcvd_genes[id];

          if(id==3){//PROBLEM HERE
            std::cout << id << " <- "<<
            p_gene.in_node_id << " " <<
            p_gene.out_node_id <<" " <<
            p_gene.weight <<" " <<
            p_gene.recurrent <<" " <<
            p_gene.innovation_num <<" " <<
            p_gene.mutation_num <<" " <<
            p_gene.enable <<" " <<
            std::endl; 
          }
        }
      }
      //same attempted fix as above, with the same problems
      MPI_Waitall(size_req, &reqs_7[size_req], MPI_STATUSES_IGNORE);
    }

send_genes.cpp

MPI_Datatype MPI_Send_Genes(){

  int nitems = 8;
  int gene_blocklengths[nitems] = {1,1,1,1,1,1,1,1};
  MPI_Datatype gene_types[nitems] = {MPI_INT, MPI_INT, MPI_INT, MPI_DOUBLE, MPI_C_BOOL, MPI_DOUBLE, MPI_DOUBLE, MPI_C_BOOL};
  MPI_Datatype mpi_gene;
  MPI_Aint gene_offsets[nitems];

  gene_offsets[0] = offsetof(genes, trait_id);
  gene_offsets[1] = offsetof(genes, in_node_id);
  gene_offsets[2] = offsetof(genes, out_node_id);
  gene_offsets[3] = offsetof(genes, weight);
  gene_offsets[4] = offsetof(genes, recurrent);
  gene_offsets[5] = offsetof(genes, innovation_num);
  gene_offsets[6] = offsetof(genes, mutation_num);
  gene_offsets[7] = offsetof(genes, enable);

  MPI_Type_create_struct(nitems, gene_blocklengths, gene_offsets, gene_types, &mpi_gene);
  MPI_Type_commit(&mpi_gene);

  return mpi_gene;   
}
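One detail worth flagging in MPI_Send_Genes, although it is probably not the cause of the problem above: the extent of a datatype built with MPI_Type_create_struct does not necessarily match sizeof(genes) once trailing padding is taken into account, so sending an array of genes can use the wrong stride. The usual hardening is MPI_Type_create_resized; a sketch of how the tail of the function would change:

  //sketch: force the datatype's extent to equal the C++ struct size so that
  //arrays of genes are strided correctly (a hardening step, not necessarily
  //the bug discussed in this question)
  MPI_Datatype mpi_gene_raw;
  MPI_Type_create_struct(nitems, gene_blocklengths, gene_offsets, gene_types, &mpi_gene_raw);

  MPI_Datatype mpi_gene;
  MPI_Type_create_resized(mpi_gene_raw, 0, sizeof(struct genes), &mpi_gene);
  MPI_Type_commit(&mpi_gene);
  MPI_Type_free(&mpi_gene_raw);

  return mpi_gene;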

send_genes.h

struct genes{
  int trait_id;
  int in_node_id;
  int out_node_id;
  double weight;
  bool recurrent;
  double innovation_num;
  double mutation_num;
  bool enable;
};

Thanks to the hints from Hristo Iliev and Gilles Gouaillardet, I noticed that I was not making sure that every request was properly started and completed. If my understanding is correct, I only had to add an MPI_Wait after each MPI_Isend, and it now works properly. Nevertheless, I must admit I still do not understand why only the last value was copied and sent.
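In case it helps anyone else, my current understanding of the why: MPI_Isend only starts a send and does not copy the buffer, and the MPI standard forbids modifying a send buffer until the send has completed. In my original loop, g_info, num_genes and sent_genes are stack variables that are overwritten (and go out of scope) on every iteration, so whenever the library read them late, they already held the contents of the last iteration. A stripped-down sketch of the same hazard, with illustrative names that are not from my code:

#include <mpi.h>

int main(int argc, char** argv){
  MPI_Init(&argc, &argv);
  int rank;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  const int N = 10;
  if(rank==0){
    int value;                        //single buffer reused for N messages
    MPI_Request reqs[N];
    for(int i = 0; i < N; i++){
      value = i;                      //clobbers the buffer of pending sends
      MPI_Isend(&value, 1, MPI_INT, 1, i, MPI_COMM_WORLD, &reqs[i]);
    }
    MPI_Waitall(N, reqs, MPI_STATUSES_IGNORE); //waiting here is too late
  }else if(rank==1){
    for(int i = 0; i < N; i++){
      int v;
      MPI_Recv(&v, 1, MPI_INT, 0, i, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
      //v may be 9 (the last value) for every i, depending on when each
      //send is actually carried out
    }
  }
  MPI_Finalize();
  return 0;
}

Here is the code after adding the waits: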

  MPI_Request reqs_0[size_req], reqs_1[size_req], reqs_2[size_req], reqs_3[size_req], reqs_4[size_req], reqs_5[size_req], reqs_6[size_req], reqs_7[size_req];
  MPI_Status status_0[size_req], status_1[size_req], status_2[size_req], status_3[size_req], status_4[size_req], status_5[size_req], status_6[size_req], status_7[size_req];

  num_cpu = omp_get_max_threads();
  //note: MPI_Reduce makes result valid only at the root (rank 0); the
  //rank!=0 branch below reads result, which would require MPI_Allreduce
  MPI_Reduce(&num_cpu, &result, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
  if(rank==0){
    num_cpu_0 = num_cpu;
    if(com==1){
      num_cpu_others = 0;
    }else{
      num_cpu_others = (result-num_cpu_0)/(com-1);
    }
  }else if(rank!=0){
    num_cpu_0 = result - (com-1) * num_cpu;
    num_cpu_others = (result-num_cpu_0)/(com-1);  
  }

  if(rank==0){
    if(com==1){
      num_orgs = NEAT::pop_size;
    }else{
      num_orgs = (int) num_cpu_others / num_cpu * NEAT::pop_size;
    }
  }

  //SPREAD THE POPULATION ACROSS NODES
  if(rank==0){

    for(int r=1; r < com; r++){

      MPI_Isend(&num_orgs, 1, MPI_INT, r, 0, parallel_0, &reqs_0[r]);
      MPI_Wait(&reqs_0[r], &status_0[r]);

      for(int org = 0; org < NEAT::pop_size; org++){

        //send some genome information
        genome_info g_info;
        g_info.generation = pop->organisms[org]->generation;
        MPI_Isend(&g_info, 1, mpi_genome_info, r, org, parallel_1, &reqs_1[org]);
        MPI_Wait(&reqs_1[org], &status_1[org]);

        //define the genome
        NEAT::Genome* sent_genome = pop->organisms[org]->gnome;

        //third the genes
        std::vector<NEAT::Gene*> gen_genes = sent_genome->genes;
        int num_genes = gen_genes.size();
        struct genes sent_genes[num_genes];

        for(int id = 0; id < num_genes; id++){
          if(gen_genes[id]->lnk->linktrait==0){
            sent_genes[id].trait_id = 0;
          }else{
            sent_genes[id].trait_id = gen_genes[id]->lnk->linktrait->trait_id;
          }
          sent_genes[id].in_node_id = gen_genes[id]->lnk->in_node->node_id;
          sent_genes[id].out_node_id = gen_genes[id]->lnk->out_node->node_id;
          sent_genes[id].weight = gen_genes[id]->lnk->weight;
          sent_genes[id].recurrent = gen_genes[id]->lnk->is_recurrent;
          sent_genes[id].innovation_num = gen_genes[id]->innovation_num;
          sent_genes[id].mutation_num = gen_genes[id]->mutation_num;
          sent_genes[id].enable = gen_genes[id]->enable;

          if((org==3) && (id<3)){
            std::cout << "input " <<
              sent_genes[id].in_node_id << " " <<
              sent_genes[id].out_node_id << " " <<
              sent_genes[id].weight << " " <<
              sent_genes[id].recurrent << " " <<
              sent_genes[id].innovation_num << " " <<
              sent_genes[id].mutation_num << " " <<
              sent_genes[id].enable << " " <<
              std::endl;
          }
        }

        MPI_Isend(&num_genes, 1, MPI_INT, r, org, parallel_6, &reqs_6[org]);
        MPI_Wait(&reqs_6[org], &status_6[org]);
        MPI_Isend(&sent_genes, num_genes, mpi_gene, r, org, parallel_7, &reqs_7[org]); 
        MPI_Wait(&reqs_7[org], &status_7[org]);
      }
    }
  }
  
  std::cout << "--------------" << std::endl; 

    if(rank!=0){

      std::vector<NEAT::Organism*> local_pop;
      int generation;
      MPI_Recv(&num_orgs, 1, MPI_INT, 0, 0, parallel_0, &status);
      std::cout << num_orgs << std::endl;
      
      for(int org = 0; org < num_orgs ; org++){

        //receive genome information
        genome_info rcvd_genome_info;
        MPI_Recv(&rcvd_genome_info, 1, mpi_genome_info, 0, org, parallel_1, &status);
        generation = rcvd_genome_info.generation; 

        //receive genes
        int num_rcvd_genes;
        MPI_Recv(&num_rcvd_genes, 1, MPI_INT, 0, org, parallel_6, &status);
        genes rcvd_genes[num_rcvd_genes];
        MPI_Recv(&rcvd_genes, num_rcvd_genes, mpi_gene, 0, org, parallel_7, &status);

        //MPI_Waitall(size_req, &reqs_7[size_req], MPI_STATUSES_IGNORE);

        std::vector<NEAT::Gene*> gen_genes;
        for(int id = 0; id < num_rcvd_genes; id++){
          genes p_gene = rcvd_genes[id];

          if((org==3) && (id<3)){
            std::cout << "output "<<
            p_gene.in_node_id << " " <<
            p_gene.out_node_id <<" " <<
            p_gene.weight <<" " <<
            p_gene.recurrent <<" " <<
            p_gene.innovation_num <<" " <<
            p_gene.mutation_num <<" " <<
            p_gene.enable <<" " <<
            std::endl; 
          }
        }
      }
      //MPI_Waitall(size_req, &reqs_7[size_req], MPI_STATUSES_IGNORE);
    }

  MPI_Finalize();
  return 0;
}
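A closing remark on the fix: an MPI_Isend immediately followed by an MPI_Wait on the same request behaves like a blocking MPI_Send, so the loop above no longer overlaps any communication. To keep the overlap, each message needs its own buffer that stays alive until a single MPI_Waitall at the end, along the lines of this sketch (pack_genes is a hypothetical helper standing in for the packing loop above):

//sketch: overlap all gene sends to rank r by giving each organism its own
//buffer, then complete everything with one MPI_Waitall; pack_genes is a
//hypothetical helper standing in for the packing loop above
std::vector<std::vector<genes>> send_bufs(NEAT::pop_size);
std::vector<MPI_Request> gene_reqs(NEAT::pop_size);

for(int org = 0; org < NEAT::pop_size; org++){
  send_bufs[org] = pack_genes(pop->organisms[org]);    //buffer stays alive
  MPI_Isend(send_bufs[org].data(), (int)send_bufs[org].size(), mpi_gene,
            r, org, parallel_7, &gene_reqs[org]);
}
MPI_Waitall(NEAT::pop_size, gene_reqs.data(), MPI_STATUSES_IGNORE);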