C 未初始化的互斥量有效而初始化的互斥量失败?

C uninitialized mutex works and initialized mutex fails?

我的 C 程序创建了一个生产者线程,以尽可能快的速度保存数据。主线程消费并打印这些。

经过几天的 bug 发现,我注意到如果初始化互斥体,程序会在 30 秒内停止(死锁?)。

但是,如果互斥量未初始化,它会完美地工作。

谁能解释一下??为了避免未定义的行为,我宁愿尽可能初始化它们。

更多信息:具体来说,如果 "pthread_mutex_t signalM"(信号互斥)被初始化,它就会锁定

已初始化

#include <stdlib.h>                     // exit_failure, exit_success
#include <stdio.h>                      // stdin, stdout, printf
#include <pthread.h>                    // threads
#include <string.h>                     // string
#include <unistd.h>                     // sleep
#include <stdbool.h>                    // bool
#include <fcntl.h>                      // open



struct event {
    pthread_mutex_t critical;
    pthread_mutex_t signalM;
    pthread_cond_t signalC;
    int eventCount;
};

struct allVars {
    struct event inEvents;
    struct event outEvents;
    int bufferSize;
    char buffer[10][128];
};




/**
 * Advance the EventCount
 */
void advance(struct event *event) {
    // increment the event counter
    pthread_mutex_lock(&event->critical);
    event->eventCount++;
    pthread_mutex_unlock(&event->critical);

    // signal await to continue
    pthread_mutex_lock(&event->signalM);
    pthread_cond_signal(&event->signalC);
    pthread_mutex_unlock(&event->signalM);
}



/**
 * Wait for ticket and buffer availability
 */
void await(struct event *event, int ticket) {

    int eventCount;

    // get the counter
    pthread_mutex_lock(&event->critical);
    eventCount = event->eventCount;
    pthread_mutex_unlock(&event->critical);

    // lock signaling mutex
    pthread_mutex_lock(&event->signalM);

    // loop until the ticket machine shows your number
    while (ticket > eventCount) {
        // wait until a ticket is called
        pthread_cond_wait(&event->signalC, &event->signalM);

        // get the counter
        pthread_mutex_lock(&event->critical);
        eventCount = event->eventCount;
        pthread_mutex_unlock(&event->critical);
    }

    // unlock signaling mutex
    pthread_mutex_unlock(&event->signalM);
}



/**
 * Add to buffer
 */
void putBuffer(struct allVars *allVars, char data[]) {
    // get the current write position
    pthread_mutex_lock(&allVars->inEvents.critical);
    int in = allVars->inEvents.eventCount;
    pthread_mutex_unlock(&allVars->inEvents.critical);

    // wait until theres a space free in the buffer
    await(&allVars->outEvents, in - allVars->bufferSize + 1);   // set to 2 to keep 1 index distance

    // add data to buffer
    strcpy(allVars->buffer[in % allVars->bufferSize], data);

    // increment the eventCounter and signal
    advance(&allVars->inEvents);
}



/**
 * Get from buffer
 */
char *getBuffer(struct allVars *allVars) {
    // get the current read position
    pthread_mutex_lock(&allVars->outEvents.critical);
    int out = allVars->outEvents.eventCount;
    pthread_mutex_unlock(&allVars->outEvents.critical);

    // wait until theres something in the buffer
    await(&allVars->inEvents, out + 1);

    // allocate memory for returned string
    char *str = malloc(128);

    // get the buffer data
    strcpy(str, allVars->buffer[out % allVars->bufferSize]);

    // increment the eventCounter and signal
    advance(&allVars->outEvents);

    return str;
}



/** child thread (producer) */
void *childThread(void *allVars) {
    char str[10];
    int count = 0;

    while (true) {
        sprintf(str, "%d", count++);
        putBuffer(allVars, str);
    }

    pthread_exit(EXIT_SUCCESS);
}



