避免并发等待对象中的死锁

Avoiding deadlock in concurrent waiting object

我已经实现了 "Ticket" class,它在多个线程之间作为 shared_ptr 共享。

程序流程是这样的:

  1. parallelQuery() 被调用以启动新的查询作业。创建了 Ticket 的共享实例。
  2. 查询被分成多个任务,每个任务都在一个工作线程上排队(这部分很重要,否则我只是加入线程并完成)。每个任务都获得共享票。
  3. ticket.wait() 被调用以等待作业的所有任务完成。
  4. 完成一项任务后,它会调用工单上的 done() 方法。
  5. 当所有任务完成后,票证被解锁,任务的结果数据聚合并从 parallelQuery() 返回

在伪代码中:

     std::vector<T> parallelQuery(std::string str) {
         auto ticket = std::make_shared<Ticket>(2);
         auto task1 = std::make_unique<Query>(ticket, str+"a");
         addTaskToWorker(task1);
         auto task2 = std::make_unique<Query>(ticket, str+"b");
         addTaskToWorker(task2);
         ticket->waitUntilDone();
         auto result = aggregateData(task1, task2);
         return result;
     }

我的代码有效。但是我想知道理论上是否有可能导致死锁,以防在等待线程调用 waitUntilDone() 再次锁定之前执行解锁互斥锁。

有这种可能吗,如何避免这个陷阱?

这里是完整的工单class,注意上面问题描述相关的执行顺序示例评论:

#include <mutex>
#include <atomic>

    class Ticket {
    public:
        Ticket(int numTasks = 1) : _numTasks(numTasks), _done(0), _canceled(false) {
            _mutex.lock();
        }

        void waitUntilDone() {
            _doneLock.lock();
            if (_done != _numTasks) {
                _doneLock.unlock(); // Execution order 1: "waiter" thread is here
                _mutex.lock(); // Execution order 3: "waiter" thread is now in a dealock?
            }
            else {
                _doneLock.unlock();
            }
        }

        void done() {
            _doneLock.lock();
            _done++;
            if (_done == _numTasks) {
                _mutex.unlock(); // Execution order 2: "task1" thread unlocks the mutex
            }
            _doneLock.unlock();
        }

        void cancel() {
            _canceled = true;
            _mutex.unlock();
        }

        bool wasCanceled() {
            return _canceled;
        }

        bool isDone() {
            return _done >= _numTasks;
        }

        int getNumTasks() {
            return _numTasks;
        }

    private:
        std::atomic<int> _numTasks;
        std::atomic<int> _done;
        std::atomic<bool> _canceled;
        // mutex used for caller wait state
        std::mutex _mutex;
        // mutex used to safeguard done counter with lock condition in waitUntilDone
        std::mutex _doneLock;
    };

在编辑问题时我想到的一个可能的解决方案是我可以把 _done++;在 _doneLock() 之前。最终,这应该足够了吗?

更新

我已经根据 Tomer 和 Phil1970 提供的建议更新了 Ticket class。以下实现是否避免了上述陷阱?

class Ticket {
public:
    Ticket(int numTasks = 1) : _numTasks(numTasks), _done(0), _canceled(false) { }

    void waitUntilDone() {
        std::unique_lock<std::mutex> lock(_mutex);
        // loop to avoid spurious wakeups
        while (_done != _numTasks && !_canceled) {
            _condVar.wait(lock);
        }
    }

    void done() {
        std::unique_lock<std::mutex> lock(_mutex);
        // just bail out in case we call done more often than needed
        if (_done == _numTasks) {
            return;
        }
        _done++;
        _condVar.notify_one();
    }

    void cancel() {
        std::unique_lock<std::mutex> lock(_mutex);
        _canceled = true;
        _condVar.notify_one();
    }

    const bool wasCanceled() const {
        return _canceled;
    }

    const bool isDone() const {
        return _done >= _numTasks;
    }

    const int getNumTasks() const {
        return _numTasks;
    }

private:
    std::atomic<int> _numTasks;
    std::atomic<int> _done;
    std::atomic<bool> _canceled;
    std::mutex _mutex;
    std::condition_variable _condVar;
};

您不需要使用互斥锁来操作原子值

UPD

我对主要问题的回答是错误的。我删了一个

您可以使用简单的(非原子的)int _numTasks;还。而且您不需要共享指针 - 只需在堆栈上创建任务并传递指针

     Ticket ticket(2);
     auto task1 = std::make_unique<Query>(&ticket, str+"a");
     addTaskToWorker(task1);

如果你愿意,也可以选择独特的 ptr

     auto ticket = std::make_unique<Ticket>(2);
     auto task1 = std::make_unique<Query>(ticket.get(), str+"a");
     addTaskToWorker(task1);

因为共享指针可以被奥卡姆剃刀剃掉:)

不要编写自己的等待方法,而是使用 std::condition_variable

https://en.cppreference.com/w/cpp/thread/condition_variable.

互斥使用

通常,mutex 应该保护给定的代码区域。也就是说,它应该锁定,完成它的工作然后解锁。在您的 class 中,您有多种方法,其中一些人锁定 _mutex 而其他人解锁它。这是非常容易出错的,就像你以错误的顺序调用方法一样,你很可能处于不一致的状态。如果互斥量被锁定两次会怎样?还是在已经解锁时解锁?

关于互斥量需要注意的另一件事是,如果您有多个互斥量,如果您需要锁定两个互斥量但不以一致的顺序执行,则很容易出现死锁。假设线程 A 先锁定互斥锁 1 和互斥锁 2,线程 B 以相反的顺序锁定它们(先锁定互斥锁 2)。有可能会出现这样的情况:

  • 线程 A 锁互斥量 1
  • 线程 B 锁定互斥锁 2
  • 线程 A 想要锁定互斥体 2 但不能,因为它已经被锁定。
  • 线程 B 想要锁定互斥体 1 但不能,因为它已经被锁定。
  • 两个线程将永远等待

所以在您的代码中,您至少应该进行一些检查以确保正确使用。例如,您应该在解锁互斥量之前验证 _canceled 以确保 cancel 仅被调用一次。

解决方案

我只是提供一些想法

声明一个 mutux 和一个 condition_variable 来管理 class.

中的 done 条件
std::mutex doneMutex;
std::condition_variable done_condition;

那么 waitUntilDone 看起来像:

void waitUntilDone()
{
    std::unique_lock<std::mutex> lk(doneMutex);
    done_condition.wait(lk, []{ return isDone() || wasCancelled();});
}

done 函数看起来像:

void done() 
{
    std::lock_guard<std::mutex> lk(doneMutex);
    _done++;
    if (_done == _numTasks) 
    {
        doneCondition.notify_one();
    }
}

cancel函数会变成

void done() 
{
    std::lock_guard<std::mutex> lk(doneMutex);
    _cancelled = true;
    doneCondition.notify_one();
}

如你所见,你现在只有一个互斥量,所以你基本上消除了死锁的可能性。

变量命名

我建议你不要在互斥量的名称中使用 lock,因为它会造成混淆。

std::mutex someMutex;
std::guard_lock<std::mutex> someLock(someMutex); // std::unique_lock when needed

这样一来,就更容易知道哪个变量引用了互斥体,哪个变量引用了互斥体的锁。

好读

如果你认真对待多线程,那么你应该买那本书:

C++ Concurrency in Action
Practical Multithreading
Anthony Williams

代码审查 (添加部分)

基本相同的代码已发布到 CODE REVIEW:https://codereview.stackexchange.com/questions/225863/multithreading-ticket-class-to-wait-for-parallel-task-completion/225901#225901

我已经在其中添加了一些额外的答案。