C++ multi-threading Error: Single producer Multiple consumer
C++ multi-threading Error: Single producer Multiple consumer
我正在尝试实现单生产者多消费者,但是下面的代码无法编译。
有人可以帮助解决这个错误吗?也可以从这个池中唤醒所有线程并且随机线程能够获取锁吗?
- TIA
`
threadPool/main.cpp:4:
/Library/Developer/CommandLineTools/usr/bin/../include/c++/v1/thread:364:17:
error: no matching constructor for initialization of '_Gp' (aka
'tuple<unique_ptrstd::__1::__thread_struct, void (TestClass::*)(),
TestClass>')
new _Gp(std::move(__tsp),
^ ~~~~~~~~~~~~~~~~~
ls/usr/bin/../include/c++/v1/type_traits:2422:12: error: call to implicitly-deleted copy constructor of 'typename
decay::type' (aka 'TestClass')
return _VSTD::forward<_Tp>(__t);
----------------------------------------------
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <condition_variable>
using namespace std;
class TestClass{
public:
void producer(int i) {
unique_lock<mutex> lockGuard(mtx);
Q.push(i);
lockGuard.unlock();
cond.notify_all();
}
void consumer() {
unique_lock<mutex> lockGuard(mtx);
cond.wait(lockGuard, [this]() {
return !Q.empty();
});
cout<<this_thread::get_id();
cout<<Q.front()<<endl;
Q.pop();
lockGuard.unlock();
};
private:
mutex mtx;
condition_variable cond;
queue<int> Q;
};
int main() {
std::cout << "Hello, World!" << std::endl;
int MAX_THREADS = std::thread::hardware_concurrency()-1;
vector<thread> ThreadVector;
TestClass testObj;
for(int i=0; i<MAX_THREADS; i++){
ThreadVector.emplace_back(&TestClass::consumer, std::move(testObj));
cout<<"Pool threadID:" <<ThreadVector[i].get_id()<<endl;
}
TestClass testObj2;
for(int i=0; i<10; i++) {
testObj.producer(i);
}
for(auto &&t : ThreadVector) {
t.join();
}
return 0;
}
`
Another version to call threads
int main()
{
std::vector<std::thread> vecOfThreads;
std::function<void(TestClass&)> func = [&](TestClass &obj) {
while(1) {
obj.consumer();
}
};
unsigned MAX_THREADS = std::thread::hardware_concurrency()-1;
TestClass obj;
for(int i=0; i<MAX_THREADS; i++) {
std::thread th1(func, std::ref(obj));
vecOfThreads.emplace_back(std::move(th1));
}
TestClass prod;
for(int i=0; i<10; i++) {
prod.producer(i);
}
for (std::thread & th : vecOfThreads)
{
if (th.joinable())
th.join();
}
return 0;
}
std::move(testObj)
应该是 &testObj
(指向调用 consumer
的对象的指针)- 或 std::ref(testobj)
(变成 reference_wrapper
(持有也是指向该对象的指针)。
- 您应该至少调用
produce
的次数与线程数一样多,否则程序将无法完成。
- 您不需要手动
unlock
。守卫离开范围时会自动解锁。
示例:
class TestClass {
public:
void producer(int i) {
lock_guard<mutex> lockGuard(mtx); // here a lock_guard is enough
Q.push(i);
// no manual unlocking
cond.notify_all();
}
void consumer() {
unique_lock<mutex> lockGuard(mtx);
cond.wait(lockGuard, [this] { return !Q.empty(); });
cout << this_thread::get_id();
cout << Q.front() << endl;
Q.pop();
};
private:
mutex mtx;
condition_variable cond;
queue<int> Q;
};
int main() {
std::cout << "Hello, World!" << std::endl;
unsigned MAX_THREADS = std::thread::hardware_concurrency() - 1;
vector<thread> ThreadVector;
ThreadVector.reserve(MAX_THREADS); // since you know how many, reserve
TestClass testObj;
for(unsigned i = 0; i < MAX_THREADS; i++) {
// here, &testobj
ThreadVector.emplace_back(&TestClass::consumer, &testObj);
cout << "Pool threadID:" << ThreadVector[i].get_id() << endl;
}
// produce MAX_THREADS of things to put in the queue:
for(int i = 0; i < MAX_THREADS; i++) {
testObj.producer(i);
}
for(auto&& t : ThreadVector) {
t.join();
}
}
关于您在评论部分的问题:如果您想保留 consumer
线程 运行 直到您告诉他们退出,您可以添加另一个变量(称为 run
这里)consumer
线程监控。
示例:
#include <condition_variable>
#include <iostream>
#include <queue>
#include <thread>
#include <vector>
using namespace std;
class TestClass {
public:
void producer(int i) {
lock_guard<mutex> lockGuard(mtx);
Q.push(i);
to_pool.notify_one();
}
void consumer() {
while(true) {
unique_lock<mutex> lockGuard(mtx);
to_pool.wait(lockGuard, [this] { return !run || !Q.empty(); });
if(!run) break; // time to quit
cout << this_thread::get_id() << ' ' << Q.front() << endl;
Q.pop();
// Tell producer that we picked one from the queue.
// if it's only interesting to notify when the queue is empty,
// add: if(Q.empty())
to_producer.notify_one();
}
};
void stop() {
lock_guard<mutex> lockGuard(mtx);
run = false; // tell all pool threads to quit
to_pool.notify_all();
}
void wait_for_all_work_to_be_done() {
std::unique_lock<mutex> lg(mtx);
to_producer.wait(lg, [this] { return Q.empty(); });
}
private:
bool run = true;
mutex mtx;
condition_variable to_pool;
condition_variable to_producer;
queue<int> Q;
};
int main() {
std::cout << "Hello, World!" << std::endl;
unsigned MAX_THREADS = std::thread::hardware_concurrency() - 1;
vector<thread> ThreadVector;
ThreadVector.reserve(MAX_THREADS);
TestClass testObj;
for(unsigned i = 0; i < MAX_THREADS; i++) {
ThreadVector.emplace_back(&TestClass::consumer, &testObj);
cout << "Pool threadID:" << ThreadVector[i].get_id() << endl;
}
for(int i = 0; i < MAX_THREADS / 2; i++) {
testObj.producer(i);
}
testObj.wait_for_all_work_to_be_done();
// stop pool threads
testObj.stop();
for(auto&& t : ThreadVector) t.join();
}
我正在尝试实现单生产者多消费者,但是下面的代码无法编译。 有人可以帮助解决这个错误吗?也可以从这个池中唤醒所有线程并且随机线程能够获取锁吗?
- TIA
` threadPool/main.cpp:4: /Library/Developer/CommandLineTools/usr/bin/../include/c++/v1/thread:364:17:
error: no matching constructor for initialization of '_Gp' (aka 'tuple<unique_ptrstd::__1::__thread_struct, void (TestClass::*)(), TestClass>') new _Gp(std::move(__tsp), ^ ~~~~~~~~~~~~~~~~~
ls/usr/bin/../include/c++/v1/type_traits:2422:12: error: call to implicitly-deleted copy constructor of 'typename
decay::type' (aka 'TestClass') return _VSTD::forward<_Tp>(__t);
----------------------------------------------
#include <iostream>
#include <vector>
#include <queue>
#include <thread>
#include <condition_variable>
using namespace std;
class TestClass{
public:
void producer(int i) {
unique_lock<mutex> lockGuard(mtx);
Q.push(i);
lockGuard.unlock();
cond.notify_all();
}
void consumer() {
unique_lock<mutex> lockGuard(mtx);
cond.wait(lockGuard, [this]() {
return !Q.empty();
});
cout<<this_thread::get_id();
cout<<Q.front()<<endl;
Q.pop();
lockGuard.unlock();
};
private:
mutex mtx;
condition_variable cond;
queue<int> Q;
};
int main() {
std::cout << "Hello, World!" << std::endl;
int MAX_THREADS = std::thread::hardware_concurrency()-1;
vector<thread> ThreadVector;
TestClass testObj;
for(int i=0; i<MAX_THREADS; i++){
ThreadVector.emplace_back(&TestClass::consumer, std::move(testObj));
cout<<"Pool threadID:" <<ThreadVector[i].get_id()<<endl;
}
TestClass testObj2;
for(int i=0; i<10; i++) {
testObj.producer(i);
}
for(auto &&t : ThreadVector) {
t.join();
}
return 0;
}
`
Another version to call threads
int main()
{
std::vector<std::thread> vecOfThreads;
std::function<void(TestClass&)> func = [&](TestClass &obj) {
while(1) {
obj.consumer();
}
};
unsigned MAX_THREADS = std::thread::hardware_concurrency()-1;
TestClass obj;
for(int i=0; i<MAX_THREADS; i++) {
std::thread th1(func, std::ref(obj));
vecOfThreads.emplace_back(std::move(th1));
}
TestClass prod;
for(int i=0; i<10; i++) {
prod.producer(i);
}
for (std::thread & th : vecOfThreads)
{
if (th.joinable())
th.join();
}
return 0;
}
std::move(testObj)
应该是&testObj
(指向调用consumer
的对象的指针)- 或std::ref(testobj)
(变成reference_wrapper
(持有也是指向该对象的指针)。- 您应该至少调用
produce
的次数与线程数一样多,否则程序将无法完成。 - 您不需要手动
unlock
。守卫离开范围时会自动解锁。
示例:
class TestClass {
public:
void producer(int i) {
lock_guard<mutex> lockGuard(mtx); // here a lock_guard is enough
Q.push(i);
// no manual unlocking
cond.notify_all();
}
void consumer() {
unique_lock<mutex> lockGuard(mtx);
cond.wait(lockGuard, [this] { return !Q.empty(); });
cout << this_thread::get_id();
cout << Q.front() << endl;
Q.pop();
};
private:
mutex mtx;
condition_variable cond;
queue<int> Q;
};
int main() {
std::cout << "Hello, World!" << std::endl;
unsigned MAX_THREADS = std::thread::hardware_concurrency() - 1;
vector<thread> ThreadVector;
ThreadVector.reserve(MAX_THREADS); // since you know how many, reserve
TestClass testObj;
for(unsigned i = 0; i < MAX_THREADS; i++) {
// here, &testobj
ThreadVector.emplace_back(&TestClass::consumer, &testObj);
cout << "Pool threadID:" << ThreadVector[i].get_id() << endl;
}
// produce MAX_THREADS of things to put in the queue:
for(int i = 0; i < MAX_THREADS; i++) {
testObj.producer(i);
}
for(auto&& t : ThreadVector) {
t.join();
}
}
关于您在评论部分的问题:如果您想保留 consumer
线程 运行 直到您告诉他们退出,您可以添加另一个变量(称为 run
这里)consumer
线程监控。
示例:
#include <condition_variable>
#include <iostream>
#include <queue>
#include <thread>
#include <vector>
using namespace std;
class TestClass {
public:
void producer(int i) {
lock_guard<mutex> lockGuard(mtx);
Q.push(i);
to_pool.notify_one();
}
void consumer() {
while(true) {
unique_lock<mutex> lockGuard(mtx);
to_pool.wait(lockGuard, [this] { return !run || !Q.empty(); });
if(!run) break; // time to quit
cout << this_thread::get_id() << ' ' << Q.front() << endl;
Q.pop();
// Tell producer that we picked one from the queue.
// if it's only interesting to notify when the queue is empty,
// add: if(Q.empty())
to_producer.notify_one();
}
};
void stop() {
lock_guard<mutex> lockGuard(mtx);
run = false; // tell all pool threads to quit
to_pool.notify_all();
}
void wait_for_all_work_to_be_done() {
std::unique_lock<mutex> lg(mtx);
to_producer.wait(lg, [this] { return Q.empty(); });
}
private:
bool run = true;
mutex mtx;
condition_variable to_pool;
condition_variable to_producer;
queue<int> Q;
};
int main() {
std::cout << "Hello, World!" << std::endl;
unsigned MAX_THREADS = std::thread::hardware_concurrency() - 1;
vector<thread> ThreadVector;
ThreadVector.reserve(MAX_THREADS);
TestClass testObj;
for(unsigned i = 0; i < MAX_THREADS; i++) {
ThreadVector.emplace_back(&TestClass::consumer, &testObj);
cout << "Pool threadID:" << ThreadVector[i].get_id() << endl;
}
for(int i = 0; i < MAX_THREADS / 2; i++) {
testObj.producer(i);
}
testObj.wait_for_all_work_to_be_done();
// stop pool threads
testObj.stop();
for(auto&& t : ThreadVector) t.join();
}