从向量中删除最小的非唯一值

Remove smallest non-unique value from vector

我有一个未排序的双精度向量(实际上是本例中使用的具有双精度成员的对象)。我需要从这个向量中删除最小的非唯一值。但是,不保证存在非唯一值。允许对范围进行排序。

一如既往,我开始寻找 std::algorithm 并找到了 std::unique。在我的第一个想法中,我会将其与 std::sort 结合使用,将所有非唯一值移动到向量的末尾,然后在非唯一值上使用 min_element。但是,std::unique 将在末尾留下非唯一值处于未指定状态。事实上,我失去了所有非 POD 成员。

有没有人建议如何有效地做到这一点?高效地完成它很重要,因为代码被用于程序的瓶颈(已经有点太慢了)。

好吧,如果您可以对范围进行排序,那么这很容易。按升序排序,然后遍历直到遇到两个相等的相邻元素。完成。

像这样:

T findSmallestNonunique(std::vector<T> v)
{
   std::sort(std::begin(v), std::end(v));
   auto it = std::adjacent_find(std::begin(v), std::end(v));
   if (it == std::end(v))
      throw std::runtime_error("No such element found");
   return *it;
}

这里是a demonstration

#include <vector>
#include <algorithm>
#include <stdexcept>
#include <iostream>

template <typename Container>
typename Container::value_type findSmallestNonunique(Container c)
{
   std::sort(std::begin(c), std::end(c));
   auto it = std::adjacent_find(std::begin(c), std::end(c));

   if (it == std::end(c))
      throw std::runtime_error("No such element found");

   return *it;
}

int main(int argc, char** argv)
{
    std::vector<int> v;
    for (int i = 1; i < argc; i++)
        v.push_back(std::stoi(argv[i]));

    std::cout << findSmallestNonunique(v) << std::endl;
}

// g++ -std=c++14 -O2 -Wall -pedantic -pthread main.cpp \
// && ./a.out 1 2 2 3 4 5 5 6 7 \
// && ./a.out 5 2 8 3 9 3 0 1 4 \
// && ./a.out 5 8 9 2 0 1 3 4 7
// 
// 2
// 3
// terminate called after throwing an instance of 'std::runtime_error'
//   what():  No such element found

请注意,这里我没有执行就地搜索,但我可以通过引用获取容器来执行搜索。 (这取决于您是否被允许对原始输入进行排序。)

由于排序操作,这可能像"bad"一样O(N×log(N)),但它简单易维护,并且不需要任何 allocations/copies(整个数据集的单个副本除外,如上所述,您可以完全避免)。如果您的输入很大或者您希望在大多数情况下匹配失败,您可能想要使用其他东西。一如既往:个人资料!

您可以在(预期的)线性时间内完成此操作。

  • 使用unordered_map来计算元素。这在值的数量上是(预期的)线性的。

  • 使用简单循环在非唯一项中找到最小项。

这是一个可能的实现:

#include <unordered_map>
#include <iostream>
#include <vector>

using namespace std;

int main()
{
    const vector<double> elems{1, 3.2, 3.2, 2};
    unordered_map<double, size_t> c;
    for(const double &d: elems)
        ++c[d];
    bool has = false;
    double min_;
    for(const auto &e: c)
        if(e.second > 1)
        {
            min_ = has? min(e.first, min_): e.first;
            has = true;
        }
    cout << boolalpha << has << " " << min_ << endl;
    return 0;
}

Edit 正如 Howard Hinnant 和 Lightness Races In Orbit 指出的那样,这包含分配和哈希。因此它将是线性的,但具有相对较大的因数。其他基于排序的解决方案可能更适合小尺寸。 When/if 分析,使用好的分配器很重要,例如 Google's tcmalloc.

好吧,这是一个算法,它实际上 删除了 最小的非唯一项目(而不是仅仅打印它)。

template <typename Container>
void
removeSmallestNonunique(Container& c)
{
    using value_type = typename Container::value_type;
    if (c.size() > 1)
    {
        std::make_heap(c.begin(), c.end(), std::greater<value_type>{});
        std::pop_heap(c.begin(), c.end(), std::greater<value_type>{});
        for (auto e = std::prev(c.end()); e != c.begin(); --e)
        {
            std::pop_heap(c.begin(), e, std::greater<value_type>{});
            if (*e == e[-1])
            {
                c.erase(e);
                break;
            }
        }
    }
}

之所以选择这个算法,主要是因为Lightness Races in Orbit没有。我不知道这是否会比 sort/adjacent_find 快。答案几乎肯定取决于输入。

