如何更新堆中的元素? (优先队列)

How to update elements within a heap? (priority queue)

使用 min/max-heap 算法时,优先级可能会改变。处理此问题的一种方法是删除和插入元素以更新队列顺序。

对于使用数组实现的优先级队列,这可能是一个似乎可以避免的性能瓶颈,尤其是在优先级变化很小的情况下。

Even if this is not a standard operation for a priority queue,这是一个自定义实现,可以根据我的需要进行修改。

在 min/max-heap 中是否有更新元素的众所周知的最佳实践方法?


背景信息:我不是二叉树方面的专家,我继承了一些在优先级队列中重新插入元素时存在性能瓶颈的代码。我为重新排序新元素的最小堆做了一个重新插入函数——这比(删除和插入)有了可衡量的改进,但是这似乎是其他人可能已经以更优雅的方式解决的问题方式.

如果代码有帮助,我可以 link 但我不想过多地关注实现细节 - 因为这个问答可以保持一般性。

典型解决方案

通常的解决方案是将一个元素标记为无效并插入一个新元素,然后删除无效条目 popped-off。

备选方案

如果该方法不够用,可以在 O(log n) 步内恢复 min-heap 不变量,只要知道要更改的值的位置.

回想一下,min-heap 是使用两个基元构建和维护的,"siftup" 和 "siftdown"(尽管各种来源对哪个是上哪个是下有不同的看法)。其中一个将值向下推到树上,另一个将它们向上浮动。

案例一:价值增加

如果新值x1大于旧值x0,则只有x[下的树=54=] 需要修复,因为 parent(x) <= x0 < x1。只需 通过将 x 与其两个 children 中的较小者交换而 x 推下树]x 大于其中一个 children.

情况二:价值下降

如果新值x1小于旧值x,则x[=54下面的树=] 不需要调整,因为 x1 < x0 <= either_child(x)。相反,我们只需要 向上移动,将 x 与其 parent 交换,而 x 小于其 parent。不需要考虑兄弟节点,因为它们已经大于或等于 parent,可能会被替换为较低的值。

情况三:值不变

不需要工作。现有的不变量不变。

Python

中的工作代码

测试 1,000,000 次试验:创建一个随机堆。更改随机选择的值。恢复堆状况。验证结果是否为 min-heap.

from heapq import _siftup, _siftdown, heapify
from random import random, randrange, choice

