如何同步子线程
How to synchronize child threads
我目前正在创建一种算法,用于使用多线程动态规划来解决 0-1 背包问题。我的方法是将 n
by capacity
动态规划 table 分成 4 个 n
by capacity / 4
部分,这将由 4 个线程解决。由于背包依赖于上一行,所以 4 个线程每次都需要解决同一行。在继续下一行之前,它们必须等待所有线程完成当前行的部分。
这是我的代码:
int knapsackParallel(char *values, char *weights, int N, int capacity) {
for (int j = 0; j < 4; j++) {
arguments[j].cpuNum = j;
pthread_create(&tid[j], NULL, solveRow, (void *) &arguments[j]);
pthread_setaffinity_np(tid[j], sizeof(cpu_set_t), &cpusets[j]);
}
for (int j = 0; j < 4; j++) {
pthread_join(tid[j], NULL);
}
}
这里是每个需要同步的线程的代码:
void *solveRow(void *arguments) {
int cpuNum = ((args *) arguments)->cpuNum;
int initial = ((args *) arguments)->cpuNum * (capacity / p);
int limit = cpuNum == p - 1 ? capacity + 1 : initial + (capacity / p);
for (int i = 0; i < N + 1; i++) {
for (int j = initial; j < limit; j++) {
// for the top and left edges, initialize to 0
if (i == 0 || j == 0)
table[i][j] = 0;
// find the max between putting the value in the knapsack or not
else if (weights[i] <= j)
table[i][j] = fmax(values[i] + table[i - 1][j - weights[i]], table[i - 1][j]);
// copy the value in the top of the cell
else
table[i][j] = table[i - 1][j];
}
// insert code here to wait for all the threads to finish their current iteration
// before proceeding to the next iteration
}
到目前为止我尝试过的:
使用多个信号量:
sem_post(&locks[cpuNum]);
sem_post(&locks[cpuNum]);
sem_post(&locks[cpuNum]);
for (int j = 0; j < p; j++) {
if (j == cpuNum) continue;
sem_wait(&locks[j]);
}
printf("thread %d done\n", cpuNum);
使用pthread_cond_wait
:
locks[cpuNum] = 1;
if (locks[0] && locks[1] && locks[2] && locks[3]) {
pthread_cond_broadcast(&full);
}
else {
pthread_cond_wait(&full, &lock);
}
locks[cpuNum] = 0;
使用futex
:
locks[cpuNum] = 1;
futex_wait(&locks[0], 0);
futex_wait(&locks[1], 0);
futex_wait(&locks[2], 0);
futex_wait(&locks[3], 0);
locks[cpuNum] = 0;
您似乎通过为要同步的线程调用 pthread_join 做得很好,因为这确保您的主线程将等待其他线程完成:
问题似乎是您没有在要同步的线程中调用 pthread_exit:
[另一个回答给我的注意力带来了障碍。使用屏障可以免费大大减少所需的代码。最好用那个。]
如果您有一个带有活动行号的 var,线程可以简单地等待它更新。
#define NUM_WORKERS 4
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int active_row_num = 0;
static int outstanding_threads = NUM_WORKERS;
static void done_row( int row_num ) {
pthread_mutex_lock( &mutex );
if ( --outstanding_threads == 0 ) {
++active_row_num;
outstanding_threads = NUM_WORKERS;
pthread_cond_broadcast( &cond );
}
++row_num;
while ( row_num != active_row_num )
pthread_cond_wait( &cond, &mutex );
pthread_mutex_unlock( &mutex );
}
static void *worker( void *arg ) {
for ( int row_num=0; row_num<...; ++row_num ) {
...
done_row( row_num );
}
return NULL;
}
这听起来像是 posix 个障碍的工作:pthread_barrier_init
, pthread_barrier_wait
。
屏障可以解决一些与 pthread_join
相同的问题,但实际上不会让线程终止并重新启动。
一个屏障被初始化为一定的计数。当那么多线程调用等待操作时,屏障就会激活,它们就会全部畅通无阻。其中一个线程收到一个不同的 return 值,告诉它它是“串行线程”:然后它可以执行一些特殊的工作。
因此,程序逻辑中的每个 barrier_wait
点都是所有线程相遇的集合点,这保证了它们正在合作的任何 activity 都处于完成状态。如果它们的 activity 需要集成在一起(例如,部分结果合并为完整结果),串行线程可以执行此操作,然后是所有线程再次会合的另一个屏障,以启动另一轮并行工作。
我目前正在创建一种算法,用于使用多线程动态规划来解决 0-1 背包问题。我的方法是将 n
by capacity
动态规划 table 分成 4 个 n
by capacity / 4
部分,这将由 4 个线程解决。由于背包依赖于上一行,所以 4 个线程每次都需要解决同一行。在继续下一行之前,它们必须等待所有线程完成当前行的部分。
这是我的代码:
int knapsackParallel(char *values, char *weights, int N, int capacity) {
for (int j = 0; j < 4; j++) {
arguments[j].cpuNum = j;
pthread_create(&tid[j], NULL, solveRow, (void *) &arguments[j]);
pthread_setaffinity_np(tid[j], sizeof(cpu_set_t), &cpusets[j]);
}
for (int j = 0; j < 4; j++) {
pthread_join(tid[j], NULL);
}
}
这里是每个需要同步的线程的代码:
void *solveRow(void *arguments) {
int cpuNum = ((args *) arguments)->cpuNum;
int initial = ((args *) arguments)->cpuNum * (capacity / p);
int limit = cpuNum == p - 1 ? capacity + 1 : initial + (capacity / p);
for (int i = 0; i < N + 1; i++) {
for (int j = initial; j < limit; j++) {
// for the top and left edges, initialize to 0
if (i == 0 || j == 0)
table[i][j] = 0;
// find the max between putting the value in the knapsack or not
else if (weights[i] <= j)
table[i][j] = fmax(values[i] + table[i - 1][j - weights[i]], table[i - 1][j]);
// copy the value in the top of the cell
else
table[i][j] = table[i - 1][j];
}
// insert code here to wait for all the threads to finish their current iteration
// before proceeding to the next iteration
}
到目前为止我尝试过的:
使用多个信号量:
sem_post(&locks[cpuNum]);
sem_post(&locks[cpuNum]);
sem_post(&locks[cpuNum]);
for (int j = 0; j < p; j++) {
if (j == cpuNum) continue;
sem_wait(&locks[j]);
}
printf("thread %d done\n", cpuNum);
使用pthread_cond_wait
:
locks[cpuNum] = 1;
if (locks[0] && locks[1] && locks[2] && locks[3]) {
pthread_cond_broadcast(&full);
}
else {
pthread_cond_wait(&full, &lock);
}
locks[cpuNum] = 0;
使用futex
:
locks[cpuNum] = 1;
futex_wait(&locks[0], 0);
futex_wait(&locks[1], 0);
futex_wait(&locks[2], 0);
futex_wait(&locks[3], 0);
locks[cpuNum] = 0;
您似乎通过为要同步的线程调用 pthread_join 做得很好,因为这确保您的主线程将等待其他线程完成:
问题似乎是您没有在要同步的线程中调用 pthread_exit:
[另一个回答给我的注意力带来了障碍。使用屏障可以免费大大减少所需的代码。最好用那个。]
如果您有一个带有活动行号的 var,线程可以简单地等待它更新。
#define NUM_WORKERS 4
static pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int active_row_num = 0;
static int outstanding_threads = NUM_WORKERS;
static void done_row( int row_num ) {
pthread_mutex_lock( &mutex );
if ( --outstanding_threads == 0 ) {
++active_row_num;
outstanding_threads = NUM_WORKERS;
pthread_cond_broadcast( &cond );
}
++row_num;
while ( row_num != active_row_num )
pthread_cond_wait( &cond, &mutex );
pthread_mutex_unlock( &mutex );
}
static void *worker( void *arg ) {
for ( int row_num=0; row_num<...; ++row_num ) {
...
done_row( row_num );
}
return NULL;
}
这听起来像是 posix 个障碍的工作:pthread_barrier_init
, pthread_barrier_wait
。
屏障可以解决一些与 pthread_join
相同的问题,但实际上不会让线程终止并重新启动。
一个屏障被初始化为一定的计数。当那么多线程调用等待操作时,屏障就会激活,它们就会全部畅通无阻。其中一个线程收到一个不同的 return 值,告诉它它是“串行线程”:然后它可以执行一些特殊的工作。
因此,程序逻辑中的每个 barrier_wait
点都是所有线程相遇的集合点,这保证了它们正在合作的任何 activity 都处于完成状态。如果它们的 activity 需要集成在一起(例如,部分结果合并为完整结果),串行线程可以执行此操作,然后是所有线程再次会合的另一个屏障,以启动另一轮并行工作。