使用 TBB 在并行循环结束时刷新线程本地缓冲区

flushing thread local buffer at end of parallel loop with TBB

我想并行化一个循环(使用 tbb),其中包含一些昂贵但可向量化的迭代(随机分布)。我的想法是缓冲它们并在达到矢量大小时刷新缓冲区。这样的缓冲区必须是线程本地的。例如,

// dummy for testing
void do_vectorized_work(size_t k, size_t*indices)
{}
// dummy for testing
bool requires_expensive_work(size_t k)
{ return (k&7)==0; }

struct buffer
{
  size_t K=0, B[vector_size];
  void load(size_t i)
  {
    B[K++]=i;
    if(K==vector_size)
      flush();
  }
  void flush()
  {
    do_vectorized_work(K,B);
    K=0;
  }
};

void do_work_in_parallel(size_t N)
{
  tbb::enumerable_thread_specific<buffer> tl_buffer;

  tbb::parallel_for(size_t(0),N,[&](size_t i)
  {
    if(requires_expensive_work(i))
      tl_buffer.local().load(i);
  });
}

但是,这会使缓冲区非空,因此我仍然必须最后一次刷新每个缓冲区

for(auto&b:tl_buffer)
  b.flush();

但这是连载!当然,我也可以尝试并行

using tl_range = typename tbb::enumerable_thread_specific<buffer>::range_type;
tbb::parallel_for(tl_buffer.range(),[](tl_range const&range)
{
  for(auto r:range)
    r->flush();
});

但我不确定这是否有效(因为缓冲区的数量与线程的数量一样多)。我想知道是否有可能在事件发生后避免这种最后的冲洗。 IE。是否可以以每个线程的最终任务是刷新其缓冲区的方式使用 tbb::tasks(替换 tbb::parallel_for)?

不,工作线程没有关于此特定任务是否是给定工作的最后一个任务的完整信息(这就是工作窃取的工作原理)。因此,不可能在 parallel_for 级别或调度程序本身上实现这样的功能。因此,我建议您采用您描述的这两种方法。

尽管如此,您还可以做两件事。

  • 使其异步。 IE。排入一个任务,它将刷新所有内容。这将有助于从主线程的热路径中删除此代码。如果在完成此任务时需要设置任何依赖项,请小心。
  • 使用 tbb::task_scheduler_observer in order to initialize thread-specific data and release it lazily when threads get shut down or when there is no work remains for some time. The latter requires using local observer feature 尚未得到官方支持,但已经稳定了几年。

示例:

#define TBB_PREVIEW_LOCAL_OBSERVER 1
#include <tbb/tbb.h>
#include <assert.h>

typedef void * buffer_t;
const static int bufsz = 1024;
class thread_buffer_allocator: public tbb::task_scheduler_observer {
  tbb::enumerable_thread_specific<buffer_t> _buf;
public:
  thread_buffer_allocator( )
    : tbb::task_scheduler_observer( /*local=*/ true ) {
    observe(true); // activate the observer
  }
  ~thread_buffer_allocator( ) {
    observe(false); // deactivate the observer
    for(auto &b : _buf) {
        printf("destructor: cleared: %p\n", b);
        free(b);
    }
  }
  /*override*/ void on_scheduler_entry( bool worker ) {
    assert(_buf.local() == nullptr);
    _buf.local() = malloc(bufsz);
    printf("on entry: %p\n", _buf.local());
  }
  /*override*/ void on_scheduler_exit( bool worker ) {
    printf("on exit\n");
    if(_buf.local()) {
        printf("on exit: cleared %p\n", _buf.local());
        free(_buf.local());
        _buf.local() = nullptr;
    }
  }
};

int main() {
  thread_buffer_allocator buffers_scope;
  tbb::parallel_for(0, 1024*1024*1024, [&](auto i){
    usleep(i%3);
  });
  return 0;
}

我想到这个可以通过减少来解决。

struct buffer
{
  std::size_t K=0, B[vector_size];
  void load(std::size_t i)
  {
    B[K++]=i;
    if(K==vector_size) flush();
  }
  void flush()
  {
    do_vectorized_work(K,B);
    K=0;
  }
  buffer(buffer const&, tbb::split)
  {}
  void operator()(tbb::block_range<std::size_t> const&range)
  { for(i:range) load(i); }
  bool empty()
  { return K==0; }
  std::size_t pop()
  { return K? B[--K] : 0; }
  void join(buffer&rhs)
  { while(!rhs.empty()) load(rhs.pop()); }
};

void do_work_in_parallel(std::size_t N)
{
  buffer buff;
  tbb::parallel_reduce(tbb::block_range<std::size_t>(0,N,vector_size),buff);
  if(!buff.empty())
    buff.flush();
}