pthread_cond_broadcast 竞争条件

pthread_cond_broadcast race condition

我找不到消除这种竞争条件的方法。主线程调用广播例程唤醒所有工作线程,然后调用 cond_wait 等待所有线程完成;最后一个完成的线程会向主线程发出信号。问题是,主线程进行广播时,有时并非所有工作线程都已经在条件变量上等待。代码有点乱,因为我正在尝试各种修复方法。

    /* Shared state for the question's program. */
    unsigned nProcs, curProc;   /* worker count, and workers still busy this round */
    pthread_mutex_t WORKlock = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_t MAINlock = PTHREAD_MUTEX_INITIALIZER;
    pthread_mutex_t wj_varlock = PTHREAD_MUTEX_INITIALIZER;   /* was misspelled pthread_mute_t */
    pthread_cond_t WORKsig = PTHREAD_COND_INITIALIZER;
    pthread_cond_t MAINsig = PTHREAD_COND_INITIALIZER;

    /*
     * Worker thread from the question. Each pass waits on WORKsig, "does the
     * work", decrements curProc under wj_varlock, and the thread that brings
     * curProc to 0 signals MAINsig.
     * NOTE(review): neither condition variable is paired with a predicate,
     * which is the source of the race the question asks about.
     */
    void *do_work(void *t)
    {
    // NOTE(review): every worker runs offset++ without a lock (data race).
    static unsigned offset = 0;
    unsigned myoff = offset++, locked = 0;
    // nc is local to this thread; volatile does nothing useful here.
    volatile int nc;

    while(1)
            {
            // locked == 1 means the previous pass already took WORKlock (see below).
            if(locked == 0)
                    pthread_mutex_lock(&WORKlock);
            else
                    locked = 0;
            // Unconditional wait: a broadcast sent before this thread gets here is
            // missed, and a spurious wakeup starts work without any broadcast.
            pthread_cond_wait(&WORKsig, &WORKlock);
            pthread_mutex_unlock(&WORKlock);
            // this is where the work gets done
            //sleep(1);
            // NOTE(review): myoff is unsigned; %u is the matching conversion.
            printf("thread %d done!\n", myoff);

            pthread_mutex_lock(&wj_varlock);
            nc = --curProc;
            pthread_mutex_unlock(&wj_varlock);
            if(nc == 0)
                    {
                    // Last finisher keeps WORKlock held into its next pass. main never
                    // takes WORKlock before broadcasting, so this does not close the race.
                    pthread_mutex_lock(&WORKlock);
                    locked = 1;
                    // Sent without holding MAINlock: lost if main is not yet waiting.
                    pthread_cond_signal(&MAINsig);
                    }

            }
     }

    /*
     * Driver from the question: starts nProcs workers, then each round sets
     * curProc, broadcasts WORKsig and waits on MAINsig.
     * NOTE(review): the broadcast/wait hand-off has no shared predicate, so a
     * wakeup sent while nobody is waiting is simply lost.
     * NOTE(review): `void main` is not a valid hosted-C signature; use `int main`.
     */
    void main(int argc, char **argv)
    {
    unsigned i, k;
    pthread_t pth;

    nProcs = get_nprocs();  // get number of core from system
    for(i = 0; i < nProcs; ++i)
            {
            // pth is reused for every thread; the handles are never joined.
            k = pthread_create(&pth, NULL, do_work, NULL);
            if(k != 0)
                    {
                    // NOTE(review): pthread_create returns the error number and does
                    // not set errno, so perror() may print an unrelated message.
                    perror("pthread_create");
                    exit(k);
                    }
            }

    // Held for the whole loop; only released while blocked in pthread_cond_wait.
    pthread_mutex_lock(&MAINlock);
    while(1)
            {
            //prepare work to be done

            puts("work prep");
            //sleep(1);

            // Written without wj_varlock, the mutex workers hold when decrementing it.
            curProc = nProcs; // use global var to track active threads
            // Wakes only threads already blocked on WORKsig; a worker that has not
            // reached pthread_cond_wait yet misses this round.
            pthread_cond_broadcast(&WORKsig);
            // No predicate loop: if the last worker signals MAINsig before main
            // blocks here, main waits forever; a spurious wakeup starts the next
            // round early.
            pthread_cond_wait(&MAINsig, &MAINlock);
            }
    }

条件变量需要与某个关于共享状态的条件(通常称为“谓词”)配对使用——这正是它们被称为“条件”变量的原因。例如,要启动工作线程,可以使用一个简单的全局标志变量:

