为什么这个 JavaScript SkipList 实现有 "nodes" 和 "groups",它们的结构是这样的?

Why does this JavaScript SkipList implementation have "nodes" and "groups" which are structured the way they are?

A SkipList is a probabilistic data structure used, at least in part, for implementing an ordered key/value map. It is arranged in levels where higher levels skip nodes (the higher up, the more it skips), and the lowest level contains the nodes. As far as I have read, each level is implemented as a linked list, possibly a doubly linked list. And for some reason,跳过列表更适合并发,因为在多线程环境中,与 red/black 树或 B 相比,它们可以在没有锁的情况下实现 fast/optimal 性能+树木。

这里我们有一个在 JavaScript 中实现的“proper skip list”,复制到这里以供参考。 link 有一套很好的测试来展示它是如何工作的。

const DEFAULT_STACK_UP_PROBABILITY = 0.25;

class ProperSkipList {
  constructor(options) {
    options = options || {};
    this.stackUpProbability = options.stackUpProbability || DEFAULT_STACK_UP_PROBABILITY;
    this.updateLength = options.updateLength !== false;
    this.typePriorityMap = {
      'undefined': 0,
      'object': 1,
      'number': 2,
      'bigint': 2,
      'string': 3
    };
    this.clear();
  }

  clear() {
    let headNode = {
      prev: null
    };
    let tailNode = {
      next: null
    };
    this.head = {
      isHead: true,
      key: undefined,
      value: undefined,
      nodes: [headNode]
    };
    this.tail = {
      isTail: true,
      key: undefined,
      value: undefined,
      nodes: [tailNode]
    };
    headNode.next = tailNode;
    tailNode.prev = headNode;
    headNode.group = this.head;
    tailNode.group = this.tail;
    this.length = this.updateLength ? 0 : undefined;
  }

  upsert(key, value) {
    let {matchingNode, prevNode, searchPath} = this._searchAndTrack(key);
    if (matchingNode) {
      let previousValue = matchingNode.group.value;
      matchingNode.group.value = value;
      return previousValue;
    }

    // Insert the entry.
    let newNode = {
      prev: prevNode,
      next: prevNode.next
    };
    let newGroup = {
      key,
      value,
      nodes: [newNode]
    };
    newNode.group = newGroup;
    prevNode.next = newNode;
    newNode.next.prev = newNode;

    // Stack up the entry for fast search.
    let layerIndex = 1;
    while (Math.random() < this.stackUpProbability) {
      let prevLayerNode = searchPath[layerIndex];
      if (!prevLayerNode) {
        let newHeadNode = {
          prev: null,
          group: this.head
        };
        let newTailNode = {
          next: null,
          group: this.tail
        };
        newHeadNode.next = newTailNode;
        this.head.nodes.push(newHeadNode);
        newTailNode.prev = newHeadNode;
        this.tail.nodes.push(newTailNode);
        prevLayerNode = newHeadNode;
      }
      let newNode = {
        prev: prevLayerNode,
        next: prevLayerNode.next,
        group: newGroup
      };
      prevLayerNode.next = newNode;
      newNode.next.prev = newNode;
      newGroup.nodes.push(newNode);
      layerIndex++;
    }
    if (this.updateLength) this.length++;

    return undefined;
  }

  find(key) {
    let {matchingNode} = this._search(key);
    return matchingNode ? matchingNode.group.value : undefined;
  }

  has(key) {
    return !!this.find(key);
  }

  _isAGreaterThanB(a, b) {
    let typeA = typeof a;
    let typeB = typeof b;
    if (typeA === typeB) {
      return a > b;
    }
    let typeAPriority = this.typePriorityMap[typeA];
    let typeBPriority = this.typePriorityMap[typeB];
    if (typeAPriority === typeBPriority) {
      return a > b;
    }
    return typeAPriority > typeBPriority;
  }

  // The two search methods are similar but were separated for performance reasons.
  _searchAndTrack(key) {
    let layerCount = this.head.nodes.length;
    let searchPath = new Array(layerCount);
    let layerIndex = layerCount - 1;
    let currentNode = this.head.nodes[layerIndex];
    let prevNode = currentNode;

    while (true) {
      let currentNodeGroup = currentNode.group;
      let currentKey = currentNodeGroup.key;
      if (!currentNodeGroup.isTail) {
        if (this._isAGreaterThanB(key, currentKey) || currentNodeGroup.isHead) {
          prevNode = currentNode;
          currentNode = currentNode.next;
          continue;
        }
        if (key === currentKey) {
          let matchingNode = currentNodeGroup.nodes[0];
          searchPath[layerIndex] = matchingNode;
          return {matchingNode, prevNode: matchingNode.prev, searchPath};
        }
      }
      searchPath[layerIndex] = prevNode;
      if (--layerIndex < 0) {
        return {matchingNode: undefined, prevNode, searchPath};
      }
      currentNode = prevNode.group.nodes[layerIndex];
    }
  }

