如何同步子线程

How to synchronize child threads

我目前正在创建一种算法,用于使用多线程动态规划来解决 0-1 背包问题。我的方法是将 n by capacity 动态规划 table 分成 4 个 n by capacity / 4 部分,这将由 4 个线程解决。由于背包依赖于上一行,所以 4 个线程每次都需要解决同一行。在继续下一行之前,它们必须等待所有线程完成当前行的部分。

这是我的代码:

int knapsackParallel(char *values, char *weights, int N, int capacity) {
  for (int j = 0; j < 4; j++) {
    arguments[j].cpuNum = j;

    pthread_create(&tid[j], NULL, solveRow, (void *) &arguments[j]);

    pthread_setaffinity_np(tid[j], sizeof(cpu_set_t), &cpusets[j]);
  }

  for (int j = 0; j < 4; j++) {
    pthread_join(tid[j], NULL);
  }
}

这里是每个需要同步的线程的代码:

void *solveRow(void *arguments) {
  int cpuNum = ((args *) arguments)->cpuNum;
  int initial = ((args *) arguments)->cpuNum * (capacity / p);
  int limit = cpuNum == p - 1 ? capacity + 1 : initial + (capacity / p);

  for (int i = 0; i < N + 1; i++) {
    for (int j = initial; j < limit; j++) {
      // for the top and left edges, initialize to 0
      if (i == 0 || j == 0)
        table[i][j] = 0;
      // find the max between putting the value in the knapsack or not
      else if (weights[i] <= j)
        table[i][j] = fmax(values[i] + table[i - 1][j - weights[i]], table[i - 1][j]);
      // copy the value in the top of the cell
      else
        table[i][j] = table[i - 1][j];
    }

    // insert code here to wait for all the threads to finish their current iteration
    // before proceeding to the next iteration
  }

到目前为止我尝试过的:

使用多个信号量:

sem_post(&locks[cpuNum]);
sem_post(&locks[cpuNum]);
sem_post(&locks[cpuNum]);

for (int j = 0; j < p; j++) {
  if (j == cpuNum) continue;
  sem_wait(&locks[j]);
}
printf("thread %d done\n", cpuNum);

使用pthread_cond_wait:

locks[cpuNum] = 1;

if (locks[0] && locks[1] && locks[2] && locks[3]) {
  pthread_cond_broadcast(&full);
}
else {
  pthread_cond_wait(&full, &lock);
}
locks[cpuNum] = 0;

使用futex

locks[cpuNum] = 1;

futex_wait(&locks[0], 0);
futex_wait(&locks[1], 0);
futex_wait(&locks[2], 0);
futex_wait(&locks[3], 0);

locks[cpuNum] = 0;

您似乎通过为要同步的线程调用 pthread_join 做得很好,因为这确保您的主线程将等待其他线程完成:

问题似乎是您没有在要同步的线程中调用 pthread_exit:

[另一个回答给我的注意力带来了障碍。使用屏障可以免费大大减少所需的代码。最好用那个。]

如果您有一个带有活动行号的 var,线程可以简单地等待它更新。

#define NUM_WORKERS 4

static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond   = PTHREAD_COND_INITIALIZER;
static int active_row_num      = 0;
static int outstanding_threads = NUM_WORKERS;

static void done_row( int row_num ) {
   pthread_mutex_lock( &mutex );

   if ( --outstanding_threads == 0 ) {
      ++active_row_num;
      outstanding_threads = NUM_WORKERS;
      pthread_cond_broadcast( &cond );
   }

   ++row_num;
   while ( row_num != active_row_num )
      pthread_cond_wait( &cond, &mutex );

   pthread_mutex_unlock( &mutex );
}

static void *worker( void *arg ) {
   for ( int row_num=0; row_num<...; ++row_num ) {
      ...

      done_row( row_num );
   }

   return NULL;
}

这听起来像是 posix 个障碍的工作:pthread_barrier_init, pthread_barrier_wait

屏障可以解决一些与 pthread_join 相同的问题,但实际上不会让线程终止并重新启动。

一个屏障被初始化为一定的计数。当那么多线程调用等待操作时,屏障就会激活,它们就会全部畅通无阻。其中一个线程收到一个不同的 return 值,告诉它它是“串行线程”:然后它可以执行一些特殊的工作。

因此,程序逻辑中的每个 barrier_wait 点都是所有线程相遇的集合点,这保证了它们正在合作的任何 activity 都处于完成状态。如果它们的 activity 需要集成在一起(例如,部分结果合并为完整结果),串行线程可以执行此操作,然后是所有线程再次会合的另一个屏障,以启动另一轮并行工作。