clean/reliable 关闭使用 pthread 屏障进行同步的线程的好策略是什么?
What's a good strategy for clean/reliable shutdown of threads that use pthread barriers for synchronization?
我有一个基于 pthread 的多线程程序,它有四个线程无限期地执行这个 运行 循环(伪代码):
while(keepRunning)
{
pthread_barrier_wait(&g_stage_one_barrier);
UpdateThisThreadsStateVariables();
pthread_barrier_wait(&g_stage_two_barrier);
DoComputationsThatReadFromAllThreadsStateVariables();
}
这非常有效,因为在第一阶段每个线程都会更新自己的状态变量,这没关系,因为在第一阶段没有其他线程正在读取任何其他线程的状态变量。然后在第二阶段,就线程读取彼此的状态而言,这是一个混战,但这没关系,因为在第二阶段没有线程修改其本地状态变量,因此它们实际上是只读的。
我唯一剩下的问题是,当我的应用程序需要退出时,我如何干净可靠地关闭这些线程? ("cleanly and reliably",我的意思是不引入潜在的死锁或竞争条件,理想情况下不必发送任何 UNIX 信号来强制线程退出 pthread_barrier_wait() 调用)
我的 main() 线程当然可以将每个线程的 keepRunning 设置为 false,但是如何让每个线程的 pthread_barrier_wait() 变为 return? AFAICT 使 pthread_barrier_wait() 到 return 的唯一方法是同时让所有四个线程的执行位置在 pthread_barrier_wait() 内,但是当某些线程可能已经退出时这很难做到已经.
调用 pthread_barrier_destroy() 似乎是我想要做的,但是当任何线程可能正在等待屏障时这样做是未定义的行为。
这个问题是否有众所周知的解决方案?
在屏障处等待的线程不是问题,仍然 运行 UpdateThis...
或 DoComputations...
的线程将延迟关闭。您可以通过定期检查 UpdateThis...
和 DoComputations...
函数中的关机来减少关机时间。
这是一种可能的解决方案的概要
- main 初始化一个互斥量
g_shutdown_mutex
- main 锁定互斥量
- main 启动线程
- 线程在定期尝试锁定
mutex,但由于 main 已锁定互斥锁,因此
trylock
函数
总是会失败
- 当需要关闭时,main 解锁互斥锁
- 现在
trylock
会成功,辅助函数会 return 提前
- 在到达第二个屏障之前,任何成功锁定互斥锁的线程都会设置一个全局变量
g_shutdown_requested
- 通过第二道关卡后,所有线程在
g_shutdown_requested
中看到相同的值,并做出是否退出的相同决定
所以 while
循环看起来像这样
while(1)
{
pthread_barrier_wait(&g_stage_one_barrier);
UpdateThisThreadsStateVariables();
if ( pthread_mutex_trylock( &g_shutdown_mutex ) == 0 )
{
g_shutdown_requested = true;
pthread_mutex_unlock( &g_shutdown_mutex );
break;
}
pthread_barrier_wait(&g_stage_two_barrier);
if ( g_shutdown_requested )
break;
DoComputationsThatReadFromAllThreadsStateVariables();
}
辅助函数看起来像这样
void UpdateThisThreadsStateVariables( void )
{
for ( i = 0;; i++ )
{
// check the mutex once every 4000 times through the loop
if ( (i & 0xfff) == 0 && pthread_mutex_trylock( &g_shutdown_mutex ) == 0 )
{
pthread_mutex_unlock( &g_shutdown_mutex ); // abnormal termination
return;
}
// do the important stuff here
if ( doneWithTheImportantStuff ) // normal termination
break;
}
}
有两个标志并使用类似下面的东西应该可行:
for (;;)
{
pthread_barrier_wait(&g_stage_one_barrier); +
|
UpdateThisThreadsStateVariables(); |
|
pthread_mutex_lock(&shutdownMtx); | Zone 1
pendingShutdown = !keepRunning; |
pthread_mutex_unlock(&shutdownMtx); |
|
pthread_barrier_wait(&g_stage_two_barrier); +
|
if (pendingShutdown) |
break; | Zone 2
|
DoComputationsThatReadFromAllThreadsStateVariables(); |
}
shutdownMtx
也应保护 keepRunning
的设置,但未显示。
逻辑是,到 pendingShutdown
设置为 true
时,所有线程都必须在 区域 1 内。 (即使只有一些线程看到 keepRunning
是 false
,这也是正确的,所以在 keepRunning
上的比赛应该没问题。)因此它们都会达到 pthread_barrier_wait(&g_stage_two_barrier)
,然后当他们进入 Zone 2.
时全部爆发
也可以检查 PTHREAD_BARRIER_SERIAL_THREAD
——它由 pthread_barrier_wait()
恰好为其中一个线程返回——并且只对 pendingShutdown
进行锁定和更新在该线程中,这可以提高性能。
存在需求冲突:屏障语义要求所有线程 in
继续,关闭需要在执行块之间共享线程时终止(可能在不同的屏障内)。
我建议用支持外部 cancel
调用的自定义实现替换屏障。
示例(可能不是 运行,但想法...):
struct _barrier_entry
{
pthread_cond_t cond;
volatile bool released;
volatile struct _barrier_entry *next;
};
typedef struct
{
volatile int capacity;
volatile int count;
volatile struct _barrier_entry *first;
pthread_mutex_t lock;
} custom_barrier_t;
初始化:
int custom_barrier_init(custom_barrier_t *barrier, int capacity)
{
if (NULL == barrier || capacity <= 0)
{
errno = EINVAL;
return -1;
}
barrier->capacity = capacity;
barrier->count = 0;
barrier->first = NULL;
return pthread_mutex_init(&barrier->lock, NULL);
return -1;
}
帮手:
static void _custom_barrier_flush(custom_barrier_t *barrier)
{
struct _barrier_entry *ptr;
for (ptr = barrier->first; NULL != ptr;)
{
struct _barrier_entry *next = ptr->next;
ptr->released = true;
pthread_cond_signal(&ptr->cond);
ptr = next;
}
barrier->first = NULL;
barrier->count = 0;
}
阻塞等待:
int custom_barrier_wait(custom_barrier_t *barrier)
{
struct _barrier_entry entry;
int result;
pthread_cond_init(&barrier->entry, NULL);
entry->next = NULL;
entry->released = false;
pthread_mutex_lock(&barrier->lock);
barrier->count++;
if (barrier->count == barrier->capacity)
{
_custom_barrier_flush(barrier);
result = 0;
}
else
{
entry->next = barrier->first;
barrier->first = entry;
while (true)
{
pthread_cond_wait(&entry->cond, &barrier->lock);
if (entry->released)
{
result = 0;
break;
}
if (barrier->capacity < 0)
{
errno = ECANCELLED;
result = -1;
break;
}
}
}
pthread_mutex_unlock(&barrier->lock);
pthread_cond_destroy(&entry->cond);
return result;
}
取消:
int custom_barrier_cancel(custom_barrier_t *barrier)
{
pthread_mutex_lock(barrier->lock);
barrier->capacity = -1;
_custom_barrier_flush(barrier);
pthread_mutex_unlock(barrier->lock);
return 0;
}
所以线程代码可以运行循环,直到custom_barrier_wait
调用后出现ECANCELLED
错误。
您可以有一个额外的线程在相同的障碍上同步,但仅作为 "shutdown master" 存在。您的工作线程将使用您在问题中拥有的确切代码,并且 "shutdown master" 线程将执行:
while (keepRunning)
{
pthread_barrier_wait(&g_stage_one_barrier);
pthread_mutex_lock(&mkr_lock);
if (!mainKeepRunning)
keepRunning = 0;
pthread_mutex_unlock(&mkr_lock);
pthread_barrier_wait(&g_stage_two_barrier);
}
当主线程想要其他线程关闭时,它会这样做:
pthread_mutex_lock(&mkr_lock);
mainKeepRunning = 0;
pthread_mutex_unlock(&mkr_lock);
(即 keepRunning
变量在第 2 阶段成为只读共享线程状态的一部分,并在第 1 阶段由关闭主线程拥有)。
当然,您也可以选择其他线程之一作为 "shutdown master thread",而不是为此目的使用专用线程。
我有一个基于 pthread 的多线程程序,它有四个线程无限期地执行这个 运行 循环(伪代码):
while(keepRunning)
{
pthread_barrier_wait(&g_stage_one_barrier);
UpdateThisThreadsStateVariables();
pthread_barrier_wait(&g_stage_two_barrier);
DoComputationsThatReadFromAllThreadsStateVariables();
}
这非常有效,因为在第一阶段每个线程都会更新自己的状态变量,这没关系,因为在第一阶段没有其他线程正在读取任何其他线程的状态变量。然后在第二阶段,就线程读取彼此的状态而言,这是一个混战,但这没关系,因为在第二阶段没有线程修改其本地状态变量,因此它们实际上是只读的。
我唯一剩下的问题是,当我的应用程序需要退出时,我如何干净可靠地关闭这些线程? ("cleanly and reliably",我的意思是不引入潜在的死锁或竞争条件,理想情况下不必发送任何 UNIX 信号来强制线程退出 pthread_barrier_wait() 调用)
我的 main() 线程当然可以将每个线程的 keepRunning 设置为 false,但是如何让每个线程的 pthread_barrier_wait() 变为 return? AFAICT 使 pthread_barrier_wait() 到 return 的唯一方法是同时让所有四个线程的执行位置在 pthread_barrier_wait() 内,但是当某些线程可能已经退出时这很难做到已经.
调用 pthread_barrier_destroy() 似乎是我想要做的,但是当任何线程可能正在等待屏障时这样做是未定义的行为。
这个问题是否有众所周知的解决方案?
在屏障处等待的线程不是问题,仍然 运行 UpdateThis...
或 DoComputations...
的线程将延迟关闭。您可以通过定期检查 UpdateThis...
和 DoComputations...
函数中的关机来减少关机时间。
这是一种可能的解决方案的概要
- main 初始化一个互斥量
g_shutdown_mutex
- main 锁定互斥量
- main 启动线程
- 线程在定期尝试锁定
mutex,但由于 main 已锁定互斥锁,因此
trylock
函数 总是会失败 - 当需要关闭时,main 解锁互斥锁
- 现在
trylock
会成功,辅助函数会 return 提前 - 在到达第二个屏障之前,任何成功锁定互斥锁的线程都会设置一个全局变量
g_shutdown_requested
- 通过第二道关卡后,所有线程在
g_shutdown_requested
中看到相同的值,并做出是否退出的相同决定
所以 while
循环看起来像这样
while(1)
{
pthread_barrier_wait(&g_stage_one_barrier);
UpdateThisThreadsStateVariables();
if ( pthread_mutex_trylock( &g_shutdown_mutex ) == 0 )
{
g_shutdown_requested = true;
pthread_mutex_unlock( &g_shutdown_mutex );
break;
}
pthread_barrier_wait(&g_stage_two_barrier);
if ( g_shutdown_requested )
break;
DoComputationsThatReadFromAllThreadsStateVariables();
}
辅助函数看起来像这样
void UpdateThisThreadsStateVariables( void )
{
for ( i = 0;; i++ )
{
// check the mutex once every 4000 times through the loop
if ( (i & 0xfff) == 0 && pthread_mutex_trylock( &g_shutdown_mutex ) == 0 )
{
pthread_mutex_unlock( &g_shutdown_mutex ); // abnormal termination
return;
}
// do the important stuff here
if ( doneWithTheImportantStuff ) // normal termination
break;
}
}
有两个标志并使用类似下面的东西应该可行:
for (;;)
{
pthread_barrier_wait(&g_stage_one_barrier); +
|
UpdateThisThreadsStateVariables(); |
|
pthread_mutex_lock(&shutdownMtx); | Zone 1
pendingShutdown = !keepRunning; |
pthread_mutex_unlock(&shutdownMtx); |
|
pthread_barrier_wait(&g_stage_two_barrier); +
|
if (pendingShutdown) |
break; | Zone 2
|
DoComputationsThatReadFromAllThreadsStateVariables(); |
}
shutdownMtx
也应保护 keepRunning
的设置,但未显示。
逻辑是,到 pendingShutdown
设置为 true
时,所有线程都必须在 区域 1 内。 (即使只有一些线程看到 keepRunning
是 false
,这也是正确的,所以在 keepRunning
上的比赛应该没问题。)因此它们都会达到 pthread_barrier_wait(&g_stage_two_barrier)
,然后当他们进入 Zone 2.
也可以检查 PTHREAD_BARRIER_SERIAL_THREAD
——它由 pthread_barrier_wait()
恰好为其中一个线程返回——并且只对 pendingShutdown
进行锁定和更新在该线程中,这可以提高性能。
存在需求冲突:屏障语义要求所有线程 in
继续,关闭需要在执行块之间共享线程时终止(可能在不同的屏障内)。
我建议用支持外部 cancel
调用的自定义实现替换屏障。
示例(可能不是 运行,但想法...):
struct _barrier_entry
{
pthread_cond_t cond;
volatile bool released;
volatile struct _barrier_entry *next;
};
typedef struct
{
volatile int capacity;
volatile int count;
volatile struct _barrier_entry *first;
pthread_mutex_t lock;
} custom_barrier_t;
初始化:
int custom_barrier_init(custom_barrier_t *barrier, int capacity)
{
if (NULL == barrier || capacity <= 0)
{
errno = EINVAL;
return -1;
}
barrier->capacity = capacity;
barrier->count = 0;
barrier->first = NULL;
return pthread_mutex_init(&barrier->lock, NULL);
return -1;
}
帮手:
static void _custom_barrier_flush(custom_barrier_t *barrier)
{
struct _barrier_entry *ptr;
for (ptr = barrier->first; NULL != ptr;)
{
struct _barrier_entry *next = ptr->next;
ptr->released = true;
pthread_cond_signal(&ptr->cond);
ptr = next;
}
barrier->first = NULL;
barrier->count = 0;
}
阻塞等待:
int custom_barrier_wait(custom_barrier_t *barrier)
{
struct _barrier_entry entry;
int result;
pthread_cond_init(&barrier->entry, NULL);
entry->next = NULL;
entry->released = false;
pthread_mutex_lock(&barrier->lock);
barrier->count++;
if (barrier->count == barrier->capacity)
{
_custom_barrier_flush(barrier);
result = 0;
}
else
{
entry->next = barrier->first;
barrier->first = entry;
while (true)
{
pthread_cond_wait(&entry->cond, &barrier->lock);
if (entry->released)
{
result = 0;
break;
}
if (barrier->capacity < 0)
{
errno = ECANCELLED;
result = -1;
break;
}
}
}
pthread_mutex_unlock(&barrier->lock);
pthread_cond_destroy(&entry->cond);
return result;
}
取消:
int custom_barrier_cancel(custom_barrier_t *barrier)
{
pthread_mutex_lock(barrier->lock);
barrier->capacity = -1;
_custom_barrier_flush(barrier);
pthread_mutex_unlock(barrier->lock);
return 0;
}
所以线程代码可以运行循环,直到custom_barrier_wait
调用后出现ECANCELLED
错误。
您可以有一个额外的线程在相同的障碍上同步,但仅作为 "shutdown master" 存在。您的工作线程将使用您在问题中拥有的确切代码,并且 "shutdown master" 线程将执行:
while (keepRunning)
{
pthread_barrier_wait(&g_stage_one_barrier);
pthread_mutex_lock(&mkr_lock);
if (!mainKeepRunning)
keepRunning = 0;
pthread_mutex_unlock(&mkr_lock);
pthread_barrier_wait(&g_stage_two_barrier);
}
当主线程想要其他线程关闭时,它会这样做:
pthread_mutex_lock(&mkr_lock);
mainKeepRunning = 0;
pthread_mutex_unlock(&mkr_lock);
(即 keepRunning
变量在第 2 阶段成为只读共享线程状态的一部分,并在第 1 阶段由关闭主线程拥有)。
当然,您也可以选择其他线程之一作为 "shutdown master thread",而不是为此目的使用专用线程。