使用整数索引进行数组 Table 查找的最快方法是什么?

What is the fastest way to do Array Table Lookup with an Integer Index?

我有一个移动大量数据的视频处理应用程序。

为了加快速度,我查了一下table,因为很多计算本质上只需要计算一次就可以重复使用。

但是我现在所有查找都占用了 30% 的处理时间。我想知道它是否可能是 RAM 慢..但是,我仍然想尝试进一步优化它。

目前我有以下内容:

public readonly int[] largeArray = new int[3000*2000];
public readonly int[] lookUp = new int[width*height];

然后我使用指针 p(相当于 width * y + x)执行查找以获取结果。

int[] newResults = new int[width*height];
int p = 0;
for (int y = 0; y < height; y++) {
   for (int x = 0; x < width; x++, p++) {
      newResults[p] = largeArray[lookUp[p]];
   }
}

请注意,我无法复制整个数组来进行优化。此外,该应用程序是高度多线程的。

在缩短函数堆栈方面取得了一些进展,因此没有 getter,而是从只读数组中直接检索。

我也尝试过转换为 ushort,但它似乎更慢(据我了解这是由于字长)。

IntPtr 会更快吗?我该怎么做?

下面附上时间分布截图:

RAM 已经是最快的东西之一。唯一更快的内存是 CPU 缓存。所以它将是 Memory Bound,但这仍然很快。

当然,在给定的大小下,此数组的大小为 6 百万 个条目。这可能不适合任何缓存。并且需要永远迭代。不管速度如何,这简直是数据太多了。

一般来说,现在的视频处理都是在 GPU 上完成的。 GPU 字面上 设计用于在巨型阵列上运行。因为这就是您现在看到的图像 - 一个巨大的阵列。

如果您必须将其保留在 GPU 端,也许缓存或延迟初始化会有所帮助?很可能您并不真正需要每个值。你只需要 common 值。举一个掷骰子的例子:如果你掷 2 个 6 面骰子,则 2-12 的每个结果都是可能的。但是结果 7 发生在 36 个案例中的 6 个。 36例中的2和12各只有1例。因此,存储 7 比存储 2 和 12 更有益。

看起来你在这里做的实际上是 "gather"。现代 CPU 对此有专门的指令,特别是 VPGATHER** 。这是在 .NET Core 3 中公开的,应该 像下面这样工作,这是单循环场景(你可以从这里开始工作以获得双循环版本);

第一个结果:

AVX enabled: False; slow loop from 0
e7ad04457529f201558c8a53f639fed30d3a880f75e613afe203e80a7317d0cb
for 524288 loops: 1524ms

AVX enabled: True; slow loop from 1024
e7ad04457529f201558c8a53f639fed30d3a880f75e613afe203e80a7317d0cb
for 524288 loops: 667ms

代码:

using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Runtime.Intrinsics;
using System.Runtime.Intrinsics.X86;

static class P
{
    static int Gather(int[] source, int[] index, int[] results, bool avx)
    {   // normally you wouldn't have avx as a parameter; that is just so
        // I can turn it off and on for the test; likewise the "int" return
        // here is so I can monitor (in the test) how much we did in the "old"
        // loop, vs AVX2; in real code this would be void return

        int y = 0;
        if (Avx2.IsSupported && avx)
        {
            var iv = MemoryMarshal.Cast<int, Vector256<int>>(index);
            var rv = MemoryMarshal.Cast<int, Vector256<int>>(results);

            unsafe
            {
                fixed (int* sPtr = source)
                {
                    // note: here I'm assuming we are trying to fill "results" in
                    // a single outer loop; for a double-loop, you'll probably need
                    // to slice the spans
                    for (int i = 0; i < rv.Length; i++)
                    {
                        rv[i] = Avx2.GatherVector256(sPtr, iv[i], 4);
                    }
                }
            }
            // move past everything we've processed via SIMD
            y += rv.Length * Vector256<int>.Count;
        }
        // now do anything left, which includes anything not aligned to 256 bits,
        // plus the "no AVX2" scenario
        int result = y;
        int end = results.Length; // hoist, since this is not the JIT recognized pattern
        for (; y < end; y++)
        {
            results[y] = source[index[y]];
        }
        return result;
    }

    static void Main()
    {
        // invent some random data
        var rand = new Random(12345);
        int size = 1024 * 512;
        int[] data = new int[size];
        for (int i = 0; i < data.Length; i++)
            data[i] = rand.Next(255);

        // build a fake index
        int[] index = new int[1024];
        for (int i = 0; i < index.Length; i++)
            index[i] = rand.Next(size);

        int[] results = new int[1024];

        void GatherLocal(bool avx)
        {
            // prove that we're getting the same data
            Array.Clear(results, 0, results.Length);
            int from = Gather(data, index, results, avx);
            Console.WriteLine($"AVX enabled: {avx}; slow loop from {from}");
            for (int i = 0; i < 32; i++)
            {
                Console.Write(results[i].ToString("x2"));
            }
            Console.WriteLine();

            const int TimeLoop = 1024 * 512;
            var watch = Stopwatch.StartNew();
            for (int i = 0; i < TimeLoop; i++)
                Gather(data, index, results, avx);
            watch.Stop();
            Console.WriteLine($"for {TimeLoop} loops: {watch.ElapsedMilliseconds}ms");
            Console.WriteLine();
        }
        GatherLocal(false);
        if (Avx2.IsSupported) GatherLocal(true);
    }
}