使用临时文件处理大量数据
Using temporary files for working with large amount of data
我正在尝试使用临时序列化文件(数据文件中包含[no, offset, length] 的指针table 和包含数据的数据文件)来实现线程间通信。一个线程应该接收数据(处理它)并将其保存到内存中。第二个线程应该从内存中读取数据并显示结果。 (输入线程只追加数据,输出线程只读取数据。)
我必须将其编译为 32 位,因此我尝试通过 reading/writing 一个临时文件来解决 2 GB 的限制。
我实现了一个简单的例子。但问题是,如果 I/O 个线程同时工作,则输出线程无法正确读取。如果输入线程写入并关闭文件,则输出线程读取并关闭它工作正常。我用 shared_mutex 和 mutex 进行了同步,结果同样糟糕。
提前感谢您的回复。
更新: 根据 重置标志 (stream.clear()) 后行为会变得更好,但有时它仍然会失败,有时会通过。
主要:
int main() {
//Start input and output job
std::thread input = std::thread(inputJob);
std::thread output = std::thread(outputJob);
//Wait here for end
input.join();
output.join();
//HERE checking results
return 0;
}
输入作业:
void inputJob() {
is_on = true;
//Loading input data
for (int i = 1; i < 10; i++) {
student s("George", "Patton", 100000 + i, (i % 2) > 0, "0A");
for (int j = 1; j < 4; j++) s.subjects.push_back(subject(rand(), "MA", (j % 5)));
s1.push_back(s);
}
//Save to binary file
std::fstream data_stream, table_stream;
table_stream.open("./table.data", std::fstream::out | std::fstream::trunc | std::fstream::binary);
data_stream.open("./data.data", std::fstream::out | std::fstream::trunc | std::fstream::binary);
size_t off = 0;
if (table_stream.is_open() && data_stream.is_open()) for (size_t i = 0; i < s1.size(); i++) {
std::string tmp = s1[i].toBinaryString();
size_t sz = tmp.size();
table_row t(i, off, sz);
off += sz;
{
std::lock_guard lock(m);
table_stream << t.toBinaryString();
data_stream << tmp;
cout << "Written" << endl;
}
}
table_stream.close();
data_stream.close();
is_on = false;
}
输出作业:
void outputJob() {
//Load from binary file
std::fstream data_stream, table_stream;
table_stream.open("./table.data", std::fstream::in | std::fstream::binary);
data_stream.open("./data.data", std::fstream::in | std::fstream::binary);
if (table_stream.is_open() && data_stream.is_open()) {
size_t row_sz = sizeof(table_row);
std::string line = "";
size_t index = 0;
unsigned table_r = 0;
bool was_empty = false;
while (is_on || (was_empty == false)) {
{
std::lock_guard lock(m);
if (tryGetData(table_stream, line, row_sz, table_r) == 0 && line.empty() == false) {
table_r += row_sz;
was_empty = false;
table_row row;
index = 0;
row.fromBinaryString(line, index, line.size());
if (tryGetData(data_stream, line, row.len, row.off) == 0 && line.empty() == false) {
was_empty = false;
index = 0;
student tmp;
tmp.fromBinaryString(line, index);
if (VAL_CHECK == 1) s2.push_back(tmp);
}
else was_empty = true;
}
else was_empty = true;
}
}
}
table_stream.close();
data_stream.close();
}
尝试获取数据功能:
int tryGetData(std::fstream &data_stream, std::string& data, size_t data_sz, size_t offset) {
int ret = 0;
data = "";
if (data_stream.is_open()) {
//Set ptr
if (offset != UINT32_MAX) data_stream.seekp(offset);
char c;
while (data_sz > 0 && data_stream.get(c)) {
data.push_back(c);
data_sz--;
}
if (data_stream.eof()) ret = 1;
}
return ret;
}
我能想到的最简单的解决方案是使用双缓冲。每种文件类型各有两个,并确保输入线程写入一对,而输出线程始终读取另一对。
我能想到的所有其他解决方案都需要确保 OS 或文件库没有对文件进行任何缓存,因此将是特定于平台的。但如果这不是问题,请继续阅读 memory mapped files,例如。
主要问题是,写入线程可能比读取线程晚启动。如果读线程正在等待写线程打开文件,那么它就像魅力一样。
我正在尝试使用临时序列化文件(数据文件中包含[no, offset, length] 的指针table 和包含数据的数据文件)来实现线程间通信。一个线程应该接收数据(处理它)并将其保存到内存中。第二个线程应该从内存中读取数据并显示结果。 (输入线程只追加数据,输出线程只读取数据。)
我必须将其编译为 32 位,因此我尝试通过 reading/writing 一个临时文件来解决 2 GB 的限制。
我实现了一个简单的例子。但问题是,如果 I/O 个线程同时工作,则输出线程无法正确读取。如果输入线程写入并关闭文件,则输出线程读取并关闭它工作正常。我用 shared_mutex 和 mutex 进行了同步,结果同样糟糕。
提前感谢您的回复。
更新: 根据
主要:
int main() {
//Start input and output job
std::thread input = std::thread(inputJob);
std::thread output = std::thread(outputJob);
//Wait here for end
input.join();
output.join();
//HERE checking results
return 0;
}
输入作业:
void inputJob() {
is_on = true;
//Loading input data
for (int i = 1; i < 10; i++) {
student s("George", "Patton", 100000 + i, (i % 2) > 0, "0A");
for (int j = 1; j < 4; j++) s.subjects.push_back(subject(rand(), "MA", (j % 5)));
s1.push_back(s);
}
//Save to binary file
std::fstream data_stream, table_stream;
table_stream.open("./table.data", std::fstream::out | std::fstream::trunc | std::fstream::binary);
data_stream.open("./data.data", std::fstream::out | std::fstream::trunc | std::fstream::binary);
size_t off = 0;
if (table_stream.is_open() && data_stream.is_open()) for (size_t i = 0; i < s1.size(); i++) {
std::string tmp = s1[i].toBinaryString();
size_t sz = tmp.size();
table_row t(i, off, sz);
off += sz;
{
std::lock_guard lock(m);
table_stream << t.toBinaryString();
data_stream << tmp;
cout << "Written" << endl;
}
}
table_stream.close();
data_stream.close();
is_on = false;
}
输出作业:
void outputJob() {
//Load from binary file
std::fstream data_stream, table_stream;
table_stream.open("./table.data", std::fstream::in | std::fstream::binary);
data_stream.open("./data.data", std::fstream::in | std::fstream::binary);
if (table_stream.is_open() && data_stream.is_open()) {
size_t row_sz = sizeof(table_row);
std::string line = "";
size_t index = 0;
unsigned table_r = 0;
bool was_empty = false;
while (is_on || (was_empty == false)) {
{
std::lock_guard lock(m);
if (tryGetData(table_stream, line, row_sz, table_r) == 0 && line.empty() == false) {
table_r += row_sz;
was_empty = false;
table_row row;
index = 0;
row.fromBinaryString(line, index, line.size());
if (tryGetData(data_stream, line, row.len, row.off) == 0 && line.empty() == false) {
was_empty = false;
index = 0;
student tmp;
tmp.fromBinaryString(line, index);
if (VAL_CHECK == 1) s2.push_back(tmp);
}
else was_empty = true;
}
else was_empty = true;
}
}
}
table_stream.close();
data_stream.close();
}
尝试获取数据功能:
int tryGetData(std::fstream &data_stream, std::string& data, size_t data_sz, size_t offset) {
int ret = 0;
data = "";
if (data_stream.is_open()) {
//Set ptr
if (offset != UINT32_MAX) data_stream.seekp(offset);
char c;
while (data_sz > 0 && data_stream.get(c)) {
data.push_back(c);
data_sz--;
}
if (data_stream.eof()) ret = 1;
}
return ret;
}
我能想到的最简单的解决方案是使用双缓冲。每种文件类型各有两个,并确保输入线程写入一对,而输出线程始终读取另一对。
我能想到的所有其他解决方案都需要确保 OS 或文件库没有对文件进行任何缓存,因此将是特定于平台的。但如果这不是问题,请继续阅读 memory mapped files,例如。
主要问题是,写入线程可能比读取线程晚启动。如果读线程正在等待写线程打开文件,那么它就像魅力一样。