同步 push_back 和 std::thread

Synchronise push_back and std::thread

我的代码

void build(std::vector<RKD <DivisionSpace> >& roots, ...) {
  try {
    // using a local lock_guard to lock mtx guarantees unlocking on destruction / exception:
    std::lock_guard<std::mutex> lck (mtx);
    roots.push_back(RKD<DivisionSpace>(...));
  }
  catch (const std::bad_alloc&) {
    std::cout << "[exception caught when constructing tree]\n";
    return;
  }
}

现在,实际工作应该按顺序进行,而不是并行进行。

RKD 的构造函数可以 运行 与 RKD 的其他构造函数并行。但是,将对象推回 std::Vector 是临界区,对吧?

我要构建的对象的数量是已知的。在实践中它会在 [2, 16] 范围内。理论上它可以是任何正数。

此外,我对它们插入容器的顺序不感兴趣。

所以我可以这样做:

RKD tree = RKD(...);
mutex_lock(...);
roots.push_back(tree);

但是这意味着复制,不是吗?

我应该怎么做才能使我的代码并行化?


由于 this 答案,我决定使用锁(而不仅仅是互斥锁)。

Tomasz Lewowski 在他的评论中提出的建议非常简单,并且基于以下观察:std::vector 上的 push_back 可能需要重新分配后备存储和复制(或者,最好是移动)元素。这就构成了一个需要同步的临界区。

对于接下来的例子,假设我们想要一个用前 12 个素数填充的向量,但我们不关心它们的顺序。 (我刚刚在这里对数字进行了硬编码,但假设它们是通过一些并行执行有意义的昂贵计算获得的。)在以下场景中存在危险的竞争条件。

std::vector<int> numbers {};  // an empty vector

// thread A             // thread B             // thread C

numbers.push_back( 2);  numbers.push_back(11);  numbers.push_back(23);
numbers.push_back( 3);  numbers.push_back(13);  numbers.push_back(27);
numbers.push_back( 5);  numbers.push_back(17);  numbers.push_back(29);
numbers.push_back( 7);  numbers.push_back(19);  numbers.push_back(31);

push_back 还有一个问题。如果两个线程同时调用它,它们都将尝试在同一索引处构造一个对象,这可能会带来灾难性的后果。所以在分叉线程之前用 reserve(n) 没有解决问题。

但是,由于您事先知道元素的数量,您可以简单地将它们 分配 std::vector 中的特定位置,而无需更改其大小。如果您不更改大小,则没有关键部分。因此,在以下场景中不存在竞争。

std::vector<int> numbers(12);  // 12 elements initialized with 0

// thread A          // thread B          // thread C

numbers[ 0] =  2;    numbers[ 1] =  3;    numbers[ 2] =  5;
numbers[ 3] =  7;    numbers[ 4] = 11;    numbers[ 5] = 13;
numbers[ 6] = 17;    numbers[ 7] = 19;    numbers[ 8] = 23;
numbers[ 9] = 29;    numbers[10] = 31;    numbers[11] = 37;

当然,如果两个线程都试图写入相同的 索引,竞争将再次出现。幸运的是,在实践中防止这种情况并不困难。如果你的向量有 n 个元素并且你有 p 个线程,线程 i 只写入元素 [i n / p, (i + 1) n / p)。请注意,仅当 j mod p = i 因为它导致更少的缓存失效。所以上面例子中的访问模式是次优的,最好是这样。

std::vector<int> numbers(12);  // 12 elements initialized with 0

// thread A          // thread B          // thread C

numbers[ 0] =  2;    numbers[ 4] = 11;    numbers[ 8] = 23;
numbers[ 1] =  3;    numbers[ 5] = 13;    numbers[ 9] = 29;
numbers[ 2] =  5;    numbers[ 6] = 17;    numbers[10] = 31;
numbers[ 3] =  7;    numbers[ 7] = 19;    numbers[11] = 37;

到目前为止一切顺利。但是,如果您没有 std::vector<int> 而是 std::vector<Foo> 怎么办?如果 Foo 没有默认构造函数,则

std::vector<Foo> numbers(10);

将无效。即使它有一个,创建许多昂贵的默认构造对象只是为了很快重新分配它们而不检索值也是离谱的。

当然,大多数设计良好的类应该有一个非常便宜的默认构造函数。例如,std::string 默认构造为不需要内存分配的空字符串。一个好的实现会将默认构造字符串的成本降低到

std::memset(this, 0, sizeof(std::string));

并且如果编译器足够聪明,可以判断出我们正在分配和初始化整个 std::vector<std::string>(n),它可能能够进一步优化到对

的单个调用
std::calloc(n, sizeof(std::string));

因此,如果您有任何机会可以让 Foo 成为廉价的默认构造和可分配的,您就完成了。但是,如果事实证明这很困难,您可以通过将其移至堆中来避免该问题。智能指针是廉价的默认构造的,所以

std::vector<std::unique_ptr<Foo>> foos(n);

最终会减少到

std::calloc(n, sizeof(std::unique_ptr<Foo>));

无需对 Foo 做任何事情。当然,这种便利是以为每个元素动态分配内存为代价的。

std::vector<std::unique_ptr<Foo>> foos(n);

// thread A                    // thread B                           // thread C

