C++ 什么是大型数据集(数百万行)的最佳排序容器和方法

C++ what is the best sorting container and approach for large datasets (millions of lines)

我正在处理一个练习,该练习应该准确地对此类代码的时间复杂度进行基准测试。

我正在处理的数据由这样的字符串对组成 hbFvMF,PZLmRb,每个字符串在数据集中出现两次,一次在位置 1一旦在位置 2 。所以第一个字符串将指向 zvEcqe,hbFvMF 例如,列表继续....

example dataset of 50k pairs

我已经能够生成代码,可以毫无问题地将这些数据集排序到 50k 对,大约需要 4-5 分钟。 10k 在几秒钟内得到排序。

问题是我的代码应该处理多达 500 万对的数据集。所以我想看看我还能做些什么。我将 post 我的两个最佳尝试,最初的一个是向量,我认为我可以通过将 vector 替换为 unsorted_map 来升级,因为搜索时的时间复杂度更高,但令我惊讶的是,当我测试它时,这两个容器之间几乎没有区别。我不确定是我解决问题的方法还是我选择的容器导致了陡峭的分拣时间...

尝试向量:

#include <iostream>
#include <fstream>
#include <string>
#include <sstream>
#include <map>
#include <unordered_map>
#include <stdio.h>
#include <vector>
#include <iterator>
#include <utility>
#include <functional>
#include <algorithm>

using namespace std;


template<typename T>
void search_bricks_backwards(string resume, vector<T>& vec, vector<string>& vec2) {
    int index = 0;
    int temp_index = 0;
    while (true) {
        if (index == vec.size()) {
            vec2.insert(vec2.begin(), vec[temp_index].first); 
            cout << "end of backward search, exitting..." << endl;
            break;


        }
        
        if (vec[index].second == resume) {
            vec2.insert(vec2.begin(), resume);
            

            resume = vec[index].first;
            //vec.erase(vec.begin() + index);
            temp_index = index;

            index = 0;
        }
        
        index++;
    }

}


template<typename T>
void search_bricks(string start, vector<T>& vec, vector<string>& vec2) {
    int index = 0;
    int temp_index = 0;
    while (true) {
        //cout << "iteration " << index << endl;
        if (index == vec.size()) {
            vec2.push_back(vec[temp_index].second);
            
            cout << "all forward bricks sorted" << endl;
            break;


        }
        if (vec[index].first == start) {
            vec2.push_back(vec[index].first);
            
            
            start = vec[index].second;
            //vec.erase(vec.begin() + index);
            temp_index = index;
            index = 0;
            
        }
        
        index++;
    }

    search_bricks_backwards(vec[0].first, vec, vec2);

}

template<typename T>
void search_bricks_recursion(string start, vector<T>& vec, vector<string>& vec2) {
    int index = 0;
    for (const auto& pair : vec) {
        //cout << "iteration " << index << endl;
        if (pair.first == start) {
            vec2.push_back(start);
            cout << "found " << start << " and " << pair.first << endl;
            search_bricks(pair.second, vec, vec2);
        }
        if (index + 1 == vec.size()) {
            search_bricks_backwards(start, vec, vec2);
            

        }
        index++;
    }
    
}

template<typename T>
void printVectorElements(vector<T>& vec)
{
    for (auto i = 0; i < vec.size(); ++i) {
        cout << "(" << vec.at(i).first << ","
            << vec.at(i).second << ")" << " ";
    }
    cout << endl;
}

vector<string> split(string s, string delimiter) {
    size_t pos_start = 0, pos_end, delim_len = delimiter.length();
    string token;
    vector<string> res;

    while ((pos_end = s.find(delimiter, pos_start)) != string::npos) {
        token = s.substr(pos_start, pos_end - pos_start);
        pos_start = pos_end + delim_len;
        res.push_back(token);
    }

    res.push_back(s.substr(pos_start));
    return res;
}

unordered_map<string, string> brick_to_map(string const& s)
{
    unordered_map<string, string> m;

    string key, val;
    istringstream iss(s);

    while (getline(getline(iss, key, ',') >> ws, val))
        m[key] = val;

    return m;
}

