C++ 就地排序索引文件(使用堆排序)

C++ sort index file in-place (with heapsort)

更新问题:

你好。我正在尝试就地对索引文件进行排序。该文件由 14B 数据块组成,通常太大而无法加载到 RAM 中。前 8B 是我要排序的字节。我实现了 Heapsort 算法,到目前为止,除了性能之外,它运行良好!

我想知道是否可以改进我的实现,以及如何通过使用一些 RAM 来加快此过程。我正在考虑可能将堆部分保留在 RAM 中,但我不确定这将如何工作。

到目前为止我的代码:

sortidx.h

#ifndef SORTIDX_H
#define SORTIDX_H

// Includes
#include <atomic>
#include <fstream>
#include <iostream>
#include <limits>
#include <string>
#include <thread>

// Constants
constexpr size_t hashSize = 8;
constexpr size_t offsetSize = 6;
constexpr size_t writeSize = hashSize + offsetSize;

// Typedefs & Structs
typedef std::lock_guard<std::mutex> scoped_lock;

struct IndexEntry {
    unsigned char hash[hashSize]; // First 64 bits of the hash
    unsigned char position[offsetSize]; // Position of word in dictionary (48-bit little endian integer)
} __attribute__( (__packed__) );

// Functions
bool operator>( const IndexEntry &rhs, const IndexEntry &lhs );

constexpr size_t getParent( size_t i ) {
    return (i - 1) / 2;
}

constexpr size_t getLeft( size_t i ) {
    return i * 2 + 1;
}

constexpr size_t getRight( size_t i ) {
    return i * 2 + 2;
}

void sortIDX( std::string idxFile );

void heapifyIDX( size_t heapifyLimit );
void sortIDXHeap( size_t numDataSets );

void readData( IndexEntry* entry, size_t pos );
void writeData( IndexEntry* entry, size_t pos );
bool isInHeap( size_t pos );
void orderHeap( IndexEntry &top, size_t posTop );

#endif

sortidx.cpp

#include "sortidx.h"

using namespace std;

streampos fileSize;
size_t numDataSets;
size_t limit;
atomic<size_t> pos;
fstream* file;

bool operator>( const IndexEntry &rhs, const IndexEntry &lhs ) {
    for ( size_t i = 0; i < hashSize; i++ ) {
        if ( rhs.hash[i] > lhs.hash[i] )
            return true;
        else if ( rhs.hash[i] < lhs.hash[i] )
            return false;
    }

    return false;
}

void sortIDX( string idxFile ) {
    file = new fstream( idxFile, ios::in | ios::out | ios::binary | ios::ate );
    fileSize = file->tellg();
    numDataSets = fileSize / writeSize;
    limit = numDataSets - 1;
    const size_t localLimit = limit;
    const size_t heapifyLimit = getParent( limit );
    thread* sorterThread;

    sorterThread = new thread( heapifyIDX, heapifyLimit );

    while ( pos <= heapifyLimit ) {
        // Some progressbar stuff (uses pos)
    }

    sorterThread->join();
    delete sorterThread;

    pos = 0;
    sorterThread = new thread( sortIDXHeap, localLimit );

    while ( pos < localLimit ) {
        // Some progressbar stuff (uses pos)
    }

    sorterThread->join();
    delete sorterThread;

    file->close();
    delete file;
}

void heapifyIDX( size_t heapifyLimit ) {
    IndexEntry top;
    size_t posTop;

    for ( pos = 0; pos <= heapifyLimit; pos++ ) {
        posTop = heapifyLimit - pos;

        readData( &top, posTop );

        orderHeap( top, posTop );
    }
}

void sortIDXHeap( size_t numDataSets ) {
    IndexEntry last;
    IndexEntry top;
    size_t posLast;
    size_t posTop;

    for ( pos = 0; pos < numDataSets; pos++ ) {
        posLast = numDataSets - pos;
        posTop = 0;
        limit = posLast - 1;

        readData( &last, posTop );
        readData( &top, posLast );
        writeData( &last, posLast );

        orderHeap( top, posTop );
    }
}

void readData( IndexEntry* entry, size_t pos ) {
    file->seekg( pos * writeSize );
    file->read( (char*)entry, writeSize );
}

void writeData( IndexEntry* entry, size_t pos ) {
    file->seekp( pos * writeSize );
    file->write( (char*)entry, writeSize );
}

bool isInHeap( size_t pos ) {
    return pos <= limit;
}

