Eigen ConditionType 数组:广播而不是循环的有效方式
Eigen ConditionType array: Efficient way to broadcast instead of looping
我有一段性能关键代码,我需要检查一个数组的值是否低于阈值,然后有条件地设置其他两个数组的值。我的代码如下所示:
#include <Eigen/Dense>
int main(){
Eigen::ArrayXXd
a (1, 100),
b (2, 100),
c (3, 100);
a.setRandom();
b.setRandom();
c.setRandom();
constexpr double minVal { 1e-8 };
/* the code segment in question */
/* option 1 */
for ( int i=0; i<2; ++i ){
b.row(i) = (a < minVal).select( 0, c.row(i+1) / a );
c.row(i+1) = (a < minVal).select( 0, c.row(i+1) );
}
/* option 2, which is slower */
b = (a < minVal).replicate(2,1).select( 0, c.bottomRows(2) / a.replicate(2,1) );
c.bottomRows(2) = (a < minVal).replicate(2,1).select( 0, c.bottomRows(2) );
return 0;
}
检查其值是否达到阈值 minVal
的数组 a
有一行和动态列数。另外两个数组 b
和 c
分别有两行和三行,列数与 a
.
相同
现在我想以更 eigen
的方式执行上述逻辑,没有选项 1 中的那个循环,因为通常 eigen
有提高性能的技巧,我可以编写原始循环时永远不要希望匹配。
但是,我能想到的唯一方法是选项 2,它明显比选项 1 慢。
执行上述操作的正确有效方法是什么?或者循环已经是我最好的选择了?
您可以尝试以下方法:
- 使用固定行数和动态列数定义数组类型,即,您可以将 Eigen::ArrayXXd 替换为 Eigen::Array<双倍,1/2/3,Eigen::Dynamic>.
- 使用 fixed-size 版本的块操作(参见 https://eigen.tuxfamily.org/dox/group__TutorialBlockOperations.html),即,您可以将 bottomRows(N) 替换为 bottomRows
() 和类似的 replicate(2,1) with replicate<2,1>().
我已经更改了您代码中的数组类型,并包含了第三个选项以及我提到的可能的改进:
#include <Eigen/Dense>
#include <iostream>
#include <chrono>
constexpr int numberOfTrials = 1000000;
constexpr double minVal{ 1e-8 };
typedef Eigen::Array<double, 1, Eigen::Dynamic> Array1Xd;
typedef Eigen::Array<double, 2, Eigen::Dynamic> Array2Xd;
typedef Eigen::Array<double, 3, Eigen::Dynamic> Array3Xd;
inline void option1(const Array1Xd& a, Array2Xd& b, Array3Xd& c)
{
for (int i = 0; i < 2; ++i) {
b.row(i) = (a < minVal).select(0, c.row(i + 1) / a);
c.row(i + 1) = (a < minVal).select(0, c.row(i + 1));
}
}
inline void option2(const Array1Xd& a, Array2Xd& b, Array3Xd& c)
{
b = (a < minVal).replicate(2, 1).select(0, c.bottomRows(2) / a.replicate(2, 1));
c.bottomRows(2) = (a < minVal).replicate(2, 1).select(0, c.bottomRows(2));
}
inline void option3(const Array1Xd& a, Array2Xd& b, Array3Xd& c)
{
b = (a < minVal).replicate<2, 1>().select(0, c.bottomRows<2>() / a.replicate<2, 1>());
c.bottomRows<2>() = (a < minVal).replicate<2, 1>().select(0, c.bottomRows<2>());
}
int main() {
Array1Xd a(1, 100);
Array2Xd b(2, 100);
Array3Xd c(3, 100);
a.setRandom();
b.setRandom();
c.setRandom();
auto tpBegin1 = std::chrono::steady_clock::now();
for (int i = 0; i < numberOfTrials; i++)
option1(a, b, c);
auto tpEnd1 = std::chrono::steady_clock::now();
auto tpBegin2 = std::chrono::steady_clock::now();
for (int i = 0; i < numberOfTrials; i++)
option2(a, b, c);
auto tpEnd2 = std::chrono::steady_clock::now();
auto tpBegin3 = std::chrono::steady_clock::now();
for (int i = 0; i < numberOfTrials; i++)
option3(a, b, c);
auto tpEnd3 = std::chrono::steady_clock::now();
std::cout << "(Option 1) Average execution time: " << std::chrono::duration_cast<std::chrono::microseconds>(tpEnd1 - tpBegin1).count() / (long double)(numberOfTrials) << " us" << std::endl;
std::cout << "(Option 2) Average execution time: " << std::chrono::duration_cast<std::chrono::microseconds>(tpEnd2 - tpBegin2).count() / (long double)(numberOfTrials) << " us" << std::endl;
std::cout << "(Option 3) Average execution time: " << std::chrono::duration_cast<std::chrono::microseconds>(tpEnd3 - tpBegin3).count() / (long double)(numberOfTrials) << " us" << std::endl;
return 0;
}
我获得的平均执行时间如下(i7-9700K,msvc2019,启用优化,NDEBUG):
(Option 1) Average execution time: 0.527717 us
(Option 2) Average execution time: 3.25618 us
(Option 3) Average execution time: 0.512029 us
并启用 AVX2+OpenMP:
(Option 1) Average execution time: 0.374309 us
(Option 2) Average execution time: 3.31356 us
(Option 3) Average execution time: 0.260551 us
我不确定这是否是最“本征”的方式,但我希望它能有所帮助!
我有一段性能关键代码,我需要检查一个数组的值是否低于阈值,然后有条件地设置其他两个数组的值。我的代码如下所示:
#include <Eigen/Dense>
int main(){
Eigen::ArrayXXd
a (1, 100),
b (2, 100),
c (3, 100);
a.setRandom();
b.setRandom();
c.setRandom();
constexpr double minVal { 1e-8 };
/* the code segment in question */
/* option 1 */
for ( int i=0; i<2; ++i ){
b.row(i) = (a < minVal).select( 0, c.row(i+1) / a );
c.row(i+1) = (a < minVal).select( 0, c.row(i+1) );
}
/* option 2, which is slower */
b = (a < minVal).replicate(2,1).select( 0, c.bottomRows(2) / a.replicate(2,1) );
c.bottomRows(2) = (a < minVal).replicate(2,1).select( 0, c.bottomRows(2) );
return 0;
}
检查其值是否达到阈值 minVal
的数组 a
有一行和动态列数。另外两个数组 b
和 c
分别有两行和三行,列数与 a
.
现在我想以更 eigen
的方式执行上述逻辑,没有选项 1 中的那个循环,因为通常 eigen
有提高性能的技巧,我可以编写原始循环时永远不要希望匹配。
但是,我能想到的唯一方法是选项 2,它明显比选项 1 慢。
执行上述操作的正确有效方法是什么?或者循环已经是我最好的选择了?
您可以尝试以下方法:
- 使用固定行数和动态列数定义数组类型,即,您可以将 Eigen::ArrayXXd 替换为 Eigen::Array<双倍,1/2/3,Eigen::Dynamic>.
- 使用 fixed-size 版本的块操作(参见 https://eigen.tuxfamily.org/dox/group__TutorialBlockOperations.html),即,您可以将 bottomRows(N) 替换为 bottomRows
() 和类似的 replicate(2,1) with replicate<2,1>().
我已经更改了您代码中的数组类型,并包含了第三个选项以及我提到的可能的改进:
#include <Eigen/Dense>
#include <iostream>
#include <chrono>
constexpr int numberOfTrials = 1000000;
constexpr double minVal{ 1e-8 };
typedef Eigen::Array<double, 1, Eigen::Dynamic> Array1Xd;
typedef Eigen::Array<double, 2, Eigen::Dynamic> Array2Xd;
typedef Eigen::Array<double, 3, Eigen::Dynamic> Array3Xd;
inline void option1(const Array1Xd& a, Array2Xd& b, Array3Xd& c)
{
for (int i = 0; i < 2; ++i) {
b.row(i) = (a < minVal).select(0, c.row(i + 1) / a);
c.row(i + 1) = (a < minVal).select(0, c.row(i + 1));
}
}
inline void option2(const Array1Xd& a, Array2Xd& b, Array3Xd& c)
{
b = (a < minVal).replicate(2, 1).select(0, c.bottomRows(2) / a.replicate(2, 1));
c.bottomRows(2) = (a < minVal).replicate(2, 1).select(0, c.bottomRows(2));
}
inline void option3(const Array1Xd& a, Array2Xd& b, Array3Xd& c)
{
b = (a < minVal).replicate<2, 1>().select(0, c.bottomRows<2>() / a.replicate<2, 1>());
c.bottomRows<2>() = (a < minVal).replicate<2, 1>().select(0, c.bottomRows<2>());
}
int main() {
Array1Xd a(1, 100);
Array2Xd b(2, 100);
Array3Xd c(3, 100);
a.setRandom();
b.setRandom();
c.setRandom();
auto tpBegin1 = std::chrono::steady_clock::now();
for (int i = 0; i < numberOfTrials; i++)
option1(a, b, c);
auto tpEnd1 = std::chrono::steady_clock::now();
auto tpBegin2 = std::chrono::steady_clock::now();
for (int i = 0; i < numberOfTrials; i++)
option2(a, b, c);
auto tpEnd2 = std::chrono::steady_clock::now();
auto tpBegin3 = std::chrono::steady_clock::now();
for (int i = 0; i < numberOfTrials; i++)
option3(a, b, c);
auto tpEnd3 = std::chrono::steady_clock::now();
std::cout << "(Option 1) Average execution time: " << std::chrono::duration_cast<std::chrono::microseconds>(tpEnd1 - tpBegin1).count() / (long double)(numberOfTrials) << " us" << std::endl;
std::cout << "(Option 2) Average execution time: " << std::chrono::duration_cast<std::chrono::microseconds>(tpEnd2 - tpBegin2).count() / (long double)(numberOfTrials) << " us" << std::endl;
std::cout << "(Option 3) Average execution time: " << std::chrono::duration_cast<std::chrono::microseconds>(tpEnd3 - tpBegin3).count() / (long double)(numberOfTrials) << " us" << std::endl;
return 0;
}
我获得的平均执行时间如下(i7-9700K,msvc2019,启用优化,NDEBUG):
(Option 1) Average execution time: 0.527717 us
(Option 2) Average execution time: 3.25618 us
(Option 3) Average execution time: 0.512029 us
并启用 AVX2+OpenMP:
(Option 1) Average execution time: 0.374309 us
(Option 2) Average execution time: 3.31356 us
(Option 3) Average execution time: 0.260551 us
我不确定这是否是最“本征”的方式,但我希望它能有所帮助!