将工作项添加到数组或列表的非阻塞方式
Non blocking way of adding a work item to array or list
编辑:
我现在已经完成了我的队列(克服了下面描述的问题,以及更多)。对于那些感兴趣的人,可以找到 here。我很乐意听到任何评论:)。请注意队列不仅仅是一个工作项队列,而是一个模板容器,当然可以用工作项实例化它。
原文:
在观看 Herb Sutter's talk C++11 和 14 中的并发后,我对非阻塞并发感到非常兴奋。
但是,我还没有找到我认为是基本问题的解决方案。所以如果这已经在这里了,请对我客气一点。
我的问题很简单。我正在创建一个非常简单的线程池。为了做到这一点,我在 workPool
class 中设置了一些工作线程 运行。我保留了一份 workItems
的列表。
如何以无锁方式添加工作项。
这样做的非无锁方式当然是创建一个互斥体。如果您添加项目并在当前工作项目完成后读取(当然锁定)列表,请锁定它。
但是我不知道如何以无锁方式执行此操作。
下面是我正在创建的内容的粗略概念。我为这个问题写的这段代码。而且它既不完整,也没有错误:)
#include <thread>
#include <deque>
#include <vector>
class workPool
{
public:
workPool(int workerCount) :
running(1)
{
for (int i = workerCount; i > 0; --i)
workers.push_back(std::thread(&workPool::doWork, this));
}
~workPool()
{
running = 0;
}
private:
bool running;
std::vector< std::thread > workers;
std::deque< std::function<void()> > workItems;
void doWork()
{
while (running)
{
(*workItems.begin())();
workItems.erase(workItems.begin());
if (!workItems.size())
//here the thread should be paused till a new item is added
}
}
void addWorkitem()
{
//This is my confusion. How should I do this?
}
};
在这种拥有共享资源(工作队列)的上下文中,如果您真正深入挖掘,通常会被原子和 CAS 循环所取代。
获得无锁并发堆栈的基本想法相当简单(编辑:虽然我在第一个 post 中犯了一个错误,但可能有点欺骗性的棘手 - 更有理由欣赏好库)。为简单起见,我选择了堆栈,但使用队列并不需要太多。
写入堆栈:
Create a new work item.
Loop Repeatedly:
Store the top pointer to the stack.
Set the work item's next pointer to the top of the stack.
Atomic: Compare and swap the top pointer with the pointer to the work item.
If this succeeds and returns the top pointer we stored, break out
of the loop.
从堆栈中弹出:
Loop:
Fetch top pointer.
If top pointer is not null:
Atomic: CAS top pointer with next pointer.
If successful, break.
Else:
(Optional) Sleep/Yield to avoid burning cycles.
Process the item pointed to by the previous top pointer.
现在,如果你真的很详细,你可以在推送或弹出失败时坚持其他工作让线程去做,例如
我不知道如何在 C++ 11(或更高版本)中执行此操作;然而,这里有一个关于如何使用 C++ 98 和 `boost (v1.50) 来实现的解决方案:
这显然不是一个非常有用的示例,它仅用于演示目的:
#include <boost/scoped_ptr.hpp>
#include <boost/function.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/thread.hpp>
class WorkHandler
{
public:
WorkHandler();
~WorkHandler();
typedef boost::function<void(void)> Work; // the type of work we can handle
void AddWork(Work w) { pThreadProcessing->post(w); }
private:
void ProcessWork();
boost::scoped_ptr<boost::asio::io_service> pThreadProcessing;
boost::thread thread;
bool runThread; // Make sure this is atomic
};
WorkHandler::WorkHandler()
: pThreadProcessing(new boost::asio::io_service), // create our io service
thread(&WorkHandler::ProcessWork, this), // create our thread
runThread(true) // run the thread
{
}
WorkHandler::~WorkHandler()
{
runThread = false; // stop running the thread
thread.join(); // wait for the thread to finish
}
void WorkHandler::ProcessWork()
{
while (runThread) // while the thread is running
{
pThreadProcessing->run(); // process work
pThreadProcessing->reset(); // prepare for more work
}
}
int CalculateSomething(int a, int b)
{
return a + b;
}
int main()
{
WorkHandler wh; // create a work handler
// give it some work to do
wh.AddWork(boost::bind(&CalculateSomething, 4, 5));
wh.AddWork(boost::bind(&CalculateSomething, 10, 100));
wh.AddWork(boost::bind(&CalculateSomething, 35, -1));
Sleep(2000); // ONLY for demonstration! This just allows the thread a chance to work before we destroy it.
return 0;
}
boost::asio::io_service
是线程安全的,因此您可以 post 处理它而不需要互斥体。
注意:虽然我没有使 bool runThread
成为原子,但为了线程安全,它应该是(我只是在我的 c++ 中没有 atomic
)
我最近看了 Herb 的演讲,我相信他的 lock-free linked list 应该不错。唯一的问题是 atomic< shared_ptr<T> >
尚未实施。我使用了 atomic_*
函数调用,正如 Herb 在他的演讲中所解释的那样。
在示例中,我已将任务简化为 int,但它可以是您想要的任何内容。
函数atomic_compare_exchange_weak
接受三个参数:要比较的项目、预期值和期望值。它returns true 或false 表示成功或失败。失败时,预期值将更改为找到的值。
#include <memory>
#include <atomic>
// Untested code.
struct WorkItem { // Simple linked list implementation.
int work;
shared_ptr<WorkItem> next; // remember to use as atomic
};
class WorkList {
shared_ptr<WorkItem> head; // remember to use as atomic
public:
// Used by producers to add work to the list. This implementation adds
// new items to the front (stack), but it can easily be changed to a queue.
void push_work(int work) {
shared_ptr<WorkItem> p(new WorkItem()); // The new item we want to add.
p->work = work;
p->next = head;
// Do we get to change head to p?
while (!atomic_compare_exchange_weak(&head, &p->next, p)) {
// Nope, someone got there first, try again with the new p->next,
// and remember: p->next is automatically changed to the new value of head.
}
// Yup, great! Everything's done then.
}
// Used by consumers to claim items to process.
int pop_work() {
auto p = atomic_load(&head); // The item we want to process.
int work = (p ? p->work : -1);
// Do we get to change head to p->next?
while (p && !atomic_compare_exchange_weak(&head, &p, p->next)) {
// Nope, someone got there first, try again with the new p,
// and remember: p is automatically changed to the new value of head.
work = (p ? p->work : -1); // Make sure to update work as well!
}
// Yup, great! Everything's done then, return the new task.
return work; // Returns -1 if list is empty.
}
};
编辑: 演讲中解释了将 shared_ptr
与 atomic_*
函数结合使用的原因。简而言之:从链表中弹出一个项目可能会从某个遍历链表的人的下方删除它,或者不同的节点可能会分配到相同的内存地址 (The ABA Problem)。使用 shared_ptr
将确保任何老读者都将持有对原始项目的有效引用。
正如 Herb 所解释的,这使得 pop 函数的实现变得微不足道。
编辑:
我现在已经完成了我的队列(克服了下面描述的问题,以及更多)。对于那些感兴趣的人,可以找到 here。我很乐意听到任何评论:)。请注意队列不仅仅是一个工作项队列,而是一个模板容器,当然可以用工作项实例化它。
原文:
在观看 Herb Sutter's talk C++11 和 14 中的并发后,我对非阻塞并发感到非常兴奋。
但是,我还没有找到我认为是基本问题的解决方案。所以如果这已经在这里了,请对我客气一点。
我的问题很简单。我正在创建一个非常简单的线程池。为了做到这一点,我在 workPool
class 中设置了一些工作线程 运行。我保留了一份 workItems
的列表。
如何以无锁方式添加工作项。
这样做的非无锁方式当然是创建一个互斥体。如果您添加项目并在当前工作项目完成后读取(当然锁定)列表,请锁定它。
但是我不知道如何以无锁方式执行此操作。
下面是我正在创建的内容的粗略概念。我为这个问题写的这段代码。而且它既不完整,也没有错误:)
#include <thread>
#include <deque>
#include <vector>
class workPool
{
public:
workPool(int workerCount) :
running(1)
{
for (int i = workerCount; i > 0; --i)
workers.push_back(std::thread(&workPool::doWork, this));
}
~workPool()
{
running = 0;
}
private:
bool running;
std::vector< std::thread > workers;
std::deque< std::function<void()> > workItems;
void doWork()
{
while (running)
{
(*workItems.begin())();
workItems.erase(workItems.begin());
if (!workItems.size())
//here the thread should be paused till a new item is added
}
}
void addWorkitem()
{
//This is my confusion. How should I do this?
}
};
在这种拥有共享资源(工作队列)的上下文中,如果您真正深入挖掘,通常会被原子和 CAS 循环所取代。
获得无锁并发堆栈的基本想法相当简单(编辑:虽然我在第一个 post 中犯了一个错误,但可能有点欺骗性的棘手 - 更有理由欣赏好库)。为简单起见,我选择了堆栈,但使用队列并不需要太多。
写入堆栈:
Create a new work item.
Loop Repeatedly:
Store the top pointer to the stack.
Set the work item's next pointer to the top of the stack.
Atomic: Compare and swap the top pointer with the pointer to the work item.
If this succeeds and returns the top pointer we stored, break out
of the loop.
从堆栈中弹出:
Loop:
Fetch top pointer.
If top pointer is not null:
Atomic: CAS top pointer with next pointer.
If successful, break.
Else:
(Optional) Sleep/Yield to avoid burning cycles.
Process the item pointed to by the previous top pointer.
现在,如果你真的很详细,你可以在推送或弹出失败时坚持其他工作让线程去做,例如
我不知道如何在 C++ 11(或更高版本)中执行此操作;然而,这里有一个关于如何使用 C++ 98 和 `boost (v1.50) 来实现的解决方案:
这显然不是一个非常有用的示例,它仅用于演示目的:
#include <boost/scoped_ptr.hpp>
#include <boost/function.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/thread.hpp>
class WorkHandler
{
public:
WorkHandler();
~WorkHandler();
typedef boost::function<void(void)> Work; // the type of work we can handle
void AddWork(Work w) { pThreadProcessing->post(w); }
private:
void ProcessWork();
boost::scoped_ptr<boost::asio::io_service> pThreadProcessing;
boost::thread thread;
bool runThread; // Make sure this is atomic
};
WorkHandler::WorkHandler()
: pThreadProcessing(new boost::asio::io_service), // create our io service
thread(&WorkHandler::ProcessWork, this), // create our thread
runThread(true) // run the thread
{
}
WorkHandler::~WorkHandler()
{
runThread = false; // stop running the thread
thread.join(); // wait for the thread to finish
}
void WorkHandler::ProcessWork()
{
while (runThread) // while the thread is running
{
pThreadProcessing->run(); // process work
pThreadProcessing->reset(); // prepare for more work
}
}
int CalculateSomething(int a, int b)
{
return a + b;
}
int main()
{
WorkHandler wh; // create a work handler
// give it some work to do
wh.AddWork(boost::bind(&CalculateSomething, 4, 5));
wh.AddWork(boost::bind(&CalculateSomething, 10, 100));
wh.AddWork(boost::bind(&CalculateSomething, 35, -1));
Sleep(2000); // ONLY for demonstration! This just allows the thread a chance to work before we destroy it.
return 0;
}
boost::asio::io_service
是线程安全的,因此您可以 post 处理它而不需要互斥体。
注意:虽然我没有使 bool runThread
成为原子,但为了线程安全,它应该是(我只是在我的 c++ 中没有 atomic
)
我最近看了 Herb 的演讲,我相信他的 lock-free linked list 应该不错。唯一的问题是 atomic< shared_ptr<T> >
尚未实施。我使用了 atomic_*
函数调用,正如 Herb 在他的演讲中所解释的那样。
在示例中,我已将任务简化为 int,但它可以是您想要的任何内容。
函数atomic_compare_exchange_weak
接受三个参数:要比较的项目、预期值和期望值。它returns true 或false 表示成功或失败。失败时,预期值将更改为找到的值。
#include <memory>
#include <atomic>
// Untested code.
struct WorkItem { // Simple linked list implementation.
int work;
shared_ptr<WorkItem> next; // remember to use as atomic
};
class WorkList {
shared_ptr<WorkItem> head; // remember to use as atomic
public:
// Used by producers to add work to the list. This implementation adds
// new items to the front (stack), but it can easily be changed to a queue.
void push_work(int work) {
shared_ptr<WorkItem> p(new WorkItem()); // The new item we want to add.
p->work = work;
p->next = head;
// Do we get to change head to p?
while (!atomic_compare_exchange_weak(&head, &p->next, p)) {
// Nope, someone got there first, try again with the new p->next,
// and remember: p->next is automatically changed to the new value of head.
}
// Yup, great! Everything's done then.
}
// Used by consumers to claim items to process.
int pop_work() {
auto p = atomic_load(&head); // The item we want to process.
int work = (p ? p->work : -1);
// Do we get to change head to p->next?
while (p && !atomic_compare_exchange_weak(&head, &p, p->next)) {
// Nope, someone got there first, try again with the new p,
// and remember: p is automatically changed to the new value of head.
work = (p ? p->work : -1); // Make sure to update work as well!
}
// Yup, great! Everything's done then, return the new task.
return work; // Returns -1 if list is empty.
}
};
编辑: 演讲中解释了将 shared_ptr
与 atomic_*
函数结合使用的原因。简而言之:从链表中弹出一个项目可能会从某个遍历链表的人的下方删除它,或者不同的节点可能会分配到相同的内存地址 (The ABA Problem)。使用 shared_ptr
将确保任何老读者都将持有对原始项目的有效引用。
正如 Herb 所解释的,这使得 pop 函数的实现变得微不足道。