/* Predicate for WORKsig: workers wait while this is 0 and proceed once main sets it to 1. */
int start_work = 0;    /* Protected by WORKlock */

然后在工作线程中这样写:

/* Worker side: the loop re-checks start_work after every wakeup, and skips
   waiting entirely if main has already set it. */
pthread_mutex_lock(&WORKlock);
while (!start_work)
    pthread_cond_wait(&WORKsig, &WORKlock);
pthread_mutex_unlock(&WORKlock);

/* this is where the work gets done */

在主线程中这样写:

/* Main side: change the predicate while holding WORKlock, then wake every
   worker blocked on WORKsig. */
pthread_mutex_lock(&WORKlock);
start_work = 1;
pthread_mutex_unlock(&WORKlock);
pthread_cond_broadcast(&WORKsig);

这样,如果主线程发出信号时某个工作线程还没有阻塞在条件变量上,那么它随后检查时 start_work 已经是 1,因此根本不会阻塞,也就不会错过这次唤醒。

要让主线程阻塞直到所有工作线程完成,可以用 curProc > 0 作为谓词。请注意,您并不需要同时使用 wj_varlock 和 MAINlock —— 只需要一个互斥锁来保护 curProc 变量即可。

(为了使您的设计正确,您需要仔细交错 curProc 和 start_work 上的条件等待。)

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

/* Round gate: main sets it to 1 to start a round and back to 0 to close it.
   Guarded by WORKlock; workers wait for changes on WORKsig. */
int start_work = 0;
pthread_mutex_t WORKlock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t WORKsig = PTHREAD_COND_INITIALIZER;

/* Number of workers checked into the current round.
   Guarded by MAINlock; main waits for changes on MAINsig. */
unsigned curProc;
pthread_mutex_t MAINlock = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t MAINsig = PTHREAD_COND_INITIALIZER;


/*
 * Worker thread for the condition-variable version. Each round it:
 *   1. waits on WORKsig until main sets start_work to 1,
 *   2. increments curProc and signals MAINsig so main can count it in,
 *   3. does its (simulated) work,
 *   4. waits on WORKsig until main sets start_work back to 0; without this a
 *      fast worker could loop round and be counted twice in the same round,
 *   5. decrements curProc and, if it was the last one, signals MAINsig.
 * The argument is unused and the function never returns.
 */
void *do_work(void *t)
{
    static int last_id;   /* Protected by WORKlock */
    int my_id;

    pthread_mutex_lock(&WORKlock);
    my_id = ++last_id;
    pthread_mutex_unlock(&WORKlock);

    for (;;)
    {
        /* 1. Round not open yet: wait for start_work to leave 0. */
        pthread_mutex_lock(&WORKlock);
        while (start_work == 0)
        {
            pthread_cond_wait(&WORKsig, &WORKlock);
        }
        pthread_mutex_unlock(&WORKlock);

        /* 2. Check in. */
        pthread_mutex_lock(&MAINlock);
        curProc++;
        pthread_mutex_unlock(&MAINlock);
        pthread_cond_signal(&MAINsig);

        /* 3. Simulated work. */
        printf("Working (%d)...\n", my_id);
        sleep(1);

        /* 4. Round still open: wait for start_work to leave 1. */
        pthread_mutex_lock(&WORKlock);
        while (start_work == 1)
        {
            pthread_cond_wait(&WORKsig, &WORKlock);
        }
        pthread_mutex_unlock(&WORKlock);

        /* 5. Check out; the last one out wakes main. */
        pthread_mutex_lock(&MAINlock);
        if (--curProc == 0)
        {
            pthread_cond_signal(&MAINsig);
        }
        pthread_mutex_unlock(&MAINlock);
    }
}

/* Stub standing in for the system query: pretends the machine has 8 processors. */
int get_nprocs(void)
{
    const int stub_cpu_count = 8;
    return stub_cpu_count;
}

/*
 * Driver for the condition-variable version. Starts one worker per processor
 * reported by get_nprocs(), then runs rounds forever:
 *   1. set start_work = 1 and wake the workers,
 *   2. wait until all nProcs workers have incremented curProc,
 *   3. set start_work = 0 and wake the workers again,
 *   4. wait until every worker has brought curProc back to 0.
 */
