在 C++ 线程池中打印了重复的数字
A duplicate number is printed in the C++ threadpool
threadpool.h
#ifndef THREADPOOL
#define THREADPOOL
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <list>
#include <condition_variable>
/// A fixed-size pool of worker threads that take `T*` requests from a bounded
/// FIFO queue and call `process()` on each of them.
///
/// The pool never owns the queued requests: the caller must keep every object
/// passed to append() alive until a worker has finished processing it (or
/// until the pool has been destroyed).
///
/// @tparam T request type; must provide a `process()` member function.
template <typename T>
class Threadpool
{
public:
    /// Number of requests processed so far, shared by all Threadpool<T>
    /// instances. Workers increment it while holding their pool's queue mutex.
    static int nums;

    /// Creates a pool with 8 workers and a queue capacity of 100.
    Threadpool() : Threadpool(8, 100)
    {
    }

    /// Creates the pool and starts its workers.
    ///
    /// @param threadSize   number of worker threads to start.
    /// @param queueMaxsize maximum number of requests that may wait in the queue.
    Threadpool(int threadSize, int queueMaxsize)
        : _threadSize(threadSize), _queueMaxsize(queueMaxsize), m_stop(false)
    {
        if (_threadSize > 0)
        {
            // Reserve first so that push_back below cannot throw while it is
            // holding a running (joinable) std::thread temporary.
            threads.reserve(_threadSize);
        }
        try
        {
            for (int i = 0; i < _threadSize; ++i)
            {
                threads.push_back(std::thread(&Threadpool::run, this, i));
            }
        }
        catch (...)
        {
            // A worker failed to start: stop the ones already running, because
            // destroying a joinable std::thread would call std::terminate.
            shutdown();
            throw;
        }
    }

    /// Stops the workers and waits for them to exit. Requests still waiting in
    /// the queue are dropped without being processed (they are not owned here).
    ~Threadpool()
    {
        shutdown();
    }

    /// Queues a request for processing by one of the workers.
    ///
    /// @param request non-owning pointer; must stay valid until processed.
    /// @return true if the request was queued; false if it is null or the
    ///         queue already holds `queueMaxsize` requests.
    bool append(T * request)
    {
        if (request == nullptr)
        {
            return false;
        }
        {
            // The size check and the push must happen under the same lock,
            // otherwise they race with the workers popping from the queue.
            std::lock_guard<std::mutex> guard(queueLock);
            if (workQueue.size() >= static_cast<typename std::list<T *>::size_type>(_queueMaxsize))
            {
                return false;
            }
            workQueue.push_back(request);
        }
        // Notify after releasing the lock so the woken worker can take it at once.
        m_cond.notify_one();
        return true;
    }

    /// Prints the number of requests currently waiting in the queue.
    void printQueueSize()
    {
        std::lock_guard<std::mutex> guard(queueLock);
        std::cout<<workQueue.size()<<std::endl;
    }

private:
    /// Worker loop: waits for a request, processes it and counts it, until the
    /// pool is stopped.
    ///
    /// @param threadId index of this worker, used only in the trace output.
    void run(int threadId)
    {
        for (;;)
        {
            std::unique_lock<std::mutex> lk(queueLock);
            m_cond.wait(lk, [this] { return m_stop || !workQueue.empty(); });
            if (m_stop)
            {
                return;
            }
            T *request = workQueue.front();
            workQueue.pop_front();
            // NOTE: the queue mutex stays locked while process() runs. This
            // keeps the trace prefix, the request's own work and the nums
            // update from interleaving between workers, at the price of
            // handling only one request at a time.
            std::cout<<"thread"<<threadId<<" process ";
            request->process();
            ++nums;
        }
    }

    /// Tells every worker to exit and joins them. Safe to call more than once.
    void shutdown()
    {
        {
            std::lock_guard<std::mutex> guard(queueLock);
            m_stop = true;
        }
        m_cond.notify_all();
        for (std::thread &workerThread : threads)
        {
            if (workerThread.joinable())
            {
                workerThread.join();
            }
        }
    }

private:
    int _threadSize;                  // number of worker threads requested
    int _queueMaxsize;                // maximum number of queued requests
    std::vector<std::thread> threads; // the workers; joined in shutdown()
    std::mutex queueLock;             // guards workQueue and m_stop
    std::condition_variable m_cond;   // signalled on new work and on shutdown
    std::list<T *> workQueue;         // pending requests (not owned)
    bool m_stop;                      // set once by shutdown()
};
template<class T>
int Threadpool<T>::nums = 0;
#endif // THREADPOOL
test.cpp
#include<iostream>
#include<string>
#include "threadpool.h"
/// A unit of work for the pool: carries one text label and prints it.
class Request
{
public:
    /// Stores a copy of the label `s`.
    explicit Request(std::string s)
        : _s(s)
    {
    }