int main()
{
    vector<pair<string, string>> bricks;
    
    vector<string> sorted_bricks;

    ifstream inFile;
    inFile.open("input-pairs-50K.txt"); //open the input file

    stringstream strStream;
    strStream << inFile.rdbuf(); //read the file
    string str = strStream.str(); //str holds the content of the file

    //cout << str << endl;
    
    istringstream iss(str);
    
    for (string line; getline(iss, line); )
    {
     
        string delimiter = ",";
        string s = line;
        vector<string> v = split(s, delimiter);
        string s1 = v.at(0);
        string s2 = v.at(1);
        

        bricks.push_back(make_pair(s1, s2));
    }

   
    search_bricks(bricks[0].second, bricks, sorted_bricks);
    
    
    //display the results
    for (auto i = sorted_bricks.begin(); i != sorted_bricks.end(); ++i)
        cout << *i << " ";


    
 
}

尝试 unsorted map:

#include <iostream>
#include <fstream>
#include <string>
#include <sstream>
#include <map>
#include <unordered_map>
#include <stdio.h>
#include <vector>
#include <iterator>
#include <utility>
#include <functional>
#include <algorithm>

using namespace std;


void search_bricks_backwards(string resume, unordered_map<string, string> brick_map, vector<string>& vec2) {
    
    typedef unordered_map<string, string>::value_type map_value_type;
    while (true) {

        unordered_map<string, string>::const_iterator got = find_if(brick_map.begin(), brick_map.end(), [&resume](const map_value_type& vt)
            { return vt.second == resume; }
        );
        if (got == brick_map.end()) {
            vec2.insert(vec2.begin(), resume); 
            cout << "end of backward search, exitting..." << endl;
            break;


        }
        //cout << "iteration " << index << endl;
        else if (got->second == resume) {
            vec2.insert(vec2.begin(), resume);

            
            resume = got->first;
        
        }

       
    }

}


void search_bricks(string start, unordered_map<string, string> brick_map, vector<string>& vec2) {
    
    typedef unordered_map<string, string>::value_type map_value_type;
    while (true) {
        

        unordered_map<string, string>::const_iterator got = find_if(brick_map.begin(), brick_map.end(), [&start](const map_value_type& vt)
            { return vt.first == start; }
        );
        if (got == brick_map.end()) {
            vec2.push_back(start);

            cout << "all forward bricks sorted" << endl;
            
            break;
        }
        else if (got->first == start) {
            vec2.push_back(start);

            //cout << "found " << start << " and " << vec[index].first << endl;
            start = got->second;
            
        }
    }
    auto it = brick_map.begin();
    search_bricks_backwards(it->first, brick_map, vec2);
    


    

    

}


template<typename T>
void printVectorElements(vector<T>& vec)
{
    for (auto i = 0; i < vec.size(); ++i) {
        cout << "(" << vec.at(i).first << ","
            << vec.at(i).second << ")" << " ";
    }
    cout << endl;
}

vector<string> split(string s, string delimiter) {
    size_t pos_start = 0, pos_end, delim_len = delimiter.length();
    string token;
    vector<string> res;

    while ((pos_end = s.find(delimiter, pos_start)) != string::npos) {
        token = s.substr(pos_start, pos_end - pos_start);
        pos_start = pos_end + delim_len;
        res.push_back(token);
    }

    res.push_back(s.substr(pos_start));
    return res;
}


int main()
{
    unordered_map<string, string> bricks;
    
    vector<string> sorted_bricks;

    ifstream inFile;
    inFile.open("input-pairs-50K.txt"); //open the input file

    for (string line; getline(inFile, line); )
    {

        string delimiter = ",";
        string s = line;
        vector<string> v = split(s, delimiter);
        string s1 = v.at(0);
        string s2 = v.at(1);


        bricks.insert(make_pair(s1, s2));
    }


    /*for (auto& x : bricks)
        std::cout << x.first << "," << x.second << " ";*/


    auto it = bricks.begin();
    search_bricks(it->second, bricks, sorted_bricks);


    // display results
    for (auto i = sorted_bricks.begin(); i != sorted_bricks.end(); ++i)
        cout << *i << " ";




}

我正在寻求提高我的代码的时间复杂度,以便能够更有效地处理数据,如果有人可以建议我的代码或容器明智的改进之处,我将非常感激。

您可以使用 trie 数据结构,这里有一篇论文解释了这样做的算法:https://people.eng.unimelb.edu.au/jzobel/fulltext/acsc03sz.pdf

但是你必须从头开始实现 trie,因为据我所知,在 c++ 中没有默认的 trie 实现。

首先,purpose-driven 类比这里真正发生的事情。这个问题有时会出现在面试问题中。它通常被表述为一组公交车票:

You're walking down the street with a thick stack of bus tickets for your whirlwind tour of the surrounding countries. Each ticket has a starting city, and an ending city. You accidentally drop the tickets on the ground and they blow all over the place. You pick them up, but then realize they're out of order. Your task is to put them back in order so you don't have to search the stack each time you need to use the next ticket.

