如何知道 std::thread 在 C++ 中何时完成?

How to know when a std::thread has finished in c++?

我正在学习如何在 c++11 中使用 std::thread。基本上,我有一个长长的数据列表(想象一个 0-15000 之间的 for 循环)和我正在使用的硬件中的 1568 个线程。我想要一个单独的线程来处理每个样本。我了解如何创建前 1568 个线程,它工作正常。但是一旦到达 N_thread+1 样本,我就想检查是否有任何可用的线程。如果有,将该数据样本发送到该线程。每个线程都被发送到一个互斥锁函数,该函数在最后解锁。也许我误解了线程的工作方式并且不能以这种方式做事?或者也许有更好的 threading/CPU 分配库可以提供帮助?

正如我所说,我可以达到创建 1568 个线程并 运行 并加入的地步,最终结果很好。只需要更多信息。

这是我的主

int main(){
  cout<<"In main"<<endl;
  CSVReaderUpdatedStructure reader("data.csv");
  vector<STMDataPacket> DataList = reader.GetData();

  thread_pool Pool(THREAD_COUNT);
  auto startT0 = chrono::high_resolution_clock::now();
   for(unsigned s=0; s<DataList.size()-1; s++){
      cout<<"analysing sample "<<s<<endl;
      auto done = Pool.add_task([s= s, Sample= DataList[s], t_inf = time_info,wf=writefile, f=factor]{GetDMWPulses(s, Sample, t_inf, wf,f);});
      done.wait();

    }

  auto stop = chrono::high_resolution_clock::now();
  cout<<"pulses "<<pulses.size()<<endl;
  auto duration = chrono::duration_cast<chrono::microseconds>(stop - startT0); 
  cout <<"time for MWD full process = "<< duration.count() <<" microseconds "<< endl;

  return 0;

}

您可能不想要 1568 个线程。也许你想要 1568+ 个任务。

您可能需要一个线程池。 TBB 有一个线程池,几乎在每个平台上都可用。

编写自己的线程池并不难。这是一个草图:

template<class T>
struct threadsafe_queue {
  optional<T> pop() {
    auto l = lock();
    cv.wait( l, [&]{
      return abort || !data.empty();
    });
    if (abort) return {};
    T retval = std::move(data.front());
    data.pop();
    return retval;
  }
  void push( T in ) {
    auto l = lock();
    data.push( std::move(in) );
    cv.notify_one();
  }
  void abort_queue() {
    auto l = lock();
    abort = true;
    cv.notify_all();
  }
private:
  mutable std::mutex m;
  std::condition_variable cv;
  std::queue<T> data;
  bool abort = false;

  std::unique_lock<std::mutex> lock() const {
    return std::unique_lock<std::mutex>(m);
  }
};

struct thread_pool {
  template<class F, class R=typename std::decay< typename std::result_of< F&() >::type>::type>
  auto add_task( F&& f )
  -> std::future< R >
  {
     std::packaged_task<R()> task( std::forward<F>(f) );
     auto retval = task.get_future();
     tasks.push( std::packaged_task<void()>(std::move(task)) );
     return retval;
  }

  void start_thread( std::size_t N=1 )
  {
    if (shutdown) return;
    for (std::size_t i = 0; i < N; ++i)
    {
      threads.emplace_back( [this]{
        while (true)
        {
          if(shutdown) return;
          auto task = tasks.pop();
          if (!task)
            return;
          (*task)();
        }
      } );
    }
  }
  void cleanup() {
    shutdown = true;
    tasks.abort_queue();
    for (auto&& t:threads)
      t.join();
    threads.clear();
  }
  ~thread_pool() {
    cleanup();
  }

  thread_pool():thread_pool( std::thread::hardware_concurrency() ) {}
  explicit thread_pool( std::size_t N ) {
    start_thread(N);
  }
private:
  threadsafe_queue<std::packaged_task<void()>> tasks;
  std::vector<std::thread> threads;
  std::atomic<bool> shutdown = false;
};

