正确使用 atomic<T>

Correctly using atomic<T>

我目前正在努力为基于 https://github.com/nbsdx/ThreadPool 的 C++11 线程池 class 的原子变量编写无锁代码。这是我修改后的 class:

class ThreadPool
{
public:
    explicit ThreadPool(int threadCount) :
        _jobsLeft(0),
        _bailout(false)
    {
        _threads = std::vector<std::thread>(threadCount);
        for (int index = 0; index < threadCount; ++index)
        {
            _threads[index] = std::move(std::thread([this]
                {
                    this->Task();
                }));
        }
    }

    void AddJob(std::function<void(void)> job)
    {
        {
            std::lock_guard<std::mutex> lock(_queueMutex);
            _queue.emplace(job);
        }
        {
            std::lock_guard<std::mutex> lock(_jobsLeftMutex);
            ++_jobsLeft;
        }
        _jobAvailableVar.notify_one();
    }

    void JoinAll()
    {
        if (!_bailout)
        {
            _bailout = true;
            _jobAvailableVar.notify_all();
            for (auto& x : _threads)
            {
                if (x.joinable())
                {
                    x.join();
                }
            }
        }
    }

    void WaitAll()
    {
        std::unique_lock<std::mutex> lock(_jobsLeftMutex);
        if (_jobsLeft > 0)
        {
            _waitVar.wait(lock, [this]
                          {
                              return _jobsLeft == 0;
                          });
        }
    }

private:
    void Task()
    {
        while (!_bailout)
        {
            std::function<void(void)> job;
            {
                std::unique_lock<std::mutex> lock(_queueMutex);
                _jobAvailableVar.wait(lock, [this]
                                      {
                                          return _queue.size() > 0 || _bailout;
                                      });
                if (_bailout)
                {
                    return;
                }
                job = _queue.front();
                _queue.pop();
            }
            job();
            {
                std::lock_guard<std::mutex> lock(_jobsLeftMutex);
                --_jobsLeft;
            }
            _waitVar.notify_one();
        }
    }

    std::vector<std::thread> _threads;
    std::queue<std::function<void(void)>> _queue;
    int _jobsLeft;
    std::atomic<bool> _bailout;
    std::condition_variable _jobAvailableVar;
    std::condition_variable _waitVar;
    std::mutex _jobsLeftMutex;
    std::mutex _queueMutex;
};

我对以下内容感到困惑,非常感谢任何指点:

编辑:带普通锁的最终版本可在此处获得:https://github.com/stfx/ThreadPool2

查看原子方法:http://en.cppreference.com/w/cpp/atomic/atomic

您应该使用 load 方法以原子方式读取值。但是,请注意以下不是原子的。

if (!_bailout)
{
    _bailout = true;

有一种方法可以执行这些比较和值交换。请参阅 compare_exchange_weak 方法。对于 jobCount,您可以使用 atomic。 ++-- 是原子的。

但是要注意,atomic只是一个单一的方法调用,调用之后情况可能会发生变化。您仍然需要一个同步队列,为此您需要一个锁。它不需要是标准的 OS 锁,您可以使用原子变量创建锁(使用 storecompare_exchange_weak 方法)。请参阅以下 post:https://webkit.org/blog/6161/locking-in-webkit/