在 C++ 中处理非常大的数据

Handle very large data in C++

我有一个 .csv 文件,其中包含 ~3GB 的数据。我想读取所有数据并进行处理。以下程序从文件中读取数据并将其存储到 std::vector<std::vector<std::string>> 中。但是,程序运行时间过长导致应用程序 (vscode) 冻结,需要重新启动。我做错了什么?

#include <algorithm>
#include <iostream>
#include <fstream>
#include "sheet.hpp"

extern std::vector<std::string> split(const std::string& str, const std::string& delim);

int main() {
    Sheet sheet;
    std::ifstream inputFile;
    inputFile.open("C:/Users/1032359/cpp-projects/Straggler Job Analyzer/src/part-00001-of-00500.csv");
    std::string line;
    while(inputFile >> line) {
        sheet.addRow(split(line, ","));
    }
    return 0;
}

// splitSheet 的成员函数已经过全面测试并且工作正常。 split 虽然复杂度为 N^2...


EDIT1:已根据评论中的建议修复读取的文件。

分割函数:

std::vector<std::string> split(const std::string& str, const std::string& delim) {
    std::vector<std::string> vec_of_tokens;
    std::string token;
    for (auto character : str) {
        if (std::find(delim.begin(), delim.end(), character) != delim.end()) {
            vec_of_tokens.push_back(token);
            token = "";
            continue;
        }
        token += character;
    }
    vec_of_tokens.push_back(token);
    return vec_of_tokens;
}

编辑2: 虚拟 csv 行: 5612000000,5700000000,4665712499,798,3349189123,0.02698,0.06714,0.07715,0.004219,0.004868,0.06726,7.915e-05,0.0003681,0.27,0.00293,3.285,0.008261,0,0,0.01608

limits:  
field1: starting timestamp (nanosecs)
field2: ending timestamp (nanosecs)
field3: job id (<= 1,000,000)
field4: task id (<= 10,000)
field5: machine id (<= 30,000,000)
field6: CPU time (sorry, no clue)
field7-20: no idea, unused for the current stage, but needed for later stages.

EDIT3:需要输出

还记得Excel中的.thenby函数吗?
这里的排序顺序是首先在第 5 列(基于 1 的索引)排序,然后在第 3 列,最后在第 4 列;全部升序。

