MPI : Percolate a subarray from slave processes to update main array in root/master process
Apologies for the rather long code. I am new to MPI and am running into problems while trying to parallelize a sorting algorithm. I basically have a huge array of ints which I have to sort faster by splitting it into equal parts and handing them to slave processes. Once the slave processes are done, they have to return their sub-arrays to the root process, which (after merging) can do further processing.
Hence: the solution has to be the global array patched with the partially sorted sub-arrays.
I have already tried looking at previous questions on the forum, but I have been stuck on this the whole day. Please don't be too hard on me, since I am still new to MPI.
void Batcher( char *inputFile, string outputFile, int N ){
    int initialised;
    MPI_Initialized(&initialised);
    //int rank;
    if (!initialised)
    {
        MPI_Init(NULL, NULL);
        atexit(finalized);
    }
    //
    //BEGIN : FILE READING FOR ARRAY
    //
    //Get number of processes
    int world_size;
    int root = 0;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    N=world_size;
    //Get the rank of the process
    int rank;
    MPI_Comm_rank(MPI_COMM_WORLD , &rank);
    int *global_array;
    int *sub_array_per_process;
    int element_count;
    if ( rank == root ){
        // ifstream input;
        ofstream output;
        std::ifstream input( inputFile, std::ifstream::in);
        //get number of integers in the file so they may be populates to array
        int counter=0;
        string line;
        while( std::getline(input,line)){
            if ( line !="") ++counter;
        }
        int numbers[counter];
        global_array = new int [counter];
        global_array = &numbers[0];
        int current;
        int index = 0;
        //reset read pointer to beginning of input file
        input.clear();
        input.seekg(0, ios::beg);
        //get number from inputfile and add to array numbers
        while( input >> current){
            numbers[index]=current;
            index++;
            // cout<<((int) a);
        }
        global_array = numbers;
        for(int i=0; i<counter; i++)
            global_array[i]=numbers[i];//<<endl;
        for(int i=0; i<counter; i++)
            cout<<"global "<< global_array[i]<< " numbers " <<numbers[i] <<endl;
        element_count = counter;
        input.close();
        /*
        Send tasks to slaves */
        int NON = element_count / (N - 1 );
        for(int i=root+1; i< world_size ; i++){
            int start = get_preceeding_ranks(i )*NON;
            // cout<<"start is "<< start <<endl;
            MPI_Send(&global_array[start], NON, MPI_INT,i, 1 ,MPI_COMM_WORLD);
        }
        MPI_Status status;
        int temp[counter];
    } // End root process operation
    MPI_Bcast(&element_count, 1, MPI_INT,root,MPI_COMM_WORLD );
    int NON = element_count / (N - 1 );
    //Receive local sub-job from root
    if ( rank != root ){
        MPI_Status status;
        MPI_Recv(sub_array_per_process,NON, MPI_INT , root, 1 , MPI_COMM_WORLD ,&status );
    }
    int n_per_small_chunk = sizeof(sub_array_per_process) / sizeof(sub_array_per_process[0]) ;
    oddEvenMergeSort(sub_array_per_process,0, n_per_small_chunk);
    cout<<"After sorting processwise sorting.... "<<endl;
    if ( rank != root ){
        for(int i=0;i<NON;i++)
            cout<<"rank : "<< rank <<" data = "<< sub_array_per_process[ i] << endl;
    }
    // MPI_Bcast(global_array, element_count , MPI_INT,root,MPI_COMM_WORLD );
    //sub_array_per_process = new int[2];
    MPI_Barrier(MPI_COMM_WORLD ) ;
    if (rank == root ){
        int start ;
        int sender = -1;
        for ( int i= 0; i< N; i++){
            start = get_preceeding_ranks(i+1 )*NON;
            MPI_Status status;
            cout<<" start = "<<start<<endl;
            sender = i+1;
            MPI_Recv(&global_array[start], NON , MPI_INT , sender , 1, MPI_COMM_WORLD ,&status );
            cout<<" Received " << global_array[start] <<" from " << sender << endl;
            // MPI_Bcast(global_array, elem , MPI_INT,root,MPI_COMM_WORLD );
        }
        for ( int j=0; j< element_count; j++ )
            cout<<" Received " << global_array[j] <<" from " <<sender << endl;
    }
    else { //Send to root your sub-array..
        // for (int j=1 ; j < N; i++ )
        for ( int i=0;i<NON; i++)
            MPI_Send(&sub_array_per_process[i], NON , MPI_INT,0, 1 ,MPI_COMM_WORLD);
    }
    MPI_Barrier(MPI_COMM_WORLD ) ;
    for ( int j=0; j< element_count; j++ )
        cout<<" iOutside " << global_array[j] <<" from "<< endl;
    MPI_Finalize();
}

int main() {
    string output;//Dummy
    char* inputFile ="inputsmall.txt";
    int N=0; //Dummy
    Batcher( inputFile, output, N );
    return 0 ;
}
Since the code is quite long, here is specifically where I am stuck:
if (rank == root ){
    int start ;
    int sender = -1;
    for ( int i= 0; i< N; i++){ //get all submitted sub-arrays from slaves.
        start = get_preceeding_ranks(i+1 )*NON;
        MPI_Status status;
        cout<<" start = "<<start<<endl;
        sender = i+1;
        MPI_Recv(&global_array[start], NON , MPI_INT , sender , 1, MPI_COMM_WORLD ,&status );
        cout<<" Received " << global_array[start] <<" from " << sender << endl;
    }
    for ( int j=0; j< element_count; j++ )
        cout<<" Received " << global_array[j] <<" from " <<sender << endl;
}
else { //Send to root your sub-array..
    for ( int i=0;i<NON; i++)
        MPI_Send(&sub_array_per_process[i], NON , MPI_INT,0, 1 ,MPI_COMM_WORLD);
}
The output I get is:
start = 2
Received 2 from 1
start = 4
Received 3 from 2
start = 6
Received 4 from 3
start = 8
[tux2:25061] *** Process received signal ***
[tux2:25061] Signal: Segmentation fault (11)
[tux2:25061] Signal code: (128)
[tux2:25061] Failing at address: (nil)
--------------------------------------------------------------------------
mpirun noticed that process rank 0 with PID 25061 on node tux2 exited on signal 11 (Segmentation fault).
--------------------------------------------------------------------------
From your code I can see that when rank equals root you are receiving the sorted arrays from the slave processes. The problem lies in your for loop. You are trying to receive from all processes from i+1 up to N, whereas your communicator has only N processes (this is because you assigned N = world_size). Hence your slaves are ranks 1..N-1. You therefore need to change the for statement as follows:
for ( int i= 0; i< (N-1); i++){ //To receive data from process i=1 to i=N-1
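Applied to the receive loop in your code (a sketch only, reusing your own variable names and your get_preceeding_ranks helper), the corrected block would look roughly like this:

if (rank == root ){
    // Slaves are ranks 1..N-1, so loop over N-1 senders only.
    for ( int i = 0; i < N - 1; i++ ){
        int sender = i + 1;                              // slave rank to receive from
        int start  = get_preceeding_ranks(sender) * NON; // offset of this slave's chunk
        MPI_Status status;
        MPI_Recv(&global_array[start], NON, MPI_INT, sender, 1, MPI_COMM_WORLD, &status);
    }
}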
Also note that, as written, your code is hard to debug and maintain. You will find it much easier to use MPI_Scatter and MPI_Gather. Have a look at this tutorial.
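For illustration only, a minimal sketch of that scatter/sort/gather pattern could look like the following (the helper name scatter_sort_gather is hypothetical; it assumes element_count is divisible by world_size, lets every rank including root sort one chunk, and uses std::sort in place of your oddEvenMergeSort):

#include <mpi.h>
#include <algorithm>
#include <vector>

// Hypothetical helper: splits global_array across all ranks, sorts each
// chunk locally, and gathers the sorted chunks back on root (rank 0).
void scatter_sort_gather(int *global_array, int element_count) {
    int world_size, rank;
    MPI_Comm_size(MPI_COMM_WORLD, &world_size);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    int chunk = element_count / world_size;      // assumes an even split
    std::vector<int> local(chunk);

    // Root's array is distributed in equal, contiguous chunks, one per rank.
    MPI_Scatter(global_array, chunk, MPI_INT,
                local.data(), chunk, MPI_INT, 0, MPI_COMM_WORLD);

    std::sort(local.begin(), local.end());       // local sort of this rank's chunk

    // Sorted chunks are collected back into global_array on root.
    MPI_Gather(local.data(), chunk, MPI_INT,
               global_array, chunk, MPI_INT, 0, MPI_COMM_WORLD);
}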