动态负载均衡master-worker

Dynamic load balancing master-worker

我有一个索引数组,我希望每个工作人员根据这些索引做一些事情。 数组的大小可能比列的总数多,所以我的第一个问题是这里除了主从负载平衡之外还有其他方法吗?我想要一个平衡系统,我也想将每个索引分配给每个等级。

我在考虑 master-worker,在这种方法中,master 等级 (0) 将每个索引赋予其他等级。但是当我 运行 我的代码有 3 个等级和 15 个索引时,我的代码在发送索引 4 的 while 循环中停止。我想知道是否有人可以帮助我找到问题

if(pCurrentID == 0) { // Master
   MPI_Status status;

   int nindices = 15;
   int mesg[1] = {0};
   int initial_id = 0;
   int recv_mesg[1] = {0};

  // -- send out initial ids to workers --//
   while (initial_id < size - 1) {
     if (initial_id < nindices) {
       MPI_Send(mesg, 1, MPI_INT, initial_id + 1, 1, MPI_COMM_WORLD);
       mesg[0] += 1;
       ++initial_id;
     }
   }

   //-- hand out id to workers dynamically --//
   while (mesg[0] != nindices) {
     MPI_Probe(MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &status);
     int isource = status.MPI_SOURCE;
     MPI_Recv(recv_mesg, 1, MPI_INT, isource, 1, MPI_COMM_WORLD, &status);
     MPI_Send(mesg, 1, MPI_INT, isource, 1, MPI_COMM_WORLD);
     mesg[0] += 1;
   }

   //-- hand out ending signals once done --//
   for (int rank = 1; rank < size; ++rank) {
     mesg[0] = -1;
     MPI_Send(mesg, 1, MPI_INT, rank, 0, MPI_COMM_WORLD);
   }
 } else { 
   MPI_Status status;
   int id[1] = {0};
   // Get the surrounding fragment id
   MPI_Probe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
   int itag = status.MPI_TAG;
   MPI_Recv(id, 1, MPI_INT, 0, itag, MPI_COMM_WORLD, &status);
   
   int jfrag = id[0];
   if (jfrag < 0) break;
   // do something
   MPI_Send(id, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
 }

I have an array of index which I want each worker do something based on these indexes. the size of the array might be more than the total number of ranks, so my first question is if there is another way except master-worker load balancing here? I want to have a balances system and also I want to assign each index to each ranks.

不,但是如果 per 数组索引执行的工作花费的时间大致相同,您可以简单地 scatter 数组过程。

I was thinking about master-worker, and in this approach master rank (0) is giving each index to other ranks. but when I was running my code with 3 rank and 15 index my code is halting in while loop for sending the index 4. I was wondering If anybody can help me to find the problem

正如评论中已经指出的那样,问题是您缺少(在工作人员方面)查询 master 工作的循环。

负载均衡器可以实现如下:

  1. master initial 向其他 worker 发送一个迭代;
  2. 每个worker等待master的消息;
  3. 之后 master 从 MPI_ANY_SOURCE 调用 MPI_Recv 并等待另一个 worker 请求工作;
  4. worker 完成其第一次迭代后,将其等级发送给 master,向 master 发送新迭代的信号;
  5. master读取step 4.中worker发送的rank,检查数组是否有新的索引,如果还有有效的索引,则发送给worker。否则,发送一条特殊消息,通知工作人员没有更多工作要执行。例如,该消息可以是 -1;
  6. 当工作人员收到特殊消息时,它会停止工作;
  7. master在所有worker都收到特殊消息后停止工作

这种方法的一个例子:

#include <stdio.h>
#include <stdlib.h>
#include <mpi.h>

int main(int argc,char *argv[]){
    MPI_Init(NULL,NULL); // Initialize the MPI environment
    int rank; 
    int size;
    MPI_Status status;
    MPI_Comm_rank(MPI_COMM_WORLD,&rank);
    MPI_Comm_size(MPI_COMM_WORLD,&size);

    int work_is_done = -1;
    if(rank == 0){
       int max_index = 10; 
       int index_simulator = 0;
       // Send statically the first iterations
       for(int i = 1; i < size; i++){
           MPI_Send(&index_simulator, 1, MPI_INT, i, i, MPI_COMM_WORLD); 
           index_simulator++;
       }  
       int processes_finishing_work = 0;
     
       do{
          int process_that_wants_work = 0;
          MPI_Recv(&process_that_wants_work, 1, MPI_INT, MPI_ANY_SOURCE, 1, MPI_COMM_WORLD, &status);
          if(index_simulator < max_index){
             MPI_Send(&index_simulator, 1, MPI_INT, process_that_wants_work, 1, MPI_COMM_WORLD);  
             index_simulator++;
          }
          else{ // send special message 
               MPI_Send(&work_is_done, 1, MPI_INT, process_that_wants_work, 1, MPI_COMM_WORLD);
               processes_finishing_work++;
          }
       } while(processes_finishing_work < size - 1);
    }
    else{
        int index_to_work = 0;
        MPI_Recv(&index_to_work, 1, MPI_INT, 0, rank, MPI_COMM_WORLD, &status);    
        // Work with the iterations index_to_work
    
       do{
          MPI_Send(&rank, 1, MPI_INT, 0, 1, MPI_COMM_WORLD);
          MPI_Recv(&index_to_work, 1, MPI_INT, 0, 1, MPI_COMM_WORLD, &status);
          if(index_to_work != work_is_done)
             // Work with the iterations index_to_work
       }while(index_to_work != work_is_done);
    }
    printf("Process {%d} -> I AM OUT\n", rank);
    MPI_Finalize();
    return 0;
 }

您可以通过减少以下内容来改进上述方法:1) 发送的消息数量和 2) 等待消息的时间。对于前者,您可以尝试使用分块策略( 发送多个索引 per MPI 通信)。对于后者,您可以尝试使用非阻塞 MPI 通信,或者让两个线程 per 处理一个到 receive/send 另一个工作以实际执行工作。这种多线程方法还允许主进程实际处理数组索引,但它使方法变得非常复杂。