pthreads:独占+并发函数(逆信号量)
pthreads: exclusive+concurrent functions (inverse semaphore)
我有锁定(某个库的)每个函数的代码,我想对其进行优化。给定函数 A
和 B
,我不介意任何 A
运行 与任何其他 A
或任何 B
运行ning 与任何其他 B
同时存在,但没有 A
可以 运行 而任何 B
是 运行ning,反之亦然。线程数是动态的,出于我无法控制的原因,我被迫对互斥锁和条件变量(即 PTHREAD_MUTEX_INITIALIZER
)使用静态分配。
我有一种预感,最有效的方法是两个条件变量。使用 pthread_mutex_trylock
允许一个函数(即 A
)并行到 运行,而另一个必须序列化。另外 *trylock
静态初始化实际上是未定义的行为...
编辑:
也许是这样的?我不确定这是不是:
- 可以更简单。毕竟,互斥量是使用信号量实现的,但是需要四个互斥量和两个条件变量来实现基本上是一个反向信号量。
- 涵盖所有竞争条件。
- 是"fair"(超出默认优先级和调度)?
static int countA = 0;
static int countB = 0;
static pthread_mutex_t lockCountA = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lockCountB = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lockA = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lockB = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t condA = PTHREAD_COND_INITIALIZER;
static pthread_cond_t condB = PTHREAD_COND_INITIALIZER;
// for B(), just s/B/A/g
static void A(void) {
pthread_mutex_lock(&lockB);
while(countB)
pthread_cond_wait(&condB, &lockB);
pthread_mutex_lock(&lockCountA);
countA += 1;
pthread_mutex_unlock(&lockCountA)
doA();
pthread_mutex_lock(&lockCountA)
countA -= 1;
if countA == 0:
pthread_cond_signal(&condA);
mutex_unlock(&lockCountA)
mutex_unlock(&lockB);
}
您的解决方案存在竞争条件。考虑 countA
和 countB
都为零的情况,两个线程同时调用 A()
和 B()
。第一个线程锁定 lockB
,第二个线程锁定 lockA
,两者都将检查的计数视为零,然后继续增加各自的计数并继续错误。
你的解决方案中的另一个问题是它使用 pthread_cond_signal()
不一定唤醒一个以上的等待线程,所以如果你有 10 个线程等待进入 B()
但只有一个线程 运行ning A()
,当后一个线程完成时,只有一个B()
线程保证继续,其他9个可能会无限期等待。
它也不允许多个线程同时 运行 doA()
,因为 lockB
被暂停了那个调用。
要解决第一个问题,您可以使用一个同时保护 countA
和 countB
的互斥锁(因为我们必须检查的共享状态涉及这两个变量的组合)。同时,您也可以只使用一个条件变量:等待条件变量的线程必须全部等待进入 A()
,或者全部等待进入 B()
,但是混合两者是不可能的。解决第二个问题只需要pthread_cond_broadcast()
。这导致更简单:
static int countA = 0;
static int countB = 0;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void A(void)
{
pthread_mutex_lock(&lock);
while (countB)
pthread_cond_wait(&cond, &lock);
countA++;
pthread_mutex_unlock(&lock);
do_A();
pthread_mutex_lock(&lock);
countA--;
if (!countA)
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&lock);
}
这个解决方案是正确的,但不是 "fair" - 如果有一个连续的线程流执行 A()
(这样一个新线程进入并在前一个之前递增 countA
thread has finished and decremented it) 然后等待执行的线程 B()
将永远等待。这在您的特定用途中可能不是问题 - 例如,如果您知道任何执行 A()
的线程最终将执行 B()
,那么饥饿情况最终必须解决。
改进系统以防止这种饥饿可以通过阻止新条目进入 A()
而有线程排队进入 B()
:
来完成
static int countA = 0;
static int countB = 0;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int queuedA = 0;
static int queuedB = 0;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
void A(void)
{
pthread_mutex_lock(&lock);
while (queuedB)
pthread_cond_wait(&queue_cond, &lock);
while (countB)
{
queuedA++;
pthread_cond_wait(&cond, &lock);
queuedA--;
}
if (!queuedA)
pthread_cond_broadcast(&queue_cond);
countA++;
pthread_mutex_unlock(&lock);
do_A();
pthread_mutex_lock(&lock);
countA--;
if (!countA)
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&lock);
}
这可以防止饥饿,因为:
queuedB
在 B()
中有任何线程在 cond
上等待时始终为非零;
- 虽然
queuedB
非零,但没有线程可以递增 countA
,因此 countA
最终必须达到零并允许等待 cond
的线程继续.
- 虽然
countA
为零,但没有线程可以递增 queuedB
,因此 queuedB
最终必须达到零并允许等待 queue_cond
的线程继续进行。
我有锁定(某个库的)每个函数的代码,我想对其进行优化。给定函数 A
和 B
,我不介意任何 A
运行 与任何其他 A
或任何 B
运行ning 与任何其他 B
同时存在,但没有 A
可以 运行 而任何 B
是 运行ning,反之亦然。线程数是动态的,出于我无法控制的原因,我被迫对互斥锁和条件变量(即 PTHREAD_MUTEX_INITIALIZER
)使用静态分配。
我有一种预感,最有效的方法是两个条件变量。使用 pthread_mutex_trylock
允许一个函数(即 A
)并行到 运行,而另一个必须序列化。另外 *trylock
静态初始化实际上是未定义的行为...
编辑:
也许是这样的?我不确定这是不是:
- 可以更简单。毕竟,互斥量是使用信号量实现的,但是需要四个互斥量和两个条件变量来实现基本上是一个反向信号量。
- 涵盖所有竞争条件。
- 是"fair"(超出默认优先级和调度)?
static int countA = 0;
static int countB = 0;
static pthread_mutex_t lockCountA = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lockCountB = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lockA = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t lockB = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t condA = PTHREAD_COND_INITIALIZER;
static pthread_cond_t condB = PTHREAD_COND_INITIALIZER;
// for B(), just s/B/A/g
static void A(void) {
pthread_mutex_lock(&lockB);
while(countB)
pthread_cond_wait(&condB, &lockB);
pthread_mutex_lock(&lockCountA);
countA += 1;
pthread_mutex_unlock(&lockCountA)
doA();
pthread_mutex_lock(&lockCountA)
countA -= 1;
if countA == 0:
pthread_cond_signal(&condA);
mutex_unlock(&lockCountA)
mutex_unlock(&lockB);
}
您的解决方案存在竞争条件。考虑 countA
和 countB
都为零的情况,两个线程同时调用 A()
和 B()
。第一个线程锁定 lockB
,第二个线程锁定 lockA
,两者都将检查的计数视为零,然后继续增加各自的计数并继续错误。
你的解决方案中的另一个问题是它使用 pthread_cond_signal()
不一定唤醒一个以上的等待线程,所以如果你有 10 个线程等待进入 B()
但只有一个线程 运行ning A()
,当后一个线程完成时,只有一个B()
线程保证继续,其他9个可能会无限期等待。
它也不允许多个线程同时 运行 doA()
,因为 lockB
被暂停了那个调用。
要解决第一个问题,您可以使用一个同时保护 countA
和 countB
的互斥锁(因为我们必须检查的共享状态涉及这两个变量的组合)。同时,您也可以只使用一个条件变量:等待条件变量的线程必须全部等待进入 A()
,或者全部等待进入 B()
,但是混合两者是不可能的。解决第二个问题只需要pthread_cond_broadcast()
。这导致更简单:
static int countA = 0;
static int countB = 0;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
void A(void)
{
pthread_mutex_lock(&lock);
while (countB)
pthread_cond_wait(&cond, &lock);
countA++;
pthread_mutex_unlock(&lock);
do_A();
pthread_mutex_lock(&lock);
countA--;
if (!countA)
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&lock);
}
这个解决方案是正确的,但不是 "fair" - 如果有一个连续的线程流执行 A()
(这样一个新线程进入并在前一个之前递增 countA
thread has finished and decremented it) 然后等待执行的线程 B()
将永远等待。这在您的特定用途中可能不是问题 - 例如,如果您知道任何执行 A()
的线程最终将执行 B()
,那么饥饿情况最终必须解决。
改进系统以防止这种饥饿可以通过阻止新条目进入 A()
而有线程排队进入 B()
:
static int countA = 0;
static int countB = 0;
static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
static int queuedA = 0;
static int queuedB = 0;
static pthread_cond_t queue_cond = PTHREAD_COND_INITIALIZER;
void A(void)
{
pthread_mutex_lock(&lock);
while (queuedB)
pthread_cond_wait(&queue_cond, &lock);
while (countB)
{
queuedA++;
pthread_cond_wait(&cond, &lock);
queuedA--;
}
if (!queuedA)
pthread_cond_broadcast(&queue_cond);
countA++;
pthread_mutex_unlock(&lock);
do_A();
pthread_mutex_lock(&lock);
countA--;
if (!countA)
pthread_cond_broadcast(&cond);
pthread_mutex_unlock(&lock);
}
这可以防止饥饿,因为:
queuedB
在B()
中有任何线程在cond
上等待时始终为非零;- 虽然
queuedB
非零,但没有线程可以递增countA
,因此countA
最终必须达到零并允许等待cond
的线程继续. - 虽然
countA
为零,但没有线程可以递增queuedB
,因此queuedB
最终必须达到零并允许等待queue_cond
的线程继续进行。