int main(void) {
    // init structs
    struct event inEvents = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    };
    struct event outEvents = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    };
    struct allVars allVars = {
        inEvents,       // events
        outEvents,
        10,             // buffersize
        {"", {""}}      // buffer[][]
    };


    // create child thread (producer)
    pthread_t thread;
    if (pthread_create(&thread, NULL, childThread, &allVars)) {
        fprintf(stderr, "failed to create child thread");
        exit(EXIT_FAILURE);
    }


    // (consumer)
    while (true) {
        char *out = getBuffer(&allVars);
        printf("buf: %s\n", out);
        free(out);
    }


    return (EXIT_SUCCESS);
}

未初始化

#include <stdlib.h>                     // exit_failure, exit_success
#include <stdio.h>                      // stdin, stdout, printf
#include <pthread.h>                    // threads
#include <string.h>                     // string
#include <unistd.h>                     // sleep
#include <stdbool.h>                    // bool
#include <fcntl.h>                      // open



struct event {
    pthread_mutex_t critical;
    pthread_mutex_t signalM;
    pthread_cond_t signalC;
    int eventCount;
};

struct allVars {
    struct event inEvents;
    struct event outEvents;
    int bufferSize;
    char buffer[10][128];
};




/**
 * Advance the EventCount
 */
void advance(struct event *event) {
    // increment the event counter
    pthread_mutex_lock(&event->critical);
    event->eventCount++;
    pthread_mutex_unlock(&event->critical);

    // signal await to continue
    pthread_mutex_lock(&event->signalM);
    pthread_cond_signal(&event->signalC);
    pthread_mutex_unlock(&event->signalM);
}



/**
 * Wait for ticket and buffer availability
 */
void await(struct event *event, int ticket) {

    int eventCount;

    // get the counter
    pthread_mutex_lock(&event->critical);
    eventCount = event->eventCount;
    pthread_mutex_unlock(&event->critical);

    // lock signaling mutex
    pthread_mutex_lock(&event->signalM);

    // loop until the ticket machine shows your number
    while (ticket > eventCount) {
        // wait until a ticket is called
        pthread_cond_wait(&event->signalC, &event->signalM);

        // get the counter
        pthread_mutex_lock(&event->critical);
        eventCount = event->eventCount;
        pthread_mutex_unlock(&event->critical);
    }

    // unlock signaling mutex
    pthread_mutex_unlock(&event->signalM);
}



/**
 * Add to buffer
 */
void putBuffer(struct allVars *allVars, char data[]) {
    // get the current write position
    pthread_mutex_lock(&allVars->inEvents.critical);
    int in = allVars->inEvents.eventCount;
    pthread_mutex_unlock(&allVars->inEvents.critical);

    // wait until theres a space free in the buffer
    await(&allVars->outEvents, in - allVars->bufferSize + 1);   // set to 2 to keep 1 index distance

    // add data to buffer
    strcpy(allVars->buffer[in % allVars->bufferSize], data);

    // increment the eventCounter and signal
    advance(&allVars->inEvents);
}



/**
 * Get from buffer
 */
char *getBuffer(struct allVars *allVars) {
    // get the current read position
    pthread_mutex_lock(&allVars->outEvents.critical);
    int out = allVars->outEvents.eventCount;
    pthread_mutex_unlock(&allVars->outEvents.critical);

    // wait until theres something in the buffer
    await(&allVars->inEvents, out + 1);

    // allocate memory for returned string
    char *str = malloc(128);

    // get the buffer data
    strcpy(str, allVars->buffer[out % allVars->bufferSize]);

    // increment the eventCounter and signal
    advance(&allVars->outEvents);

    return str;
}



/** child thread (producer) */
void *childThread(void *allVars) {
    char str[10];
    int count = 0;

    while (true) {
        sprintf(str, "%d", count++);
        putBuffer(allVars, str);
    }

    pthread_exit(EXIT_SUCCESS);
}



