使用 SIMD 优化列式最大值
optimising column-wise maximum with SIMD
我在代码中花费了大量时间来实现这个功能,如果可能的话,我想通过向量化-SIMD-编译器内部函数对其进行优化。
它本质上是在列矩阵中找到最大值和最大值的位置,并存储它们:
- val_ptr:输入矩阵:列优先(Fortran 风格)n_rows-by-n_cols(通常 n_rows>>n_cols)
- opt_pos_ptr : int vector of length n_rows 存储最大值的位置。在条目中填充零。
- max_ptr:长度为 n_rows 的浮点向量,用于存储最大值。在条目上填写了第一列的副本 val_ptr
- 该函数将在并行循环中调用
- 保证内存区域不重叠
- 我真的不需要 max_ptr 填充,目前它只是用于记账和避免内存分配
- 我在 Windows 10 上使用 MSVC、C++17。意味着 运行 现代英特尔 CPU
模板类型为 float 或 double 的代码:
template <typename eT>
find_max(const int n_cols,
const int n_rows,
const eT* val_ptr,
int* opt_pos_ptr,
eT* max_ptr){
for (int col = 1; col < n_cols; ++col)
{
//Getting the pointer to the beginning of the column
const auto* value_col = val_ptr + col * n_rows;
//Looping over the rows
for (int row = 0; row < n_rows; ++row)
{
//If the value is larger than the current maximum, we replace and we store its positions
if (value_col[row] > max_ptr[row])
{
max_ptr[row] = value_col[row];
opt_pos_ptr[row] = col;
}
}
}
}
到目前为止我尝试了什么:
- 我尝试在内部循环中使用 OpenMP parallel for,但只在非常大的行上带来一些东西,比我当前的使用量大一点。
- 内部循环中的 if 阻止了#pragma omp simd 的工作,没有它我无法重写它。
根据您发布的代码示例,您似乎想要计算垂直最大值,这意味着在您的情况下“列”是水平的。在 C/C++ 中,元素的水平序列(即两个相邻元素在内存中具有一个元素的距离)通常称为行和垂直(其中两个相邻元素在内存中具有行大小的距离) - 列。在我下面的回答中,我将使用传统术语,其中行是水平的,列是垂直的。
此外,为简洁起见,我将重点关注一种可能的矩阵元素类型 - float
。 double
的基本思想相同,主要区别在于每个向量的元素数量和 _ps
/_pd
内在函数 selection。我会在最后提供 double
的版本。
我们的想法是,您可以使用 _mm_max_ps
/_mm_max_pd
并行计算多列的垂直最大值。为了同时记录找到的最大值的位置,您可以将先前的最大值与当前元素进行比较。比较的结果是一个掩码,其中元素是all-ones,其中最大值被更新。该掩码可用于 select 哪个位置也需要更新。
我必须注意,如果一列中有多个相等的最大元素,下面的算法假定记录哪个最大元素的位置并不重要。另外,我假设矩阵不包含 NaN 值,这会影响比较。稍后会详细介绍。
void find_max(const int n_cols,
const int n_rows,
const float* val_ptr,
int* opt_pos_ptr,
float* max_ptr){
const __m128i mm_one = _mm_set1_epi32(1);
// Pre-compute the number of rows that can be processed in full vector width.
// In a 128-bit vector there are 4 floats or 2 doubles
int tail_size = n_rows & 3;
int n_rows_aligned = n_rows - tail_size;
int row = 0;
for (; row < n_rows_aligned; row += 4)
{
const auto* col_ptr = val_ptr + row;
__m128 mm_max = _mm_loadu_ps(col_ptr);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128 mm_value = _mm_loadu_ps(col_ptr);
// See if this value is greater than the old maximum
__m128 mm_mask = _mm_cmplt_ps(mm_max, mm_value);
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, _mm_castps_si128(mm_mask));
// Compute the maximum
mm_max = _mm_max_ps(mm_value, mm_max);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_ps(max_ptr + row, mm_max);
_mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
}
// Process tail serially
for (; row < n_rows; ++row)
{
const auto* col_ptr = val_ptr + row;
auto max = *col_ptr;
int max_pos = 0;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
auto value = *col_ptr;
if (value > max)
{
max = value;
max_pos = col;
}
col_ptr += n_rows;
}
max_ptr[row] = max;
opt_pos_ptr[row] = max_pos;
}
}
上面的代码需要 SSE4.1 因为混合内在函数。您可以将它们替换为 _mm_and_si128
/_ps
、_mm_andnot_si128
/_ps
和 _mm_or_si128
/_ps
的组合,在这种情况下,要求将降为 SSE2。有关特定内在函数的更多详细信息,包括它们需要哪些指令集扩展,请参阅 Intel Intrinsics Guide。
关于 NaN 值的注释。如果您的矩阵可以有 NaN,则 _mm_cmplt_ps
测试将始终 return 为假。至于_mm_max_ps
,一般不知道会是什么return。如果其中一个操作数是 NaN,则内在转换为 return 其第二个(源)操作数的 maxps
指令,因此通过安排指令的操作数,您可以实现任一行为。但是,没有记录 _mm_max_ps
内在函数的哪个参数代表指令的哪个操作数,编译器甚至可能在不同情况下使用不同的关联。有关详细信息,请参阅 答案。
为了确保正确的行为。 NaNs 你可以使用内联汇编器来强制 maxps
操作数的正确顺序。不幸的是,这不是 MSVC 用于 x86-64 目标的选项,您说您正在使用它,因此您可以将 _mm_cmplt_ps
结果重新用于第二次混合,如下所示:
// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, mm_mask);
这将抑制生成的最大值中的 NaN。如果您想保留 NaN,可以使用第二次比较来检测 NaN:
// Detect NaNs
__m128 mm_nan_mask = _mm_cmpunord_ps(mm_value, mm_value);
// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, _mm_or_ps(mm_mask, mm_nan_mask));
如果你使用更宽的向量(__m256
或 __m512
)并且将外循环展开一个小因子,你可能会进一步提高上述算法的性能,这样至少有一个缓存行在内循环的每次迭代中加载行数据的价值。
这是 double
的实施示例。这里要注意的重点是,因为每个向量只有两个 double
个元素,而且每个向量还有四个位置,所以我们必须展开外循环来一次处理两个 double
的向量然后压缩与先前最大值比较的两个掩码以混合 32 位位置。
void find_max(const int n_cols,
const int n_rows,
const double* val_ptr,
int* opt_pos_ptr,
double* max_ptr){
const __m128i mm_one = _mm_set1_epi32(1);
// Pre-compute the number of rows that can be processed in full vector width.
// In a 128-bit vector there are 2 doubles, but we want to process
// two vectors at a time.
int tail_size = n_rows & 3;
int n_rows_aligned = n_rows - tail_size;
int row = 0;
for (; row < n_rows_aligned; row += 4)
{
const auto* col_ptr = val_ptr + row;
__m128d mm_max1 = _mm_loadu_pd(col_ptr);
__m128d mm_max2 = _mm_loadu_pd(col_ptr + 2);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128d mm_value1 = _mm_loadu_pd(col_ptr);
__m128d mm_value2 = _mm_loadu_pd(col_ptr + 2);
// See if this value is greater than the old maximum
__m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
__m128d mm_mask2 = _mm_cmplt_pd(mm_max2, mm_value2);
// Compress the 2 masks into one
__m128i mm_mask = _mm_packs_epi32(
_mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask2));
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);
// Compute the maximum
mm_max1 = _mm_max_pd(mm_value1, mm_max1);
mm_max2 = _mm_max_pd(mm_value2, mm_max2);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_pd(max_ptr + row, mm_max1);
_mm_storeu_pd(max_ptr + row + 2, mm_max2);
_mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
}
// Process 2 doubles at once
if (tail_size >= 2)
{
const auto* col_ptr = val_ptr + row;
__m128d mm_max1 = _mm_loadu_pd(col_ptr);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128d mm_value1 = _mm_loadu_pd(col_ptr);
// See if this value is greater than the old maximum
__m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
// Compress the mask. The upper half doesn't matter.
__m128i mm_mask = _mm_packs_epi32(
_mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask1));
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);
// Compute the maximum
mm_max1 = _mm_max_pd(mm_value1, mm_max1);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_pd(max_ptr + row, mm_max1);
// Only store the lower two positions
_mm_storel_epi64(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
row += 2;
}
// Process tail serially
for (; row < n_rows; ++row)
{
const auto* col_ptr = val_ptr + row;
auto max = *col_ptr;
int max_pos = 0;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
auto value = *col_ptr;
if (value > max)
{
max = value;
max_pos = col;
}
col_ptr += n_rows;
}
max_ptr[row] = max;
opt_pos_ptr[row] = max_pos;
}
}
我在代码中花费了大量时间来实现这个功能,如果可能的话,我想通过向量化-SIMD-编译器内部函数对其进行优化。
它本质上是在列矩阵中找到最大值和最大值的位置,并存储它们:
- val_ptr:输入矩阵:列优先(Fortran 风格)n_rows-by-n_cols(通常 n_rows>>n_cols)
- opt_pos_ptr : int vector of length n_rows 存储最大值的位置。在条目中填充零。
- max_ptr:长度为 n_rows 的浮点向量,用于存储最大值。在条目上填写了第一列的副本 val_ptr
- 该函数将在并行循环中调用
- 保证内存区域不重叠
- 我真的不需要 max_ptr 填充,目前它只是用于记账和避免内存分配
- 我在 Windows 10 上使用 MSVC、C++17。意味着 运行 现代英特尔 CPU
模板类型为 float 或 double 的代码:
template <typename eT>
find_max(const int n_cols,
const int n_rows,
const eT* val_ptr,
int* opt_pos_ptr,
eT* max_ptr){
for (int col = 1; col < n_cols; ++col)
{
//Getting the pointer to the beginning of the column
const auto* value_col = val_ptr + col * n_rows;
//Looping over the rows
for (int row = 0; row < n_rows; ++row)
{
//If the value is larger than the current maximum, we replace and we store its positions
if (value_col[row] > max_ptr[row])
{
max_ptr[row] = value_col[row];
opt_pos_ptr[row] = col;
}
}
}
}
到目前为止我尝试了什么:
- 我尝试在内部循环中使用 OpenMP parallel for,但只在非常大的行上带来一些东西,比我当前的使用量大一点。
- 内部循环中的 if 阻止了#pragma omp simd 的工作,没有它我无法重写它。
根据您发布的代码示例,您似乎想要计算垂直最大值,这意味着在您的情况下“列”是水平的。在 C/C++ 中,元素的水平序列(即两个相邻元素在内存中具有一个元素的距离)通常称为行和垂直(其中两个相邻元素在内存中具有行大小的距离) - 列。在我下面的回答中,我将使用传统术语,其中行是水平的,列是垂直的。
此外,为简洁起见,我将重点关注一种可能的矩阵元素类型 - float
。 double
的基本思想相同,主要区别在于每个向量的元素数量和 _ps
/_pd
内在函数 selection。我会在最后提供 double
的版本。
我们的想法是,您可以使用 _mm_max_ps
/_mm_max_pd
并行计算多列的垂直最大值。为了同时记录找到的最大值的位置,您可以将先前的最大值与当前元素进行比较。比较的结果是一个掩码,其中元素是all-ones,其中最大值被更新。该掩码可用于 select 哪个位置也需要更新。
我必须注意,如果一列中有多个相等的最大元素,下面的算法假定记录哪个最大元素的位置并不重要。另外,我假设矩阵不包含 NaN 值,这会影响比较。稍后会详细介绍。
void find_max(const int n_cols,
const int n_rows,
const float* val_ptr,
int* opt_pos_ptr,
float* max_ptr){
const __m128i mm_one = _mm_set1_epi32(1);
// Pre-compute the number of rows that can be processed in full vector width.
// In a 128-bit vector there are 4 floats or 2 doubles
int tail_size = n_rows & 3;
int n_rows_aligned = n_rows - tail_size;
int row = 0;
for (; row < n_rows_aligned; row += 4)
{
const auto* col_ptr = val_ptr + row;
__m128 mm_max = _mm_loadu_ps(col_ptr);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128 mm_value = _mm_loadu_ps(col_ptr);
// See if this value is greater than the old maximum
__m128 mm_mask = _mm_cmplt_ps(mm_max, mm_value);
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, _mm_castps_si128(mm_mask));
// Compute the maximum
mm_max = _mm_max_ps(mm_value, mm_max);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_ps(max_ptr + row, mm_max);
_mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
}
// Process tail serially
for (; row < n_rows; ++row)
{
const auto* col_ptr = val_ptr + row;
auto max = *col_ptr;
int max_pos = 0;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
auto value = *col_ptr;
if (value > max)
{
max = value;
max_pos = col;
}
col_ptr += n_rows;
}
max_ptr[row] = max;
opt_pos_ptr[row] = max_pos;
}
}
上面的代码需要 SSE4.1 因为混合内在函数。您可以将它们替换为 _mm_and_si128
/_ps
、_mm_andnot_si128
/_ps
和 _mm_or_si128
/_ps
的组合,在这种情况下,要求将降为 SSE2。有关特定内在函数的更多详细信息,包括它们需要哪些指令集扩展,请参阅 Intel Intrinsics Guide。
关于 NaN 值的注释。如果您的矩阵可以有 NaN,则 _mm_cmplt_ps
测试将始终 return 为假。至于_mm_max_ps
,一般不知道会是什么return。如果其中一个操作数是 NaN,则内在转换为 return 其第二个(源)操作数的 maxps
指令,因此通过安排指令的操作数,您可以实现任一行为。但是,没有记录 _mm_max_ps
内在函数的哪个参数代表指令的哪个操作数,编译器甚至可能在不同情况下使用不同的关联。有关详细信息,请参阅
为了确保正确的行为。 NaNs 你可以使用内联汇编器来强制 maxps
操作数的正确顺序。不幸的是,这不是 MSVC 用于 x86-64 目标的选项,您说您正在使用它,因此您可以将 _mm_cmplt_ps
结果重新用于第二次混合,如下所示:
// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, mm_mask);
这将抑制生成的最大值中的 NaN。如果您想保留 NaN,可以使用第二次比较来检测 NaN:
// Detect NaNs
__m128 mm_nan_mask = _mm_cmpunord_ps(mm_value, mm_value);
// Compute the maximum
mm_max = _mm_blendv_ps(mm_max, mm_value, _mm_or_ps(mm_mask, mm_nan_mask));
如果你使用更宽的向量(__m256
或 __m512
)并且将外循环展开一个小因子,你可能会进一步提高上述算法的性能,这样至少有一个缓存行在内循环的每次迭代中加载行数据的价值。
这是 double
的实施示例。这里要注意的重点是,因为每个向量只有两个 double
个元素,而且每个向量还有四个位置,所以我们必须展开外循环来一次处理两个 double
的向量然后压缩与先前最大值比较的两个掩码以混合 32 位位置。
void find_max(const int n_cols,
const int n_rows,
const double* val_ptr,
int* opt_pos_ptr,
double* max_ptr){
const __m128i mm_one = _mm_set1_epi32(1);
// Pre-compute the number of rows that can be processed in full vector width.
// In a 128-bit vector there are 2 doubles, but we want to process
// two vectors at a time.
int tail_size = n_rows & 3;
int n_rows_aligned = n_rows - tail_size;
int row = 0;
for (; row < n_rows_aligned; row += 4)
{
const auto* col_ptr = val_ptr + row;
__m128d mm_max1 = _mm_loadu_pd(col_ptr);
__m128d mm_max2 = _mm_loadu_pd(col_ptr + 2);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128d mm_value1 = _mm_loadu_pd(col_ptr);
__m128d mm_value2 = _mm_loadu_pd(col_ptr + 2);
// See if this value is greater than the old maximum
__m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
__m128d mm_mask2 = _mm_cmplt_pd(mm_max2, mm_value2);
// Compress the 2 masks into one
__m128i mm_mask = _mm_packs_epi32(
_mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask2));
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);
// Compute the maximum
mm_max1 = _mm_max_pd(mm_value1, mm_max1);
mm_max2 = _mm_max_pd(mm_value2, mm_max2);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_pd(max_ptr + row, mm_max1);
_mm_storeu_pd(max_ptr + row + 2, mm_max2);
_mm_storeu_si128(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
}
// Process 2 doubles at once
if (tail_size >= 2)
{
const auto* col_ptr = val_ptr + row;
__m128d mm_max1 = _mm_loadu_pd(col_ptr);
__m128i mm_max_pos = _mm_setzero_si128();
__m128i mm_pos = mm_one;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
__m128d mm_value1 = _mm_loadu_pd(col_ptr);
// See if this value is greater than the old maximum
__m128d mm_mask1 = _mm_cmplt_pd(mm_max1, mm_value1);
// Compress the mask. The upper half doesn't matter.
__m128i mm_mask = _mm_packs_epi32(
_mm_castpd_si128(mm_mask1), _mm_castpd_si128(mm_mask1));
// If it is, save its position
mm_max_pos = _mm_blendv_epi8(mm_max_pos, mm_pos, mm_mask);
// Compute the maximum
mm_max1 = _mm_max_pd(mm_value1, mm_max1);
mm_pos = _mm_add_epi32(mm_pos, mm_one);
col_ptr += n_rows;
}
// Store the results
_mm_storeu_pd(max_ptr + row, mm_max1);
// Only store the lower two positions
_mm_storel_epi64(reinterpret_cast< __m128i* >(opt_pos_ptr + row), mm_max_pos);
row += 2;
}
// Process tail serially
for (; row < n_rows; ++row)
{
const auto* col_ptr = val_ptr + row;
auto max = *col_ptr;
int max_pos = 0;
col_ptr += n_rows;
for (int col = 1; col < n_cols; ++col)
{
auto value = *col_ptr;
if (value > max)
{
max = value;
max_pos = col;
}
col_ptr += n_rows;
}
max_ptr[row] = max;
opt_pos_ptr[row] = max_pos;
}
}