大型数组或列表的 4 桶直方图的微优化

Micro Optimization of a 4-bucket histogram of a large array or list

我有一个特别的问题。我会尽量准确地描述这一点。

我正在做一件很重要的事情"micro-optimization"。一次运行数天的循环。所以如果我能减少这个循环时间,它需要一半的时间。 10 天会减少到只有 5 天等等

我现在的循环是函数:"testbenchmark1"。

我有 4 个索引需要在这样的循环中增加。但是当我注意到从列表中访问索引时实际上需要一些额外的时间。如果还有其他解决方案,这就是我要尝试的。

indexes[n]++; //increase correct index

"testbenchmark1" 的完整代码需要 122 毫秒:

void testbenchmark00()
{
    Random random = new Random();
    List<int> indexers = new List<int>();
    for (int i = 0; i < 9256408; i++)
    {
        indexers.Add(random.Next(0, 4));
    }
    int[] valueLIST = indexers.ToArray();


    Stopwatch stopWatch = new Stopwatch();
    stopWatch.Start();

    int[] indexes = { 0, 0, 0, 0 };
    foreach (int n in valueLIST) //Takes 122 ms
    {
        indexes[n]++; //increase correct index
    }

    stopWatch.Stop();
    MessageBox.Show("stopWatch: " + stopWatch.ElapsedMilliseconds.ToString() + " milliseconds");
}

现在下面的 "testbenchmark2" 代码只是实验性的,我知道它是不正确的,但我想知道是否有任何类似的方法来使用这种数字:“1_00_00_00_00”,如果它可以将:“00_00_00_00”视为四个不同的整数。例如,如果我要对以下内容求和: 1_00_00_00_00 + 1_00_01_00_00 = 1_00_01_00_00 然后最后可以提取每个数字,每个像这样的四个:00, 01, 00, 00

但我不知道即使使用二进制数也能以任何方式做到这一点。是的任何一种解决方案。只是添加这样的数字。正如一个循环只用了 59 毫秒的测试,这是 122 毫秒时间的一半。所以我很想知道这是否有任何想法?

double num3 = 1_00_00_00_00;
double num4 = 1_00_01_00_00;
for (int i = 0; i < valueLIST.Count; i++) //Takes 59 ms
{
    num3 += num4;
}

"testbenchmark2" 的完整代码需要 59 毫秒:

void testbenchmark2()
{
    List<String> valueLIST = new List<String>(); 
    for (int i = 0; i < 9256408; i++) //56
    {
        valueLIST.Add(i.ToString());
    }

    //https://www.geeksforgeeks.org/binary-literals-and-digit-separators-in-c-sharp/
    double num3 = 1_00_00_00_00;
    double num4 = 1_00_01_00_00;

    Stopwatch stopWatch = new Stopwatch();
    stopWatch.Start();
    for (int i = 0; i < valueLIST.Count; i++) //Takes 59 ms
    {
        num3 += num4;
    }
    stopWatch.Stop();
    MessageBox.Show("stopWatch: " + stopWatch.ElapsedMilliseconds.ToString() + " milliseconds\n\n" + num3);
}

编辑
下面是我正在尝试做的更清晰的代码!
但是下面的代码可能是正确的或解决方案,但它显示了我相信的尝试。

        void newtest()
        {
            double num1 = 1_00_00_00_00;
            double num2 = 1_00_01_00_00;
            double num3 = 1_00_01_01_00;

            List<double> testnumbers = new List<double>();
            testnumbers.Add(num1);
            testnumbers.Add(num2);
            testnumbers.Add(num3);

            double SUM = 0;
            for (int i = 0; i < testnumbers.Count; i++)
            {
                SUM += testnumbers[i];
            }

            //The result is
            //300020100

            //Would it possible to extract the "four buckets" that I am interesting in somehow?
            //00_02_01_00
        }

如 Peter Cordes 所述,您可以使用 SIMD 一次将多个值相加,请参阅 vector。但我不清楚这是否真的有帮助。

编辑: 如果您是 运行 .Net 核心,还有 SIMD intrinstics 提供对硬件的较低级别访问。