def is_minheap(arr):
    return all(arr[i] >= arr[(i-1)//2] for i in range(1, len(arr)))

n = 40
trials = 1_000_000
for _ in range(trials):

    # Create a random heap
    data = [random() for i in range(n)]
    heapify(data)

    # Randomly alter a heap element
    i = randrange(n)
    x0 = data[i]
    x1 = data[i] = choice(data)

    # Restore the heap
    if x1 > x0:                 # value is increased
        _siftup(data, i)
    elif x1 < x0:               # value is decreased
        _siftdown(data, 0, i)

    # Verify the results
    assert is_minheap(data), direction

发布自己问题的答案,因为它包含指向工作代码的链接。


这个其实很简单

通常,最小堆实现具有排序函数,see example: BubbleUp/Down

这些函数可以运行修改元素,取决于相对于当前值的变化。例如:

if new_value < old_value {
    heap_bubble_up(heap, node);
} else if new_value > old_value {
    heap_bubble_down(heap, node);
}

虽然操作数取决于值的分布,但与维护排序列表相比,这将等于或减少步骤。

一般来说,小改动比删除+插入效率_多_多。

查看工作 code, and test,它使用 insert/remove/re-prioritize 实现最小堆,无需初始查找(调用者存储不透明引用)。


即使只重新排序所需的元素也可能最终成为大型堆的许多操作。

如果这太低效,最小堆可能不太合适。

二叉树可能更好(例如红黑树),其中删除和插入的规模更好。

但是我不确定 rb-trees 是否具有 re-order in-place 的能力,因为最小堆可以做到。

对于那些对各种算法的相对性能感兴趣的人,我 运行 进行了一些调查。

专注

  1. 相对较小的堆(3 =>​​ 100 个元素,但可以在下面的代码中轻松调整)
  2. 操作:仅更新 heap.top() 元素
  3. 优化标准=与re-heapify
  4. 的比较次数

使用 c++ STL 执行此操作的规范方法是:

  // Note I am using a max-heap, but the comparator is customisable below
  std::pop_heap(heap.begin(), heap.end(), comp);
  heap.pop_back();
  heap.push_back(newval);
  std::push_heap(heap.begin(), heap.end(), comp);

这是一个非常简单的案例。没有“知道元素在堆中的位置”的问题。所以我们当然可以做得更好,对吧...?

注意:对于那些感兴趣的人,我的应用程序是合并排序,其中一个大的数据文件被分成 20-50 个块,这些块经过排序并写入磁盘。然后re-read并合并到最终排序好的文件中。事实证明,选择从哪个文件合并下一个元素是瓶颈,所以我使用了 std::priority_queue,它在下面使用了一个堆。然而,这仍然是瓶颈,很难跟上磁盘,并且CPU受到比较的限制。

下面的代码研究了 4 种实现:

  1. STL方式如上(使用libstdc++-11)。这是基本情况。 (请注意,我相信 libc++ 在这方面不太成熟)。
  2. 递归“向下冒泡”算法
  3. 迭代“向下冒泡”算法
  4. A libstd++ __adjust_heap 后跟 __push_heap 算法

我生成 运行dom 堆大小、内容和替换值并迭代 1M 次。

调查结果

  1. 事实证明,Raymond Hettinger 的回答中的“案例 1、2、3 pre-comparision”平均而言总是有更多的比较。所以我把它拿出来直接运行每个算法。
  2. STL
  3. 递归和迭代向下冒泡日志几乎相同(预期,因为 -O3 上的编译 tailall 优化了递归,无论如何)并且始终有 更多 比较即使对于这种非常特殊的情况,也比 STL 好。 因此,仅使用 heap-primitive 不会给您任何收益
  4. 我们可以使用... STL,通过复制未发布函数的代码,“清理它们”(下划线等)并使用它们来击败 STL以专门的方式来解决这个有限的、专门的问题。 收益大约为 10-20%

典型结果:

method                              avg_cmp_cnt
std::heap / std::priority_queue     7.568285
bubble up recursively               8.031054
bubble up iteratively               8.047352
libstc++ __adjust_heap              6.327297

测试代码:

#include "fmt/core.h"
#include <algorithm>
#include <cassert>
#include <concepts>
#include <cstddef>
#include <cstdlib>
#include <execution>
#include <iostream>
#include <random>
#include <stdexcept>

template <std::unsigned_integral T>
T log2(T x) {
  T log = 0;
  while (x >>= 1U) ++log;
  return log;
}

template <typename T, typename Comp = std::less<>>
void print_heap(const std::vector<T>& heap, Comp comp = {}) {
  std::size_t levels = log2(heap.size()) + 1;
  unsigned    width  = 6 * (1U << (levels - 1U));
  std::cout << "\n\n";
  for (const auto& e: heap) std::cout << e << " ";
  std::cout << "\n";
  std::cout << fmt::format("is_heap = {:}\n\n", std::is_heap(heap.begin(), heap.end(), comp));
  if (heap.empty()) {
    std::cout << "<empty heap>\n";
    return;
  }
  unsigned idx  = 0;
  bool     done = false;
  for (unsigned l = 0; l != levels; ++l) {
    for (unsigned e = 0; e != 1U << l; ++e) {
      std::cout << fmt::format("{:^{}}", heap[idx], width);
      ++idx;
      if (idx == heap.size()) {
        done = true;
        break;
      }
    }
    width /= 2;
    std::cout << "\n\n";
    if (done) break;
  }
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_stl(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  std::pop_heap(heap.begin(), heap.end(), comp);
  heap.pop_back();
  heap.push_back(newval);
  std::push_heap(heap.begin(), heap.end(), comp);
}

template <typename T, typename Comp = std::less<>>
// NOLINTNEXTLINE recursion is tailcall eliminated by compiler
void bubble_down_recursively(std::vector<T>& heap, std::size_t i, Comp comp = {}) {
  const auto left  = 2 * i + 1;
  const auto right = 2 * i + 2;
  const auto n     = heap.size();

  using std::swap; // enable ADL

  if (left >= n) { // no children
    return;
  } else if (right >= n) { // left exists right does not.  NOLINT else after return
    if (comp(heap[i], heap[left])) {
      swap(heap[i], heap[left]);
      bubble_down_recursively(heap, left, comp);
    }
  } else { // both children exist
    // 'larger' is only well named if comp = std::less<>{}
    auto larger = comp(heap[right], heap[left]) ? left : right;
    if (comp(heap[i], heap[larger])) {
      swap(heap[i], heap[larger]);
      bubble_down_recursively(heap, larger, comp);
    }
  }
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_bubble_down_recursively(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  heap[0] = newval;
  bubble_down_recursively(heap, 0, comp);
}

template <typename T, typename Comp = std::less<>>
void bubble_down_iteratively(std::vector<T>& heap, std::size_t i, Comp comp = {}) {
  const auto n = heap.size();

  while (true) {
    const std::size_t left  = 2 * i + 1;
    const std::size_t right = 2 * i + 2;

    std::size_t largest = i;

    if ((left < n) && comp(heap[largest], heap[left])) {
      largest = left;
    }
    if ((right < n) && comp(heap[largest], heap[right])) {
      largest = right;
    }

    if (largest == i) {
      break;
    }

    using std::swap; // enable ADL
    swap(heap[i], heap[largest]);
    i = largest;
  }
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_bubble_down_iteratively(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  heap[0] = newval;                       // stick it in anyway
  bubble_down_iteratively(heap, 0, comp); // and fix the heap
}

// borrowed from libstdc++ __push_heap
template <typename RandomAccessIterator, typename Distance, typename Tp, typename Compare>
constexpr void push_heap(RandomAccessIterator first, Distance holeIndex, Distance topIndex,
                         Tp value, Compare& comp) {
  Distance parent = (holeIndex - 1) / 2;
  while (holeIndex > topIndex && comp(*(first + parent), value)) {
    *(first + holeIndex) = *(first + parent);
    holeIndex            = parent;
    parent               = (holeIndex - 1) / 2;
  }
  *(first + holeIndex) = std::move(value);
}

// borrowed from libstdc++ __adjust_heap
template <typename RandomAccessIterator, typename Distance, typename Tp, typename Compare>
constexpr void adjust_heap(RandomAccessIterator first, Distance holeIndex, Distance len, Tp value,
                           Compare comp) {
  const Distance topIndex    = holeIndex;
  Distance       secondChild = holeIndex;
  while (secondChild < (len - 1) / 2) {
    secondChild = 2 * (secondChild + 1);
    if (comp(*(first + secondChild), *(first + (secondChild - 1)))) secondChild--;
    *(first + holeIndex) = *(first + secondChild);
    holeIndex            = secondChild;
  }
  if ((len & 1) == 0 && secondChild == (len - 2) / 2) {
    secondChild          = 2 * (secondChild + 1);
    *(first + holeIndex) = *(first + (secondChild - 1));
    holeIndex            = secondChild - 1;
  }
  push_heap(first, holeIndex, topIndex, value, comp);
}

template <typename T, typename Comp = std::less<>>
void replace_top_using_adjust_heap(std::vector<T>& heap, T newval, Comp comp = {}) {

  if (heap.empty()) throw std::domain_error("can't replace_top on an empty heap");

  assert(std::is_heap(heap.begin(), heap.end(), comp));

  heap[0] = newval;
  adjust_heap(heap.begin(), 0L, heap.end() - heap.begin(), newval, comp);
}

template <typename T>
struct cmp_counter {
  static std::size_t cmpcount; // NOLINT must be static because STL takes Comp by value
  bool               operator()(T a, T b) {
    ++cmpcount;
    return a < b; // effectively std::less<>{};
  }
  static void reset() { cmpcount = 0; }
};
template <typename T>
std::size_t cmp_counter<T>::cmpcount = 0; // NOLINT global static

int main() {

  using ValueType = int;
  
  struct method {
    using cb_t = void (*)(std::vector<ValueType>&, ValueType, cmp_counter<ValueType>);
    std::string label;
    cb_t        cb;
  };
  auto methods = std::vector<method>{
      {"std::heap / std::priority_queue", &replace_top_using_stl},
      {"bubble up recursively", &replace_top_using_bubble_down_recursively},
      {"bubble up iteratively", &replace_top_using_bubble_down_iteratively},
      {"libstc++ __adjust_heap", &replace_top_using_adjust_heap},
  };

  std::cout << fmt::format("{:35s} {:s}\n", "method", "avg_cmp_cnt");
  for (auto& method: methods) {
    auto prng              = std::mt19937_64(1); // NOLINT fixed seed for repeatability
    auto heap_element_dist = std::uniform_int_distribution<>(1, 1000);
    auto heap_size_dist    = std::uniform_int_distribution<std::size_t>(3, 100);

    const std::size_t number_of_trials = 1'000'000;

    std::size_t total_cmpcount = 0;
    cmp_counter<ValueType> comp;
    for (unsigned i = 0; i != number_of_trials; ++i) {
      std::vector<int> h(heap_size_dist(prng));
      std::generate(h.begin(), h.end(), [&] { return ValueType(heap_element_dist(prng)); });

      std::make_heap(h.begin(), h.end(), comp);

      auto newval = ValueType(heap_element_dist(prng));
      cmp_counter<ValueType>::reset();
      method.cb(h, newval, comp);
      total_cmpcount += cmp_counter<ValueType>::cmpcount;

      if (!std::is_heap(h.begin(), h.end(), comp)) {
        std::cerr << method.label << "NOT A HEAP ANYMORE!!\n";
        return EXIT_FAILURE;
      }
    }
    std::cout << fmt::format("{:35s} {:f}\n", method.label,
                             double(total_cmpcount) / number_of_trials);
  }
}