MPI_Isend 收到的数据不一致
Inconsistent data received with respect to MPI_Isend
我正在使用 MPI 库在 master 和 worker 之间传递对象数组。由于有很多对象,我在 for 循环中使用 MPI_Isend
然后 MPI_Recv
。除了最后一个 MPI_Recv
之外,一切都运行良好,其中 MPI_Recv
仅接收 MPI_Isent
的最后一个发送元素,此外代码终止没有任何错误。
发送的输入是:
1 2 3 0.4 0 4 5 1
2 4 7 0.678 0 8 1
....
5 8 7 0.56 0 6 1
接收到的输入是:
5 8 7 0.56 0 6 1
5 8 7 0.56 0 6 1
5 8 7 0.56 0 6 1
.... (always the same)
我的问题很简单:为什么?我感觉 MPI 传输中有些东西被覆盖了,但我不知道在哪里以及为什么。我的代码的第一部分似乎没问题,我的问题是在最后一部分。
编辑
根据评论,问题是没有 MPI_Waitall()
验证完成。我必须承认无法理解在哪里插入 MPI_Wait
或 MPI_Waitall
.
main.cpp
MPI_Datatype mpi_gene = MPI_Send_Genes();
MPI_Request reqs_6[size_req], reqs_7[size_req];
MPI_Status status_7[size_req];
if(rank==0){
for(int r=1; r < com; r++){
MPI_Isend(&num_orgs, 1, MPI_INT, r, 0, parallel_0, &reqs_0[r]);
for(int org = 0; org < NEAT::pop_size; org++){
//send some genome information
genome_info g_info;
g_info.generation = pop->organisms[org]->generation;
MPI_Isend(&g_info, 1, mpi_genome_info, r, org, parallel_1, &reqs_1[org]);
//define the genome
NEAT::Genome* sent_genome = pop->organisms[org]->gnome;
//third the genes
std::vector<NEAT::Gene*> gen_genes = sent_genome->genes;
int num_genes = gen_genes.size();
struct genes sent_genes[num_genes];
for(int id = 0; id < num_genes; id++){
if(gen_genes[id]->lnk->linktrait==0){
sent_genes[id].trait_id = 0;
}else{
sent_genes[id].trait_id = gen_genes[id]->lnk->linktrait->trait_id;
}
sent_genes[id].in_node_id = gen_genes[id]->lnk->in_node->node_id;
sent_genes[id].out_node_id = gen_genes[id]->lnk->out_node->node_id;
sent_genes[id].weight = gen_genes[id]->lnk->weight;
sent_genes[id].recurrent = gen_genes[id]->lnk->is_recurrent;
sent_genes[id].innovation_num = gen_genes[id]->innovation_num;
sent_genes[id].mutation_num = gen_genes[id]->mutation_num;
sent_genes[id].enable = gen_genes[id]->enable;
if(id==3){
std::cout <<
sent_genes[id].in_node_id << " " <<
sent_genes[id].out_node_id <<" " <<
sent_genes[id].weight <<" " <<
sent_genes[id].recurrent <<" " <<
sent_genes[id].innovation_num <<" " <<
sent_genes[id].mutation_num <<" " <<
sent_genes[id].enable <<" " <<
std::endl;
}
}
MPI_Isend(&num_genes, 1, MPI_INT, r, org, parallel_6, &reqs_6[org]);
MPI_Isend(&sent_genes, num_genes, mpi_gene, r, org, parallel_7, &reqs_7[org]);
}
}
}
std::cout << "--------------" << std::endl;
if(rank!=0){
std::vector<NEAT::Organism*> local_pop;
int generation;
MPI_Recv(&num_orgs, 1, MPI_INT, 0, 0, parallel_0, &status);
for(int org = 0; org < num_orgs ; org++){
//receive genome information
genome_info rcvd_genome_info;
MPI_Recv(&rcvd_genome_info, 1, mpi_genome_info, 0, org, parallel_1, &status);
generation = rcvd_genome_info.generation;
//receive genes
int num_rcvd_genes;
MPI_Recv(&num_rcvd_genes, 1, MPI_INT, 0, org, parallel_6, &status);
genes rcvd_genes[num_rcvd_genes];
MPI_Recv(&rcvd_genes, num_rcvd_genes, mpi_gene, 0, org, parallel_7, &status);
MPI_Waitall(size_req, &reqs_7[size_req], MPI_STATUSES_IGNORE);
std::cout << num_rcvd_genes << std::endl; //this is ok
std::vector<NEAT::Gene*> gen_genes;
for(int id = 0; id < num_rcvd_genes; id++){
genes p_gene = rcvd_genes[id];
if(id==3){//PROBLEM HERE
std::cout << id << " <- "<<
p_gene.in_node_id << " " <<
p_gene.out_node_id <<" " <<
p_gene.weight <<" " <<
p_gene.recurrent <<" " <<
p_gene.innovation_num <<" " <<
p_gene.mutation_num <<" " <<
p_gene.enable <<" " <<
std::endl;
}
}
}
MPI_Waitall(size_req, &reqs_7[size_req], MPI_STATUSES_IGNORE);
}
send_genes.cpp
MPI_Datatype MPI_Send_Genes(){
int nitems = 8;
int gene_blocklengths[nitems] = {1,1,1,1,1,1,1,1};
MPI_Datatype gene_types[nitems] = {MPI_INT, MPI_INT, MPI_INT, MPI_DOUBLE, MPI_C_BOOL, MPI_DOUBLE, MPI_DOUBLE, MPI_C_BOOL};
MPI_Datatype mpi_gene;
MPI_Aint gene_offsets[nitems];
gene_offsets[0] = offsetof(genes, trait_id);
gene_offsets[1] = offsetof(genes, in_node_id);
gene_offsets[2] = offsetof(genes, out_node_id);
gene_offsets[3] = offsetof(genes, weight);
gene_offsets[4] = offsetof(genes, recurrent);
gene_offsets[5] = offsetof(genes, innovation_num);
gene_offsets[6] = offsetof(genes, mutation_num);
gene_offsets[7] = offsetof(genes, enable);
MPI_Type_create_struct(nitems, gene_blocklengths, gene_offsets, gene_types, &mpi_gene);
MPI_Type_commit(&mpi_gene);
return mpi_gene;
}
send_genes.h
struct genes{
int trait_id;
int in_node_id;
int out_node_id;
double weight;
bool recurrent;
double innovation_num;
double mutation_num;
bool enable;
};
感谢 Hristo Iliev 和 Gilles Gouaillardet 的提示,我注意到我没有确保每个请求都已正确启动和发送。如果我的理解是正确的,我只需要在每个 MPI_Isend
之后添加一个 MPI_Wait
,它现在可以正常工作了。尽管如此,我必须承认仍然不明白为什么只复制和发送最后一个值。
MPI_Request reqs_0[size_req], reqs_1[size_req], reqs_2[size_req], reqs_3[size_req], reqs_4[size_req], reqs_5[size_req], reqs_6[size_req], reqs_7[size_req];
MPI_Status status_0[size_req], status_1[size_req], status_2[size_req], status_3[size_req], status_4[size_req], status_5[size_req], status_6[size_req], status_7[size_req];
num_cpu = omp_get_max_threads();
MPI_Reduce(&num_cpu, &result, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
if(rank==0){
num_cpu_0 = num_cpu;
if(com==1){
num_cpu_others = 0;
}else{
num_cpu_others = (result-num_cpu_0)/(com-1);
}
}else if(rank!=0){
num_cpu_0 = result - (com-1) * num_cpu;
num_cpu_others = (result-num_cpu_0)/(com-1);
}
if(rank==0){
if(com==1){
num_orgs = NEAT::pop_size;
}else{
num_orgs = (int) num_cpu_others / num_cpu * NEAT::pop_size;
}
}
//SPREAD THE POPULATION ACROSS NODES
if(rank==0){
for(int r=1; r < com; r++){
MPI_Isend(&num_orgs, 1, MPI_INT, r, 0, parallel_0, &reqs_0[r]);
MPI_Wait(&reqs_0[r], &status_0[r]);
for(int org = 0; org < NEAT::pop_size; org++){
//send some genome information
genome_info g_info;
g_info.generation = pop->organisms[org]->generation;
MPI_Isend(&g_info, 1, mpi_genome_info, r, org, parallel_1, &reqs_1[org]);
MPI_Wait(&reqs_1[org], &status_1[org]);
//define the genome
NEAT::Genome* sent_genome = pop->organisms[org]->gnome;
//third the genes
std::vector<NEAT::Gene*> gen_genes = sent_genome->genes;
int num_genes = gen_genes.size();
struct genes sent_genes[num_genes];
for(int id = 0; id < num_genes; id++){
if(gen_genes[id]->lnk->linktrait==0){
sent_genes[id].trait_id = 0;
}else{
sent_genes[id].trait_id = gen_genes[id]->lnk->linktrait->trait_id;
}
sent_genes[id].in_node_id = gen_genes[id]->lnk->in_node->node_id;
sent_genes[id].out_node_id = gen_genes[id]->lnk->out_node->node_id;
sent_genes[id].weight = gen_genes[id]->lnk->weight;
sent_genes[id].recurrent = gen_genes[id]->lnk->is_recurrent;
sent_genes[id].innovation_num = gen_genes[id]->innovation_num;
sent_genes[id].mutation_num = gen_genes[id]->mutation_num;
sent_genes[id].enable = gen_genes[id]->enable;
if((org==3) && (id<3)){
std::cout << "input "<<
sent_genes[id].in_node_id << " " <<
sent_genes[id].out_node_id <<" " <<
sent_genes[id].weight <<" " <<
sent_genes[id].recurrent <<" " <<
sent_genes[id].innovation_num <<" " <<
sent_genes[id].mutation_num <<" " <<
sent_genes[id].enable <<" " <<
std::endl;
}
}
MPI_Isend(&num_genes, 1, MPI_INT, r, org, parallel_6, &reqs_6[org]);
MPI_Wait(&reqs_6[org], &status_6[org]);
MPI_Isend(&sent_genes, num_genes, mpi_gene, r, org, parallel_7, &reqs_7[org]);
MPI_Wait(&reqs_7[org], &status_7[org]);
}
}
}
std::cout << "--------------" << std::endl;
if(rank!=0){
std::vector<NEAT::Organism*> local_pop;
int generation;
MPI_Recv(&num_orgs, 1, MPI_INT, 0, 0, parallel_0, &status);
std::cout << num_orgs << std::endl;
for(int org = 0; org < num_orgs ; org++){
//receive genome information
genome_info rcvd_genome_info;
MPI_Recv(&rcvd_genome_info, 1, mpi_genome_info, 0, org, parallel_1, &status);
generation = rcvd_genome_info.generation;
//receive genes
int num_rcvd_genes;
MPI_Recv(&num_rcvd_genes, 1, MPI_INT, 0, org, parallel_6, &status);
genes rcvd_genes[num_rcvd_genes];
MPI_Recv(&rcvd_genes, num_rcvd_genes, mpi_gene, 0, org, parallel_7, &status);
//MPI_Waitall(size_req, &reqs_7[size_req], MPI_STATUSES_IGNORE);
std::vector<NEAT::Gene*> gen_genes;
for(int id = 0; id < num_rcvd_genes; id++){
genes p_gene = rcvd_genes[id];
if((org==3) && (id<3)){
std::cout << "output "<<
p_gene.in_node_id << " " <<
p_gene.out_node_id <<" " <<
p_gene.weight <<" " <<
p_gene.recurrent <<" " <<
p_gene.innovation_num <<" " <<
p_gene.mutation_num <<" " <<
p_gene.enable <<" " <<
std::endl;
}
}
}
//MPI_Waitall(size_req, &reqs_7[size_req], MPI_STATUSES_IGNORE);*/
}
MPI_Finalize();
return 0;
}
我正在使用 MPI 库在 master 和 worker 之间传递对象数组。由于有很多对象,我在 for 循环中使用 MPI_Isend
然后 MPI_Recv
。除了最后一个 MPI_Recv
之外,一切都运行良好,其中 MPI_Recv
仅接收 MPI_Isent
的最后一个发送元素,此外代码终止没有任何错误。
发送的输入是:
1 2 3 0.4 0 4 5 1
2 4 7 0.678 0 8 1
....
5 8 7 0.56 0 6 1
接收到的输入是:
5 8 7 0.56 0 6 1
5 8 7 0.56 0 6 1
5 8 7 0.56 0 6 1
.... (always the same)
我的问题很简单:为什么?我感觉 MPI 传输中有些东西被覆盖了,但我不知道在哪里以及为什么。我的代码的第一部分似乎没问题,我的问题是在最后一部分。
编辑
根据评论,问题是没有 MPI_Waitall()
验证完成。我必须承认无法理解在哪里插入 MPI_Wait
或 MPI_Waitall
.
main.cpp
MPI_Datatype mpi_gene = MPI_Send_Genes();
MPI_Request reqs_6[size_req], reqs_7[size_req];
MPI_Status status_7[size_req];
if(rank==0){
for(int r=1; r < com; r++){
MPI_Isend(&num_orgs, 1, MPI_INT, r, 0, parallel_0, &reqs_0[r]);
for(int org = 0; org < NEAT::pop_size; org++){
//send some genome information
genome_info g_info;
g_info.generation = pop->organisms[org]->generation;
MPI_Isend(&g_info, 1, mpi_genome_info, r, org, parallel_1, &reqs_1[org]);
//define the genome
NEAT::Genome* sent_genome = pop->organisms[org]->gnome;
//third the genes
std::vector<NEAT::Gene*> gen_genes = sent_genome->genes;
int num_genes = gen_genes.size();
struct genes sent_genes[num_genes];
for(int id = 0; id < num_genes; id++){
if(gen_genes[id]->lnk->linktrait==0){
sent_genes[id].trait_id = 0;
}else{
sent_genes[id].trait_id = gen_genes[id]->lnk->linktrait->trait_id;
}
sent_genes[id].in_node_id = gen_genes[id]->lnk->in_node->node_id;
sent_genes[id].out_node_id = gen_genes[id]->lnk->out_node->node_id;
sent_genes[id].weight = gen_genes[id]->lnk->weight;
sent_genes[id].recurrent = gen_genes[id]->lnk->is_recurrent;
sent_genes[id].innovation_num = gen_genes[id]->innovation_num;
sent_genes[id].mutation_num = gen_genes[id]->mutation_num;
sent_genes[id].enable = gen_genes[id]->enable;
if(id==3){
std::cout <<
sent_genes[id].in_node_id << " " <<
sent_genes[id].out_node_id <<" " <<
sent_genes[id].weight <<" " <<
sent_genes[id].recurrent <<" " <<
sent_genes[id].innovation_num <<" " <<
sent_genes[id].mutation_num <<" " <<
sent_genes[id].enable <<" " <<
std::endl;
}
}
MPI_Isend(&num_genes, 1, MPI_INT, r, org, parallel_6, &reqs_6[org]);
MPI_Isend(&sent_genes, num_genes, mpi_gene, r, org, parallel_7, &reqs_7[org]);
}
}
}
std::cout << "--------------" << std::endl;
if(rank!=0){
std::vector<NEAT::Organism*> local_pop;
int generation;
MPI_Recv(&num_orgs, 1, MPI_INT, 0, 0, parallel_0, &status);
for(int org = 0; org < num_orgs ; org++){
//receive genome information
genome_info rcvd_genome_info;
MPI_Recv(&rcvd_genome_info, 1, mpi_genome_info, 0, org, parallel_1, &status);
generation = rcvd_genome_info.generation;
//receive genes
int num_rcvd_genes;
MPI_Recv(&num_rcvd_genes, 1, MPI_INT, 0, org, parallel_6, &status);
genes rcvd_genes[num_rcvd_genes];
MPI_Recv(&rcvd_genes, num_rcvd_genes, mpi_gene, 0, org, parallel_7, &status);
MPI_Waitall(size_req, &reqs_7[size_req], MPI_STATUSES_IGNORE);
std::cout << num_rcvd_genes << std::endl; //this is ok
std::vector<NEAT::Gene*> gen_genes;
for(int id = 0; id < num_rcvd_genes; id++){
genes p_gene = rcvd_genes[id];
if(id==3){//PROBLEM HERE
std::cout << id << " <- "<<
p_gene.in_node_id << " " <<
p_gene.out_node_id <<" " <<
p_gene.weight <<" " <<
p_gene.recurrent <<" " <<
p_gene.innovation_num <<" " <<
p_gene.mutation_num <<" " <<
p_gene.enable <<" " <<
std::endl;
}
}
}
MPI_Waitall(size_req, &reqs_7[size_req], MPI_STATUSES_IGNORE);
}
send_genes.cpp
MPI_Datatype MPI_Send_Genes(){
int nitems = 8;
int gene_blocklengths[nitems] = {1,1,1,1,1,1,1,1};
MPI_Datatype gene_types[nitems] = {MPI_INT, MPI_INT, MPI_INT, MPI_DOUBLE, MPI_C_BOOL, MPI_DOUBLE, MPI_DOUBLE, MPI_C_BOOL};
MPI_Datatype mpi_gene;
MPI_Aint gene_offsets[nitems];
gene_offsets[0] = offsetof(genes, trait_id);
gene_offsets[1] = offsetof(genes, in_node_id);
gene_offsets[2] = offsetof(genes, out_node_id);
gene_offsets[3] = offsetof(genes, weight);
gene_offsets[4] = offsetof(genes, recurrent);
gene_offsets[5] = offsetof(genes, innovation_num);
gene_offsets[6] = offsetof(genes, mutation_num);
gene_offsets[7] = offsetof(genes, enable);
MPI_Type_create_struct(nitems, gene_blocklengths, gene_offsets, gene_types, &mpi_gene);
MPI_Type_commit(&mpi_gene);
return mpi_gene;
}
send_genes.h
struct genes{
int trait_id;
int in_node_id;
int out_node_id;
double weight;
bool recurrent;
double innovation_num;
double mutation_num;
bool enable;
};
感谢 Hristo Iliev 和 Gilles Gouaillardet 的提示,我注意到我没有确保每个请求都已正确启动和发送。如果我的理解是正确的,我只需要在每个 MPI_Isend
之后添加一个 MPI_Wait
,它现在可以正常工作了。尽管如此,我必须承认仍然不明白为什么只复制和发送最后一个值。
MPI_Request reqs_0[size_req], reqs_1[size_req], reqs_2[size_req], reqs_3[size_req], reqs_4[size_req], reqs_5[size_req], reqs_6[size_req], reqs_7[size_req];
MPI_Status status_0[size_req], status_1[size_req], status_2[size_req], status_3[size_req], status_4[size_req], status_5[size_req], status_6[size_req], status_7[size_req];
num_cpu = omp_get_max_threads();
MPI_Reduce(&num_cpu, &result, 1, MPI_INT, MPI_SUM, 0, MPI_COMM_WORLD);
if(rank==0){
num_cpu_0 = num_cpu;
if(com==1){
num_cpu_others = 0;
}else{
num_cpu_others = (result-num_cpu_0)/(com-1);
}
}else if(rank!=0){
num_cpu_0 = result - (com-1) * num_cpu;
num_cpu_others = (result-num_cpu_0)/(com-1);
}
if(rank==0){
if(com==1){
num_orgs = NEAT::pop_size;
}else{
num_orgs = (int) num_cpu_others / num_cpu * NEAT::pop_size;
}
}
//SPREAD THE POPULATION ACROSS NODES
if(rank==0){
for(int r=1; r < com; r++){
MPI_Isend(&num_orgs, 1, MPI_INT, r, 0, parallel_0, &reqs_0[r]);
MPI_Wait(&reqs_0[r], &status_0[r]);
for(int org = 0; org < NEAT::pop_size; org++){
//send some genome information
genome_info g_info;
g_info.generation = pop->organisms[org]->generation;
MPI_Isend(&g_info, 1, mpi_genome_info, r, org, parallel_1, &reqs_1[org]);
MPI_Wait(&reqs_1[org], &status_1[org]);
//define the genome
NEAT::Genome* sent_genome = pop->organisms[org]->gnome;
//third the genes
std::vector<NEAT::Gene*> gen_genes = sent_genome->genes;
int num_genes = gen_genes.size();
struct genes sent_genes[num_genes];
for(int id = 0; id < num_genes; id++){
if(gen_genes[id]->lnk->linktrait==0){
sent_genes[id].trait_id = 0;
}else{
sent_genes[id].trait_id = gen_genes[id]->lnk->linktrait->trait_id;
}
sent_genes[id].in_node_id = gen_genes[id]->lnk->in_node->node_id;
sent_genes[id].out_node_id = gen_genes[id]->lnk->out_node->node_id;
sent_genes[id].weight = gen_genes[id]->lnk->weight;
sent_genes[id].recurrent = gen_genes[id]->lnk->is_recurrent;
sent_genes[id].innovation_num = gen_genes[id]->innovation_num;
sent_genes[id].mutation_num = gen_genes[id]->mutation_num;
sent_genes[id].enable = gen_genes[id]->enable;
if((org==3) && (id<3)){
std::cout << "input "<<
sent_genes[id].in_node_id << " " <<
sent_genes[id].out_node_id <<" " <<
sent_genes[id].weight <<" " <<
sent_genes[id].recurrent <<" " <<
sent_genes[id].innovation_num <<" " <<
sent_genes[id].mutation_num <<" " <<
sent_genes[id].enable <<" " <<
std::endl;
}
}
MPI_Isend(&num_genes, 1, MPI_INT, r, org, parallel_6, &reqs_6[org]);
MPI_Wait(&reqs_6[org], &status_6[org]);
MPI_Isend(&sent_genes, num_genes, mpi_gene, r, org, parallel_7, &reqs_7[org]);
MPI_Wait(&reqs_7[org], &status_7[org]);
}
}
}
std::cout << "--------------" << std::endl;
if(rank!=0){
std::vector<NEAT::Organism*> local_pop;
int generation;
MPI_Recv(&num_orgs, 1, MPI_INT, 0, 0, parallel_0, &status);
std::cout << num_orgs << std::endl;
for(int org = 0; org < num_orgs ; org++){
//receive genome information
genome_info rcvd_genome_info;
MPI_Recv(&rcvd_genome_info, 1, mpi_genome_info, 0, org, parallel_1, &status);
generation = rcvd_genome_info.generation;
//receive genes
int num_rcvd_genes;
MPI_Recv(&num_rcvd_genes, 1, MPI_INT, 0, org, parallel_6, &status);
genes rcvd_genes[num_rcvd_genes];
MPI_Recv(&rcvd_genes, num_rcvd_genes, mpi_gene, 0, org, parallel_7, &status);
//MPI_Waitall(size_req, &reqs_7[size_req], MPI_STATUSES_IGNORE);
std::vector<NEAT::Gene*> gen_genes;
for(int id = 0; id < num_rcvd_genes; id++){
genes p_gene = rcvd_genes[id];
if((org==3) && (id<3)){
std::cout << "output "<<
p_gene.in_node_id << " " <<
p_gene.out_node_id <<" " <<
p_gene.weight <<" " <<
p_gene.recurrent <<" " <<
p_gene.innovation_num <<" " <<
p_gene.mutation_num <<" " <<
p_gene.enable <<" " <<
std::endl;
}
}
}
//MPI_Waitall(size_req, &reqs_7[size_req], MPI_STATUSES_IGNORE);*/
}
MPI_Finalize();
return 0;
}