一个例子,其中Sn是公交车站ID,Sn->Sm表示从车站Sn到车站Sm的行程。给出以下四张票,涵盖五个车站,排名不分先后:

S4->S2
S1->S5
S3->S4
S5->S3

正确的顺序可以这样想:

S1->S5
    S5->S3
        S3->S4
            S4->S2

因此,正确的“排序”顺序是

S1
S5
S3
S4
S2

算法分析

  • 你的算法中最大的杀手是那些序列扫描。地图提供 键查找 操作。一个 map find 成员是让他们打勾(和闪耀)的原因。如果你想发出这种尖叫声,你需要使用键控来做到这一点。
  • 接近 运行 惩罚顺序扫描,这是 pre-pending 到 std::vector 的高昂费用,后者很快就会变得非常昂贵。向量顺序连续存储。附加到它们并没有 相当那么糟糕,因为它们的 sub-allocators 倾向于 over-allocate 扩展以在必须重新分配之前为更多 push-backs 留出空间。但是在他们面前是可怕的。

我扫描了你提供的测试源码。数据里面居然有49999行,不是50000行,经过一番试错,挠头等等,我发现了这个:

  • bWyUVV 只出现一次,作为左侧。
  • EZkYGM 在文件中只出现一次,作为右侧

这些必须是排序列表的终端。如果一切顺利并且数据是原始的,最终列表将以 bWyUVV 开头,并以 EZkYGM 结尾。这对编写算法没有帮助,但绝对有助于验证我们做对了什么。


全面提升性能

在保持基本前提的同时从中剥离 很多 代码,考虑以下内容:

#include <iostream>
#include <fstream>
#include <string>
#include <sstream>
#include <unordered_map>
#include <deque>
#include <algorithm>
#include <chrono>

void search_bricks_backwards
(
    std::string resume, 
    std::unordered_map<std::string, std::string>& rbricks, 
    std::deque<std::string>& dst
) 
{
    while (true) 
    {
        auto it = rbricks.find(resume);
        dst.emplace_front(std::move(resume));

        if (it == rbricks.end()) 
            break;

        resume = it->second;
        rbricks.erase(it);
    }
}

void search_bricks
(
    std::unordered_map<std::string, std::string>& bricks, 
    std::unordered_map<std::string, std::string>& rbricks, 
    std::deque<std::string>& dst
) 
{
    // remember these
    std::string start = bricks.begin()->second;
    std::string resume = bricks.begin()->first;

    while (true) 
    {
        auto it = bricks.find(start);
        dst.emplace_back(std::move(start));

        if (it == bricks.end()) 
            break;

        start = it->second;
        bricks.erase(it);
    }

    // same search, but different keyed map
    search_bricks_backwards(resume, rbricks, dst);
}


int main()
{
    using namespace std::chrono;

    std::unordered_map<std::string, std::string> bricks, rbricks;
    std::deque<std::string> sorted_bricks;

    std::ifstream inFile("input-pairs-50K.txt");
    if (inFile.is_open())
    {
        for (std::string line; std::getline(inFile, line);)
        {
            // make damn sure we get two keys
            std::istringstream iss(line);
            std::string s1, s2;
            if (std::getline(iss, s1, ',') && 
                std::getline(iss, s2) &&
                !s1.empty() &&
                !s2.empty())
            {
                bricks.emplace(std::make_pair(s1, s2));
                rbricks.emplace(std::make_pair(s2, s1));
            }
        }

        auto tp0 = high_resolution_clock::now();
        search_bricks(bricks, rbricks, sorted_bricks);
        auto tp1 = high_resolution_clock::now();

        std::cout << duration_cast<milliseconds>(tp1-tp0).count() << "ms\n";

        // display results
        int n = 0;
        for (auto i = sorted_bricks.begin(); i != sorted_bricks.end(); ++i)
            std::cout << ++n << ". " << *i << '\n';
    }
}

最显着的差异:

  • 使用std::deque<std::string>作为排序结果。您为这些矢量展示位置付出了高昂的代价。您所有的向量操作要么是向后推(对于保留向量是可以的),要么是在前面推(向量中的性能很糟糕)。 std::deque 专门用于非常快速的前后插入和 p运行ing。我们不做后者,但大量做前者。

  • 两张地图,分别为s1==>s2和s2==>s1。有第三方容器可以为您执行此操作(例如:boost::bimap)。然而,为此,考虑到密钥大小,只存储两个映射很容易。

  • 在适当的地方使用移动语义(虽然不多)

  • 在搜索过程中,每个发现的键都会从映射中删除 just-searched。这确保我们在应该停止的时候停止,并相当快地恢复内存占用。

  • 参考参数(如适用)。这些容器在构建结果集时被有效地销毁,但这没关系(事实上保证在一端或另一端正确检测终端)。

