Multithreading attempt in C++ - is it doing what I want?
I am trying to create a thread pool. I have a list of data samples and I want to run a function on each sample, using all the threads so that the total time to process every sample comes down. The idea is that each sample is handled on a different thread: the first event takes a thread from the pool (Event_1 --> thread_1), and once every thread is in use, the next event takes the next available thread and uses it... at least that was the plan...
It seems to run, but I see hardly any change in the total time it takes to run over all the samples. I don't expect the per-sample time to improve, but the total time should improve, shouldn't it? Am I doing something wrong?
This is the thread pool class I currently have:
class ThreadPool {
private:
    class ThreadPooler {
    private:
        int n_id;
        ThreadPool *n_pool;
    public:
        ThreadPooler(ThreadPool *pool, const int id) : n_pool(pool), n_id(id) {}
        void operator()() {
            std::function<void()> func;
            bool dequeued;
            while (!n_pool->shutdown) {
                {
                    std::unique_lock<std::mutex> lock(n_pool->n_conditional_mutex);
                    if (n_pool->queue.empty()) {
                        n_pool->n_conditional_lock.wait(lock);
                    }
                    dequeued = n_pool->queue.dequeue(func);
                }
                if (dequeued) {
                    func();
                }
            }
        }
    };
public:
    bool shutdown;
    std::vector<std::thread> n_threads;
    std::mutex n_conditional_mutex;
    std::condition_variable n_conditional_lock;
    Queue<std::function<void()>> queue;

    ThreadPool(const int N_threads) : n_threads(std::vector<std::thread>(N_threads)), shutdown(false) {}
    ThreadPool(const ThreadPool &) = delete;
    ThreadPool(ThreadPool &&) = delete;
    ThreadPool & operator=(const ThreadPool &) = delete;
    ThreadPool & operator=(ThreadPool &&) = delete;

    void InitThreadPool() {
        cout << "Initializing " << n_threads.size() << " threads " << endl;
        for (int i = 0; i < n_threads.size(); ++i) {
            n_threads[i] = std::thread(ThreadPooler(this, i));
        }
    }

    void ShutdownThreadPool() {
        shutdown = true;
        n_conditional_lock.notify_all();
        cout << "Shutting Down " << n_threads.size() << " threads " << endl;
        for (int i = 0; i < n_threads.size(); ++i) {
            if (n_threads[i].joinable()) {
                n_threads[i].join();
            }
        }
    }

    template<typename F, typename... Args>
    auto SubmitToThreadPool(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
        std::function<decltype(f(args...))()> func = std::bind(std::forward<F>(f), std::forward<Args>(args)...); // wrapper
        auto task_ptr = std::make_shared<std::packaged_task<decltype(f(args...))()>>(func);
        std::function<void()> wrapper_func = [task_ptr]() {
            (*task_ptr)();
        };
        queue.enqueue(wrapper_func);
        n_conditional_lock.notify_one();
        return task_ptr->get_future();
    }
}; // end thread pool
And a queue wrapper of the following form:
template <typename T>
class Queue {
private:
    std::queue<T> i_queue;
    std::mutex mtx;
public:
    Queue() {}
    ~Queue() {}

    bool empty() {
        std::unique_lock<std::mutex> lock(mtx);
        return i_queue.empty();
    }

    int size() {
        std::unique_lock<std::mutex> lock(mtx);
        return i_queue.size();
    }

    void enqueue(T& t) {
        std::unique_lock<std::mutex> lock(mtx);
        i_queue.push(t);
    }

    bool dequeue(T& t) {
        std::unique_lock<std::mutex> lock(mtx);
        if (i_queue.empty()) {
            return false;
        }
        t = std::move(i_queue.front());
        i_queue.pop();
        return true;
    }
};
I run it in my main code like this:
ThreadPool Pool(N_THREADS);
Pool.InitThreadPool();

auto startT0 = chrono::high_resolution_clock::now();
for (unsigned s = 0; s < DataList.size() - 1; s++) {
    std::future<void> Future = Pool.SubmitToThreadPool(GetDMWPulses, s, DataList[s], time_info, writefile, factor);
    Future.get();
} // end s
Pool.ShutdownThreadPool();
auto stop = chrono::high_resolution_clock::now();
auto duration = chrono::duration_cast<chrono::microseconds>(stop - startT0);
Here is the DMWPulses function:
void GetDMWPulses(int s, STMDataPacket DataList, bool time_info, bool writefile, float factor){
    unsigned int nDigits = DataList.datasample.channel[0].ADCList.size();
    STMDataProduct pulse;
    vector<float> hist, A, MWD, T;
    hist.assign(nDigits, 0);
    A.assign(nDigits, 0);
    MWD.assign(nDigits, 0);
    T.assign(nDigits, 0);

    for (unsigned j = 1; j < nDigits - 1; j++) {
        hist[j] = (DataList.datasample.channel[0].ADCList[j]);
    }
    for (unsigned n = 1; n < nDigits - 1; n++) {
        A[n] = ((hist[n] - factor * hist[n - 1] + A[n - 1]));
    }
    for (unsigned k = m + 2; k < nDigits - 2; k++) {
        MWD[k] = ((A[k] - A[k - m]));
    }
    for (unsigned h = m + l + 2; h < nDigits - 2; h++) {
        for (unsigned p = h - l - 1; p < h - 2; p++) {
            T[h] += ((MWD[p])) / l;
        }
        if (writefile) { outputTfile << " " << T[h] << " " << "\n"; }
    }

    float maximum_height = *std::max_element(T.begin(), T.end());
    int maxElementIndex = std::max_element(T.begin(), T.end()) - T.begin();
    float peak_time = DataList.datasample.channel[0].ADCList.at(maxElementIndex);
    pulse.SetPulseTime(peak_time);
    pulse.SetPulseHeight(maximum_height);
    pulses.push_back(pulse);

    hist.clear();
    A.clear();
    MWD.clear();
    T.clear();
} // End algorithm function
Future.get() blocks until the packaged task has finished. You only call SubmitToThreadPool for the next sample after the previous sample has been fully processed. There is no parallelism at all; everything runs sequentially.
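A minimal sketch of one way to restructure the submission loop, assuming the rest of the code stays as posted: queue every sample first, keep the returned futures in a container, and only call get() after everything has been submitted. Note that once the tasks really do run concurrently, anything GetDMWPulses shares between calls (the pulses vector, outputTfile) would also need its own synchronization, e.g. a mutex.
std::vector<std::future<void>> futures;   // one future per submitted sample
futures.reserve(DataList.size());

auto startT0 = chrono::high_resolution_clock::now();
for (unsigned s = 0; s < DataList.size() - 1; s++) {
    // Submit only; calling get() here would serialize the work again.
    futures.push_back(Pool.SubmitToThreadPool(GetDMWPulses, s, DataList[s], time_info, writefile, factor));
}
// Wait for all samples only after all of them have been queued.
for (auto &f : futures) {
    f.get();
}
Pool.ShutdownThreadPool();
auto stop = chrono::high_resolution_clock::now();
auto duration = chrono::duration_cast<chrono::microseconds>(stop - startT0);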