  _search(key) {
    let layerIndex = this.head.nodes.length - 1;
    let currentNode = this.head.nodes[layerIndex];
    let prevNode = currentNode;
    while (true) {
      let currentNodeGroup = currentNode.group;
      let currentKey = currentNodeGroup.key;
      if (!currentNodeGroup.isTail) {
        if (this._isAGreaterThanB(key, currentKey) || currentNodeGroup.isHead) {
          prevNode = currentNode;
          currentNode = currentNode.next;
          continue;
        }
        if (key === currentKey) {
          let matchingNode = currentNodeGroup.nodes[0];
          return {matchingNode, prevNode: matchingNode.prev};
        }
      }
      if (--layerIndex < 0) {
        return {matchingNode: undefined, prevNode};
      }
      currentNode = prevNode.group.nodes[layerIndex];
    }
  }

  findEntriesFromMin() {
    return this._createEntriesIteratorAsc(this.head.nodes[0].next);
  }

  findEntriesFromMax() {
    return this._createEntriesIteratorDesc(this.tail.nodes[0].prev);
  }

  minEntry() {
    let [key, value] = this.findEntriesFromMin().next().value;
    return [key, value];
  }

  maxEntry() {
    let [key, value] = this.findEntriesFromMax().next().value;
    return [key, value];
  }

  minKey() {
    return this.minEntry()[0];
  }

  maxKey() {
    return this.maxEntry()[0];
  }

  minValue() {
    return this.minEntry()[1];
  }

  maxValue() {
    return this.maxEntry()[1];
  }

  _extractNode(matchingNode) {
    let nodes = matchingNode.group.nodes;
    for (let layerNode of nodes) {
      let prevNode = layerNode.prev;
      prevNode.next = layerNode.next;
      prevNode.next.prev = prevNode;
    }
    if (this.updateLength) this.length--;
    return matchingNode.group.value;
  }

  extract(key) {
    let {matchingNode} = this._search(key);
    if (matchingNode) {
      return this._extractNode(matchingNode);
    }
    return undefined;
  }

  delete(key) {
    return this.extract(key) !== undefined;
  }

  findEntries(fromKey) {
    let {matchingNode, prevNode} = this._search(fromKey);
    if (matchingNode) {
      return {
        matchingValue: matchingNode.group.value,
        asc: this._createEntriesIteratorAsc(matchingNode),
        desc: this._createEntriesIteratorDesc(matchingNode)
      };
    }
    return {
      matchingValue: undefined,
      asc: this._createEntriesIteratorAsc(prevNode.next),
      desc: this._createEntriesIteratorDesc(prevNode)
    };
  }

  deleteRange(fromKey, toKey, deleteLeft, deleteRight) {
    if (fromKey == null) {
      fromKey = this.minKey();
      deleteLeft = true;
    }
    if (toKey == null) {
      toKey = this.maxKey();
      deleteRight = true;
    }
    if (this._isAGreaterThanB(fromKey, toKey)) {
      return;
    }
    let {prevNode: fromNode, searchPath: leftSearchPath, matchingNode: matchingLeftNode} = this._searchAndTrack(fromKey);
    let {prevNode: toNode, searchPath: rightSearchPath, matchingNode: matchingRightNode} = this._searchAndTrack(toKey);
    let leftNode = matchingLeftNode ? matchingLeftNode : fromNode;
    let rightNode = matchingRightNode ? matchingRightNode : toNode.next;

    if (leftNode === rightNode) {
      if (deleteLeft) {
        this._extractNode(leftNode);
      }
      return;
    }

    if (this.updateLength) {
      let currentNode = leftNode;
      while (currentNode && currentNode.next !== rightNode) {
        this.length--;
        currentNode = currentNode.next;
      }
    }

    let leftGroupNodes = leftNode.group.nodes;
    let rightGroupNodes = rightNode.group.nodes;
    let layerCount = this.head.nodes.length;

    for (let layerIndex = 0; layerIndex < layerCount; layerIndex++) {
      let layerLeftNode = leftGroupNodes[layerIndex];
      let layerRightNode = rightGroupNodes[layerIndex];

      if (layerLeftNode && layerRightNode) {
        layerLeftNode.next = layerRightNode;
        layerRightNode.prev = layerLeftNode;
        continue;
      }
      if (layerLeftNode) {
        let layerRightmostNode = rightSearchPath[layerIndex];
        if (!layerRightmostNode.group.isTail) {
          layerRightmostNode = layerRightmostNode.next;
        }
        layerLeftNode.next = layerRightmostNode;
        layerRightmostNode.prev = layerLeftNode;
        continue;
      }
      if (layerRightNode) {
        let layerLeftmostNode = leftSearchPath[layerIndex];
        layerLeftmostNode.next = layerRightNode;
        layerRightNode.prev = layerLeftmostNode;
        continue;
      }
      // If neither left nor right nodes are present on the layer, connect based
      // on search path to remove in-between entries.
      let layerRightmostNode = rightSearchPath[layerIndex];
      if (!layerRightmostNode.group.isTail) {
        layerRightmostNode = layerRightmostNode.next;
      }
      let layerLeftmostNode = leftSearchPath[layerIndex];
      layerLeftmostNode.next = layerRightmostNode;
      layerRightmostNode.prev = layerLeftmostNode;
    }
    if (deleteLeft && matchingLeftNode) {
      this._extractNode(matchingLeftNode);
    }
    if (deleteRight && matchingRightNode) {
      this._extractNode(matchingRightNode);
    }
  }