int main(int argc, char **argv)
{
    unsigned i;
    pthread_t pth;   /* reused for every worker; the handles are never joined */

    (void)argc;
    (void)argv;

    unsigned nProcs = get_nprocs();  // get number of core from system
    for (i = 0; i < nProcs; ++i)
    {
        /* pthread_create reports failure through its return value and does
           not set errno, so perror() would print an unrelated message. */
        int rc = pthread_create(&pth, NULL, do_work, NULL);
        if (rc != 0)
        {
            fprintf(stderr, "pthread_create: error %d\n", rc);
            exit(rc);
        }
    }

    /* No lock needed here: workers only touch curProc after seeing
       start_work == 1 under WORKlock, which is set further down. */
    curProc = 0;
    while (1)
    {
        //prepare work to be done

        puts("work prep");
        //sleep(1);

        /* Tell threads to start work */
        pthread_mutex_lock(&WORKlock);
        start_work = 1;
        pthread_mutex_unlock(&WORKlock);
        pthread_cond_broadcast(&WORKsig);

        /* Wait for threads to start */
        pthread_mutex_lock(&MAINlock);
        while (curProc < nProcs)
            pthread_cond_wait(&MAINsig, &MAINlock);
        pthread_mutex_unlock(&MAINlock);

        /* Tell threads not to start next lot of work */
        pthread_mutex_lock(&WORKlock);
        start_work = 0;
        pthread_mutex_unlock(&WORKlock);
        pthread_cond_broadcast(&WORKsig);

        /* Wait for threads to finish */
        pthread_mutex_lock(&MAINlock);
        while (curProc > 0)
            pthread_cond_wait(&MAINsig, &MAINlock);
        pthread_mutex_unlock(&MAINlock);
    }

    return 0;
}

请注意,使用屏障(barrier)可以更简单地完成这种步调一致(lockstep)的操作:

#include <stdio.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>

/* Rendezvous point shared by main and every worker; initialised in main. */
pthread_barrier_t WORKbarrier;
/* Serialises the one-time worker numbering in do_work. */
pthread_mutex_t WORKlock = PTHREAD_MUTEX_INITIALIZER;

/*
 * Worker thread for the barrier version. Each round it meets main (and the
 * other workers) at WORKbarrier once to start and once more after finishing,
 * so no flag or counter is needed. The argument is unused and the function
 * never returns.
 */
void *do_work(void *t)
{
    static int next_id;   /* Protected by WORKlock */
    int id;

    pthread_mutex_lock(&WORKlock);
    id = ++next_id;
    pthread_mutex_unlock(&WORKlock);

    for (;;)
    {
        pthread_barrier_wait(&WORKbarrier);   /* round starts */

        printf("Working (%d)...\n", id);
        sleep(1);

        pthread_barrier_wait(&WORKbarrier);   /* round ends */
    }
}

/* Fixed processor count used in place of a real system query. */
int get_nprocs(void)
{
    enum { FAKE_CPU_COUNT = 8 };
    return FAKE_CPU_COUNT;
}

/*
 * Driver for the barrier version. WORKbarrier counts nProcs workers plus main
 * itself, so each pthread_barrier_wait below releases everyone together:
 * the first starts a round, the second waits for all workers to finish it.
 */
int main(int argc, char **argv)
{
    unsigned i;
    int rc;
    pthread_t pth;   /* reused for every worker; the handles are never joined */

    (void)argc;
    (void)argv;

    unsigned nProcs = get_nprocs();  // get number of core from system

    /* Initialise the barrier BEFORE any worker exists: workers call
       pthread_barrier_wait immediately, and waiting on an uninitialised
       barrier is undefined behaviour. */
    rc = pthread_barrier_init(&WORKbarrier, NULL, nProcs + 1);
    if (rc != 0)
    {
        fprintf(stderr, "pthread_barrier_init: error %d\n", rc);
        exit(rc);
    }

    for (i = 0; i < nProcs; ++i)
    {
        /* pthread_create reports failure through its return value and does
           not set errno, so perror() would print an unrelated message. */
        rc = pthread_create(&pth, NULL, do_work, NULL);
        if (rc != 0)
        {
            fprintf(stderr, "pthread_create: error %d\n", rc);
            exit(rc);
        }
    }

    while (1)
    {
        //prepare work to be done

        puts("work prep");
        //sleep(1);

        /* Tell threads to start work */
        pthread_barrier_wait(&WORKbarrier);

        /* Wait for threads to finish */
        pthread_barrier_wait(&WORKbarrier);
    }

    return 0;
}