根据英特尔博客实施 concurrent_vector
Implementing concurrent_vector according to intel blog
我正在尝试实现一个线程安全的无锁容器,类似于 std::vector,根据这个 https://software.intel.com/en-us/blogs/2008/07/24/tbbconcurrent_vector-secrets-of-memory-organization
据我了解,为了防止重新分配和使所有线程上的所有迭代器失效,他们添加了新的连续块,而不是单个连续数组。
他们添加的每个块的大小都是 2 的递增幂,因此他们可以使用 log(index) 找到 [index] 处的项目应该位于的正确段。
据我所知,他们有一个指向段的静态指针数组,因此他们可以快速访问它们,但是他们不知道用户想要多少段,所以他们做了一个小的初始段,如果段数量超过了当前计数,他们分配了一个巨大的段并切换到使用那个。
问题是,无法以无锁线程安全的方式添加新段,或者至少我还没有弄清楚该怎么做。我可以自动增加当前大小,但仅限于此。
而且从小段指针数组切换到大段指针数组涉及大量分配和内存复制,所以我不明白他们是怎么做到的。
他们在网上发布了一些代码,但是所有重要的功能都没有可用的源代码,它们在他们的线程构建块 DLL 中。下面是一些演示该问题的代码:
template<typename T>
class concurrent_vector
{
private:
int size = 0;
int lastSegmentIndex = 0;
union
{
T* segmentsSmall[3];
T** segmentsLarge;
};
void switch_to_large()
{
//Bunch of allocations, creates a T* segmentsLarge[32] basically and reassigns all old entries into it
}
public:
concurrent_vector()
{
//The initial array is contiguous just for the sake of cache optimization
T* initialContiguousBlock = new T[2 + 4 + 8]; //2^1 + 2^2 + 2^3
segmentsSmall[0] = initialContiguousBlock;
segmentsSmall[1] = initialContiguousBlock + 2;
segmentsSmall[2] = initialContiguousBlock + 2 + 4;
}
void push_back(T& item)
{
if(size > 2 + 4 + 8)
{
switch_to_large(); //This is the problem part, there is no possible way to make this thread-safe without a mutex lock. I don't understand how Intel does it. It includes a bunch of allocations and memory copies.
}
InterlockedIncrement(&size); //Ok, so size is atomically increased
//afterwards adds the item to the appropriate slot in the appropriate segment
}
};
我不会尝试将 segmentsLarge
和 segmentsSmall
合并。是的,这又浪费了一个指针。然后指针,让我们称之为 segments
可以最初指向 segmentsSmall.
另一方面,其他方法总是可以使用相同的指针,这使它们更简单。
并且从小到大的切换可以通过指针的一次比较交换来完成。
我不确定如何通过工会安全地完成这项工作。
这个想法看起来像这样(请注意,我使用了 C++11,Intel 库早于它,所以他们很可能用他们的原子内在函数来实现)。
这可能遗漏了很多细节,我相信英特尔人员已经考虑得更多,因此您可能需要对照所有其他方法的实现来检查这一点。
#include <atomic>
#include <array>
#include <cstddef>
#include <climits>
template<typename T>
class concurrent_vector
{
private:
std::atomic<size_t> size;
std::atomic<T**> segments;
std::array<T*, 3> segmentsSmall;
unsigned lastSegmentIndex = 0;
void switch_to_large()
{
T** segmentsOld = segments;
if( segmentsOld == segmentsSmall.data()) {
// not yet switched
T** segmentsLarge = new T*[sizeof(size_t) * CHAR_BIT];
// note that we leave the original segment allocations alone and just copy the pointers
std::copy(segmentsSmall.begin(), segmentsSmall.end(), segmentsLarge);
for(unsigned i = segmentsSmall.size(); i < numSegments; ++i) {
segmentsLarge[i] = nullptr;
}
// now both the old and the new segments array are valid
if( segments.compare_exchange_strong(segmentsOld, segmentsLarge)) {
// success!
return;
} else {
// already switched, just clean up
delete[] segmentsLarge;
}
}
}
public:
concurrent_vector() : size(0), segments(segmentsSmall.data())
{
//The initial array is contiguous just for the sake of cache optimization
T* initialContiguousBlock = new T[2 + 4 + 8]; //2^1 + 2^2 + 2^3
segmentsSmall[0] = initialContiguousBlock;
segmentsSmall[1] = initialContiguousBlock + 2;
segmentsSmall[2] = initialContiguousBlock + 2 + 4;
}
void push_back(T& item)
{
if(size > 2 + 4 + 8) {
switch_to_large();
}
// here we may have to allocate more segments atomically
++size;
//afterwards adds the item to the appropriate slot in the appropriate segment
}
};
我正在尝试实现一个线程安全的无锁容器,类似于 std::vector,根据这个 https://software.intel.com/en-us/blogs/2008/07/24/tbbconcurrent_vector-secrets-of-memory-organization
据我了解,为了防止重新分配和使所有线程上的所有迭代器失效,他们添加了新的连续块,而不是单个连续数组。
他们添加的每个块的大小都是 2 的递增幂,因此他们可以使用 log(index) 找到 [index] 处的项目应该位于的正确段。
据我所知,他们有一个指向段的静态指针数组,因此他们可以快速访问它们,但是他们不知道用户想要多少段,所以他们做了一个小的初始段,如果段数量超过了当前计数,他们分配了一个巨大的段并切换到使用那个。
问题是,无法以无锁线程安全的方式添加新段,或者至少我还没有弄清楚该怎么做。我可以自动增加当前大小,但仅限于此。
而且从小段指针数组切换到大段指针数组涉及大量分配和内存复制,所以我不明白他们是怎么做到的。
他们在网上发布了一些代码,但是所有重要的功能都没有可用的源代码,它们在他们的线程构建块 DLL 中。下面是一些演示该问题的代码:
template<typename T>
class concurrent_vector
{
private:
int size = 0;
int lastSegmentIndex = 0;
union
{
T* segmentsSmall[3];
T** segmentsLarge;
};
void switch_to_large()
{
//Bunch of allocations, creates a T* segmentsLarge[32] basically and reassigns all old entries into it
}
public:
concurrent_vector()
{
//The initial array is contiguous just for the sake of cache optimization
T* initialContiguousBlock = new T[2 + 4 + 8]; //2^1 + 2^2 + 2^3
segmentsSmall[0] = initialContiguousBlock;
segmentsSmall[1] = initialContiguousBlock + 2;
segmentsSmall[2] = initialContiguousBlock + 2 + 4;
}
void push_back(T& item)
{
if(size > 2 + 4 + 8)
{
switch_to_large(); //This is the problem part, there is no possible way to make this thread-safe without a mutex lock. I don't understand how Intel does it. It includes a bunch of allocations and memory copies.
}
InterlockedIncrement(&size); //Ok, so size is atomically increased
//afterwards adds the item to the appropriate slot in the appropriate segment
}
};
我不会尝试将 segmentsLarge
和 segmentsSmall
合并。是的,这又浪费了一个指针。然后指针,让我们称之为 segments
可以最初指向 segmentsSmall.
另一方面,其他方法总是可以使用相同的指针,这使它们更简单。
并且从小到大的切换可以通过指针的一次比较交换来完成。
我不确定如何通过工会安全地完成这项工作。
这个想法看起来像这样(请注意,我使用了 C++11,Intel 库早于它,所以他们很可能用他们的原子内在函数来实现)。 这可能遗漏了很多细节,我相信英特尔人员已经考虑得更多,因此您可能需要对照所有其他方法的实现来检查这一点。
#include <atomic>
#include <array>
#include <cstddef>
#include <climits>
template<typename T>
class concurrent_vector
{
private:
std::atomic<size_t> size;
std::atomic<T**> segments;
std::array<T*, 3> segmentsSmall;
unsigned lastSegmentIndex = 0;
void switch_to_large()
{
T** segmentsOld = segments;
if( segmentsOld == segmentsSmall.data()) {
// not yet switched
T** segmentsLarge = new T*[sizeof(size_t) * CHAR_BIT];
// note that we leave the original segment allocations alone and just copy the pointers
std::copy(segmentsSmall.begin(), segmentsSmall.end(), segmentsLarge);
for(unsigned i = segmentsSmall.size(); i < numSegments; ++i) {
segmentsLarge[i] = nullptr;
}
// now both the old and the new segments array are valid
if( segments.compare_exchange_strong(segmentsOld, segmentsLarge)) {
// success!
return;
} else {
// already switched, just clean up
delete[] segmentsLarge;
}
}
}
public:
concurrent_vector() : size(0), segments(segmentsSmall.data())
{
//The initial array is contiguous just for the sake of cache optimization
T* initialContiguousBlock = new T[2 + 4 + 8]; //2^1 + 2^2 + 2^3
segmentsSmall[0] = initialContiguousBlock;
segmentsSmall[1] = initialContiguousBlock + 2;
segmentsSmall[2] = initialContiguousBlock + 2 + 4;
}
void push_back(T& item)
{
if(size > 2 + 4 + 8) {
switch_to_large();
}
// here we may have to allocate more segments atomically
++size;
//afterwards adds the item to the appropriate slot in the appropriate segment
}
};