例如如果没有重复,那么这个算法肯定比sort/adjacent_find慢。如果输入非常非常大,并且最小唯一值很可能在排序范围的早期,这个算法很可能比 sort/adjacent_find 更快。

我上面所说的一切都只是猜测。在对实际问题的统计上可能的输入执行所需的测量之前,我要退出。

也许 Omid 可以将其包含在他的测试中并提供一个摘要答案。 :-)

7 小时后...计时

我使用了 Omid's 代码,更正了其中的一个小错误,更正了其他两个算法以实际删除一个元素,并更改了测试工具以更广泛地改变大小和重复项的数量。

这是我在 -O3 使用 clang / libc++ 测试的代码:

#include <unordered_map>
#include <iostream>
#include <vector>
#include <algorithm>
#include <random>
#include <chrono>
#include <cassert>

template <typename Container>
void
erase_using_hashTable(Container& vec)
{
    using T = typename Container::value_type;
    std::unordered_map<T, int> c;
    for (const auto& elem : vec){
        ++c[elem];
    }
    bool has = false;
    T min_;
    for (const auto& e : c)
    {
        if (e.second > 1)
        {
            min_ = has ? std::min(e.first, min_) : e.first;
            has = true;
        }
    }
    if (has)
        vec.erase(std::find(vec.begin(), vec.end(), min_));
}

template <typename Container>
void 
eraseSmallestNonunique(Container& c)
{
   std::sort(std::begin(c), std::end(c));
   auto it = std::adjacent_find(std::begin(c), std::end(c));

   if (it != std::end(c))
       c.erase(it);
}

template <typename Container>
void
removeSmallestNonunique(Container& c)
{
    using value_type = typename Container::value_type;
    if (c.size() > 1)
    {
        std::make_heap(c.begin(), c.end(), std::greater<value_type>{});
        std::pop_heap(c.begin(), c.end(), std::greater<value_type>{});
        for (auto e = std::prev(c.end()); e != c.begin(); --e)
        {
            std::pop_heap(c.begin(), e, std::greater<value_type>{});
            if (*e == e[-1])
            {
                c.erase(e);
                break;
            }
        }
    }
}

template<typename iterator>
iterator partition_and_find_smallest_duplicate(iterator begin, iterator end)
{
    using std::swap;
    if (begin == end)
        return end; // empty sequence

    // The range begin,end is split in four partitions:
    // 1. equal to the pivot
    // 2. smaller than the pivot
    // 3. unclassified
    // 4. greater than the pivot

    // pick pivot (TODO: randomize pivot?)
    iterator pivot = begin;
    iterator first = next(begin);
    iterator last = end;

    while (first != last) {
        if (*first > *pivot) {
            --last;
            swap(*first, *last);
        } else if (*first < *pivot) {
            ++first;
        } else {
            ++pivot;
            swap(*pivot, *first);
            ++first;
        }
    }

    // look for duplicates in the elements smaller than the pivot
    auto res = partition_and_find_smallest_duplicate(next(pivot), first);
    if (res != first)
        return res;

    // if we have more than just one equal to the pivot, it is the smallest duplicate
    if (pivot != begin)
        return pivot;

    // neither, look for duplicates in the elements greater than the pivot
    return partition_and_find_smallest_duplicate(last, end);
}

template<typename container>
void remove_smallest_duplicate(container& c)
{
    using std::swap;
    auto it = partition_and_find_smallest_duplicate(c.begin(), c.end());
    if (it != c.end())
    {
        swap(*it, c.back());
        c.pop_back();
    }
}

