使用 Eigen 重复向量中的元素并在所有元素上应用一组不同的函数的最有效方法是什么?

What is the most efficient way to repeat elements in a vector and apply a set of different functions across all elements using Eigen?

假设我有一个向量只包含如下定义的正实数元素:

Eigen::VectorXd v(1.3876, 8.6983, 5.438, 3.9865, 4.5673);

我想生成一个新的向量 v2,它已将 v 中的元素重复了 k 次。然后我想对向量中的每个重复元素应用 k 个不同的函数。

例如,如果 v2 被 v 重复 2 次并且我应用 floor() 和 ceil() 作为我的两个函数,则基于上述向量的结果将是一个列向量,其值为:[1; 2; 8; 9; 5; 6; 3; 4; 4; 5].保留原始值的顺序在这里也很重要。这些值也是一个简化的示例,在实践中,我正在生成具有约 100,000 个或更多元素的向量 v,并希望使我的代码尽可能可向量化。

由于我是从 Matlab 转到 Eigen 和 C++,我首先采用的最简单的方法是将这个 Nx1 向量转换为 Nx2 矩阵,将 floor 应用于第一列并将 ceil 应用于第二列,取转置以获得 2xN 矩阵,然后利用矩阵的列优先性质并将 2xN 矩阵重塑为 2Nx1 向量,从而产生我想要的结果。但是,对于大向量,这将非常缓慢且效率低下。

有效地解决了我如何通过生成索引序列和索引输入向量来重复输入向量中的元素。然后我可以生成更多的索引序列以将我的函数应用于相关元素 v2 并将结果复制回它们各自的位置。然而,这真的是最有效的方法吗?我没有完全掌握写时复制和移动语义,但我认为第二个索引表达式在某种意义上是多余的?

如果这是真的,那么我的猜测是这里的解决方案是某种无效或一元表达式,我可以在其中定义一个接受向量的表达式,一些索引 k 和 k expressions/functions 应用到每个元素并吐出我正在寻找的向量。我已经阅读了有关该主题的 Eigen 文档,但我正在努力构建一个功能示例。如有任何帮助,我们将不胜感激!

所以,如果我理解正确的话,你不想 replicate(就本征方法而言)向量,你想对相同的元素应用不同的方法并存储每个元素的结果, 正确吗?

在这种情况下,每个函数按顺序计算一次是最简单的方法。无论如何,大多数 CPUs 每个时钟周期只能做一个(向量)内存存储。所以对于简单的一元或二元运算,你的收益有一个上限。

不过,从技术上讲,一个负载总是优于两个负载,这是正确的,没有很好的方法来实现这一点是 Eigen 的局限性。

知道即使您手动编写一个会生成多个输出的循环,您也应该限制自己的输出数量。 CPUs 的行填充缓冲区数量有限。 IIRC Intel 建议在紧密循环中使用少于 10 个“输出流”,否则你可能会在这些上停止 CPU。

另一个方面是 C++ 的弱别名限制使编译器很难对具有多个输出的代码进行矢量化。所以它甚至可能是有害的。

我将如何构建此代码

请记住,Eigen 是列优先的,就像 Matlab 一样。因此,每个输出函数使用一列。或者只是使用单独的向量开始。

Eigen::VectorXd v = ...;
Eigen::MatrixX2d out(v.size(), 2);
out.col(0) = v.array().floor();
out.col(1) = v.array().ceil();

遵循KISS原则,这就足够了。通过做一些更复杂的事情,你不会有太多收获。一点多线程可能会给你带来一些好处(我猜不到因子 2),因为单个 CPU 线程不足以最大化内存带宽,但仅此而已。

一些基准测试

这是我的基线:

int main()
{
  int rows = 100013, repetitions = 100000;
  Eigen::VectorXd v = Eigen::VectorXd::Random(rows);
  Eigen::MatrixX2d out(rows, 2);
  for(int i = 0; i < repetitions; ++i) {
    out.col(0) = v.array().floor();
    out.col(1) = v.array().ceil();
  }
}

使用 gcc-11 编译,-O3 -mavx2 -fno-math-errno 我得到了 ca。 5.7秒。

检查汇编代码发现向量化很好。

普通旧 C++ 版本:

    double* outfloor = out.data();
    double* outceil = outfloor + out.outerStride();
    const double* inarr = v.data();
    for(std::ptrdiff_t j = 0; j < rows; ++j) {
      const double vj = inarr[j];
      outfloor[j] = std::floor(vj);
      outceil[j] = std::ceil(vj);
    }

40 秒而不是 5 秒!这个版本实际上没有矢量化,因为编译器不能证明数组不互相别名。

接下来,让我们使用固定大小的特征向量让编译器生成向量化代码:

    double* outfloor = out.data();
    double* outceil = outfloor + out.outerStride();
    const double* inarr = v.data();
    std::ptrdiff_t j;
    for(j = 0; j + 4 <= rows; j += 4) {
      const Eigen::Vector4d vj = Eigen::Vector4d::Map(inarr + j);
      const auto floorval = vj.array().floor();
      const auto ceilval = vj.array().ceil();
      Eigen::Vector4d::Map(outfloor + j) = floorval;
      Eigen::Vector4d::Map(outceil + j) = ceilval;;
    }
    if(j + 2 <= rows) {
      const Eigen::Vector2d vj = Eigen::Vector2d::MapAligned(inarr + j);
      const auto floorval = vj.array().floor();
      const auto ceilval = vj.array().ceil();
      Eigen::Vector2d::Map(outfloor + j) = floorval;
      Eigen::Vector2d::Map(outceil + j) = ceilval;;
      j += 2;
    }
    if(j < rows) {
      const double vj = inarr[j];
      outfloor[j] = std::floor(vj);
      outceil[j] = std::ceil(vj);      
    }

7.5 秒。汇编程序看起来不错,完全矢量化。我不确定为什么性能会降低。也许缓存行别名?

最后一次尝试:我们不会尝试避免重新读取向量,但我们会按块重新读取它,以便在我们第二次读取它时它会在缓存中。

    const int blocksize = 64 * 1024 / sizeof(double);
    std::ptrdiff_t j;
    for(j = 0; j + blocksize <= rows; j += blocksize) {
      const auto& vj = v.segment(j, blocksize);
      auto outj = out.middleRows(j, blocksize);
      outj.col(0) = vj.array().floor();
      outj.col(1) = vj.array().ceil();
    }
    const auto& vj = v.tail(rows - j);
    auto outj = out.bottomRows(rows - j);
    outj.col(0) = vj.array().floor();
    outj.col(1) = vj.array().ceil();

5.4 秒。所以这里有一些收获,但还不足以证明增加的复杂性是合理的。