如何控制具有多个互斥锁和条件的线程?
How to control pthreads with multiple mutexes and conditions?
在下面的代码中,我编写了一个程序,使用多线程对 int
数组执行 add/remove 操作。条件是多个线程不能对同一个cell进行操作,但是可以对不同的cell进行并行操作
我想为了实现这样的条件,我需要使用多个互斥体和条件变量,准确地说,数组中有多少个单元格。我数组的所有单元格的初始值为 10
并且线程 increment/decrement 这个值由 3
.
下面的代码似乎可以工作(所有线程完成工作后数组的单元格值符合预期)但我不明白一些事情:
- 我首先生成休眠一秒钟的加法器线程。此外,每个线程都有
printf
语句,如果线程等待,就会触发该语句。删除线程不会休眠,所以我希望删除线程调用它们的 printf
语句,因为它们必须至少等待一秒钟才能完成加法器线程的工作。 但是 移除线程从未调用 printf
。
- 我的第二个问题:正如我提到的,我首先生成加法器线程,所以我希望单元格值从 10 变为 13。然后,如果移除线程获取锁,该值可以从 13 变为 10 OR 如果加法器线程获得锁,则单元格值将从 13 变为 16。但我没有看到线程内
printf
语句中的行为。例如,我有一个 printf
序列:add thread id and cell id 1: cell value 10->13
,然后是 remove thread id and cell id 1: cell value 10->7
,然后是 add thread id and cell id 1: cell value 10->13
。这没有意义。我确保线程都指向同一个数组。
底线我想知道我的解决方案是否正确,如果是,为什么会出现我描述的行为。如果我的解决方案不正确,我将不胜感激正确解决方案或至少一般方向的示例。
这是代码(所有逻辑都在AdderThread
、RemoveThread
):
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#define ARR_LEN 5
#define THREADS_NUM 5
#define INIT_VAL 10
#define ADD_VAL 3
#define REMOVE_VAL 3
#define ADDER_LOOPS 2
typedef struct helper_t {
int threadId;
int * arr;
int * stateArr; //0 if free, 1 if busy
} helper_t;
enum STATE {FREE, BUSY};
enum ERRORS {MUTEX, COND, CREATE, JOIN, LOCK, UNLOCK, WAIT, BROADCAST};
pthread_mutex_t mutexArr[THREADS_NUM];
pthread_cond_t condArr[THREADS_NUM];
void errorHandler(int errorId) {
switch (errorId) {
case MUTEX:
printf("mutex error\n");
break;
case COND:
printf("cond error\n");
break;
case CREATE:
printf("create error\n");
break;
case JOIN:
printf("join error\n");
break;
case LOCK:
printf("lock error\n");
break;
case UNLOCK:
printf("unlock error\n");
break;
case WAIT:
printf("wait error\n");
break;
case BROADCAST:
printf("broadcast error\n");
break;
default:
printf("default switch\n");
break;
}
}
void mallocError() {
printf("malloc error\nExiting app\n");
exit(EXIT_FAILURE);
}
void initMutexesAndConds(pthread_mutex_t * mutexArr, pthread_cond_t * condArr) {
int i;
for(i = 0; i < THREADS_NUM; i++) {
pthread_mutex_init(&mutexArr[i], NULL);
pthread_cond_init(&condArr[i], NULL);
}
}
helper_t * initStructs(int * arr, int * stateArr) {
int i;
helper_t * helpers = (helper_t *) malloc(sizeof(helper_t) * THREADS_NUM);
if(!helpers) {
mallocError();
} else {
for(i = 0; i < THREADS_NUM; i++) {
helpers[i].threadId = i;
helpers[i].arr = arr;
helpers[i].stateArr = stateArr;
}
}
return helpers;
}
void printArr(int * arr, int len) {
int i;
for(i = 0; i < len; i++) {
printf("%d, ", arr[i]);
}
printf("\n");
}
void * AdderThread(void * arg) {
int i;
helper_t * h = (helper_t *) arg;
int id = h->threadId;
for(i = 0; i < ADDER_LOOPS; i++) {
pthread_mutex_t * mutex = &mutexArr[id];
pthread_cond_t * cond = &condArr[id];
if(pthread_mutex_lock(mutex)) {
errorHandler(LOCK);
}
while(h->stateArr[id] == BUSY) {
printf("adder id %d waiting...\n", id);
if(pthread_cond_wait(cond, mutex)) {
errorHandler(WAIT);
}
}
h->stateArr[id] = BUSY;
sleep(1);
h->arr[id] = h->arr[id] + ADD_VAL;
printf("add thread id and cell id %d: cell value %d->%d\n", id, h->arr[id]-ADD_VAL, h->arr[id]);
h->stateArr[id] = FREE;
if(pthread_cond_broadcast(cond)) {
errorHandler(BROADCAST);
}
if(pthread_mutex_unlock(mutex)) {
errorHandler(UNLOCK);
}
}
pthread_exit(NULL);
}
void * RemoveThread(void * arg) {
helper_t * h = (helper_t *) arg;
int id = h->threadId;
pthread_mutex_t * mutex = &mutexArr[id];
pthread_cond_t * cond = &condArr[id];
if(pthread_mutex_lock(mutex)) {
errorHandler(LOCK);
}
while(h->stateArr[id] == BUSY) {
printf("remover id %d waiting...\n", id);
if(pthread_cond_wait(cond, mutex)) {
errorHandler(WAIT);
}
}
h->stateArr[id] = BUSY;
h->arr[id] = h->arr[id] - REMOVE_VAL;
printf("remove thread id and cell id %d: cell value %d->%d\n", id, h->arr[id], h->arr[id]-ADD_VAL);
h->stateArr[id] = FREE;
if(pthread_cond_broadcast(cond)) {
errorHandler(BROADCAST);
}
if(pthread_mutex_unlock(mutex)) {
errorHandler(UNLOCK);
}
pthread_exit(NULL);
}
int main() {
int i;
helper_t * adderHelpers;
helper_t * removeHelpers;
pthread_t adders[THREADS_NUM];
pthread_t removers[THREADS_NUM];
int * arr = (int *) malloc(sizeof(int) * ARR_LEN);
int * stateArr = (int *) malloc(sizeof(int) * ARR_LEN);
if(!arr || !stateArr) {
mallocError();
}
for(i = 0; i < ARR_LEN; i++) {
arr[i] = INIT_VAL;
stateArr[i] = FREE;
}
initMutexesAndConds(mutexArr, condArr);
adderHelpers = initStructs(arr, stateArr);
removeHelpers = initStructs(arr, stateArr);
for(i = 0; i < THREADS_NUM; i++) {
pthread_create(&adders[i], NULL, AdderThread, &adderHelpers[i]);
pthread_create(&removers[i], NULL, RemoveThread, &removeHelpers[i]);
}
for(i = 0; i < THREADS_NUM; i++) {
pthread_join(adders[i], NULL);
pthread_join(removers[i], NULL);
}
printf("the results are:\n");
printArr(arr, THREADS_NUM);
printf("DONE.\n");
return 0;
}
1) Addr中的代码序列:
h->stateArr[id] = BUSY;
sleep(1);
h->arr[id] = h->arr[id] + ADD_VAL;
printf("add thread id and cell id %d: cell value %d->%d\n", id, h->arr[id]-ADD_VAL, h->arr[id]);
h->stateArr[id] = FREE;
是在锁定互斥锁的情况下执行的;因此 Remove 将永远不会有机会看到除了 FREE 之外的状态。
2) 没有 gua运行tee 互斥锁所有权交替(afaik),但至少,要正确协调线程,您永远不应该依赖这样的实现细节。就是工作和“碰巧工作”的区别,通常会导致“习惯工作”......
如果你把 sleep() 放在互斥解锁和互斥锁之间,你可能会有更好的情况,但实际上,它只是解锁它然后再次锁定它,所以系统完全有权就让它继续执行吧。
[我 运行 条评论中 space 条...]:
是的,条件变量在这里对您没有任何作用。条件变量的想法是能够在某些共享反对上发生重大事件(例如状态更改)时得到通知。
例如,水库可能只有一个水位条件变量。多路复用可能有很多条件:level < 1m;水平 > 5m;水平 > 10m。为了保持系统独立(从而工作),更新级别的位可能只是:
pthread_mutex_lock(&levellock);
level = x;
pthread_cond_broadcast(&newlevel);
pthread_mutex_unlock(&levellock);
执行条件的参与者会做类似的事情:
pthread_mutex_lock(&levellock);
while (1) {
if (level is my conditions) {
pthread_mutex_unlock(&levellock);
alert the media
pthread_mutex_lock(&levellock);
}
pthread_cond_wait(&newlevel, &levellock);
}
因此我可以在不破坏关卡设置代码或整个系统的情况下添加许多“状态监视器”。许多是有限的,但是通过在我提醒媒体时释放互斥锁,我避免了让我的水监测系统依赖于警报处理。
如果你熟悉“publish/subscribe”,你可能会觉得这很熟悉。这基本上是相同的模型,只是 PS 隐藏了一堆细节。
在下面的代码中,我编写了一个程序,使用多线程对 int
数组执行 add/remove 操作。条件是多个线程不能对同一个cell进行操作,但是可以对不同的cell进行并行操作
我想为了实现这样的条件,我需要使用多个互斥体和条件变量,准确地说,数组中有多少个单元格。我数组的所有单元格的初始值为 10
并且线程 increment/decrement 这个值由 3
.
下面的代码似乎可以工作(所有线程完成工作后数组的单元格值符合预期)但我不明白一些事情:
- 我首先生成休眠一秒钟的加法器线程。此外,每个线程都有
printf
语句,如果线程等待,就会触发该语句。删除线程不会休眠,所以我希望删除线程调用它们的printf
语句,因为它们必须至少等待一秒钟才能完成加法器线程的工作。 但是 移除线程从未调用printf
。 - 我的第二个问题:正如我提到的,我首先生成加法器线程,所以我希望单元格值从 10 变为 13。然后,如果移除线程获取锁,该值可以从 13 变为 10 OR 如果加法器线程获得锁,则单元格值将从 13 变为 16。但我没有看到线程内
printf
语句中的行为。例如,我有一个printf
序列:add thread id and cell id 1: cell value 10->13
,然后是remove thread id and cell id 1: cell value 10->7
,然后是add thread id and cell id 1: cell value 10->13
。这没有意义。我确保线程都指向同一个数组。
底线我想知道我的解决方案是否正确,如果是,为什么会出现我描述的行为。如果我的解决方案不正确,我将不胜感激正确解决方案或至少一般方向的示例。
这是代码(所有逻辑都在AdderThread
、RemoveThread
):
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>
#define ARR_LEN 5
#define THREADS_NUM 5
#define INIT_VAL 10
#define ADD_VAL 3
#define REMOVE_VAL 3
#define ADDER_LOOPS 2
typedef struct helper_t {
int threadId;
int * arr;
int * stateArr; //0 if free, 1 if busy
} helper_t;
enum STATE {FREE, BUSY};
enum ERRORS {MUTEX, COND, CREATE, JOIN, LOCK, UNLOCK, WAIT, BROADCAST};
pthread_mutex_t mutexArr[THREADS_NUM];
pthread_cond_t condArr[THREADS_NUM];
void errorHandler(int errorId) {
switch (errorId) {
case MUTEX:
printf("mutex error\n");
break;
case COND:
printf("cond error\n");
break;
case CREATE:
printf("create error\n");
break;
case JOIN:
printf("join error\n");
break;
case LOCK:
printf("lock error\n");
break;
case UNLOCK:
printf("unlock error\n");
break;
case WAIT:
printf("wait error\n");
break;
case BROADCAST:
printf("broadcast error\n");
break;
default:
printf("default switch\n");
break;
}
}
void mallocError() {
printf("malloc error\nExiting app\n");
exit(EXIT_FAILURE);
}
void initMutexesAndConds(pthread_mutex_t * mutexArr, pthread_cond_t * condArr) {
int i;
for(i = 0; i < THREADS_NUM; i++) {
pthread_mutex_init(&mutexArr[i], NULL);
pthread_cond_init(&condArr[i], NULL);
}
}
helper_t * initStructs(int * arr, int * stateArr) {
int i;
helper_t * helpers = (helper_t *) malloc(sizeof(helper_t) * THREADS_NUM);
if(!helpers) {
mallocError();
} else {
for(i = 0; i < THREADS_NUM; i++) {
helpers[i].threadId = i;
helpers[i].arr = arr;
helpers[i].stateArr = stateArr;
}
}
return helpers;
}
void printArr(int * arr, int len) {
int i;
for(i = 0; i < len; i++) {
printf("%d, ", arr[i]);
}
printf("\n");
}
void * AdderThread(void * arg) {
int i;
helper_t * h = (helper_t *) arg;
int id = h->threadId;
for(i = 0; i < ADDER_LOOPS; i++) {
pthread_mutex_t * mutex = &mutexArr[id];
pthread_cond_t * cond = &condArr[id];
if(pthread_mutex_lock(mutex)) {
errorHandler(LOCK);
}
while(h->stateArr[id] == BUSY) {
printf("adder id %d waiting...\n", id);
if(pthread_cond_wait(cond, mutex)) {
errorHandler(WAIT);
}
}
h->stateArr[id] = BUSY;
sleep(1);
h->arr[id] = h->arr[id] + ADD_VAL;
printf("add thread id and cell id %d: cell value %d->%d\n", id, h->arr[id]-ADD_VAL, h->arr[id]);
h->stateArr[id] = FREE;
if(pthread_cond_broadcast(cond)) {
errorHandler(BROADCAST);
}
if(pthread_mutex_unlock(mutex)) {
errorHandler(UNLOCK);
}
}
pthread_exit(NULL);
}
void * RemoveThread(void * arg) {
helper_t * h = (helper_t *) arg;
int id = h->threadId;
pthread_mutex_t * mutex = &mutexArr[id];
pthread_cond_t * cond = &condArr[id];
if(pthread_mutex_lock(mutex)) {
errorHandler(LOCK);
}
while(h->stateArr[id] == BUSY) {
printf("remover id %d waiting...\n", id);
if(pthread_cond_wait(cond, mutex)) {
errorHandler(WAIT);
}
}
h->stateArr[id] = BUSY;
h->arr[id] = h->arr[id] - REMOVE_VAL;
printf("remove thread id and cell id %d: cell value %d->%d\n", id, h->arr[id], h->arr[id]-ADD_VAL);
h->stateArr[id] = FREE;
if(pthread_cond_broadcast(cond)) {
errorHandler(BROADCAST);
}
if(pthread_mutex_unlock(mutex)) {
errorHandler(UNLOCK);
}
pthread_exit(NULL);
}
int main() {
int i;
helper_t * adderHelpers;
helper_t * removeHelpers;
pthread_t adders[THREADS_NUM];
pthread_t removers[THREADS_NUM];
int * arr = (int *) malloc(sizeof(int) * ARR_LEN);
int * stateArr = (int *) malloc(sizeof(int) * ARR_LEN);
if(!arr || !stateArr) {
mallocError();
}
for(i = 0; i < ARR_LEN; i++) {
arr[i] = INIT_VAL;
stateArr[i] = FREE;
}
initMutexesAndConds(mutexArr, condArr);
adderHelpers = initStructs(arr, stateArr);
removeHelpers = initStructs(arr, stateArr);
for(i = 0; i < THREADS_NUM; i++) {
pthread_create(&adders[i], NULL, AdderThread, &adderHelpers[i]);
pthread_create(&removers[i], NULL, RemoveThread, &removeHelpers[i]);
}
for(i = 0; i < THREADS_NUM; i++) {
pthread_join(adders[i], NULL);
pthread_join(removers[i], NULL);
}
printf("the results are:\n");
printArr(arr, THREADS_NUM);
printf("DONE.\n");
return 0;
}
1) Addr中的代码序列:
h->stateArr[id] = BUSY;
sleep(1);
h->arr[id] = h->arr[id] + ADD_VAL;
printf("add thread id and cell id %d: cell value %d->%d\n", id, h->arr[id]-ADD_VAL, h->arr[id]);
h->stateArr[id] = FREE;
是在锁定互斥锁的情况下执行的;因此 Remove 将永远不会有机会看到除了 FREE 之外的状态。
2) 没有 gua运行tee 互斥锁所有权交替(afaik),但至少,要正确协调线程,您永远不应该依赖这样的实现细节。就是工作和“碰巧工作”的区别,通常会导致“习惯工作”......
如果你把 sleep() 放在互斥解锁和互斥锁之间,你可能会有更好的情况,但实际上,它只是解锁它然后再次锁定它,所以系统完全有权就让它继续执行吧。
[我 运行 条评论中 space 条...]:
是的,条件变量在这里对您没有任何作用。条件变量的想法是能够在某些共享反对上发生重大事件(例如状态更改)时得到通知。
例如,水库可能只有一个水位条件变量。多路复用可能有很多条件:level < 1m;水平 > 5m;水平 > 10m。为了保持系统独立(从而工作),更新级别的位可能只是:
pthread_mutex_lock(&levellock);
level = x;
pthread_cond_broadcast(&newlevel);
pthread_mutex_unlock(&levellock);
执行条件的参与者会做类似的事情:
pthread_mutex_lock(&levellock);
while (1) {
if (level is my conditions) {
pthread_mutex_unlock(&levellock);
alert the media
pthread_mutex_lock(&levellock);
}
pthread_cond_wait(&newlevel, &levellock);
}
因此我可以在不破坏关卡设置代码或整个系统的情况下添加许多“状态监视器”。许多是有限的,但是通过在我提醒媒体时释放互斥锁,我避免了让我的水监测系统依赖于警报处理。
如果你熟悉“publish/subscribe”,你可能会觉得这很熟悉。这基本上是相同的模型,只是 PS 隐藏了一堆细节。