AVX2 列人口计数算法分别对每个位列
AVX2 column population count algorithm over each bit-column separately
对于我正在处理的项目,我需要计算翻录的 PDF
图像数据中每列的设置位数。
我正在尝试获取整个 PDF
作业(所有页面)中每一列的总设置位数。
数据一旦被翻录,就会存储在 MemoryMappedFile
中,没有后备文件(在内存中)。
PDF 页面尺寸为 13952 像素 x 15125 像素。可以通过将 PDF
的长度(高度)(以像素为单位)乘以宽度(以字节为单位)来计算生成的翻录数据的总大小。翻录数据为1 bit == 1 pixel
。所以翻录页面的大小(以字节为单位)为 (13952 / 8) * 15125
.
请注意,宽度始终是 64 bits
的倍数。
我得数一数PDF
(可能有几万页)被撕掉后每一页每一列的设置位数。
我首先通过一个基本解决方案解决了这个问题,即循环遍历每个字节并计算设置位的数量并将结果放在 vector
中。我已经将算法缩减为如下所示。我的执行时间从 ~350 毫秒减少到 ~120 毫秒。
static void count_dots( )
{
using namespace diag;
using namespace std::chrono;
std::vector<std::size_t> dot_counts( 13952, 0 );
uint64_t* ptr_dot_counts{ dot_counts.data( ) };
std::vector<uint64_t> ripped_pdf_data( 3297250, 0xFFFFFFFFFFFFFFFFUL );
const uint64_t* ptr_data{ ripped_pdf_data.data( ) };
std::size_t line_count{ 0 };
std::size_t counter{ ripped_pdf_data.size( ) };
stopwatch sw;
sw.start( );
while( counter > 0 )
{
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000000000001UL ) >> 0;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000000000001UL ) >> 0;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000000000001UL ) >> 0;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000000000001UL ) >> 0;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000000000001UL ) >> 0;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000000000001UL ) >> 0;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000000000001UL ) >> 0;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000000000001UL ) >> 0;
++ptr_data;
--counter;
if( ++line_count >= 218 )
{
ptr_dot_counts = dot_counts.data( );
line_count = 0;
}
}
sw.stop( );
std::cout << sw.elapsed<milliseconds>( ) << "ms\n";
}
不幸的是,这仍然会增加很多额外的处理时间,这是不可接受的。
上面的代码很丑,不会赢得任何选美比赛,但它有助于减少执行时间。
从我写的原始版本开始,我做了以下事情:
- 使用
pointers
代替indexers
- 以
uint64
的块而不是 uint8
的形式处理数据
- 手动展开
for
循环以遍历 uint64
的每个 byte
中的每个 bit
- 使用最终的
bit shift
而不是 __popcnt64
来计算掩码后的集合 bit
对于此测试,我生成了伪造的翻录数据,其中每个 bit
都设置为 1
。测试完成后,dot_counts
vector
应包含每个 element
的 15125
。
我希望这里的一些人可以帮助我使算法的平均执行时间低于 100 毫秒。
我不关心这里的可移植性。
- 目标机器的CPU:
Xeon E5-2680 v4 - Intel
- 编译器:
MSVC++ 14.23
- OS:
Windows 10
- C++ 版本:
C++17
- 编译器标志:
/O2
/arch:AVX2
大约 8 年前有人问过一个非常相似的问题:
How to quickly count bits into separate bins in a series of ints on Sandy Bridge?
(编者注:也许您错过了 ,它有一些更新更快的答案,至少对于在连续内存中沿着一列而不是沿着一行向下移动。也许您可以使用 1 或 2 个缓存-行向下一列,这样您就可以在 SIMD 寄存器中保持计数器热。)
当我将目前的答案与已接受的答案进行比较时,我的答案相当接近。我已经在处理 uint64
而不是 uint8
的块。我只是想知道是否还有更多我可以做的事情,无论是内在函数、汇编,还是一些简单的事情,比如改变我正在使用的数据结构。
可以使用 AVX2 完成,如标记的那样。
为了使这项工作正常进行,我建议 vector<uint16_t>
计数。添加计数是最大的问题,我们需要扩大的越多,问题就越大。 uint16_t
足以计算一页,因此您可以一次计算一页并将计数器添加到一组更宽的计数器中以计算总数。这是一些开销,但比在主循环中必须扩大更多开销要少得多。
计数的大端排序非常烦人,需要引入更多的洗牌才能使其正确。所以我建议得到它 错误 并在以后重新排序计数(也许在将它们相加到总数中?)。 "right shift by 7 first, then 6, then 5" 的顺序可以免费维护,因为我们可以随意选择 64 位块的移位计数。所以在下面的代码中,计数的实际顺序是:
- 最低有效字节的第 7 位,
- 第二个字节的第 7 位
- ...
- 最高有效字节的第 7 位,
- 最低有效字节的第 6 位,
- ...
所以每组8个都颠倒了。 (至少这是我打算做的,AVX2 unpacks 令人困惑)
代码(未测试):
while( counter > 0 )
{
__m256i data = _mm256_set1_epi64x(*ptr_data);
__m256i data1 = _mm256_srlv_epi64(data, _mm256_set_epi64x(4, 6, 5, 7));
__m256i data2 = _mm256_srlv_epi64(data, _mm256_set_epi64x(0, 2, 1, 3));
data1 = _mm256_and_si256(data1, _mm256_set1_epi8(1));
data2 = _mm256_and_si256(data2, _mm256_set1_epi8(1));
__m256i zero = _mm256_setzero_si256();
__m256i c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[0]);
c = _mm256_add_epi16(_mm256_unpacklo_epi8(data1, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[0], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[16]);
c = _mm256_add_epi16(_mm256_unpackhi_epi8(data1, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[16], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[32]);
c = _mm256_add_epi16(_mm256_unpacklo_epi8(data2, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[32], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[48]);
c = _mm256_add_epi16(_mm256_unpackhi_epi8(data2, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[48], c);
ptr_dot_counts += 64;
++ptr_data;
--counter;
if( ++line_count >= 218 )
{
ptr_dot_counts = dot_counts.data( );
line_count = 0;
}
}
这可以进一步展开,一次处理多行。这很好,因为如前所述,对计数器求和是最大的问题,按行展开会做更少的事情,而会在寄存器中进行更简单的求和。
使用的 Somme 内在函数:
_mm256_set1_epi64x
,复制一个int64_t
到向量的所有4个64位元素。 uint64_t
. 也可以
_mm256_set_epi64x
, 将4个64位的值转为向量
_mm256_srlv_epi64
,逻辑右移,带变量计数(每个元素可以是不同的计数)。
_mm256_and_si256
,只是按位与。
_mm256_add_epi16
,另外,适用于 16 位元素。
_mm256_unpacklo_epi8
and _mm256_unpackhi_epi8
,可能最好用该页上的图表解释
可以求和 "vertically",使用一个 uint64_t
保存 64 个单独和的所有第 0 位,另一个 uint64_t
保存所有和的第 1 位等加法可以通过用按位运算模拟全加器(电路组件)来完成。然后,不是只向计数器添加 0 或 1,而是一次添加更大的数字。
垂直总和也可以矢量化,但这会显着增加将垂直总和添加到列总和的代码,所以我在这里没有这样做。它应该有所帮助,但它只是很多代码。
示例(未测试):
size_t y;
// sum 7 rows at once
for (y = 0; (y + 6) < 15125; y += 7) {
ptr_dot_counts = dot_counts.data( );
ptr_data = ripped_pdf_data.data( ) + y * 218;
for (size_t x = 0; x < 218; x++) {
uint64_t dataA = ptr_data[0];
uint64_t dataB = ptr_data[218];
uint64_t dataC = ptr_data[218 * 2];
uint64_t dataD = ptr_data[218 * 3];
uint64_t dataE = ptr_data[218 * 4];
uint64_t dataF = ptr_data[218 * 5];
uint64_t dataG = ptr_data[218 * 6];
// vertical sums, 7 bits to 3
uint64_t abc0 = (dataA ^ dataB) ^ dataC;
uint64_t abc1 = (dataA ^ dataB) & dataC | (dataA & dataB);
uint64_t def0 = (dataD ^ dataE) ^ dataF;
uint64_t def1 = (dataD ^ dataE) & dataF | (dataD & dataE);
uint64_t bit0 = (abc0 ^ def0) ^ dataG;
uint64_t c1 = (abc0 ^ def0) & dataG | (abc0 & def0);
uint64_t bit1 = (abc1 ^ def1) ^ c1;
uint64_t bit2 = (abc1 ^ def1) & c1 | (abc1 & def1);
// add vertical sums to column counts
__m256i bit0v = _mm256_set1_epi64x(bit0);
__m256i data01 = _mm256_srlv_epi64(bit0v, _mm256_set_epi64x(4, 6, 5, 7));
__m256i data02 = _mm256_srlv_epi64(bit0v, _mm256_set_epi64x(0, 2, 1, 3));
data01 = _mm256_and_si256(data01, _mm256_set1_epi8(1));
data02 = _mm256_and_si256(data02, _mm256_set1_epi8(1));
__m256i bit1v = _mm256_set1_epi64x(bit1);
__m256i data11 = _mm256_srlv_epi64(bit1v, _mm256_set_epi64x(4, 6, 5, 7));
__m256i data12 = _mm256_srlv_epi64(bit1v, _mm256_set_epi64x(0, 2, 1, 3));
data11 = _mm256_and_si256(data11, _mm256_set1_epi8(1));
data12 = _mm256_and_si256(data12, _mm256_set1_epi8(1));
data11 = _mm256_add_epi8(data11, data11);
data12 = _mm256_add_epi8(data12, data12);
__m256i bit2v = _mm256_set1_epi64x(bit2);
__m256i data21 = _mm256_srlv_epi64(bit2v, _mm256_set_epi64x(4, 6, 5, 7));
__m256i data22 = _mm256_srlv_epi64(bit2v, _mm256_set_epi64x(0, 2, 1, 3));
data21 = _mm256_and_si256(data21, _mm256_set1_epi8(1));
data22 = _mm256_and_si256(data22, _mm256_set1_epi8(1));
data21 = _mm256_slli_epi16(data21, 2);
data22 = _mm256_slli_epi16(data22, 2);
__m256i data1 = _mm256_add_epi8(_mm256_add_epi8(data01, data11), data21);
__m256i data2 = _mm256_add_epi8(_mm256_add_epi8(data02, data12), data22);
__m256i zero = _mm256_setzero_si256();
__m256i c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[0]);
c = _mm256_add_epi16(_mm256_unpacklo_epi8(data1, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[0], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[16]);
c = _mm256_add_epi16(_mm256_unpackhi_epi8(data1, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[16], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[32]);
c = _mm256_add_epi16(_mm256_unpacklo_epi8(data2, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[32], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[48]);
c = _mm256_add_epi16(_mm256_unpackhi_epi8(data2, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[48], c);
ptr_dot_counts += 64;
++ptr_data;
}
}
// leftover rows
for (; y < 15125; y++) {
ptr_dot_counts = dot_counts.data( );
ptr_data = ripped_pdf_data.data( ) + y * 218;
for (size_t x = 0; x < 218; x++) {
__m256i data = _mm256_set1_epi64x(*ptr_data);
__m256i data1 = _mm256_srlv_epi64(data, _mm256_set_epi64x(4, 6, 5, 7));
__m256i data2 = _mm256_srlv_epi64(data, _mm256_set_epi64x(0, 2, 1, 3));
data1 = _mm256_and_si256(data1, _mm256_set1_epi8(1));
data2 = _mm256_and_si256(data2, _mm256_set1_epi8(1));
__m256i zero = _mm256_setzero_si256();
__m256i c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[0]);
c = _mm256_add_epi16(_mm256_unpacklo_epi8(data1, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[0], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[16]);
c = _mm256_add_epi16(_mm256_unpackhi_epi8(data1, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[16], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[32]);
c = _mm256_add_epi16(_mm256_unpacklo_epi8(data2, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[32], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[48]);
c = _mm256_add_epi16(_mm256_unpackhi_epi8(data2, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[48], c);
ptr_dot_counts += 64;
++ptr_data;
}
}
到目前为止第二好的方法是一种更简单的方法,更像第一个版本,除了一次运行 yloopLen
行以利用快速 8 位求和:
size_t yloopLen = 32;
size_t yblock = yloopLen * 1;
size_t yy;
for (yy = 0; yy < 15125; yy += yblock) {
for (size_t x = 0; x < 218; x++) {
ptr_data = ripped_pdf_data.data() + x;
ptr_dot_counts = dot_counts.data() + x * 64;
__m256i zero = _mm256_setzero_si256();
__m256i c1 = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[0]);
__m256i c2 = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[16]);
__m256i c3 = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[32]);
__m256i c4 = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[48]);
size_t end = std::min(yy + yblock, size_t(15125));
size_t y;
for (y = yy; y < end; y += yloopLen) {
size_t len = std::min(size_t(yloopLen), end - y);
__m256i count1 = zero;
__m256i count2 = zero;
for (size_t t = 0; t < len; t++) {
__m256i data = _mm256_set1_epi64x(ptr_data[(y + t) * 218]);
__m256i data1 = _mm256_srlv_epi64(data, _mm256_set_epi64x(4, 6, 5, 7));
__m256i data2 = _mm256_srlv_epi64(data, _mm256_set_epi64x(0, 2, 1, 3));
data1 = _mm256_and_si256(data1, _mm256_set1_epi8(1));
data2 = _mm256_and_si256(data2, _mm256_set1_epi8(1));
count1 = _mm256_add_epi8(count1, data1);
count2 = _mm256_add_epi8(count2, data2);
}
c1 = _mm256_add_epi16(_mm256_unpacklo_epi8(count1, zero), c1);
c2 = _mm256_add_epi16(_mm256_unpackhi_epi8(count1, zero), c2);
c3 = _mm256_add_epi16(_mm256_unpacklo_epi8(count2, zero), c3);
c4 = _mm256_add_epi16(_mm256_unpackhi_epi8(count2, zero), c4);
}
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[0], c1);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[16], c2);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[32], c3);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[48], c4);
}
}
之前有一些测量问题,最后这实际上并没有更好,但也没有比上面更漂亮的 "vertical sum" 版本差多少。
对于我正在处理的项目,我需要计算翻录的 PDF
图像数据中每列的设置位数。
我正在尝试获取整个 PDF
作业(所有页面)中每一列的总设置位数。
数据一旦被翻录,就会存储在 MemoryMappedFile
中,没有后备文件(在内存中)。
PDF 页面尺寸为 13952 像素 x 15125 像素。可以通过将 PDF
的长度(高度)(以像素为单位)乘以宽度(以字节为单位)来计算生成的翻录数据的总大小。翻录数据为1 bit == 1 pixel
。所以翻录页面的大小(以字节为单位)为 (13952 / 8) * 15125
.
请注意,宽度始终是 64 bits
的倍数。
我得数一数PDF
(可能有几万页)被撕掉后每一页每一列的设置位数。
我首先通过一个基本解决方案解决了这个问题,即循环遍历每个字节并计算设置位的数量并将结果放在 vector
中。我已经将算法缩减为如下所示。我的执行时间从 ~350 毫秒减少到 ~120 毫秒。
static void count_dots( )
{
using namespace diag;
using namespace std::chrono;
std::vector<std::size_t> dot_counts( 13952, 0 );
uint64_t* ptr_dot_counts{ dot_counts.data( ) };
std::vector<uint64_t> ripped_pdf_data( 3297250, 0xFFFFFFFFFFFFFFFFUL );
const uint64_t* ptr_data{ ripped_pdf_data.data( ) };
std::size_t line_count{ 0 };
std::size_t counter{ ripped_pdf_data.size( ) };
stopwatch sw;
sw.start( );
while( counter > 0 )
{
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 7 ) & 0x0000000000000001UL ) >> 0;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 6 ) & 0x0000000000000001UL ) >> 0;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 5 ) & 0x0000000000000001UL ) >> 0;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 4 ) & 0x0000000000000001UL ) >> 0;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 3 ) & 0x0000000000000001UL ) >> 0;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 2 ) & 0x0000000000000001UL ) >> 0;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 1 ) & 0x0000000000000001UL ) >> 0;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0100000000000000UL ) >> 56;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0001000000000000UL ) >> 48;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000010000000000UL ) >> 40;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000100000000UL ) >> 32;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000001000000UL ) >> 24;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000000010000UL ) >> 16;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000000000100UL ) >> 8;
*ptr_dot_counts++ += ( ( *ptr_data >> 0 ) & 0x0000000000000001UL ) >> 0;
++ptr_data;
--counter;
if( ++line_count >= 218 )
{
ptr_dot_counts = dot_counts.data( );
line_count = 0;
}
}
sw.stop( );
std::cout << sw.elapsed<milliseconds>( ) << "ms\n";
}
不幸的是,这仍然会增加很多额外的处理时间,这是不可接受的。
上面的代码很丑,不会赢得任何选美比赛,但它有助于减少执行时间。 从我写的原始版本开始,我做了以下事情:
- 使用
pointers
代替indexers
- 以
uint64
的块而不是uint8
的形式处理数据
- 手动展开
for
循环以遍历uint64
的每个 - 使用最终的
bit shift
而不是__popcnt64
来计算掩码后的集合bit
byte
中的每个 bit
对于此测试,我生成了伪造的翻录数据,其中每个 bit
都设置为 1
。测试完成后,dot_counts
vector
应包含每个 element
的 15125
。
我希望这里的一些人可以帮助我使算法的平均执行时间低于 100 毫秒。 我不关心这里的可移植性。
- 目标机器的CPU:
Xeon E5-2680 v4 - Intel
- 编译器:
MSVC++ 14.23
- OS:
Windows 10
- C++ 版本:
C++17
- 编译器标志:
/O2
/arch:AVX2
大约 8 年前有人问过一个非常相似的问题: How to quickly count bits into separate bins in a series of ints on Sandy Bridge?
(编者注:也许您错过了
当我将目前的答案与已接受的答案进行比较时,我的答案相当接近。我已经在处理 uint64
而不是 uint8
的块。我只是想知道是否还有更多我可以做的事情,无论是内在函数、汇编,还是一些简单的事情,比如改变我正在使用的数据结构。
可以使用 AVX2 完成,如标记的那样。
为了使这项工作正常进行,我建议 vector<uint16_t>
计数。添加计数是最大的问题,我们需要扩大的越多,问题就越大。 uint16_t
足以计算一页,因此您可以一次计算一页并将计数器添加到一组更宽的计数器中以计算总数。这是一些开销,但比在主循环中必须扩大更多开销要少得多。
计数的大端排序非常烦人,需要引入更多的洗牌才能使其正确。所以我建议得到它 错误 并在以后重新排序计数(也许在将它们相加到总数中?)。 "right shift by 7 first, then 6, then 5" 的顺序可以免费维护,因为我们可以随意选择 64 位块的移位计数。所以在下面的代码中,计数的实际顺序是:
- 最低有效字节的第 7 位,
- 第二个字节的第 7 位
- ...
- 最高有效字节的第 7 位,
- 最低有效字节的第 6 位,
- ...
所以每组8个都颠倒了。 (至少这是我打算做的,AVX2 unpacks 令人困惑)
代码(未测试):
while( counter > 0 )
{
__m256i data = _mm256_set1_epi64x(*ptr_data);
__m256i data1 = _mm256_srlv_epi64(data, _mm256_set_epi64x(4, 6, 5, 7));
__m256i data2 = _mm256_srlv_epi64(data, _mm256_set_epi64x(0, 2, 1, 3));
data1 = _mm256_and_si256(data1, _mm256_set1_epi8(1));
data2 = _mm256_and_si256(data2, _mm256_set1_epi8(1));
__m256i zero = _mm256_setzero_si256();
__m256i c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[0]);
c = _mm256_add_epi16(_mm256_unpacklo_epi8(data1, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[0], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[16]);
c = _mm256_add_epi16(_mm256_unpackhi_epi8(data1, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[16], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[32]);
c = _mm256_add_epi16(_mm256_unpacklo_epi8(data2, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[32], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[48]);
c = _mm256_add_epi16(_mm256_unpackhi_epi8(data2, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[48], c);
ptr_dot_counts += 64;
++ptr_data;
--counter;
if( ++line_count >= 218 )
{
ptr_dot_counts = dot_counts.data( );
line_count = 0;
}
}
这可以进一步展开,一次处理多行。这很好,因为如前所述,对计数器求和是最大的问题,按行展开会做更少的事情,而会在寄存器中进行更简单的求和。
使用的 Somme 内在函数:
_mm256_set1_epi64x
,复制一个int64_t
到向量的所有4个64位元素。uint64_t
. 也可以
_mm256_set_epi64x
, 将4个64位的值转为向量_mm256_srlv_epi64
,逻辑右移,带变量计数(每个元素可以是不同的计数)。_mm256_and_si256
,只是按位与。_mm256_add_epi16
,另外,适用于 16 位元素。_mm256_unpacklo_epi8
and_mm256_unpackhi_epi8
,可能最好用该页上的图表解释
可以求和 "vertically",使用一个 uint64_t
保存 64 个单独和的所有第 0 位,另一个 uint64_t
保存所有和的第 1 位等加法可以通过用按位运算模拟全加器(电路组件)来完成。然后,不是只向计数器添加 0 或 1,而是一次添加更大的数字。
垂直总和也可以矢量化,但这会显着增加将垂直总和添加到列总和的代码,所以我在这里没有这样做。它应该有所帮助,但它只是很多代码。
示例(未测试):
size_t y;
// sum 7 rows at once
for (y = 0; (y + 6) < 15125; y += 7) {
ptr_dot_counts = dot_counts.data( );
ptr_data = ripped_pdf_data.data( ) + y * 218;
for (size_t x = 0; x < 218; x++) {
uint64_t dataA = ptr_data[0];
uint64_t dataB = ptr_data[218];
uint64_t dataC = ptr_data[218 * 2];
uint64_t dataD = ptr_data[218 * 3];
uint64_t dataE = ptr_data[218 * 4];
uint64_t dataF = ptr_data[218 * 5];
uint64_t dataG = ptr_data[218 * 6];
// vertical sums, 7 bits to 3
uint64_t abc0 = (dataA ^ dataB) ^ dataC;
uint64_t abc1 = (dataA ^ dataB) & dataC | (dataA & dataB);
uint64_t def0 = (dataD ^ dataE) ^ dataF;
uint64_t def1 = (dataD ^ dataE) & dataF | (dataD & dataE);
uint64_t bit0 = (abc0 ^ def0) ^ dataG;
uint64_t c1 = (abc0 ^ def0) & dataG | (abc0 & def0);
uint64_t bit1 = (abc1 ^ def1) ^ c1;
uint64_t bit2 = (abc1 ^ def1) & c1 | (abc1 & def1);
// add vertical sums to column counts
__m256i bit0v = _mm256_set1_epi64x(bit0);
__m256i data01 = _mm256_srlv_epi64(bit0v, _mm256_set_epi64x(4, 6, 5, 7));
__m256i data02 = _mm256_srlv_epi64(bit0v, _mm256_set_epi64x(0, 2, 1, 3));
data01 = _mm256_and_si256(data01, _mm256_set1_epi8(1));
data02 = _mm256_and_si256(data02, _mm256_set1_epi8(1));
__m256i bit1v = _mm256_set1_epi64x(bit1);
__m256i data11 = _mm256_srlv_epi64(bit1v, _mm256_set_epi64x(4, 6, 5, 7));
__m256i data12 = _mm256_srlv_epi64(bit1v, _mm256_set_epi64x(0, 2, 1, 3));
data11 = _mm256_and_si256(data11, _mm256_set1_epi8(1));
data12 = _mm256_and_si256(data12, _mm256_set1_epi8(1));
data11 = _mm256_add_epi8(data11, data11);
data12 = _mm256_add_epi8(data12, data12);
__m256i bit2v = _mm256_set1_epi64x(bit2);
__m256i data21 = _mm256_srlv_epi64(bit2v, _mm256_set_epi64x(4, 6, 5, 7));
__m256i data22 = _mm256_srlv_epi64(bit2v, _mm256_set_epi64x(0, 2, 1, 3));
data21 = _mm256_and_si256(data21, _mm256_set1_epi8(1));
data22 = _mm256_and_si256(data22, _mm256_set1_epi8(1));
data21 = _mm256_slli_epi16(data21, 2);
data22 = _mm256_slli_epi16(data22, 2);
__m256i data1 = _mm256_add_epi8(_mm256_add_epi8(data01, data11), data21);
__m256i data2 = _mm256_add_epi8(_mm256_add_epi8(data02, data12), data22);
__m256i zero = _mm256_setzero_si256();
__m256i c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[0]);
c = _mm256_add_epi16(_mm256_unpacklo_epi8(data1, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[0], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[16]);
c = _mm256_add_epi16(_mm256_unpackhi_epi8(data1, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[16], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[32]);
c = _mm256_add_epi16(_mm256_unpacklo_epi8(data2, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[32], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[48]);
c = _mm256_add_epi16(_mm256_unpackhi_epi8(data2, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[48], c);
ptr_dot_counts += 64;
++ptr_data;
}
}
// leftover rows
for (; y < 15125; y++) {
ptr_dot_counts = dot_counts.data( );
ptr_data = ripped_pdf_data.data( ) + y * 218;
for (size_t x = 0; x < 218; x++) {
__m256i data = _mm256_set1_epi64x(*ptr_data);
__m256i data1 = _mm256_srlv_epi64(data, _mm256_set_epi64x(4, 6, 5, 7));
__m256i data2 = _mm256_srlv_epi64(data, _mm256_set_epi64x(0, 2, 1, 3));
data1 = _mm256_and_si256(data1, _mm256_set1_epi8(1));
data2 = _mm256_and_si256(data2, _mm256_set1_epi8(1));
__m256i zero = _mm256_setzero_si256();
__m256i c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[0]);
c = _mm256_add_epi16(_mm256_unpacklo_epi8(data1, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[0], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[16]);
c = _mm256_add_epi16(_mm256_unpackhi_epi8(data1, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[16], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[32]);
c = _mm256_add_epi16(_mm256_unpacklo_epi8(data2, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[32], c);
c = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[48]);
c = _mm256_add_epi16(_mm256_unpackhi_epi8(data2, zero), c);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[48], c);
ptr_dot_counts += 64;
++ptr_data;
}
}
到目前为止第二好的方法是一种更简单的方法,更像第一个版本,除了一次运行 yloopLen
行以利用快速 8 位求和:
size_t yloopLen = 32;
size_t yblock = yloopLen * 1;
size_t yy;
for (yy = 0; yy < 15125; yy += yblock) {
for (size_t x = 0; x < 218; x++) {
ptr_data = ripped_pdf_data.data() + x;
ptr_dot_counts = dot_counts.data() + x * 64;
__m256i zero = _mm256_setzero_si256();
__m256i c1 = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[0]);
__m256i c2 = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[16]);
__m256i c3 = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[32]);
__m256i c4 = _mm256_loadu_si256((__m256i*)&ptr_dot_counts[48]);
size_t end = std::min(yy + yblock, size_t(15125));
size_t y;
for (y = yy; y < end; y += yloopLen) {
size_t len = std::min(size_t(yloopLen), end - y);
__m256i count1 = zero;
__m256i count2 = zero;
for (size_t t = 0; t < len; t++) {
__m256i data = _mm256_set1_epi64x(ptr_data[(y + t) * 218]);
__m256i data1 = _mm256_srlv_epi64(data, _mm256_set_epi64x(4, 6, 5, 7));
__m256i data2 = _mm256_srlv_epi64(data, _mm256_set_epi64x(0, 2, 1, 3));
data1 = _mm256_and_si256(data1, _mm256_set1_epi8(1));
data2 = _mm256_and_si256(data2, _mm256_set1_epi8(1));
count1 = _mm256_add_epi8(count1, data1);
count2 = _mm256_add_epi8(count2, data2);
}
c1 = _mm256_add_epi16(_mm256_unpacklo_epi8(count1, zero), c1);
c2 = _mm256_add_epi16(_mm256_unpackhi_epi8(count1, zero), c2);
c3 = _mm256_add_epi16(_mm256_unpacklo_epi8(count2, zero), c3);
c4 = _mm256_add_epi16(_mm256_unpackhi_epi8(count2, zero), c4);
}
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[0], c1);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[16], c2);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[32], c3);
_mm256_storeu_si256((__m256i*)&ptr_dot_counts[48], c4);
}
}
之前有一些测量问题,最后这实际上并没有更好,但也没有比上面更漂亮的 "vertical sum" 版本差多少。