int  main()
{
    const int MaxArraySize = 5000000;
    const int minArraySize = 5;
    const int numberOfTests = 3;

    //std::ofstream file;
    //file.open("test.txt");
    std::mt19937 generator;

    for (int t = minArraySize; t <= MaxArraySize; t *= 10)
    {
        const int range = 3*t/2;
        std::uniform_int_distribution<int> distribution(0,range);

        std::cout << "Array size = " << t << "  range = " << range << '\n';

        std::chrono::duration<double> avg{},avg2{}, avg3{}, avg4{};
        for (int n = 0; n < numberOfTests; n++)
        {
            std::vector<int> save_vec;
            save_vec.reserve(t);
            for (int i = 0; i < t; i++){//por kardan array ba anasor random
                save_vec.push_back(distribution(generator));
            }
            //method1
            auto vec = save_vec;
            auto start = std::chrono::steady_clock::now();
            erase_using_hashTable(vec);
            auto end = std::chrono::steady_clock::now();
            avg += end - start;
            auto answer1 = vec;
            std::sort(answer1.begin(), answer1.end());

            //method2
            vec = save_vec;
            start = std::chrono::steady_clock::now();
            eraseSmallestNonunique(vec);
            end = std::chrono::steady_clock::now();
            avg2 += end - start;
            auto answer2 = vec;
            std::sort(answer2.begin(), answer2.end());
            assert(answer2 == answer1);

            //method3
            vec = save_vec;
            start = std::chrono::steady_clock::now();
            removeSmallestNonunique(vec);
            end = std::chrono::steady_clock::now();
            avg3 += end - start;
            auto answer3 = vec;
            std::sort(answer3.begin(), answer3.end());
            assert(answer3 == answer2);

            //method4
            vec = save_vec;
            start = std::chrono::steady_clock::now();
            remove_smallest_duplicate(vec);
            end = std::chrono::steady_clock::now();
            avg4 += end - start;
            auto answer4 = vec;
            std::sort(answer4.begin(), answer4.end());
            assert(answer4 == answer3);
        }
        //file << avg/numberOfTests <<" "<<avg2/numberOfTests<<'\n';
        //file << "__\n";
        std::cout <<   "Method1 : " << (avg  / numberOfTests).count() << 's'
                  << "\nMethod2 : " << (avg2 / numberOfTests).count() << 's'
                  << "\nMethod3 : " << (avg3 / numberOfTests).count() << 's'
                  << "\nMethod4 : " << (avg4 / numberOfTests).count() << 's'
                  << "\n\n";
    }

}

这是我的结果:

Array size = 5  range = 7
Method1 : 8.61967e-06s
Method2 : 1.49667e-07s
Method3 : 2.69e-07s
Method4 : 2.47667e-07s

Array size = 50  range = 75
Method1 : 2.0749e-05s
Method2 : 1.404e-06s
Method3 : 9.23e-07s
Method4 : 8.37e-07s

Array size = 500  range = 750
Method1 : 0.000163868s
Method2 : 1.6899e-05s
Method3 : 4.39767e-06s
Method4 : 3.78733e-06s

Array size = 5000  range = 7500
Method1 : 0.00124788s
Method2 : 0.000258637s
Method3 : 3.32683e-05s
Method4 : 4.70797e-05s

Array size = 50000  range = 75000
Method1 : 0.0131954s
Method2 : 0.00344415s
Method3 : 0.000346838s
Method4 : 0.000183092s

Array size = 500000  range = 750000
Method1 : 0.25375s
Method2 : 0.0400779s
Method3 : 0.00331022s
Method4 : 0.00343761s

Array size = 5000000  range = 7500000
Method1 : 3.82532s
Method2 : 0.466848s
Method3 : 0.0426554s
Method4 : 0.0278986s

更新

我已经用 更新了上面的结果。他的算法 相当 具有竞争力。乌尔里希干得好!

我应该警告这个答案的读者,Ulrich 的算法容易受到 "quick-sort O(N^2) problem" 的影响,其中对于特定的输入,算法可能会严重退化。通用算法是可修复的,Ulrich 显然知道此评论所证明的漏洞:

// pick pivot (TODO: randomize pivot?)

这是针对O(N^2)问题的一个防御,还有其他的,比如检测不合理recursion/iteration,中途切换到另一个算法(比如方法3或者方法2) .如所写,方法 4 在给定有序序列时受到严重影响,而在给定逆序序列时则是灾难性的。在我的平台上,对于这些情况,方法 3 也不是方法 2 的最佳选择,尽管不如方法 4 差。

为类似快速排序的算法找到解决 O(N^2) 问题的理想技术有点像魔法,但值得花时间。我肯定会将方法 4 视为工具箱中的一个有价值的工具。

首先,关于删除元素的任务,最难的是找到它,但实际上删除它很容易(与最后一个元素交换然后pop_back())。因此,我只会解决这个发现。另外,您提到对序列进行排序是可以接受的,但我认为不仅是排序,任何类型的重新排序都是可以接受的。

看看快速排序算法。它选择一个随机元素,然后将序列左右划分。如果您编写分区以便区分 "less" 和 "not less",您可以部分排序序列并在 运行.

上找到最小的重复项

执行以下步骤即可:

  • 首先,选择一个随机主元并对序列进行划分。同时可以检测pivot是否重复。请注意,如果您在此处找到重复项,您可以丢弃(!)任何更大的东西,因此您甚至不必为两个分区投资存储容量和带宽。
  • 然后,递归到较小元素的序列。
  • 如果较小的分区中有一组重复项,这些就是您的解决方案。
  • 如果第一个数据透视表有重复,这就是你的解决方案。
  • 否则,递归到较大的元素以查找重复项。

