C++ 中的多线程尝试——它是否按我的预期在工作?

Multithreading attempt in C++ – is it doing what I want?

我试图创建一个线程池:我有一个数据样本列表,想对每个样本运行同一个函数。我希望用上所有线程,从而缩短处理全部样本的总时间。设想是每个样本在不同的线程上处理:先依次从线程池中取线程(Event_1 --> thread_1,……),所有线程都被占用后,再找下一个空闲线程来用……至少计划是这样的……

代码似乎能运行,但处理所有样本所花的总时间没有明显变化。我不指望单个样本的处理时间会缩短,但总时间应该会缩短吧?我是不是哪里做错了?

目前我的线程池类如下:

/// Fixed-size pool of worker threads that execute submitted callables.
///
/// Usage: construct with the thread count, call InitThreadPool() to start the
/// workers, submit work with SubmitToThreadPool(), then call
/// ShutdownThreadPool() (the destructor also stops and joins the workers if
/// that was not done).
///
/// Synchronization: `shutdown` is read and written, and `queue` is filled by
/// SubmitToThreadPool(), only while holding `n_conditional_mutex`. A worker
/// therefore cannot miss the notification that should wake it. Code that
/// enqueues into the public `queue` directly must follow the same rule.
class ThreadPool {
private:
  /// Function object run by each worker thread.
  class ThreadPooler {

  private:
    int n_id;           // index of this worker (informational only)
    ThreadPool *n_pool; // owning pool; must outlive the worker thread

  public:
    ThreadPooler(ThreadPool *pool, const int id) : n_id(id), n_pool(pool) {}

    /// Worker loop: sleep until work is queued or shutdown is requested, run
    /// each task outside the lock, and exit only once shutdown is requested
    /// AND the queue is empty, so every submitted task gets executed.
    void operator()() {
      for (;;) {
        std::function<void()> func;
        {
          std::unique_lock<std::mutex> lock(n_pool->n_conditional_mutex);
          // The predicate is checked before sleeping (work may already be
          // queued) and after every wakeup (spurious wakeups).
          n_pool->n_conditional_lock.wait(lock, [this] {
            return n_pool->shutdown || !n_pool->queue.empty();
          });
          // Given the predicate, an empty queue here means shutdown.
          if (!n_pool->queue.dequeue(func)) {
            return;
          }
        }
        func(); // run without the lock so other workers can dequeue
      }
    }
  };

  /// Requests shutdown, wakes every worker and joins them. Safe to call more
  /// than once: already-joined threads are no longer joinable.
  void StopAndJoin() {
    {
      std::lock_guard<std::mutex> lock(n_conditional_mutex);
      shutdown = true;
    }
    n_conditional_lock.notify_all();
    for (std::thread &worker : n_threads) {
      if (worker.joinable()) {
        worker.join();
      }
    }
  }

public:
  bool shutdown; // guarded by n_conditional_mutex
  std::vector<std::thread> n_threads;
  std::mutex n_conditional_mutex;
  std::condition_variable n_conditional_lock;
  Queue<std::function<void()>> queue; // filled while holding n_conditional_mutex

  /// Creates a pool with N_threads (not yet started) worker slots.
  ThreadPool(const int N_threads)
      : shutdown(false), n_threads(static_cast<std::size_t>(N_threads)) {}

  /// A std::thread that is still joinable calls std::terminate when destroyed,
  /// so make sure all workers are joined.
  ~ThreadPool() { StopAndJoin(); }

  ThreadPool(const ThreadPool &) = delete;
  ThreadPool(ThreadPool &&) = delete;

  ThreadPool & operator=(const ThreadPool &) = delete;
  ThreadPool & operator=(ThreadPool &&) = delete;

  /// Starts one worker thread per slot.
  void InitThreadPool() {
    std::cout << "Initializing " << n_threads.size() << " threads " << std::endl;
    for (std::size_t i = 0; i < n_threads.size(); ++i) {
      n_threads[i] = std::thread(ThreadPooler(this, static_cast<int>(i)));
    }
  }

  /// Stops the workers after they have drained the queue, and joins them.
  void ShutdownThreadPool() {
    std::cout << "Shutting Down " << n_threads.size() << " threads " << std::endl;
    StopAndJoin();
  }

  /// Queues f(args...) for execution on a worker.
  ///
  /// The callable and arguments are copied/moved into the task (std::bind).
  /// @return a future for the call's result. get() on it blocks until the
  ///         task has run, and rethrows any exception the task threw.
  template<typename F, typename...Args>
  auto SubmitToThreadPool(F&& f, Args&&... args) -> std::future<decltype(f(args...))> {
    using Result = decltype(f(args...));

    auto task_ptr = std::make_shared<std::packaged_task<Result()>>(
        std::bind(std::forward<F>(f), std::forward<Args>(args)...));
    std::function<void()> wrapper_func = [task_ptr]() {
      (*task_ptr)();
    };

    // Obtain the future before the task is visible to workers, so
    // get_future() cannot run concurrently with the task's operator().
    std::future<Result> result = task_ptr->get_future();

    {
      // Enqueue under the condition mutex so a worker that has just found the
      // queue empty cannot miss this notification (lost wakeup).
      std::lock_guard<std::mutex> lock(n_conditional_mutex);
      queue.enqueue(wrapper_func);
    }
    n_conditional_lock.notify_one();

    return result;
  }
}; //end thread pooler

以及如下的队列包装类:

/// Minimal thread-safe FIFO. Every operation locks an internal mutex, so
/// individual calls may be made concurrently from several threads.
///
/// empty() and size() return snapshots: another thread may change the queue
/// right after they return. Use dequeue()'s return value (or an external
/// lock) for "check then act" logic.
template <typename T>
class Queue {
private:
  std::queue<T> i_queue;
  mutable std::mutex mtx; // mutable so the const observers can lock it

public:
  /// Returns true if the queue currently holds no elements.
  bool empty() const {
    std::lock_guard<std::mutex> lock(mtx);
    return i_queue.empty();
  }

  /// Returns the current number of elements (narrowed to int to keep the
  /// original signature).
  int size() const {
    std::lock_guard<std::mutex> lock(mtx);
    return static_cast<int>(i_queue.size());
  }

  /// Appends a copy of t at the back of the queue.
  void enqueue(const T& t) {
    std::lock_guard<std::mutex> lock(mtx);
    i_queue.push(t);
  }

  /// Appends t at the back of the queue by moving from it (accepts rvalues).
  void enqueue(T&& t) {
    std::lock_guard<std::mutex> lock(mtx);
    i_queue.push(std::move(t));
  }

  /// Moves the front element into t and removes it.
  /// @return false, leaving t untouched, if the queue is empty.
  bool dequeue(T& t) {
    std::lock_guard<std::mutex> lock(mtx);
    if (i_queue.empty()) {
      return false;
    }
    t = std::move(i_queue.front());
    i_queue.pop();
    return true;
  }
};

在主程序中我这样运行它:

ThreadPool Pool(N_THREADS);
  Pool.InitThreadPool();
  auto startT0 = chrono::high_resolution_clock::now(); 
  for(unsigned s=0; s<DataList.size()-1; s++){//
    std::future<void> Future = Pool.SubmitToThreadPool(GetDMWPulses, s, DataList[s], time_info,writefile, factor);
    Future.get();

  }//end s
  Pool.ShutdownThreadPool();
  auto stop = chrono::high_resolution_clock::now();
  auto duration = chrono::duration_cast<chrono::microseconds>(stop - startT0); 

这是 GetDMWPulses 函数:

/// Runs the pulse-shaping algorithm on channel 0 of one data sample and
/// appends the resulting pulse to the global `pulses`.
///
/// As implemented, with `m` and `l` being globals defined elsewhere:
///   hist[j] = ADC sample j as float (entries 0 and nDigits-1 stay 0)
///   A[n]    = hist[n] - factor*hist[n-1] + A[n-1]
///   MWD[k]  = A[k] - A[k-m]
///   T[h]    = (sum of MWD[p] for p in [h-l-1, h-3]) / l
/// The pulse height is max(T); the pulse "time" is ADCList[argmax(T)].
///
/// Thread safety: the computation uses only locals. The writes to the shared
/// `outputTfile` and `pulses` are serialized by a function-local mutex, so
/// several calls may run concurrently (e.g. on a thread pool). That mutex does
/// not protect against other code touching those globals at the same time.
/// When calls run concurrently, pulses are appended in completion order.
///
/// Samples with fewer than 2 ADC values are skipped (nothing is appended).
///
/// @param s          sample index (currently unused)
/// @param DataList   sample to process (taken by value)
/// @param time_info  currently unused
/// @param writefile  if true, the computed T values are written to outputTfile
/// @param factor     weight of the previous sample when computing A
void GetDMWPulses(int s, STMDataPacket  DataList,  bool time_info, bool writefile, float factor){

    const auto &adc = DataList.datasample.channel[0].ADCList;
    const unsigned int nDigits = adc.size();

    // With fewer than 2 samples the unsigned bounds below (nDigits-1,
    // nDigits-2) wrap around and the loops index far out of range.
    if (nDigits < 2) {
        return;
    }

    std::vector<float> hist(nDigits, 0.0f);
    std::vector<float> A(nDigits, 0.0f);
    std::vector<float> MWD(nDigits, 0.0f);
    std::vector<float> T(nDigits, 0.0f);

    for (unsigned j = 1; j < nDigits - 1; j++) {
        hist[j] = adc[j];
    }
    for (unsigned n = 1; n < nDigits - 1; n++) {
        A[n] = hist[n] - factor * hist[n - 1] + A[n - 1];
    }
    for (unsigned k = m + 2; k < nDigits - 2; k++) {
        MWD[k] = A[k] - A[k - m];
    }
    // NOTE(review): this averages l-1 MWD values (p = h-l-1 .. h-3) but
    // divides by l. Confirm the window bounds are intended.
    for (unsigned h = m + l + 2; h < nDigits - 2; h++) {
        for (unsigned p = h - l - 1; p < h - 2; p++) {
            T[h] += MWD[p] / l;
        }
    }

    const auto peak = std::max_element(T.begin(), T.end());
    const float maximum_height = *peak;
    const int maxElementIndex = static_cast<int>(peak - T.begin());
    // NOTE(review): this stores the ADC *value* at the peak index as the
    // pulse time. If a time was intended, the index (or a timestamp) is
    // probably what is wanted -- confirm.
    const float peak_time = adc.at(maxElementIndex);

    STMDataProduct pulse;
    pulse.SetPulseTime(peak_time);
    pulse.SetPulseHeight(maximum_height);

    // Shared output: serialize concurrent calls. Each sample's T values are
    // also written as one uninterrupted group of lines.
    static std::mutex shared_output_mutex;
    std::lock_guard<std::mutex> guard(shared_output_mutex);
    if (writefile) {
        for (unsigned h = m + l + 2; h < nDigits - 2; h++) {
            outputTfile << " " << T[h] << " " << "\n";
        }
    }
    pulses.push_back(pulse);

}//End Algorithim Function

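Future.get() 会阻塞,直到对应的 packaged_task 执行完毕。你只有在上一个样本完全处理完之后,才会为下一个样本调用 SubmitToThreadPool,所以完全没有并行性,一切都是顺序执行的。正确的做法是:先提交所有样本,把返回的 future 保存在一个 std::vector 中,然后再逐个调用 get()。另外要注意,GetDMWPulses 会写入共享的 pulses 和 outputTfile;让多个任务并行运行之前,必须用互斥锁保护这些写操作。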