Eigen ConditionType 数组:广播而不是循环的有效方式

Eigen ConditionType array: Efficient way to broadcast instead of looping

我有一段性能关键代码,我需要检查一个数组的值是否低于阈值,然后有条件地设置其他两个数组的值。我的代码如下所示:

#include <Eigen/Dense>

int main(){
    Eigen::ArrayXXd
        a (1, 100),
        b (2, 100),
        c (3, 100);
    
    a.setRandom();
    b.setRandom();
    c.setRandom();
    
    constexpr double minVal { 1e-8 };
    
    /* the code segment in question */
    /* option 1 */
    for ( int i=0; i<2; ++i ){
        b.row(i)   = (a < minVal).select( 0, c.row(i+1) / a );
        c.row(i+1) = (a < minVal).select( 0, c.row(i+1) );
    }
    /* option 2, which is slower */
    b = (a < minVal).replicate(2,1).select( 0, c.bottomRows(2) / a.replicate(2,1) );
    c.bottomRows(2) = (a < minVal).replicate(2,1).select( 0, c.bottomRows(2) );

    return 0;
}

检查其值是否达到阈值 minVal 的数组 a 有一行和动态列数。另外两个数组 bc 分别有两行和三行,列数与 a.

相同

现在我想以更 eigen 的方式执行上述逻辑,没有选项 1 中的那个循环,因为通常 eigen 有提高性能的技巧,我可以编写原始循环时永远不要希望匹配。 但是,我能想到的唯一方法是选项 2,它明显比选项 1 慢。

执行上述操作的正确有效方法是什么?或者循环已经是我最好的选择了?

您可以尝试以下方法:

  • 使用固定行数和动态列数定义数组类型,即,您可以将 Eigen::ArrayXXd 替换为 Eigen::Array<双倍,1/2/3,Eigen::Dynamic>.
  • 使用 fixed-size 版本的块操作(参见 https://eigen.tuxfamily.org/dox/group__TutorialBlockOperations.html),即,您可以将 bottomRows(N) 替换为 bottomRows () 和类似的 replicate(2,1) with replicate<2,1>().

我已经更改了您代码中的数组类型,并包含了第三个选项以及我提到的可能的改进:

#include <Eigen/Dense>

#include <iostream>
#include <chrono>

constexpr int numberOfTrials = 1000000;
constexpr double minVal{ 1e-8 };

typedef Eigen::Array<double, 1, Eigen::Dynamic> Array1Xd;
typedef Eigen::Array<double, 2, Eigen::Dynamic> Array2Xd;
typedef Eigen::Array<double, 3, Eigen::Dynamic> Array3Xd;

inline void option1(const Array1Xd& a, Array2Xd& b, Array3Xd& c)
{
    for (int i = 0; i < 2; ++i) {
        b.row(i) = (a < minVal).select(0, c.row(i + 1) / a);
        c.row(i + 1) = (a < minVal).select(0, c.row(i + 1));
    }
}

inline void option2(const Array1Xd& a, Array2Xd& b, Array3Xd& c)
{
    b = (a < minVal).replicate(2, 1).select(0, c.bottomRows(2) / a.replicate(2, 1));
    c.bottomRows(2) = (a < minVal).replicate(2, 1).select(0, c.bottomRows(2));
}

inline void option3(const Array1Xd& a, Array2Xd& b, Array3Xd& c)
{
    b = (a < minVal).replicate<2, 1>().select(0, c.bottomRows<2>() / a.replicate<2, 1>());
    c.bottomRows<2>() = (a < minVal).replicate<2, 1>().select(0, c.bottomRows<2>());
}

int main() {
    Array1Xd a(1, 100);
    Array2Xd b(2, 100);
    Array3Xd c(3, 100);

    a.setRandom();
    b.setRandom();
    c.setRandom();

    auto tpBegin1 = std::chrono::steady_clock::now();
    for (int i = 0; i < numberOfTrials; i++)
        option1(a, b, c);
    auto tpEnd1 = std::chrono::steady_clock::now();

    auto tpBegin2 = std::chrono::steady_clock::now();
    for (int i = 0; i < numberOfTrials; i++)
        option2(a, b, c);
    auto tpEnd2 = std::chrono::steady_clock::now();

    auto tpBegin3 = std::chrono::steady_clock::now();
    for (int i = 0; i < numberOfTrials; i++)
        option3(a, b, c);
    auto tpEnd3 = std::chrono::steady_clock::now();

    std::cout << "(Option 1) Average execution time: " << std::chrono::duration_cast<std::chrono::microseconds>(tpEnd1 - tpBegin1).count() / (long double)(numberOfTrials) << " us" << std::endl;
    std::cout << "(Option 2) Average execution time: " << std::chrono::duration_cast<std::chrono::microseconds>(tpEnd2 - tpBegin2).count() / (long double)(numberOfTrials) << " us" << std::endl;
    std::cout << "(Option 3) Average execution time: " << std::chrono::duration_cast<std::chrono::microseconds>(tpEnd3 - tpBegin3).count() / (long double)(numberOfTrials) << " us" << std::endl;

    return 0;
}

我获得的平均执行时间如下(i7-9700K,msvc2019,启用优化,NDEBUG):

(Option 1) Average execution time: 0.527717 us
(Option 2) Average execution time: 3.25618 us
(Option 3) Average execution time: 0.512029 us

并启用 AVX2+OpenMP:

(Option 1) Average execution time: 0.374309 us
(Option 2) Average execution time: 3.31356 us
(Option 3) Average execution time: 0.260551 us

我不确定这是否是最“本征”的方式,但我希望它能有所帮助!