这种无锁设计线程安全吗?
Is this lock free design thread safe?
在不同的线程中,我执行以下操作:
共享变量:
std::shared_ptr<Data> dataPtr;
std::atomic<int> version_number;
Thread1,producer收到新数据并做
dataPtr.reset(newdata);
version_number++;
其他线程,消费者正在做:
int local_version=0;
std::shared_ptr<Data> localPtr;
while(local_version!=version_number)
localPtr=dataPtr;
...operation on my_ptr...
localPtr.reset();
local_version=version_number.load();
在这里我知道消费者可能会跳过某些版本,如果他们正在处理数据并且新的更新继续进行,这对我来说很好,我不需要他们处理所有版本,只需要他们处理最后一个可用的版本。
我的问题是,这条线是原子的:
localPtr=dataPtr;
我是否总是会获得 dataPtr 中内容的最新版本,还是会被缓存或可能导致我的设计出现任何错误?
谢谢。
根据http://en.cppreference.com/w/cpp/memory/shared_ptr:是的。 my_ptr = dataPtr
是线程安全的。
All member functions (including copy constructor and copy assignment)
can be called by multiple threads on different instances of shared_ptr
without additional synchronization even if these instances are copies
and share ownership of the same object.
尽管无法保证您认为正在加载的版本就是您要加载的版本;生产者设置指针和增加版本号不是atomic
操作,消费者读取指针和更新版本号也不是[=14] =]
正如 haavee 指出的那样,多个线程可以安全地同时执行
localPtr = dataPtr;
因为共享变量是只读的,进程中更新的共享元数据块有特殊的线程安全保证。
但是,
之间有一场比赛
dataPtr.reset(newdata); // in producer, a WRITE to the shared_ptr
localPtr = dataPtr; // in consumer, an access to the same shared_ptr
所以这个设计不是线程安全的。
这段代码在我看来相当构造。如果随机数量的消费者查看相同的数据会有什么好处? (这就是您的代码中会发生的情况,尽管以技术上线程安全的方式进行。)
如果您想让第一个消费者获取其他人不采用的数据,您可能希望以原子方式将 dataPtr 交换为来自每个消费者的 empty() shared_ptr。然后在交换之后,消费者检查他得到的非空内容并进行计算。所有其他执行相同操作的消费者在各自交换后将获得一个 empty() 共享 ptr。
从您的代码中删除版本号后,您将获得一个锁免费使用一次的生产者消费者方案。
std::shared_ptr<Data> dataPtr;
void Producer()
{
std::shared_ptr<Data> newOne = std::shared_ptr<Data>::make_shared();
std::atomic_exchange(dataPtr, newOne);
}
// called from multiple threads
void Consumer()
{
std::shared_ptr<Data> mine;
std::atomic_exchange(mine,dataPtr);
if( !mine.empty() )
{ // compute, using mine. Only one thread is lucky for any given Data instance stored by producer.
}
}
编辑:
从文档中可以看出 here,shared_ptr::swap() 不是原子的。相应地调整了代码。
EDIT2:制作人更正。一开始不使用那些东西的另一个原因。
对于您描述的用例,当您真的不关心某些消费者是否时不时错过某些内容时,这里是一个完整的实现,它将版本号与数据打包在一起。该模板也允许将其用于其他类型。可能会添加更多构造函数、删除等...
#include "stdafx.h"
#include <cstdint>
#include <string>
#include <memory>
#include <atomic>
#include <thread>
#include <vector>
#include <iostream>
#include <chrono>
template <class _T>
class Versioned
{
_T m_data;
uint32_t m_version;
static std::atomic<uint32_t> s_next;
public:
Versioned(_T & data)
: m_data(data)
, m_version(s_next.fetch_add(1UL))
{}
~Versioned()
{
}
const _T & Data() const
{
return m_data;
}
uint32_t Version() const
{
return m_version;
}
};
template <class _T>
std::atomic<uint32_t> Versioned<_T>::s_next;
typedef Versioned<std::string> VersionedString;
static volatile bool s_running = true;
static std::shared_ptr<VersionedString> s_dataPtr;
int _tmain(int argc, _TCHAR* argv[])
{
std::vector<std::thread> consumers;
for (size_t i = 0; i < 3; ++i)
{
consumers.push_back(std::thread([]()
{
uint32_t oldVersion = ~0UL;
std::shared_ptr<VersionedString> mine;
while (s_running)
{
mine = std::atomic_load(&s_dataPtr);
if (mine)
{
if (mine->Version() != oldVersion)
{
oldVersion = mine->Version();
// No lock taken for cout -> chaotic output possible.
std::cout << mine->Data().c_str();
}
}
}
}));
}
for (size_t i = 0; i < 100; ++i)
{
std::shared_ptr<VersionedString> next = std::make_shared<VersionedString>(std::string("Hello World!"));
std::atomic_store(&s_dataPtr, next);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
s_running = false;
for (auto& t : consumers)
{
t.join();
}
return 0;
}
从概念上讲,您的 "lock free" 方案只是在浪费时间 CPU。
如果您不关心丢失中间版本,只需让您的生产者将其输出限制在消费者可以应对的频率,并使用共享队列或任何经过验证的任务间通信机制来传递数据包。
实时系统都是为了保证响应能力,一个好的设计会尝试对其设置一个合理的上限,而不是为了酷而燃烧 CPU。
C++11 和时尚的新 "non blocking" 心血来潮正在造成如此大的伤害,它们诱使每个人和他的狗相信几个原子变量将解决所有同步问题。事实上,他们不会。
在不同的线程中,我执行以下操作:
共享变量:
std::shared_ptr<Data> dataPtr;
std::atomic<int> version_number;
Thread1,producer收到新数据并做
dataPtr.reset(newdata);
version_number++;
其他线程,消费者正在做:
int local_version=0;
std::shared_ptr<Data> localPtr;
while(local_version!=version_number)
localPtr=dataPtr;
...operation on my_ptr...
localPtr.reset();
local_version=version_number.load();
在这里我知道消费者可能会跳过某些版本,如果他们正在处理数据并且新的更新继续进行,这对我来说很好,我不需要他们处理所有版本,只需要他们处理最后一个可用的版本。 我的问题是,这条线是原子的:
localPtr=dataPtr;
我是否总是会获得 dataPtr 中内容的最新版本,还是会被缓存或可能导致我的设计出现任何错误?
谢谢。
根据http://en.cppreference.com/w/cpp/memory/shared_ptr:是的。 my_ptr = dataPtr
是线程安全的。
All member functions (including copy constructor and copy assignment) can be called by multiple threads on different instances of shared_ptr without additional synchronization even if these instances are copies and share ownership of the same object.
尽管无法保证您认为正在加载的版本就是您要加载的版本;生产者设置指针和增加版本号不是atomic
操作,消费者读取指针和更新版本号也不是[=14] =]
正如 haavee 指出的那样,多个线程可以安全地同时执行
localPtr = dataPtr;
因为共享变量是只读的,进程中更新的共享元数据块有特殊的线程安全保证。
但是,
之间有一场比赛dataPtr.reset(newdata); // in producer, a WRITE to the shared_ptr
localPtr = dataPtr; // in consumer, an access to the same shared_ptr
所以这个设计不是线程安全的。
这段代码在我看来相当构造。如果随机数量的消费者查看相同的数据会有什么好处? (这就是您的代码中会发生的情况,尽管以技术上线程安全的方式进行。)
如果您想让第一个消费者获取其他人不采用的数据,您可能希望以原子方式将 dataPtr 交换为来自每个消费者的 empty() shared_ptr。然后在交换之后,消费者检查他得到的非空内容并进行计算。所有其他执行相同操作的消费者在各自交换后将获得一个 empty() 共享 ptr。
从您的代码中删除版本号后,您将获得一个锁免费使用一次的生产者消费者方案。
std::shared_ptr<Data> dataPtr;
void Producer()
{
std::shared_ptr<Data> newOne = std::shared_ptr<Data>::make_shared();
std::atomic_exchange(dataPtr, newOne);
}
// called from multiple threads
void Consumer()
{
std::shared_ptr<Data> mine;
std::atomic_exchange(mine,dataPtr);
if( !mine.empty() )
{ // compute, using mine. Only one thread is lucky for any given Data instance stored by producer.
}
}
编辑: 从文档中可以看出 here,shared_ptr::swap() 不是原子的。相应地调整了代码。 EDIT2:制作人更正。一开始不使用那些东西的另一个原因。
对于您描述的用例,当您真的不关心某些消费者是否时不时错过某些内容时,这里是一个完整的实现,它将版本号与数据打包在一起。该模板也允许将其用于其他类型。可能会添加更多构造函数、删除等...
#include "stdafx.h"
#include <cstdint>
#include <string>
#include <memory>
#include <atomic>
#include <thread>
#include <vector>
#include <iostream>
#include <chrono>
template <class _T>
class Versioned
{
_T m_data;
uint32_t m_version;
static std::atomic<uint32_t> s_next;
public:
Versioned(_T & data)
: m_data(data)
, m_version(s_next.fetch_add(1UL))
{}
~Versioned()
{
}
const _T & Data() const
{
return m_data;
}
uint32_t Version() const
{
return m_version;
}
};
template <class _T>
std::atomic<uint32_t> Versioned<_T>::s_next;
typedef Versioned<std::string> VersionedString;
static volatile bool s_running = true;
static std::shared_ptr<VersionedString> s_dataPtr;
int _tmain(int argc, _TCHAR* argv[])
{
std::vector<std::thread> consumers;
for (size_t i = 0; i < 3; ++i)
{
consumers.push_back(std::thread([]()
{
uint32_t oldVersion = ~0UL;
std::shared_ptr<VersionedString> mine;
while (s_running)
{
mine = std::atomic_load(&s_dataPtr);
if (mine)
{
if (mine->Version() != oldVersion)
{
oldVersion = mine->Version();
// No lock taken for cout -> chaotic output possible.
std::cout << mine->Data().c_str();
}
}
}
}));
}
for (size_t i = 0; i < 100; ++i)
{
std::shared_ptr<VersionedString> next = std::make_shared<VersionedString>(std::string("Hello World!"));
std::atomic_store(&s_dataPtr, next);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
s_running = false;
for (auto& t : consumers)
{
t.join();
}
return 0;
}
从概念上讲,您的 "lock free" 方案只是在浪费时间 CPU。
如果您不关心丢失中间版本,只需让您的生产者将其输出限制在消费者可以应对的频率,并使用共享队列或任何经过验证的任务间通信机制来传递数据包。
实时系统都是为了保证响应能力,一个好的设计会尝试对其设置一个合理的上限,而不是为了酷而燃烧 CPU。
C++11 和时尚的新 "non blocking" 心血来潮正在造成如此大的伤害,它们诱使每个人和他的狗相信几个原子变量将解决所有同步问题。事实上,他们不会。