Weird hang in generic multiple producers / consumers in C++11 (or above)
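I wrote a "generic" multiple producers / consumers using C++11 (or above) multithreading. The code (below) sort of works, but it hangs or crashes if too many producer/consumer threads are created.
The idea is to separate concerns neatly: the MultiProducerConsumer object takes care of the protocol (thread maintenance, mutex, condvar), while the "user" injects into the object the functors that do the concrete work (producer, consumer, termination predicate).
Tested with VS 2017 and cygwin g++. It is worse on cygwin (why?). I can't figure out what the problem is, and I could use a hint. Thanks in advance.
Header, multi_producer_consumer.hpp: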
#pragma once
#include <algorithm>
#include <functional>
#include <iterator>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <vector>
//#include <cassert>
template<typename Container>
struct MultiProducerConsumer
{
using Type = typename Container::value_type;
using ModifierFct = std::function<void(Container&)>;
using DoneFctr = std::function<bool(const Container&)>;
MultiProducerConsumer(const Container& q,
ModifierFct producer,
ModifierFct consumer,
DoneFctr donef,
size_t n_producers,
size_t n_consumers):
m_queue(q),
m_pf(producer),
m_cf(consumer),
m_producers(n_producers),
m_consumers(n_consumers),
m_done(donef),
m_joined(false)
{
///std::lock_guard<std::mutex> lk(m_mutex);//why? to prevent the producers to start before consumers are created. So what, if they do?
for (auto i = 0; i < n_producers; ++i)
{
m_producers[i] = std::thread(std::mem_fn(&MultiProducerConsumer::produce), this, i);
}
for (int i = 0; i < n_consumers; ++i)
{
m_consumers[i] = std::thread(std::mem_fn(&MultiProducerConsumer::consume), this, i);
}
}
virtual ~MultiProducerConsumer(void)
{
if (!m_joined)
join();
}
virtual bool done(void) const
{
std::lock_guard<std::mutex> lk(m_mutex);
return m_done(m_queue);
}
void join(void)
{
std::for_each(m_producers.begin(), m_producers.end(), std::mem_fn(&std::thread::join));
std::for_each(m_consumers.begin(), m_consumers.end(), std::mem_fn(&std::thread::join));
m_joined = true;
}
protected:
virtual void produce(size_t i)
{
while (!done())
{
std::lock_guard<std::mutex> lk(m_mutex);
m_pf(m_queue);
///if (i == 0)//should only one thread notify all the consumers...? nope
m_condvar.notify_all();//notifies all...not one
}
}
virtual void consume(size_t i)
{
while (!done())
{
std::unique_lock<std::mutex> lk(m_mutex);
m_condvar.wait(lk, [this]() {
return !m_queue.empty();
});
m_cf(m_queue);
}
}
private:
Container m_queue;
ModifierFct m_pf;
ModifierFct m_cf;
DoneFctr m_done;
mutable std::mutex m_mutex;
std::condition_variable m_condvar;
std::vector<std::thread> m_producers;
std::vector<std::thread> m_consumers;
bool m_joined;
};
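The tester below uses a queue of vectors, which are "produced" (simply moved from an "outside" queue, matrix, into the producer/consumer queue). The consumers "consume" the vectors by summing each one and storing the sum in another "outside" container (sums). The whole process terminates when the first vector whose sum is zero is encountered. Here is the code: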
#include <iostream>
#include <string>
#include <sstream>
#include <vector>
#include <queue>
#include <numeric>
#include <iterator>
#include <cassert>
#include "multi_producer_consumer.hpp"
template<typename T>
using QVec = std::queue<std::vector<T>>;
template<typename T>
inline
T sum(const std::vector<T>& v)
{
return std::accumulate(v.begin(), v.end(), 0);
}
template<typename T>
T from_string(std::string&& str)
{
T ret;
std::stringstream ss(str);
ss >> ret;
return ret;
}
int main(int argc, char* argv[])
{
int n_p = 1;
int n_c = 1;
if (argc == 3)
{
n_p = from_string<int>(argv[1]);
n_c = from_string<int>(argv[2]);
}
const unsigned long max_n_threads = std::thread::hardware_concurrency();
std::cout << "max # threads: " << max_n_threads << "\n";
std::cout << "n_producers: " << n_p << ", n_consumers: " << n_c << "\n";
try {
std::vector<int> vstart(1, 1);
std::vector<int> vstop(1, 0);
std::queue<std::vector<int>> matrix;
matrix.push(vstart);
matrix.push(std::vector<int>{ 1, 2, 3, 4, 5 });
matrix.push(std::vector<int>{ 6, 7, 8, 9 });
matrix.push(std::vector<int>{ 10, 11, 12, 13 });
matrix.push(vstop);
matrix.push(std::vector<int>{ 20, 21, 22, 23 });//testing: this shouldn't get processed: okay, it's not
std::vector<long> sums;
QVec<int> qqv;
//multi-producer-consumer that feeds vector from a queue
//to a consumer that sums them up, until sum is zero:
//
MultiProducerConsumer<QVec<int>> mpc(qqv,
[&matrix](QVec<int>& qv) { //producer function: move elements from matrix into qv
if (!matrix.empty())
{
auto v = matrix.front();
matrix.pop();
qv.push(v);
}
},
[&sums](QVec<int>& qv) { //consumer function: pop from qv and sum up elements
//if (!qv.empty())//this test is superfluous
//{
auto v = qv.front();
qv.pop();
sums.push_back(sum(v));
//}
},
[](const QVec<int>& qv) { //done predicate: if nonempty top of queue sums up to 0: done; else not done;
if (!qv.empty())
{
auto v = qv.front();
return (sum(v) == 0);
}
return false;
}, n_p, n_c);//1,1 => okay; 1,2 => okay; 2,2 => okay; 5,5 => okay on Win64; hangs on cygwin; 5,10 => it can hang
//need main thread to block until producers/consumers are done,
//so that matrix/sums are not destructed while
//producers/consumers are still trying to use them:
//
mpc.join();
std::cout << "sums:\n";
std::copy(std::begin(sums), std::end(sums), std::ostream_iterator<int>(std::cout, "\n"));
}
catch (std::exception& ex)
{
std::cerr << ex.what() << "\n";
return 1;
}
catch (...)
{
std::cerr << "Unknown exception.\n";
return 1;
}
std::cout << "Done!" << std::endl;
return 0;
}
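Something is wrong. I just don't know what.
[EDIT]
Following up on Humphrey Winnebago's answer, especially the attempt to fix Part 2: I changed the granularity of the producer/consumer operations to separate them from the queue maintenance. The important parts of the header are below: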
template<typename Container>
struct MultiProducerConsumer
{
using Type = typename Container::value_type;
using ModifierFct = std::function<void(Type&)>;
using DoneFctr = std::function<bool(const Container&)>;
MultiProducerConsumer(const Container& q,
ModifierFct producer,
ModifierFct consumer,
DoneFctr donef,
size_t n_producers,
size_t n_consumers):
m_queue(q),
m_pf(producer),
m_cf(consumer),
m_producers(n_producers),
m_consumers(n_consumers),
m_done(donef),
m_joined(false)
{
///std::lock_guard<std::mutex> lk(m_mutex);//why? to prevent the producers to start before consumers are created. So what, if they do?
for (auto i = 0; i < n_producers; ++i)
{
m_producers[i] = std::thread(std::mem_fn(&MultiProducerConsumer::produce), this, i);
}
for (auto i = 0; i < n_consumers; ++i)
{
m_consumers[i] = std::thread(std::mem_fn(&MultiProducerConsumer::consume), this, i);
}
}
virtual ~MultiProducerConsumer(void)
{
if (!m_joined)
join();
}
void join(void)
{
std::for_each(m_producers.begin(), m_producers.end(), std::mem_fn(&std::thread::join));
std::for_each(m_consumers.begin(), m_consumers.end(), std::mem_fn(&std::thread::join));
m_joined = true;
}
protected:
// be careful with the virtual functions + overloading
virtual bool done(std::lock_guard<std::mutex>&) const
{
return m_done(m_queue);
}
virtual bool done(std::unique_lock<std::mutex>&) const
{
return m_done(m_queue);
}
virtual void produce(size_t i)
{
while (true) // 1
{
///std::lock_guard<std::mutex> lk(m_mutex);
std::unique_lock<std::mutex> lk(m_mutex);
if (done(lk)) // 2
break;
Type v;
//Part 2/2: Serious design flaws:
//should move producer work outside of critical section
//but this requires call below to be surrounded by unlock/lock:
//
lk.unlock();
m_pf(v);
lk.lock();
m_queue.push(v);
m_condvar.notify_all();
}
m_condvar.notify_all(); // 3. need to break any sleeping consumers
}
virtual void consume(size_t i)
{
while (true) // 1
{
std::unique_lock<std::mutex> lk(m_mutex);
m_condvar.wait(lk, [this]() {
return !m_queue.empty();
});
if (done(lk)) // 2 & 3
break;
auto v = m_queue.front();
m_queue.pop();
//Consumer fix for Part 2 / 2: Serious design flaws:
//
lk.unlock();
//Consumer fix for Part 2 / 2: Serious design flaws:
//move outside this critical section
//
m_cf(v);
}
}
private:
Container m_queue;
ModifierFct m_pf;
ModifierFct m_cf;
DoneFctr m_done;
mutable std::mutex m_mutex;
std::condition_variable m_condvar;
std::vector<std::thread> m_producers;
std::vector<std::thread> m_consumers;
bool m_joined;
};
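Of course, there are now race conditions on the containers "matrix" and "sums" in the tester. But that is a different issue from the MPC abstraction. Yes, the code ends up serialized again, but that's because production uses one external shared resource and consumption uses another. If production used, say, a random number generator and consumption stored its results in distinct locations, the producers/consumers would not be serialized. Here are the important parts of the new test code, together with what I hope is the fix for Part 2: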
//...
try {
std::vector<int> vstart(1, 1);
std::vector<int> vstop(1, 0);
std::queue<std::vector<int>> matrix;
matrix.push(vstart);
matrix.push(std::vector<int>{ 1, 2, 3, 4, 5 });
matrix.push(std::vector<int>{ 6, 7, 8, 9 });
matrix.push(std::vector<int>{ 10, 11, 12, 13 });
matrix.push(vstop);
matrix.push(std::vector<int>{ 20, 21, 22, 23 });//testing: this shouldn't get processed: okay, it's not
std::vector<long> sums;
QVec<int> qqv;
//now matrix and sum need to be protected
//but they're not the multi-producer-consumer's (MPC)
//responsibility anymore;
//
std::mutex sum_mutex;
std::mutex matrix_mutex;
//multi-producer-consumer that feeds vector from a queue
//to a consumer that sums them up, until sum is zero:
//
MultiProducerConsumer<QVec<int>> mpc(qqv,
[&matrix_mutex, &matrix](std::vector<int>& v) { //producer function: move elements from matrix into qv
std::lock_guard<std::mutex> guard(matrix_mutex);//lock before testing matrix.empty(), otherwise the test itself races
if (!matrix.empty())
{
v = matrix.front();
matrix.pop();
}
},
[&sum_mutex, &sums](std::vector<int>& v) { //consumer function: pop from qv and sum up elements
long s = sum(v);
std::lock_guard<std::mutex> guard(sum_mutex);
sums.push_back(s);
},
[](const QVec<int>& qv) { //done predicate: if nonempty top of queue sums up to 0: done; else not done;
if (!qv.empty())
{
auto v = qv.front();
return (sum(v) == 0);
}
return false;
}, n_p, n_c);
//need main thread to block until producers/consumers are done,
//so that matrix/sums are not destructed while
//producers/consumers are still trying to use them:
//
mpc.join();
std::cout << "sums:\n";
std::copy(std::begin(sums), std::end(sums), std::ostream_iterator<int>(std::cout, "\n"));
}
catch (std::exception& ex)
{
std::cerr << ex.what() << "\n";
return 1;
}
catch (...)
{
std::cerr << "Unknown exception.\n";
return 1;
}
//...
--- Part 1/2: The answer to why it doesn't work ---
matrix.push(vstop);
matrix.push(std::vector<int>{ 20, 21, 22, 23 });//testing: this shouldn't get processed: okay, it's not
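It is, sometimes. I found that when it hangs, it's because the producers have sucked up the last item.
You have a flaw in your generalized producer and consumer functions. Take the producer, for example: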
virtual bool done(void) const
{
std::lock_guard<std::mutex> lk(m_mutex);
return m_done(m_queue);
}
virtual void produce(size_t i)
{
while (!done()) // <---- HERE to...
{
std::lock_guard<std::mutex> lk(m_mutex); // <----- ...HERE. done() condition may not hold, as mutex was released
m_pf(m_queue);
m_condvar.notify_all();
}
}
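You acquire the mutex, check the condition, and release the mutex. Then you acquire the mutex again and assume the condition still holds. That is a race condition. It's not the kind of race condition that corrupts your data structures, but it is a race condition nonetheless.
In other circumstances this might happen to work, but here it interacts badly with your consumers' wait condition and with your "done predicate":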
[](const QVec<int>& qv) { //done predicate: if nonempty top of queue sums up to 0: done; else not done;
if (!qv.empty())
{
auto v = qv.front();
return (sum(v) == 0);
}
return false;
}
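One interleaving that leads to the hang:
- The consumer checks done(): it acquires the mutex, sees that m_queue is empty (meaning "not done"), releases the mutex, and enters the loop body.
- The producer acquires the mutex, "produces" vstop, and releases the mutex.
- The consumer acquires the mutex and waits on !m_queue.empty(). m_queue is not empty, because the producer just produced vstop, so the predicate is already true and the consumer doesn't wait.
- The consumer "consumes" vstop and releases the mutex.
- done() will now never return true. The only vector left after vstop, {20, 21, 22, 23}, does not sum to zero, so once matrix is empty the producers spin forever in their while (!done()) loop and the consumers block forever in wait().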
Fix:
// be careful with the virtual functions + overloading
virtual bool done(std::lock_guard<std::mutex>&) const
{
return m_done(m_queue);
}
virtual bool done(std::unique_lock<std::mutex>&) const
{
return m_done(m_queue);
}
virtual void produce(size_t i)
{
while (true) // 1
{
std::lock_guard<std::mutex> lk(m_mutex);
if (done(lk)) // 2
break;
m_pf(m_queue);
m_condvar.notify_all();
}
m_condvar.notify_all(); // 3. need to break any sleeping consumers
}
virtual void consume(size_t i)
{
while (true) // 1
{
std::unique_lock<std::mutex> lk(m_mutex);
m_condvar.wait(lk, [this]() {
return !m_queue.empty();
});
if (done(lk)) // 2 & 3
break;
m_cf(m_queue);
}
}
That should be enough to fix it, but you can also patch your "done" predicate:
[&matrix](const QVec<int>& qv) { // 1
if (!qv.empty())
{
auto v = qv.front();
return (sum(v) == 0);
}
assert(!matrix.empty()); // 2
// or... if (matrix.empty()) throw, since you'll probably want to test in release mode
return false;
}
--- Part 2/2: Serious design flaws ---
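Your producers and consumers hold the mutex while they do their work. That isn't concurrent programming at all; it's just sequential programming with extra complexity. There is no difference between your algorithm and one that simply produces everything (on one thread) and then consumes everything (on the same thread), except that the latter is about 100 times less likely to have bugs.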
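You should get through your critical sections as fast as possible: acquire the mutex, touch the shared data, release it. Then process the data thread-locally.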
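You are copying and doing work while holding the mutex. That blows my mind. You really need to rethink your design.
Move the enqueue and dequeue operations into produce() and consume(). Acquire the mutex only for enqueueing and dequeueing. And don't copy; use move operations. Change your "done predicate" so that it doesn't do so much work.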