进程终止后恢复 pthread_cond_t & pthread_mutex_t
Recovering pthread_cond_t & pthread_mutex_t after process termination
大家好!
我已经在这里查找过类似的问题,但没有得出最终的解决方案。
在我的应用程序中,我有两个进程 运行 同时需要通过共享内存进行同步。目前我正在为此目的使用 pthread_mutex_t
和 pthread_cond_t
,它们被放入共享内存中。
这工作正常,直到进程 A 在等待条件时崩溃。如果进程 A 重新启动,则会发生死锁(?),其中进程 A 等待条件并且进程 B 无限期地卡在对 pthread_cond_broadcast
.
的调用中
我读到这可能是由于互斥锁处于不一致状态,但实际上它似乎从来没有出现在我的程序中。
如果您能告诉我我是否误解了什么,或者是否有其他方法可以解决这个问题,或者根本不可能防止这种崩溃情况,我将不胜感激。
struct Poller
{
private:
std::atomic<bool> waiting;
pthread_mutex_t mtx;
pthread_cond_t cnd;
auto lockMutex() -> void
{
// Recover Mutex if any process locking it died unexpectedly
LOG_DEBUG(info, "AdaptivePoller", "Locking mutex...");
if(pthread_mutex_lock(&mtx) == EOWNERDEAD) {
LOG_DEBUG(info, "AdaptivePoller", "Mutex in inconsistent state.");
if(pthread_mutex_consistent(&mtx) != 0) {
throw std::runtime_error("Mutex could not be brought back into consistent state.");
}
}
}
public:
Poller()
{
waiting = false;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&mtx, &attr);
pthread_mutexattr_destroy(&attr);
pthread_condattr_t attrcond;
pthread_condattr_init(&attrcond);
pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED);
pthread_cond_init(&cnd, &attrcond);
pthread_condattr_destroy(&attrcond);
}
auto wait() -> void
{
lockMutex();
waiting = true;
LOG_DEBUG(info, "AdaptivePoller", "Start waiting...");
while(waiting) {
pthread_cond_wait(&cnd, &mtx);
}
pthread_mutex_unlock(&mtx);
}
auto notify() -> void
{
lockMutex();
waiting = false;
LOG_DEBUG(info, "AdaptivePoller", "Notifying...");
pthread_cond_broadcast(&cnd);
pthread_mutex_unlock(&mtx);
}
};
据我所知,POSIX API 没有定义足以使您的 Poller
完全稳健地应对所有流程故障情况的机制。但是您可能会通过使用 SysV 信号量代替条件变量和互斥锁来实现您认为可以改进的情况。这是一个大纲:
信号量can_proceed
提供线程阻塞自身直到被释放。此信号量的初始值为 0,线程通过尝试递减它来阻塞自己。
一个线程通过增加can_proceed
阻塞进程的数量来释放当前阻塞的进程。 SysV 信号量 API 提供了一个用于确定有多少信号量的操作。
以上并非万无一失。至少会出现这些情况:
等待进程 W 在通知进程 N 读取包含 W,但在 W 完成其信号量递减之前。这将允许一个未来的服务员通过而不阻塞。
等待进程 W2 在通知进程递增 can_proceed
和前一个进程的最后一个之间到达 wait()
- 等待进程完成其信号量递减。在这种情况下,W2 可能会立即继续,而不是在通知时已经等待的某个线程 W1。在这种情况下,W1 将在下一次通知时被释放(除非同样的事情再次发生)。
如果两个进程大约同时尝试发出通知,则可以观察到反映进程已释放但尚未完成其信号量递减的等待者计数。可能的结果是一个或多个未来进程将通过 wait()
而不会阻塞。
由于其中两个类似于来自 CV 等待的虚假 return,因此可以通过类似的谓词检查习语来解决它们。鉴于进程完成其信号量递减和获取互斥量之间必然存在间隙,这将是棘手的,但也许可以通过使用原子对象来解决这个问题,因此不需要互斥量。
也有可能进程在 notify()
递增 can_proceed
之前终止,这样通知就无效了。我不认为这是建议方案特有的弱点。
请注意,SysV 信号量本身并不驻留在共享内存中,也不需要共享内存。
大家好!
我已经在这里查找过类似的问题,但没有得出最终的解决方案。
在我的应用程序中,我有两个进程 运行 同时需要通过共享内存进行同步。目前我正在为此目的使用 pthread_mutex_t
和 pthread_cond_t
,它们被放入共享内存中。
这工作正常,直到进程 A 在等待条件时崩溃。如果进程 A 重新启动,则会发生死锁(?),其中进程 A 等待条件并且进程 B 无限期地卡在对 pthread_cond_broadcast
.
我读到这可能是由于互斥锁处于不一致状态,但实际上它似乎从来没有出现在我的程序中。 如果您能告诉我我是否误解了什么,或者是否有其他方法可以解决这个问题,或者根本不可能防止这种崩溃情况,我将不胜感激。
struct Poller
{
private:
std::atomic<bool> waiting;
pthread_mutex_t mtx;
pthread_cond_t cnd;
auto lockMutex() -> void
{
// Recover Mutex if any process locking it died unexpectedly
LOG_DEBUG(info, "AdaptivePoller", "Locking mutex...");
if(pthread_mutex_lock(&mtx) == EOWNERDEAD) {
LOG_DEBUG(info, "AdaptivePoller", "Mutex in inconsistent state.");
if(pthread_mutex_consistent(&mtx) != 0) {
throw std::runtime_error("Mutex could not be brought back into consistent state.");
}
}
}
public:
Poller()
{
waiting = false;
pthread_mutexattr_t attr;
pthread_mutexattr_init(&attr);
pthread_mutexattr_setrobust(&attr, PTHREAD_MUTEX_ROBUST);
pthread_mutexattr_setpshared(&attr, PTHREAD_PROCESS_SHARED);
pthread_mutex_init(&mtx, &attr);
pthread_mutexattr_destroy(&attr);
pthread_condattr_t attrcond;
pthread_condattr_init(&attrcond);
pthread_condattr_setpshared(&attrcond, PTHREAD_PROCESS_SHARED);
pthread_cond_init(&cnd, &attrcond);
pthread_condattr_destroy(&attrcond);
}
auto wait() -> void
{
lockMutex();
waiting = true;
LOG_DEBUG(info, "AdaptivePoller", "Start waiting...");
while(waiting) {
pthread_cond_wait(&cnd, &mtx);
}
pthread_mutex_unlock(&mtx);
}
auto notify() -> void
{
lockMutex();
waiting = false;
LOG_DEBUG(info, "AdaptivePoller", "Notifying...");
pthread_cond_broadcast(&cnd);
pthread_mutex_unlock(&mtx);
}
};
据我所知,POSIX API 没有定义足以使您的 Poller
完全稳健地应对所有流程故障情况的机制。但是您可能会通过使用 SysV 信号量代替条件变量和互斥锁来实现您认为可以改进的情况。这是一个大纲:
信号量
can_proceed
提供线程阻塞自身直到被释放。此信号量的初始值为 0,线程通过尝试递减它来阻塞自己。一个线程通过增加
can_proceed
阻塞进程的数量来释放当前阻塞的进程。 SysV 信号量 API 提供了一个用于确定有多少信号量的操作。
以上并非万无一失。至少会出现这些情况:
等待进程 W 在通知进程 N 读取包含 W,但在 W 完成其信号量递减之前。这将允许一个未来的服务员通过而不阻塞。
等待进程 W2 在通知进程递增
can_proceed
和前一个进程的最后一个之间到达wait()
- 等待进程完成其信号量递减。在这种情况下,W2 可能会立即继续,而不是在通知时已经等待的某个线程 W1。在这种情况下,W1 将在下一次通知时被释放(除非同样的事情再次发生)。如果两个进程大约同时尝试发出通知,则可以观察到反映进程已释放但尚未完成其信号量递减的等待者计数。可能的结果是一个或多个未来进程将通过
wait()
而不会阻塞。
由于其中两个类似于来自 CV 等待的虚假 return,因此可以通过类似的谓词检查习语来解决它们。鉴于进程完成其信号量递减和获取互斥量之间必然存在间隙,这将是棘手的,但也许可以通过使用原子对象来解决这个问题,因此不需要互斥量。
也有可能进程在 notify()
递增 can_proceed
之前终止,这样通知就无效了。我不认为这是建议方案特有的弱点。
请注意,SysV 信号量本身并不驻留在共享内存中,也不需要共享内存。