pthread_cond_wait有时会收不到信号

pthread_cond_wait sometimes will not receive the signal

我对 pthread_cond_waitpthread_cond_signal 有一个奇怪的问题。我已经安排了一系列线程。启动时都处于休眠状态。唤醒函数将向这些线程发出信号、做一些工作并等待结果。

在下面的设置中,td 是线程数据,包含互斥锁和条件,th 是包含线程指针的数组:

for (size_t i = 0; i < NUM_THREADS; i++) {
    pthread_cond_init(&td[i].cond, NULL);
    pthread_mutex_init(&td[i].cond_mutex, NULL);
    pthread_mutex_init(&td[i].work_mutex, NULL);
    pthread_mutex_lock(&td[i].cond_mutex);
    pthread_mutex_lock(&td[i].work_mutex);
    pthread_create(&th[i], NULL, thread_worker, (void *)&td[i]);
}

线程工作者是这样的:

void*
thread_worker(void* data)
{
    THREAD_DATA *td = (THREAD_DATA *)data;
    while (1) {
        pthread_cond_wait(&td->cond, &td->cond_mutex);  // marker

        // do work ...

        pthread_mutex_unlock(&td->work_mutex);
    }
    pthread_exit(NULL);
}

这个 job 函数应该唤醒所有线程,完成工作,并等待它们完成:

void
job()
{
    for (size_t i = 0; i < NUM_THREADS; i++) {
        pthread_cond_signal(&td[i].cond);
    }
    for (size_t i = 0; i < NUM_THREADS; i++) {
        pthread_mutex_lock(&td[i].work_mutex);  // block until the work is done
    }
}

在极少数情况下(可能是 1000 次运行中的 1 次),上述设置会遇到冻结。当发生这种情况时,thread_worker 中的 'marker' 行将不会被 pthread_cond_signal 发出信号,它只是继续等待。这种情况非常罕见,但时有发生。我生成了许多日志消息,并且我验证了 pthread_cond_wait 总是在 pthread_cond_signal 之前被调用。我在这里做错了什么?

一些检查清单:

1) 在等待 cond 变量之前是否锁定了互斥量 td->cond_mutex?否则,它是未定义的。

2) 你检查谓词 after pthread_cond_wait() returns 了吗?典型用法是

while(!flag) pthread_cond_wait(&cv, &mutex); //waits on flag

这不是你拥有的。这是为了防止虚假唤醒并确保谓词在此期间没有更改。

3) pthread_cond_signal()保证至少唤醒一个线程。如果有多个线程在等待同一个条件变量,您可能需要使用 pthread_cond_broadcast()

4) 如果没有线程在等待条件变量,则 pthread_cond_signal()pthread_cond_broadcast() 无效。

没有任何东西强制 pthread_cond_wait()pthread_cond_signal() 之前被调用。不管你怎么说日志记录,记录的行完全有可能与实际发生的事情顺序不一致。

您没有正确使用互斥锁和条件变量:互斥锁只能由锁定它们的同一个线程解锁,并且条件变量应该与对某些共享状态的测试配对(称为 谓词)。共享状态应该受到传递给 pthread_cond_wait().

的互斥锁的保护

例如,可以修改您的示例以正确使用互斥锁和条件变量。首先在THREAD_DATA结构中添加一个int work_status,其中0表示线程正在等待工作,1表示有工作可用,2表示工作完成。

您似乎不需要在每个 THREAD_DATA 中使用两个互斥锁,并且您不想在设置时在主线程中锁定互斥锁:

for (size_t i = 0; i < NUM_THREADS; i++) {
    pthread_cond_init(&td[i].cond, NULL);
    pthread_mutex_init(&td[i].cond_mutex, NULL);
    td[i].work_status = 0;
    pthread_create(&th[i], NULL, thread_worker, (void *)&td[i]);
}

让线程使用条件变量等待 work_status

void*
thread_worker(void* data)
{
    THREAD_DATA *td = (THREAD_DATA *)data;

    while (1) {
        /* Wait for work to be available */
        pthread_mutex_lock(&td->cond_mutex);
        while (td->work_status != 1)
            pthread_cond_wait(&td->cond, &td->cond_mutex);
        pthread_mutex_unlock(&td->cond_mutex);

        // do work ...

        /* Tell main thread that the work has finished */
        pthread_mutex_lock(&td->cond_mutex);
        td->work_status = 2;
        pthread_cond_signal(&td->cond);
        pthread_mutex_unlock(&td->cond_mutex);
    }
    pthread_exit(NULL);
}

...并在 job() 中适当地设置并等待 work_status:

void
job()
{
    /* Tell threads that work is available */
    for (size_t i = 0; i < NUM_THREADS; i++) {
        pthread_mutex_lock(&td[i].cond_mutex);
        td[i].work_status = 1;
        pthread_cond_signal(&td[i].cond);
        pthread_mutex_unlock(&td[i].cond_mutex);
    }

    /* Wait for threads to signal work complete */
    for (size_t i = 0; i < NUM_THREADS; i++) {
        pthread_mutex_lock(&td[i].cond_mutex);
        while (td[i].work_status != 2)
            pthread_cond_wait(&td[i].cond, &td[i].cond_mutex);
        pthread_mutex_unlock(&td[i].cond_mutex);
    }
}