使用临时文件处理大量数据

Using temporary files for working with large amount of data

我正在尝试使用临时序列化文件(数据文件中包含[no, offset, length] 的指针table 和包含数据的数据文件)来实现线程间通信。一个线程应该接收数据(处理它)并将其保存到内存中。第二个线程应该从内存中读取数据并显示结果。 (输入线程只追加数据,输出线程只读取数据。)

我必须将其编译为 32 位,因此我尝试通过 reading/writing 一个临时文件来解决 2 GB 的限制。

我实现了一个简单的例子。但问题是,如果 I/O 个线程同时工作,则输出线程无法正确读取。如果输入线程写入并关闭文件,则输出线程读取并关闭它工作正常。我用 shared_mutex 和 mutex 进行了同步,结果同样糟糕。

提前感谢您的回复。

更新: 根据 重置标志 (stream.clear()) 后行为会变得更好,但有时它仍然会失败,有时会通过。

主要:

int main() {        
      //Start input and output job
      std::thread input = std::thread(inputJob);
      std::thread output = std::thread(outputJob);
        
      //Wait here for end
      input.join();
      output.join();
    
    //HERE checking results
    
      return 0;
}

输入作业:

void inputJob() {
    is_on = true;
    //Loading input data
    for (int i = 1; i < 10; i++) {
        student s("George", "Patton", 100000 + i, (i % 2) > 0, "0A");
        for (int j = 1; j < 4; j++) s.subjects.push_back(subject(rand(), "MA", (j % 5)));
        s1.push_back(s);
    }

    //Save to binary file
    std::fstream data_stream, table_stream;
    table_stream.open("./table.data", std::fstream::out | std::fstream::trunc | std::fstream::binary);
    data_stream.open("./data.data", std::fstream::out | std::fstream::trunc | std::fstream::binary);
    size_t off = 0;
    if (table_stream.is_open() && data_stream.is_open()) for (size_t i = 0; i < s1.size(); i++) {
        std::string tmp = s1[i].toBinaryString();
        size_t sz = tmp.size();
        table_row t(i, off, sz);
        off += sz;
        {
            std::lock_guard lock(m);
            table_stream << t.toBinaryString();
            data_stream << tmp;
            cout << "Written" << endl;
        }       
    }
    table_stream.close();
    data_stream.close();
    is_on = false;
}

输出作业:

    void outputJob() {
    //Load from binary file
    std::fstream data_stream, table_stream;
    table_stream.open("./table.data", std::fstream::in | std::fstream::binary);
    data_stream.open("./data.data", std::fstream::in | std::fstream::binary);
    if (table_stream.is_open() && data_stream.is_open()) {
        size_t row_sz = sizeof(table_row);
        std::string line = "";
        size_t index = 0;

            unsigned table_r = 0;
            bool was_empty = false;
            while (is_on || (was_empty == false)) {
                {
                    std::lock_guard lock(m);
                    if (tryGetData(table_stream, line, row_sz, table_r) == 0 && line.empty() == false) {
                        table_r += row_sz;
                        was_empty = false;
                        table_row row;
                        index = 0;
                        row.fromBinaryString(line, index, line.size());

                        if (tryGetData(data_stream, line, row.len, row.off) == 0 && line.empty() == false) {
                            was_empty = false;
                            index = 0;
                            student tmp;
                            tmp.fromBinaryString(line, index);
                            if (VAL_CHECK == 1) s2.push_back(tmp);
                        }
                        else was_empty = true;
                    }
                    else was_empty = true;
                }
            }
        
    }
    table_stream.close();
    data_stream.close();
}

尝试获取数据功能:

    int tryGetData(std::fstream &data_stream, std::string& data, size_t data_sz, size_t offset) {
    int ret = 0;
    data = "";
        if (data_stream.is_open()) {
            //Set ptr
            if (offset != UINT32_MAX) data_stream.seekp(offset);

            char c;
            while (data_sz > 0 && data_stream.get(c)) {
                data.push_back(c);
                data_sz--;
            }
            if (data_stream.eof()) ret = 1;
        }
    return ret;
}

我能想到的最简单的解决方案是使用双缓冲。每种文件类型各有两个,并确保输入线程写入一对,而输出线程始终读取另一对。

我能想到的所有其他解决方案都需要确保 OS 或文件库没有对文件进行任何缓存,因此将是特定于平台的。但如果这不是问题,请继续阅读 memory mapped files,例如。

主要问题是,写入线程可能比读取线程晚启动。如果读线程正在等待写线程打开文件,那么它就像魅力一样。