int main(void) {
    // init structs
    struct event inEvents; /* = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    }; */
    struct event outEvents; /* = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    }; */

    struct allVars allVars = {
        inEvents,       // events
        outEvents,
        10,             // buffersize
        {"", {""}}      // buffer[][]
    };


    // create child thread (producer)
    pthread_t thread;
    if (pthread_create(&thread, NULL, childThread, &allVars)) {
        fprintf(stderr, "failed to create child thread");
        exit(EXIT_FAILURE);
    }


    // (consumer)
    while (true) {
        char *out = getBuffer(&allVars);
        printf("buf: %s\n", out);
        free(out);
    }


    return (EXIT_SUCCESS);
}

我修改了 getBuffer()putBuffer() 例程,如图所示(在代码的初始化和未初始化版本中):

static
void putBuffer(struct allVars *allVars, char data[])
{
    int lock_ok = pthread_mutex_lock(&allVars->inEvents.critical);
    if (lock_ok != 0)
        printf("%s(): lock error %d (%s)\n", __func__, lock_ok, strerror(lock_ok));
    int in = allVars->inEvents.eventCount;
    int unlock_ok = pthread_mutex_unlock(&allVars->inEvents.critical);
    if (unlock_ok != 0)
        printf("%s(): unlock error %d (%s)\n", __func__, unlock_ok, strerror(unlock_ok));

    await(&allVars->outEvents, in - allVars->bufferSize + 1);

    strcpy(allVars->buffer[in % allVars->bufferSize], data);

    advance(&allVars->inEvents);
}

static
char *getBuffer(struct allVars *allVars)
{
    int lock_ok = pthread_mutex_lock(&allVars->outEvents.critical);
    if (lock_ok != 0)
        printf("%s(): lock error %d (%s)\n", __func__, lock_ok, strerror(lock_ok));
    int out = allVars->outEvents.eventCount;
    int unlock_ok = pthread_mutex_unlock(&allVars->outEvents.critical);
    if (unlock_ok != 0)
        printf("%s(): unlock error %d (%s)\n", __func__, unlock_ok, strerror(unlock_ok));

    await(&allVars->inEvents, out + 1);

    char *str = malloc(128);

    strcpy(str, allVars->buffer[out % allVars->bufferSize]);

    advance(&allVars->outEvents);

    return str;
}

然后 运行 未初始化的代码会产生很多消息,例如:

buf: 46566
putBuffer(): lock error 22 (Invalid argument)
getBuffer(): lock error 22 (Invalid argument)
putBuffer(): unlock error 22 (Invalid argument)
getBuffer(): unlock error 22 (Invalid argument)

基本上,在我看来,您的锁定和解锁被忽略了。您的代码中还有其他地方您也应该检查。

从根本上说,如果忽略报告的错误,您不会注意到锁定和解锁根本不起作用,并且代码没有理由停止 运行。

始终检查可能失败的系统调用的 return 值。

对于初始化代码为何锁定,我没有立即的解释。它对我有用,运行 Mac OS X 10.10.3 with GCC 5.1.0,经过大约 100,000 到 800,000 次迭代后。

Jonathan 解释了为什么没有初始化互斥量的代码没有死锁(主要是因为尝试使用未初始化的互斥量永远不会阻塞,它会立即 return 一个错误)。

在正确初始化互斥体的程序版本中导致无限等待的问题是您没有正确使用条件变量。谓词表达式的检查和条件变量的等待必须相对于可能正在修改谓词的任何其他线程以原子方式完成。您的代码正在检查一个谓词,该谓词是另一个线程甚至无法访问的局部变量。您的代码将实际谓词读入关键部分内的局部变量,但随后释放用于读取谓词的互斥锁并获取不同的互斥锁以读取 'false' 谓词(无论如何其他线程都无法修改) 原子地与条件变量 wait.

