用于可变复杂性任务或可变速度节点的负载平衡 MPI 多线程?
Load balancing MPI multithreading for variable-complexity tasks or variable-speed nodes?
我编写了一个 MPI 代码,该代码目前是多线程的,它通过将每个数组中相同数量的元素发送到不同的进程来完成工作(因此,对于 6 个工作人员,该数组被分成 6 个相等的部分)。我想做的是仅在工作人员准备好接收时才发送小块,并在不阻塞未来发送的情况下接收完成的块;这样,如果一个块需要 10 秒,而其他块需要 1 秒,则可以在等待长块完成时处理其他数据。
这是我整理的一些框架代码:
#include <mpi.h>
#include <iostream>
#include <vector>
#include <cmath>
struct crazytaxi
{
double a = 10.0;
double b = 25.2;
double c = 222.222;
};
int main(int argc, char** argv)
{
//Initial and temp kanno vectors
std::vector<crazytaxi> kanno;
std::vector<crazytaxi> kanno_tmp;
//init MPI
MPI_Init(NULL,NULL);
//allocate vector
int SZ = 4200;
kanno.resize(SZ);
int world_size;
MPI_Comm_size(MPI_COMM_WORLD,&world_size);
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD,&world_rank);
if (world_rank == 0)
{
for (int i = 0; i < SZ; i++)
kanno[i].a = 1.0*i;
kanno[i].b = 10.0/(i+1);
}
for (int j = 0; j < 10; j++) {
//Make sure all processes have same kanno vector;
if (world_rank == 0) {
for (int i = 1; i < world_size; i++)
MPI_Send(&kanno[0],sizeof(crazytaxi)*kanno.size(),MPI_BYTE,i,3,MPI_COMM_WORLD);
} else {
MPI_Recv(&kanno[0],sizeof(crazytaxi)*kanno.size(),MPI_BYTE,0,3,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
}
//copy to tmp vector
kanno_tmp = kanno;
MPI_Barrier();
//the sender
if (world_rank == 0) {
unsigned p1 = 0;
unsigned segment = 10;
unsigned p2 = segment;
while (p1 < SZ) {
for (int i = 0; i < world_size; i++) {
//if (process #i is ready to receive)
//Send data in chunks of 10 to i
//else
//continue
}
}
}
if (world_rank != 0) {
//Receive data to be processed
//do some math
for (unsigned i = p1; i < p2; i++)
kanno_tmp[i].a = std::sqrt(kanno[i].a)/((double)i+1.0);
//Send processed data to 0 and wait to receive new data.
}
//copy temp vector to kanno
kanno = kanno_tmp;
}
//print some of the results;
if (world_rank == 0)
{
for (int i = 0; i < SZ; i += 40)
printf("Line %d: %lg,%lg\n",i,kanno[i].a,kanno[i].b);
}
MPI_Finalize();
}
除了我的 MPI_Send 和 MPI_Recv 调用会阻塞,或者 'master' 进程不知道 'slave' 进程已准备好接收数据。
MPI 中有没有办法做类似
的事情
unsigned Datapointer = [some_array_index];
while (Datapointer < array_size) {
if (world_rank == 0) {
for (int i = 1; i < world_size; i++)
{
if (<process i is ready to receive>) {
MPI_Send([...]);
Datapointer += 10;
}
if (<process i has sent data>)
MPI_Recv([...]);
if (Datapointer > array_size) {
MPI_Bcast([killswitch]);
break;
}
}
}
}
MPI_Barrier();
或者是否有更有效的方法来为可变复杂性块或可变速度节点构造此结构?
正如@Gilles Gouaillardet,指出这种情况下的关键字是MPI_ANY_SOURCE
。使用它,进程可以从任何来源接收消息。要知道哪个进程发送了该消息,您可以在 recv
调用的状态上使用 status.MPI_SOURCE
。
MPI_Status status;
if(rank == 0) {
//send initial work to all processes
while(true) {
MPI_recv(buf, 32, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
// do the distribution logic
MPI_send(buf, 32, MPI_INT, status.MPI_SOURCE, tag, MPI_COMM_WORLD);
// break out of the loop once the work is over and send all the processes
message to stop waiting for work
}
}
else {
while(true){
// receive work from rank 0
MPI_recv(buf, 32, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
// Perform computation and send back the result
MPI_send(buf, 32, MPI_INT, 0, tag, MPI_COMM_WORLD);
//break this until asked by master 0 using some kind of special message
}
}
我编写了一个 MPI 代码,该代码目前是多线程的,它通过将每个数组中相同数量的元素发送到不同的进程来完成工作(因此,对于 6 个工作人员,该数组被分成 6 个相等的部分)。我想做的是仅在工作人员准备好接收时才发送小块,并在不阻塞未来发送的情况下接收完成的块;这样,如果一个块需要 10 秒,而其他块需要 1 秒,则可以在等待长块完成时处理其他数据。
这是我整理的一些框架代码:
#include <mpi.h>
#include <iostream>
#include <vector>
#include <cmath>
struct crazytaxi
{
double a = 10.0;
double b = 25.2;
double c = 222.222;
};
int main(int argc, char** argv)
{
//Initial and temp kanno vectors
std::vector<crazytaxi> kanno;
std::vector<crazytaxi> kanno_tmp;
//init MPI
MPI_Init(NULL,NULL);
//allocate vector
int SZ = 4200;
kanno.resize(SZ);
int world_size;
MPI_Comm_size(MPI_COMM_WORLD,&world_size);
int world_rank;
MPI_Comm_rank(MPI_COMM_WORLD,&world_rank);
if (world_rank == 0)
{
for (int i = 0; i < SZ; i++)
kanno[i].a = 1.0*i;
kanno[i].b = 10.0/(i+1);
}
for (int j = 0; j < 10; j++) {
//Make sure all processes have same kanno vector;
if (world_rank == 0) {
for (int i = 1; i < world_size; i++)
MPI_Send(&kanno[0],sizeof(crazytaxi)*kanno.size(),MPI_BYTE,i,3,MPI_COMM_WORLD);
} else {
MPI_Recv(&kanno[0],sizeof(crazytaxi)*kanno.size(),MPI_BYTE,0,3,MPI_COMM_WORLD,MPI_STATUS_IGNORE);
}
//copy to tmp vector
kanno_tmp = kanno;
MPI_Barrier();
//the sender
if (world_rank == 0) {
unsigned p1 = 0;
unsigned segment = 10;
unsigned p2 = segment;
while (p1 < SZ) {
for (int i = 0; i < world_size; i++) {
//if (process #i is ready to receive)
//Send data in chunks of 10 to i
//else
//continue
}
}
}
if (world_rank != 0) {
//Receive data to be processed
//do some math
for (unsigned i = p1; i < p2; i++)
kanno_tmp[i].a = std::sqrt(kanno[i].a)/((double)i+1.0);
//Send processed data to 0 and wait to receive new data.
}
//copy temp vector to kanno
kanno = kanno_tmp;
}
//print some of the results;
if (world_rank == 0)
{
for (int i = 0; i < SZ; i += 40)
printf("Line %d: %lg,%lg\n",i,kanno[i].a,kanno[i].b);
}
MPI_Finalize();
}
除了我的 MPI_Send 和 MPI_Recv 调用会阻塞,或者 'master' 进程不知道 'slave' 进程已准备好接收数据。
MPI 中有没有办法做类似
的事情unsigned Datapointer = [some_array_index];
while (Datapointer < array_size) {
if (world_rank == 0) {
for (int i = 1; i < world_size; i++)
{
if (<process i is ready to receive>) {
MPI_Send([...]);
Datapointer += 10;
}
if (<process i has sent data>)
MPI_Recv([...]);
if (Datapointer > array_size) {
MPI_Bcast([killswitch]);
break;
}
}
}
}
MPI_Barrier();
或者是否有更有效的方法来为可变复杂性块或可变速度节点构造此结构?
正如@Gilles Gouaillardet,指出这种情况下的关键字是MPI_ANY_SOURCE
。使用它,进程可以从任何来源接收消息。要知道哪个进程发送了该消息,您可以在 recv
调用的状态上使用 status.MPI_SOURCE
。
MPI_Status status;
if(rank == 0) {
//send initial work to all processes
while(true) {
MPI_recv(buf, 32, MPI_INT, MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
// do the distribution logic
MPI_send(buf, 32, MPI_INT, status.MPI_SOURCE, tag, MPI_COMM_WORLD);
// break out of the loop once the work is over and send all the processes
message to stop waiting for work
}
}
else {
while(true){
// receive work from rank 0
MPI_recv(buf, 32, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
// Perform computation and send back the result
MPI_send(buf, 32, MPI_INT, 0, tag, MPI_COMM_WORLD);
//break this until asked by master 0 using some kind of special message
}
}