缩短线程安全(不平衡)二叉树中的关键部分

Shortening critical sections in thread-safe (unbalanced) Binary Tree

我正在实现一组线程安全函数,这些函数使用 Pthreads 更新和搜索不平衡的二叉树(参见 The Linux Programming Interface: Exercise 30.2)。

实现锁定整个函数体的工作树很容易,例如 adddelete,但我相信可以实现不平衡的二叉(搜索)树粒度锁定策略。

我已经尝试 'port' 来自“The Art of Multiprocessor Programming”中描述的其他数据结构的各种交接锁定方案,但每个都需要我将树转换为不同的原语数据结构(我希望使用一个不平衡的二叉树,因为它是原始数据类型,而不仅仅是提供相同的接口)

为了清楚起见,这里是用来表示这棵树的节点的数据结构。

typedef struct bt_node {
    Key key;
    int value;
    struct bt_node *left;
    struct bt_node *right;
    pthread_mutex_t mutex;
} bt_node;

(我还维护了一个 bt_root 结构用于记账,您可能会在下面的一些函数中看到它)

add 函数本质上是单线程的,因为我几乎在 add 开始时就锁定了根节点,并在其工作完成后将其解锁。有可能在这里做得更好吗? (lockunlock 是对上述结构 pthread_mutex_lock and pthread_mutex_unlock 的包装)

// This function is responsible for finding the last node that is going to be the parent of whichever node we are inserting.
bt_node *closest_leaf(bt_node *root, Key key) {
    bt_node *current = root;
    bt_node *prev = NULL;
    while (current != NULL) {
        prev = current;
        if      (key < current->key)  current = current->left;
        else if (key > current->key)  current = current->right;
        else  {
            prev = current;
            break;
        }
    }

    return prev;
}

int add(bt_root *root_tree, Key key, int value) {
    bt_node *new_node = create_node(key, value);
    bt_node *current = root_tree->root;

    // this root creation code is not and need not be thread-safe.
    if (current == NULL) {
        root_tree->root = new_node;
        root_tree->node_count = 1;
        return 1;
    }

    lock(current);
    // just locking `leaf` & `current` inside closest_leaf is incorrect
    bt_node *leaf = closest_leaf(current, key);
    if (key < leaf->key) {
        leaf->left = new_node;
        ++root_tree->node_count;
    }
    else if (key > leaf->key) {
        leaf->right = new_node;
        ++root_tree->node_count;
    } else {
        free(new_node);
    }
    unlock(current);


    return 1;
}

删除共享这个问题:成功的删除方案有效地锁定了整棵树。下面的代码省略了mutex获取和放弃的代码(但是概念和上面一样)。

// splice is the function responsible for actually removing a node from the tree
void splice(bt_node *node, bt_node *parent) {
    // if it's a leaf, destroy it.
    if (!node->left && !node->right) {
             replace_parent(parent, node, NULL);

    // if it has one child, have them be adopted by their grandparent
    } else if (node->left && !node->right) {
            replace_parent(parent, node, node->left);
    } else if (node->right && !node->left) {
            replace_parent(parent, node, node->right);

    /* if it has two children, take the smallest value of the right tree and
       replace the node with that one. */
    } else if (node->right && node->left) {
        // find the smallest element of the right tree
        bt_node *smallest_parent = node;
        bt_node *smallest = node->right;
        while (smallest->left) {
            smallest_parent = smallest;
            // left is the smaller side
            smallest = smallest->left;
        }

        // "swap" the smallest.
        node->key = smallest->key;
        node->value = smallest->value;

        // unsure if this is correct -- seems to be only sensible thing to do
        smallest_parent->left = smallest->right;
        free(smallest);
    }
}

// search the tree depth-first for the node to remove
int delete_dfs(bt_node *node, bt_node *parent, Key key) {
    if (node == NULL) return 0;
    Key node_key = node->key;

    // we've found the key, let's actually splice it out of the tree
    if (node_key == key) {
        splice(node, parent);
        return 1;
    }

    // otherwise we should search (depth first).
    if (key < node_key) {
        return delete_dfs(node->left, node, key);
    } else if (key > node_key) {
        return delete_dfs(node->right, node, key);
    }

    return 0;
}

void delete(bt_root *root_tree, Key key) {
    if (delete_dfs(root_tree->root, NULL, key)) root_tree->node_count--;
}

有没有更好的方法?

I'm implementing a set of thread-safe functions that update and search an unbalanced binary tree with Pthreads (see The Linux Programming Interface: Exercise 30.2).