正如 NerualHandle 所提到的,使用 for 循环可能比使用 foreach 更好。但是当我测试它时,似乎没有显着差异。我猜编译器可以在这种特殊情况下优化 foreach。

当我在 运行 你的 testbenchmark00 代码时,它在我的电脑上完成大约 6 毫秒。一些粗略的计算表明循环的每次迭代大约需要 0.78ns,或者大约 2-4 个处理器周期,这似乎接近最佳。对你来说,它需要大约 20 倍的时间,这似乎很奇怪。您 运行 处于发布模式吗?

您可以将问题并行化。将索引器数组拆分为多个部分,并在不同线程上为每个部分构建直方图,最后对每个线程的直方图求和。 See Parallel.For 因为这可以为你做分区等,但它需要使用 localInit 和 localFinally 来确保每个线程写入单独的直方图以避免并发问题。

与性能优化一样,推荐的执行顺序是:

  1. 用于识别问题区域的配置文件代码
  2. 寻求算法改进
  3. 寻找减少工作量的方法,例如缓存
  4. 更快地完成现有工作

在使用 AVX2 的现代 x86-64(如 Skylake 或 Zen 2)上,每 2.5 个时钟周期左右(每个内核)大约有 8 个元素(1 个 AVX2 向量)。或每 2 个时钟展开。或者在你的打桩机 CPU 上,每 3 个时钟的 1x 16 字节索引向量可能与 AVX1 _mm_cmpeq_epi32.

一般策略适用于 2 到 8 个桶。对于字节、16 位或 32 位元素。 (因此 byte 元素为您提供了每 2 个时钟周期直方图显示的 32 个元素 最好的情况,在字节计数器溢出之前需要一些 outer-loop 开销来收集字节计数器。)

更新:或将 int 映射到 1UL << (array[i]*8) 以增加 SIMD / SWAR 添加计数器的 4 个字节之一,我们可以接近 SKL 上每个 8 int 向量的 1 个时钟,或每个 2 Zen2 上的时钟。 (这更具体到 4 个或更少的桶和 int 输入,并且不会缩小到 SSE2。它需要 variable-shifts 或至少 AVX1 variable-shuffles。)第一个策略使用字节元素就每个周期的元素而言可能仍然更好。

正如@JonasH 指出的那样,您可以让不同的内核处理输入数组的不同部分。在典型台式机上,单个内核可能接近饱和内存带宽,但 many-core Xeons 具有较低的 per-core 内存带宽和更高的聚合,并且需要更多内核来饱和 L3 或 DRAM 带宽。


A loop that runs for days at a time.

在一个输入列表上迭代非常非常慢,所以它仍然不会溢出 int 计数器?或者用不同的大列表重复调用(比如你的 ~900k 测试数组)?

I believe I want to avoid increasing an index for a list or array as it seem to consume a lot of time?

这可能是因为您在禁用优化的情况下进行基准测试。不要那样做,一点意义也没有;通过禁用优化,不同的代码会减慢不同的速度。更明确的步骤和 tmp 变量通常会使 debug-mode 代码变慢,因为有更多的东西需要用调试器查看。但是当您使用正常优化进行编译时,它们可以优化为正常的 pointer-increment 循环。

遍历数组可以高效地编译成asm。

缓慢的部分是通过内存递增数组变量索引的依赖链。例如,在具有相同地址的 Skylake CPU、memory-destination add 上,每 6 个时钟周期以大约一个增量重复出现瓶颈,因为下一个 add 必须等待加载该值由前一个存储。 (Store-forwarding 来自存储缓冲区意味着它不必等待它先提交到缓存,但它仍然比添加到寄存器要慢得多。)另请参阅 Agner Fog 的优化指南:https://agner.org/optimize/

由于计数仅分布在 4 个存储桶中,您会遇到很多情况,其中指令正在等待重新加载另一条最近指令存储的数据,因此您甚至无法实现每个时钟周期近 1 个元素如果计数很好地分布在 L1d 缓存中仍然很热的更多计数器上,则可能。