与常规排序相比的优势在于,如果您在较低的数字中发现重复项,则无需对整个序列进行排序。但是,在一系列唯一数字上,您将对它们进行完全排序。与建议的使用 hashmap 的元素计数相比,这确实具有更高的渐近复杂度。它是否表现更好取决于您的实现和输入数据。

请注意,这要求元素可以排序和比较。你提到你使用 double 值,当你在那里有 NaN 时,这些值是众所周知的不好排序。我可以想象标准容器中的散列算法虽然可以与 NaN 一起使用,所以使用散列映射进行计数还有一点。

以下代码实现了上述算法。它使用一个递归函数来划分输入并查找重复项,从第二个函数调用,然后最终删除重复项:

#include <vector>
#include <algorithm>
#include <iostream>

template<typename iterator>
iterator partition_and_find_smallest_duplicate(iterator begin, iterator end)
{
    using std::swap;

    std::cout << "find_duplicate(";
    for (iterator it=begin; it!=end; ++it)
        std::cout << *it << ", ";
    std::cout << ")\n";
    if (begin == end)
        return end; // empty sequence

    // The range begin,end is split in four partitions:
    // 1. equal to the pivot
    // 2. smaller than the pivot
    // 3. unclassified
    // 4. greater than the pivot

    // pick pivot (TODO: randomize pivot?)
    iterator pivot = begin;
    std::cout << "picking pivot: " << *pivot << '\n';

    iterator first = next(begin);
    iterator last = end;

    while (first != last) {
        if (*first > *pivot) {
            --last;
            swap(*first, *last);
        } else if (*first < *pivot) {
            ++first;
        } else {
            ++pivot;
            swap(*pivot, *first);
            ++first;
            std::cout << "found duplicate of pivot\n";
        }
    }

    // look for duplicates in the elements smaller than the pivot
    auto res = partition_and_find_smallest_duplicate(next(pivot), first);
    if (res != first)
        return res;

    // if we have more than just one equal to the pivot, it is the smallest duplicate
    if (pivot != begin)
        return pivot;

    // neither, look for duplicates in the elements greater than the pivot
    return partition_and_find_smallest_duplicate(last, end);
}

template<typename container>
void remove_smallest_duplicate(container& c)
{
    using std::swap;
    auto it = partition_and_find_smallest_duplicate(c.begin(), c.end());
    if (it != c.end())
    {
        std::cout << "removing duplicate: " << *it << std::endl;

        // swap with the last last element before popping
        // to avoid copying the elements in between
        swap(*it, c.back());
        c.pop_back();
    }
}

int main()
{
    std::vector<int> data = {66, 3, 11, 7, 75, 62, 62, 52, 9, 24, 58, 72, 37, 2, 9, 28, 15, 58, 3, 60, 2, 14};

    remove_smallest_duplicate(data);
}

(我添加了一个额外的答案,因为 1)第一个答案的重点是使用现成的 STL 组件,以及 2)Howard Hinnant 从那以后提出了一些有趣的观点。)

感谢 Howard Hinnant 提供了对不同方法进行基准测试的原则(以及一个非常独特的解决方案)!它导致了一些我个人觉得有趣的东西(但并不完全理解)。

但是,恕我直言,基准测试的执行有些问题。

问题说明问题是

... objects with a double member which is used in this case ... It is important to do it efficiently since the code is used in the bottleneck of the program

然而测试:

  • int进行了操作,这是sorting-based机制的优势;虽然 double 比较和散列都比 ints' 更昂贵,但是比较的次数是 Theta(n log(n)),而哈希是 O(n).

  • 获取了我的 main 函数的主体,并将其包装在一个函数中(而不是 class 对象),并且没有使用池分配器。坦率地说,我认为这是一个导致结果毫无意义的缺陷,因为它基本上确立了一个众所周知的事实,即动态分配 + 不必要的 re-initializations 大型容器是昂贵的。

  • 依赖于这样一个事实,即排序算法只能 return 他们操作的 vector (对于原始问题无法做到)。在下文中,我忽略了这一点,因为 vector of doubles 的问题本身很有趣,但 OP 应该注意这可能会改变更多事情。


所以,为了处理第二个问题,我最初使用了自己gcc libstdc++ pb_ds extension中的probing-based hash-table。这本身就减少了解决方案 #1 的 运行 宁时间低于解决方案 #2 (sort + adjacent_find),但它仍然比 #3 (make_heap).

为了进一步减少这种情况,我使用了似乎相关的最退化的 "hash table" 形式。

