C#插入和删除下半部分的最有效数据结构

C# Most efficient data structure to insert and to remove lower half

假设我有一个很大的整数排序列表(>1000 项)。我需要能够在此列表上执行两个操作:删除下半部分并通过插入随机整数再次将列表填充到其原始大小。因为我做了大约一百万次这些操作,所以我需要它尽可能高效。

我做的第一件事就是使用 List,我通过在正确的位置添加新项目来保持排序。虽然删除排序列表的下半部分非常容易,但插入需要相当多的时间。

我尝试实现一个跳过列表,但经过一些测试后,似乎列表的大小必须至少为 10 000 才真正重要,否则它的性能比我的普通列表还要差。

这就是我决定使用 AVL 树的原因,这样我就可以更快地插入项目。但问题是我不知道有什么有效的方法可以删除这种二叉搜索树的下半部分。

我的问题是:有没有有效的方法来做到这一点?有没有我可以更轻松地使用的另一种数据结构?

编辑

根据要求,我做了一个小测试,显示列表、跳跃列表和 AVL 树之间的性能差异。我使用 msdn 上的本教程制作了跳过列表:Skip list tutorial. The AVL tree comes from here: AVL tree. I uploaded the test on Pastebin: Program.

在测试中,我在计时时向每个数据结构添加了 100 000 个项目。在我的电脑上,列表大约需要 1 秒,跳过列表需要 0.5 秒,AVL 树需要 0.045 秒。如果我像我想的那样这样做一百万次,列表将需要大约 11.5 天,但 AVL 树只需要大约半天。这个时间差很清楚我为什么要它高效

为什么假设您需要不同的数据结构?这么说:

The first thing I did was just using a List that I kept sorted by adding new items at the right place. Although removing the lower half of a sorted list is very easy, inserting takes quite a bit of time

我很担心,因为您可能使用了正确的[1] 数据结构,但算法很差。我可以强烈建议您看一下 http://sscce.org/ 并将其包含在您的问题中吗?

但是列表插入很慢 O(n)!

不要插入!

正如@usr 所解释的,更好的算法可能是这样的:

  1. Remove the lower half.
  2. Add random integers to restore the original size
  3. Sort the list

无需更改数据结构,但对解决问题的方式有很大的改变。

这尤其重要,因为正如@atlaste 重申的那样,无论 O(?) 是多少,并非每个系统都是平等的:

Things just aren't that easy anymore with modern processors. I've seen cases where different implementations of the same algorithms give you a factor 100+ in difference due to branch prediction, vectorizing and cache locality. O(..) calculations are great if you're comparing apples with apples - but unfortunately that might not be the case. I wish everyone would just watch this video: youtube.com/watch?v=GPpD4BBtA1Y

但我在 O(n) 数据结构上仍然是 O(log n)!

好的,在我们结束这里并继续实际概述您使用的算法和衡量性能之前(目前似乎要求太多)让我问你一个问题:

假设我们有一个 "big sorted list of integers (>1000 items)"。事实上,假设此列表有 10,000 项长!

就 O(?) 而言,哪一个具有更好的插入性能?

A) 列表

B) 链表

C) 二叉树

准备好后,请查看答案:

他们都有 O(1)! O(n) 只告诉你事物 scale 有多好(相对于它们本身,而且只是广义上的)。由于列表的大小固定为“10,000 个项目”,因此没有缩放比例(所有内容都被视为 'constant factor')。请注意,我并不是说这些结构具有同样的性能……只是 O(?) 在其描述中有限制。欲了解更多信息 What is a plain English explanation of "Big O" notation?

基准

这是插入排序的基准,与添加所有新随机项后的排序对比:http://pastebin.com/pNgx73cs

结果(默认设置)

Testing performance of filling a list to 10000 items 1000 times, discarding 1/2
of items after every fill!

Old list: 3248ms
New list: 547ms
DONE

请注意,即使我们在 O(?) 方面有更有效的方法,结果相差并不大,因为在这个大小下 CPU 的速度出奇地快!

