Weird hang in generic multiple producers / consumers in C++11 (or above)

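I've written a "generic" multiple producers / consumers using C++11 (or higher) multithreading. The code (below) sort of works, but it hangs or crashes if too many producer / consumer threads are created.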

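The idea is to neatly separate concerns: the MultiProducerConsumer object takes care of the protocol (thread maintenance, mutex, condvar), while the "user" injects into the object the functors that do the concrete work (producer, consumer, termination predicate).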

Tested with VS 2017 and cygwin g++. It's worse on cygwin (why?). I can't figure out what the problem is, and I could use a hint. Thanks in advance.

The header, multi_producer_consumer.hpp:

#pragma once
#include <algorithm>
#include <functional>
#include <iterator>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
//#include <cassert>

template<typename Container>
struct MultiProducerConsumer
{
    using Type = typename Container::value_type;
    using ModifierFct = std::function<void(Container&)>;
    using DoneFctr = std::function<bool(const Container&)>;

    MultiProducerConsumer(const Container& q, 
                          ModifierFct producer,
                          ModifierFct consumer,
                          DoneFctr donef,
                          size_t n_producers,
                          size_t n_consumers):
        m_queue(q),
        m_pf(producer),
        m_cf(consumer),
        m_producers(n_producers),
        m_consumers(n_consumers),
        m_done(donef),
        m_joined(false)
    {
        ///std::lock_guard<std::mutex> lk(m_mutex);//why? to prevent the producers to start before consumers are created. So what, if they do?

        for (size_t i = 0; i < n_producers; ++i)
        {
            m_producers[i] = std::thread(std::mem_fn(&MultiProducerConsumer::produce), this, i);
        }

        for (size_t i = 0; i < n_consumers; ++i)
        {
            m_consumers[i] = std::thread(std::mem_fn(&MultiProducerConsumer::consume), this, i);
        }
    }

    virtual ~MultiProducerConsumer(void)
    {
        if (!m_joined)
            join();
    }

    virtual bool done(void) const
    {
        std::lock_guard<std::mutex> lk(m_mutex);
        return m_done(m_queue);
    }

    void join(void)
    {
        std::for_each(m_producers.begin(), m_producers.end(), std::mem_fn(&std::thread::join));
        std::for_each(m_consumers.begin(), m_consumers.end(), std::mem_fn(&std::thread::join));
        m_joined = true;
    }

protected:
    virtual void produce(size_t i)
    {
        while (!done())
        {
            std::lock_guard<std::mutex> lk(m_mutex);
            m_pf(m_queue);
            ///if (i == 0)//should only one thread notify all the consumers...? nope
            m_condvar.notify_all();//notifies all...not one
        }
    }

    virtual void consume(size_t i)
    {
        while (!done())
        {
            std::unique_lock<std::mutex> lk(m_mutex);
            m_condvar.wait(lk, [this]() {
                return !m_queue.empty();
            });
            m_cf(m_queue);
        }
    }
private:
    Container m_queue;
    ModifierFct m_pf;
    ModifierFct m_cf;
    DoneFctr m_done;

    mutable std::mutex m_mutex;
    std::condition_variable m_condvar;

    std::vector<std::thread> m_producers;
    std::vector<std::thread> m_consumers;

    bool m_joined;
};

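The tester below uses a queue of vectors, which are "produced" (simply moved from an "outside" queue, matrix, into the producer/consumer queue). The consumers "consume" the vectors by summing each one and storing the sum in another "outside" container (sums). The whole process terminates when the first vector whose sum is zero is encountered. Here's the code: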

#include <iostream>
#include <string>
#include <sstream>
#include <vector>
#include <queue>
#include <numeric>
#include <iterator>
#include <cassert>

#include "multi_producer_consumer.hpp"

template<typename T>
using QVec = std::queue<std::vector<T>>;

template<typename T>
inline
T sum(const std::vector<T>& v)
{
    return std::accumulate(v.begin(), v.end(), 0);
}

template<typename T>
T from_string(std::string&& str)
{
    T ret;
    std::stringstream ss(str);
    ss >> ret;

    return ret;
}

int main(int argc, char* argv[])
{
    int n_p = 1;
    int n_c = 1;
    if (argc == 3)
    {
        n_p = from_string<int>(argv[1]);
        n_c = from_string<int>(argv[2]);
    }

    const unsigned long max_n_threads = std::thread::hardware_concurrency();
    std::cout << "max # threads: " << max_n_threads << "\n";
    std::cout << "n_producers: " << n_p << ", n_consumers: " << n_c << "\n";

    try {
        std::vector<int> vstart(1, 1);
        std::vector<int> vstop(1, 0);

        std::queue<std::vector<int>> matrix;
        matrix.push(vstart);
        matrix.push(std::vector<int>{ 1, 2, 3, 4, 5 });
        matrix.push(std::vector<int>{ 6, 7, 8, 9 });
        matrix.push(std::vector<int>{ 10, 11, 12, 13 });
        matrix.push(vstop);
        matrix.push(std::vector<int>{ 20, 21, 22, 23 });//testing: this shouldn't get processed: okay, it's not
        std::vector<long> sums;
        QVec<int> qqv;

        //multi-producer-consumer that feeds vector from a queue
        //to a consumer that sums them up, until sum is zero:
        //
        MultiProducerConsumer<QVec<int>> mpc(qqv, 
            [&matrix](QVec<int>& qv) { //producer function: move elements from matrix into qv
            if (!matrix.empty())
            {
                auto v = matrix.front();
                matrix.pop();
                qv.push(v);
            }
        },
            [&sums](QVec<int>& qv) {  //consumer function: pop from qv and sum up elements
            //if (!qv.empty())//this test is superfluous
            //{
                auto v = qv.front();
                qv.pop();
                sums.push_back(sum(v));
            //}
        },
            [](const QVec<int>& qv) { //done predicate: if nonempty top of queue sums up to 0: done; else not done;
            if (!qv.empty())
            {
                auto v = qv.front();
                return (sum(v) == 0);
            }
            return false;
        }, n_p, n_c);//1,1 => okay; 1,2 => okay; 2,2 => okay; 5,5 => okay on Win64; hangs on cygwin; 5,10 => it can hang

        //need main thread to block until producers/consumers are done,
        //so that matrix/sums are not destructed while 
        //producers/consumers are still trying to use them:
        //
        mpc.join();

        std::cout << "sums:\n";
        std::copy(std::begin(sums), std::end(sums), std::ostream_iterator<int>(std::cout, "\n"));
    }
    catch (std::exception& ex)
    {
        std::cerr << ex.what() << "\n";
        return 1;
    }
    catch (...)
    {
        std::cerr << "Unknown exception.\n";
        return 1;
    }

    std::cout << "Done!" << std::endl;
    return 0;
}

Something's wrong. I just don't know what.
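For comparison, here is a minimal single-threaded sketch of what the tester is meant to compute. The helper names `reference_sums` and `tester_matrix` are hypothetical, not part of the original code. The sketch assumes the stop vector itself is never consumed, which matches the done predicate: everything halts as soon as `vstop` reaches the front of the queue.

```cpp
#include <cassert>
#include <numeric>
#include <queue>
#include <vector>

// Hypothetical single-threaded reference for the tester: walk `matrix` in
// order, stop at the first vector whose sum is zero (without consuming it),
// and record the sum of every vector before that stop marker.
std::vector<long> reference_sums(std::queue<std::vector<int>> matrix)
{
    std::vector<long> sums;
    while (!matrix.empty())
    {
        const std::vector<int>& v = matrix.front();
        const long s = std::accumulate(v.begin(), v.end(), 0L);
        if (s == 0)
            break;  // stop marker: it and everything queued after it are skipped
        sums.push_back(s);
        matrix.pop();  // `v` is not used after this point
    }
    return sums;
}

// Builds the same input as the tester's `matrix`.
std::queue<std::vector<int>> tester_matrix()
{
    std::queue<std::vector<int>> matrix;
    matrix.push({1});               // vstart
    matrix.push({1, 2, 3, 4, 5});
    matrix.push({6, 7, 8, 9});
    matrix.push({10, 11, 12, 13});
    matrix.push({0});               // vstop
    matrix.push({20, 21, 22, 23});  // must not be processed
    return matrix;
}
```

Under these assumptions, `reference_sums(tester_matrix())` yields 1, 15, 30 and 46; a correct multithreaded run should produce the same values and never the sum of `{ 20, 21, 22, 23 }`.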

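[EDITED] Following up on Humphrey Winnebago's answer, especially the attempt to fix Part 2: I changed the granularity of the producer/consumer operations to separate them from queue maintenance. The important part of the header below: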

template<typename Container>
struct MultiProducerConsumer
{
    using Type = typename Container::value_type;
    using ModifierFct = std::function<void(Type&)>;
    using DoneFctr = std::function<bool(const Container&)>;

    MultiProducerConsumer(const Container& q, 
                          ModifierFct producer,
                          ModifierFct consumer,
                          DoneFctr donef,
                          size_t n_producers,
                          size_t n_consumers):
        m_queue(q),
        m_pf(producer),
        m_cf(consumer),
        m_producers(n_producers),
        m_consumers(n_consumers),
        m_done(donef),
        m_joined(false)
    {
        ///std::lock_guard<std::mutex> lk(m_mutex);//why? to prevent the producers to start before consumers are created. So what, if they do?

        for (size_t i = 0; i < n_producers; ++i)
        {
            m_producers[i] = std::thread(std::mem_fn(&MultiProducerConsumer::produce), this, i);
        }

        for (size_t i = 0; i < n_consumers; ++i)
        {
            m_consumers[i] = std::thread(std::mem_fn(&MultiProducerConsumer::consume), this, i);
        }
    }

    virtual ~MultiProducerConsumer(void)
    {
        if (!m_joined)
            join();
    }

    void join(void)
    {
        std::for_each(m_producers.begin(), m_producers.end(), std::mem_fn(&std::thread::join));
        std::for_each(m_consumers.begin(), m_consumers.end(), std::mem_fn(&std::thread::join));
        m_joined = true;
    }

protected:
    // be careful with the virtual functions + overloading
    virtual bool done(std::lock_guard<std::mutex>&) const
    {
        return m_done(m_queue);
    }
    virtual bool done(std::unique_lock<std::mutex>&) const
    {
        return m_done(m_queue);
    }

    virtual void produce(size_t i)
    {
        while (true)  // 1
        {
            ///std::lock_guard<std::mutex> lk(m_mutex);
            std::unique_lock<std::mutex> lk(m_mutex);
            if (done(lk))  // 2
                break;

            Type v;

            //Part 2/2: Serious design flaws: 
            //should move producer work outside of critical section
            //but this requires call below to be surrounded by unlock/lock:
            //
            lk.unlock();
            m_pf(v);
            lk.lock();

            m_queue.push(v);

            m_condvar.notify_all();
        }
        m_condvar.notify_all();  // 3. need to break any sleeping consumers
    }

    virtual void consume(size_t i)
    {
        while (true)  // 1
        {
            std::unique_lock<std::mutex> lk(m_mutex);
            m_condvar.wait(lk, [this]() {
                return !m_queue.empty();
            });

            if (done(lk))  // 2 & 3
                break;

            auto v = m_queue.front();
            m_queue.pop();

            //Consumer fix for Part 2 / 2: Serious design flaws:
            //
            lk.unlock();

            //Consumer fix for Part 2 / 2: Serious design flaws:
            //move outside this critical section
            //
            m_cf(v);
        }
    }
private:
    Container m_queue;
    ModifierFct m_pf;
    ModifierFct m_cf;
    DoneFctr m_done;

    mutable std::mutex m_mutex;
    std::condition_variable m_condvar;

    std::vector<std::thread> m_producers;
    std::vector<std::thread> m_consumers;

    bool m_joined;
};

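Of course, there are now race conditions on the tester's containers "matrix" and "sums". But that is a separate problem from the MPC abstraction. Yes, the code ends up serialized again, but that's because production uses one external shared resource and consumption uses another. If production used, say, a random number generator, and consumption stored its results in distinct locations, there would be no serialization of the producers/consumers. The (important part of the) new test code, along with (what I hope is) the fix for Part 2: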
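To illustrate the claim about independent resources outside the MPC class, here is a minimal sketch. The function `independent_workers` and the seeding scheme (seed = worker index) are assumptions for the demo. Each thread draws from its own `std::mt19937` and writes only to its own slot of a pre-sized vector, so there is no shared mutable state and nothing to serialize on.

```cpp
#include <cassert>
#include <cstddef>
#include <random>
#include <thread>
#include <vector>

// Sketch: each worker "produces" with its own random-number generator and
// "consumes" by writing into its own slot of a pre-sized vector. No mutable
// state is shared, so no mutex is needed and the workers can run in parallel.
std::vector<int> independent_workers(std::size_t n_workers, int items_per_worker)
{
    assert(items_per_worker >= 0);
    std::vector<int> results(n_workers, 0);  // one slot per worker: disjoint writes
    std::vector<std::thread> workers;
    for (std::size_t i = 0; i < n_workers; ++i)
    {
        workers.emplace_back([&results, i, items_per_worker] {
            std::mt19937 gen(static_cast<unsigned>(i));  // thread-local producer state
            std::uniform_int_distribution<int> dist(1, 10);
            int total = 0;
            for (int k = 0; k < items_per_worker; ++k)
                total += dist(gen);  // "produce" and "consume" locally
            results[i] = total;      // only this thread ever touches results[i]
        });
    }
    for (auto& t : workers)
        t.join();
    return results;
}
```

For example, `independent_workers(4, 100)` returns four totals, each between 100 and 1000. The exact values depend on the standard library's distribution implementation, so no specific output is claimed.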

//...
try {
        std::vector<int> vstart(1, 1);
        std::vector<int> vstop(1, 0);

        std::queue<std::vector<int>> matrix;
        matrix.push(vstart);
        matrix.push(std::vector<int>{ 1, 2, 3, 4, 5 });
        matrix.push(std::vector<int>{ 6, 7, 8, 9 });
        matrix.push(std::vector<int>{ 10, 11, 12, 13 });
        matrix.push(vstop);
        matrix.push(std::vector<int>{ 20, 21, 22, 23 });//testing: this shouldn't get processed: okay, it's not
        std::vector<long> sums;
        QVec<int> qqv;

        //now matrix and sum need to be protected
        //but they're not the multi-producer-consumer's (MPC)
        //responsibility anymore;
        //
        std::mutex sum_mutex;
        std::mutex matrix_mutex;

        //multi-producer-consumer that feeds vector from a queue
        //to a consumer that sums them up, until sum is zero:
        //
        MultiProducerConsumer<QVec<int>> mpc(qqv,
            [&matrix_mutex, &matrix](std::vector<int>& v) { //producer function: move the next element of matrix into v
            std::lock_guard<std::mutex> guard(matrix_mutex);  // lock before testing empty() to avoid a data race
            if (!matrix.empty())
            {
                v = matrix.front();
                matrix.pop();
            }
        },
            [&sum_mutex, &sums](std::vector<int>& v) {  //consumer function: sum up the elements of v and store the result
            long s = sum(v);
            std::lock_guard<std::mutex> guard(sum_mutex);
            sums.push_back(s);
        },
            [](const QVec<int>& qv) { //done predicate: if nonempty top of queue sums up to 0: done; else not done;
            if (!qv.empty())
            {
                auto v = qv.front();
                return (sum(v) == 0);
            }
            return false;
        }, n_p, n_c);

        //need main thread to block until producers/consumers are done,
        //so that matrix/sums are not destructed while 
        //producers/consumers are still trying to use them:
        //
        mpc.join();

        std::cout << "sums:\n";
        std::copy(std::begin(sums), std::end(sums), std::ostream_iterator<int>(std::cout, "\n"));
    }
    catch (std::exception& ex)
    {
        std::cerr << ex.what() << "\n";
        return 1;
    }
    catch (...)
    {
        std::cerr << "Unknown exception.\n";
        return 1;
    }
//...

--- Part 1/2: The answer to why it doesn't work ---

matrix.push(vstop);
matrix.push(std::vector<int>{ 20, 21, 22, 23 });//testing: this shouldn't get processed: okay, it's not

It is (sometimes). I found that when it hangs, it's because the producers have sucked up the last item.

You have a flaw in your generalized producer and consumer functions. Take the producer, for example:

virtual bool done(void) const
{
    std::lock_guard<std::mutex> lk(m_mutex);
    return m_done(m_queue);
}

virtual void produce(size_t i)
{
    while (!done())  // <---- HERE to...
    {
        std::lock_guard<std::mutex> lk(m_mutex);  // <----- ...HERE. done() condition may not hold, as mutex was released
        m_pf(m_queue);
        m_condvar.notify_all();
    }
}

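You acquire the mutex, check the condition, and release the mutex. Then you acquire the mutex again, assuming the condition still holds. That is a race condition. It's not the kind of race condition that corrupts your data structures, but it is a race condition nonetheless.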

In different circumstances this might actually work, but here it interacts badly with your consumers' wait condition and with your "done predicate".

The "done predicate":

    [](const QVec<int>& qv) { //done predicate: if nonempty top of queue sums up to 0: done; else not done;
        if (!qv.empty())
        {
            auto v = qv.front();
            return (sum(v) == 0);
        }
        return false; 
    }
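
  1. A consumer checks done(): it acquires the mutex, sees that m_queue is empty (meaning not done), releases the mutex, and enters the loop body.
  2. A producer acquires the mutex, "produces" vstop, and releases the mutex.
  3. The consumer acquires the mutex and waits on the condition !m_queue.empty(). m_queue is not empty, because the producer has produced vstop, so the predicate is true and the consumer does not wait.
  4. The consumer "consumes" vstop and releases the mutex.
  5. done() can now never return true.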
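The five steps above can be replayed deterministically on a single thread to see why the program never terminates. This sketch assumes the earlier vectors have already been consumed and that `{ 20, 21, 22, 23 }` is the only item left in `matrix`; `done_pred` mirrors the tester's done predicate, and `replay_interleaving` is a hypothetical name.

```cpp
#include <cassert>
#include <numeric>
#include <queue>
#include <vector>

using QVec = std::queue<std::vector<int>>;

// Mirrors the tester's done predicate: done only if the front vector sums to 0.
bool done_pred(const QVec& qv)
{
    if (qv.empty())
        return false;
    const std::vector<int>& v = qv.front();
    return std::accumulate(v.begin(), v.end(), 0) == 0;
}

// Replays the interleaving on ONE thread (no real concurrency) and reports
// whether done() is ever observed to be true along it.
bool replay_interleaving()
{
    QVec m_queue;  // shared queue; earlier vectors assumed already consumed
    bool ever_done = false;

    // Step 1: consumer calls done() on an empty queue -> false; enters the loop body.
    ever_done = ever_done || done_pred(m_queue);

    // Step 2: a producer "produces" vstop.
    m_queue.push({0});

    // Step 3: wait(lk, !m_queue.empty()) returns at once; done() is NOT re-checked.
    // Step 4: the consumer "consumes" vstop.
    m_queue.pop();

    // Afterwards: the only item left in matrix is {20, 21, 22, 23} (sum 86).
    m_queue.push({20, 21, 22, 23});
    ever_done = ever_done || done_pred(m_queue);
    m_queue.pop();
    ever_done = ever_done || done_pred(m_queue);  // empty again, and it stays empty

    return ever_done;
}
```

`replay_interleaving()` returns `false`: once `vstop` has been consumed, the queue only ever holds nonzero vectors or nothing, so consumers block in `wait()` while producers keep spinning.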

The fix:

// be careful with the virtual functions + overloading
virtual bool done(std::lock_guard<std::mutex>&) const
{
    return m_done(m_queue);
}
virtual bool done(std::unique_lock<std::mutex>&) const
{
    return m_done(m_queue);
}

virtual void produce(size_t i)
{
    while (true)  // 1
    {
        std::lock_guard<std::mutex> lk(m_mutex);
        if (done(lk))  // 2
            break;
        m_pf(m_queue);
        m_condvar.notify_all();
    }
    m_condvar.notify_all();  // 3. need to break any sleeping consumers
}

virtual void consume(size_t i)
{
    while (true)  // 1
    {
        std::unique_lock<std::mutex> lk(m_mutex);
        m_condvar.wait(lk, [this]() {
            return !m_queue.empty();
        });

        if (done(lk))  // 2 & 3
            break;

        m_cf(m_queue);
    }
}

That should be enough to fix it, but you can also patch your "done" predicate:

    [&matrix](const QVec<int>& qv) {  // 1
        if (!qv.empty())
        {
            auto v = qv.front();
            return (sum(v) == 0);
        }
        assert(!matrix.empty());  // 2
        // or... if (matrix.empty()) throw, since you'll probably want to test in release mode
        return false;
    }

--- Part 2/2: Serious design flaws ---

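Your producers and consumers hold the mutex while they do their work. That isn't concurrent programming at all; it's just complicated sequential programming. There is no difference between your algorithm and one that simply produces everything at once (on one thread) and then consumes it all (on the same thread), except that the latter has 100 times fewer opportunities for bugs.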

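You should get through critical sections as quickly as possible. Acquire the mutex, touch the shared data, then release it. Then work on the data thread-locally.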
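A minimal sketch of that pattern, assuming (for brevity) that all work is queued before the threads start, so workers can simply exit when the queue is empty and no condition variable is needed. `sum_all` is a hypothetical name, and it deliberately ignores the stop-vector rule; it only shows how short the critical sections can be.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <numeric>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Sketch: hold a mutex only while touching shared data, then work thread-locally.
// Assumption: every item is queued before the workers start, so a worker can
// exit as soon as it finds the queue empty (no condition variable needed).
std::vector<long> sum_all(std::queue<std::vector<int>> work, std::size_t n_threads)
{
    assert(n_threads > 0);
    std::mutex work_mutex;    // guards `work`
    std::mutex result_mutex;  // guards `sums`
    std::vector<long> sums;

    auto worker = [&] {
        for (;;)
        {
            std::vector<int> v;
            {
                std::lock_guard<std::mutex> lk(work_mutex);  // critical section: take one item
                if (work.empty())
                    return;
                v = std::move(work.front());  // move, don't copy
                work.pop();
            }  // work_mutex released here

            const long s = std::accumulate(v.begin(), v.end(), 0L);  // no lock held

            std::lock_guard<std::mutex> lk(result_mutex);  // short: publish one result
            sums.push_back(s);
        }
    };

    std::vector<std::thread> threads;
    for (std::size_t i = 0; i < n_threads; ++i)
        threads.emplace_back(worker);
    for (auto& t : threads)
        t.join();
    return sums;  // order depends on thread scheduling
}
```

Each worker holds `work_mutex` only for the `empty()`/`front()`/`pop()` sequence and `result_mutex` only for a single `push_back`; the summation itself runs with no lock held.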

You are copying and doing work while holding the mutex. That blows my mind. You really need to rethink your design.

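Move the enqueue and dequeue operations into produce and consume. Acquire the mutex only for enqueuing and dequeuing. And don't copy... use move operations. Change your "done predicate" so that it doesn't do much work.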
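One way to combine these suggestions, as a sketch rather than a drop-in replacement for `MultiProducerConsumer`: a hypothetical `WorkQueue` that locks only around push/pop, moves items in and out, and replaces the done predicate with a `closed` flag. The producer evaluates the stop condition once per item and calls `close()` at the first zero-sum vector; consumers drain whatever is left and then exit. `run_pipeline` and the single producer running on the calling thread are assumptions of this sketch.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <numeric>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical queue: the mutex guards only push/pop, items are moved in and
// out, and "done" is a flag set once by close() instead of a predicate that is
// re-evaluated (by summing the front vector) on every check.
template <typename T>
class WorkQueue
{
public:
    void push(T item)
    {
        {
            std::lock_guard<std::mutex> lk(m_mutex);
            m_queue.push(std::move(item));
        }
        m_cv.notify_one();
    }

    void close()
    {
        {
            std::lock_guard<std::mutex> lk(m_mutex);
            m_closed = true;
        }
        m_cv.notify_all();  // wake every consumer so it can see the flag
    }

    // Blocks until an item is available; returns false once closed and drained.
    bool pop(T& out)
    {
        std::unique_lock<std::mutex> lk(m_mutex);
        m_cv.wait(lk, [this] { return !m_queue.empty() || m_closed; });
        if (m_queue.empty())
            return false;
        out = std::move(m_queue.front());
        m_queue.pop();
        return true;
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    std::queue<T> m_queue;
    bool m_closed = false;
};

// One producer (the calling thread) feeds vectors from `matrix` and closes the
// queue at the first zero-sum vector. Returns the total of all consumer sums.
long run_pipeline(std::queue<std::vector<int>> matrix, std::size_t n_consumers)
{
    WorkQueue<std::vector<int>> q;
    std::mutex total_mutex;
    long total = 0;

    std::vector<std::thread> consumers;
    for (std::size_t i = 0; i < n_consumers; ++i)
        consumers.emplace_back([&] {
            std::vector<int> v;
            while (q.pop(v))
            {
                const long s = std::accumulate(v.begin(), v.end(), 0L);  // outside the queue lock
                std::lock_guard<std::mutex> lk(total_mutex);
                total += s;
            }
        });

    while (!matrix.empty())
    {
        std::vector<int> v = std::move(matrix.front());
        matrix.pop();
        if (std::accumulate(v.begin(), v.end(), 0L) == 0)
            break;  // stop marker: nothing after it is queued
        q.push(std::move(v));
    }
    q.close();

    for (auto& t : consumers)
        t.join();
    return total;
}
```

With the tester's data, `run_pipeline(matrix, n)` returns 92 (1 + 15 + 30 + 46) for any `n >= 1`, and `{ 20, 21, 22, 23 }` is never queued.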