std::vector 上 numpy.unique 与 return_index 和 return_inverse 的 C++ 等价物

C++ equivalent of numpy.unique on std::vector with return_index and return_inverse

numpy 实现了 unique 算法 returns :

  1. numpy 数组的已排序的唯一元素,没有重复项)

此外,numpy.unique()还可以return:

  1. 给出唯一值的输入数组的索引
  2. 重建输入数组的唯一数组的索引

C++ 标准库还实现了 unique 算法 (here),该算法以某种方式消除了连续的重复项。结合 std::vector<>::erasestd::sort(),这可以 return 向量的 排序的唯一元素 numpy.unique() 的输出 1)。

我的问题是:stl 或其他地方是否有任何算法也可以 return 输出 numpy.unique() 的 2 和 3。如果没有,有没有办法有效地实现它?

一个 std::map 与一个 std::vector 相结合,可以为您提供所需的所有信息。

#include <map>
#include <vector>

template <typename Container>
auto unique_map( const Container & xs )
-> std::map <typename Container::value_type, std::vector <std::size_t> >
{
  decltype(unique_map(xs)) S;

  std::size_t n = 0;
  for (const auto & x : xs)
  {
    S[ x ].push_back( n++ );
  }

  return S;
}

现在您可以将任何容器转换为排序的 std::map 其中:

  • 每个键都是来自原始容器的唯一元素xs
  • 每个值都是 std::vector 列表索引 xs

如果你想从映射中重建 xs,你可以很容易地做到这一点:

#include <numeric>

template <typename UniqueMap>
auto rebuild_from_unique_map( const UniqueMap & um )
-> std::vector <typename UniqueMap::key_type>  // <std::size_t>
{
  decltype(rebuild_from_unique_map(um)) xs;
  if (um.size())
  {
    xs.resize( std::accumulate( um.begin(), um.end(), (std::size_t)0, 
        []( auto n, auto p ) { return n + p.second.size(); } ) );
    for (const auto & m : um)
      for (const auto & n : m.second)
        xs[n] = m.first;                       // index++
  }
  return xs;
}

如果您真的非常想要那个索引列表 (return_inverse),您可以按照指示更改注释行并在嵌套循环之前添加 std::size_t index = 0; 来获得它。

这是一个可以玩的玩具。只需将 post 中的三个代码片段连接起来并编译即可。 irange.

需要 Boost
#include <algorithm>
#include <boost/range/irange.hpp>
#include <cmath>
#include <iomanip>
#include <iostream>
#include <string>

int main( int argc, char ** argv )
{
  std::vector <int> xs( argc-1, 0 );
  std::transform( argv+1, argv+argc, xs.begin(), []( auto s ) { return std::stoi( s ); } );

  auto width = std::setw( 3 + (int)std::log10( *std::max_element( xs.begin(), xs.end() ) ) );

  std::cout << xs.size() << " integers";
  std::cout << "\nvalue:  "; for (auto x : xs)                         std::cout << " " << width << x;
  std::cout << "\nindex: ["; for (auto n : boost::irange( xs.size() )) std::cout << " " << width << n;
  std::cout << " ]\n";

  auto ys = unique_map( xs );

  std::cout << "\n" << ys.size() << " unique integers";
  std::cout << "\nsorted:"; for (auto y : ys) std::cout << " " << width << y.first;

  std::cout << "\n\n";
  for (const auto & y : ys)
  {
    std::cout << width << y.first << " appears " << y.second.size() << " times at [";
    for (const auto & n : y.second) std::cout << " " << n;
    std::cout << " ]\n";
  }
  
  std::cout << "\nrebuilt:"; for (auto x : rebuild_from_unique_map( ys )) std::cout << " " << width << x;
  std::cout << "\n";
}

用类似的东西编译:

  • cl /EHsc /W4 /Ox /std:c++17 /I C:\boost\include a.cpp
  • clang++ -Wall -Wextra -pedantic-errors -O3 -std=c++17 a.cpp
  • 等等

玩具不能容忍错误输入:确保至少给它一个参数,并且所有参数都是整数.如果您愿意,您可以更改它以处理任意 std::string:只需调整 main() 中的前三个语句以用字符串填充 xs 并正确计算 width

这是我在我的 Windows 盒子上玩的:

a 2 -1 2 4 -1 2 3 7 -1 2 0 5

12 integers
value:     2  -1   2   4  -1   2   3   7  -1   2   0   5
index: [   0   1   2   3   4   5   6   7   8   9  10  11 ]

7 unique integers
sorted:  -1   0   2   3   4   5   7

 -1 appears 3 times at [ 1 4 8 ]
  0 appears 1 times at [ 10 ]
  2 appears 4 times at [ 0 2 5 9 ]
  3 appears 1 times at [ 6 ]
  4 appears 1 times at [ 3 ]
  5 appears 1 times at [ 11 ]
  7 appears 1 times at [ 7 ]

rebuilt:   2  -1   2   4  -1   2   3   7  -1   2   0   5

a 1 1 1

3 integers
value:     1   1   1
index: [   0   1   2 ]

1 unique integers
sorted:   1

  1 appears 3 times at [ 0 1 2 ]

