MPI中分配奇数个处理器核时如何分配数据?
How to distribute data when odd number of processer cores are allotted in MPI?
我是 MPI 编程的初学者。我正在尝试编写一个程序,动态接收不同大小的一维数组(100、1000、10000、1000000 等的倍数)并将其分散到分配的处理器内核。处理器内核计算接收到的元素的总和并将总和发回。根进程打印输入数组中元素的总和。
我用MPI_Scatter()
和MPI_Reduce()
解决了这个问题。然而,当分配的处理器内核数量为奇数时,一些数据就会被遗漏。例如,当我的输入数据大小为 100 和 3 个进程时 - 只添加了 99 个元素,最后一个被遗漏了。
我搜索了替代方案,发现 MPI_Scatterv()
可用于数据分布不均。但是没有 material 可以指导我实施它。有人能帮我吗?我在这里发布我的代码。提前致谢。
#include <stdio.h>
#include <mpi.h>
#include <stdlib.h>
void readArray(char * fileName, double ** a, int * n);
int Numprocs, MyRank;
int mpi_err;
#define Root = 0
void init_it(int *argc, char ***argv) {
mpi_err = MPI_Init(argc, argv);
mpi_err = MPI_Comm_rank(MPI_COMM_WORLD, &MyRank);
mpi_err = MPI_Comm_size(MPI_COMM_WORLD, &Numprocs);
}
int main(int argc, char** argv) {
/* .......Variables Initialisation ......*/
int index;
double *InputBuffer, *RecvBuffer, sum=0.0, psum = 0.0;
double ptime = 0.0, Totaltime= 0.0,startwtime = 0.0, endwtime = 0.0;
int Scatter_DataSize;
int DataSize;
FILE *fp;
init_it(&argc,&argv);
if (argc != 2) {
fprintf(stderr, "\n*** Usage: arraySum <inputFile>\n\n");
exit(1);
}
if (MyRank == 0) {
startwtime = MPI_Wtime();
printf("Number of nodes running %d\n",Numprocs);
/*...... Read input....*/
readArray(argv[1], &InputBuffer, &DataSize);
printf("Size of array %d\n", DataSize);
}
if (MyRank!=0) {
MPI_Recv(&DataSize, 1, MPI_INT, 0, 1, MPI_COMM_WORLD, NULL);
}
else {
int i;
for (i=1;i<Numprocs;i++) {
MPI_Send(&DataSize, 1, MPI_INT, i, 1, MPI_COMM_WORLD);
d[i]= i*Numprocs;
}
}
Scatter_DataSize = DataSize / Numprocs;
RecvBuffer = (double *)malloc(Scatter_DataSize * sizeof(double));
MPI_Barrier(MPI_COMM_WORLD);
mpi_err = MPI_Scatter(InputBuffer, Scatter_DataSize, MPI_DOUBLE,
RecvBuffer, Scatter_DataSize, MPI_DOUBLE,
0, MPI_COMM_WORLD);
for (index = 0; index < Scatter_DataSize; index++) {
psum = psum + RecvBuffer[index];
}
//printf("Processor %d computed sum %f\n", MyRank, psum);
mpi_err = MPI_Reduce(&psum, &sum, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
if (MyRank == 0) {
endwtime = MPI_Wtime();
Totaltime = endwtime - startwtime;
printf("Total sum %f\n",sum);
printf("Total time %f\n", Totaltime);
}
MPI_Finalize();
return 0;
}
void readArray(char * fileName, double ** a, int * n) {
int count, DataSize;
double * InputBuffer;
FILE * fin;
fin = fopen(fileName, "r");
if (fin == NULL) {
fprintf(stderr, "\n*** Unable to open input file '%s'\n\n",
fileName);
exit(1);
}
fscanf(fin, "%d\n", &DataSize);
InputBuffer = (double *)malloc(DataSize * sizeof(double));
if (InputBuffer == NULL) {
fprintf(stderr, "\n*** Unable to allocate %d-length array", DataSize);
exit(1);
}
for (count = 0; count < DataSize; count++) {
fscanf(fin, "%lf", &InputBuffer[count]);
}
fclose(fin);
*n = DataSize;
*a = InputBuffer;
}
Scatter_Datasize的计算:
Scatter_DataSize = 数据大小 / Numprocs;
仅当 DataSize 是 Numprocs 的倍数时才正确,在您的情况下,由于 DataSize 始终为偶数,因此在 Numprocs 为偶数时发生。当 Numprocs 为奇数时,您应该显式计算余数并将其分配给一个 MPI 进程,我建议最后一个。
在您的情况下,您可能只需要使用 MPI_Scatterv
的 sendcount[]
数组。事实上,一个简单的实现是计算除一个进程之外的所有进程都将接收的 sendtype
类型的元素数量(比如 Nelement
)。其中一个进程(例如最后一个)将获取剩余的数据。在那种情况下,sendcount[i] = Nelement
for indexes i
from 0
to p-2
(p
是 communicator 中的进程数,对你来说 MPI_COMM_WORLD
).那么进程p-1
就会得到sendcount[p-1] = DataSize-Nelement*(p-1)
。关于位移数组 displs[]
,您只需指定从中获取传出数据以处理 i 的位移(元素数量)(参见 [1] 第 161 页)。对于前面的示例,这将是:
for (i=0; i<p; ++i)
displs[i]=Nelement*i;
如果您决定另一个进程 q
必须计算其他数据,请考虑使用 0 ≤ q < q+1 ≤ p
为进程 q+1
设置良好的位移 displs[q+1]
。
[1] MPI:消息传递接口标准(版本 3.1):http://www.mpi-forum.org/docs/mpi-3.1/mpi31-report.pdf
我是 MPI 编程的初学者。我正在尝试编写一个程序,动态接收不同大小的一维数组(100、1000、10000、1000000 等的倍数)并将其分散到分配的处理器内核。处理器内核计算接收到的元素的总和并将总和发回。根进程打印输入数组中元素的总和。
我用MPI_Scatter()
和MPI_Reduce()
解决了这个问题。然而,当分配的处理器内核数量为奇数时,一些数据就会被遗漏。例如,当我的输入数据大小为 100 和 3 个进程时 - 只添加了 99 个元素,最后一个被遗漏了。
我搜索了替代方案,发现 MPI_Scatterv()
可用于数据分布不均。但是没有 material 可以指导我实施它。有人能帮我吗?我在这里发布我的代码。提前致谢。
#include <stdio.h>
#include <mpi.h>
#include <stdlib.h>
void readArray(char * fileName, double ** a, int * n);
int Numprocs, MyRank;
int mpi_err;
#define Root = 0
void init_it(int *argc, char ***argv) {
mpi_err = MPI_Init(argc, argv);
mpi_err = MPI_Comm_rank(MPI_COMM_WORLD, &MyRank);
mpi_err = MPI_Comm_size(MPI_COMM_WORLD, &Numprocs);
}
int main(int argc, char** argv) {
/* .......Variables Initialisation ......*/
int index;
double *InputBuffer, *RecvBuffer, sum=0.0, psum = 0.0;
double ptime = 0.0, Totaltime= 0.0,startwtime = 0.0, endwtime = 0.0;
int Scatter_DataSize;
int DataSize;
FILE *fp;
init_it(&argc,&argv);
if (argc != 2) {
fprintf(stderr, "\n*** Usage: arraySum <inputFile>\n\n");
exit(1);
}
if (MyRank == 0) {
startwtime = MPI_Wtime();
printf("Number of nodes running %d\n",Numprocs);
/*...... Read input....*/
readArray(argv[1], &InputBuffer, &DataSize);
printf("Size of array %d\n", DataSize);
}
if (MyRank!=0) {
MPI_Recv(&DataSize, 1, MPI_INT, 0, 1, MPI_COMM_WORLD, NULL);
}
else {
int i;
for (i=1;i<Numprocs;i++) {
MPI_Send(&DataSize, 1, MPI_INT, i, 1, MPI_COMM_WORLD);
d[i]= i*Numprocs;
}
}
Scatter_DataSize = DataSize / Numprocs;
RecvBuffer = (double *)malloc(Scatter_DataSize * sizeof(double));
MPI_Barrier(MPI_COMM_WORLD);
mpi_err = MPI_Scatter(InputBuffer, Scatter_DataSize, MPI_DOUBLE,
RecvBuffer, Scatter_DataSize, MPI_DOUBLE,
0, MPI_COMM_WORLD);
for (index = 0; index < Scatter_DataSize; index++) {
psum = psum + RecvBuffer[index];
}
//printf("Processor %d computed sum %f\n", MyRank, psum);
mpi_err = MPI_Reduce(&psum, &sum, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);
if (MyRank == 0) {
endwtime = MPI_Wtime();
Totaltime = endwtime - startwtime;
printf("Total sum %f\n",sum);
printf("Total time %f\n", Totaltime);
}
MPI_Finalize();
return 0;
}
void readArray(char * fileName, double ** a, int * n) {
int count, DataSize;
double * InputBuffer;
FILE * fin;
fin = fopen(fileName, "r");
if (fin == NULL) {
fprintf(stderr, "\n*** Unable to open input file '%s'\n\n",
fileName);
exit(1);
}
fscanf(fin, "%d\n", &DataSize);
InputBuffer = (double *)malloc(DataSize * sizeof(double));
if (InputBuffer == NULL) {
fprintf(stderr, "\n*** Unable to allocate %d-length array", DataSize);
exit(1);
}
for (count = 0; count < DataSize; count++) {
fscanf(fin, "%lf", &InputBuffer[count]);
}
fclose(fin);
*n = DataSize;
*a = InputBuffer;
}
Scatter_Datasize的计算:
Scatter_DataSize = 数据大小 / Numprocs;
仅当 DataSize 是 Numprocs 的倍数时才正确,在您的情况下,由于 DataSize 始终为偶数,因此在 Numprocs 为偶数时发生。当 Numprocs 为奇数时,您应该显式计算余数并将其分配给一个 MPI 进程,我建议最后一个。
在您的情况下,您可能只需要使用 MPI_Scatterv
的 sendcount[]
数组。事实上,一个简单的实现是计算除一个进程之外的所有进程都将接收的 sendtype
类型的元素数量(比如 Nelement
)。其中一个进程(例如最后一个)将获取剩余的数据。在那种情况下,sendcount[i] = Nelement
for indexes i
from 0
to p-2
(p
是 communicator 中的进程数,对你来说 MPI_COMM_WORLD
).那么进程p-1
就会得到sendcount[p-1] = DataSize-Nelement*(p-1)
。关于位移数组 displs[]
,您只需指定从中获取传出数据以处理 i 的位移(元素数量)(参见 [1] 第 161 页)。对于前面的示例,这将是:
for (i=0; i<p; ++i)
displs[i]=Nelement*i;
如果您决定另一个进程 q
必须计算其他数据,请考虑使用 0 ≤ q < q+1 ≤ p
为进程 q+1
设置良好的位移 displs[q+1]
。
[1] MPI:消息传递接口标准(版本 3.1):http://www.mpi-forum.org/docs/mpi-3.1/mpi31-report.pdf