解决此问题的一个好方法是使用 多个计数器数组展开循环。 Methods to vectorise histogram in SIMD?。就像 int[] indexes = { 0, 0, 0, 0 }; 一样,您可以将其设为每个包含四个计数器的二维数组。您必须手动展开源中的循环以遍历输入数组,并处理展开部分之后剩下的最后 0..3 个元素。

对于中小型计数数组来说,这是一种很好的技术,但如果复制计数器开始导致缓存未命中,就会变得很糟糕。


使用窄整数来节省缓存占用空间/内存带宽。

您 can/should 做的另一件事是 对 0..3 值的数组使用尽可能窄的类型:每个数字都可以放在一个字节中所以使用 8 位整数将为您节省 4 倍的缓存占用空间/内存带宽。

x86 可以有效地将 load/store 个字节放入 to/from 个完整的寄存器中。使用 SSE4.1,当你在循环中使用 byte_array[i]int_array[i] 时,你还可以使用 SIMD pmovzxbd 来提高 auto-vectorize 的效率。

(当我说 x86 时,我的意思是包括 x86-64,而不是 ARM 或 PowerPC。当然你实际上并不想编译 32 位代码,Microsoft 称之为 "x86")


桶的数量非常少,比如 4

这看起来像是 SIMD 比较的工作。对于 x86 SSE2,每个 16 字节数据向量的 int 元素数等于您的直方图 bin 数。

您已经有了类似 SIMD 的想法,试图将数字视为四个单独的字节元素。参见 https://en.wikipedia.org/wiki/SIMD#Software

但是 00_01_10_11 只是 source-level 数字中 human-readable 分隔符的语法,而 double 是一种 floating-point 类型,其内部表示不是与整数相同。而且您绝对不想使用字符串; SIMD 让您可以同时对整数数组的 4 个元素进行操作。

我能看到的最好的方法是分别计算 4 个值中每个值的匹配项,而不是映射元素到计数器。 我们想并行处理多个元素,但是当一个元素向量中有重复值时,将它们映射到计数器可能会发生冲突。您需要将该计数器递增两次。

这等价的标量是:

int counts[4] = {0,0,0,0};
for () {
    counts[0] += (arr[i] == 0);
    counts[1] += (arr[i] == 1);
    counts[2] += (arr[i] == 2);  // count matches
  //counts[3] += (arr[i] == 3);  // we assume any that aren't 0..2 are this
}
counts[3] = size - counts[0] - counts[1] - counts[2];
// calculate count 3 from other counts

哪个(在 C++ 中)GCC -O3 实际上 auto-vectorize 完全按照我在下面手动执行的方式 : https://godbolt.org/z/UJfzuH。 Clang 甚至在 auto-vectorizing 时展开它,因此它应该比 int 输入的 hand-vectorized 版本 更好 。不过,在这种情况下,仍然不如替代 vpermilps 策略。

(如果您希望字节元素具有有效的窄和,仅在外循环中加宽,您仍然需要手动矢量化。)


带字节元素,参见How to count character occurrences using SIMD。元素大小对于计数器来说太窄;它会在 256 次计数后溢出。所以你必须在内部循环中加宽,或者使用嵌套循环在加宽之前做一些累加。

我不懂 C#,所以我可以用 x86 汇编或使用内部函数的 C++ 编写代码。也许 C++ 内在函数对你更有用。 C# 有某种矢量扩展,应该可以移植它。

这是用于 x86-64 的 C++,使用 AVX2 SIMD 内部函数。请参阅 https://whosebug.com/tags/sse/info 了解一些信息。

// Manually vectorized for AVX2, for int element size
// Going nearly 4x as fast should be possible for byte element size

#include <immintrin.h>

