循环矢量化 - 计算 7 字节记录的匹配与屏蔽

Loop vectorization - counting matches of 7-byte records with masking

我有一个相当简单的循环:

auto indexRecord = getRowPointer(0);
bool equals;
// recordCount is about 6 000 000
for (int i = 0; i < recordCount; ++i) {
    equals = BitString::equals(SelectMask, indexRecord, maxBytesValue);
    rowsFound += equals;
    indexRecord += byteSize; // byteSize is 7
}

其中 BitString::equals 是:

static inline bool equals(const char * mask, const char * record, uint64_t maxVal) {
    return !(((*( uint64_t * ) mask) & (maxVal & *( uint64_t * ) record)) ^ (maxVal & *( uint64_t * ) record));
}

此代码用于模拟数据库中的位图索引查询。 我的问题是,是否有一种方法可以对循环进行矢量化,遍历所有记录。 当尝试使用 GCC 和 -fopt-info-vec-missed -O3 进行编译时,我得到:missed: couldn't vectorize loop.

我对这种优化很陌生,想了解更多,感觉好像少了什么。

编辑 首先谢谢大家的回答。我应该包括一个 Reprex。 现在就在这里,所有需要的功能,尽可能接近我能做的。所有这些都是在 x86-64 平台上完成的,我有 GCC 和 Clang 可用。

#include <iostream>
#include <cstdio>
#include <cstring>
#include <cstdint>
#include <bitset>
#include <ctime>
#include <cstdlib>

constexpr short BYTE_SIZE = 8;

class BitString {
public:
    static int getByteSizeFromBits(int bitSize) {
        return (bitSize + BYTE_SIZE - 1) / BYTE_SIZE;
    }

    static void setBitString(char *rec, int bitOffset) {
        rec[bitOffset / 8] |= (1 << (bitOffset % BYTE_SIZE));
    }

    static inline bool equals(const char *mask, const char *record, uint64_t maxVal) {
        return !(((*(uint64_t *) mask) & (maxVal & *(uint64_t *) record)) ^ (maxVal & *(uint64_t *) record));
    }
};

// Class representing a table schema
class TableSchema {
public:
    // number of attributes of a table
    unsigned int attrs_count = -1;
    // the attribute size in bytes, eg. 3 equals to something like CHAR(3) in SQL
    unsigned int *attr_sizes = nullptr;
    // max value (domain) of an attribute, -1 for unlimited, ()
    int *attr_max_values = nullptr;
    // the offset of each attribute, to simplify some pointer arithmetic for further use
    unsigned int *attribute_offsets = nullptr;
    // sum of attr_sizes if the record size;
    unsigned int record_size = -1;

    void calculate_offsets() {
        if (attrs_count <= 0 || attribute_offsets != nullptr) {
            return;
        }

        attribute_offsets = new unsigned int[attrs_count];
        int offset = 0;
        for (int i = 0; i < attrs_count; ++i) {
            attribute_offsets[i] = offset;
            offset += attr_sizes[i];
        }
        record_size = offset;
    }

    TableSchema() = default;

    ~TableSchema() {
        if (attribute_offsets != nullptr) {
            delete[] attribute_offsets;
            attribute_offsets = nullptr;
        }
        attrs_count = -1;
    }
};


class BitmapIndex {
private:
    char *mData = nullptr;
    short bitSize = 0;
    int byteSize = 0;
    int attrsCount = 0;
    int *attrsMaxValue = nullptr;
    int *bitIndexAttributeOffset = nullptr;
    unsigned int recordCount = 0;
    char *SelectMask;

    unsigned int capacity = 0;

    inline char *getRowPointer(unsigned int rowId) const {
        return mData + rowId * byteSize;
    }

    inline bool shouldColBeIndexed(int max_col_value) const {
        return max_col_value > 0;
    }

public:
    BitmapIndex(const int *attrs_max_value, int attrs_count, unsigned int capacity) {
        auto maxValuesSum = 0;
        attrsMaxValue = new int[attrs_count];
        attrsCount = attrs_count;
        bitIndexAttributeOffset = new int[attrs_count];
        auto bitOffset = 0;
        // attribute's max value is the same as number of bits used to encode the current value
        // e.g., if attribute's max value is 3, we use 001 to represent value 1, 010 for 2, 100 for 3 and so on
        for (int i = 0; i < attrs_count; ++i) {
            attrsMaxValue[i] = attrs_max_value[i];
            bitIndexAttributeOffset[i] = bitOffset;
            // col is indexed only if it's max value is > 0, -1 means
            if (!shouldColBeIndexed(attrs_max_value[i]))
                continue;
            maxValuesSum += attrs_max_value[i];
            bitOffset += attrs_max_value[i];
        }
        bitSize = (short) maxValuesSum;
        byteSize = BitString::getByteSizeFromBits(bitSize);
        mData = new char[byteSize * capacity];
        memset(mData, 0, byteSize * capacity);
        SelectMask = new char[byteSize];
        this->capacity = capacity;
    }

