如何在 Eigen 中对子矩阵求和

How can I sum sub-matrices in Eigen

我有一些矩阵定义为:

Eigen::MatrixXd DPCint = Eigen::MatrixXd::Zero(p.szZ*(p.na-1),p.szX);

\ perform some computations and fill every sub-matrix of size [p.szZ,p.szX] with some values
#pragma omp parallel for
for (int i=0; i < p.na-1; i++)
{
...
DPCint(Eigen::seq(i*p.szZ,(i+1)*p.szZ-1),Eigen::all) = ....;
}

\ Now sum every p.szZ rows to get a matrix that is [p.szZ,p.szX]

在 Matlab 中,此操作快速而简单。如果我想将循环与 OpenMP 并行化,我不能简单地在这里执行 += 操作。同样,我可以遍历每组 p.szZ 行并对它们求和,但该循环无法并行化,因为每个线程都会输出相同的数据。有没有一些有效的方法可以使用 Eigen 的索引操作来求和子矩阵?这看起来是一个简单的操作,我觉得我错过了一些东西,但是我有一段时间没有找到解决方案。

澄清

基本上,在上面的循环之后,我想在一行中完成:

for (int i = 0; i < p.na-1; i++)
{
DPC += DPCint(Eigen::seq(i*p.szZ,(i+1)*p.szZ-1),Eigen::all);
}

在 matlab 中,我可以简单地将矩阵重新整形为 3D 矩阵并沿第三维求和。我对Eigen的tensor库不熟悉,希望这个操作不用借助tensor库也可以实现。但是,我的首要任务是速度和效率,所以我愿意接受任何建议。

在基于 na 的轴上执行平行归约效率不高。事实上,这个维度对于多线程来说已经非常小了,但它也(几乎)强制线程在效率低下的临时矩阵上运行(这是 memory-bound 所以它不能很好地扩展)。

另一种解决方案是 并行化 szZ 维度 。每个线程都可以在一个切片上工作,并在没有临时矩阵的情况下执行局部缩减。此外,这种方法还应该改进 CPU 缓存的使用(因为每个线程计算的 DPC 部分更有可能适合缓存,因此它们不会从 RAM 重新加载)。这是一个(未经测试的)示例:

// All thread will execute the following loops (all iterations but on different data blocks)
#pragma omp parallel
for (int i = 0; i < p.na-1; i++)
{
    // "nowait" avoid a synchronization but this require a 
    // static schedule which is a good idea to use here anyway.
    #pragma omp for schedule(static) nowait
    for (int j = 0; j < p.szZ; j++)
        DPC(j, Eigen::all) += DPCint(i*p.szZ+j, Eigen::all);
}

正如@chtz 所指出的,最好避免使用临时 DPCint 矩阵,因为内存吞吐量是非常有限的资源(尤其是在并行代码中)。

编辑: 我假设矩阵存储在 row-major 存储顺序中,默认情况下并非如此。这可以修改(参见 the doc),实际上它会使第一个和第二个循环 cache-efficient。然而,混合存储顺序通常是 error-prone 并且使用 row-major 排序会强制您重新定义基本类型。 @Homer512 的解决方案是一种替代实现,当然更适合 column-major 矩阵。

这是我的看法。

#pragma omp parallel
{
     /*
      * We force static schedule to prevent excessive cache-line bouncing
      * because the elements per thread are not consecutive.
      * However, most (all?) OpenMP implementations use static scheduling
      * by default anyway.
      * Switching to threads initializing full columns would be
      * more effective from a memory POV.
      */
#    pragma omp for schedule(static)
     for(int i=0; i < p.na-1; i++) {
         /*
          * Note: The original code looks wrong.
          * Remember that indices in Eigen (as with most things C++)
          * are exclusive on the end. This touches
          * [start, end), not [start, end]
          */
         DPCint(Eigen::seq(i*p.szZ,(i+1)*p.szZ),Eigen::all) = ...;
         /*
          * Same as
          * DPCint.middleRows(i*p.szZ, p.szZ) = ...
          */
     }
     /*
      * We rely on the implicit barrier at the end of the for-construct
      * for synchronization. Then start a new loop in the same parallel
      * construct. This one can be nowait as it is the last one.
      * Again, static scheduling limits cache-line bouncing to the first
      * and last column/cache line per thread.
      * But since we wrote rows per thread above and now read
      * columns per thread, there are still a lot of cache misses
      */
#    pragma omp for schedule(static) nowait
     for(int i=0; i < p.szX; i++) {
         /*
          * Now we let a single thread reduce a column.
          * Not a row because we deal with column-major matrices
          * so this pattern is more cache-efficient
          */
         DPC.col(i) += DPCint.col(i).reshaped(
               p.szZ, p.na - 1).rowwise().sum(); 
     }
}

整形是 Eigen-3.4 中的新功能。但是,我注意到生成的程序集并不是特别有效 (no vectorization)。

Eigen 中的按行缩减总是有些慢。所以我们可能会做得更好,这在 Eigen-3.3 中也有效:

