Handle very large data in C++
I have a .csv file containing ~3 GB of data. I want to read all of it and then process it. The following program reads the data from the file and stores it in a std::vector<std::vector<std::string>>, but it runs for so long that the application (vscode) freezes and has to be restarted. What am I doing wrong?
#include <algorithm>
#include <iostream>
#include <fstream>
#include "sheet.hpp"

extern std::vector<std::string> split(const std::string& str, const std::string& delim);

int main() {
    Sheet sheet;
    std::ifstream inputFile;
    inputFile.open("C:/Users/1032359/cpp-projects/Straggler Job Analyzer/src/part-00001-of-00500.csv");

    std::string line;
    while (inputFile >> line) {
        sheet.addRow(split(line, ","));
    }
    return 0;
}
The member functions of split and Sheet have been thoroughly tested and work correctly; split is O(N^2), though...
EDIT1: The file being read has been fixed as suggested in the comments.
The split function:
std::vector<std::string> split(const std::string& str, const std::string& delim) {
    std::vector<std::string> vec_of_tokens;
    std::string token;
    for (auto character : str) {
        if (std::find(delim.begin(), delim.end(), character) != delim.end()) {
            vec_of_tokens.push_back(token);
            token = "";
            continue;
        }
        token += character;
    }
    vec_of_tokens.push_back(token);
    return vec_of_tokens;
}
EDIT2: Dummy CSV row:
5612000000,5700000000,4665712499,798,3349189123,0.02698,0.06714,0.07715,0.004219,0.004868,0.06726,7.915e-05,0.0003681,0.27,0.00293,3.285,0.008261,0,0,0.01608
limits:
field1: starting timestamp (nanosecs)
field2: ending timestamp (nanosecs)
field3: job id (<= 1,000,000)
field4: task id (<= 10,000)
field5: machine id (<= 30,000,000)
field6: CPU time (sorry, no clue)
field7-20: no idea, unused for the current stage, but needed for later stages.
EDIT3: Required output
Remember the .thenby function in Excel?
The sort order required here is: first on column 5 (1-based indexing), then on column 3, and finally on column 4; all ascending.
I'd start by defining a class to carry the information about one record, and add overloads for operator>> and operator<< to help with reading/writing records from/to streams. I'd probably also add a helper to deal with the comma delimiters.

First, the set of headers I used:
#include <algorithm> // sort
#include <array> // array
#include <cstdint> // integer types
#include <filesystem> // filesystem
#include <fstream> // ifstream
#include <iostream> // cout
#include <iterator> // istream_iterator
#include <tuple> // tie
#include <vector> // vector
A simple delimiter helper could look like this. It discards (ignore()s) the delimiter if it is next in the stream, and sets the failbit on the stream if the delimiter is not there.
template <char Char> struct delimiter {};

template <char Char> // read a delimiter
std::istream& operator>>(std::istream& is, const delimiter<Char>) {
    if (is.peek() == Char) is.ignore();
    else is.setstate(std::ios::failbit);
    return is;
}

template <char Char> // write a delimiter
std::ostream& operator<<(std::ostream& os, const delimiter<Char>) {
    return os.put(Char);
}
Given the information you supplied, the actual record class could look like this:
struct record {
    uint64_t start;      // ns
    uint64_t end;        // ns
    uint32_t job_id;     // [0,1000000]
    uint16_t task_id;    // [0,10000]
    uint32_t machine_id; // [0,30000000]
    double cpu_time;
    std::array<double, 20 - 6> unknown;
};
Reading such a record from a stream can then be done like this, using the delimiter class template (instantiated to use a comma and a newline as delimiters):
std::istream& operator>>(std::istream& is, record& r) {
    delimiter<','> del;
    delimiter<'\n'> nl;

    // first read the named fields
    if (is >> r.start >> del >> r.end >> del >> r.job_id >> del >>
           r.task_id >> del >> r.machine_id >> del >> r.cpu_time)
    {
        // then read the unnamed fields:
        for (auto& unk : r.unknown) is >> del >> unk;
    }
    return is >> nl;
}
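As a quick sanity check, a single record can be read from a string stream. The values below are made up so that they respect the limits from EDIT2 (the dummy row's third field, 4665712499, would not fit the stated 1,000,000 limit or a uint32_t):

#include <sstream>

void record_read_demo() {
    std::istringstream in(
        "5612000000,5700000000,4665,798,3349189,0.02698,0.06714,0.07715,0.004219,"
        "0.004868,0.06726,7.915e-05,0.0003681,0.27,0.00293,3.285,0.008261,0,0,0.01608\n");
    record r;
    if (in >> r)
        std::cout << "machine " << r.machine_id << ", job " << r.job_id
                  << ", task " << r.task_id << '\n'; // machine 3349189, job 4665, task 798
}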
Writing a record is done similarly:
std::ostream& operator<<(std::ostream& os, const record& r) {
    delimiter<','> del;
    delimiter<'\n'> nl;

    os <<
        r.start << del <<
        r.end << del <<
        r.job_id << del <<
        r.task_id << del <<
        r.machine_id << del <<
        r.cpu_time;
    for (auto&& unk : r.unknown) os << del << unk;
    return os << nl;
}
Reading the whole file into memory, sorting it and printing the result:
int main() {
    std::filesystem::path filename = "C:/Users/1032359/cpp-projects/"
                                     "Straggler Job Analyzer/src/part-00001-of-00500.csv";
    std::vector<record> records;

    // Reserve space for "3GB" / 160 (the approximate length of a record plus some
    // extra bytes) records. Increase the 160 below if your records are actually
    // longer on average:
    records.reserve(std::filesystem::file_size(filename) / 160);

    // open the file
    std::ifstream inputFile(filename);

    // copy everything from the file into `records`
    std::copy(std::istream_iterator<record>(inputFile),
              std::istream_iterator<record>{},
              std::back_inserter(records));

    // sort on columns 5-3-4 (ascending)
    auto sorter = [](const record& lhs, const record& rhs) {
        return std::tie(lhs.machine_id, lhs.job_id, lhs.task_id) <
               std::tie(rhs.machine_id, rhs.job_id, rhs.task_id);
    };
    std::sort(records.begin(), records.end(), sorter);

    // display the result
    for (auto& r : records) std::cout << r;
}
On my old computer with a spinning disk, the above takes about 2 minutes. If that's too slow, I'd measure the time of the long-running parts (a small timing sketch follows the list):
- reserve
- copy
- sort
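A minimal way to get those timings (a sketch; the timed helper is made up, and steady_clock with milliseconds is just one reasonable choice):

#include <chrono>
#include <iostream>
#include <string_view>

// Run one phase and print how long it took:
template <class F>
void timed(std::string_view label, F&& phase) {
    auto start = std::chrono::steady_clock::now();
    phase();
    auto stop = std::chrono::steady_clock::now();
    std::cout << label << ": "
              << std::chrono::duration_cast<std::chrono::milliseconds>(stop - start).count()
              << " ms\n";
}

It would be used in main() like:

timed("copy", [&] {
    std::copy(std::istream_iterator<record>(inputFile),
              std::istream_iterator<record>{},
              std::back_inserter(records));
});
timed("sort", [&] { std::sort(records.begin(), records.end(), sorter); });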
You could then perhaps use that information to try to figure out where improvements are needed. For example, if the sorting is a bit slow, it could help to store the unnamed fields in a std::vector<double> instead of a std::array<double, 20-6>:
struct record {
    record() : unknown(20 - 6) {}

    uint64_t start;      // ns
    uint64_t end;        // ns
    uint32_t job_id;     // [0,1000000]
    uint16_t task_id;    // [0,10000]
    uint32_t machine_id; // [0,30000000]
    double cpu_time;
    std::vector<double> unknown;
};
As another way to solve this problem, I suggest not reading all of the data into memory, but instead sorting the huge CSV file with a minimal amount of RAM: a std::vector of line offsets.

What matters is the concept, not the exact implementation.

Since this only needs 8 bytes per line (in 64-bit mode), sorting a 3 GB data file requires only about 150 MB of RAM. The drawback is that the numbers of a given line have to be parsed several times, roughly log2(17e6) ≈ 24 times. However, I think that overhead is partly compensated by the lower memory usage and by not having to parse all the numbers of each line.
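A quick back-of-the-envelope check of that estimate (the ~160 bytes per line is an assumption based on the dummy row's length):

#include <cstdio>

int main() {
    const double file_bytes     = 3.0e9;  // ~3 GB input file
    const double bytes_per_line = 160.0;  // assumed average line length
    const double lines          = file_bytes / bytes_per_line;       // ~18.75 million lines
    const double offsets_mib    = lines * 8.0 / (1024.0 * 1024.0);   // one 8-byte offset per line
    std::printf("lines ~%.0f, offset vector ~%.0f MiB\n", lines, offsets_mib); // ~143 MiB
}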
#include <Windows.h>
#include <cstdint>
#include <cstdlib>   // atoll
#include <cstring>   // strchr
#include <vector>
#include <algorithm>
#include <array>
#include <fstream>
std::array<uint64_t, 5> readFirst5Numbers(const char* line)
{
    std::array<uint64_t, 5> nbr;
    for (int i = 0; i < 5; i++)
    {
        nbr[i] = atoll(line);
        line = strchr(line, ',') + 1;
    }
    return nbr;
}
int main()
{
    // 1. Map the input file into memory
    const char* inputPath = "C:/Users/1032359/cpp-projects/Straggler Job Analyzer/src/part-00001-of-00500.csv";
    HANDLE fileHandle = CreateFileA(inputPath, GENERIC_READ, FILE_SHARE_READ, NULL, OPEN_EXISTING, 0, NULL);
    DWORD highsize;
    DWORD lowsize = GetFileSize(fileHandle, &highsize);
    HANDLE mappingHandle = CreateFileMapping(fileHandle, NULL, PAGE_READONLY, highsize, lowsize, NULL);
    size_t fileSize = (size_t)lowsize | (size_t)highsize << 32;
    const char* memoryAddr = (const char*)MapViewOfFile(mappingHandle, FILE_MAP_READ, 0, 0, fileSize);

    // 2. Find the offsets of the starts of the lines
    std::vector<size_t> linesOffset;
    linesOffset.push_back(0);
    for (size_t i = 0; i < fileSize; i++)
        if (memoryAddr[i] == '\n')
            linesOffset.push_back(i + 1);
    linesOffset.pop_back();

    // 3. Sort the offsets on fields 5, 3, 4 of the lines they point to
    std::sort(linesOffset.begin(), linesOffset.end(), [memoryAddr](const size_t& offset1, const size_t& offset2) {
        std::array<uint64_t, 5> nbr1 = readFirst5Numbers(memoryAddr + offset1);
        std::array<uint64_t, 5> nbr2 = readFirst5Numbers(memoryAddr + offset2);
        if (nbr1[4] != nbr2[4])       // field 5: machine id
            return nbr1[4] < nbr2[4];
        if (nbr1[2] != nbr2[2])       // field 3: job id
            return nbr1[2] < nbr2[2];
        return nbr1[3] < nbr2[3];     // field 4: task id
        });

    // 4. Output the sorted array
    const char* outputPath = "C:/Users/1032359/cpp-projects/Straggler Job Analyzer/output/part-00001-of-00500.csv";
    std::ofstream outputFile;
    outputFile.open(outputPath);
    for (size_t offset : linesOffset)
    {
        const char* line = memoryAddr + offset;
        size_t len = strchr(line, '\n') + 1 - line;
        outputFile.write(line, len);
    }
}
I'd suggest a slightly different approach:
- Don't parse the whole line; only extract the fields used for sorting.
- Note that the ranges you state require only a small number of bits each, and together they fit into a single 64-bit value (a small packing sketch follows this list):
  30,000,000 - 25 bits
  10,000 - 14 bits
  1,000,000 - 20 bits
- Keep the "raw" source line in your vector so you can write it out as needed.
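A small sketch of that packing (the helper names are made up; widths 25/20/14 bits as above, with the machine id in the top bits so that key order matches the required machine, then job, then task order):

#include <cstdint>

// Pack machine id (<= 30,000,000, 25 bits), job id (<= 1,000,000, 20 bits) and
// task id (<= 10,000, 14 bits) into one 64-bit sort key:
//   bits 34..58 = machine id, bits 14..33 = job id, bits 0..13 = task id
inline uint64_t make_key(uint64_t machine_id, uint64_t job_id, uint64_t task_id) {
    return (machine_id << 34) | (job_id << 14) | task_id;
}

// The fields can be recovered from the key if needed:
inline uint64_t key_machine(uint64_t key) { return key >> 34; }
inline uint64_t key_job(uint64_t key)     { return (key >> 14) & 0xFFFFF; } // 20-bit mask
inline uint64_t key_task(uint64_t key)    { return key & 0x3FFF; }          // 14-bit mask

Comparing two such keys with < then orders by machine id first, then job id, then task id.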
Here's what I got:
#include <algorithm>
#include <chrono>
#include <cstdint>   // uint64_t
#include <cstdlib>   // strtoull
#include <fstream>
#include <iostream>
#include <string>
#include <vector>

struct Record {
    uint64_t key;
    std::string str;

    Record(uint64_t key, std::string&& str)
        : key(key)
        , str(std::move(str))
    {}
};

int main()
{
    auto t1 = std::chrono::high_resolution_clock::now();

    std::ifstream src("data.csv");
    std::vector<Record> v;
    std::string str;
    uint64_t key(0);
    while (src >> str)
    {
        // skip fields 1 and 2, then parse fields 3, 4 and 5
        size_t pos = str.find(',') + 1;
        pos = str.find(',', pos) + 1;
        char* p(nullptr);
        uint64_t f3 = strtoull(&str[pos], &p, 10);
        uint64_t f4 = strtoull(++p, &p, 10);
        uint64_t f5 = strtoull(++p, &p, 10);
        // pack field 5 (25 bits), field 3 (20 bits) and field 4 (14 bits) into one key
        key = f5 << 34;
        key |= f3 << 14;
        key |= f4;
        v.emplace_back(key, std::move(str));
    }
    std::sort(v.begin(), v.end(), [](const Record& a, const Record& b) {
        return a.key < b.key;
    });

    auto t2 = std::chrono::high_resolution_clock::now();
    std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(t2 - t1).count() << std::endl;

    std::ofstream out("out.csv");
    for (const auto& r : v) {
        out.write(r.str.c_str(), r.str.length());
        out.write("\n", 1);
    }

    auto t3 = std::chrono::high_resolution_clock::now();
    std::cout << std::chrono::duration_cast<std::chrono::milliseconds>(t3 - t2).count() << std::endl;
}
Of course, you could reserve space in the vector up front to avoid reallocations, for example:
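A minimal sketch of that (the ~160-byte average line length is an assumption; it would go before the read loop in main() above):

#include <filesystem>

v.reserve(std::filesystem::file_size("data.csv") / 160);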
I generated a file with 18,000,000 records. My timings show about 30 seconds for reading/sorting the file and about 200 seconds for writing the output.
UPDATE: Replacing the stream insertion with out.write() reduced the writing time from 200 seconds to 17 seconds!