It's easy to implement a 'functional' tree that simply locks the entire body of functions like add or delete, but I believe it possible to implement an unbalanced binary (search) tree with a locking strategy that allows for almost all the work to be parallelized by minimizing time spent in a critical section.

是也不是。如果两个或多个线程要访问相同的共享数据,并且其中至少有一个修改了它,那么两个线程的访问都必须通过某种形式的同步来保护。由于您通常无法提前预测哪些节点可能会被修改,因此所有访问都需要同步。另一方面,保护区的范围可以动态变化。不平衡树的树搜索、添加和删除算法可以随着它们的进行缩小所需同步的范围,因为原则上它们只需要保护一个子树。另一个线程可以合理地同时在单独的子树上运行。

然而,在您对此感到兴奋之前,请注意更改同步范围将需要互斥锁定和解锁(或者 反之亦然,具体取决于您的方法) ,而且这些都不便宜。事实上,您可能会发现所有互斥锁锁定和解锁占用了并行化操作的大部分或全部收益。

但是,如果您预计搜索将是树上的主要操作,而添加和删除相对较少,那么您可以考虑实施 read/write 锁。那里的想法是您允许任意数量的线程同时搜索树,但是想要修改它的线程必须被授予对整个树的独占访问权限才能这样做。

要为您的插入函数实现手动锁定方案,您需要在 closest_leaf() 函数中执行此操作

// This function is responsible for finding the last node that is going to be the parent of whichever node we are inserting.

// post-condition: returned node is locked
bt_node *closest_leaf(bt_node *root, Key key) {
    bt_node *current = root;
    bt_node *prev = NULL;
    while (current != NULL) {
        lock(current);
        if (prev)
            unlock(prev);
        prev = current;
        if      (key < current->key)  current = current->left;
        else if (key > current->key)  current = current->right;
        else
            break;
    }

    return prev;
}

(当我们沿着树向下移动时,我们在检查每个节点之前锁定它,并且只有在锁定我们要检查的下一个节点之后才解锁它)。这个 returns 节点被锁定,所以我们可以在 add() 函数中修改它:

int add(bt_root *root_tree, Key key, int value)
{
    bt_node *new_node = create_node(key, value);
    bt_node *current = root_tree->root;

    // this root creation code is not and need not be thread-safe.
    if (current == NULL) {
        root_tree->root = new_node;
        root_tree->node_count = 1;
        return 1;
    }

    bt_node *leaf = closest_leaf(current, key);
    if (key < leaf->key) {
        leaf->left = new_node;
        ++root_tree->node_count;
    }
    else if (key > leaf->key) {
        leaf->right = new_node;
        ++root_tree->node_count;
    } else {
        free(new_node);
    }
    unlock(leaf);

    return 1;
}

删除有点复杂。首先,我们需要修复 splice() 函数中的错误 - 考虑如果 while() 循环之后 smallest_parent == node 会发生什么(即循环执行零次):它将覆盖 node->left 当它应该被覆盖时 node->right。解决此问题的最简单方法是使用指向我们要更新的父字段的指针,而不是指向父字段本身的指针。

与此同时,splice() 可以通过观察它对 parent 所做的修改来简化,只是更改指向 node 的父项中的指针字段 -所以我们可以只接受一个参数,它是指向该指针字段的指针。这意味着我们也不再需要 replace_parent() 函数了。简化的 splice() 没有 锁定,看起来像:

// splice is the function responsible for actually removing a node from the tree
void splice(bt_node **node_ptr) {
    bt_node *node = *node_ptr;
    // if it has one or zero child nodes, have them be adopted by their grandparent
    if (!node->left) {
        *node_ptr = node->right;
        free(node);
    }
    else if (!node->right) {
        *node_ptr = node->left;
        free(node);
    }
    /* if it has two children, take the smallest value of the right tree and
       replace the node with that one. */
    else {
        // find the smallest element of the right tree
        bt_node **smallest_ptr = &node->right;
        bt_node *smallest = *smallest_ptr;

        while (smallest->left) {
            smallest_ptr = &smallest->left;
            smallest = *smallest_ptr;
        }

        // "swap" the smallest.
        node->key = smallest->key;
        node->value = smallest->value;
        *smallest_ptr = smallest->right;
        free(smallest);
    }
}