#    pragma omp for schedule(static) nowait
     for(int i = 0; i < p.szX; i++) {
         const auto& incol = DPCint.col(i);
         auto outcol = DPC.col(i);
         for(int j = 0; j < p.na - 1; j++)
             outcol += incol.segment(j * (p.na - 1), p.na - 1); 
     }

或者,将重塑后的矩阵与 all-ones 向量相乘也非常有效。它需要进行基准测试,但是,尤其是使用 OpenBLAS 的 Eigen,它可能比按行求和更快。

基准测试

好的,我继续做了一些测试。首先,让我们设置一个最小的可重现示例,因为我们之前没有。

void reference(Eigen::Ref<Eigen::MatrixXd> DPC,
               int na)
{
    const Eigen::Index szZ = DPC.rows();
    const Eigen::Index szX = DPC.cols();
    Eigen::MatrixXd DPCint(szZ * na, szX);
#   pragma omp parallel for
    for(Eigen::Index a = 0; a < na; ++a)
        for(Eigen::Index x = 0; x < szX; ++x)
            for(Eigen::Index z = 0; z < szZ; ++z)
                DPCint(a * szZ + z, x) =
                      a * 0.25 + x * 1.34 + z * 12.68;
    for(Eigen::Index a = 0; a < na; ++a)
        DPC += DPCint.middleRows(a * szZ, szZ);
}
void test(Eigen::Ref<Eigen::MatrixXd> DPC,
          int na)
{...}
int main()
{
    const int szZ = 500, szX = 192, na = 15;
    const int repetitions = 10000;
    Eigen::MatrixXd ref = Eigen::MatrixXd::Zero(szZ, szX);
    Eigen::MatrixXd opt = Eigen::MatrixXd::Zero(szZ, szX);
    reference(ref, na);
    test(opt, na);
    std::cout << (ref - opt).cwiseAbs().sum() << std::endl;
    for(int i = 0; i < repetitions; ++i)
        test(opt, na);
}

数组维度如OP所述。 DPCint 初始化被选择为标量,并允许测试任何优化的实现是否仍然正确。为合理的运行时间选择了重复次数。

在 AMD Ryzen Threadripper 2990WX(32 核,64 线程)上使用 g++-10 -O3 -march=native -DNDEBUG -fopenmp 编译和测试。启用 NUMA。使用 Eigen-3.4.0.

参考给16.6秒

让我们优化初始化以解决这个问题:

void reference_op1(Eigen::Ref<Eigen::MatrixXd> DPC,
                   int na)
{
    const Eigen::Index szZ = DPC.rows();
    const Eigen::Index szX = DPC.cols();
    Eigen::MatrixXd DPCint(szZ * na, szX);
    const auto avals = Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
    const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
    const Eigen::VectorXd zvals =
          Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
#   pragma omp parallel for collapse(2)
    for(Eigen::Index a = 0; a < na; ++a)
        for(Eigen::Index x = 0; x < szX; ++x)
            DPCint.col(x).segment(a * szZ, szZ) = zvals.array() + xvals[x] + avals[a];
    for(Eigen::Index a = 0; a < na; ++a)
        DPC += DPCint.middleRows(a * szZ, szZ);
}

linspaced 并没有真正帮助,但请注意 collapse(2)。由于 na 在 64 线程机器上只有 15,我们需要并行化两个循环。 15.4 秒

让我们测试一下我建议的版本:

void rowwise(Eigen::Ref<Eigen::MatrixXd> DPC,
             int na)
{
    const Eigen::Index szZ = DPC.rows();
    const Eigen::Index szX = DPC.cols();
    Eigen::MatrixXd DPCint(szZ * na, szX);
    const auto avals = Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
    const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
    const Eigen::VectorXd zvals =
          Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
#   pragma omp parallel
    {
#       pragma omp for collapse(2)
        for(Eigen::Index a = 0; a < na; ++a)
            for(Eigen::Index x = 0; x < szX; ++x)
                DPCint.col(x).segment(a * szZ, szZ) =
                      zvals.array() + xvals[x] + avals[a];

#       pragma omp for nowait
        for(Eigen::Index x = 0; x < szX; ++x)
              DPC.col(x) += DPCint.col(x).reshaped(szZ, na).rowwise().sum();
    }
}

运行时间为 12.5 秒。考虑到我们只是并行化了算法的后半部分,所以加速并不多。

正如我之前建议的那样,按行缩减是废话,matrix-vector 产品可以避免这种情况。让我们看看这是否有帮助:

void rowwise_dot(Eigen::Ref<Eigen::MatrixXd> DPC,
                 int na)
{
    const Eigen::Index szZ = DPC.rows();
    const Eigen::Index szX = DPC.cols();
    Eigen::MatrixXd DPCint(szZ * na, szX);
    const auto avals = Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
    const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
    const Eigen::VectorXd zvals =
          Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
    const Eigen::VectorXd ones = Eigen::VectorXd::Ones(szZ);
#   pragma omp parallel
    {
#       pragma omp for collapse(2)
        for(Eigen::Index a = 0; a < na; ++a)
            for(Eigen::Index x = 0; x < szX; ++x)
                DPCint.col(x).segment(a * szZ, szZ) =
                      zvals.array() + xvals[x] + avals[a];

#       pragma omp for nowait
        for(Eigen::Index x = 0; x < szX; ++x)
            DPC.col(x).noalias() +=
                  DPCint.col(x).reshaped(szZ, na) * ones;
    }
}

