避免并发等待对象中的死锁
Avoiding deadlock in concurrent waiting object
我已经实现了 "Ticket" class,它在多个线程之间作为 shared_ptr 共享。
程序流程是这样的:
- parallelQuery() 被调用以启动新的查询作业。创建了 Ticket 的共享实例。
- 查询被分成多个任务,每个任务都在一个工作线程上排队(这部分很重要,否则我只是加入线程并完成)。每个任务都获得共享票。
- ticket.wait() 被调用以等待作业的所有任务完成。
- 完成一项任务后,它会调用工单上的 done() 方法。
- 当所有任务完成后,票证被解锁,任务的结果数据聚合并从 parallelQuery() 返回
在伪代码中:
std::vector<T> parallelQuery(std::string str) {
auto ticket = std::make_shared<Ticket>(2);
auto task1 = std::make_unique<Query>(ticket, str+"a");
addTaskToWorker(task1);
auto task2 = std::make_unique<Query>(ticket, str+"b");
addTaskToWorker(task2);
ticket->waitUntilDone();
auto result = aggregateData(task1, task2);
return result;
}
我的代码有效。但是我想知道理论上是否有可能导致死锁,以防在等待线程调用 waitUntilDone() 再次锁定之前执行解锁互斥锁。
有这种可能吗,如何避免这个陷阱?
这里是完整的工单class,注意上面问题描述相关的执行顺序示例评论:
#include <mutex>
#include <atomic>
class Ticket {
public:
Ticket(int numTasks = 1) : _numTasks(numTasks), _done(0), _canceled(false) {
_mutex.lock();
}
void waitUntilDone() {
_doneLock.lock();
if (_done != _numTasks) {
_doneLock.unlock(); // Execution order 1: "waiter" thread is here
_mutex.lock(); // Execution order 3: "waiter" thread is now in a dealock?
}
else {
_doneLock.unlock();
}
}
void done() {
_doneLock.lock();
_done++;
if (_done == _numTasks) {
_mutex.unlock(); // Execution order 2: "task1" thread unlocks the mutex
}
_doneLock.unlock();
}
void cancel() {
_canceled = true;
_mutex.unlock();
}
bool wasCanceled() {
return _canceled;
}
bool isDone() {
return _done >= _numTasks;
}
int getNumTasks() {
return _numTasks;
}
private:
std::atomic<int> _numTasks;
std::atomic<int> _done;
std::atomic<bool> _canceled;
// mutex used for caller wait state
std::mutex _mutex;
// mutex used to safeguard done counter with lock condition in waitUntilDone
std::mutex _doneLock;
};
在编辑问题时我想到的一个可能的解决方案是我可以把 _done++;在 _doneLock() 之前。最终,这应该足够了吗?
更新
我已经根据 Tomer 和 Phil1970 提供的建议更新了 Ticket class。以下实现是否避免了上述陷阱?
class Ticket {
public:
Ticket(int numTasks = 1) : _numTasks(numTasks), _done(0), _canceled(false) { }
void waitUntilDone() {
std::unique_lock<std::mutex> lock(_mutex);
// loop to avoid spurious wakeups
while (_done != _numTasks && !_canceled) {
_condVar.wait(lock);
}
}
void done() {
std::unique_lock<std::mutex> lock(_mutex);
// just bail out in case we call done more often than needed
if (_done == _numTasks) {
return;
}
_done++;
_condVar.notify_one();
}
void cancel() {
std::unique_lock<std::mutex> lock(_mutex);
_canceled = true;
_condVar.notify_one();
}
const bool wasCanceled() const {
return _canceled;
}
const bool isDone() const {
return _done >= _numTasks;
}
const int getNumTasks() const {
return _numTasks;
}
private:
std::atomic<int> _numTasks;
std::atomic<int> _done;
std::atomic<bool> _canceled;
std::mutex _mutex;
std::condition_variable _condVar;
};
您不需要使用互斥锁来操作原子值
UPD
我对主要问题的回答是错误的。我删了一个
您可以使用简单的(非原子的)int _numTasks;还。而且您不需要共享指针 - 只需在堆栈上创建任务并传递指针
Ticket ticket(2);
auto task1 = std::make_unique<Query>(&ticket, str+"a");
addTaskToWorker(task1);
如果你愿意,也可以选择独特的 ptr
auto ticket = std::make_unique<Ticket>(2);
auto task1 = std::make_unique<Query>(ticket.get(), str+"a");
addTaskToWorker(task1);
因为共享指针可以被奥卡姆剃刀剃掉:)
不要编写自己的等待方法,而是使用 std::condition_variable
。
https://en.cppreference.com/w/cpp/thread/condition_variable.
互斥使用
通常,mutex
应该保护给定的代码区域。也就是说,它应该锁定,完成它的工作然后解锁。在您的 class 中,您有多种方法,其中一些人锁定 _mutex
而其他人解锁它。这是非常容易出错的,就像你以错误的顺序调用方法一样,你很可能处于不一致的状态。如果互斥量被锁定两次会怎样?还是在已经解锁时解锁?
关于互斥量需要注意的另一件事是,如果您有多个互斥量,如果您需要锁定两个互斥量但不以一致的顺序执行,则很容易出现死锁。假设线程 A 先锁定互斥锁 1 和互斥锁 2,线程 B 以相反的顺序锁定它们(先锁定互斥锁 2)。有可能会出现这样的情况:
- 线程 A 锁互斥量 1
- 线程 B 锁定互斥锁 2
- 线程 A 想要锁定互斥体 2 但不能,因为它已经被锁定。
- 线程 B 想要锁定互斥体 1 但不能,因为它已经被锁定。
- 两个线程将永远等待
所以在您的代码中,您至少应该进行一些检查以确保正确使用。例如,您应该在解锁互斥量之前验证 _canceled
以确保 cancel
仅被调用一次。
解决方案
我只是提供一些想法
声明一个 mutux 和一个 condition_variable 来管理 class.
中的 done 条件
std::mutex doneMutex;
std::condition_variable done_condition;
那么 waitUntilDone
看起来像:
void waitUntilDone()
{
std::unique_lock<std::mutex> lk(doneMutex);
done_condition.wait(lk, []{ return isDone() || wasCancelled();});
}
而 done
函数看起来像:
void done()
{
std::lock_guard<std::mutex> lk(doneMutex);
_done++;
if (_done == _numTasks)
{
doneCondition.notify_one();
}
}
而cancel
函数会变成
void done()
{
std::lock_guard<std::mutex> lk(doneMutex);
_cancelled = true;
doneCondition.notify_one();
}
如你所见,你现在只有一个互斥量,所以你基本上消除了死锁的可能性。
变量命名
我建议你不要在互斥量的名称中使用 lock,因为它会造成混淆。
std::mutex someMutex;
std::guard_lock<std::mutex> someLock(someMutex); // std::unique_lock when needed
这样一来,就更容易知道哪个变量引用了互斥体,哪个变量引用了互斥体的锁。
好读
如果你认真对待多线程,那么你应该买那本书:
C++ Concurrency in Action
Practical Multithreading
Anthony Williams
代码审查 (添加部分)
基本相同的代码已发布到 CODE REVIEW:https://codereview.stackexchange.com/questions/225863/multithreading-ticket-class-to-wait-for-parallel-task-completion/225901#225901。
我已经在其中添加了一些额外的答案。
我已经实现了 "Ticket" class,它在多个线程之间作为 shared_ptr 共享。
程序流程是这样的:
- parallelQuery() 被调用以启动新的查询作业。创建了 Ticket 的共享实例。
- 查询被分成多个任务,每个任务都在一个工作线程上排队(这部分很重要,否则我只是加入线程并完成)。每个任务都获得共享票。
- ticket.wait() 被调用以等待作业的所有任务完成。
- 完成一项任务后,它会调用工单上的 done() 方法。
- 当所有任务完成后,票证被解锁,任务的结果数据聚合并从 parallelQuery() 返回
在伪代码中:
std::vector<T> parallelQuery(std::string str) {
auto ticket = std::make_shared<Ticket>(2);
auto task1 = std::make_unique<Query>(ticket, str+"a");
addTaskToWorker(task1);
auto task2 = std::make_unique<Query>(ticket, str+"b");
addTaskToWorker(task2);
ticket->waitUntilDone();
auto result = aggregateData(task1, task2);
return result;
}
我的代码有效。但是我想知道理论上是否有可能导致死锁,以防在等待线程调用 waitUntilDone() 再次锁定之前执行解锁互斥锁。
有这种可能吗,如何避免这个陷阱?
这里是完整的工单class,注意上面问题描述相关的执行顺序示例评论:
#include <mutex>
#include <atomic>
class Ticket {
public:
Ticket(int numTasks = 1) : _numTasks(numTasks), _done(0), _canceled(false) {
_mutex.lock();
}
void waitUntilDone() {
_doneLock.lock();
if (_done != _numTasks) {
_doneLock.unlock(); // Execution order 1: "waiter" thread is here
_mutex.lock(); // Execution order 3: "waiter" thread is now in a dealock?
}
else {
_doneLock.unlock();
}
}
void done() {
_doneLock.lock();
_done++;
if (_done == _numTasks) {
_mutex.unlock(); // Execution order 2: "task1" thread unlocks the mutex
}
_doneLock.unlock();
}
void cancel() {
_canceled = true;
_mutex.unlock();
}
bool wasCanceled() {
return _canceled;
}
bool isDone() {
return _done >= _numTasks;
}
int getNumTasks() {
return _numTasks;
}
private:
std::atomic<int> _numTasks;
std::atomic<int> _done;
std::atomic<bool> _canceled;
// mutex used for caller wait state
std::mutex _mutex;
// mutex used to safeguard done counter with lock condition in waitUntilDone
std::mutex _doneLock;
};
在编辑问题时我想到的一个可能的解决方案是我可以把 _done++;在 _doneLock() 之前。最终,这应该足够了吗?
更新
我已经根据 Tomer 和 Phil1970 提供的建议更新了 Ticket class。以下实现是否避免了上述陷阱?
class Ticket {
public:
Ticket(int numTasks = 1) : _numTasks(numTasks), _done(0), _canceled(false) { }
void waitUntilDone() {
std::unique_lock<std::mutex> lock(_mutex);
// loop to avoid spurious wakeups
while (_done != _numTasks && !_canceled) {
_condVar.wait(lock);
}
}
void done() {
std::unique_lock<std::mutex> lock(_mutex);
// just bail out in case we call done more often than needed
if (_done == _numTasks) {
return;
}
_done++;
_condVar.notify_one();
}
void cancel() {
std::unique_lock<std::mutex> lock(_mutex);
_canceled = true;
_condVar.notify_one();
}
const bool wasCanceled() const {
return _canceled;
}
const bool isDone() const {
return _done >= _numTasks;
}
const int getNumTasks() const {
return _numTasks;
}
private:
std::atomic<int> _numTasks;
std::atomic<int> _done;
std::atomic<bool> _canceled;
std::mutex _mutex;
std::condition_variable _condVar;
};
您不需要使用互斥锁来操作原子值
UPD
我对主要问题的回答是错误的。我删了一个
您可以使用简单的(非原子的)int _numTasks;还。而且您不需要共享指针 - 只需在堆栈上创建任务并传递指针
Ticket ticket(2);
auto task1 = std::make_unique<Query>(&ticket, str+"a");
addTaskToWorker(task1);
如果你愿意,也可以选择独特的 ptr
auto ticket = std::make_unique<Ticket>(2);
auto task1 = std::make_unique<Query>(ticket.get(), str+"a");
addTaskToWorker(task1);
因为共享指针可以被奥卡姆剃刀剃掉:)
不要编写自己的等待方法,而是使用 std::condition_variable
。
https://en.cppreference.com/w/cpp/thread/condition_variable.
互斥使用
通常,mutex
应该保护给定的代码区域。也就是说,它应该锁定,完成它的工作然后解锁。在您的 class 中,您有多种方法,其中一些人锁定 _mutex
而其他人解锁它。这是非常容易出错的,就像你以错误的顺序调用方法一样,你很可能处于不一致的状态。如果互斥量被锁定两次会怎样?还是在已经解锁时解锁?
关于互斥量需要注意的另一件事是,如果您有多个互斥量,如果您需要锁定两个互斥量但不以一致的顺序执行,则很容易出现死锁。假设线程 A 先锁定互斥锁 1 和互斥锁 2,线程 B 以相反的顺序锁定它们(先锁定互斥锁 2)。有可能会出现这样的情况:
- 线程 A 锁互斥量 1
- 线程 B 锁定互斥锁 2
- 线程 A 想要锁定互斥体 2 但不能,因为它已经被锁定。
- 线程 B 想要锁定互斥体 1 但不能,因为它已经被锁定。
- 两个线程将永远等待
所以在您的代码中,您至少应该进行一些检查以确保正确使用。例如,您应该在解锁互斥量之前验证 _canceled
以确保 cancel
仅被调用一次。
解决方案
我只是提供一些想法
声明一个 mutux 和一个 condition_variable 来管理 class.
中的 done 条件std::mutex doneMutex;
std::condition_variable done_condition;
那么 waitUntilDone
看起来像:
void waitUntilDone()
{
std::unique_lock<std::mutex> lk(doneMutex);
done_condition.wait(lk, []{ return isDone() || wasCancelled();});
}
而 done
函数看起来像:
void done()
{
std::lock_guard<std::mutex> lk(doneMutex);
_done++;
if (_done == _numTasks)
{
doneCondition.notify_one();
}
}
而cancel
函数会变成
void done()
{
std::lock_guard<std::mutex> lk(doneMutex);
_cancelled = true;
doneCondition.notify_one();
}
如你所见,你现在只有一个互斥量,所以你基本上消除了死锁的可能性。
变量命名
我建议你不要在互斥量的名称中使用 lock,因为它会造成混淆。
std::mutex someMutex;
std::guard_lock<std::mutex> someLock(someMutex); // std::unique_lock when needed
这样一来,就更容易知道哪个变量引用了互斥体,哪个变量引用了互斥体的锁。
好读
如果你认真对待多线程,那么你应该买那本书:
C++ Concurrency in Action
Practical Multithreading
Anthony Williams
代码审查 (添加部分)
基本相同的代码已发布到 CODE REVIEW:https://codereview.stackexchange.com/questions/225863/multithreading-ticket-class-to-wait-for-parallel-task-completion/225901#225901。
我已经在其中添加了一些额外的答案。