
Non blocking way of adding a work item to array or list


我现在已经完成了我的队列(克服了下面描述的问题,以及更多)。对于那些感兴趣的人,可以找到 here。我很乐意听到任何评论:)。请注意队列不仅仅是一个工作项队列,而是一个模板容器,当然可以用工作项实例化它。


在观看 Herb Sutter's talk C++11 和 14 中的并发后,我对非阻塞并发感到非常兴奋。


我的问题很简单。我正在创建一个非常简单的线程池。为了做到这一点,我在 workPool class 中设置了一些工作线程 运行。我保留了一份 workItems 的列表。





#include <thread>
#include <deque>
#include <vector>

class workPool
    workPool(int workerCount) :
        for (int i = workerCount; i > 0; --i)
            workers.push_back(std::thread(&workPool::doWork, this));

        running = 0;
    bool running;
    std::vector< std::thread > workers;
    std::deque< std::function<void()> > workItems;

    void doWork()
        while (running)
            if (!workItems.size())
                //here the thread should be paused till a new item is added


    void addWorkitem()
        //This is my confusion. How should I do this?


在这种拥有共享资源(工作队列)的上下文中,如果您真正深入挖掘,通常会被原子和 CAS 循环所取代。

获得无锁并发堆栈的基本想法相当简单(编辑:虽然我在第一个 post 中犯了一个错误,但可能有点欺骗性的棘手 - 更有理由欣赏好库)。为简单起见,我选择了堆栈,但使用队列并不需要太多。


Create a new work item.
Loop Repeatedly:
    Store the top pointer to the stack.
    Set the work item's next pointer to the top of the stack.
    Atomic: Compare and swap the top pointer with the pointer to the work item.
            If this succeeds and returns the top pointer we stored, break out
            of the loop.


     Fetch top pointer.
     If top pointer is not null:
         Atomic: CAS top pointer with next pointer.
         If successful, break.
         (Optional) Sleep/Yield to avoid burning cycles.

 Process the item pointed to by the previous top pointer.


我不知道如何在 C++ 11(或更高版本)中执行此操作;然而,这里有一个关于如何使用 C++ 98 和 `boost (v1.50) 来实现的解决方案:


#include <boost/scoped_ptr.hpp>
#include <boost/function.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/thread.hpp>

class WorkHandler

    typedef boost::function<void(void)> Work; // the type of work we can handle
    void AddWork(Work w) { pThreadProcessing->post(w); }

    void ProcessWork();

    boost::scoped_ptr<boost::asio::io_service> pThreadProcessing;
    boost::thread thread;
    bool runThread; // Make sure this is atomic

: pThreadProcessing(new boost::asio::io_service), // create our io service
thread(&WorkHandler::ProcessWork, this), // create our thread
runThread(true) // run the thread

    runThread = false; // stop running the thread
    thread.join(); // wait for the thread to finish

void WorkHandler::ProcessWork()
    while (runThread) // while the thread is running
        pThreadProcessing->run(); // process work
        pThreadProcessing->reset(); // prepare for more work

int CalculateSomething(int a, int b)
    return a + b;

int main()
    WorkHandler wh; // create a work handler
    // give it some work to do
    wh.AddWork(boost::bind(&CalculateSomething, 4, 5));
    wh.AddWork(boost::bind(&CalculateSomething, 10, 100));
    wh.AddWork(boost::bind(&CalculateSomething, 35, -1));
    Sleep(2000); // ONLY for demonstration! This just allows the thread a chance to work before we destroy it.
    return 0;

boost::asio::io_service 是线程安全的,因此您可以 post 处理它而不需要互斥体。

注意:虽然我没有使 bool runThread 成为原子,但为了线程安全,它应该是(我只是在我的 c++ 中没有 atomic

我最近看了 Herb 的演讲,我相信他的 lock-free linked list 应该不错。唯一的问题是 atomic< shared_ptr<T> > 尚未实施。我使用了 atomic_* 函数调用,正如 Herb 在他的演讲中所解释的那样。

在示例中,我已将任务简化为 int,但它可以是您想要的任何内容。

函数atomic_compare_exchange_weak接受三个参数:要比较的项目、预期值和期望值。它returns true 或false 表示成功或失败。失败时,预期值将更改为找到的值。

#include <memory>
#include <atomic>

// Untested code.

struct WorkItem { // Simple linked list implementation.
    int work;
    shared_ptr<WorkItem> next; // remember to use as atomic

class WorkList {
    shared_ptr<WorkItem> head; // remember to use as atomic
    // Used by producers to add work to the list. This implementation adds
    // new items to the front (stack), but it can easily be changed to a queue.
    void push_work(int work) {
        shared_ptr<WorkItem> p(new WorkItem()); // The new item we want to add.
        p->work = work;
        p->next = head;

        // Do we get to change head to p?
        while (!atomic_compare_exchange_weak(&head, &p->next, p)) {
            // Nope, someone got there first, try again with the new p->next,
            // and remember: p->next is automatically changed to the new value of head.
        // Yup, great! Everything's done then.

    // Used by consumers to claim items to process.
    int pop_work() {
        auto p = atomic_load(&head); // The item we want to process.
        int work = (p ? p->work : -1);

        // Do we get to change head to p->next?
        while (p && !atomic_compare_exchange_weak(&head, &p, p->next)) {
            // Nope, someone got there first, try again with the new p,
            // and remember: p is automatically changed to the new value of head.
            work = (p ? p->work : -1); // Make sure to update work as well!
        // Yup, great! Everything's done then, return the new task.
        return work; // Returns -1 if list is empty.

编辑: 演讲中解释了将 shared_ptratomic_* 函数结合使用的原因。简而言之:从链表中弹出一个项目可能会从某个遍历链表的人的下方删除它,或者不同的节点可能会分配到相同的内存地址 (The ABA Problem)。使用 shared_ptr 将确保任何老读者都将持有对原始项目的有效引用。

正如 Herb 所解释的,这使得 pop 函数的实现变得微不足道。