我会首先定义一个 class 来携带有关一条记录的信息,并为 operator>>operator<< 添加重载以帮助 reading/writing 记录 [=55= 】 溪流。我可能还会添加一个帮助程序来处理逗号分隔符。

首先,我用过的headers组:

#include <algorithm>   // sort
#include <array>       // array
#include <cstdint>     // integer types
#include <filesystem>  // filesystem
#include <fstream>     // ifstream
#include <iostream>    // cout
#include <iterator>    // istream_iterator
#include <tuple>       // tie
#include <vector>      // vector

一个简单的定界符助手可能如下所示。如果定界符在流中,它会丢弃 (ignore()) 定界符;如果定界符不存在,它会在流中设置 failbit

template <char Char> struct delimiter {};

template <char Char> // read a delimiter
std::istream& operator>>(std::istream& is, const delimiter<Char>) {
    if (is.peek() == Char) is.ignore();
    else is.setstate(std::ios::failbit);
    return is;
}

template <char Char> // write a delimiter
std::ostream& operator<<(std::ostream& os, const delimiter<Char>) {
    return os.put(Char);
}

根据您提供的信息,实际的 record class 看起来像这样:

struct record {
    uint64_t start;       // ns
    uint64_t end;         // ns
    uint32_t job_id;      // [0,1000000]
    uint16_t task_id;     // [0,10000]
    uint32_t machine_id;  // [0,30000000]
    double cpu_time;
    std::array<double, 20 - 6> unknown;
};

然后可以使用 delimiter class 模板(实例化为使用逗号和换行符作为分隔符)像这样从流中读取这样的记录:

std::istream& operator>>(std::istream& is, record& r) {
    delimiter<','> del;
    delimiter<'\n'> nl;

    // first read the named fields
    if (is >> r.start >> del >> r.end >> del >> r.job_id >> del >>
        r.task_id >> del >> r.machine_id >> del >> r.cpu_time)
    {
        // then read the unnamed fields:
        for (auto& unk : r.unknown) is >> del >> unk;
    }
    return is >> nl;
}

写入记录的方式类似:

std::ostream& operator<<(std::ostream& os, const record& r) {
    delimiter<','> del;
    delimiter<'\n'> nl;

    os << 
        r.start << del <<
        r.end << del <<
        r.job_id << del <<
        r.task_id << del <<
        r.machine_id << del <<
        r.cpu_time;
    for(auto&& unk : r.unknown) os << del << unk;
    return os << nl;
}

将整个文件读入内存,排序并打印结果:

int main() {
    std::filesystem::path filename = "C:/Users/1032359/cpp-projects/"
                                     "Straggler Job Analyzer/src/part-00001-of-00500.csv";
    
    std::vector<record> records;

    // Reserve space for "3GB" / 158 (the length of a record + some extra bytes)
    // records. Increase the 160 below if your records are actually longer on average:
    records.reserve(std::filesystem::file_size(filename) / 160);

    // open the file
    std::ifstream inputFile(filename);

    // copy everything from the file into `records`
    std::copy(std::istream_iterator<record>(inputFile),
              std::istream_iterator<record>{},
              std::back_inserter(records));

    // sort on columns 5-3-4 (ascending)
    auto sorter = [](const record& lhs, const record& rhs) {
        return std::tie(lhs.machine_id, lhs.job_id, lhs.task_id) <
               std::tie(rhs.machine_id, rhs.job_id, rhs.task_id);
    };
    std::sort(records.begin(), records.end(), sorter);

    // display the result
    for(auto& r : records) std::cout << r;
}

在我带有旋转磁盘的旧计算机上,上述过程大约需要 2 分钟。如果这太慢了,我会测量长 运行 部分的时间:

  • reserve
  • copy
  • sort

然后,您或许可以使用该信息来尝试找出需要改进的地方。例如,如果排序有点慢,使用 std::vector<double> 而不是 std::array<double, 20-6> 来存储未命名的字段可能会有所帮助:

struct record {
    record() : unknown(20-6) {} 

    uint64_t start;       // ns
    uint64_t end;         // ns
    uint32_t job_id;      // [0,1000000]
    uint16_t task_id;     // [0,10000]
    uint32_t machine_id;  // [0,30000000]
    double cpu_time;
    std::vector<double> unknown;
};

作为解决这个问题的另一种方法,我建议不要读取内存中的所有数据,而是使用最少的 RAM 来对巨大的 CSV 文件进行排序:std::vector 行偏移量。

重要的是理解概念,而不是精确的实现。

由于实现每行只需要 8 个字节(在 64 位模式下),要对 3 GB 的数据文件进行排序,我们只需要大约 150 MB 的 RAM。缺点是同一行需要多次解析数字,大概log2(17e6)=24次。但是,我认为这种开销可以通过使用较少的内存来部分补偿,并且不需要解析行的所有数字。

#include <Windows.h>
#include <cstdint>
#include <vector>
#include <algorithm>
#include <array>
#include <fstream>

std::array<uint64_t, 5> readFirst5Numbers(const char* line)
{
    std::array<uint64_t, 5> nbr;
    for (int i = 0; i < 5; i++)
    {
        nbr[i] = atoll(line);
        line = strchr(line, ',') + 1;
    }
    return nbr;
}

int main()
{
    // 1. Map the input file in memory
    const char* inputPath = "C:/Users/1032359/cpp-projects/Straggler Job Analyzer/src/part-00001-of-00500.csv";
    HANDLE fileHandle = CreateFileA(inputPath, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, 0, NULL);
    DWORD highsize;
    DWORD lowsize = GetFileSize(fileHandle, &highsize);
    HANDLE mappingHandle = CreateFileMapping(fileHandle, NULL, PAGE_READONLY, highsize, lowsize, NULL);
    size_t fileSize = (size_t)lowsize | (size_t)highsize << 32;
    const char* memoryAddr = (const char*)MapViewOfFile(mappingHandle, FILE_MAP_READ, 0, 0, fileSize);

    // 2. Find the offset of the start of lines
    std::vector<size_t> linesOffset;
    linesOffset.push_back(0);
    for (size_t i = 0; i < fileSize; i++)
        if (memoryAddr[i] == '\n')
            linesOffset.push_back(i + 1);
    linesOffset.pop_back();

    // 3. sort the offset according to some logic
    std::sort(linesOffset.begin(), linesOffset.end(), [memoryAddr](const size_t& offset1, const size_t& offset2) {
        std::array<uint64_t, 5> nbr1 = readFirst5Numbers(memoryAddr + offset1);
        std::array<uint64_t, 5> nbr2 = readFirst5Numbers(memoryAddr + offset2);
        if (nbr1[4] != nbr2[4])
            return nbr1[4] < nbr2[4];
        if (nbr1[2] != nbr2[2])
            return nbr1[2] < nbr2[2];
        return nbr1[4] < nbr2[4];
        });

    // 4. output sorted array
    const char* outputPath = "C:/Users/1032359/cpp-projects/Straggler Job Analyzer/output/part-00001-of-00500.csv";
    std::ofstream outputFile;
    outputFile.open(outputPath);
    for (size_t offset : linesOffset)
    {
        const char* line = memoryAddr + offset;
        size_t len = strchr(line, '\n') + 1 - line;
        outputFile.write(line, len);
    }
}

我建议采用稍微不同的方法:

  1. 不解析整行,只提取用于排序的字段
  2. 请注意,您声明的范围需要少量的位数,这些位数合起来可以放在一个 64 位值中:

30,000,000 - 25 bit

10,000 - 14 bit

1,000,000 - 20 bit

  1. 在你的向量中保存一个“原始”源,以便你可以根据需要写出来。

这是我得到的:

#include <fstream>
#include <iostream>
#include <string>
#include <vector>
#include <chrono>
#include <algorithm>

struct Record {
    uint64_t key;
    std::string str;
    Record(uint64_t key, std::string&& str)
        : key(key)
        , str(std::move(str))
    {}
};

int main()
{
    auto t1 = std::chrono::high_resolution_clock::now();
    std::ifstream src("data.csv");
    std::vector<Record> v;
    std::string str;
    uint64_t key(0);
    while (src >> str)
    {
        size_t pos = str.find(',') + 1;
        pos = str.find(',', pos) + 1;
        char* p(nullptr);
        uint64_t f3 = strtoull(&str[pos], &p, 10);
        uint64_t f4 = strtoull(++p, &p, 10);
        uint64_t f5 = strtoull(++p, &p, 10);
        key  = f5 << 34;
        key |= f3 << 14;
        key |= f4;
        v.emplace_back(key, std::move(str));
    }
    std::sort(v.begin(), v.end(), [](const Record& a, const Record& b) {
        return a.key < b.key;
        });
    auto t2 = std::chrono::high_resolution_clock::now();
    std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count() << std::endl;

    std::ofstream out("out.csv");
    for (const auto& r : v) {
        out.write(r.str.c_str(), r.str.length());
        out.write("\n", 1);
    }
    auto t3 = std::chrono::high_resolution_clock::now();
    std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(t3 - t2).count() << std::endl;
}

当然,您可以 reserve space 预先在向量中避免重新分配。

我生成了一个包含 18,000,000 条记录的文件。我的时间显示读取/排序文件大约 30 秒,写入输出大约 200 秒。

更新:

out.write() 替换流式传输,将写入时间从 200 秒减少到 17 秒!