如果我们需要按特定顺序打印多线程计算的结果,为什么信号而不是广播有效?
Why signal instead of broadcast works if we need to print the results of multiple thread calculations in specific order?
在我下面的代码中,我创建了一个线程池,它对一个公共数组进行一些计算。但是,我希望并行完成计算,我想打印计算结果,这样线程 ID x 就不会在线程 ID y 之前打印其结果,其中 x > y。所以首先我要打印id 0的结果,然后是1、2等等
我正在使用 pthreads 来做这件事。最初我使用 pthread_cond_broadcast
来唤醒阻塞的线程(一切正常)但后来出于好奇我尝试了 pthread_cond_signal
。有趣的是,该程序仍然可以正常运行。
但我不明白为什么。即:
- 我生成了 5 个线程。他们都完成了他们的计算。
- 其中 4 个被阻塞,而线程 id 0 打印其结果和信号。
- 根据spec
pthread_cond_signal
"shall unblock at least one of the threads that are blocked"。所以也许它解除了线程 id 3 的阻塞。
- 线程 id 3 无法继续,因为还没有轮到它打印结果,所以它等待。
- 死锁随之而来。
那么为什么 pthread_cond_signal
仍然有效?是因为反复的运气还是因为我的 OS 创建了一个阻塞线程队列,而线程 id 1 恰好位于该队列的头部?
这是我的代码(wait/signal 逻辑在函数 ComputeThread
中):
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#define DEFAULT_VAL 5
#define ARR_LEN 5
#define THREADS_NUM 5
typedef struct helper_t {
int * currId;
int threadId;
int * computeArr;
pthread_mutex_t * mutex;
pthread_cond_t * cond;
} helper_t;
int * initComputeArr(int len) {
int i;
int * computeArr = (int *) malloc(sizeof(int) * len);
for(i = 0; i < len; i++) {
computeArr[i] = DEFAULT_VAL;
}
return computeArr;
}
void mallocError() {
printf("malloc error\n");
exit(EXIT_FAILURE);
}
helper_t * initHelpers(pthread_mutex_t * mutex, pthread_cond_t * cond, int * computeArr) {
int i;
helper_t * helpers = (helper_t *) malloc(sizeof(helper_t) * THREADS_NUM);
int * currId = (int *) malloc(sizeof(int));
if(!helpers || !currId) {
mallocError();
} else {
*currId = 0;
for(i = 0; i < THREADS_NUM; i++) {
helpers[i].mutex = mutex;
helpers[i].cond = cond;
helpers[i].computeArr = computeArr;
helpers[i].currId = currId;
helpers[i].threadId = i;
}
}
return helpers;
}
void printHelper(helper_t * h) {
printf("threadId %d, currId %d\n", h->threadId, *h->currId);
}
void printHelpers(helper_t * h, int len) {
int i;
for(i = 0; i < len; i++) {
printHelper(&h[i]);
}
}
int calc(int * arr, int uptoIndex) {
int i, sum = 0;
for(i = 0; i <= uptoIndex; i++) {
sum += arr[i];
}
return sum;
}
void * ComputeThread(void * arg) {
int calcResult;
helper_t * h = (helper_t *) arg;
pthread_mutex_t * mutex = h->mutex;
pthread_cond_t * cond = h->cond;
calcResult = calc(h->computeArr, h->threadId);
sleep(1);
pthread_mutex_lock(mutex);
while(*h->currId != h->threadId) {
printf("id %d waiting...\n", h->threadId);
pthread_cond_wait(cond, mutex);
}
printf("curr %d, threadId %d, result %d\n", *h->currId, h->threadId, calcResult);
(*h->currId)++;
pthread_cond_signal(cond);
pthread_mutex_unlock(mutex);
pthread_exit((void *) calcResult);
}
int main() {
int i;
pthread_mutex_t mutex;
pthread_cond_t cond;
int * computeArr;
int * calcResutls;
helper_t * helpers;
pthread_t threads[THREADS_NUM];
computeArr = initComputeArr(ARR_LEN);
calcResutls = initComputeArr(ARR_LEN);
helpers = initHelpers(&mutex, &cond, computeArr);
printHelpers(helpers, THREADS_NUM);
for(i = 0; i < THREADS_NUM; i++) {
pthread_create(&threads[i], NULL, ComputeThread, (void *) &helpers[i]);
}
for(i = 0; i < THREADS_NUM; i++) {
pthread_join(threads[i], (void **) &calcResutls[i]);
}
for(i = 0; i < ARR_LEN; i++) {
printf("%d, ", calcResutls[i]);
}
printf("\n");
printf("end of calc\n");
return 0;
}
我看到未定义的行为被调用,因为代码未初始化互斥锁和条件。
在我下面的代码中,我创建了一个线程池,它对一个公共数组进行一些计算。但是,我希望并行完成计算,我想打印计算结果,这样线程 ID x 就不会在线程 ID y 之前打印其结果,其中 x > y。所以首先我要打印id 0的结果,然后是1、2等等
我正在使用 pthreads 来做这件事。最初我使用 pthread_cond_broadcast
来唤醒阻塞的线程(一切正常)但后来出于好奇我尝试了 pthread_cond_signal
。有趣的是,该程序仍然可以正常运行。
但我不明白为什么。即:
- 我生成了 5 个线程。他们都完成了他们的计算。
- 其中 4 个被阻塞,而线程 id 0 打印其结果和信号。
- 根据spec
pthread_cond_signal
"shall unblock at least one of the threads that are blocked"。所以也许它解除了线程 id 3 的阻塞。 - 线程 id 3 无法继续,因为还没有轮到它打印结果,所以它等待。
- 死锁随之而来。
那么为什么 pthread_cond_signal
仍然有效?是因为反复的运气还是因为我的 OS 创建了一个阻塞线程队列,而线程 id 1 恰好位于该队列的头部?
这是我的代码(wait/signal 逻辑在函数 ComputeThread
中):
#include <stdio.h>
#include <pthread.h>
#include <stdlib.h>
#include <unistd.h>
#define DEFAULT_VAL 5
#define ARR_LEN 5
#define THREADS_NUM 5
typedef struct helper_t {
int * currId;
int threadId;
int * computeArr;
pthread_mutex_t * mutex;
pthread_cond_t * cond;
} helper_t;
int * initComputeArr(int len) {
int i;
int * computeArr = (int *) malloc(sizeof(int) * len);
for(i = 0; i < len; i++) {
computeArr[i] = DEFAULT_VAL;
}
return computeArr;
}
void mallocError() {
printf("malloc error\n");
exit(EXIT_FAILURE);
}
helper_t * initHelpers(pthread_mutex_t * mutex, pthread_cond_t * cond, int * computeArr) {
int i;
helper_t * helpers = (helper_t *) malloc(sizeof(helper_t) * THREADS_NUM);
int * currId = (int *) malloc(sizeof(int));
if(!helpers || !currId) {
mallocError();
} else {
*currId = 0;
for(i = 0; i < THREADS_NUM; i++) {
helpers[i].mutex = mutex;
helpers[i].cond = cond;
helpers[i].computeArr = computeArr;
helpers[i].currId = currId;
helpers[i].threadId = i;
}
}
return helpers;
}
void printHelper(helper_t * h) {
printf("threadId %d, currId %d\n", h->threadId, *h->currId);
}
void printHelpers(helper_t * h, int len) {
int i;
for(i = 0; i < len; i++) {
printHelper(&h[i]);
}
}
int calc(int * arr, int uptoIndex) {
int i, sum = 0;
for(i = 0; i <= uptoIndex; i++) {
sum += arr[i];
}
return sum;
}
void * ComputeThread(void * arg) {
int calcResult;
helper_t * h = (helper_t *) arg;
pthread_mutex_t * mutex = h->mutex;
pthread_cond_t * cond = h->cond;
calcResult = calc(h->computeArr, h->threadId);
sleep(1);
pthread_mutex_lock(mutex);
while(*h->currId != h->threadId) {
printf("id %d waiting...\n", h->threadId);
pthread_cond_wait(cond, mutex);
}
printf("curr %d, threadId %d, result %d\n", *h->currId, h->threadId, calcResult);
(*h->currId)++;
pthread_cond_signal(cond);
pthread_mutex_unlock(mutex);
pthread_exit((void *) calcResult);
}
int main() {
int i;
pthread_mutex_t mutex;
pthread_cond_t cond;
int * computeArr;
int * calcResutls;
helper_t * helpers;
pthread_t threads[THREADS_NUM];
computeArr = initComputeArr(ARR_LEN);
calcResutls = initComputeArr(ARR_LEN);
helpers = initHelpers(&mutex, &cond, computeArr);
printHelpers(helpers, THREADS_NUM);
for(i = 0; i < THREADS_NUM; i++) {
pthread_create(&threads[i], NULL, ComputeThread, (void *) &helpers[i]);
}
for(i = 0; i < THREADS_NUM; i++) {
pthread_join(threads[i], (void **) &calcResutls[i]);
}
for(i = 0; i < ARR_LEN; i++) {
printf("%d, ", calcResutls[i]);
}
printf("\n");
printf("end of calc\n");
return 0;
}
我看到未定义的行为被调用,因为代码未初始化互斥锁和条件。