无锁 fifo 缓冲区中的删除节点检测
Deleted node detection in lockless fifo buffer
我一直在研究无锁的 c++11 fifo 缓冲区。我几乎明白了。然而,一个小细节让我变得更好。缓冲区有一个头指向:
std::shared_ptr<node<T>> m_head;
类型:
struct node
{
node(const T data)
:
data(new T(data)),
next(nullptr)
{}
std::shared_ptr<T> data;
std::shared_ptr<node<T>> next;
};
然后是产品:
void produce(const T &&data)
{
//bool indicating whether a notification should be sent after adding
bool l_notifyUponAdding;
//the new node to be added at the end of the array
std::shared_ptr<node<T>> l_newNode(new node<T>(std::forward<const T&&>(data)));
//pointer to the last node
std::shared_ptr<node<T>> l_lastNode(std::atomic_load(&m_head));
//value to compare the next of the last node with
std::shared_ptr<node<T>> l_expectedNullPointer;
//notify if this isn't the only node
l_notifyUponAdding = !l_lastNode;
if (!l_lastNode)//if there are no nodes, add this as the only node
if (std::atomic_compare_exchange_strong(&m_head, &l_expectedNullPointer, l_newNode))
return;
do
{
l_expectedNullPointer.reset();
while (l_lastNode->next)
{
l_lastNode = std::atomic_load(&l_lastNode)->next;
}
} while (!std::atomic_compare_exchange_weak(&l_lastNode->next, &l_expectedNullPointer, l_newNode));
//adding failed since another thread already did this.
l_lastNode = l_expectedNullPointer;
if (l_notifyUponAdding)
m_newDataWaiter.notify_one();
}
并消耗:
std::shared_ptr<T> consume(bool blockingCall = false)
{
//Check if the head is null if it is:
if (!std::atomic_load(&m_head))
{
if (blockingCall)//And this is a blocking call,
{
do
{
m_newDataWaiter.wait(m_newDataWaiterLock, [this]{return std::atomic_load(&(this->m_head)) == nullptr; });//we block until
} while (!std::atomic_load(&m_head));// the load yields a head that is not null(to avoid unnecessary calls on spurious wake ups)
}
else//And this is not a blocking call we
{
return nullptr;
}
}
//If we've found a valid head we will now try to make the node pointed to by head the new head.
std::shared_ptr<node<T>> l_poppee = atomic_load(&m_head);
std::shared_ptr<node<T>> l_newHead = atomic_load(&m_head);
//note that l_poppee gets updated if the compare exchange fails
while (l_poppee && !std::atomic_compare_exchange_weak(&m_head, &l_poppee, l_poppee->next))
{
}
if (l_poppee)
return l_poppee->data;
else
return std::shared_ptr<T>();
}
函数。
似乎一切正常。但是我认为有一个缺陷。如果在执行 produce
时所有节点都被消耗。数据将添加到最后一个元素。即使该元素已被删除。
更准确的说,如果这一行已经被执行:
if (std::atomic_compare_exchange_strong(&m_head, &l_expectedNullPointer, l_newNode))
并且加载的节点不为零。最后一个节点的下一个元素将被更改。无论节点是否同时被删除。由于共享指针,只要执行生产函数,就不会物理删除节点。
但是,主指针将被设置为NULL。因此,一旦退出 produce 函数,新节点将被删除。
有人会碰巧知道这个问题的解决方案吗:)?
这种情况总是在无锁列表中通过在列表中保留一个虚拟节点来解决。头始终指向虚拟节点,它是第一个节点
列表。
当队列变空时,头和尾都指向一个虚拟节点。
您可以查看 http://www.research.ibm.com/people/m/michael/podc-1996.pdf 了解详细信息,这样我就不会曲解这个概念,因为它很容易从文章中选取。
我一直在研究无锁的 c++11 fifo 缓冲区。我几乎明白了。然而,一个小细节让我变得更好。缓冲区有一个头指向:
std::shared_ptr<node<T>> m_head;
类型:
struct node
{
node(const T data)
:
data(new T(data)),
next(nullptr)
{}
std::shared_ptr<T> data;
std::shared_ptr<node<T>> next;
};
然后是产品:
void produce(const T &&data)
{
//bool indicating whether a notification should be sent after adding
bool l_notifyUponAdding;
//the new node to be added at the end of the array
std::shared_ptr<node<T>> l_newNode(new node<T>(std::forward<const T&&>(data)));
//pointer to the last node
std::shared_ptr<node<T>> l_lastNode(std::atomic_load(&m_head));
//value to compare the next of the last node with
std::shared_ptr<node<T>> l_expectedNullPointer;
//notify if this isn't the only node
l_notifyUponAdding = !l_lastNode;
if (!l_lastNode)//if there are no nodes, add this as the only node
if (std::atomic_compare_exchange_strong(&m_head, &l_expectedNullPointer, l_newNode))
return;
do
{
l_expectedNullPointer.reset();
while (l_lastNode->next)
{
l_lastNode = std::atomic_load(&l_lastNode)->next;
}
} while (!std::atomic_compare_exchange_weak(&l_lastNode->next, &l_expectedNullPointer, l_newNode));
//adding failed since another thread already did this.
l_lastNode = l_expectedNullPointer;
if (l_notifyUponAdding)
m_newDataWaiter.notify_one();
}
并消耗:
std::shared_ptr<T> consume(bool blockingCall = false)
{
//Check if the head is null if it is:
if (!std::atomic_load(&m_head))
{
if (blockingCall)//And this is a blocking call,
{
do
{
m_newDataWaiter.wait(m_newDataWaiterLock, [this]{return std::atomic_load(&(this->m_head)) == nullptr; });//we block until
} while (!std::atomic_load(&m_head));// the load yields a head that is not null(to avoid unnecessary calls on spurious wake ups)
}
else//And this is not a blocking call we
{
return nullptr;
}
}
//If we've found a valid head we will now try to make the node pointed to by head the new head.
std::shared_ptr<node<T>> l_poppee = atomic_load(&m_head);
std::shared_ptr<node<T>> l_newHead = atomic_load(&m_head);
//note that l_poppee gets updated if the compare exchange fails
while (l_poppee && !std::atomic_compare_exchange_weak(&m_head, &l_poppee, l_poppee->next))
{
}
if (l_poppee)
return l_poppee->data;
else
return std::shared_ptr<T>();
}
函数。
似乎一切正常。但是我认为有一个缺陷。如果在执行 produce
时所有节点都被消耗。数据将添加到最后一个元素。即使该元素已被删除。
更准确的说,如果这一行已经被执行:
if (std::atomic_compare_exchange_strong(&m_head, &l_expectedNullPointer, l_newNode))
并且加载的节点不为零。最后一个节点的下一个元素将被更改。无论节点是否同时被删除。由于共享指针,只要执行生产函数,就不会物理删除节点。
但是,主指针将被设置为NULL。因此,一旦退出 produce 函数,新节点将被删除。
有人会碰巧知道这个问题的解决方案吗:)?
这种情况总是在无锁列表中通过在列表中保留一个虚拟节点来解决。头始终指向虚拟节点,它是第一个节点 列表。
当队列变空时,头和尾都指向一个虚拟节点。
您可以查看 http://www.research.ibm.com/people/m/michael/podc-1996.pdf 了解详细信息,这样我就不会曲解这个概念,因为它很容易从文章中选取。