如何在 Eigen 中对子矩阵求和
How can I sum sub-matrices in Eigen
我有一些矩阵定义为:
Eigen::MatrixXd DPCint = Eigen::MatrixXd::Zero(p.szZ*(p.na-1),p.szX);
\ perform some computations and fill every sub-matrix of size [p.szZ,p.szX] with some values
#pragma omp parallel for
for (int i=0; i < p.na-1; i++)
{
...
DPCint(Eigen::seq(i*p.szZ,(i+1)*p.szZ-1),Eigen::all) = ....;
}
\ Now sum every p.szZ rows to get a matrix that is [p.szZ,p.szX]
在 Matlab 中,此操作快速而简单。如果我想将循环与 OpenMP 并行化,我不能简单地在这里执行 += 操作。同样,我可以遍历每组 p.szZ 行并对它们求和,但该循环无法并行化,因为每个线程都会输出相同的数据。有没有一些有效的方法可以使用 Eigen 的索引操作来求和子矩阵?这看起来是一个简单的操作,我觉得我错过了一些东西,但是我有一段时间没有找到解决方案。
澄清
基本上,在上面的循环之后,我想在一行中完成:
for (int i = 0; i < p.na-1; i++)
{
DPC += DPCint(Eigen::seq(i*p.szZ,(i+1)*p.szZ-1),Eigen::all);
}
在 matlab 中,我可以简单地将矩阵重新整形为 3D 矩阵并沿第三维求和。我对Eigen的tensor库不熟悉,希望这个操作不用借助tensor库也可以实现。但是,我的首要任务是速度和效率,所以我愿意接受任何建议。
在基于 na
的轴上执行平行归约效率不高。事实上,这个维度对于多线程来说已经非常小了,但它也(几乎)强制线程在效率低下的临时矩阵上运行(这是 memory-bound 所以它不能很好地扩展)。
另一种解决方案是 并行化 szZ
维度 。每个线程都可以在一个切片上工作,并在没有临时矩阵的情况下执行局部缩减。此外,这种方法还应该改进 CPU 缓存的使用(因为每个线程计算的 DPC
部分更有可能适合缓存,因此它们不会从 RAM 重新加载)。这是一个(未经测试的)示例:
// All thread will execute the following loops (all iterations but on different data blocks)
#pragma omp parallel
for (int i = 0; i < p.na-1; i++)
{
// "nowait" avoid a synchronization but this require a
// static schedule which is a good idea to use here anyway.
#pragma omp for schedule(static) nowait
for (int j = 0; j < p.szZ; j++)
DPC(j, Eigen::all) += DPCint(i*p.szZ+j, Eigen::all);
}
正如@chtz 所指出的,最好避免使用临时 DPCint
矩阵,因为内存吞吐量是非常有限的资源(尤其是在并行代码中)。
编辑: 我假设矩阵存储在 row-major 存储顺序中,默认情况下并非如此。这可以修改(参见 the doc),实际上它会使第一个和第二个循环 cache-efficient。然而,混合存储顺序通常是 error-prone 并且使用 row-major 排序会强制您重新定义基本类型。 @Homer512 的解决方案是一种替代实现,当然更适合 column-major 矩阵。
这是我的看法。
#pragma omp parallel
{
/*
* We force static schedule to prevent excessive cache-line bouncing
* because the elements per thread are not consecutive.
* However, most (all?) OpenMP implementations use static scheduling
* by default anyway.
* Switching to threads initializing full columns would be
* more effective from a memory POV.
*/
# pragma omp for schedule(static)
for(int i=0; i < p.na-1; i++) {
/*
* Note: The original code looks wrong.
* Remember that indices in Eigen (as with most things C++)
* are exclusive on the end. This touches
* [start, end), not [start, end]
*/
DPCint(Eigen::seq(i*p.szZ,(i+1)*p.szZ),Eigen::all) = ...;
/*
* Same as
* DPCint.middleRows(i*p.szZ, p.szZ) = ...
*/
}
/*
* We rely on the implicit barrier at the end of the for-construct
* for synchronization. Then start a new loop in the same parallel
* construct. This one can be nowait as it is the last one.
* Again, static scheduling limits cache-line bouncing to the first
* and last column/cache line per thread.
* But since we wrote rows per thread above and now read
* columns per thread, there are still a lot of cache misses
*/
# pragma omp for schedule(static) nowait
for(int i=0; i < p.szX; i++) {
/*
* Now we let a single thread reduce a column.
* Not a row because we deal with column-major matrices
* so this pattern is more cache-efficient
*/
DPC.col(i) += DPCint.col(i).reshaped(
p.szZ, p.na - 1).rowwise().sum();
}
}
整形是 Eigen-3.4 中的新功能。但是,我注意到生成的程序集并不是特别有效 (no vectorization)。
Eigen 中的按行缩减总是有些慢。所以我们可能会做得更好,这在 Eigen-3.3 中也有效:
# pragma omp for schedule(static) nowait
for(int i = 0; i < p.szX; i++) {
const auto& incol = DPCint.col(i);
auto outcol = DPC.col(i);
for(int j = 0; j < p.na - 1; j++)
outcol += incol.segment(j * (p.na - 1), p.na - 1);
}
或者,将重塑后的矩阵与 all-ones 向量相乘也非常有效。它需要进行基准测试,但是,尤其是使用 OpenBLAS 的 Eigen,它可能比按行求和更快。
基准测试
好的,我继续做了一些测试。首先,让我们设置一个最小的可重现示例,因为我们之前没有。
void reference(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
Eigen::MatrixXd DPCint(szZ * na, szX);
# pragma omp parallel for
for(Eigen::Index a = 0; a < na; ++a)
for(Eigen::Index x = 0; x < szX; ++x)
for(Eigen::Index z = 0; z < szZ; ++z)
DPCint(a * szZ + z, x) =
a * 0.25 + x * 1.34 + z * 12.68;
for(Eigen::Index a = 0; a < na; ++a)
DPC += DPCint.middleRows(a * szZ, szZ);
}
void test(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{...}
int main()
{
const int szZ = 500, szX = 192, na = 15;
const int repetitions = 10000;
Eigen::MatrixXd ref = Eigen::MatrixXd::Zero(szZ, szX);
Eigen::MatrixXd opt = Eigen::MatrixXd::Zero(szZ, szX);
reference(ref, na);
test(opt, na);
std::cout << (ref - opt).cwiseAbs().sum() << std::endl;
for(int i = 0; i < repetitions; ++i)
test(opt, na);
}
数组维度如OP所述。 DPCint 初始化被选择为标量,并允许测试任何优化的实现是否仍然正确。为合理的运行时间选择了重复次数。
在 AMD Ryzen Threadripper 2990WX(32 核,64 线程)上使用 g++-10 -O3 -march=native -DNDEBUG -fopenmp
编译和测试。启用 NUMA。使用 Eigen-3.4.0.
参考给16.6秒
让我们优化初始化以解决这个问题:
void reference_op1(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
Eigen::MatrixXd DPCint(szZ * na, szX);
const auto avals = Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
const Eigen::VectorXd zvals =
Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
# pragma omp parallel for collapse(2)
for(Eigen::Index a = 0; a < na; ++a)
for(Eigen::Index x = 0; x < szX; ++x)
DPCint.col(x).segment(a * szZ, szZ) = zvals.array() + xvals[x] + avals[a];
for(Eigen::Index a = 0; a < na; ++a)
DPC += DPCint.middleRows(a * szZ, szZ);
}
linspaced 并没有真正帮助,但请注意 collapse(2)
。由于 na 在 64 线程机器上只有 15,我们需要并行化两个循环。 15.4 秒
让我们测试一下我建议的版本:
void rowwise(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
Eigen::MatrixXd DPCint(szZ * na, szX);
const auto avals = Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
const Eigen::VectorXd zvals =
Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
# pragma omp parallel
{
# pragma omp for collapse(2)
for(Eigen::Index a = 0; a < na; ++a)
for(Eigen::Index x = 0; x < szX; ++x)
DPCint.col(x).segment(a * szZ, szZ) =
zvals.array() + xvals[x] + avals[a];
# pragma omp for nowait
for(Eigen::Index x = 0; x < szX; ++x)
DPC.col(x) += DPCint.col(x).reshaped(szZ, na).rowwise().sum();
}
}
运行时间为 12.5 秒。考虑到我们只是并行化了算法的后半部分,所以加速并不多。
正如我之前建议的那样,按行缩减是废话,matrix-vector 产品可以避免这种情况。让我们看看这是否有帮助:
void rowwise_dot(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
Eigen::MatrixXd DPCint(szZ * na, szX);
const auto avals = Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
const Eigen::VectorXd zvals =
Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
const Eigen::VectorXd ones = Eigen::VectorXd::Ones(szZ);
# pragma omp parallel
{
# pragma omp for collapse(2)
for(Eigen::Index a = 0; a < na; ++a)
for(Eigen::Index x = 0; x < szX; ++x)
DPCint.col(x).segment(a * szZ, szZ) =
zvals.array() + xvals[x] + avals[a];
# pragma omp for nowait
for(Eigen::Index x = 0; x < szX; ++x)
DPC.col(x).noalias() +=
DPCint.col(x).reshaped(szZ, na) * ones;
}
}
不,还有12.5秒。当我们用 -DEIGEN_USE_BLAS -lopenblas_openmp
编译时会发生什么?相同的数字。如果您不能为 AVX2 编译但 CPU 支持它,那么这可能是值得的。 Eigen 不支持运行时 CPU 特征检测。或者它对 float 的帮助可能比对 double 的帮助更大,因为矢量化的好处更高。
如果我们以向量化的方式构建自己的按行归约会怎么样?
void rowwise_loop(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
Eigen::MatrixXd DPCint(szZ * na, szX);
const auto avals = Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
const Eigen::VectorXd zvals =
Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
# pragma omp parallel
{
# pragma omp for collapse(2)
for(Eigen::Index a = 0; a < na; ++a)
for(Eigen::Index x = 0; x < szX; ++x)
DPCint.col(x).segment(a * szZ, szZ) =
zvals.array() + xvals[x] + avals[a];
# pragma omp for nowait
for(Eigen::Index x = 0; x < szX; ++x)
for(Eigen::Index a = 0; a < na; ++a)
DPC.col(x) += DPCint.col(x).segment(a * szZ, szZ);
}
}
13.3 秒。请注意,在我的笔记本电脑(Intel i7-8850H)上,这比按行版本快得多。 NUMA 和缓存行弹跳可能是较大线程撕裂器上的一个严重问题,但我没有调查性能计数器。
重新排序 DPCint
在这一点上,我认为 DPCint 的布局及其设置中的循环顺序很明显是一种负担。也许这是有原因的。但如果没有,我建议修改如下:
void reordered(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
Eigen::MatrixXd DPCint(szZ * na, szX);
const Eigen::VectorXd avals =
Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
const auto zvals = Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
# pragma omp parallel
{
# pragma omp for
for(Eigen::Index x = 0; x < szX; ++x)
for(Eigen::Index z = 0; z < szZ; ++z)
DPCint.col(x).segment(z * na, na) =
avals.array() + xvals[x] + zvals[z];
# pragma omp for nowait
for(Eigen::Index x = 0; x < szX; ++x)
DPC.col(x) += DPCint.col(x).reshaped(na, szZ).colwise().sum();
}
}
我们的想法是以这样一种方式重塑它:a) colwise sums 是可能的并且 b) 同一个线程在第一个和第二个循环中接触相同的元素。
有趣的是,这似乎慢了 15.3 秒。我想最里面的作业现在太短了。
如果我们将算法的两个部分合并为一个循环,减少同步开销并改进缓存,会发生什么情况?
void reordered_folded(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
Eigen::MatrixXd DPCint(szZ * na, szX);
const Eigen::VectorXd avals =
Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
const auto zvals = Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
# pragma omp parallel for
for(Eigen::Index x = 0; x < szX; ++x) {
for(Eigen::Index z = 0; z < szZ; ++z)
DPCint.col(x).segment(z * na, na) =
avals.array() + xvals[x] + zvals[z];
DPC.col(x) += DPCint.col(x).reshaped(na, szZ).colwise().sum();
}
}
12.3 秒。在这一点上,为什么我们还要有一个共享的 DPCint 数组?让我们使用 per-thread 矩阵。
void reordered_loctmp(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
const Eigen::VectorXd avals =
Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
const auto zvals = Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
# pragma omp parallel
{
Eigen::MatrixXd DPCint(na, szZ);
# pragma omp for nowait
for(Eigen::Index x = 0; x < szX; ++x) {
for(Eigen::Index z = 0; z < szZ; ++z)
DPCint.col(z) = avals.array() + xvals[x] + zvals[z];
DPC.col(x) += DPCint.colwise().sum();
}
}
}
嘿! 6.8 秒。我们消除了 cache-line 边界。我们制作了所有内容 cache-friendly 并进行了适当的矢量化处理。
我现在唯一能想到的就是将 DPCint 变成一个动态计算的表达式,但这在很大程度上取决于实际的表达式。由于我无法对此进行推测,因此我将保留它。
我有一些矩阵定义为:
Eigen::MatrixXd DPCint = Eigen::MatrixXd::Zero(p.szZ*(p.na-1),p.szX);
\ perform some computations and fill every sub-matrix of size [p.szZ,p.szX] with some values
#pragma omp parallel for
for (int i=0; i < p.na-1; i++)
{
...
DPCint(Eigen::seq(i*p.szZ,(i+1)*p.szZ-1),Eigen::all) = ....;
}
\ Now sum every p.szZ rows to get a matrix that is [p.szZ,p.szX]
在 Matlab 中,此操作快速而简单。如果我想将循环与 OpenMP 并行化,我不能简单地在这里执行 += 操作。同样,我可以遍历每组 p.szZ 行并对它们求和,但该循环无法并行化,因为每个线程都会输出相同的数据。有没有一些有效的方法可以使用 Eigen 的索引操作来求和子矩阵?这看起来是一个简单的操作,我觉得我错过了一些东西,但是我有一段时间没有找到解决方案。
澄清
基本上,在上面的循环之后,我想在一行中完成:
for (int i = 0; i < p.na-1; i++)
{
DPC += DPCint(Eigen::seq(i*p.szZ,(i+1)*p.szZ-1),Eigen::all);
}
在 matlab 中,我可以简单地将矩阵重新整形为 3D 矩阵并沿第三维求和。我对Eigen的tensor库不熟悉,希望这个操作不用借助tensor库也可以实现。但是,我的首要任务是速度和效率,所以我愿意接受任何建议。
在基于 na
的轴上执行平行归约效率不高。事实上,这个维度对于多线程来说已经非常小了,但它也(几乎)强制线程在效率低下的临时矩阵上运行(这是 memory-bound 所以它不能很好地扩展)。
另一种解决方案是 并行化 szZ
维度 。每个线程都可以在一个切片上工作,并在没有临时矩阵的情况下执行局部缩减。此外,这种方法还应该改进 CPU 缓存的使用(因为每个线程计算的 DPC
部分更有可能适合缓存,因此它们不会从 RAM 重新加载)。这是一个(未经测试的)示例:
// All thread will execute the following loops (all iterations but on different data blocks)
#pragma omp parallel
for (int i = 0; i < p.na-1; i++)
{
// "nowait" avoid a synchronization but this require a
// static schedule which is a good idea to use here anyway.
#pragma omp for schedule(static) nowait
for (int j = 0; j < p.szZ; j++)
DPC(j, Eigen::all) += DPCint(i*p.szZ+j, Eigen::all);
}
正如@chtz 所指出的,最好避免使用临时 DPCint
矩阵,因为内存吞吐量是非常有限的资源(尤其是在并行代码中)。
编辑: 我假设矩阵存储在 row-major 存储顺序中,默认情况下并非如此。这可以修改(参见 the doc),实际上它会使第一个和第二个循环 cache-efficient。然而,混合存储顺序通常是 error-prone 并且使用 row-major 排序会强制您重新定义基本类型。 @Homer512 的解决方案是一种替代实现,当然更适合 column-major 矩阵。
这是我的看法。
#pragma omp parallel
{
/*
* We force static schedule to prevent excessive cache-line bouncing
* because the elements per thread are not consecutive.
* However, most (all?) OpenMP implementations use static scheduling
* by default anyway.
* Switching to threads initializing full columns would be
* more effective from a memory POV.
*/
# pragma omp for schedule(static)
for(int i=0; i < p.na-1; i++) {
/*
* Note: The original code looks wrong.
* Remember that indices in Eigen (as with most things C++)
* are exclusive on the end. This touches
* [start, end), not [start, end]
*/
DPCint(Eigen::seq(i*p.szZ,(i+1)*p.szZ),Eigen::all) = ...;
/*
* Same as
* DPCint.middleRows(i*p.szZ, p.szZ) = ...
*/
}
/*
* We rely on the implicit barrier at the end of the for-construct
* for synchronization. Then start a new loop in the same parallel
* construct. This one can be nowait as it is the last one.
* Again, static scheduling limits cache-line bouncing to the first
* and last column/cache line per thread.
* But since we wrote rows per thread above and now read
* columns per thread, there are still a lot of cache misses
*/
# pragma omp for schedule(static) nowait
for(int i=0; i < p.szX; i++) {
/*
* Now we let a single thread reduce a column.
* Not a row because we deal with column-major matrices
* so this pattern is more cache-efficient
*/
DPC.col(i) += DPCint.col(i).reshaped(
p.szZ, p.na - 1).rowwise().sum();
}
}
整形是 Eigen-3.4 中的新功能。但是,我注意到生成的程序集并不是特别有效 (no vectorization)。
Eigen 中的按行缩减总是有些慢。所以我们可能会做得更好,这在 Eigen-3.3 中也有效:
# pragma omp for schedule(static) nowait
for(int i = 0; i < p.szX; i++) {
const auto& incol = DPCint.col(i);
auto outcol = DPC.col(i);
for(int j = 0; j < p.na - 1; j++)
outcol += incol.segment(j * (p.na - 1), p.na - 1);
}
或者,将重塑后的矩阵与 all-ones 向量相乘也非常有效。它需要进行基准测试,但是,尤其是使用 OpenBLAS 的 Eigen,它可能比按行求和更快。
基准测试
好的,我继续做了一些测试。首先,让我们设置一个最小的可重现示例,因为我们之前没有。
void reference(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
Eigen::MatrixXd DPCint(szZ * na, szX);
# pragma omp parallel for
for(Eigen::Index a = 0; a < na; ++a)
for(Eigen::Index x = 0; x < szX; ++x)
for(Eigen::Index z = 0; z < szZ; ++z)
DPCint(a * szZ + z, x) =
a * 0.25 + x * 1.34 + z * 12.68;
for(Eigen::Index a = 0; a < na; ++a)
DPC += DPCint.middleRows(a * szZ, szZ);
}
void test(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{...}
int main()
{
const int szZ = 500, szX = 192, na = 15;
const int repetitions = 10000;
Eigen::MatrixXd ref = Eigen::MatrixXd::Zero(szZ, szX);
Eigen::MatrixXd opt = Eigen::MatrixXd::Zero(szZ, szX);
reference(ref, na);
test(opt, na);
std::cout << (ref - opt).cwiseAbs().sum() << std::endl;
for(int i = 0; i < repetitions; ++i)
test(opt, na);
}
数组维度如OP所述。 DPCint 初始化被选择为标量,并允许测试任何优化的实现是否仍然正确。为合理的运行时间选择了重复次数。
在 AMD Ryzen Threadripper 2990WX(32 核,64 线程)上使用 g++-10 -O3 -march=native -DNDEBUG -fopenmp
编译和测试。启用 NUMA。使用 Eigen-3.4.0.
参考给16.6秒
让我们优化初始化以解决这个问题:
void reference_op1(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
Eigen::MatrixXd DPCint(szZ * na, szX);
const auto avals = Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
const Eigen::VectorXd zvals =
Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
# pragma omp parallel for collapse(2)
for(Eigen::Index a = 0; a < na; ++a)
for(Eigen::Index x = 0; x < szX; ++x)
DPCint.col(x).segment(a * szZ, szZ) = zvals.array() + xvals[x] + avals[a];
for(Eigen::Index a = 0; a < na; ++a)
DPC += DPCint.middleRows(a * szZ, szZ);
}
linspaced 并没有真正帮助,但请注意 collapse(2)
。由于 na 在 64 线程机器上只有 15,我们需要并行化两个循环。 15.4 秒
让我们测试一下我建议的版本:
void rowwise(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
Eigen::MatrixXd DPCint(szZ * na, szX);
const auto avals = Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
const Eigen::VectorXd zvals =
Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
# pragma omp parallel
{
# pragma omp for collapse(2)
for(Eigen::Index a = 0; a < na; ++a)
for(Eigen::Index x = 0; x < szX; ++x)
DPCint.col(x).segment(a * szZ, szZ) =
zvals.array() + xvals[x] + avals[a];
# pragma omp for nowait
for(Eigen::Index x = 0; x < szX; ++x)
DPC.col(x) += DPCint.col(x).reshaped(szZ, na).rowwise().sum();
}
}
运行时间为 12.5 秒。考虑到我们只是并行化了算法的后半部分,所以加速并不多。
正如我之前建议的那样,按行缩减是废话,matrix-vector 产品可以避免这种情况。让我们看看这是否有帮助:
void rowwise_dot(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
Eigen::MatrixXd DPCint(szZ * na, szX);
const auto avals = Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
const Eigen::VectorXd zvals =
Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
const Eigen::VectorXd ones = Eigen::VectorXd::Ones(szZ);
# pragma omp parallel
{
# pragma omp for collapse(2)
for(Eigen::Index a = 0; a < na; ++a)
for(Eigen::Index x = 0; x < szX; ++x)
DPCint.col(x).segment(a * szZ, szZ) =
zvals.array() + xvals[x] + avals[a];
# pragma omp for nowait
for(Eigen::Index x = 0; x < szX; ++x)
DPC.col(x).noalias() +=
DPCint.col(x).reshaped(szZ, na) * ones;
}
}
不,还有12.5秒。当我们用 -DEIGEN_USE_BLAS -lopenblas_openmp
编译时会发生什么?相同的数字。如果您不能为 AVX2 编译但 CPU 支持它,那么这可能是值得的。 Eigen 不支持运行时 CPU 特征检测。或者它对 float 的帮助可能比对 double 的帮助更大,因为矢量化的好处更高。
如果我们以向量化的方式构建自己的按行归约会怎么样?
void rowwise_loop(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
Eigen::MatrixXd DPCint(szZ * na, szX);
const auto avals = Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
const Eigen::VectorXd zvals =
Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
# pragma omp parallel
{
# pragma omp for collapse(2)
for(Eigen::Index a = 0; a < na; ++a)
for(Eigen::Index x = 0; x < szX; ++x)
DPCint.col(x).segment(a * szZ, szZ) =
zvals.array() + xvals[x] + avals[a];
# pragma omp for nowait
for(Eigen::Index x = 0; x < szX; ++x)
for(Eigen::Index a = 0; a < na; ++a)
DPC.col(x) += DPCint.col(x).segment(a * szZ, szZ);
}
}
13.3 秒。请注意,在我的笔记本电脑(Intel i7-8850H)上,这比按行版本快得多。 NUMA 和缓存行弹跳可能是较大线程撕裂器上的一个严重问题,但我没有调查性能计数器。
重新排序 DPCint
在这一点上,我认为 DPCint 的布局及其设置中的循环顺序很明显是一种负担。也许这是有原因的。但如果没有,我建议修改如下:
void reordered(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
Eigen::MatrixXd DPCint(szZ * na, szX);
const Eigen::VectorXd avals =
Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
const auto zvals = Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
# pragma omp parallel
{
# pragma omp for
for(Eigen::Index x = 0; x < szX; ++x)
for(Eigen::Index z = 0; z < szZ; ++z)
DPCint.col(x).segment(z * na, na) =
avals.array() + xvals[x] + zvals[z];
# pragma omp for nowait
for(Eigen::Index x = 0; x < szX; ++x)
DPC.col(x) += DPCint.col(x).reshaped(na, szZ).colwise().sum();
}
}
我们的想法是以这样一种方式重塑它:a) colwise sums 是可能的并且 b) 同一个线程在第一个和第二个循环中接触相同的元素。
有趣的是,这似乎慢了 15.3 秒。我想最里面的作业现在太短了。
如果我们将算法的两个部分合并为一个循环,减少同步开销并改进缓存,会发生什么情况?
void reordered_folded(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
Eigen::MatrixXd DPCint(szZ * na, szX);
const Eigen::VectorXd avals =
Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
const auto zvals = Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
# pragma omp parallel for
for(Eigen::Index x = 0; x < szX; ++x) {
for(Eigen::Index z = 0; z < szZ; ++z)
DPCint.col(x).segment(z * na, na) =
avals.array() + xvals[x] + zvals[z];
DPC.col(x) += DPCint.col(x).reshaped(na, szZ).colwise().sum();
}
}
12.3 秒。在这一点上,为什么我们还要有一个共享的 DPCint 数组?让我们使用 per-thread 矩阵。
void reordered_loctmp(Eigen::Ref<Eigen::MatrixXd> DPC,
int na)
{
const Eigen::Index szZ = DPC.rows();
const Eigen::Index szX = DPC.cols();
const Eigen::VectorXd avals =
Eigen::VectorXd::LinSpaced(na, 0., (na - 1) * 0.25);
const auto xvals = Eigen::VectorXd::LinSpaced(szX, 0., (szX - 1) * 1.34);
const auto zvals = Eigen::VectorXd::LinSpaced(szZ, 0., (szZ - 1) * 12.68);
# pragma omp parallel
{
Eigen::MatrixXd DPCint(na, szZ);
# pragma omp for nowait
for(Eigen::Index x = 0; x < szX; ++x) {
for(Eigen::Index z = 0; z < szZ; ++z)
DPCint.col(z) = avals.array() + xvals[x] + zvals[z];
DPC.col(x) += DPCint.colwise().sum();
}
}
}
嘿! 6.8 秒。我们消除了 cache-line 边界。我们制作了所有内容 cache-friendly 并进行了适当的矢量化处理。
我现在唯一能想到的就是将 DPCint 变成一个动态计算的表达式,但这在很大程度上取决于实际的表达式。由于我无法对此进行推测,因此我将保留它。