    ~BitmapIndex() {
        if (mData != nullptr) {
            delete[] mData;
            mData = nullptr;
            delete[] attrsMaxValue;
            attrsMaxValue = nullptr;

            delete[] SelectMask;
            SelectMask = nullptr;
        }
    }

    unsigned long getTotalByteSize() const {
        return byteSize * capacity;
    }

    // add record to index
    void addRecord(const char * record, const unsigned int * attribute_sizes) {
        auto indexRecord = getRowPointer(recordCount);
        unsigned int offset = 0;
        for (int j = 0; j < attrsCount; ++j) {
            if (attrsMaxValue[j] != -1) {
                // byte col value
                char colValue = *(record + offset);
                if (colValue > attrsMaxValue[j]) {
                    throw std::runtime_error("Col value is bigger than max allowed value!");
                }
//            printf("%d ", colValue);
                BitString::setBitString(indexRecord, bitIndexAttributeOffset[j] + colValue);
            }
            offset += attribute_sizes[j];
        }
        recordCount += 1;
    }

    // SELECT COUNT(*)
    int Select(const char *query) const {
        uint64_t rowsFound = 0;
        memset(SelectMask, 0, byteSize);
        for (int col = 0; col < attrsCount; ++col) {
            if (!shouldColBeIndexed(attrsMaxValue[col])) {
                continue;
            }
            auto col_value = query[col];
            if (col_value < 0) {
                for (int i = 0; i < attrsMaxValue[col]; ++i) {
                    BitString::setBitString(SelectMask, bitIndexAttributeOffset[col] + i);
                }
            } else {
                BitString::setBitString(SelectMask, bitIndexAttributeOffset[col] + col_value);
            }
        }

        uint64_t maxBytesValue = 0;
        uint64_t byteVals = 0xff;
        for (int i = 0; i < byteSize; ++i) {
            maxBytesValue |= byteVals << (i * 8);
        }

        auto indexRecord = getRowPointer(0);
        for (int i = 0; i < recordCount; ++i) {
            rowsFound += BitString::equals(SelectMask, indexRecord, maxBytesValue);
            indexRecord += byteSize;
        }
        return rowsFound;
    }
};


void generateRecord(
        char *record,
        const unsigned int attr_sizes[],
        const int attr_max_value[],
        int attr_count
    ) {
    auto offset = 0;
    for (int c = 0; c < attr_count; ++c) {
        if (attr_max_value[c] == -1) {
            for (int j = 0; j < attr_sizes[c]; ++j) {
                record[offset + j] = rand() % 256;
            }
        } else {
            for (int j = 0; j < attr_sizes[c]; ++j) {
                record[offset + j] = rand() % attr_max_value[c];
            }
        }
        offset += attr_sizes[c];
    }
}

int main() {
    TableSchema schema;
    const int attribute_count = 13;
    const int record_count = 1000000;
    // for simplicity sake, attr_max_value > 0 is set only for attributes, which size is 1.
    unsigned int attr_sizes[attribute_count] = {1, 5, 1, 5, 1, 1, 1, 6, 1, 1, 1, 11, 1};
    int attr_max_values[attribute_count] = {3, -1, 4, -1, 6, 5, 7, -1, 7, 6, 5, -1, 8};
    schema.attrs_count = attribute_count;
    schema.attr_sizes = attr_sizes;
    schema.attr_max_values = attr_max_values;
    schema.calculate_offsets();

    srand((unsigned ) time(nullptr));

    BitmapIndex bitmapIndex(attr_max_values, attribute_count, record_count);

    char *record = new char[schema.record_size];
    for (int i = 0; i < record_count; ++i) {
        // generate some random records and add them to the index
        generateRecord(record, attr_sizes, attr_max_values, attribute_count);
        bitmapIndex.addRecord(record, attr_sizes);
    }

    char query[attribute_count] = {-1, -1, 0, -1, -1, 3, 2, -1, 3, 3, 4, -1, 6};
    // simulate Select COUNT(*) WHERE a1 = -1, a2 = -1, a3 = 0, ...
    auto found = bitmapIndex.Select(query);

    printf("Query found: %d records\n", found);

    delete[] record;
    return 0;
}

首先,您的代码不是完整的示例。您缺少许多变量的定义和类型,这使得很难回答。您也没有指明您正在编译哪个平台 on/for。