  _createEntriesIteratorAsc(currentNode) {
    let i = 0;
    return {
      next: function () {
        let currentGroup = currentNode.group;
        if (currentGroup.isTail) {
          return {
            value: [currentNode.key, currentNode.value, i],
            done: true
          }
        }
        currentNode = currentNode.next;
        return {
          value: [currentGroup.key, currentGroup.value, i++],
          done: currentGroup.isTail
        };
      },
      [Symbol.iterator]: function () { return this; }
    };
  }

  _createEntriesIteratorDesc(currentNode) {
    let i = 0;
    return {
      next: function () {
        let currentGroup = currentNode.group;
        if (currentGroup.isHead) {
          return {
            value: [currentNode.key, currentNode.value, i],
            done: true
          }
        }
        currentNode = currentNode.prev;
        return {
          value: [currentGroup.key, currentGroup.value, i++],
          done: currentGroup.isHead
        };
      },
      [Symbol.iterator]: function () { return this; }
    };
  }
}

我想知道 SkipList 的理论概念如何应用于此实现。但这不是我的问题,我的问题非常具体,如底部所述。但首先要看几件事。比如clear函数就是这样实现的,创建一个全新的SkipList:

clear() {
  let headNode = {
    prev: null
  };
  let tailNode = {
    next: null
  };
  this.head = {
    isHead: true,
    key: undefined,
    value: undefined,
    nodes: [headNode]
  };
  this.tail = {
    isTail: true,
    key: undefined,
    value: undefined,
    nodes: [tailNode]
  };
  headNode.next = tailNode;
  tailNode.prev = headNode;
  headNode.group = this.head;
  tailNode.group = this.tail;
  this.length = this.updateLength ? 0 : undefined;
}

请注意 headNode.group = this.head 中的“组”具有以下结构:

  {
    isTail: boolean,
    isHead: boolean.
    key: undefined,
    value: undefined,
    nodes: [node]
  }

或插入新节点:

// Insert the entry.
let newNode = {
  prev: prevNode,
  next: prevNode.next
};
let newGroup = {
  key,
  value,
  nodes: [newNode]
};
newNode.group = newGroup;
prevNode.next = newNode;
newNode.next.prev = newNode;

因此 node 具有 prev/next/group 组属性,而 group 具有 key/value/nodes 组。最后,请注意 this.head.nodes[layerIndex] 等在几个地方被调用,在我看来它只是绕过了 SkipList 的 linked-list 性质并且有一个内部数组来存储所有节点.基本上我很困惑,我已经看了一天这段代码,在我看来,每个级别节点都在存储该级别的所有子节点的数组,或类似的东西。我对这里发生的事情感到困惑。

可以解释一下这个 SkipList 实现的一般模型是什么吗?即:

对于上下文,在阅读了几篇关于它的论文并理解了总体思路之后,我正在尝试学习如何实现 SkipList。我想对其施加一些限制,所以不想使用任何现有的实现。我希望它在 JavaScript 和 C 中工作,所以它需要在一个上下文中是单线程的,而在另一个上下文中是多线程的,所以我有一点要学习。但是知道如何在 JavaScript 中实现它,就像上面的代码所做的那样,是很好的第一步。

你的问题总结如下:

  • The meaning of group and node objects, and why they have their structure they do.
  • What the nodes array is doing on a group.

我将使用来自 brilliant.org:

的图像

链表显示为水平带,而数组显示为垂直堆栈。

A group 对应于集合中的 one 不同的键——以黄色标记。相应的值未在图像中描述,而只是有效载荷,对于理解数据结构不是必需的。一个组有一个 nodes 数组——显示为键顶部的堆栈。这些节点中的每一个都属于不同的链表,其中它们代表相同的键。这些节点再次对该组进行反向引用,因此一个节点知道它代表哪个键,以及哪些其他链表具有该相同键的节点。

所以 nodes 数组实际上将一个键复制到不同的列表。一个节点数组不代表集合中的不同 个键——只有一个。它就像一部电梯,从一列火车(链表)跳到另一列,使用相同的钥匙。

这些数组的大小是随机确定的,但平均而言应该很小。这是由 stackUpProbability 驱动的。它表示一个数组扩展一个节点(在图像中向上)的概率。至少有 一个 节点(对于一个键)的概率显然是 1,但是数组中至少还有一个节点的概率(因此大小为 at至少 2),是 stackUpProbability。它的尺寸为 3 的概率是 (stackUpProbability)²... 等

这样,底部链表(在 layerIndex 1)将在集合中有 all 个键(每个键一个节点,按排序顺序),但是任何下一层的钥匙都将减少,而顶层可能只有一只手。节点较少的列表提供了一种在集合中快速移动以搜索值的方法。这提供了可以找到值的位置的接近度。通过“电梯”,搜索算法可以降级到较低(较慢)的链表并在那里继续搜索,直到它到达最低级别,在那里可以找到密钥(如果存在),或者它应该在哪里(如果不存在) ).