rebuilt:   1   1   1

a 3 1 2

3 integers
value:     3   1   2
index: [   0   1   2 ]

3 unique integers
sorted:   1   2   3

  1 appears 1 times at [ 1 ]
  2 appears 1 times at [ 2 ]
  3 appears 1 times at [ 0 ]

rebuilt:   3   1   2

问得好!

这是一个只使用一个或两个临时向量的版本。对于 n 个输入值,总体复杂度为 O(n log(n))。

#include <algorithm>
// using std::stable_sort, std::unique, std::unique_copy
#include <iterator>
// using std::back_inserter
#include <vector>
// using std::vector


/** Helper to implement argsort-like behavior */
template<class T>
struct SortPair
{
  T value;
  std::size_t index;

  SortPair(const T& value, std::size_t index)
    : value(value), index(index)
  {}
  SortPair(T&& value, std::size_t index)
    : value(std::move(value)), index(index)
  {}
  bool operator<(const SortPair& o) const
  { return value < o.value; }

  bool operator<(const T& o) const
  { return value < o; }

  friend bool operator<(const T& left, const SortPair& right)
  { return left < right.value; }

  bool operator==(const SortPair& o) const
  { return value == o.value; }

  friend void swap(SortPair& left, SortPair& right)
  {
      using std::swap;
      swap(left.value, right.value);
      swap(left.index, right.index);
  }
};
/**
 * Implements numpy.unique
 *
 * \tparam T scalar value type
 * \tparam Iterator input iterator for type T
 * \param first, last range of values
 * \param index if not null, returns the first indices of each unique value in
 *    the input range
 * \param inverse if not null, returns a mapping to reconstruct the input range
 *    from the output array. first[i] == returnvalue[inverse[i]]
 * \param count if not null, returns the number of times each value appears
 * \return sorted unique values from the input range
 */
template<class T, class Iterator>
std::vector<T> unique(Iterator first, Iterator last,
                      std::vector<std::size_t>* index=nullptr,
                      std::vector<std::size_t>* inverse=nullptr,
                      std::vector<std::size_t>* count=nullptr)
{
  std::vector<T> uvals;
  if(! (index || inverse || count)) { // simple case
    uvals.assign(first, last);
    using t_iter = typename std::vector<T>::iterator;
    const t_iter begin = uvals.begin(), end = uvals.end();
    std::sort(begin, end);
    uvals.erase(std::unique(begin, end), end);
    return uvals;
  }
  if(first == last) { // early out. Helps with inverse computation
    for(std::vector<std::size_t>* arg: {index, inverse, count})
      if(arg)
        arg->clear();
    return uvals;
  }
  using sort_pair = SortPair<T>;
  using sort_pair_iter = typename std::vector<sort_pair>::iterator;
  std::vector<sort_pair> sorted;
  for(std::size_t i = 0; first != last; ++i, ++first)
    sorted.emplace_back(*first, i);
  const sort_pair_iter sorted_begin = sorted.begin(), sorted_end = sorted.end();
  // stable_sort to keep first unique index
  std::stable_sort(sorted_begin, sorted_end);
  /*
   * Compute the unique values. If we still need the sorted original values,
   * this must be a copy. Otherwise we just reuse the sorted vector.
   * Note that the equality operator in SortPair only uses the value, not index
   */
  std::vector<sort_pair> upairs_copy;
  const std::vector<sort_pair>* upairs;
  if(inverse || count) {
    std::unique_copy(sorted_begin, sorted_end, std::back_inserter(upairs_copy));
    upairs = &upairs_copy;
  }
  else {
    sorted.erase(std::unique(sorted_begin, sorted_end), sorted_end);
    upairs = &sorted;
  }
  uvals.reserve(upairs->size());
  for(const sort_pair& i: *upairs)
    uvals.push_back(i.value);
  if(index) {
    index->clear();
    index->reserve(upairs->size());
    for(const sort_pair& i: *upairs)
      index->push_back(i.index);
  }
  if(count) {
    count->clear();
    count->reserve(uvals.size());
  }
  if(inverse) {
    inverse->assign(sorted.size(), 0);
    // Since inverse->resize assigns index 0, we can skip the 0-th outer loop
    sort_pair_iter sorted_i =
      std::upper_bound(sorted_begin, sorted_end, uvals.front());
    if(count)
      count->push_back(std::distance(sorted_begin, sorted_i));
    for(std::size_t i = 1; i < uvals.size(); ++i) {
      const T& uval = uvals[i];
      const sort_pair_iter range_start = sorted_i;
      // we know there is at least one value
      do
        (*inverse)[sorted_i->index] = i;
      while(++sorted_i != sorted_end && sorted_i->value == uval);
      if(count)
        count->push_back(std::distance(range_start, sorted_i));
    }
  }
  if(count && ! inverse) {
    sort_pair_iter range_start = sorted_begin;
    for(const T& uval: uvals) {
      sort_pair_iter range_end =
        std::find_if(std::next(range_start), sorted_end,
                     [&uval](const sort_pair& i) -> bool {
                       return i.value != uval;
                     });
      count->push_back(std::distance(range_start, range_end));
      range_start = range_end;
    }
    /*
     * We could use equal_range or a custom version based on an
     * exponential search to reduce the number of comparisons.
     * The reason we don't use equal_range is because it has worse
     * performance in the worst case (uvals.size() == sorted.size()).
     * We could make a heuristic and dispatch between both implementations
     */
  }
  return uvals;
}