void count_elements_avx2(const std::vector<int> &input,  unsigned output_counts[4])
{
    __m256i  counts[4] = { _mm256_setzero_si256() };  // 4 vectors of zeroed counters
                  // each vector holds counts for one bucket, to be hsummed at the end

    size_t size = input.size();
    for(size_t i = 0 ; i<size ; i+=8) {  // 8x 32-bit elements per vector
        __m256i v = _mm256_loadu_si256((const __m256i*)&input[i]);  // unaligned load of 8 ints
        for (int val = 0 ; val < 3; val++) {
           // C++ compilers will unroll this with 3 vector constants and no memory access
            __m256i match = _mm256_cmpeq_epi32(v, _mm256_set1_epi32(val));  // 0 or all-ones aka -1
            counts[val] = _mm256_sub_epi32(counts[val], match);   // x -= -1 or 0 conditional increment
        }
    }


    // transpose and sum 4 vectors of 8 elements down to 1 vector of 4 elements
    __m128i summed_counts = hsum_xpose(counts);   // helper function defined in Godbolt link
    _mm_storeu_si128((__m128i*)output_counts, summed_counts);

    output_counts[3] = size - output_counts[0]
                       - output_counts[1] - output_counts[2];

    // TODO: handle the last size%8 input elements; scalar would be easy
}

这与 clang 编译得很好(在 Godbolt compiler explorer 上)。据推测,您可以编写编译为类似机器代码的 C#。如果不是,请考虑从 C++ 编译器调用本机代码(如果无法从编译器获得真正最佳的代码,则在 asm 中调用 hand-written)。如果您的真实 use-case 运行 迭代次数与基准测试一样多,那么如果不必复制输入数组,则可以分摊额外的开销。

 # from an earlier version of the C++, doing all 4 compares in the inner loop
 # clang -O3 -march=skylake
.LBB0_2:                                     # do {
    vmovdqu ymm7, ymmword ptr [rcx + 4*rdx]    # v = load arr[i + 0..7]
    vpcmpeqd        ymm8, ymm7, ymm3           # compare v == 0
    vpsubd  ymm4, ymm4, ymm8                   # total0 -= cmp_result
    vpcmpeqd        ymm8, ymm7, ymm5
    vpsubd  ymm2, ymm2, ymm8
    vpcmpeqd        ymm7, ymm7, ymm6           # compare v == 2
    vpsubd  ymm1, ymm1, ymm7                   # total2 -= cmp_result
    add     rdx, 8                             # i += 8
    cmp     rdx, rax
    jb      .LBB0_2                          # }while(i < size)

估计 best-case Skylake 性能:每个向量约 2.5 个周期(8 int 或 32 int8_t)

或 2 个展开。

如果没有 AVX2,仅使用 SSE2,您将有一些额外的 movdqa 指令,并且每个向量仅执行 4 个元素。不过,这仍然是内存中与标量直方图的胜利。即使是 1 个元素/时钟也很好,并且应该可以在任何 x86-64 CPU.

上 运行 使用 SSE2

当然假设没有缓存未命中,硬件预取到 L1d 保持领先于循环。这可能只发生在至少 L2 缓存中已经很热的数据上。 我还假设内存对齐没有停顿;理想情况下,您的数据按 32 字节对齐。 如果通常不是,可能值得处理第一个未对齐的部分,然后使用对齐的加载,如果数组足够大。