要添加锁定,我们将要求在调用 splice() 时同时锁定要删除的父节点和子节点。那么在简单的拼接情况下,我们只需要在释放子节点之前解锁它。在复杂的情况下,当我们找到下一个要交换的最小节点时,我们需要再次进行手动锁定:

// splice is the function responsible for actually removing a node from the tree
// pre-condition: node **node_ptr and its parent (containing the pointer *node_ptr) are both locked
void splice(bt_node **node_ptr) {
    bt_node *node = *node_ptr;
    // if it has one or zero child nodes, have them be adopted by their grandparent
    if (!node->left) {
        *node_ptr = node->right;
        unlock(node);
        free(node);
    }
    else if (!node->right) {
        *node_ptr = node->left;
        unlock(node);
        free(node);
    }
    /* if it has two children, take the smallest value of the right tree and
       replace the node with that one. */
    else {
        // find the smallest element of the right tree
        bt_node **smallest_ptr = &node->right;
        bt_node *smallest = *smallest_ptr;

        lock(smallest);
        while (smallest->left) {
            smallest_ptr = &smallest->left;
            lock(*smallest_ptr);
            unlock(smallest);
            smallest = *smallest_ptr;
        }

        // "swap" the smallest.
        node->key = smallest->key;
        node->value = smallest->value;
        *smallest_ptr = smallest->right;
        unlock(node);
        unlock(smallest);
        free(smallest);
    }
}

请注意,在我们找到下一个最小节点的删除的最后一部分中,即使我们修改了 'smallest' 的父节点,我们也不需要保持锁定。原因是我们一直保持 'node' 锁定,所以在我们完成交接锁定到 'smallest' 之后,我们可以独占访问该链中的所有节点。

要使这种手动删除有效,最简单的方法是将递归删除替换为迭代版本。 (还需要提供一个函数来锁住树的根节点,因为如果根节点的子节点少于两个被删除,根节点中的指针会更新,所以这个需要互斥):

void delete(bt_root *root_tree, Key key)
{
    bt_node *node;
    bt_node *prev = NULL;
    bt_node **node_ptr = &root_tree->root;
    int deleted = 0;

    lock_root(root_tree);
    while ((node = *node_ptr) != NULL)
    {
        lock(node);
        if (key == node->key)
        {
            splice(node_ptr);
            deleted = 1;
            break;
        }
        else
        {
            if (prev)
                unlock(prev);
            else
                unlock_root(root_tree);
            prev = node;
            if (key < node->key)
                node_ptr = &node->left;
            else
                node_ptr = &node->right;
        }
    }

    if (prev) {
        unlock(prev);
        if (deleted) {
            lock_root(root_tree);
            root_tree->node_count--;
            unlock_root(root_tree);
        }           
    } else {
        if (deleted) {
            root_tree->node_count--;
        }
        unlock_root(root_tree);
    }
}

在为添加函数遍历树时,您还需要使用此 lock_root() / unlock_root() 对,以确保您不会遍历到即将被释放的根节点通过并发删除。这需要更新 closest_leaf() 以采用 bt_root * 而不是 bt_node * 参数,但也使第一次插入树线程安全:

// This function is responsible for finding the last node that is going to be the parent of whichever node we are inserting.
// post-condition: returned node is locked, or tree is locked if NULL is returned
bt_node *closest_leaf(bt_root *tree, Key key)
{
    bt_node *current;
    bt_node *prev = NULL;

    lock_root(tree);
    current = tree->root;
    while (current != NULL) {
        lock(current);
        if (prev)
            unlock(prev);
        else
            unlock_root(tree);
        prev = current;
        if      (key < current->key)  current = current->left;
        else if (key > current->key)  current = current->right;
        else
            break;
    }

    return prev;
}

int add(bt_root *root_tree, Key key, int value)
{
    bt_node *new_node = create_node(key, value);
    bt_node *leaf = closest_leaf(root_tree, key);

    /* NULL returned by closest_leaf() means new node is the root */
    if (leaf == NULL) {
        root_tree->root = new_node;
        root_tree->node_count = 1;
        unlock_root(root_tree);
        return 1;
    }

    if (key == leaf->key) {
        free(new_node);
        unlock(leaf);
    } else {    
        if (key < leaf->key) {
            leaf->left = new_node;
        }
        else {
            leaf->right = new_node;
        }
        unlock(leaf);

        lock_root(root_tree);
        ++root_tree->node_count;
        unlock_root(root_tree);
    }

    return 1;
}

请注意保持 node_count 最新所需的额外复杂性和锁定 - 您应该重新考虑它是否值得拥有。