MPI主进程收敛循环
MPI master process convergence loop
我正在尝试编写一个 MPI 程序来模拟整个网格的温度流动以达到平衡。我已经使用 openMP pthreads 和 cuda 编写了串行版本和并行版本。
我的目标是并行化计算一维数组的更新温度值的 for 循环。我必须执行并行部分的代码在这里(所有其他变量都在上面初始化):
int nproc, rank,chunksize,leftover,offset,source, tag1=3,tag2=2,tag3=1;
MPI_Status status;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&nproc);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
chunksize = (boxes / (nproc-1));
leftover = (boxes % (nproc-1));
if(rank == 0){
//init dsv
for(int idx = 0; idx < boxes; idx++){
temps[idx] = newtemps[idx];
}
int stop = 0;
int iter = 0;
float max_tmp;
float min_tmp;
while(stop != 1){
offset = 0;
for (int dest=1; dest<nproc; dest++) {
int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
MPI_Send(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD);
MPI_Send(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD);
MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD);
printf("sent %d temps to process: %d\n",chunk, dest);
offset = offset + chunk;
}
for (int dest=1; dest<nproc; dest++) {
int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
MPI_Recv(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD, &status);
MPI_Recv(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD,&status);
MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD,&status);
printf("received %d temps from process: %d\n",chunk, dest);
printf("status: %d\n",status.MPI_TAG);
}
max_tmp = -10000;
min_tmp = 10000;
for(idx = 0; idx < boxes; idx++){
temps[idx] = newtemps[idx];
if(newtemps[idx] > max_tmp){
max_tmp = newtemps[idx];
}
if(newtemps[idx] < min_tmp){
min_tmp = newtemps[idx];
}
}
stop = (max_tmp - min_tmp) <= (max_tmp * epsilon);
iter += 1;
}
}
if (rank > 0){
int chunk = (rank <= leftover ? chunksize + 1 : chunksize);
MPI_Recv(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD, &status);
MPI_Recv(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD,&status);
MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD,&status);
printf("received %d temps from process: 0\n",chunk);
printf("status: %d\n",status.MPI_TAG);
for(int j = offset; j < offset+chunk; j++){
float weightedtmp = 0;
int perimeter = 0;
int num_iters = neighbors[j][0];
for(int i = 1; i <= num_iters; i++){
weightedtmp += temps[neighbors[j][i]] * mults[j][i];
perimeter += mults[j][i];
}
weightedtmp /= perimeter;
newtemps[j] = temps[j] + (weightedtmp - temps[j] ) * affect_rate;
}
printf("sent %d temps to process: 0\n",chunk);
MPI_Send(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD);
MPI_Send(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD);
MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD);
}
MPI_Finalize();
然而,我的程序成功地完成了 while 循环的第一次迭代并找到了 while 循环的最大值(匹配我的串行版本),然后将 temps、newtemps 和 offset 变量发送到每个进程。在这里,虽然我的程序停止了,并且进程从不打印他们收到了消息。控制台看起来像这样:
[radeymichael@owens-login04 ~]$ mpicc -o ci changeInput.c
[radeymichael@owens-login04 ~]$ mpirun -np 3 ./ci .1 .1
sent 101 temps to process: 1
sent 100 temps to process: 2
received 101 temps from process: 1
status: 1
received 101 temps from process: 0
status: 1
sent 101 temps to process: 0
received 100 temps from process: 0
status: 1
sent 100 temps to process: 0
received 100 temps from process: 2
status: 1
max: 900.000000
sent 101 temps to process: 1
sent 100 temps to process: 2
我花了很多时间试图找出错误,但我认为我缺乏使用 MPI 的基础知识。如果有人能帮我找出我的误解在哪里,我将不胜感激。
问题是,第 0 级在 while 循环内并将发送数据直到 stop=1
,而所有其他进程将在最后一个 MPI_Send
之后到达 MPI_Finalize
else
部分。一种解决方案(如@Gilles 的评论中所见)是基于 stop
添加一个 while 循环,也适用于所有其他等级,并将 stop
广播给所有该过程由root
。
MPI_Bcast(&stop,1, MPI_INT, 0, MPI_COMM_WORLD);
查看下面的代码。
int nproc, rank,chunksize,leftover,offset,source, tag1=3,tag2=2,tag3=1;
MPI_Status status;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&nproc);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
chunksize = (boxes / (nproc-1));
leftover = (boxes % (nproc-1));
int stop = 0;
if(rank == 0){
//init dsv
for(int idx = 0; idx < boxes; idx++){
temps[idx] = newtemps[idx];
}
int iter = 0;
float max_tmp;
float min_tmp;
while(stop != 1){
offset = 0;
for (int dest=1; dest<nproc; dest++) {
int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
MPI_Send(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD);
MPI_Send(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD);
MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD);
printf("sent %d temps to process: %d\n",chunk, dest);
offset = offset + chunk;
}
for (int dest=1; dest<nproc; dest++) {
int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
MPI_Recv(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD, &status);
MPI_Recv(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD,&status);
MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD,&status);
printf("received %d temps from process: %d\n",chunk, dest);
printf("status: %d\n",status.MPI_TAG);
}
max_tmp = -10000;
min_tmp = 10000;
for(idx = 0; idx < boxes; idx++){
temps[idx] = newtemps[idx];
if(newtemps[idx] > max_tmp){
max_tmp = newtemps[idx];
}
if(newtemps[idx] < min_tmp){
min_tmp = newtemps[idx];
}
}
stop = (max_tmp - min_tmp) <= (max_tmp * epsilon);
iter += 1;
MPI_Bcast(&stop,1, MPI_INT, 0, MPI_COMM_WORLD);
}
}
if (rank > 0){
while(stop != 1){
int chunk = (rank <= leftover ? chunksize + 1 : chunksize);
MPI_Recv(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD, &status);
MPI_Recv(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD,&status);
MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD,&status);
printf("received %d temps from process: 0\n",chunk);
printf("status: %d\n",status.MPI_TAG);
for(int j = offset; j < offset+chunk; j++){
float weightedtmp = 0;
int perimeter = 0;
int num_iters = neighbors[j][0];
for(int i = 1; i <= num_iters; i++){
weightedtmp += temps[neighbors[j][i]] * mults[j][i];
perimeter += mults[j][i];
}
weightedtmp /= perimeter;
newtemps[j] = temps[j] + (weightedtmp - temps[j] ) * affect_rate;
}
printf("sent %d temps to process: 0\n",chunk);
MPI_Send(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD);
MPI_Send(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD);
MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD);
MPI_Bcast(&stop,1, MPI_INT, 0, MPI_COMM_WORLD);
}
}
MPI_Finalize();
我正在尝试编写一个 MPI 程序来模拟整个网格的温度流动以达到平衡。我已经使用 openMP pthreads 和 cuda 编写了串行版本和并行版本。
我的目标是并行化计算一维数组的更新温度值的 for 循环。我必须执行并行部分的代码在这里(所有其他变量都在上面初始化):
int nproc, rank,chunksize,leftover,offset,source, tag1=3,tag2=2,tag3=1;
MPI_Status status;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&nproc);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
chunksize = (boxes / (nproc-1));
leftover = (boxes % (nproc-1));
if(rank == 0){
//init dsv
for(int idx = 0; idx < boxes; idx++){
temps[idx] = newtemps[idx];
}
int stop = 0;
int iter = 0;
float max_tmp;
float min_tmp;
while(stop != 1){
offset = 0;
for (int dest=1; dest<nproc; dest++) {
int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
MPI_Send(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD);
MPI_Send(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD);
MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD);
printf("sent %d temps to process: %d\n",chunk, dest);
offset = offset + chunk;
}
for (int dest=1; dest<nproc; dest++) {
int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
MPI_Recv(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD, &status);
MPI_Recv(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD,&status);
MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD,&status);
printf("received %d temps from process: %d\n",chunk, dest);
printf("status: %d\n",status.MPI_TAG);
}
max_tmp = -10000;
min_tmp = 10000;
for(idx = 0; idx < boxes; idx++){
temps[idx] = newtemps[idx];
if(newtemps[idx] > max_tmp){
max_tmp = newtemps[idx];
}
if(newtemps[idx] < min_tmp){
min_tmp = newtemps[idx];
}
}
stop = (max_tmp - min_tmp) <= (max_tmp * epsilon);
iter += 1;
}
}
if (rank > 0){
int chunk = (rank <= leftover ? chunksize + 1 : chunksize);
MPI_Recv(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD, &status);
MPI_Recv(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD,&status);
MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD,&status);
printf("received %d temps from process: 0\n",chunk);
printf("status: %d\n",status.MPI_TAG);
for(int j = offset; j < offset+chunk; j++){
float weightedtmp = 0;
int perimeter = 0;
int num_iters = neighbors[j][0];
for(int i = 1; i <= num_iters; i++){
weightedtmp += temps[neighbors[j][i]] * mults[j][i];
perimeter += mults[j][i];
}
weightedtmp /= perimeter;
newtemps[j] = temps[j] + (weightedtmp - temps[j] ) * affect_rate;
}
printf("sent %d temps to process: 0\n",chunk);
MPI_Send(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD);
MPI_Send(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD);
MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD);
}
MPI_Finalize();
然而,我的程序成功地完成了 while 循环的第一次迭代并找到了 while 循环的最大值(匹配我的串行版本),然后将 temps、newtemps 和 offset 变量发送到每个进程。在这里,虽然我的程序停止了,并且进程从不打印他们收到了消息。控制台看起来像这样:
[radeymichael@owens-login04 ~]$ mpicc -o ci changeInput.c
[radeymichael@owens-login04 ~]$ mpirun -np 3 ./ci .1 .1
sent 101 temps to process: 1
sent 100 temps to process: 2
received 101 temps from process: 1
status: 1
received 101 temps from process: 0
status: 1
sent 101 temps to process: 0
received 100 temps from process: 0
status: 1
sent 100 temps to process: 0
received 100 temps from process: 2
status: 1
max: 900.000000
sent 101 temps to process: 1
sent 100 temps to process: 2
我花了很多时间试图找出错误,但我认为我缺乏使用 MPI 的基础知识。如果有人能帮我找出我的误解在哪里,我将不胜感激。
问题是,第 0 级在 while 循环内并将发送数据直到 stop=1
,而所有其他进程将在最后一个 MPI_Send
之后到达 MPI_Finalize
else
部分。一种解决方案(如@Gilles 的评论中所见)是基于 stop
添加一个 while 循环,也适用于所有其他等级,并将 stop
广播给所有该过程由root
。
MPI_Bcast(&stop,1, MPI_INT, 0, MPI_COMM_WORLD);
查看下面的代码。
int nproc, rank,chunksize,leftover,offset,source, tag1=3,tag2=2,tag3=1;
MPI_Status status;
MPI_Init(&argc,&argv);
MPI_Comm_size(MPI_COMM_WORLD,&nproc);
MPI_Comm_rank(MPI_COMM_WORLD,&rank);
chunksize = (boxes / (nproc-1));
leftover = (boxes % (nproc-1));
int stop = 0;
if(rank == 0){
//init dsv
for(int idx = 0; idx < boxes; idx++){
temps[idx] = newtemps[idx];
}
int iter = 0;
float max_tmp;
float min_tmp;
while(stop != 1){
offset = 0;
for (int dest=1; dest<nproc; dest++) {
int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
MPI_Send(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD);
MPI_Send(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD);
MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD);
printf("sent %d temps to process: %d\n",chunk, dest);
offset = offset + chunk;
}
for (int dest=1; dest<nproc; dest++) {
int chunk = (dest <= leftover ? chunksize + 1 : chunksize);
MPI_Recv(&offset, 1, MPI_INT, dest, tag1, MPI_COMM_WORLD, &status);
MPI_Recv(&temps[offset], chunk, MPI_FLOAT, dest, tag2, MPI_COMM_WORLD,&status);
MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, dest, tag3, MPI_COMM_WORLD,&status);
printf("received %d temps from process: %d\n",chunk, dest);
printf("status: %d\n",status.MPI_TAG);
}
max_tmp = -10000;
min_tmp = 10000;
for(idx = 0; idx < boxes; idx++){
temps[idx] = newtemps[idx];
if(newtemps[idx] > max_tmp){
max_tmp = newtemps[idx];
}
if(newtemps[idx] < min_tmp){
min_tmp = newtemps[idx];
}
}
stop = (max_tmp - min_tmp) <= (max_tmp * epsilon);
iter += 1;
MPI_Bcast(&stop,1, MPI_INT, 0, MPI_COMM_WORLD);
}
}
if (rank > 0){
while(stop != 1){
int chunk = (rank <= leftover ? chunksize + 1 : chunksize);
MPI_Recv(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD, &status);
MPI_Recv(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD,&status);
MPI_Recv(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD,&status);
printf("received %d temps from process: 0\n",chunk);
printf("status: %d\n",status.MPI_TAG);
for(int j = offset; j < offset+chunk; j++){
float weightedtmp = 0;
int perimeter = 0;
int num_iters = neighbors[j][0];
for(int i = 1; i <= num_iters; i++){
weightedtmp += temps[neighbors[j][i]] * mults[j][i];
perimeter += mults[j][i];
}
weightedtmp /= perimeter;
newtemps[j] = temps[j] + (weightedtmp - temps[j] ) * affect_rate;
}
printf("sent %d temps to process: 0\n",chunk);
MPI_Send(&offset, 1, MPI_INT, 0, tag1, MPI_COMM_WORLD);
MPI_Send(&temps[offset], chunk, MPI_FLOAT, 0, tag2, MPI_COMM_WORLD);
MPI_Send(&newtemps[offset], chunk, MPI_FLOAT, 0, tag3, MPI_COMM_WORLD);
MPI_Bcast(&stop,1, MPI_INT, 0, MPI_COMM_WORLD);
}
}
MPI_Finalize();