这种无锁设计线程安全吗?

Is this lock free design thread safe?

在不同的线程中,我执行以下操作:

共享变量:

std::shared_ptr<Data> dataPtr;
std::atomic<int> version_number;

Thread1,producer收到新数据并做

dataPtr.reset(newdata);
version_number++;

其他线程,消费者正在做:

int local_version=0;
std::shared_ptr<Data> localPtr;
while(local_version!=version_number)
 localPtr=dataPtr;
 ...operation on my_ptr...
 localPtr.reset();
 local_version=version_number.load();

在这里我知道消费者可能会跳过某些版本,如果他们正在处理数据并且新的更新继续进行,这对我来说很好,我不需要他们处理所有版本,只需要他们处理最后一个可用的版本。 我的问题是,这条线是原子的:

localPtr=dataPtr;

我是否总是会获得 dataPtr 中内容的最新版本,还是会被缓存或可能导致我的设计出现任何错误?

谢谢。

根据http://en.cppreference.com/w/cpp/memory/shared_ptr:是的。 my_ptr = dataPtr 是线程安全的。

All member functions (including copy constructor and copy assignment) can be called by multiple threads on different instances of shared_ptr without additional synchronization even if these instances are copies and share ownership of the same object.

尽管无法保证您认为正在加载的版本就是您要加载的版本;生产者设置指针和增加版本号不是atomic操作,消费者读取指针和更新版本号也不是[=14] =]

正如 haavee 指出的那样,多个线程可以安全地同时执行

localPtr = dataPtr;

因为共享变量是只读的,进程中更新的共享元数据块有特殊的线程安全保证。

但是,

之间有一场比赛
dataPtr.reset(newdata); // in producer, a WRITE to the shared_ptr
localPtr = dataPtr;     // in consumer, an access to the same shared_ptr

所以这个设计不是线程安全的。

这段代码在我看来相当构造。如果随机数量的消费者查看相同的数据会有什么好处? (这就是您的代码中会发生的情况,尽管以技术上线程安全的方式进行。)

如果您想让第一个消费者获取其他人不采用的数据,您可能希望以原子方式将 dataPtr 交换为来自每个消费者的 empty() shared_ptr。然后在交换之后,消费者检查他得到的非空内容并进行计算。所有其他执行相同操作的消费者在各自交换后将获得一个 empty() 共享 ptr。

从您的代码中删除版本号后,您将获得一个锁免费使用一次的生产者消费者方案。

std::shared_ptr<Data> dataPtr;

void Producer()
{
    std::shared_ptr<Data> newOne = std::shared_ptr<Data>::make_shared();
    std::atomic_exchange(dataPtr, newOne);
}

// called from multiple threads
void Consumer()
{
    std::shared_ptr<Data> mine;
    std::atomic_exchange(mine,dataPtr);
    if( !mine.empty() )
    {  // compute, using mine. Only one thread is lucky for any given Data instance stored by producer.
    }
}

编辑: 从文档中可以看出 here,shared_ptr::swap() 不是原子的。相应地调整了代码。 EDIT2:制作人更正。一开始不使用那些东西的另一个原因。

对于您描述的用例,当您真的不关心某些消费者是否时不时错过某些内容时,这里是一个完整的实现,它将版本号与数据打包在一起。该模板也允许将其用于其他类型。可能会添加更多构造函数、删除等...

#include "stdafx.h"
#include <cstdint>
#include <string>
#include <memory>
#include <atomic>
#include <thread>
#include <vector>
#include <iostream>
#include <chrono>


template <class _T>
class Versioned
{
    _T m_data;
    uint32_t m_version;
    static std::atomic<uint32_t> s_next;
public: 
    Versioned(_T & data)
        : m_data(data)
        , m_version(s_next.fetch_add(1UL))
    {}
    ~Versioned()
    {

    }
    const _T & Data() const
    {
        return m_data;
    }
    uint32_t Version() const
    {
        return m_version;
    }
};

template <class _T>
std::atomic<uint32_t> Versioned<_T>::s_next;

typedef Versioned<std::string> VersionedString;

static volatile bool s_running = true;
static std::shared_ptr<VersionedString> s_dataPtr;

int _tmain(int argc, _TCHAR* argv[])
{
    std::vector<std::thread> consumers;
    for (size_t i = 0; i < 3; ++i)
    {
        consumers.push_back(std::thread([]()
        { 
            uint32_t oldVersion = ~0UL;
            std::shared_ptr<VersionedString> mine; 
            while (s_running)
            {
                mine = std::atomic_load(&s_dataPtr);
                if (mine)
                {
                    if (mine->Version() != oldVersion)
                    {
                        oldVersion = mine->Version();

                        // No lock taken for cout -> chaotic output possible.
                        std::cout << mine->Data().c_str();
                    }
                }
            }
        }));
    }

    for (size_t i = 0; i < 100; ++i)
    {
        std::shared_ptr<VersionedString> next = std::make_shared<VersionedString>(std::string("Hello World!"));
        std::atomic_store(&s_dataPtr, next);
        std::this_thread::sleep_for(std::chrono::milliseconds(100));
    }

    s_running = false;
    for (auto& t : consumers)
    {
        t.join();
    }

    return 0;
}

从概念上讲,您的 "lock free" 方案只是在浪费时间 CPU。

如果您不关心丢失中间版本,只需让您的生产者将其输出限制在消费者可以应对的频率,并使用共享队列或任何经过验证的任务间通信机制来传递数据包。

实时系统都是为了保证响应能力,一个好的设计会尝试对其设置一个合理的上限,而不是为了酷而燃烧 CPU。

C++11 和时尚的新 "non blocking" 心血来潮正在造成如此大的伤害,它们诱使每个人和他的狗相信几个原子变量将解决所有同步问题。事实上,他们不会。