矢量化可能失败的原因如下:

  • 您的阅读重叠了!您正在以 7 个字节的间隔读取 8 个字节。仅此一项就可能会混淆矢量化逻辑。
  • 您的指针可能未被 __restrict 编辑,这意味着编译器必须假设它们可能存在别名,这意味着它可能需要在每次访问时重新读取地址。
  • 你的 equals() 函数指针参数绝对不是 __restrict 的(尽管编译器可以通过内联看到)。
  • 对齐。 x86_64 处理器不需要对齐访问,但在某些平台上,一些较大的指令需要知道它们在内存中正确对齐的位置工作。此外,正如@PeterCordes 在评论中指出的那样,编译器和库在对齐方面可能比硬件更挑剔。
  • 为什么不把 *SelectMask 放在局部变量中?

如果记录大小为 8,GCC 和 Clang 都会自动矢量化,for example:(希望对代码出现的实际上下文具有足够的代表性 stand-in)

int count(char * indexRecord, const char * SelectMask, uint64_t maxVal)
{
    bool equals;
    uint64_t rowsFound = 0;
    // some arbitrary number of records
    for (int i = 0; i < 1000000; ++i) {
        equals = tequals(SelectMask, indexRecord, maxVal);
        rowsFound += equals;
        indexRecord += 8; // record size padded out to 8
    }
    return rowsFound;
}

它的重要部分,由 GCC 编译,如下所示:

.L4:
    vpand   ymm0, ymm2, YMMWORD PTR [rdi]
    add     rdi, 32
    vpcmpeqq        ymm0, ymm0, ymm3
    vpsubq  ymm1, ymm1, ymm0
    cmp     rax, rdi
    jne     .L4

不错。它使用与我手动使用的相同的想法:vpand 带掩码的数据(按位逻辑的简化),将其与零进行比较,减去比较的结果(减去因为 True 结果用 - 1)来自一个向量中的 4 个计数器。在循环之后添加四个单独的计数。

顺便说一句,请注意,我将 rowsFound 设为 uint64_t。这很重要。如果 rowsFound 不是 64 位,那么 Clang 和 GCC 都会非常努力地尽快缩小计数范围,这与好的方法恰恰相反:这会在循环中花费更多的指令,而且没有任何好处.如果计数最终打算成为 32 位 int,则可以在循环后简单地缩小它的范围,这样做可能不仅便宜而且实际上免费

用 SIMD 内部函数手动编写与该代码等效的东西并不难,这可以使代码不那么脆弱(它不会基于希望编译器会做正确的事情),但它不会'不再适用于非 x86 平台。

如果记录应该是 7 字节,那将是一个更烦人的问题。 GCC 放弃了,Clang 实际上继续了它的 auto-vectorization,但这并不好:8 字节的加载都是单独完成的,然后将结果放在一个向量中,这完全是在浪费时间。

使用 SIMD 内部函数手动执行时,主要问题是将 7 字节记录解包到 qword 通道中。 SSE4.1 版本可以使用 pshufbpshufb 来自 SSSE3,但 pcmpeqq 来自 SSE4.1,因此以 SSE4.1 为目标很有意义)来做到这一点,很简单。 AVX2 版本可以执行加载,从它尝试加载的第一个记录之前 2 个字节开始,这样 256 位寄存器的两个 128 位半部分之间的“拆分”就落在两个记录之间。然后 vpshufb,无法将字节从一个 128 位的一半移动到另一半,但仍然可以将字节移动到位,因为其中 none 需要跨入另一半。

例如,具有手动矢量化和 7 字节记录的 AVX2 版本可能看起来像这样。这需要在结尾 和开头 处进行一些填充,或者只是跳过第一条记录并在到达最后一条记录之前结束并分别处理。未经测试,但它至少可以让您了解手动矢量化代码的工作原理。

int count(char * indexRecord, uint64_t SelectMask, uint64_t maxVal)
{
    __m256i mask = _mm256_set1_epi64x(~SelectMask & maxVal);
    __m256i count = _mm256_setzero_si256();
    __m256i zero = _mm256_setzero_si256();
    __m256i shufmask = _mm256_setr_epi8(2, 3, 4, 5, 6, 7, 8, -1, 9, 10, 11, 12, 13, 14, 15, -1, 0, 1, 2, 3, 4, 5, 6, -1, 7, 8, 9, 10, 11, 12, 13, -1);
    for (int i = 0; i < 1000000; ++i) {
        __m256i records = _mm256_loadu_si256((__m256i*)(indexRecord - 2));
        indexRecord += 7 * 4;
        records = _mm256_shuffle_epi8(records, shufmask);
        __m256i isZero = _mm256_cmpeq_epi64(_mm256_and_si256(records, mask), zero);
        count = _mm256_sub_epi64(count, isZero);
    }
    __m128i countA = _mm256_castsi256_si128(count);
    __m128i countB = _mm256_extracti128_si256(count, 1);
    countA = _mm_add_epi64(countA, countB);
    return _mm_cvtsi128_si64(countA) + _mm_extract_epi64(countA, 1);
}