double-keying 让世界变得不同。使用你的测试数据集在一个微不足道的小 four-year-old macbook pro 笔记本电脑上的发布构建产生以下结果(你可以用你的 know-answer 测试来验证)。


所有代码均使用 -O2 优化构建运行ning Apple clang 版本 13.0.0 (clang-1300.0.29.30)

std::unordered_map

34ms
1. bWyUVV
2. mPBGRC
3. WfkvWy
4. vjWNHY
5. HudtyD
6. DhxjdV
7. kdWhGX
8. tIsDXh
9. eMVeMX
10. fVQoeG

... 49980 lines omitted ...

49990. YfBDnP
49991. sHKVrT
49992. ZzhoZV
49993. Dyunmj
49994. KCQbpj
49995. rbMSgD
49996. WKOksU
49997. qqMTnq
49998. llrqUI
49999. XYpBnk
50000. EZkYGM

尽管在我的测试中存在高达 42 毫秒和低至 28 毫秒的异常值,但该时间数字相当一致。使用常规 std::map 的相同线束导致:

std::map

92ms
1. bWyUVV
2. mPBGRC
3. WfkvWy
4. vjWNHY
5. HudtyD
6. DhxjdV
7. kdWhGX
8. tIsDXh
9. eMVeMX
10. fVQoeG

... 49980 lines omitted ...

49990. YfBDnP
49991. sHKVrT
49992. ZzhoZV
49993. Dyunmj
49994. KCQbpj
49995. rbMSgD
49996. WKOksU
49997. qqMTnq
49998. llrqUI
49999. XYpBnk
50000. EZkYGM

所以您肯定使用了带有无序键控的正确容器,只是您实际上并没有将键控用于算法的 non-trivial 部分,这使得容器基本上不比顺序扫描好.这与您的评估相吻合,它实际上并不比矢量顺序扫描解决方案好多少;不管怎样,你基本上都是这么做的。

我怀疑上面显示的性能应该能够 运行 套你预期的 500 万对,只要你能把它全部保存在内存中(两次,抱歉,但结果显示不错坦率地说,它值那个价钱)。


内存更友好(至少有一点)

下面做同样的事情,但没有添加所有输出,并使用一个调用函数和映射转置 mid-flight。这从一开始就减少了对两张地图的需求。整体性能与上面的代码相当;只是一个不同的、对内存更友好的实现。

#include <iostream>
#include <fstream>
#include <algorithm>
#include <string>
#include <sstream>
#include <deque>
#include <unordered_map>
#include <chrono>

using map_type = std::unordered_map<std::string, std::string>;

std::deque<std::string> sort_bricks( map_type& bricks )
{
    std::deque<std::string> dst;

    // remember these
    std::string start = bricks.begin()->second;
    std::string resume = bricks.begin()->first;

    // process by append
    while (true) 
    {
        auto it = bricks.find(start);
        dst.emplace_back(std::move(start));

        if (it == bricks.end()) 
            break;

        start = std::move(it->second);
        bricks.erase(it);
    }

    // invert the remaining map
    {
        map_type mres;
        for (auto& pr : bricks)
            mres.emplace(std::move(pr.second), pr.first);
        std::swap(bricks, mres);
    }
    
    // process by prepend
    while (true) 
    {
        auto it = bricks.find(resume);
        dst.emplace_front(std::move(resume));

        if (it == bricks.end()) 
            break;

        resume = std::move(it->second);
        bricks.erase(it);
    }

    return dst;
}

int main()
{
    using namespace std::chrono;

    std::ifstream inFile("input-pairs-50K.txt");
    if (inFile.is_open())
    {
        map_type bricks;
        for (std::string line; std::getline(inFile, line);)
        {
            std::istringstream iss(line);
            std::string s1, s2;
            if (std::getline(iss, s1, ',') && 
                std::getline(iss, s2) &&
                !s1.empty() &&
                !s2.empty())
            {
                bricks.emplace(std::make_pair(s1, s2));
            }
        }

        auto tp0 = high_resolution_clock::now();
        auto sorted_bricks = sort_bricks(bricks);
        auto tp1 = high_resolution_clock::now();

        std::cout << "Size: " << sorted_bricks.size() << '\n';
        std::cout << "Time: " << duration_cast<milliseconds>(tp1-tp0).count() << "ms\n";
    }
}