所以你有这样一种情况,可以修改实际谓词 event->eventCount,并且在等待线程读取谓词和阻塞条件变量时发出修改信号。

我认为以下内容可以解决您的僵局,但我还没有机会进行太多测试。更改本质上是从 struct event 中删除 signalM 互斥锁,并将对它的任何使用替换为 critical 互斥锁:

#include <stdlib.h>                     // exit_failure, exit_success
#include <stdio.h>                      // stdin, stdout, printf
#include <pthread.h>                    // threads
#include <string.h>                     // string
#include <unistd.h>                     // sleep
#include <stdbool.h>                    // bool
#include <fcntl.h>                      // open



struct event {
    pthread_mutex_t critical;
    pthread_cond_t signalC;
    int eventCount;
};

struct allVars {
    struct event inEvents;
    struct event outEvents;
    int bufferSize;
    char buffer[10][128];
};




/**
 * Advance the EventCount
 */
void advance(struct event *event) {
    // increment the event counter
    pthread_mutex_lock(&event->critical);
    event->eventCount++;
    pthread_mutex_unlock(&event->critical);

    // signal await to continue
    pthread_cond_signal(&event->signalC);
}



/**
 * Wait for ticket and buffer availability
 */
void await(struct event *event, int ticket) {


    // get the counter
    pthread_mutex_lock(&event->critical);

    // loop until the ticket machine shows your number
    while (ticket > event->eventCount) {
        // wait until a ticket is called
        pthread_cond_wait(&event->signalC, &event->critical);
    }

    // unlock signaling mutex
    pthread_mutex_unlock(&event->critical);
}



/**
 * Add to buffer
 */
void putBuffer(struct allVars *allVars, char data[]) {
    // get the current write position
    pthread_mutex_lock(&allVars->inEvents.critical);
    int in = allVars->inEvents.eventCount;
    pthread_mutex_unlock(&allVars->inEvents.critical);

    // wait until theres a space free in the buffer
    await(&allVars->outEvents, in - allVars->bufferSize + 1);   // set to 2 to keep 1 index distance

    // add data to buffer
    strcpy(allVars->buffer[in % allVars->bufferSize], data);

    // increment the eventCounter and signal
    advance(&allVars->inEvents);
}



/**
 * Get from buffer
 */
char *getBuffer(struct allVars *allVars) {
    // get the current read position
    pthread_mutex_lock(&allVars->outEvents.critical);
    int out = allVars->outEvents.eventCount;
    pthread_mutex_unlock(&allVars->outEvents.critical);

    // wait until theres something in the buffer
    await(&allVars->inEvents, out + 1);

    // allocate memory for returned string
    char *str = malloc(128);

    // get the buffer data
    strcpy(str, allVars->buffer[out % allVars->bufferSize]);

    // increment the eventCounter and signal
    advance(&allVars->outEvents);

    return str;
}



/** child thread (producer) */
void *childThread(void *allVars) {
    char str[10];
    int count = 0;

    while (true) {
        sprintf(str, "%d", count++);
        putBuffer(allVars, str);
    }

    pthread_exit(EXIT_SUCCESS);
}



int main(void) {
    // init structs
    struct event inEvents = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    };
    struct event outEvents = {
        PTHREAD_MUTEX_INITIALIZER,
        PTHREAD_COND_INITIALIZER,
        0
    };
    struct allVars allVars = {
        inEvents,       // events
        outEvents,
        10,             // buffersize
        {"", {""}}      // buffer[][]
    };


    // create child thread (producer)
    pthread_t thread;
    if (pthread_create(&thread, NULL, childThread, &allVars)) {
        fprintf(stderr, "failed to create child thread");
        exit(EXIT_FAILURE);
    }


    // (consumer)
    while (true) {
        char *out = getBuffer(&allVars);
        printf("buf: %s\n", out);
        free(out);
    }


    return (EXIT_SUCCESS);
}