大型矩阵线性组合的C++性能优化?

C++ performance optimization for linear combination of large matrices?

我有一个很大的浮点数据张量,尺寸为 35k(rows) x 45(cols) x 150(slices),我存储在犰狳立方体容器中。我需要在 35 毫秒内将所有 150 个切片线性组合在一起(我的应用程序必须这样做)。线性组合浮点权重也存储在犰狳容器中。到目前为止,我最快的实现需要 70 毫秒,平均超过 30 帧的 window,我似乎无法超越它。请注意,我允许 CPU 并行计算,但不允许 GPU。

我已经尝试了多种不同的方式来执行这种线性组合,但下面的代码似乎是我能得到的最快的(70 毫秒),因为我相信我通过获取尽可能大的连续内存来最大化缓存命中机会每次迭代的块。

请注意 Armadillo 以列主要格式存储数据。所以在张量中,它首先存储第一个通道的列,然后是第二个通道的列,然后是第三个,依此类推。

typedef std::chrono::system_clock Timer;
typedef std::chrono::duration<double> Duration;

int rows = 35000;
int cols = 45;
int slices = 150;
arma::fcube tensor(rows, cols, slices, arma::fill::randu);
arma::fvec w(slices, arma::fill::randu);

double overallTime = 0;
int window = 30;
for (int n = 0; n < window; n++) {

    Timer::time_point start = Timer::now();

    arma::fmat result(rows, cols, arma::fill::zeros);
    for (int i = 0; i < slices; i++)
        result += tensor.slice(i) * w(i);

    Timer::time_point end = Timer::now();
    Duration span = end - start;
    double t = span.count();
    overallTime += t;
    cout << "n = " << n << " --> t = " << t * 1000.0 << " ms" << endl;
}

cout << endl << "average time = " << overallTime * 1000.0 / window << " ms" << endl;

我需要将此代码优化 至少 2 倍,非常感谢任何建议。

首先我得承认,我不熟悉 arma 框架或内存布局;如果语法 result += slice(i) * weight 延迟计算,则最少。

内存布局和memory-to-arithmetic计算率是两个主要问题及其解决方案。

a+=b*c是有问题的,因为它需要读取ba,写入a并且最多使用两个算术运算(两个,如果架构不结合乘法和累加)。

如果内存布局的形式为 float tensor[rows][columns][channels],问题将转换为生成 rows * columns 长度 channels 的点积,并且应该这样表示。

如果是float tensor[c][h][w],最好将循环展开到result+= slice(i) + slice(i+1)+...。一次读取四个切片可将内存传输减少 50%。

在 N<16 的情况下,以 4*N 个结果块(从所有 150 个 channels/slices 中读取)来处理结果甚至可能更好,这样累加器就可以显式或隐式地分配给编译器到 SIMD 寄存器。

通过将切片计数填充为 4 或 8 的倍数、使用 -ffast-math 进行编译以启用融合乘法累加(如果可用)和使用多线程,有可能实现较小的改进。

约束表明需要执行 13.5GFlops,这在算术方面是一个合理的数字(对于许多现代体系结构),但也意味着至少 54 Gb/s 内存带宽,可以放宽fp16 或 16 位定点运算。

编辑

知道内存顺序是 float tensor[150][45][35000]float tensor[kSlices][kRows * kCols == kCols * kRows] 建议我先尝试将外循环展开 4(甚至可能是 5,因为 150 不能被 4 整除需要特殊情况对于多余的)流。

void blend(int kCols, int kRows, float const *tensor, float *result, float const *w) {
    // ensure that the cols*rows is a multiple of 4 (pad if necessary)
    // - allows the auto vectorizer to skip handling the 'excess' code where the data
    //   length mod simd width != 0
    // one could try even SIMD width of 16*4, as clang 14
    // can further unroll the inner loop to 4 ymm registers
    auto const stride = (kCols * kRows + 3) & ~3;
    // try also s+=6, s+=3, or s+=4, which would require a dedicated inner loop (for s+=2)
    for (int s = 0; s < 150; s+=5) {
        auto src0 = tensor  + s * stride;
        auto src1 = src0 + stride;
        auto src2 = src1 + stride;
        auto src3 = src2 + stride;
        auto src4 = src3 + stride;
        auto dst = result;
        for (int x = 0; x < stride; x++) {
            // clang should be able to optimize caching the weights
            // to registers outside the innerloop
            auto add = src0[x] * w[s] +
                       src1[x] * w[s+1] +
                       src2[x] * w[s+2] +
                       src3[x] * w[s+3] +
                       src4[x] * w[s+4];
            // clang should be able to optimize this comparison
            // out of the loop, generating two inner kernels
            if (s == 0) {
                dst[x] = add;
            } else {
                dst[x] += add;
            }
        }
    }
}

编辑 2

另一个起点(在添加多线程之前)将考虑将布局更改为

float tensor[kCols][kRows][kSlices + kPadding]; // padding is optional

现在的缺点是 kSlices = 150 不能再将所有权重放入寄存器(其次 kSlices 不是 4 或 8 的倍数)。此外,最终减少需要水平。

好处是减少不再需要通过内存,这对于添加的多线程来说是一件大事。

void blendHWC(float const *tensor, float const *w, float *dst, int n, int c) {
     // each thread will read from 4 positions in order
     // to share the weights -- finding the best distance
     // might need some iterations
     auto src0 = tensor;
     auto src1 = src0 + c;
     auto src2 = src1 + c;
     auto src3 = src2 + c; 
     for (int i = 0; i < n/4; i++) {
         vec8 acc0(0.0f), acc1(0.0f), acc2(0.0f), acc3(0.0f);
         // #pragma unroll?
         for (auto j = 0; j < c / 8; c++) {
             vec8 w(w + j);
             acc0 += w * vec8(src0 + j);
             acc1 += w * vec8(src1 + j);
             acc2 += w * vec8(src2 + j);
             acc3 += w * vec8(src3 + j);
         }
         vec4 sum = horizontal_reduct(acc0,acc1,acc2,acc3);
         sum.store(dst); dst+=4;
     } 
}

这些 vec4vec8 是一些自定义 SIMD 类,它们通过内部函数映射到 SIMD 指令,或者借助编译器能够编译 using vec4 = float __attribute__ __attribute__((vector_size(16))); 到高效的 SIMD 代码。

正如@hbrerkere 在评论部分建议的那样,通过使用 -O3 标志并进行以下更改,性能提高了近 65%。代码现在以 45 毫秒 运行,而不是最初的 70 毫秒。

int lastStep = (slices / 4 - 1) * 4;
int i = 0;
while (i <= lastStep) {
    result += tensor.slice(i) * w_id(i) + tensor.slice(i + 1) * w_id(i + 1) + tensor.slice(i + 2) * w_id(i + 2) + tensor.slice(i + 3) * w_id(i + 3);
    i += 4;
}
while (i < slices) {
    result += tensor.slice(i) * w_id(i);
    i++;
}

没有实际代码,我猜

+= tensor.slice(i) * w_id(i)

创建一个临时对象,然后将其添加到 lhs。是的,重载运算符看起来不错,但我会写一个函数

addto( lhs, slice1, w1, slice2, w2, ....unroll to 4... )

转换为元素的纯循环:

for (i=....)
  for (j=...)
    lhs[i][j] += slice1[i][j]*w1[j] + slice2[i][j] &c

如果这不能给你带来额外因素,我会感到惊讶。