迟到的答案,但稍作改进,我们可以将处理时间加快 100%。或者将 运行时间减半。

基本上 WhozCraig 选择的方法已经接近最优。

通过使用 std::string_view 并选择不同的字符串拆分方法,我可以将性能提高近 100%。大多数时间都花在读取源文件和拆分数据上。通过我选择的不同方法,我可以节省最多的时间。我添加了一些额外的小改进。

我用一个包含 5.000.000 对的大文件检查了这两种算法。我创建了一个模块,来生成这样一个测试文件。请查看下面的源代码。

我将展示我的源代码,给出的代码是“WhozCraig”,针对基准测量稍作修改。

但首先,请查看基准测试结果的屏幕截图。

用问题中给出的原始小文件进行测试。我的解决方案:

“WhozCraig”的小测试文件解决方案


然后我创建了一个5M对测试文件和运行同样的测试。我的做法

“WhozCraig”大测试文件的解决方案

请注意。使用“WhozCraig”的方法对大数据进行排序甚至更快。

但是读取比较慢

我没有调查原因。


当然,所有这些都取决于机器和编译器。我 运行 在一台有 10 年历史的 Windows 7 机器上进行测试。编译器是 Visual Studio 2019 社区版。在发布模式下编译。


请参阅下面的代码示例:

#include <iostream>
#include <fstream>
#include <string>
#include <chrono>
#include <unordered_map>
#include <random>
#include <string_view>
#include <deque>
#include <iterator>
#include <unordered_set>
#include <algorithm>
#include <array>
#include <utility>

// ----------------------------------------------------------------------------------------------------------------
// Generate test data
// We will generate that many strings
constexpr size_t MaxPairs = 5'000'000u;

// The strings will all be 6 characters long and consist of upper- and lowercase letters
constexpr size_t StringLength = 6u;
constexpr std::array<char, 52> Letter{ 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
                                       'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z' };

// Filename for test data
const std::string testFileName{"test.txt"};

// Create a file with test date following the given pattern
void createTestData() {

    // We need random numbers to select a letter from above string
    std::random_device randomDevice;
    std::mt19937 randomGenerator(randomDevice());
    std::uniform_int_distribution<> randomIntDistribution(0, Letter.size() - 1);

    // Function to create a random string
    auto generateString = [&]() -> std::string
    {
        std::string s(6, 0);
        for (size_t i{}; i < StringLength; ++i) 
            s[i] = Letter[randomIntDistribution(randomGenerator)]; 
        return s; };

    // We want to have unique, not sorted strings. So, use a std::unordered_set
    std::unordered_set<std::string> uniqueStrings{};

    // Now generate the requested numbers of unique strings
    while (uniqueStrings.size() < MaxPairs + 1) 
        uniqueStrings.insert(generateString());
    
    // Move them to a std::vector for random access possibility
    std::vector<std::string> sss(std::make_move_iterator(uniqueStrings.begin()), std::make_move_iterator(uniqueStrings.end()));

    // Now we create a vectore of pair of indices, like 0->1, 1->2, 2->3 and so on
    std::vector<std::pair<int, int>> index(MaxPairs);
    for (int i = 0; i < MaxPairs; ++i) {
        index[i].first = i;
        index[i].second = i + 1;
    }
    // This we will shuffle
    std::shuffle(index.begin(), index.end(), randomGenerator);

    // And then store the data in a text file
    if (std::ofstream ofs{ testFileName }; ofs) {
        for (size_t i = 0; i < MaxPairs; ++i)
            ofs << sss[index[i].first] << ',' << sss[index[i].second] << '\n';
    }
}

// We make the bucket size 3 times bigger then the number of elements.
// Maybe, but actually I do not know, this will reduce collisions
constexpr int MaxBucket = 16'777'216u;

// A very small speed boost by a bigger input buffer
constexpr size_t ifStreamBufferSize = 1'000'000u;
static char buffer[ifStreamBufferSize];

// And a more simple and faster hash funcion
struct MyHash {
    size_t operator()(const std::string_view& s) const {
        size_t hash = 0;
        for (const char c : s) hash = hash * 101 + c;
        return hash;
    }
};