template<typename T, class Hash=std::hash<T>>
class smallest_dup_remover
{
public:
    explicit smallest_dup_remover(std::size_t max_size) 
    {
        while(m_mask < max_size)
            m_mask *= 2;

        m_status.resize(m_mask);
        m_vals.resize(m_mask);

        --m_mask;
    }

    void operator()(std::vector<T> &vals)
    {
        std::fill(std::begin(m_status), std::end(m_status), 0);
        bool has = false;
        T min_;
        std::vector<T> spillover;
        spillover.reserve(vals.size());
        for(auto v: vals)
        {
            const std::size_t pos = m_hash(v) & m_mask;
            char &status = m_status[pos];
            switch(status)
            {
            case 0:
                status = 1;
                m_vals[pos] = v;
                break;
            case 1:
                if(m_vals[pos] == v)
                {
                    status = 2;
                    min_ = has? std::min(min_, v): v;
                    has = true;
                }
                else
                    spillover.push_back(v);
                break;
            case 2:
               if(m_vals[pos] != v)
                    spillover.push_back(v);
            }
        }
        std::sort(std::begin(spillover), std::end(spillover));
        auto it = std::adjacent_find(std::begin(spillover), std::end(spillover));
        if(has && it == std::end(spillover))
            remove_min(vals, min_);
        else if(has && it != std::end(spillover))
            remove_min(vals, std::min(min_, *it));
        else if(!has && it != std::end(spillover))
            remove_min(vals, *it);
    }

private:
    void remove_min(std::vector<T> &vals, T t)
    {
        vals.erase(std::find(vals.begin(), vals.end(), t)); 
    }

private:
    size_t m_mask = 1;
    std::vector<char> m_status;
    std::vector<T> m_vals;
    Hash m_hash;
};

数据结构包含三个vector

  • a "status" vector,包含 0、1 和 "many"

  • 的代码
  • 一个"values"vector,包含"hashed values"

  • 一个"spillover"向量,用于碰撞

具有 "many" 状态的对象会即时比较最小值。碰撞对象(即与其他对象发生碰撞的对象)被推送到 "spillover" vector。使用#2 中的方法检查溢出 vector 的最低重复项。这是与从 "many" 值中找到的最低值进行比较。


Here is the code for a benchmark that retests this benchmark, and here 是生成以下图表的代码。

(提醒#1是hash-based,#2是quicksort-based,#3是heap-based。)

从 Howard Hinnant 之前执行的测试开始(值是从值长度的 1.5 大小的范围内随机生成的),结果如下:

所以他出色的 heap-based 算法确实在这种情况下表现最好,但它看起来与以前有很大不同。特别是,hash-based 算法在分析其内存分配时并没有那么糟糕。

但是,假设我们将范围更改为完全随机的范围。然后结果变成这样:

在这种情况下,hash-based 解决方案效果最好,sort-based 其次,heap-based 解决方案效果最差。

为了验证原因,这里再做两个测试

这里是一个完全随机值+两个零值(即最低重复为零)的测试:

最后,这里是所有值都是从 100 个可能值(不考虑长度)生成的情况:

事情的经过如下。 heap-based 解决方案是三个解决方案中最依赖分布的。 MakeHeap 算法是线性时间的,如果之后几乎立即遇到重复项,则证明它是线性算法(但没有任何哈希)。相反,取另一个极端,根本没有重复项。本质上,这个算法就变成了heapsort。堆排序相对于快速排序的劣势在理论上得到了理解,在实践中也得到了很多验证。

所以,heap-based算法实际上是一个令人惊讶的好算法。虽然它确实有很大的差异,但在实践中可能需要考虑避免它。


一些观察:

  • 图表似乎没有意义:n log(n) 行为在哪里,至少对于解决方案 #2?

  • 为什么 Hinnant 测试与随机 + 较低重复测试的工作方式如此相似?使用 1.5 X 范围,并且考虑到这与 Bootstrap Resampling 非常相似这一事实,以及已知的 ~37% 重复结果,我只是看不到它。

  • 正如 Howard Hinnant 指出的那样,它确实结束于分配。但是,这个情况和之前的benchmark相去甚远。

  • 一些实用的要点:

    • OP,您可能想 re-time 这对于您的原始问题,使用真实分布和 twice-copying 您的原始结构向量的开销 to+from 排序解决方案。

    • 我想了很多如何并行化这个问题,没有什么有趣的。这样做的一种方法(可能提出的问题多于答案)是 运行 Howard Hinnant 在一个线程上的解决方案,另一个线程上的不同解决方案,并使用找到的第一个结果。考虑到它对于某些发行版来说要快得多,而对于其他发行版要慢得多,它可能会覆盖你的基础。