诀窍是实现类似于 np.argsort 的东西。其他一切自然而然。逆向映射有点棘手,但当你先做它的 pen-and-paper 版本时并不难。

unordered_map

我们也可以使用 unordered_map。这将复杂度降低到类似 O(n) + O(m log(m)) 的程度,对于具有 m 个唯一值的 n 个条目进行排序,而 O(n) 不对唯一值进行排序。

但是,如果唯一值的数量不小于 n,则涉及大量临时分配。这个问题可以通过切换到散列映射实现来解决,例如 robin_hood,它使用平面散列映射,只要 key-value 对在默认情况下最多 6 size_t 大。

然而,人们不应该低估哈希映射的绝对性能,即使是像 std::unordered_map 这样平庸的哈希映射。这是一个在实践中运行良好的简单版本。


#ifdef USE_ROBIN_HOOD
# include <robin_hood.h>
#else
# include <unordered_map>
#endif

template<class T, class Iterator>
std::vector<T>
unordered_unique(Iterator first, Iterator last,
                 std::vector<std::size_t>* index=nullptr,
                 std::vector<std::size_t>* inverse=nullptr,
                 std::vector<std::size_t>* count=nullptr)
{
#ifdef USE_ROBIN_HOOD
  using index_map = robin_hood::unordered_map<T, std::size_t>;
#else
  using index_map = std::unordered_map<T, std::size_t>;
#endif
  using map_iter = typename index_map::iterator;
  using map_value = typename index_map::value_type;
  for(std::vector<std::size_t>* arg: {index, inverse, count})
    if(arg)
      arg->clear();
  std::vector<T> uvals;
  index_map map;
  std::size_t cur_idx = 0;
  for(Iterator i = first; i != last; ++cur_idx, ++i) {
    const std::pair<map_iter, bool> inserted =
      map.emplace(*i, uvals.size());
    map_value& ival = *inserted.first;
    if(inserted.second) {
      uvals.push_back(ival.first);
      if(index)
        index->push_back(cur_idx);
      if(count)
        count->push_back(1);
    }
    else if(count)
      (*count)[ival.second] += 1;
    if(inverse)
      inverse->push_back(ival.second);
  }
  return uvals;
}

我用 N = 100 万个随机整数条目对 M 取模进行了测试。M=N(~600,000 个唯一值),Robin-Hood 版本优于排序版本。使用 M=N/10(~100,000 个唯一值),即使是 std 版本也优于排序版本。

除了当前发布的两个重要答案之外,为了编写高效的实现,还有一些重要的要点需要考虑。

首先,使用 hash-map 可以非常有效 如果不同项目的数量很少,那么 hash-map 可以 适合缓存。由于复杂性中的 log n 因素,经典排序算法(例如 introsort/mergesort/heapsort)往往要慢得多。然而,当不同项目的数量很大时,排序通常要快得多,因为 RAM 中的 不可预测 random-like hash-map 访问模式可能非常昂贵:每个访问速度可能与 RAM latency(通常为 40~100 ns)一样慢。

此外,排序实现可以使用 SIMD 指令(请参阅 here for AVX-512 and there 了解 SVE)进行矢量化,并使用 多线程 进行并行化。在这种情况下尤其如此,因为 np.unique 通常应用于数字。 C++17 提供了包括并行排序在内的并行算法(PSTL)。最近改进了 Numpy 排序算法以使用 AVX-512,从而使执行速度提高了一个数量级。或者,可以使用 O(n) 基数排序对具有 short-sized 键(例如 4 字节)的小数组进行高效排序。

此外,并非所有 hash-map 实施在性能方面都是等效的 。 STL std::unordered_map 往往很慢。这是由于 C++ 标准中的限制(关于迭代器的无效)(几乎)强制 STL 不使用 open addressing strategy. Instead, STL implementations generally use linked lists to solve hash conflicts and this cause slow allocation as described by @Homer512. The thing the fastest existing hash-map implementations mainly use open addressing. For example, hopscotch hashing provide good performance with some interesting guarantees (eg. good performance when the load factor is close to 90~100%). Tessil 提供了一个关于几个 hash-map 实现的非常有趣的基准并且写了非常快速实现(通常比主流 STL 实现快得多)。

std::map 通常使用 red-black tree. It is written in a pretty efficient way in mainstream STL implementations. However, the allocator used by default is clearly not optimized for this use-case. Indeed, the number of nodes to be inserted is bounded. Thus, one can use a monotonic allocator 来实现以加快计算速度。优化的实现甚至可以使用几个简单的大数组,无需额外分配

最后,请注意 np.unique 提供第一个唯一值 的 索引,而不是全部。这可以进一步优化。例如,@Dúthom 的基于 std::map 的实现不需要 std::vector,从而导致更小的内存占用和可能更高的性能。