int main() {
    // Please uncomment, if you want to create test Data
    //createTestData();

    // Start runtime measurement
    auto tp1 = std::chrono::high_resolution_clock::now();

    // Open source file and check if it could be opened
    if (std::ifstream ifs{ testFileName }; ifs) {

        // Set bigger file read buffer. Not sure, if it has really an effect.
        ifs.rdbuf()->pubsetbuf(buffer, ifStreamBufferSize);

        // Read complete source file into a string
        std::string text(std::istreambuf_iterator<char>(ifs), {});

        // We ant to identify the pair. A pair of a relation consists of a left and right part. Start of course with left
        bool readLeftSideOfRelation = true;

        // Start value for data analysis
        char* currentCharPtr = text.data();
        // End of string
        const char* const textEndPtr = currentCharPtr + text.length();

        // Define string views for the left and the right side of the relation 
        std::string_view firstView(currentCharPtr, StringLength);
        std::string_view secondView;

        // This is the simulation of a bimap
        std::unordered_map<std::string_view, std::string_view, MyHash> map1{};
        std::unordered_map<std::string_view, std::string_view, MyHash> map2{};
        // A liitle bit speed gain by reservin buffer space
        map1.reserve(MaxBucket);
        map2.reserve(MaxBucket);

        // Split the strings, create string_views and store in maps
        size_t count = 0;
        for (; currentCharPtr < textEndPtr; ++currentCharPtr, ++count) {

            // Split citerium
            if (*currentCharPtr < 'A') {

                // Set pointer to one after the delimiting character
                ++currentCharPtr;

                // Either we are just reading the left or the right side
                if (readLeftSideOfRelation)
                    secondView = { currentCharPtr , count };
                else
                {
                    // Now we read the right side of the relation. And with that a pair
                    // Store relation in each direction
                    map1[firstView] = secondView;
                    map2[secondView] = firstView;

                    // Start start of next left side
                    firstView = { currentCharPtr , count };
                }
                // Next, we want to read the other part of the relation
                readLeftSideOfRelation = not readLeftSideOfRelation;
                count = 0;
            }
        }
        // Here we will store the sorted result
        std::deque<std::string_view> result{};

        auto tp3 = std::chrono::high_resolution_clock::now();

        // Set parts of the relation
        std::string_view related = map1.begin()->second;
        std::string_view relatedSave = map1.begin()->first;

        // Go through the sequence
        for (;;) {
            auto it = map1.find(related);
            result.push_back(related);
            if (it == map1.end())
                break;
            related = it->second;
            map1.erase(it);
        }
        // We reached end, search in other direction
        related = relatedSave;
        for (;;) {
            auto it = map2.find(related);
            result.push_front(related);

            if (it == map2.end())
                break;
            related = it->second;
            map2.erase(it);
        }

        // Show timing result
        auto tp4 = std::chrono::high_resolution_clock::now();
        std::cout << "Sort: " << duration_cast<std::chrono::milliseconds>(tp4 - tp3).count() << "ms\n";

        auto tp2 = std::chrono::high_resolution_clock::now();
        std::cout << "Overall: " << duration_cast<std::chrono::milliseconds>(tp2 - tp1).count() << "ms\n";
    
        std::cout << "Count:  " << result.size() << '\n';
    }
}


“WhozCraig”的解决方案,用于额外的运行时间测量并用于测试。

#include <iostream>
#include <fstream>
#include <string>
#include <sstream>
#include <unordered_map>
#include <deque>
#include <algorithm>
#include <chrono>

void search_bricks_backwards
(
    std::string resume,
    std::unordered_map<std::string, std::string>& rbricks,
    std::deque<std::string>& dst
)
{
    while (true)
    {
        auto it = rbricks.find(resume);
        dst.emplace_front(std::move(resume));

        if (it == rbricks.end())
            break;

        resume = it->second;
        rbricks.erase(it);
    }
}

void search_bricks
(
    std::unordered_map<std::string, std::string>& bricks,
    std::unordered_map<std::string, std::string>& rbricks,
    std::deque<std::string>& dst
)
{
    // remember these
    std::string start = bricks.begin()->second;
    std::string resume = bricks.begin()->first;

    while (true)
    {
        auto it = bricks.find(start);
        dst.emplace_back(std::move(start));

        if (it == bricks.end())
            break;

        start = it->second;
        bricks.erase(it);
    }
    // same search, but different keyed map
    search_bricks_backwards(resume, rbricks, dst);
}