对于字节元素,inner-most 循环看起来很相似(vpcmpeqbvpsubb 但 运行 在 hsumming 之前最多只有 255(不是 256)次迭代64 位计数器,以避免溢出。因此每个向量的吞吐量将相同,但每个向量的元素数量是原来的 4 倍。

有关性能分析的详细信息,请参阅 https://agner.org/optimize/https://uops.info/。例如vpcmpeqd 在 uops.info

内部循环对于 Haswell/Skylake 只有 9 fused-domain 微指令,所以最好的情况 front-end 瓶颈是每 2.25 个周期大约 1 次迭代(流水线是 4 uops 宽)。 Small-loop 效果有点妨碍: - Skylake 的循环缓冲区被错误的微代码更新禁用,但甚至在此之前 9 uop 循环结束up 平均每 2.25 个周期发出一个 iter 稍差,比方说 2.5 个周期。

Skylake 运行s vpsubd 在端口 0,1 或 5 上,运行s vpcmpeqd 在端口 0 或 1 上。所以 back-end 端口 0、1、5 上的瓶颈是 3 个端口的 6 个向量 ALU 微指令,或每 2 个周期 1 次迭代。 所以 front-end 瓶颈占主导地位。(Ice Lake 更宽 front-end 可能会在 back-end 上成为瓶颈,即使不展开;相同 back-end那里的吞吐量,除非你使用 AVX512...)

如果 clang 从数组的末尾开始索引并将索引向上计数为零(因为无论如何它选择使用索引寻址模式)它可以保存一个 uop 总共 8 uops = 一个迭代器front-end 中的 2 个周期,匹配 back-end 瓶颈。 (无论哪种方式,标量 add 和 macro-fused cmp/jcc,或 add/jcc 循环分支可以 运行 在端口 6 上,负载不会竞争 ALU 端口...数据。

按 2 展开具有相同的好处:分摊 2 微指令的循环开销。所以 2 个输入向量为 16 微指令。 这是 SKL 和 IceLake 上的管道宽度以及 Zen 上的 single-uop 管道宽度的一个很好的倍数。展开更多可以让 front-end 可以保持在执行之前,但即使是任何 back-end 延迟也会让前端在调度程序中建立微指令缓冲。这将让它执行负载够早了。

Zen2 具有更宽的 front-end(6 微指令或 5 条指令宽度,IIUC)。 None 这些指令是 multi-uop,因为 Zen2 将向量 ALU 扩展到 256 位,所以这是 5 single-uop 条指令。 vpcmpeq* 运行s 在 FP 0,1 或 3 上,与 vpsubd 相同,因此 back-end 瓶颈与 Skylake 相同:每 2 个周期 1 个向量。但是更宽的 front-end 消除了瓶颈,即使没有展开,关键路径仍然是 back-end。

Zen1 每 256 位向量运算需要 2 微指令(或 lane-crossing 需要更多,但这些都是简单的 2 微指令)。所以大概 12/3 = 每个 8 或 32 个元素的向量有 4 个周期,假设它可以有效地通过 front-end 获得这些微指令。

我假设 back-ends 很好地安排了通过计数向量的 1 周期延迟依赖链,并且不会导致很多浪费的周期。可能没什么大不了的,特别是如果你在现实生活中有任何内存瓶颈的话。 (在 Piledriver 上,SIMD-integer 操作有 2 个周期的延迟,但是 6 个 ALU 微指令用于 2 个矢量 ALU 端口可以 运行 它们是每 3 个周期 1 个矢量(128 位)所以即使没有展开也有足够的工作隐藏延迟。)

我没有分析horizontal-sum部分。它在循环之外,因此每次调用只需 运行 一次。你确实标记了这个 micro-optimization,但我们可能不需要担心那部分。


其他桶数

此策略的基本情况是 2 个桶:计算一件事情的匹配项,count_other = 大小 - 计数。

我们知道每个元素都是这 4 种可能性中的一种,因此我们可以假设任何不是 0、1 或 2 的 x 都是 3 而无需检查。这意味着我们根本不需要计算 3 的匹配项,并且可以从 size - sum(counts[0..2]).

中获取该桶的计数

(在进行此优化之前,请参阅上述性能分析的编辑历史记录。我在进行此优化和更新 Godbolt 后更改了数字 link,希望我没有遗漏任何内容。)


AVX512 在 Skylake-Xeon

对于 64 字节向量,没有 vpcmpeqd 来创建 all-zero (0) 或 all-one (-1) 个元素的向量。相反,您将比较掩码寄存器并使用它来执行 merge-masked 添加 set1(1)。喜欢 c = _mm512_mask_add_epi32(c, _mm512_set1_epi32(1)).

不幸的是,对 compare-result 位掩码进行标量弹出计数效率不高。


随机代码审查:在您的第一个基准测试中:

int[] valueLIST = indexers.ToArray();

这似乎毫无意义;根据 MS 的文档 (https://docs.microsoft.com/en-us/dotnet/standard/collections/),List 可以有效地索引。我认为它等同于 C++ std::vector<T>。您可以只迭代它而不复制到数组。


Alt 策略 - 将 0..3 映射到 int 的一个字节中的一组位

如果您不能将输入的元素缩小为字节以节省内存带宽,那很好。

但说到这里,也许值得使用 2x _mm256_packs_epi32 (vpackssdw) 和 _mm256_packs_epi16 (vpacksswb) 在使用 3x pcmpeqb 计数之前缩小到 8 位整数/ psubb。每 4 个输入向量需要 3 微指令才能用字节元素打包成 1 个。

但如果您的输入以 int 元素开头,这可能是最好的方式,而不是打包然后比较 3 种方式。

你有 4 个桶,一个 int 有 4 个字节。 如果我们可以将每个 int 元素转换为适当字节底部的 1,那么我们可以添加 _mm256_add_epi8 最多在扩大到 64 位计数器之前进行 255 inner-loop 次迭代。 (使用标准 _mm256_sad_epu8 反零技巧对无符号字节进行 hsum 而不会溢出。)

有两种方法可以做到这一点。第一个:使用随机播放作为查找 table。 AVX2 vpermd 使用数据作为索引向量和常量 _mm256_permutexvar_epi32 工作(_mm256_permutexvar_epi32) =57=] 作为被洗牌的数据。或者 type-pun 将 AVX1 vpermilps 用作 LUT 的向量,LUT 向量的上半部分也有这些字节。

vpermilps 更好:它在 AMD Zen 1 上的微指令更少,而且由于它是 in-lane,所以所有地方的延迟都更低。 (可能会在某些 CPU 上造成旁路延迟,从而降低延迟优势,但仍然不比 vpermd 差)。

出于某种原因 vpermilps 具有矢量控制的 Zen2 具有 2 个周期的吞吐量,即使它仍然是单个 uop。或者 Zen1 上的 4 个周期(对于 2 uop YMM 版本)。在 Intel 上是 1 个周期。 vpermd 在 AMD 上更糟:更多微指令和同样糟糕的吞吐量。

根据 Agner Fog 的测试,

vpermilps xmm(16 字节向量)在 Piledriver 上具有 1/clock 的吞吐量,在 "ivec" 域中具有 运行s。 (因此当用于 "intended" 浮点操作数而不是整数时,它实际上有额外的旁路延迟延迟)。

   // Or for Piledriver, __m128 version of this

    __m256 bytepatterns = _mm256_casts256_ps(_mm256_set_epi32(
         1<<24, 1<<16, 1<<8, 1<<0,
         1<<24, 1<<16, 1<<8, 1<<0) );
    __m256i v = _mm256_loadu_si256((const __m256i*)&input[i]);
    v = _mm256_castps_si256(_mm256_permutevar_ps(bytepatterns, v));  // vpermilps 32-bit variable shuffle
    counts = _mm256_add_epi8(counts, v);

     // after some inner iterations, separate out the 
     // set1_epi32(0x000000ff) counts, 0x0000ff00 counts, etc.

这将在每个 int 元素内生成交错计数器。如果您在 256 次计数之前不累积它们,它们将会溢出。请参阅 How to count character occurrences using SIMD 了解带有单个计数器的简单版本。

这里我们可能展开并使用 2 个不同的 LUT 向量,所以当我们想要对所有计数进行分组时对于 0 在一起,我们可以 2 个向量混合在一起并屏蔽掉其他向量。


除了混洗,我们还可以使用 AVX2 变量移位来做到这一点。

sums += 1UL << (array[i]*8); 其中 *8 是字节中的位数,也是通过移位完成的。我将其编写为标量 C++ 表达式,因为现在您有机会了解您的 bytes-in-an-integer 想法如何真正发挥作用。只要我们不让单个字节溢出,无论 SIMD 字节在字节之间添加块进位还是使用 32 位双字元素都没有关系。

我们将使用 AVX2 执行此操作:

__m256i v = loadu...();
v = _mm256_slli_epi32(v, 3);  // v *= 8
v = _mm256_sllv_epi32(_mm256_set1_epi32(1), v);
counts = _mm256_add_epi8(counts, v);

这是 2 个移位指令加上 vpaddb。在 Skylake 上,variable-count 转移 vpsllvd 很便宜:single-uop 和 运行 在多个端口上。但在 Haswell 和 Zen 上,速度要慢一些。 (与 AMD 上的 vpermilps 相同的吞吐量)

对于 shuffle 版本,2 个端口的 2 微指令仍然没有超过 1 个端口的 1 微指令。 (除非你交替使用这两种策略将工作分配到 SKL 上的所有 ALU 端口。)

因此,无论哪种方式,inner-most 循环都可以每个时钟执行 1 个向量,或者通过仔细交错 shift 与 shuffle 方法可能稍微好一些。

但它需要在 128 或 255 次内循环迭代中分摊一些开销。

最后的清理可能会将 2 个向量混合在一起以获得仅包含 2 个桶的计数的向量,然后 vpshufb (_mm256_shuffle_epi8) 将同一桶的字节计数器分组到同一桶中q字。然后 vpsadbw (_mm256_sad_epu8) 对零可以对 _mm256_add_epi64 的每个 qword 中的那些字节元素进行水平求和。所以 outer-loop 工作应该是 2 vpblendw, 2x vpshufb, 2x vpsadbw, 2x vpaddq 然后回到内循环的另一个 255 次迭代。可能还会检查您是否在数组末尾的 255 次迭代内,以设置内部迭代的循环界限。

这是@PeterCordes 回答的未经测试C# 版本。

private static Vector128<int> HsumTranspose( ReadOnlySpan<Vector256<int>> counts )
{
    var sum01 = Avx2.HorizontalAdd( counts[ 0 ], counts[ 1 ] );
    var sum23 = Avx2.HorizontalAdd( counts[ 2 ], counts[ 3 ] );
    var sum0123 = Avx2.HorizontalAdd( sum01, sum23 );

    var sumHigh = Avx2.ExtractVector128( sum0123, 1 );
    var sumLow = Avx2.ExtractVector128( sum0123, 0 );
    return Sse2.Add( sumHigh, sumLow );
}


private unsafe static int[ ] CountElements( ReadOnlySpan<int> input )
{
    var outputCounts = new int[ 4 ];
    // Four vectors of zeroed counters each vector holds
    // counts for one bucket, to be hsummed at the end.
    Span<Vector256<int>> counts = stackalloc Vector256<int>[ 4 ]
    {
        Vector256<int>.Zero,
        Vector256<int>.Zero,
        Vector256<int>.Zero,
        Vector256<int>.Zero
    };

    unsafe
    {
        fixed ( int* fixedInput = input )
        {
            var size = input.Length;
            for ( var i = 0; i < size; i += 8 )
            {
                var v = Avx.LoadVector256( &fixedInput[ i ] );
                for ( var val = 0; val < 3; val++ )
                {
                    var match = Avx2.CompareEqual( v, Vector256.Create( val ) );
                    counts[ val ] = Avx2.Subtract( counts[ val ], match );
                }
             }

             Vector128<int> summedCounts = HsumTranspose( counts );

             fixed ( int* fixedOutputCounts = outputCounts )
                 Sse2.Store( fixedOutputCounts, summedCounts );

             outputCounts[ 3 ] = size - outputCounts[ 0 ] -
                 outputCounts[ 1 ] - outputCounts[ 2 ];

             // TODO: handle the last size%8 input elements; scalar would be easy
            }                
        }            
    }
    return outputCounts;
}

我曾尝试重写 Vector128<byte> 的代码并想出了这个代码。

我首先创建了 indexesToSumFirst,这是迭代次数,因此剩余的将是 16 的倍数,以供以下循环使用。

我创建了 3 个循环,其中存在一个 16x16 = 256 的内循环,不会为 byte 创建任何溢出。然后 "outerloop" 有一个从之前计算的精确计数来维护这个。

在这 3 个循环之后。其余低于 16*16 次迭代的内容在其自己的循环中汇总。

当我在 normalCalculationCountElements 之间运行基准测试时,CountElements SIMD 方法大约快 7.2 倍。

    void calc()
    { 
        //Create 16 indexes with numbers between: 0-3. The goal is to count how many of those occurences we have for the numbers: 0-3
        int times = 6250;
        int bytes = times * 16;
        byte[] v1 = new byte[bytes];
        for (int i = 0; i < times; i++)
        {
            v1[0 + (i * 16)] = 0;
            v1[1 + (i * 16)] = 1;
            v1[2 + (i * 16)] = 2;
            v1[3 + (i * 16)] = 3;

            v1[4 + (i * 16)] = 1;
            v1[5 + (i * 16)] = 1;
            v1[6 + (i * 16)] = 1;
            v1[7 + (i * 16)] = 1;

            v1[8 + (i * 16)] = 1;
            v1[9 + (i * 16)] = 0;
            v1[10 + (i * 16)] = 0;
            v1[11 + (i * 16)] = 3;

            v1[12 + (i * 16)] = 1;
            v1[13 + (i * 16)] = 1;
            v1[14 + (i * 16)] = 1;
            v1[15 + (i * 16)] = 3;
        }
        /*---------------*/

        ReadOnlySpan<byte> input = v1;

        //Call function
        //normalCalculation(input);
        CountElements(input);
    }

    void normalCalculation(ReadOnlySpan<byte> inputArray)
    {
        int[] countArray0 = new int[4];
        for (int i = 0; i < inputArray.Length; i++)
        {
            countArray0[inputArray[i]]++;
        }

    }
    private unsafe static int[] CountElements(ReadOnlySpan<byte> inputArray)
    {

        //100000 indexes (This SIMD code goes 7.2 times faster than normal C# code)
        double[] countArray = new double[4];
        double arraylength = inputArray.Length; int loops = Convert.ToInt32(arraylength);
        double loopcount = arraylength / 3840; //100000 / 240 * 16 = 26.04
        double indexesToSumFirst = loopcount - Math.Floor(loopcount); //26.04 - 26 = 0.04
        indexesToSumFirst = indexesToSumFirst * 3840; //Num of indexes to be SUMMED first
        loopcount = arraylength - indexesToSumFirst; //100000 - 153.6 = 99846.4
        int outerloop = Convert.ToInt32(loopcount / 3840); //24

        //Sum the first indexes first. So the loops after those are exactly counts of: x16
        int index = Convert.ToInt32(indexesToSumFirst);
        if (index > 0)
        {
            for (int t = 0; t < index; t++)
            {
                countArray[inputArray[t]]++;
            }
        }

        //Below starts the SIMD calculations!
        Span<Vector128<byte>> counts = stackalloc Vector128<byte>[3];
        Span<Vector128<UInt64>> sum64 = stackalloc Vector128<UInt64>[3];
        unsafe
        {
            fixed (byte* fixedInput = inputArray)
            {
                for (int i = 0; i < outerloop; i++)
                {
                    counts.Clear();
                    for (int i2 = 0; i2 < 240; i2++)
                    {
                        var v = Avx.LoadVector128(&fixedInput[index]);
                        for (byte val = 0; val < 3; val++)
                        {
                            var match = Avx.CompareEqual(v, Vector128.Create(val)); //[1,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0] == [1,0,0,0,1,0,1,0,0,0,0,0,0,0,0,0]
                            counts[val] = Avx.Subtract(counts[val], match);
                        }
                        index += 16;
                    }
                    //Here sum
                    for (int i3 = 0; i3 < 3; i3++)
                    {
                        //SumAbsoluteDifferences
                        sum64[i3] = Sse2.Add(sum64[i3], Sse2.SumAbsoluteDifferences(counts[i3], Vector128<byte>.Zero).AsUInt64()); //sum64: <2,0,0,0,3,0,0,0>
                    }
                }

                //UnpackHigh and get the lower element from the Vector128<UInt64>
                if (outerloop > 0)
                {
                    for (int i3 = 0; i3 < 3; i3++)
                    {
                        Vector128<UInt64> upper = Sse2.UnpackHigh(sum64[i3], sum64[i3]).AsUInt64(); //3
                        countArray[i3] += Sse2.Add(sum64[i3], upper).ToScalar();
                    }
                }
                //Calculate the last index
                countArray[3] = loops - countArray[0] - countArray[1] - countArray[2];
            }
        }

        var outputCounts = new int[4];
        return outputCounts;
    }