节点预分配向量中的无锁树节点分配

Lock-free Tree Node Assignment in Preallocated Vector of Nodes

我目前正在尝试多线程创建一个,其中包含树的class预分配了一个std::vectorNodes 在特定大小的块中(概念上大小是任意的)。 Node 的额外块仅在必要时创建,这是因为树很快变得非常大,我想避免不断使用 new 运算符以提高时间效率。

这些 Node 的向量定义为:std::vector< std::vector< Node > > nodes

head 跟踪内部向量中的位置,chunkCount 跟踪当前正在使用的外部向量。

矢量在构造函数中调整大小为:

nodes.resize( 1 );
nodes[chunkCount].resize( CHUNK_SIZE );

Node 的简化版本是:

typedef struct Node {
    int val;
    Node* subnodes[5];
} Node;

Node 的创建过程如下:

void TreeClass::createNode( Node* node, short index, int val )
{
    omp_set_lock( &treeLock ); // treeLock belongs to TreeClass
    head++;
    if( head == CHUNK_SIZE ) {
        std::vector< Node > tempNodeVec( CHUNK_SIZE );
        nodes.push_back( tempNodeVec );
        chunkCount++;
        head = 0;
    }
    node->subnodes[index] = &( nodes[chunkCount][head] );
    omp_unset_lock( &treeLock );

    node->subnodes[index]->val = val;
}

这很好用。然而,我担心的是,在创建节点时,除了一个线程外,所有线程都被阻塞了,而且这种情况经常发生,所以很多时间都被阻塞或 locking/unlocking treeLock,因此我希望使这个函数 无锁 但到目前为止我的尝试都失败了。

不使用 #pragma omp atomic(或使用 std::atomic< int >s)的锁,改变 headchunkCount 很容易,但这是确保if( ... ) 语句仅在任何线程继续分配子地址之前执行一次,即确保它们使用 correct/updated chunkCounthead.

阅读无锁算法的一个想法是在 Node 中使用 std::atomic< Node* > subnodes[5] 并执行 CAS 操作等待正确更新的 headchunkCnt但不知道会是什么 "correct",我怎么知道我在等什么?

另一个(天真的)想法是:

int myHead;
if( ++head == CHUNK_SIZE ) {
    std::vector< Node > tempNodeVec( CHUNK_SIZE );
    nodes.push_back( tempNodeVec );
    chunkCount++;
    myhead = head = 0;
} else {
    myhead = head;
    while( head > CHUNK_SIZE )
        myHead = ++head;
}
node->subnodes[index] = &( nodes[chunkCount][myHead] );

想法是只有一个线程进入 if( ... ) 并且直到它已将 head 设置为 0,其余的将卡在 else { ... } 但我已经可以看到这种方法存在很多问题

如有任何帮助,我们将不胜感激。

我建议您使用线程专用内存池。为此,您可以使用如下注释:

#pragma omp threadprivate(nodes)

这不仅比尝试保护对共享内存池的访问要简单得多,而且由于数据局部性,它还可能会提高性能。

注意:使用您的解决方案实现基于原子的无锁是不可能的,因为 nodes[chunkCount] - 每次分配都需要 - 必须始终受到保护免受 nodes.push_back.

全功能内存池比较复杂,但作为一个小步骤,您可以尝试使用 std::deque。它提供了您需要的东西,而不会弄乱两个向量 - 在恒定时间内插入元素,同时不会使现有元素的指针无效。您的控制力较弱,但这是一个好的开始。