int main()
{
    using namespace std::chrono;
    //createTestData();
    auto tp0 = high_resolution_clock::now();

    std::unordered_map<std::string, std::string> bricks, rbricks;
    std::deque<std::string> sorted_bricks;

    std::ifstream inFile("test.txt");
    if (inFile.is_open())
    {
        for (std::string line; std::getline(inFile, line);)
        {
            // make damn sure we get two keys
            std::istringstream iss(line);
            std::string s1, s2;
            if (std::getline(iss, s1, ',') &&
                std::getline(iss, s2) &&
                !s1.empty() &&
                !s2.empty())
            {
                bricks.emplace(std::make_pair(s1, s2));
                rbricks.emplace(std::make_pair(s2, s1));
            }
        }
        auto tp3 = high_resolution_clock::now();

        search_bricks(bricks, rbricks, sorted_bricks);

        auto tp4 = high_resolution_clock::now();
        auto tp1 = high_resolution_clock::now();
        std::cout << "\nSort:  "<< duration_cast<milliseconds>(tp4 - tp3).count() << "ms\n";
        std::cout << "Overall:  " << duration_cast<milliseconds>(tp1 - tp0).count() << "ms\n";

        std::cout << "Count:  " << sorted_bricks.size() << '\n';
    }
}


在用户“subspring”的附加回答中,提出了一个特里树。

我觉得不可行。查看潜在的内存消耗。

我们有大写和小写字符。所以,总共 52 个字符。对于每一个,我们都需要一个 4 字节指针 + 1 个用于相关字符串的指针。那是 53*4 用于 trie 的一层。字符串长度为 6。我们需要尝试 2 次。因此,经典树的总内存消耗为:

((52+1)*4) ^ 6 * 2 = 181'570'446'368'768 ~182 TB。

与这个数字相比,我电脑中的 128GB RAM 小得离谱。

出于这个原因,我使用 std:unordered_maps 作为 link 元素实现了具有 space 优化设计的特里树。

有了它,我可以将 RAM 内存消耗减少到 25GB。

但这里的结果也令人失望。

即使是善意的,也至少慢了 3 倍。 . .

查看示例代码:

#include <iostream>
#include <fstream>
#include <string>
#include <chrono>
#include <unordered_map>
#include <vector>
#include <random>
#include <unordered_set>
#include <array>
#include <deque>
#include <list>
// ----------------------------------------------------------------------------------------------------------------
// Generate test data
// We will generate that many strings
constexpr size_t MaxPairs = 1'000'000u;
//constexpr size_t MaxPairs = 500000u;

// The strings will all be 6 characters long and consist of upper- and lowercase letters
constexpr size_t StringLength = 6u;
constexpr std::array<char, 52> Letter{ 'a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n', 'o', 'p', 'q', 'r', 's', 't', 'u', 'v', 'w', 'x', 'y', 'z',
                                       'A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P', 'Q', 'R', 'S', 'T', 'U', 'V', 'W', 'X', 'Y', 'Z' };

// Filename for test data
const std::string testFileName{ "r:\text1m.txt" };

// Create a file with test date following the given pattern
void createTestData() {

    // We need random numbers to select a letter from above string
    std::random_device randomDevice;
    std::mt19937 randomGenerator(randomDevice());
    std::uniform_int_distribution<> randomIntDistribution(0, (int)Letter.size() - 1);

    // Function to create a random string
    auto generateString = [&]() -> std::string
    {
        std::string s(6, 0);
        for (size_t i{}; i < StringLength; ++i)
            s[i] = Letter[randomIntDistribution(randomGenerator)];
        return s; };

    // We want to have unique, not sorted strings. So, use a std::unordered_set
    std::unordered_set<std::string> uniqueStrings{};

    // Now generate the requested numbers of unique strings
    while (uniqueStrings.size() < MaxPairs + 1)
        uniqueStrings.insert(generateString());

    // Move them to a std::vector for random access possibility
    std::vector<std::string> sss(std::make_move_iterator(uniqueStrings.begin()), std::make_move_iterator(uniqueStrings.end()));

    // Now we create a vectore of pair of indices, like 0->1, 1->2, 2->3 and so on
    std::vector<std::pair<int, int>> index(MaxPairs);
    for (int i = 0; i < MaxPairs; ++i) {
        index[i].first = i;
        index[i].second = i + 1;
    }
    // This we will shuffle
    std::shuffle(index.begin(), index.end(), randomGenerator);

    // And then store the data in a text file
    if (std::ofstream ofs{ testFileName }; ofs) {
        for (size_t i = 0; i < MaxPairs; ++i)
            ofs << sss[index[i].first] << ',' << sss[index[i].second] << '\n';
    }
}


struct TrieNode;
using CharacterWithLink = std::unordered_map<char, TrieNode*>;
using LinkIter = CharacterWithLink::iterator;

struct TrieNode {
    const char* related{};
    CharacterWithLink link{};
};