foos[0].reset(new Foo {...});  foos[n / 3 + 0].reset(new Foo {...});  foos[2 * n / 3 + 0].reset(new Foo {...});
foos[1].reset(new Foo {...});  foos[n / 3 + 1].reset(new Foo {...});  foos[2 * n / 3 + 1].reset(new Foo {...});
foos[2].reset(new Foo {...});  foos[n / 3 + 2].reset(new Foo {...});  foos[2 * n / 3 + 2].reset(new Foo {...});
...                            ...                                    ...

这可能没有你想象的那么糟糕,因为虽然动态内存分配不是免费的,但 sizeofstd::unique_ptr 非常小,所以如果 sizeof(Foo) 很大,你获得迭代速度更快的更紧凑向量的好处。当然,这完全取决于您打算如何使用您的数据。

如果您事先不知道元素的确切数量或担心会弄乱索引,还有另一种方法:让每个线程填充自己的向量并在结尾。继续素数的例子,我们得到这个。

std::vector<int> numbersA {};  // private store for thread A
std::vector<int> numbersB {};  // private store for thread B
std::vector<int> numbersC {};  // private store for thread C

// thread A              // thread B              // thread C

numbersA.push_back( 2);  numbersB.push_back(11);  numbersC.push_back(23);
numbersA.push_back( 3);  numbersB.push_back(13);  numbersC.push_back(27);
numbersA.push_back( 5);  numbersB.push_back(17);  numbersC.push_back(29);
numbersA.push_back( 7);  numbersB.push_back(21);  numbersC.push_back(31);

// Back on the main thread after A, B and C are joined:

std::vector<int> numbers(
    numbersA.size() + numbersB.size() + numbersC.size());
auto pos = numbers.begin();
pos = std::move(numbersA.begin(), numbersA.end(), pos);
pos = std::move(numbersB.begin(), numbersB.end(), pos);
pos = std::move(numbersC.begin(), numbersC.end(), pos);
assert(pos == numbers.end());

// Now dispose of numbersA, numbersB and numbersC as soon as possible
// in order to release their no longer needed memory.

(上面代码中使用的std::movethe one from the algorithms library。)

这种方法具有最理想的内存访问模式,因为 numbersAnumbersBnumbersC 正在写入完全独立分配的内存。当然,我们必须完成额外的 顺序 连接中间结果的工作。请注意,效率在很大程度上取决于这样一个事实,即与查找/创建元素的成本相比,移动分配元素的成本可以忽略不计。至少如上所述,代码还假定您的类型具有廉价的默认构造函数。当然,如果你的类型不是这种情况,你可以再次使用智能指针。

我希望这为您提供了足够的想法来优化您的问题。

如果您以前从未使用过智能指针,请查看 “RAII and smart pointers in C++ and check out the standard library's dynamic memory management library。上面显示的技术当然也适用于 std::vector<Foo *>,但我们不再在 modern C++ 中使用这样的拥有资源的原始指针。

问题似乎是你的构造函数做了很多工作,这打破了关于构造和容器插入的各种库约定。

通过将插入与创建解耦来修复它。

下面的代码 非常 类似于 @5gon12eder 建议的代码,只是它 "force" 您不需要更改对象位置。

在我的小演示中

  • 我们使用真正未初始化的原始内存区域(这对于 vector 是不可能的,其中插入意味着初始化),所以而不是 "canonical"

    std::array<RKD, 500> rkd_buffer; // OR
    std::vector<RKD> rkd_buffer(500); // OR even
    std::unique_ptr<RKD[]> rkd_buffer(new RKD[500]);
    

    我们将使用自定义组合:

    std::unique_ptr<RKD[N], decltype(&::free)> rkd_buffer(
        static_cast<RKD(*)[N]>(::malloc(sizeof(RKD) * N)),
        ::free
    );
    
  • 然后我们创建几个线程(示例中有 5 个线程)来构建所有元素。这些项目只是就地构建,它们各自的析构函数将在程序退出时调用

  • 因此,在 rkd_buffer 超出范围之前所有项目都已完全初始化是至关重要的(join 确保了这一点)。
  • 线程可以通过不同的方式同步:构造可以例如通过工作队列分派到线程池,其中可以使用条件变量、承诺、线程屏障(来自 boost)或者甚至只是原子共享计数器来进行协调。

    所有这些选择在本质上与并行构建 运行 的任务无关,所以我将把它留给你的想象(或其他 SO 答案)

Live On Coliru

struct RKD {
    RKD() { this_thread::sleep_for(chrono::milliseconds(rand() % 100)); } // expensive
};

int main() {
    static const int N         = 500;
    static const int ChunkSize = 100;
    std::unique_ptr<RKD[N], decltype(&::free)> rkd_buffer(static_cast<RKD(*)[N]>(::malloc(sizeof(RKD) * N)), ::free);

    vector<thread> group;
    for (int chunk = 0; chunk < N/ChunkSize; chunk += ChunkSize)
        group.emplace_back([&] { 
            for (int i=chunk * ChunkSize; i<(ChunkSize + chunk*ChunkSize); ++i)
                new (rkd_buffer.get() + i) RKD;
        });

    for (auto& t:group) if (t.joinable()) t.join();

    // we are responsible for destructing, since we also took responsibility for construction
    for (RKD& v : *rkd_buffer)
        v.~RKD();
}

可以看到有5个线程划分了500个构造。每个构造需要(平均)~50ms,所以总时间应该是 100*50ms ~= 5s。事实上,这正是发生的事情:

real    0m5.193s
user    0m0.004s
sys 0m0.000s