备注:

  1. OP 具有相对较小[2] 的整数集合,应该很容易放入 CPU 缓存 [3] 甚至 L1 缓存中。在这些情况下,小的、连续的和可预测的内存使用(例如数组和列表(在基于数组的 C# 中))可能非常有效。
  2. 假设 10,000 作为上限,对于 32 位和 64 位系统,这将分别仅为 ~40KB 或 ~80KB。
  3. Intel Skylake 每个内核有 64KiB 的一级缓存和 256KiB 的二级缓存:https://en.wikipedia.org/wiki/Skylake_(microarchitecture)

我假设您想始终保持列表排序。用随机整数替换下半部分的最佳方法是:

  1. 去掉下半部分。
  2. 添加随机整数以恢复原始大小
  3. 对列表进行排序

之前,您插入的位置正确。这有效地实现了极其缓慢的选择排序。相反,让内置的高效排序算法来完成繁重的工作。

这应该是 O(n * log n)。以前是 O(n^2).

您可以通过不删除前半部分的常数因子对其进行优化。相反,用随机数替换它,然后排序。

关于这个问题,我想指出几点。首先,让我们了解一些关于性能和 C# 的一般情况,因为在仍然存在误解的情况下很难解释清楚。

接下来,我将对此处的特定问题应用所有内容。

C# 中的一般性能

大 O 表示法

在大学里,您会了解到 O(n) 如何始终优于 O(n^2) 以及 O(n) 如何始终优于 O(n log n)。但是,这样做的基本假设是每个操作将花费大致相同的时间。

现在,当我在 1986 年第一次开始在 1802 RISC 处理器上编程时,情况确实如此:内存操作是 1 个时钟周期,加法、减法等也是如此。换句话说,Big -O 在那里工作得很好。

在现代计算机中,难度更大:

  1. 数据缓存在不同级别(速度范围从 15 GB/s 到 1000 GB/s);
  2. 操作在 0.5 个时钟周期和几十个时钟周期之间变化;
  3. 数据通常以突发方式获取 - 因此随机访问比顺序访问差得多;
  4. 矢量化一次最多可以处理 8 个整数对齐数据;
  5. 分支预测失误会使一切失去平衡,因为你必须冲洗地段。

我观察到同一算法的不同实现的性能差异可能高达 1000 (!)

虽然 Big-O 仍然有优点,但您应该正确看待事情。例如,假设您有 N=10000,那么 2log N ~ 13——如果这意味着您可以从所有这些事情中获益,那么它也可能意味着 'stupid' O(n log n) 算法可能只是优于平均 O(n) 算法。

据此您还应该推断出 O(n^2) 永远不会优于 O(n) 算法。所以,Big-O 仍然有它的用处;你只需要正确看待事情。

C#的一些特点

关于 C# 的一个神话是它大约与 C++ 一样快(这是我的“尽可能快”的黄金标准)。在熟练的开发人员手中,事情并非如此简单。对于简单的排序,C++ 大约快 2 倍——但是如果你有更复杂的场景,你可以真正从“低级东西”中受益,那么差异就会变得非常大。我通常估计性能差异是 10 倍。但是,编写适当的高性能 C++ 代码具有挑战性(使用轻描淡写),因此您可能希望坚持使用 C# 并决定将性能下降视为理所当然。

一件有趣的事是 C# 编译器和 JIT 编译东西的速度非常快。部分原因是它们按函数编译所有内容(因此,没有内联等)。此外,C# 通常不会向量化内容。不要相信我的话,使用 ctrl-alt-d 在 Visual studio 并自己检查汇编器输出。

如果我们查看上面的列表,我们可以粗略地说明 (1)、(2) 和 (3) 不受我们使用 C# 的影响; (4) 肯定会受到影响,而 (5) 取决于。

至于 (5),考虑这个简单的例子:

void set(int[] array, int index) 
{
    array[index] = 0;
}

请记住,在 C# 中,方法是按方法编译的。这意味着编译器不能假定 index 不会越界。换句话说:它必须添加两项检查 - 其中一项必须加载内存:

if (index < 0 || index >= array.Length) 
{ 
    throw new IndexOutOfRangeException(); 
}

排序项目

OP 提出的问题是关于维护大小为 m 的排序列表。排序是一项众所周知的操作,您插入的每个项目最多花费 O(log m)。由于您正在处理 n 'random' 项,您将获得 O(n log m).

的最佳速度

二叉堆(基于数组)可能会让你得到那个性能数字,但我现在不想写下堆,并且认为这个替代方案的速度大致相同(如果不是更快的话): )

你的问题

现在我们已经确定了我们正在谈论的内容,让我们进入手头的事实。我将逐一解释这一点。

首先,在处理性能问题时,我养成了删除 using System.Linq 的习惯,因此我们知道我们只是在处理具有预期特征的本机数据结构。

让我们从树结构开始

另一个简单的解决方案是使用红黑树。我们在 .NET 中有一个可供我们使用的工具,称为 SortedSet。它使用引用、算术等——这基本上是我在 (1)、(2) 和 (3) 中警告过的所有讨厌的东西。现在,这里的实现有错误(对于重复项),但速度与您预期的差不多:

static void Main(string[] args)
{
    Random rnd = new Random(12839);

    SortedSet<int> list = new SortedSet<int>();

    for (int i = 0; i < 5000; ++i)
    {
        list.Add(rnd.Next());
    }

    Stopwatch sw = Stopwatch.StartNew();
    for (int i = 0; i < 10000; ++i)
    {
        for (int j = 0; j < 5000; ++j)
        {
            list.Add(rnd.Next());
        }
        int n = 0;
        list.RemoveWhere((a) => n++ < 5000);
    }

    Console.WriteLine(sw.ElapsedMilliseconds);

    Console.ReadLine();
}

与此处几乎所有算法一样,该算法在 O(n log m) 中执行。

我对 AVL 树的大致期望:86220 ms.

天真的实现

通常我不会为红黑树烦恼。尽管如此,由于您在 AVL 树上做了很多工作,我觉得有必要进行这种测量。

当我对算法进行性能优化时,我总是从最容易实现的算法开始,该算法具有大致正确的 Big-O,并且总是更喜欢具有最简单数据结构的算法(阅读:数组).在这种情况下,它是一个与标准排序相结合的列表,它将为每个排序提供 O(m log m),执行 m/n 次,以及 O(n) 数据操作。结果是 O(n + n log m).

那么,为什么要选择您可能会问的最简单的实施方式呢?答案很简单:简单的实现也易于编译和优化,因为它们通常没有很多分支,不使用大量随机内存访问等。

最天真的实现(我已经在我的评论中建议过)是简单地将内容放入数组中,对其进行排序,然后删除它的下半部分。

基本上这可以在最小测试用例中这样实现:

static void Main(string[] args)
{
    Random rnd = new Random(12839);
    List<int> list = new List<int>();

    for (int i = 0; i < 5000; ++i)
    {
        list.Add(rnd.Next());
    }

    Stopwatch sw = Stopwatch.StartNew();
    for (int i = 0; i < 10000; ++i)
    {
        for (int j = 0; j < 5000; ++j)
        {
            list.Add(rnd.Next());
        }

        list.Sort((a, b) => a.CompareTo(b)); // #1
        list.RemoveRange(0, 5000);
    }

    Console.WriteLine(sw.ElapsedMilliseconds);
    Console.ReadLine();
}

基准性能:10047 毫秒

优化1:移除方法调用和分支

方法调用需要时间。分支机构也是如此。所以,如果我们不需要分支,我们不妨直接消除它。换句话说:这是关于 (5).

想到的一件事是将#1 替换为:

list.Sort((a, b) => a - b);

在大多数(!)场景中,这给出了期望的结果,我直截了当地假设这种场景也不例外。

测量值:8768 毫秒。(是的,这是 15% 的变化!)

为了好玩,我们还对 (2) 做了一个简单的测试:

list.Sort((a, b) => (int)((float)a - (float)b));

运算符的大小完全相同(32 位),数据完全相同并且会给出相同的结果——我们只是将所有内容与不同的 CPU 运算进行比较并添加一些强制转换.测量:10902 毫秒。如果每个操作都只是一个时钟滴答,这比您预期的要多。

优化2:数组还是列表?

我也可以关心列表本身;我们对它进行了多次调用,所以我们可以用它代替数组。如果我们反转排序顺序,我们甚至可以消除 RemoveRange。那么,为什么我不专注于此呢?好吧,实际上我可以,但我可以告诉你这不会有太大的不同,因为相对而言,它并不经常被调用。仍然,测试没有坏处,对吧?:

static void Main(string[] args)
{
    Random rnd = new Random(12839);

    int[] list = new int[10000];

    for (int i = 0; i < 5000; ++i)
    {
        list[i] = rnd.Next();
    }

    Stopwatch sw = Stopwatch.StartNew();
    for (int i = 0; i < 10000; ++i)
    {
        for (int j = 0; j < 5000; ++j)
        {
            list[j + 5000] = rnd.Next();
        }
        Array.Sort(list, (a, b) => b - a);
    }

    Console.WriteLine(sw.ElapsedMilliseconds);

    Console.ReadLine();
}

现在,这里有两个测量值:

  • 将列表更改为数组只是将其更改为 +/- 8700 毫秒 - 这没有太大区别
  • 颠倒顺序将结果更改为 7456 毫秒。

这并没有真正产生影响的原因是 List 的底层数据结构是一个数组,所以如果我们要排序,我们只是在做同样的事情。这就是我们的时间。

这里要记住的不是数组和 List 一样快。事实是:我发现如果他们是,他们实际上在很多情况下更快。然而,在这种情况下,我们不是在讨论内部循环中的优化,我们没有过度分配太多内存(可能所有内容都保存在缓存中)并且所有内存访问都是对齐的。总而言之,我们因此可以预期差异非常小。

优化3:移除更多方法调用

现在,你应该注意到这里还有一个替代方案:方法调用需要时间,这里调用最多的是比较器。那么让我们回到 List 的解决方案并删除比较操作。当我们这样做时,我们仍然需要复制。你期待什么?

将行更改为:

list.Sort();

...我们有一个新的时间:4123 毫秒。

现在,为了完全公平,实际上我们在这里所做的是将我们的内联委托更改为 Comparer<int>.Default,这是整数比较器的默认实现。委托将被包装在一个比较器中,创建 2 个虚拟调用——这只是 1 个调用。这意味着我们也可以通过实现我们自己的比较器 class 来颠倒顺序,这将是一个更快的解决方案。

优化4:合并连接

如果我们只需要对一半数据进行排序,为什么还要对所有内容进行排序?这没有道理,对吧?

同样,我选择最简单的算法来完成任务。我们按顺序遍历列表,并按顺序存储新项目 c.f。 (1) 和 (3)。没有交换,请记住我们更喜欢顺序数据访问模式。然后,我们简单地删除所有我们不再需要的东西。

我们需要的算法是合并连接,其工作方式如下:

Stopwatch sw = Stopwatch.StartNew();
for (int i = 0; i < 10000; ++i)
{
    for (int j = 0; j < 5000; ++j)
    {
        list.Add(rnd.Next());
    }

    // Sort the second half:
    list.Sort(5000, 5000, Comparer<int>.Default);
            
    // Both the lower and upper half are sorted. Merge-join:
    int lhs = 0;
    int rhs = 5000;
    while (list.Count < 15000)
    {
        int l = list[lhs];
        int r = list[rhs];
        if (l < r)
        {
            list.Add(l);
            ++lhs;
        }
        else if (l > r)
        {
            list.Add(r);
            ++rhs;
        }
        else
        {
            while (list.Count < 15000 && list[lhs] == l)
            {
                list.Add(l);
                ++lhs;
            }
            while (list.Count < 15000 && list[rhs] == r)
            {
                list.Add(r);
                ++rhs;
            }
        }
    }

    list.RemoveRange(0, 10000);
}

我们有一个新的测量值,它是 3563 毫秒。

优化 5:RemoveRange #2

请记住,处理突发数据非常快。最后一段代码是展示这一点的绝佳机会。我们这里使用了一个RemoveRange,它以burst方式处理数据。我们还可以使用两个缓冲区并交换它们。基本上我们在合并连接期间写了第二个 list2 并将 RemoveRange 替换为:

list.Clear();

var tmp = list;
list = list2;
list2 = tmp;

我们现在有一个新的计时:3542 毫秒。一模一样!

从最后两个你应该得出结论,执行突发操作花费的时间很少,你通常甚至不应该费心。

结论

我从一棵树开始,该树在 86220 毫秒内执行了所有操作,并以一个耗时 3542 毫秒的算法结束。直言不讳,这意味着最后一个实现的执行时间是第一次尝试的 4%。

现在,我确实在这个冗长的回答中使用了不同的算法 - 但重点是向您展示如何进行所有这些优化以及优化的真正效果。

@atlaste 的post 很有参考意义,不过不管怎样,能不能做得更快?我稍微改变了实现方式,从 3750 毫秒增加到 3350 毫秒,这是我的起点。如果你看一下这个算法,随着时间的推移,你会用随机数填充数组的一半,但很可能你会用到很少的随机数。您可以立即丢弃所有大于上半部分最大数字的数字,而不必费心对它们进行排序。这将是大部分新数据,因此加速将是巨大的(对于随机输入)。应用这个想法我达到了 640ms。鉴于 470ms 生成随机数这一事实,在处理算法中大约是 17 倍的加速。但它可能会有所不同,具体取决于数据的特征。

和代码

public static List<int> orig()
{
    Random rnd = new Random(12839);

    List<int> list = new List<int>(10000);

    for (int i = 0; i < 5000; ++i)
    {
        list.Add(rnd.Next());
    }

    for (int i = 0; i < 10000; ++i)
    {
        for (int j = 0; j < 5000; ++j)
        {
            list.Add(rnd.Next());
        }

        // Sort the second half:
        list.Sort(5000, 5000, Comparer<int>.Default);

        // Both the lower and upper half are sorted. Merge-join:
        int lhs = 0;
        int rhs = 5000;
        while (list.Count < 15000)
        {
            int l = list[lhs];
            int r = list[rhs];
            if (l < r)
            {
                list.Add(l);
                ++lhs;
            }
            else if (l > r)
            {
                list.Add(r);
                ++rhs;
            }
            else
            {
                while (list.Count < 15000 && list[lhs] == l)
                {
                    list.Add(l);
                    ++lhs;
                }
                while (list.Count < 15000 && list[rhs] == r)
                {
                    list.Add(r);
                    ++rhs;
                }
            }
        }

        list.RemoveRange(0, 10000);
    }

    return list;
}


public static int[] altered()
{
    Random rnd = new Random(12839);

    int HALFSIZE = 5000;
    int SIZE = 2 * HALFSIZE;
    int TESTLOOPS = 10000;

    int[] list = new int[SIZE];
    int[] list2 = new int[SIZE];

    for (int i = 0; i < HALFSIZE; ++i)
    {
        list[i] = rnd.Next();
    }

    for (int i = 0; i < TESTLOOPS; ++i)
    {
        for (int j = HALFSIZE; j < list.Length; ++j)
        {
            list[j] = rnd.Next();
        }

        // Sort the second half:
        Array.Sort(list, HALFSIZE, HALFSIZE, Comparer<int>.Default);

        // Both the lower and upper half are sorted. Merge-join:
        int lhs = 0;
        int rhs = HALFSIZE;
        int i2 = 0;
        while (i2 < HALFSIZE)
        {
            int l = list[lhs];
            int r = list[rhs];
            if (l <= r)
            {
                list2[i2++] = l;
                ++lhs;
            }
            if (l >= r)
            {
                list2[i2++] = r;
                ++rhs;
            }
        }

        var tmp = list;
        list = list2;
        list2 = tmp;
    }

    return list;
}

public static int[] altered2()
{
    Random rnd = new Random(12839);

    int HALFSIZE = 5000;
    int SIZE = 2 * HALFSIZE;
    int TESTLOOPS = 10000;

    int[] list = new int[SIZE];
    int[] list2 = new int[SIZE];

    for (int i = 0; i < HALFSIZE; ++i)
    {
        list[i] = rnd.Next();
    }

    for (int i = 0; i < TESTLOOPS; ++i)
    {
        for (int j = HALFSIZE; j < list.Length; ++j)
        {
            list[j] = rnd.Next();
        }

        // quicksort one level to skip values >= maxValue
        int maxValue = list[HALFSIZE - 1];
        int ll = HALFSIZE;
        int rr = SIZE - 1;
        do
        {
            while (ll <= rr && list[ll] < maxValue) { ++ll; }
            while (ll < rr && list[rr] >= maxValue) { --rr; }
            if (ll < rr)
            {
                int swap = list[ll];
                list[ll] = list[rr];
                list[rr] = swap;
                ++ll;
                --rr;
            }
        }
        while (ll < rr);

        // Sort the second half:
        Array.Sort(list, HALFSIZE, ll - HALFSIZE, Comparer<int>.Default);

        // Both the lower and upper half are sorted. Merge-join:
        int lhs = 0;
        int rhs = HALFSIZE;
        int i2 = 0;
        while (i2 < HALFSIZE)
        {
            int l = list[lhs];
            int r = list[rhs];
            if (l <= r)
            {
                list2[i2++] = l;
                ++lhs;
            }
            if (l >= r)
            {
                list2[i2++] = r;
                ++rhs;
            }
        }

        var tmp = list;
        list = list2;
        list2 = tmp;
    }

    return list;
}

public static int[] random()
{
    Random rnd = new Random(12839);

    int HALFSIZE = 5000;
    int SIZE = 2 * HALFSIZE;
    int TESTLOOPS = 10000;

    int[] list = new int[SIZE];
    for (int i = 0; i < HALFSIZE; ++i)
    {
        list[i] = rnd.Next();
    }

    for (int i = 0; i < TESTLOOPS; ++i)
    {
        for (int j = HALFSIZE; j < list.Length; ++j)
        {
            list[j] = rnd.Next();
        }              
    }

    return list;
}

static void Main(string[] args)
{
    int HALFSIZE = 5000;
    var origTest = orig();                       
    Stopwatch sw = Stopwatch.StartNew();
    orig();
    sw.Stop();
    Console.WriteLine("Orig time: " + sw.ElapsedMilliseconds);

    var alteredTest = altered();
    sw = Stopwatch.StartNew();
    altered();
    sw.Stop();
    Console.WriteLine("Altered time: " + sw.ElapsedMilliseconds);
    Console.WriteLine("Test: " + (origTest.Take(HALFSIZE).SequenceEqual(alteredTest.Take(HALFSIZE)) ? "OK" : "BAD"));

    var altered2Test = altered2();
    sw = Stopwatch.StartNew();
    altered2();
    sw.Stop();
    Console.WriteLine("Altered2 time: " + sw.ElapsedMilliseconds);
    Console.WriteLine("Test: " + (origTest.Take(HALFSIZE).SequenceEqual(altered2Test.Take(HALFSIZE)) ? "OK" : "BAD"));

    sw = Stopwatch.StartNew();
    random();
    sw.Stop();
    Console.WriteLine("Just random time: " + sw.ElapsedMilliseconds);
    
    Console.ReadKey();
}