    /// Writes the label followed by a newline to stdout, then flushes stdout.
    void process()
    {
        const char *label = _s.c_str();
        printf("%s\n", label);
        fflush(stdout);
    }

    // private:
    std::string _s; // label printed by process()
};
int main()
{
Threadpool<Request> threadpool(10,100);
for(int i =0; i<100; i++)
{
std::string str = std::string("job"+std::to_string(i));
//std::cout<<str<<std::endl;
Request r1 (str);
//r1.process();
threadpool.append(&r1);
//std::this_thread::sleep_for(std::chrono::nanoseconds(50));
}
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout<< Threadpool<Request>::nums<<std::endl;
// std::cout<<"fuck"<<std::endl
threadpool.printQueueSize();
return 0;
}
我想创建一个线程池并打印 1 -> 1000 之间的数字。但是,结果不是我所期望的。
结果如下所示,
thread0 process job2
thread0 process job2
thread1 process job5
thread1 process job5
thread1 process job5
thread3 process job6
thread6 process job9
thread6 process job9
thread6 process job9
thread9 process job14
thread9 process job14
thread9 process job14
thread9 process job14
thread9 process job14
thread0 process job15
thread6 process job19
thread6 process job19
thread6 process job19
thread6 process job19
thread8 process job21
thread4 process job21
thread8 process job22
thread3 process job23
thread1 process job25
thread1 process job25
thread6 process job28
thread6 process job28
thread6 process job28
thread6 process job28
thread4 process job30
thread8 process job31
thread9 process job32
thread0 process job35
thread0 process job35
thread0 process job35
thread6 process job37
thread6 process job37
thread4 process job39
thread4 process job39
thread2 process job41
thread2 process job41
thread0 process job45
thread0 process job45
thread5 process job45
thread5 process job45
thread5 process job45
thread5 process job46
thread3 process job47
thread0 process job49
thread8 process job50
thread2 process job51
thread9 process job53
thread9 process job53
thread5 process job54
thread6 process job55
thread1 process job56
thread4 process job57
thread3 process job58
thread0 process job59
thread8 process job60
thread2 process job61
thread9 process job62
thread5 process job63
thread7 process job64
thread6 process job65
thread1 process job66
thread4 process job67
thread3 process job68
thread0 process job69
thread8 process job70
thread2 process job71
thread9 process job72
thread5 process job73
thread7 process job74
thread6 process job75
thread1 process job76
thread3 process job78
thread3 process job78
thread8 process job80
thread8 process job80
thread9 process job82
thread9 process job82
thread9 process job82
thread9 process job83
thread7 process job84
thread1 process job87
thread1 process job87
thread0 process job89
thread0 process job89
thread0 process job89
thread2 process job91
thread9 process job93
thread9 process job93
thread7 process job96
thread7 process job96
thread7 process job96
thread3 process job98
thread3 process job98
thread2 process job99
thread2 process job99
100
0
我已经检查了队列的大小。它是空的。是stdout的缓存问题还是我的线程池有问题?
您的代码存在未定义行为(UB):您把局部变量 r1 的地址传给了 append,但 r1 在每次循环迭代结束时就会被销毁,工作线程之后再通过这个指针访问它就是未定义行为。
由于编译器很可能在每次迭代中为 r1 重用同一块栈内存,process 中的 printf("%s\n",_s.c_str()); 读到的是主循环此刻正在写入的值(也就是存在数据竞争)。
这就是您会看到重复值的原因。您只是运气好,才没有遇到段错误。再次强调,这一切都属于未定义行为。
我建议您将 Request 对象本身(而不是指向它的指针)直接存储在 workQueue 中,并让 append 通过移动语义接收它。
每当您编写多线程代码时,请尝试使用 ThreadSanitizer(线程检测工具):在 gcc 或 clang 的编译选项中加入 -fsanitize=thread。对于此类错误,它确实是一个很有价值的工具。
threadpool.h
#ifndef THREADPOOL
#define THREADPOOL
#include <iostream>
#include <thread>
#include <vector>
#include <mutex>
#include <list>
#include <condition_variable>
/// A fixed-size pool of worker threads that take `T*` requests from a bounded
/// FIFO queue and call `process()` on each of them.
///
/// The pool never owns the queued requests: the caller must keep every object
/// passed to append() alive until a worker has finished processing it (or
/// until the pool has been destroyed).
///
/// @tparam T request type; must provide a `process()` member function.
template <typename T>
class Threadpool
{
public:
    /// Number of requests processed so far, shared by all Threadpool<T>
    /// instances. Workers increment it while holding their pool's queue mutex.
    static int nums;

