如何控制具有多个互斥锁和条件的线程?

How to control pthreads with multiple mutexes and conditions?

在下面的代码中,我编写了一个程序,使用多线程对 int 数组执行 add/remove 操作。条件是多个线程不能对同一个cell进行操作,但是可以对不同的cell进行并行操作

我想为了实现这样的条件,我需要使用多个互斥体和条件变量,准确地说,数组中有多少个单元格。我数组的所有单元格的初始值为 10 并且线程 increment/decrement 这个值由 3.

下面的代码似乎可以工作(所有线程完成工作后数组的单元格值符合预期)但我不明白一些事情:

  1. 我首先生成休眠一秒钟的加法器线程。此外,每个线程都有 printf 语句,如果线程等待,就会触发该语句。删除线程不会休眠,所以我希望删除线程调用它们的 printf 语句,因为它们必须至少等待一秒钟才能完成加法器线程的工作。 但是 移除线程从未调用 printf
  2. 我的第二个问题:正如我提到的,我首先生成加法器线程,所以我希望单元格值从 10 变为 13。然后,如果移除线程获取锁,该值可以从 13 变为 10 OR 如果加法器线程获得锁,则单元格值将从 13 变为 16。但我没有看到线程内 printf 语句中的行为。例如,我有一个 printf 序列:add thread id and cell id 1: cell value 10->13,然后是 remove thread id and cell id 1: cell value 10->7,然后是 add thread id and cell id 1: cell value 10->13。这没有意义。我确保线程都指向同一个数组。

底线我想知道我的解决方案是否正确,如果是,为什么会出现我描述的行为。如果我的解决方案不正确,我将不胜感激正确解决方案或至少一般方向的示例。

这是代码(所有逻辑都在AdderThreadRemoveThread):

#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <pthread.h>

#define ARR_LEN 5
#define THREADS_NUM 5
#define INIT_VAL 10
#define ADD_VAL 3
#define REMOVE_VAL 3
#define ADDER_LOOPS 2

typedef struct helper_t {
    int threadId;
    int * arr;
    int * stateArr; //0 if free, 1 if busy
} helper_t;

enum STATE {FREE, BUSY};
enum ERRORS {MUTEX, COND, CREATE, JOIN, LOCK, UNLOCK, WAIT, BROADCAST};

pthread_mutex_t mutexArr[THREADS_NUM];
pthread_cond_t condArr[THREADS_NUM];

void errorHandler(int errorId) {
    switch (errorId) {
        case MUTEX:
            printf("mutex error\n");
            break;
        case COND:
            printf("cond error\n");
            break;
        case CREATE:
            printf("create error\n");
            break;
        case JOIN:
            printf("join error\n");
            break;
        case LOCK:
            printf("lock error\n");
            break;
        case UNLOCK:
            printf("unlock error\n");
            break;
        case WAIT:
            printf("wait error\n");
            break;
        case BROADCAST:
            printf("broadcast error\n");
            break;
        default:
            printf("default switch\n");
            break;
    }
}

void mallocError() {
    printf("malloc error\nExiting app\n");
    exit(EXIT_FAILURE);
}

void initMutexesAndConds(pthread_mutex_t * mutexArr, pthread_cond_t * condArr) {
    int i;

    for(i = 0; i < THREADS_NUM; i++) {
        pthread_mutex_init(&mutexArr[i], NULL);
        pthread_cond_init(&condArr[i], NULL);
    }
}

helper_t * initStructs(int * arr, int * stateArr) {
    int i;
    helper_t * helpers = (helper_t *) malloc(sizeof(helper_t) * THREADS_NUM);
    if(!helpers) {
        mallocError();
    } else {
        for(i = 0; i < THREADS_NUM; i++) {
            helpers[i].threadId = i;
            helpers[i].arr = arr;
            helpers[i].stateArr = stateArr;
        }
    }
    return helpers;
}

void printArr(int * arr, int len) {
    int i;
    for(i = 0; i < len; i++) {
        printf("%d, ", arr[i]);
    }
    printf("\n");
}

void * AdderThread(void * arg) {
    int i;
    helper_t * h = (helper_t *) arg;
    int id = h->threadId;
    for(i = 0; i < ADDER_LOOPS; i++) {
        pthread_mutex_t * mutex = &mutexArr[id];
        pthread_cond_t * cond = &condArr[id];
        if(pthread_mutex_lock(mutex)) {
            errorHandler(LOCK);
        }
        while(h->stateArr[id] == BUSY) {
            printf("adder id %d waiting...\n", id);
            if(pthread_cond_wait(cond, mutex)) {
                errorHandler(WAIT);
            }
        }
        h->stateArr[id] = BUSY;
        sleep(1);
        h->arr[id] = h->arr[id] + ADD_VAL;
        printf("add thread id and cell id %d: cell value %d->%d\n", id, h->arr[id]-ADD_VAL, h->arr[id]);
        h->stateArr[id] = FREE;
        if(pthread_cond_broadcast(cond)) {
            errorHandler(BROADCAST);
        }
        if(pthread_mutex_unlock(mutex)) {
            errorHandler(UNLOCK);
        }
    }
    pthread_exit(NULL);
}

