原子出队的最简单方法?
The simpliest way to dequeue atomically?
我有一组数据,必须使用多线程同时处理,数据的数量应该比线程的数量大。我决定将数据放入某种队列中,这样每个空闲线程都可以弹出它的部分并处理它,直到队列为空。我可以使用一个简单的 STL 队列,并在我想从队列中取出一个元素时用互斥锁锁定它,但我想尝试一种无锁方法。同时,我的项目太小,无法依赖一些提供无锁结构的第 3 方库,实际上我只需要原子出列。所以我决定基于一个指向 "head" 的向量实现我自己的队列,并自动增加这个指针:
template <typename T>
class AtomicDequeueable
{
public:
// Assumption: data vector never changes
AtomicDequeueable(const std::vector<T>& data) :
m_data(data),
m_pointer(ATOMIC_VAR_INIT(0))
{}
const T * const atomicDequeue()
{
if (std::atomic_load(&m_pointer) < m_data.size())
{
return &m_data
[
std::atomic_fetch_add(&m_pointer, std::size_t(1))
];
}
return nullptr;
}
private:
AtomicDequeueable(const AtomicDequeueable<T>&) {}
std::atomic_size_t m_pointer;
const std::vector<T>& m_data;
};
Threads 的函数如下所示:
void f(AtomicDequeueable<Data>& queue)
{
while (auto dataPtr = queue.atomicDequeue())
{
const Data& data = *dataPtr;
// processing data...
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
我使用无锁结构和原语的经验真的很差,所以我想知道:我的方法能正常工作吗?当然我已经在 Ideone 上测试过了,但我不知道它在真实数据上的表现如何。
Your code is very buggy. Let me be very frank in my recommendation here:
"Actum Ne Agas: Do not do a thing already done." You have plenty of pre-existing C++ 类 available to you which can reliably do queueing, and interprocess communication (IPC) in-general. Use one of them.
Don't worry about "lock free." That's what locks are for, and they're reliable, fast, and cheap.
Your notion of "using a queue" is sound, but you are going to unnecessary work ... and creating bugs ... when you can simply "grab something off the shelf." You know that the standard part will work correctly because other people have tested it to death.
目前,您的 atomicDequeue
函数存在数据竞争:两个线程可能在执行第二条指令之前同时执行了第一条 atomic
指令。但是,这可以解决,因为您实际上只需要 1 个原子操作,按照以下更改:
const T * const atomicDequeue()
{
auto myIndex = std::atomic_fetch_add(&m_pointer, std::size_t(1));
if(myIndex >= m_data.size())
return nullptr;
return &m_data[myIndex];
}
如果在线程操作期间不修改输入向量,则此方法有效。
我有一组数据,必须使用多线程同时处理,数据的数量应该比线程的数量大。我决定将数据放入某种队列中,这样每个空闲线程都可以弹出它的部分并处理它,直到队列为空。我可以使用一个简单的 STL 队列,并在我想从队列中取出一个元素时用互斥锁锁定它,但我想尝试一种无锁方法。同时,我的项目太小,无法依赖一些提供无锁结构的第 3 方库,实际上我只需要原子出列。所以我决定基于一个指向 "head" 的向量实现我自己的队列,并自动增加这个指针:
template <typename T>
class AtomicDequeueable
{
public:
// Assumption: data vector never changes
AtomicDequeueable(const std::vector<T>& data) :
m_data(data),
m_pointer(ATOMIC_VAR_INIT(0))
{}
const T * const atomicDequeue()
{
if (std::atomic_load(&m_pointer) < m_data.size())
{
return &m_data
[
std::atomic_fetch_add(&m_pointer, std::size_t(1))
];
}
return nullptr;
}
private:
AtomicDequeueable(const AtomicDequeueable<T>&) {}
std::atomic_size_t m_pointer;
const std::vector<T>& m_data;
};
Threads 的函数如下所示:
void f(AtomicDequeueable<Data>& queue)
{
while (auto dataPtr = queue.atomicDequeue())
{
const Data& data = *dataPtr;
// processing data...
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
}
我使用无锁结构和原语的经验真的很差,所以我想知道:我的方法能正常工作吗?当然我已经在 Ideone 上测试过了,但我不知道它在真实数据上的表现如何。
Your code is very buggy. Let me be very frank in my recommendation here:
"Actum Ne Agas: Do not do a thing already done." You have plenty of pre-existing C++ 类 available to you which can reliably do queueing, and interprocess communication (IPC) in-general. Use one of them.
Don't worry about "lock free." That's what locks are for, and they're reliable, fast, and cheap.
Your notion of "using a queue" is sound, but you are going to unnecessary work ... and creating bugs ... when you can simply "grab something off the shelf." You know that the standard part will work correctly because other people have tested it to death.
目前,您的 atomicDequeue
函数存在数据竞争:两个线程可能在执行第二条指令之前同时执行了第一条 atomic
指令。但是,这可以解决,因为您实际上只需要 1 个原子操作,按照以下更改:
const T * const atomicDequeue()
{
auto myIndex = std::atomic_fetch_add(&m_pointer, std::size_t(1));
if(myIndex >= m_data.size())
return nullptr;
return &m_data[myIndex];
}
如果在线程操作期间不修改输入向量,则此方法有效。