    /// Creates a pool with 8 workers and a queue capacity of 100.
    Threadpool() : Threadpool(8, 100)
    {
    }

    /// Creates the pool and starts its workers.
    ///
    /// @param threadSize   number of worker threads to start.
    /// @param queueMaxsize maximum number of requests that may wait in the queue.
    Threadpool(int threadSize, int queueMaxsize)
        : _threadSize(threadSize), _queueMaxsize(queueMaxsize), m_stop(false)
    {
        if (_threadSize > 0)
        {
            // Reserve first so that push_back below cannot throw while it is
            // holding a running (joinable) std::thread temporary.
            threads.reserve(_threadSize);
        }
        try
        {
            for (int i = 0; i < _threadSize; ++i)
            {
                threads.push_back(std::thread(&Threadpool::run, this, i));
            }
        }
        catch (...)
        {
            // A worker failed to start: stop the ones already running, because
            // destroying a joinable std::thread would call std::terminate.
            shutdown();
            throw;
        }
    }

    /// Stops the workers and waits for them to exit. Requests still waiting in
    /// the queue are dropped without being processed (they are not owned here).
    ~Threadpool()
    {
        shutdown();
    }

    /// Queues a request for processing by one of the workers.
    ///
    /// @param request non-owning pointer; must stay valid until processed.
    /// @return true if the request was queued; false if it is null or the
    ///         queue already holds `queueMaxsize` requests.
    bool append(T * request)
    {
        if (request == nullptr)
        {
            return false;
        }
        {
            // The size check and the push must happen under the same lock,
            // otherwise they race with the workers popping from the queue.
            std::lock_guard<std::mutex> guard(queueLock);
            if (workQueue.size() >= static_cast<typename std::list<T *>::size_type>(_queueMaxsize))
            {
                return false;
            }
            workQueue.push_back(request);
        }
        // Notify after releasing the lock so the woken worker can take it at once.
        m_cond.notify_one();
        return true;
    }

    /// Prints the number of requests currently waiting in the queue.
    void printQueueSize()
    {
        std::lock_guard<std::mutex> guard(queueLock);
        std::cout<<workQueue.size()<<std::endl;
    }

private:
    /// Worker loop: waits for a request, processes it and counts it, until the
    /// pool is stopped.
    ///
    /// @param threadId index of this worker, used only in the trace output.
    void run(int threadId)
    {
        for (;;)
        {
            std::unique_lock<std::mutex> lk(queueLock);
            m_cond.wait(lk, [this] { return m_stop || !workQueue.empty(); });
            if (m_stop)
            {
                return;
            }
            T *request = workQueue.front();
            workQueue.pop_front();
            // NOTE: the queue mutex stays locked while process() runs. This
            // keeps the trace prefix, the request's own work and the nums
            // update from interleaving between workers, at the price of
            // handling only one request at a time.
            std::cout<<"thread"<<threadId<<" process ";
            request->process();
            ++nums;
        }
    }

    /// Tells every worker to exit and joins them. Safe to call more than once.
    void shutdown()
    {
        {
            std::lock_guard<std::mutex> guard(queueLock);
            m_stop = true;
        }
        m_cond.notify_all();
        for (std::thread &workerThread : threads)
        {
            if (workerThread.joinable())
            {
                workerThread.join();
            }
        }
    }

private:
    int _threadSize;                  // number of worker threads requested
    int _queueMaxsize;                // maximum number of queued requests
    std::vector<std::thread> threads; // the workers; joined in shutdown()
    std::mutex queueLock;             // guards workQueue and m_stop
    std::condition_variable m_cond;   // signalled on new work and on shutdown
    std::list<T *> workQueue;         // pending requests (not owned)
    bool m_stop;                      // set once by shutdown()
};
template<class T>
int Threadpool<T>::nums = 0;
#endif // THREADPOOL
test.cpp
#include<iostream>
#include<string>
#include "threadpool.h"
/// A unit of work for the pool: carries one text label and prints it.
class Request
{
public:
    /// Stores a copy of the label `s`.
    explicit Request(std::string s)
        : _s(s)
    {
    }

