pthreads:独占+并发函数(逆信号量)

pthreads: exclusive+concurrent functions (inverse semaphore)

我有锁定(某个库的)每个函数的代码,我想对其进行优化。给定函数 AB,我不介意任何 A 运行 与任何其他 A 或任何 B 运行ning 与任何其他 B 同时存在,但没有 A 可以 运行 而任何 B 是 运行ning,反之亦然。线程数是动态的,出于我无法控制的原因,我被迫对互斥锁和条件变量(即 PTHREAD_MUTEX_INITIALIZER)使用静态分配。

我有一种预感,最有效的方法是两个条件变量。使用 pthread_mutex_trylock 允许一个函数(即 A)并行到 运行,而另一个必须序列化。另外 *trylock 静态初始化实际上是未定义的行为...

编辑:

也许是这样的?我不确定这是不是:

static int countA = 0;
static int countB = 0;
static pthread_mutex_t lockCountA = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lockCountB = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lockA = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lockB = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t condA = PTHREAD_COND_INITIALIZER;
static pthread_cond_t condB = PTHREAD_COND_INITIALIZER;

// for B(), just s/B/A/g
static void A(void) {
    pthread_mutex_lock(&lockB);
    while(countB)
        pthread_cond_wait(&condB, &lockB);
    pthread_mutex_lock(&lockCountA);
    countA += 1;
    pthread_mutex_unlock(&lockCountA)
    doA();
    pthread_mutex_lock(&lockCountA)
    countA -= 1;
    if countA == 0:
        pthread_cond_signal(&condA);
    mutex_unlock(&lockCountA)
    mutex_unlock(&lockB);
}

您的解决方案存在竞争条件。考虑 countAcountB 都为零的情况,两个线程同时调用 A()B()。第一个线程锁定 lockB,第二个线程锁定 lockA,两者都将检查的计数视为零,然后继续增加各自的计数并继续错误。

你的解决方案中的另一个问题是它使用 pthread_cond_signal() 不一定唤醒一个以上的等待线程,所以如果你有 10 个线程等待进入 B() 但只有一个线程 运行ning A(),当后一个线程完成时,只有一个B()线程保证继续,其他9个可能会无限期等待。

它也不允许多个线程同时 运行 doA(),因为 lockB 被暂停了那个调用。

要解决第一个问题,您可以使用一个同时保护 countAcountB 的互斥锁(因为我们必须检查的共享状态涉及这两个变量的组合)。同时,您也可以只使用一个条件变量:等待条件变量的线程必须全部等待进入 A(),或者全部等待进入 B(),但是混合两者是不可能的。解决第二个问题只需要pthread_cond_broadcast()。这导致更简单:

static int countA = 0;
static int countB = 0;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;

void A(void)
{
    pthread_mutex_lock(&lock);
    while (countB)
        pthread_cond_wait(&cond, &lock);
    countA++;
    pthread_mutex_unlock(&lock);

    do_A();

    pthread_mutex_lock(&lock);
    countA--;
    if (!countA)
        pthread_cond_broadcast(&cond);
    pthread_mutex_unlock(&lock);
}

这个解决方案是正确的,但不是 "fair" - 如果有一个连续的线程流执行 A()(这样一个新线程进入并在前一个之前递增 countA thread has finished and decremented it) 然后等待执行的线程 B() 将永远等待。这在您的特定用途中可能不是问题 - 例如,如果您知道任何执行 A() 的线程最终将执行 B(),那么饥饿情况最终必须解决。

改进系统以防止这种饥饿可以通过阻止新条目进入 A() 而有线程排队进入 B():

来完成
static int countA = 0;
static int countB = 0;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int queuedA = 0;
static int queuedB = 0;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;

void A(void)
{
    pthread_mutex_lock(&lock);
    while (queuedB)
        pthread_cond_wait(&queue_cond, &lock);
    while (countB)
    {
        queuedA++;
        pthread_cond_wait(&cond, &lock);
        queuedA--;
    }
    if (!queuedA)
        pthread_cond_broadcast(&queue_cond);
    countA++;
    pthread_mutex_unlock(&lock);

    do_A();

    pthread_mutex_lock(&lock);
    countA--;
    if (!countA)
        pthread_cond_broadcast(&cond);
    pthread_mutex_unlock(&lock);
}

这可以防止饥饿,因为:

  1. queuedBB() 中有任何线程在 cond 上等待时始终为非零;
  2. 虽然 queuedB 非零,但没有线程可以递增 countA,因此 countA 最终必须达到零并允许等待 cond 的线程继续.
  3. 虽然 countA 为零,但没有线程可以递增 queuedB,因此 queuedB 最终必须达到零并允许等待 queue_cond 的线程继续进行。