void orderHeap( IndexEntry &top, size_t posTop ) {
    static IndexEntry left;
    static IndexEntry right;
    static size_t posLeft;
    static size_t posRight;
    static bool swapped;

    do {
        posLeft = getLeft( posTop );
        posRight = getRight( posTop );

        if ( isInHeap( posLeft ) ) {
            readData( &left, posLeft );

            if ( isInHeap( posRight ) ) {
                readData( &right, posRight );

                if ( right > left ) {
                    if ( right > top ) {
                        writeData( &right, posTop );
                        posTop = posRight;

                        swapped = true;
                    } else {
                        swapped = false;
                    }
                } else {
                    if ( left > top ) {
                        writeData( &left, posTop );
                        posTop = posLeft;

                        swapped = true;
                    } else {
                        swapped = false;
                    }
                }
            } else {
                if ( left > top ) {
                    writeData( &left, posTop );
                    posTop = posLeft;

                    swapped = true;
                } else {
                    swapped = false;
                }
            }
        } else {
            swapped = false;
        }
    } while ( swapped );

    writeData( &top, posTop );
}

原问题:

我希望你能帮我解决一个我已经困扰了很长时间的问题。
我正在实施一个简单的查找 table 来快速搜索文件。我当前的问题是索引文件。目前,我正在遍历数据文件并使用我要查找的 8 字节数据创建索引条目,后跟 6 字节数据,指示该数据集在原始文件中的位置。所以我的索引文件由 14 字节的数据块组成。现在我想对该文件进行排序,以便通过在索引文件中进行二进制搜索轻松找到我的数据。到目前为止,这是我一直在努力的部分。

我需要按前 8 个字节对这 14 个字节的条目进行就地排序。仅按前 8 个字节排序应该不是什么大问题。我对如何对文件本身进行排序感到困惑。
我正在考虑尝试为文件实现 "iterator" class,这样我就可以将它传递给 std::sort,这应该可以很好地完成这项工作。但是由于我不确定我应该为它提供什么接口才能工作而且我也无法阅读当前的进展我做了一些研究并且被提醒 Heapsort 算法听起来非常好因为它有 O(n*log(n)),到位了,我可以很好地估计进度。

到目前为止一切顺利。我仍然对这个的实际实现有点困惑,因为我不确定在文件中交换几个字节数据的最佳方法是什么。我也很想知道您是否有关于如何排序此文件的其他建议,因为索引文件的大​​小为数 GB,性能非常重要!

为什么需要就地排序?您的硬盘应该没有问题来存储另一个 17 GB 的文件。

我会这样排序。

  1. 以您可以在 RAM 中处理的块的形式读取文件。在该块上使用您想要的任何排序算法(例如快速排序、堆排序、..)

  2. 将排序后的块写入磁盘(每个块在单独的文件中)

  3. 转到 1 直到到达数据末尾

  4. 合并chunks排序后的文件,将整体排序后的结果作为最终排序后的文件。

  5. 删除已排序的块文件。

由于数组的前几个元素访问次数最多,因此我决定将前几个元素加载到 RAM 中,直到达到限制(通过参数传递)。我这样修改我的代码的实现:

// ...
size_t arraySize = 0;
IndexEntry* cacheArray;

void readIntoArray( size_t numElements ) {
    if ( arraySize != 0 )
        writeFromArray();

    arraySize = numElements;
    cacheArray = new IndexEntry[arraySize];
    file->seekg( 0 );

    for ( size_t i = 0; i < arraySize; i++ ) {
        file->read( (char*)(cacheArray + i), writeSize );
    }
}

void writeFromArray() {
    file->seekp( 0 );

    for ( size_t i = 0; i < arraySize; i++ ) {
        file->write( (char*)(cacheArray + i), writeSize );
    }

    arraySize = 0;
    delete[] cacheArray;
}

void sortIDX( string idxFile, size_t cacheSize, bool quiet ) {
    // ...

    cacheSize /= writeSize;
    readIntoArray( min(cacheSize, numDataSets) );

    sorterThread = new thread( heapifyIDX, heapifyLimit );

    // ...

    sorterThread->join();
    delete sorterThread;

    writeFromArray();

    file->close();
    delete file;
}

void readData( IndexEntry* entry, size_t pos ) {
    if ( pos < arraySize ) {
        *entry = cacheArray[pos];
    } else {
        file->seekg( pos * writeSize );
        file->read( (char*)entry, writeSize );
    }
}

void writeData( IndexEntry* entry, size_t pos ) {
    if ( pos < arraySize ) {
        cacheArray[pos] = *entry;
    } else {
        file->seekp( pos * writeSize );
        file->write( (char*)entry, writeSize );
    }
}