不,还有12.5秒。当我们用 -DEIGEN_USE_BLAS -lopenblas_openmp 编译时会发生什么?相同的数字。如果您不能为 AVX2 编译但 CPU 支持它,那么这可能是值得的。 Eigen 不支持运行时 CPU 特征检测。或者它对 float 的帮助可能比对 double 的帮助更大,因为矢量化的好处更高。

如果我们以向量化的方式构建自己的按行归约会怎么样?

void rowwise_loop(Eigen::Ref<Eigen::MatrixXd> DPC,
                  int na)
{
    const Eigen::Index szZ = DPC.rows();
    const Eigen::Index szX = DPC.cols();
    Eigen::MatrixXd DPCint(szZ * na, szX);
    const auto avals = Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
    const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
    const Eigen::VectorXd zvals =
          Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
#   pragma omp parallel
    {
#       pragma omp for collapse(2)
        for(Eigen::Index a = 0; a < na; ++a)
            for(Eigen::Index x = 0; x < szX; ++x)
                DPCint.col(x).segment(a * szZ, szZ) =
                      zvals.array() + xvals[x] + avals[a];

#       pragma omp for nowait
        for(Eigen::Index x = 0; x < szX; ++x)
            for(Eigen::Index a = 0; a < na; ++a)
                DPC.col(x) += DPCint.col(x).segment(a * szZ, szZ);
    }
}

13.3 秒。请注意,在我的笔记本电脑(Intel i7-8850H)上,这比按行版本快得多。 NUMA 和缓存行弹跳可能是较大线程撕裂器上的一个严重问题,但我没有调查性能计数器。

重新排序 DPCint

在这一点上,我认为 DPCint 的布局及其设置中的循环顺序很明显是一种负担。也许这是有原因的。但如果没有,我建议修改如下:

void reordered(Eigen::Ref<Eigen::MatrixXd> DPC,
               int na)
{
    const Eigen::Index szZ = DPC.rows();
    const Eigen::Index szX = DPC.cols();
    Eigen::MatrixXd DPCint(szZ * na, szX);
    const Eigen::VectorXd avals =
          Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
    const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
    const auto zvals = Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
#   pragma omp parallel
    {
#       pragma omp for
        for(Eigen::Index x = 0; x < szX; ++x)
            for(Eigen::Index z = 0; z < szZ; ++z)
                DPCint.col(x).segment(z * na, na) =
                      avals.array() + xvals[x] + zvals[z];

#       pragma omp for nowait
        for(Eigen::Index x = 0; x < szX; ++x)
            DPC.col(x) += DPCint.col(x).reshaped(na, szZ).colwise().sum();
    }
}

我们的想法是以这样一种方式重塑它:a) colwise sums 是可能的并且 b) 同一个线程在第一个和第二个循环中接触相同的元素。

有趣的是,这似乎慢了 15.3 秒。我想最里面的作业现在太短了。

如果我们将算法的两个部分合并为一个循环,减少同步开销并改进缓存,会发生什么情况?

void reordered_folded(Eigen::Ref<Eigen::MatrixXd> DPC,
                        int na)
{
    const Eigen::Index szZ = DPC.rows();
    const Eigen::Index szX = DPC.cols();
    Eigen::MatrixXd DPCint(szZ * na, szX);
    const Eigen::VectorXd avals =
          Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
    const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
    const auto zvals = Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
#   pragma omp parallel for
    for(Eigen::Index x = 0; x < szX; ++x) {
        for(Eigen::Index z = 0; z < szZ; ++z)
            DPCint.col(x).segment(z * na, na) =
                  avals.array() + xvals[x] + zvals[z];
        DPC.col(x) += DPCint.col(x).reshaped(na, szZ).colwise().sum();
    }
}

12.3 秒。在这一点上,为什么我们还要有一个共享的 DPCint 数组?让我们使用 per-thread 矩阵。

void reordered_loctmp(Eigen::Ref<Eigen::MatrixXd> DPC,
                      int na)
{
    const Eigen::Index szZ = DPC.rows();
    const Eigen::Index szX = DPC.cols();
    const Eigen::VectorXd avals =
        Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
    const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
    const auto zvals = Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
#   pragma omp parallel
    {
        Eigen::MatrixXd DPCint(na, szZ);
#       pragma omp for nowait
        for(Eigen::Index x = 0; x < szX; ++x) {
            for(Eigen::Index z = 0; z < szZ; ++z)
                DPCint.col(z) = avals.array() + xvals[x] + zvals[z];
            DPC.col(x) += DPCint.colwise().sum();
        }
    }
}

嘿! 6.8 秒。我们消除了 cache-line 边界。我们制作了所有内容 cache-friendly 并进行了适当的矢量化处理。

我现在唯一能想到的就是将 DPCint 变成一个动态计算的表达式,但这在很大程度上取决于实际的表达式。由于我无法对此进行推测,因此我将保留它。