这是另一种方法。此代码不使用未对齐的加载技巧(如果您将输入数据对齐 16 字节尤其有用),但总体上使用更多的指令,因为更多的混洗,并且仅在 16 字节的 SSE 向量上运行。

我不知道它与其他答案相比如何,可能更快或更慢。该代码需要 SSSE3 和 SSE 4.1 指令集。

// Load 7 bytes from memory into the vector
inline __m128i load7( const uint8_t* rsi )
{
    __m128i v = _mm_loadu_si32( rsi );
    v = _mm_insert_epi16( v, *(const uint16_t*)( rsi + 4 ), 2 );
    v = _mm_insert_epi8( v, rsi[ 6 ], 6 );
    return v;
}

// Prepare mask vector: broadcast the mask, and duplicate the high byte
inline __m128i loadMask( uint64_t mask )
{
    __m128i vec = _mm_cvtsi64_si128( (int64_t)mask );
    const __m128i perm = _mm_setr_epi8( 0, 1, 2, 3, 4, 5, 6, 6, 0, 1, 2, 3, 4, 5, 6, 6 );
    return _mm_shuffle_epi8( vec, perm );
}

// Prepare needle vector: load 7 bytes, duplicate 7-th byte into 8-th, duplicate 8-byte lanes
inline __m128i loadNeedle( const uint8_t* needlePointer, __m128i mask )
{
    __m128i vec = load7( needlePointer );
    const __m128i perm = _mm_setr_epi8( 0, 1, 2, 3, 4, 5, 6, 6, 0, 1, 2, 3, 4, 5, 6, 6 );
    vec = _mm_shuffle_epi8( vec, perm );
    return _mm_and_si128( vec, mask );
}

// Compare first 14 bytes with the needle, update the accumulator
inline void compare14( __m128i& acc, __m128i vec, __m128i needle, __m128i mask )
{
    // Shuffle the vector matching the needle and mask; this duplicates two last bytes of each 7-byte record
    const __m128i perm = _mm_setr_epi8( 0, 1, 2, 3, 4, 5, 6, 6, 7, 8, 9, 10, 11, 12, 13, 13 );
    vec = _mm_shuffle_epi8( vec, perm );
    // bitwise AND with the mask
    vec = _mm_and_si128( vec, mask );
    // Compare 8-byte lanes for equality with the needle
    vec = _mm_cmpeq_epi64( vec, needle );
    // Increment the accumulator if comparison was true
    acc = _mm_sub_epi64( acc, vec );
}

size_t countRecords( const uint8_t* rsi, size_t count, const uint8_t* needlePointer, uint64_t maskValue )
{
    const __m128i mask = loadMask( maskValue );
    const __m128i needle = loadNeedle( needlePointer, mask );
    __m128i acc = _mm_setzero_si128();

    // An iteration of this loop consumes 16 records = 112 bytes = 7 SSE vectors
    const size_t countBlocks = count / 16;
    for( size_t i = 0; i < countBlocks; i++ )
    {
        const __m128i* p = ( const __m128i* )rsi;
        rsi += 7 * 16;

        __m128i a = _mm_loadu_si128( p );
        compare14( acc, a, needle, mask );

        __m128i b = _mm_loadu_si128( p + 1 );
        compare14( acc, _mm_alignr_epi8( b, a, 14 ), needle, mask );

        a = _mm_loadu_si128( p + 2 );
        compare14( acc, _mm_alignr_epi8( a, b, 12 ), needle, mask );

        b = _mm_loadu_si128( p + 3 );
        compare14( acc, _mm_alignr_epi8( b, a, 10 ), needle, mask );

        a = _mm_loadu_si128( p + 4 );
        compare14( acc, _mm_alignr_epi8( a, b, 8 ), needle, mask );

        b = _mm_loadu_si128( p + 5 );
        compare14( acc, _mm_alignr_epi8( b, a, 6 ), needle, mask );

        a = _mm_loadu_si128( p + 6 );
        compare14( acc, _mm_alignr_epi8( a, b, 4 ), needle, mask );
        compare14( acc, _mm_srli_si128( a, 2 ), needle, mask );
    }

    // Sum high / low lanes of the accumulator
    acc = _mm_add_epi64( acc, _mm_srli_si128( acc, 8 ) );

    // Handle the remainder, 7 bytes per iteration
    // Compared to your 6M records, the remainder is small, the performance doesn't matter much.
    for( size_t i = 0; i < count % 16; i++ )
    {
        __m128i a = load7( rsi );
        rsi += 7;
        compare14( acc, a, needle, mask );
    }

    return (size_t)_mm_cvtsi128_si64( acc );
}

P.S。此外,尽管有 15% 的 RAM 带宽开销,但我预计 8 字节索引会更快。特别是当矢量化为 AVX2 时。