现在创建 thread_pool.

将任务推入其中。把期货拿出来。

让工作任务递增 std::atomic<unsigned int> 并等待它达到最大值,或者做一些更有趣的事情。

struct counting_barrier {
  explicit counting_barrier( std::size_t n ):count(n) {}
  void operator--() {
    --count;
    if (count <= 0)
    {
       std::unique_lock<std::mutex> l(m);
       cv.notify_all();
    }
  }
  void wait() {
    std::unique_lock<std::mutex> l(m);
    cv.wait( l, [&]{ return count <= 0; } );
  }
private:
  std::mutex m;
  std::condition_variable cv;
  std::atomic<std::ptrdiff_t> count = 0;
};

创建一个 counting_barrier barrier( 15000 ) 或其他。线程完成后可以 --barrier (它是线程安全的)。主线程可以barrier.wait(),当15000--被调用时会被唤醒

以上代码可能有错别字,但设计合理。对于工业强度用途,您还需要更好的关机程序。

Live example.

如果您没有可选或增强可选,请使用:

template<class T>
struct optional {
  T* get() { return static_cast<T*>( static_cast<void*>( & data ) ); };
  T const* get() const { return static_cast<T*>( static_cast<void*>( & data ) ); };

  T& operator*() & { return *get(); }
  T&& operator*() && { return std::move(*get()); }
  T const& operator*() const & { return *get(); }
  T const&& operator*() const&& { return std::move(*get()); }

  explicit operator bool() const { return engaged; }
  bool has_value() const { return (bool)*this; }
  template< class U >
  T value_or( U&& default_value ) const& {
    if (*this) return **this;
    return std::forward<U>(default_value);
  }
  template< class U >
  T value_or( U&& default_value ) && {
    if (*this) return std::move(**this);
    return std::forward<U>(default_value);
  }

  optional(T const& t) {
    emplace(t);
  }
  optional(T&& t) {
    emplace(std::move(t));
  }
  optional() = default;
  optional(optional const& o) {
    if (o) {
      emplace( *o );
    }
  }
  optional(optional && o) {
    if (o) {
      emplace( std::move(*o) );
    }
  }
  optional& operator=(optional const& o) & {
    if (!o) {
      reset();
    } else if (*this) {
      **this = *o;
    } else {
      emplace( *o );
    }
    return *this;
  }
  optional& operator=(optional && o) & {
    if (!o) {
      reset();
    } else if (*this) {
      **this = std::move(*o);
    } else {
      emplace( std::move(*o) );
    }
    return *this;
  }
  template<class...Args>
  T& emplace(Args&&...args) {
    if (*this) reset();
    ::new( static_cast<void*>(&data) ) T(std::forward<Args>(args)...);
    engaged = true;
    return **this;
  }
  void reset() {
    if (*this) {
      get()->~T();
      engaged = false;
    }
  }
  ~optional() { reset(); }
private:
  using storage = typename std::aligned_storage<sizeof(T), alignof(T)>::type;
  bool engaged = false;
  storage data;
};

请注意,此选项不是工业强度;我从字面上写了它并没有测试它。它缺少许多真正可选的工业强度功能。但是您可以在其位置上放置一个真正的可选项并获得几乎相同或更好的行为,因此如果您缺少一个可以使用它。

counting_barrier barrier(100);
thread_pool p(10);
for (int i = 0; i < 100; ++i)
{
  p.add_task([&barrier,i]{
    std::stringstream ss;
    ss << i << ",";
    std::cout << ss.str();
    --barrier;
  });
}
barrier.wait();
std::cout << "\n";
auto done1 = p.add_task([]{ std::cout << "hello" << std::endl; });
done1.wait();
auto done2 = p.add_task([]{ std::cout << "world" << std::endl; });
done2.wait();