void * RemoveThread(void * arg) {
    helper_t * h = (helper_t *) arg;
    int id = h->threadId;
    pthread_mutex_t * mutex = &mutexArr[id];
    pthread_cond_t * cond = &condArr[id];
    if(pthread_mutex_lock(mutex)) {
        errorHandler(LOCK);
    }
    while(h->stateArr[id] == BUSY) {
        printf("remover id %d waiting...\n", id);
        if(pthread_cond_wait(cond, mutex)) {
            errorHandler(WAIT);
        }
    }
    h->stateArr[id] = BUSY;
    h->arr[id] = h->arr[id] - REMOVE_VAL;
    printf("remove thread id and cell id %d: cell value %d->%d\n", id, h->arr[id], h->arr[id]-ADD_VAL);
    h->stateArr[id] = FREE;
    if(pthread_cond_broadcast(cond)) {
        errorHandler(BROADCAST);
    }
    if(pthread_mutex_unlock(mutex)) {
        errorHandler(UNLOCK);
    }
    pthread_exit(NULL);
}

int main() {
    int i;
    helper_t * adderHelpers;
    helper_t * removeHelpers;
    pthread_t adders[THREADS_NUM];
    pthread_t removers[THREADS_NUM];
    int * arr = (int *) malloc(sizeof(int) * ARR_LEN);
    int * stateArr = (int *) malloc(sizeof(int) * ARR_LEN);
    if(!arr || !stateArr) {
        mallocError();
    }

    for(i = 0; i < ARR_LEN; i++) {
        arr[i] = INIT_VAL;
        stateArr[i] = FREE;
    }

    initMutexesAndConds(mutexArr, condArr);
    adderHelpers = initStructs(arr, stateArr);
    removeHelpers = initStructs(arr, stateArr);

    for(i = 0; i < THREADS_NUM; i++) {
        pthread_create(&adders[i], NULL, AdderThread, &adderHelpers[i]);
        pthread_create(&removers[i], NULL, RemoveThread, &removeHelpers[i]);
    }

    for(i = 0; i < THREADS_NUM; i++) {
        pthread_join(adders[i], NULL);
        pthread_join(removers[i], NULL);
    }

    printf("the results are:\n");
    printArr(arr, THREADS_NUM);
    printf("DONE.\n");

    return 0;
}

1) Addr中的代码序列:

   h->stateArr[id] = BUSY;
        sleep(1);
        h->arr[id] = h->arr[id] + ADD_VAL;
        printf("add thread id and cell id %d: cell value %d->%d\n", id, h->arr[id]-ADD_VAL, h->arr[id]);
        h->stateArr[id] = FREE;

是在锁定互斥锁的情况下执行的;因此 Remove 将永远不会有机会看到除了 FREE 之外的状态。

2) 没有 gua运行tee 互斥锁所有权交替(afaik),但至少,要正确协调线程,您永远不应该依赖这样的实现细节。就是工作和“碰巧工作”的区别,通常会导致“习惯工作”......

如果你把 sleep() 放在互斥解锁和互斥锁之间,你可能会有更好的情况,但实际上,它只是解锁它然后再次锁定它,所以系统完全有权就让它继续执行吧。

[我 运行 条评论中 space 条...]:

是的,条件变量在这里对您没有任何作用。条件变量的想法是能够在某些共享反对上发生重大事件(例如状态更改)时得到通知。

例如,水库可能只有一个水位条件变量。多路复用可能有很多条件:level < 1m;水平 > 5m;水平 > 10m。为了保持系统独立(从而工作),更新级别的位可能只是:

pthread_mutex_lock(&levellock);
level = x;
pthread_cond_broadcast(&newlevel);
pthread_mutex_unlock(&levellock);

执行条件的参与者会做类似的事情:

pthread_mutex_lock(&levellock);
while (1) {
    if (level is my conditions) {
         pthread_mutex_unlock(&levellock);
         alert the media
         pthread_mutex_lock(&levellock);
    }
    pthread_cond_wait(&newlevel, &levellock);
}

因此我可以在不破坏关卡设置代码或整个系统的情况下添加许多“状态监视器”。许多是有限的,但是通过在我提醒媒体时释放互斥锁,我避免了让我的水监测系统依赖于警报处理。

如果你熟悉“publish/subscribe”,你可能会觉得这很熟悉。这基本上是相同的模型,只是 PS 隐藏了一堆细节。