    /// Writes the label followed by a newline to stdout, then flushes stdout.
    void process()
    {
        const char *label = _s.c_str();
        printf("%s\n", label);
        fflush(stdout);
    }

    // private:
    std::string _s; // label printed by process()
};
int main()
{
Threadpool<Request> threadpool(10,100);
for(int i =0; i<100; i++)
{
std::string str = std::string("job"+std::to_string(i));
//std::cout<<str<<std::endl;
Request r1 (str);
//r1.process();
threadpool.append(&r1);
//std::this_thread::sleep_for(std::chrono::nanoseconds(50));
}
std::this_thread::sleep_for(std::chrono::seconds(3));
std::cout<< Threadpool<Request>::nums<<std::endl;
// std::cout<<"fuck"<<std::endl
threadpool.printQueueSize();
return 0;
}
我想创建一个线程池并打印 1 -> 1000 之间的数字。但是,结果不是我所期望的。 结果如下所示,
thread0 process job2
thread0 process job2
thread1 process job5
thread1 process job5
thread1 process job5
thread3 process job6
thread6 process job9
thread6 process job9
thread6 process job9
thread9 process job14
thread9 process job14
thread9 process job14
thread9 process job14
thread9 process job14
thread0 process job15
thread6 process job19
thread6 process job19
thread6 process job19
thread6 process job19
thread8 process job21
thread4 process job21
thread8 process job22
thread3 process job23
thread1 process job25
thread1 process job25
thread6 process job28
thread6 process job28
thread6 process job28
thread6 process job28
thread4 process job30
thread8 process job31
thread9 process job32
thread0 process job35
thread0 process job35
thread0 process job35
thread6 process job37
thread6 process job37
thread4 process job39
thread4 process job39
thread2 process job41
thread2 process job41
thread0 process job45
thread0 process job45
thread5 process job45
thread5 process job45
thread5 process job45
thread5 process job46
thread3 process job47
thread0 process job49
thread8 process job50
thread2 process job51
thread9 process job53
thread9 process job53
thread5 process job54
thread6 process job55
thread1 process job56
thread4 process job57
thread3 process job58
thread0 process job59
thread8 process job60
thread2 process job61
thread9 process job62
thread5 process job63
thread7 process job64
thread6 process job65
thread1 process job66
thread4 process job67
thread3 process job68
thread0 process job69
thread8 process job70
thread2 process job71
thread9 process job72
thread5 process job73
thread7 process job74
thread6 process job75
thread1 process job76
thread3 process job78
thread3 process job78
thread8 process job80
thread8 process job80
thread9 process job82
thread9 process job82
thread9 process job82
thread9 process job83
thread7 process job84
thread1 process job87
thread1 process job87
thread0 process job89
thread0 process job89
thread0 process job89
thread2 process job91
thread9 process job93
thread9 process job93
thread7 process job96
thread7 process job96
thread7 process job96
thread3 process job98
thread3 process job98
thread2 process job99
thread2 process job99
100
0
我已经检查了队列的大小。它是空的。是stdout的缓存问题还是我的线程池有问题?
您的代码存在未定义行为(UB):您把局部变量 r1 的地址传给了 append,但 r1 在每次循环迭代结束时就会被销毁,工作线程之后再通过这个指针访问它就是未定义行为。
由于编译器很可能在每次迭代中为 r1 重用同一块栈内存,process 中的 printf("%s\n",_s.c_str()); 读到的是主循环此刻正在写入的值(也就是存在数据竞争)。
这就是您会看到重复值的原因。您只是运气好,才没有遇到段错误。再次强调,这一切都属于未定义行为。
我建议您将 Request 对象本身(而不是指向它的指针)直接存储在 workQueue 中,并让 append 通过移动语义接收它。
每当您编写多线程代码时,请尝试使用 ThreadSanitizer(线程检测工具):在 gcc 或 clang 的编译选项中加入 -fsanitize=thread。对于此类错误,它确实是一个很有价值的工具。