struct Trie {
    TrieNode root{};
    const char* beginOfWord{};
    LinkIter iter{};
    TrieNode* tnp{};
    std::vector< TrieNode> memory{};
    size_t index{};

    static constexpr size_t MaxMemory = static_cast<size_t>((float)MaxPairs * 3.6);
    //static constexpr size_t MaxMemory = static_cast<size_t>((float)MaxPairs * 10);
    Trie() { memory.resize(MaxMemory); }
    ~Trie() {}

    TrieNode* addWord(const char *characterOfWord) {
        tnp = &root;
        beginOfWord = characterOfWord;

        while (*characterOfWord >= 'A') {
            iter = tnp->link.find(*characterOfWord);

            if (iter == tnp->link.end())
                tnp->link[*characterOfWord] = &memory[index++];
            
            tnp = tnp->link[*characterOfWord];
            ++characterOfWord;
        }
        //tnp->related = beginOfWord;
        return tnp;
    }
    const char* find(const char* characterOfWord) {
        tnp = &root;
        const char* result{};
        while (*characterOfWord >= 'A') {
            iter = tnp->link.find(*characterOfWord);

            if (iter == tnp->link.end()) break;

            tnp = tnp->link[*characterOfWord];
            ++characterOfWord;
        }
        result = tnp->related;
        return result;
    }
};

struct BiRelationalTrie {
    Trie left{};
    Trie right{};

    void fill(std::string& text);
};

void BiRelationalTrie::fill(std::string & text) {

    const char* currentCharPtr{ text.data() };
    const char* textEndPtr{ text.data() + text.length() };

    // We want to identify the pair. A pair of a relation consists of a left and right part. Start of course with left
    bool readLeftSideOfRelation = true;

    const char* leftWordPtr{ text.data() };
    const char* rightWordPtr{};

    for(; currentCharPtr < textEndPtr; ++currentCharPtr) {

        // Split citerium
        if (*currentCharPtr < 'A') {

            // Set pointer to one after the delimiting character
            ++currentCharPtr;

            // Either we are just reading the left or the right side
            if (readLeftSideOfRelation)

                // We were reading the left site, now, set the start pointer for the right side
                rightWordPtr = currentCharPtr;
            else
            {
                // Now we have read the right side of the relation. And with that a complete pair
                // Store relation in each direction
                left.addWord(leftWordPtr)->related = rightWordPtr;
                right.addWord(rightWordPtr)->related = leftWordPtr;

                // Set start of next left side
                leftWordPtr = currentCharPtr;
            }
            // Next, we want to read the other part of the relation
            readLeftSideOfRelation = not readLeftSideOfRelation;
        }
    }
    std::cout << left.index << '\n';
}
// A very small speed boost by a bigger input buffer
constexpr size_t ifStreamBufferSize = 1'000'000u;
static char buffer[ifStreamBufferSize];



int main() {
    //createTestData();
    // Start runtime measurement 
    
    
    BiRelationalTrie biRelationalTrie{};

    // Here we will store the sorted result
    std::deque<const char *> result{};

    auto tp1 = std::chrono::high_resolution_clock::now();

    // Open source file and check if it could be opened
    if (std::ifstream ifs{ testFileName }; ifs) {

        // Set bigger file read buffer. Not sure, if it has really an effect.
        ifs.rdbuf()->pubsetbuf(buffer, ifStreamBufferSize);

        // Read complete source file into a string
        std::string text(std::istreambuf_iterator<char>(ifs), {});

        biRelationalTrie.fill(text);



        auto tp3 = std::chrono::high_resolution_clock::now();

        // Set parts of the relation
        const char *related = text.data()+7;
        const char* relatedSave = text.data();

        // Go through the sequence
        for (;;) {
            related = biRelationalTrie.left.find(related);
            if (related == nullptr)
                break;
            result.push_back(related);
        }
        // We reached end, search in other direction
        related = relatedSave;
        for (;;) {
            related = biRelationalTrie.right.find(related);
            if (related == nullptr)
                break;
            result.push_front(related);
        }

        // Show timing result
        auto tp4 = std::chrono::high_resolution_clock::now();
        std::cout << "Sort: " << std::chrono::duration_cast<std::chrono::milliseconds>(tp4 - tp3).count() << "ms\n";
        std::cout << result.size() << '\n';

        auto tp2 = std::chrono::high_resolution_clock::now();
        std::cout << "Overall: " << std::chrono::duration_cast<std::chrono::milliseconds>(tp2 - tp1).count() << "ms\n";
    }
}