为什么这个 Cilk Array Notation 矩阵的执行速度几乎是串行版本的两倍?
Why does this Cilk Array Notation matrix perform nearly twice as slow as the serial version?
我正在对串行和 Cilk 数组表示法版本的矩阵乘法性能进行基准测试。 Cilk 实现的时间几乎是串行的两倍,我不明白为什么。
这是单核执行
这是 Cilk 乘法的核心。由于等级限制,我必须将每个乘法存储在一个数组中,然后 __sec_reduce_add
这个数组,然后再设置目标矩阵元素值。
int* sum = new int[VEC_SIZE];
for (int r = 0; (r < rowsThis); r++) {
for (int c = 0; (c < colsOther); c++) {
sum[0:VEC_SIZE] = myData[r][0:VEC_SIZE] * otherData[0:VEC_SIZE][c];
product.data[r][c] = __sec_reduce_add(sum[0:VEC_SIZE]);
}
}
我理解缓存问题,并且没有看到 Cilk 版本的缓存命中率低于串行版本的任何原因,因为它们都访问了一个希望在缓存中的列数组以及一系列行元素未命中。
是否存在我应该使用的 Cilk 的明显依赖项或句法元素?我是否应该以不同的方式使用 Cilk 来实现使用 SIMD 运算的非块矩阵乘法的最佳性能?
我是 Cilk 的新手,所以欢迎 help/suggestion。
编辑:
这是串行实现:
for (int row = 0; (row < rowsThis); row++) {
for (int col = 0; (col < colsOther); col++) {
int sum = 0;
for (int i = 0; (i < VEC_SIZE); i++) {
sum += (matrix1[row][i] * matrix2[i][col]);
}
matrix3[row][col] = sum;
}
}
内存已适当分配(解除),并且两种实现的乘法结果都是正确的。矩阵大小在编译时未知,并且在整个基准测试中使用了多种矩阵。
编译器:icpc (ICC) 15.0.0 20140723
编译标志:icpc -g Wall -O2 -std=c++11
忽略分配内存的使用,从向量到普通数组的转换,等等。我凭直觉将另一个程序分解成 运行,假设它比实际结果更简单.我无法让编译器在 2D 向量的两个维度上接受 cilk 符号,所以为了基准测试,我决定使用传统数组。
以下是所有合适的文件:
MatrixTest.cpp
#include <string>
#include <fstream>
#include <stdlib.h>
#include "Matrix.h"
#define MATRIX_SIZE 2000
#define REPETITIONS 1
int main(int argc, char** argv) {
auto init = [](int row, int col) { return row + col; };
const Matrix matrix1(MATRIX_SIZE, MATRIX_SIZE, init);
const Matrix matrix2(MATRIX_SIZE, MATRIX_SIZE, init);
for (size_t i = 0; (i < REPETITIONS); i++) {
const Matrix matrix3 = matrix1 * matrix2;
std::cout << "Diag sum: " << matrix3.sumDiag() << std::endl;
}
return 0;
}
Matrix.h
#ifndef MATRIX_H
#define MATRIX_H
#include <iostream>
#include <functional>
#include <vector>
using TwoDVec = std::vector<std::vector<int>>;
class Matrix {
public:
Matrix();
~Matrix();
Matrix(int rows, int cols);
Matrix(int rows, int cols, std::function<Val(int, int)> init);
Matrix(const Matrix& src);
Matrix(Matrix&& src);
Matrix operator*(const Matrix& src) const throw(std::exception);
Matrix& operator=(const Matrix& src);
int sumDiag() const;
protected:
int** getRawData() const;
private:
TwoDVec data;
};
#endif
Matrix.cpp
#include <iostream>
#include <algorithm>
#include <stdexcept>
#include "Matrix.h"
#if defined(CILK)
Matrix
Matrix::operator*(const Matrix& other) const throw(std::exception) {
int rowsOther = other.data.size();
int colsOther = rowsOther > 0 ? other.data[0].size() : 0;
int rowsThis = data.size();
int colsThis = rowsThis > 0 ? data[0].size() : 0;
if (colsThis != rowsOther) {
throw std::runtime_error("Invalid matrices for multiplication.");
}
int** thisRaw = this->getRawData(); // held until d'tor
int** otherRaw = other.getRawData();
Matrix product(rowsThis, colsOther);
const int VEC_SIZE = colsThis;
for (int r = 0; (r < rowsThis); r++) {
for (int c = 0; (c < colsOther); c++) {
product.data[r][c] = __sec_reduce_add(thisRaw[r][0:VEC_SIZE]
* otherRaw[0:VEC_SIZE][c]);
}
}
delete[] thisRaw;
delete[] otherRaw;
return product;
}
#elif defined(SERIAL)
Matrix
Matrix::operator*(const Matrix& other) const throw(std::exception) {
int rowsOther = other.data.size();
int colsOther = rowsOther > 0 ? other.data[0].size() : 0;
int rowsThis = data.size();
int colsThis = rowsThis > 0 ? data[0].size() : 0;
if (colsThis != rowsOther) {
throw std::runtime_error("Invalid matrices for multiplication.");
}
int** thisRaw = this->getRawData(); // held until d'tor
int** otherRaw = other.getRawData();
Matrix product(rowsThis, colsOther);
const int VEC_SIZE = colsThis;
for (int r = 0; (r < rowsThis); r++) {
for (int c = 0; (c < colsOther); c++) {
int sum = 0;
for (int i = 0; (i < VEC_SIZE); i++) {
sum += (thisRaw[r][i] * otherRaw[i][c]);
}
product.data[r][c] = sum;
}
}
delete[] thisRaw;
delete[] otherRaw;
return product;
}
#endif
// Default c'tor
Matrix::Matrix()
: Matrix(1,1) { }
Matrix::~Matrix() { }
// Rows/Cols c'tor
Matrix::Matrix(int rows, int cols)
: data(TwoDVec(rows, std::vector<int>(cols, 0))) { }
// Init func c'tor
Matrix::Matrix(int rows, int cols, std::function<int(int, int)> init)
: Matrix(rows, cols) {
for (int r = 0; (r < rows); r++) {
for (int c = 0; (c < cols); c++) {
data[r][c] = init(r,c);
}
}
}
// Copy c'tor
Matrix::Matrix(const Matrix& other) {
int rows = other.data.size();
int cols = rows > 0 ? other.data[0].size() : 0;
this->data.resize(rows, std::vector<int>(cols, 0));
for(int r = 0; (r < rows); r++) {
this->data[r] = other.data[r];
}
}
// Move c'tor
Matrix::Matrix(Matrix&& other) {
if (this == &other) return;
this->data.clear();
int rows = other.data.size();
for (int r = 0; (r < rows); r++) {
this->data[r] = std::move(other.data[r]);
}
}
Matrix&
Matrix::operator=(const Matrix& other) {
int rows = other.data.size();
this->data.resize(rows, std::vector<int>(0));
for (int r = 0; (r < rows); r++) {
this->data[r] = other.data[r];
}
return *this;
}
int
Matrix::sumDiag() const {
int rows = data.size();
int cols = rows > 0 ? data[0].size() : 0;
int dim = (rows < cols ? rows : cols);
int sum = 0;
for (int i = 0; (i < dim); i++) {
sum += data[i][i];
}
return sum;
}
int**
Matrix::getRawData() const {
int** rawData = new int*[data.size()];
for (int i = 0; (i < data.size()); i++) {
rawData[i] = const_cast<int*>(data[i].data());
}
return rawData;
}
[更新于 2015-3-30 以匹配长代码示例。]
icc 可能会自动矢量化您的累积循环,因此 Cilk Plus 正在与矢量化代码竞争。这里有两个可能的性能问题:
临时数组sum增加了加载和存储的次数。串行代码仅执行两次 (SIMD) 加载,并且每次 (SIMD) 乘法几乎没有存储。对于临时数组,每次乘法有 3 个加载和 1 个存储。
matrix2 具有非单位步幅访问模式(在串行代码中也是如此)。典型的当前硬件在单位步幅访问下工作得更好。
要解决问题 (1),请删除临时数组,就像您稍后在较长示例中所做的那样。例如:
for (int r = 0; (r < rowsThis); r++) {
for (int c = 0; (c < colsOther); c++) {
product.data[r][c] = __sec_reduce_add(thisRaw[r][0:VEC_SIZE] * otherRaw[0:VEC_SIZE][c]);
}
}
__sec_reduce_add
结果的秩为零,因此可以分配给标量。
更好的是,解决步幅问题。除非结果矩阵非常宽,否则一个好的方法是按行累加结果,如下所示:
for (int r = 0; (r < rowsThis); r++) {
product.data[r].data()[0:colsOther] = 0;
for (int k = 0; (k < VEC_SIZE); k++) {
product.data[r].data()[0:colsOther] +=thisRaw[r][k] * otherRaw[k][0:colsOther];
}
}
注意这里使用了data()
。数组表示法目前不允许将节表示法与来自 std::vector
的重载 []
一起使用。所以我使用 data()
获取指向基础数据的指针,我可以将其与数组符号一起使用。
现在数组部分都有单位步长,因此编译器可以有效地使用矢量硬件。上面的方案通常是我最喜欢的构造无阻塞矩阵乘法的方法。在 i7-4770 处理器上使用 icpc -g -Wall -O2 -xHost -std=c++11 -DCILK
和 运行 编译时,我看到 MatrixTest 程序时间从大约 52 秒下降到 1.75 秒,性能提高了 29 倍。
通过消除清零(在构造 product
时已经完成)和消除临时指针数组的构造,可以简化代码并加快一点速度。这是修改后的运算符的完整定义*:
Matrix
Matrix::operator*(const Matrix& other) const throw(std::exception) {
int rowsOther = other.data.size();
int colsOther = rowsOther > 0 ? other.data[0].size() : 0;
int rowsThis = data.size();
int colsThis = rowsThis > 0 ? data[0].size() : 0;
if (colsThis != rowsOther) {
throw std::runtime_error("Invalid matrices for multiplication.");
}
Matrix product(rowsThis, colsOther);
for (int r = 0; (r < rowsThis); r++) {
product.data[r].data()[0:colsOther] = 0;
for (int k = 0; (k < colsThis); k++) {
product.data[r].data()[0:colsOther] += (*this).data[r][k] * other.data[k].data()[0:colsOther];
}
}
return product;
}
如果Matrix
有计算data[i].data()
的方法,长线会更短。
我正在对串行和 Cilk 数组表示法版本的矩阵乘法性能进行基准测试。 Cilk 实现的时间几乎是串行的两倍,我不明白为什么。
这是单核执行
这是 Cilk 乘法的核心。由于等级限制,我必须将每个乘法存储在一个数组中,然后 __sec_reduce_add
这个数组,然后再设置目标矩阵元素值。
int* sum = new int[VEC_SIZE];
for (int r = 0; (r < rowsThis); r++) {
for (int c = 0; (c < colsOther); c++) {
sum[0:VEC_SIZE] = myData[r][0:VEC_SIZE] * otherData[0:VEC_SIZE][c];
product.data[r][c] = __sec_reduce_add(sum[0:VEC_SIZE]);
}
}
我理解缓存问题,并且没有看到 Cilk 版本的缓存命中率低于串行版本的任何原因,因为它们都访问了一个希望在缓存中的列数组以及一系列行元素未命中。
是否存在我应该使用的 Cilk 的明显依赖项或句法元素?我是否应该以不同的方式使用 Cilk 来实现使用 SIMD 运算的非块矩阵乘法的最佳性能?
我是 Cilk 的新手,所以欢迎 help/suggestion。
编辑:
这是串行实现:
for (int row = 0; (row < rowsThis); row++) {
for (int col = 0; (col < colsOther); col++) {
int sum = 0;
for (int i = 0; (i < VEC_SIZE); i++) {
sum += (matrix1[row][i] * matrix2[i][col]);
}
matrix3[row][col] = sum;
}
}
内存已适当分配(解除),并且两种实现的乘法结果都是正确的。矩阵大小在编译时未知,并且在整个基准测试中使用了多种矩阵。
编译器:icpc (ICC) 15.0.0 20140723
编译标志:icpc -g Wall -O2 -std=c++11
忽略分配内存的使用,从向量到普通数组的转换,等等。我凭直觉将另一个程序分解成 运行,假设它比实际结果更简单.我无法让编译器在 2D 向量的两个维度上接受 cilk 符号,所以为了基准测试,我决定使用传统数组。
以下是所有合适的文件:
MatrixTest.cpp
#include <string>
#include <fstream>
#include <stdlib.h>
#include "Matrix.h"
#define MATRIX_SIZE 2000
#define REPETITIONS 1
int main(int argc, char** argv) {
auto init = [](int row, int col) { return row + col; };
const Matrix matrix1(MATRIX_SIZE, MATRIX_SIZE, init);
const Matrix matrix2(MATRIX_SIZE, MATRIX_SIZE, init);
for (size_t i = 0; (i < REPETITIONS); i++) {
const Matrix matrix3 = matrix1 * matrix2;
std::cout << "Diag sum: " << matrix3.sumDiag() << std::endl;
}
return 0;
}
Matrix.h
#ifndef MATRIX_H
#define MATRIX_H
#include <iostream>
#include <functional>
#include <vector>
using TwoDVec = std::vector<std::vector<int>>;
class Matrix {
public:
Matrix();
~Matrix();
Matrix(int rows, int cols);
Matrix(int rows, int cols, std::function<Val(int, int)> init);
Matrix(const Matrix& src);
Matrix(Matrix&& src);
Matrix operator*(const Matrix& src) const throw(std::exception);
Matrix& operator=(const Matrix& src);
int sumDiag() const;
protected:
int** getRawData() const;
private:
TwoDVec data;
};
#endif
Matrix.cpp
#include <iostream>
#include <algorithm>
#include <stdexcept>
#include "Matrix.h"
#if defined(CILK)
Matrix
Matrix::operator*(const Matrix& other) const throw(std::exception) {
int rowsOther = other.data.size();
int colsOther = rowsOther > 0 ? other.data[0].size() : 0;
int rowsThis = data.size();
int colsThis = rowsThis > 0 ? data[0].size() : 0;
if (colsThis != rowsOther) {
throw std::runtime_error("Invalid matrices for multiplication.");
}
int** thisRaw = this->getRawData(); // held until d'tor
int** otherRaw = other.getRawData();
Matrix product(rowsThis, colsOther);
const int VEC_SIZE = colsThis;
for (int r = 0; (r < rowsThis); r++) {
for (int c = 0; (c < colsOther); c++) {
product.data[r][c] = __sec_reduce_add(thisRaw[r][0:VEC_SIZE]
* otherRaw[0:VEC_SIZE][c]);
}
}
delete[] thisRaw;
delete[] otherRaw;
return product;
}
#elif defined(SERIAL)
Matrix
Matrix::operator*(const Matrix& other) const throw(std::exception) {
int rowsOther = other.data.size();
int colsOther = rowsOther > 0 ? other.data[0].size() : 0;
int rowsThis = data.size();
int colsThis = rowsThis > 0 ? data[0].size() : 0;
if (colsThis != rowsOther) {
throw std::runtime_error("Invalid matrices for multiplication.");
}
int** thisRaw = this->getRawData(); // held until d'tor
int** otherRaw = other.getRawData();
Matrix product(rowsThis, colsOther);
const int VEC_SIZE = colsThis;
for (int r = 0; (r < rowsThis); r++) {
for (int c = 0; (c < colsOther); c++) {
int sum = 0;
for (int i = 0; (i < VEC_SIZE); i++) {
sum += (thisRaw[r][i] * otherRaw[i][c]);
}
product.data[r][c] = sum;
}
}
delete[] thisRaw;
delete[] otherRaw;
return product;
}
#endif
// Default c'tor
Matrix::Matrix()
: Matrix(1,1) { }
Matrix::~Matrix() { }
// Rows/Cols c'tor
Matrix::Matrix(int rows, int cols)
: data(TwoDVec(rows, std::vector<int>(cols, 0))) { }
// Init func c'tor
Matrix::Matrix(int rows, int cols, std::function<int(int, int)> init)
: Matrix(rows, cols) {
for (int r = 0; (r < rows); r++) {
for (int c = 0; (c < cols); c++) {
data[r][c] = init(r,c);
}
}
}
// Copy c'tor
Matrix::Matrix(const Matrix& other) {
int rows = other.data.size();
int cols = rows > 0 ? other.data[0].size() : 0;
this->data.resize(rows, std::vector<int>(cols, 0));
for(int r = 0; (r < rows); r++) {
this->data[r] = other.data[r];
}
}
// Move c'tor
Matrix::Matrix(Matrix&& other) {
if (this == &other) return;
this->data.clear();
int rows = other.data.size();
for (int r = 0; (r < rows); r++) {
this->data[r] = std::move(other.data[r]);
}
}
Matrix&
Matrix::operator=(const Matrix& other) {
int rows = other.data.size();
this->data.resize(rows, std::vector<int>(0));
for (int r = 0; (r < rows); r++) {
this->data[r] = other.data[r];
}
return *this;
}
int
Matrix::sumDiag() const {
int rows = data.size();
int cols = rows > 0 ? data[0].size() : 0;
int dim = (rows < cols ? rows : cols);
int sum = 0;
for (int i = 0; (i < dim); i++) {
sum += data[i][i];
}
return sum;
}
int**
Matrix::getRawData() const {
int** rawData = new int*[data.size()];
for (int i = 0; (i < data.size()); i++) {
rawData[i] = const_cast<int*>(data[i].data());
}
return rawData;
}
[更新于 2015-3-30 以匹配长代码示例。]
icc 可能会自动矢量化您的累积循环,因此 Cilk Plus 正在与矢量化代码竞争。这里有两个可能的性能问题:
临时数组sum增加了加载和存储的次数。串行代码仅执行两次 (SIMD) 加载,并且每次 (SIMD) 乘法几乎没有存储。对于临时数组,每次乘法有 3 个加载和 1 个存储。
matrix2 具有非单位步幅访问模式(在串行代码中也是如此)。典型的当前硬件在单位步幅访问下工作得更好。
要解决问题 (1),请删除临时数组,就像您稍后在较长示例中所做的那样。例如:
for (int r = 0; (r < rowsThis); r++) {
for (int c = 0; (c < colsOther); c++) {
product.data[r][c] = __sec_reduce_add(thisRaw[r][0:VEC_SIZE] * otherRaw[0:VEC_SIZE][c]);
}
}
__sec_reduce_add
结果的秩为零,因此可以分配给标量。
更好的是,解决步幅问题。除非结果矩阵非常宽,否则一个好的方法是按行累加结果,如下所示:
for (int r = 0; (r < rowsThis); r++) {
product.data[r].data()[0:colsOther] = 0;
for (int k = 0; (k < VEC_SIZE); k++) {
product.data[r].data()[0:colsOther] +=thisRaw[r][k] * otherRaw[k][0:colsOther];
}
}
注意这里使用了data()
。数组表示法目前不允许将节表示法与来自 std::vector
的重载 []
一起使用。所以我使用 data()
获取指向基础数据的指针,我可以将其与数组符号一起使用。
现在数组部分都有单位步长,因此编译器可以有效地使用矢量硬件。上面的方案通常是我最喜欢的构造无阻塞矩阵乘法的方法。在 i7-4770 处理器上使用 icpc -g -Wall -O2 -xHost -std=c++11 -DCILK
和 运行 编译时,我看到 MatrixTest 程序时间从大约 52 秒下降到 1.75 秒,性能提高了 29 倍。
通过消除清零(在构造 product
时已经完成)和消除临时指针数组的构造,可以简化代码并加快一点速度。这是修改后的运算符的完整定义*:
Matrix
Matrix::operator*(const Matrix& other) const throw(std::exception) {
int rowsOther = other.data.size();
int colsOther = rowsOther > 0 ? other.data[0].size() : 0;
int rowsThis = data.size();
int colsThis = rowsThis > 0 ? data[0].size() : 0;
if (colsThis != rowsOther) {
throw std::runtime_error("Invalid matrices for multiplication.");
}
Matrix product(rowsThis, colsOther);
for (int r = 0; (r < rowsThis); r++) {
product.data[r].data()[0:colsOther] = 0;
for (int k = 0; (k < colsThis); k++) {
product.data[r].data()[0:colsOther] += (*this).data[r][k] * other.data[k].data()[0:colsOther];
}
}
return product;
}
如果Matrix
有计算data[i].data()
的方法,长线会更短。