如何在没有缓冲的情况下使用 parquet-cpp 写入 streaming/row-oriented 数据?

How can I write streaming/row-oriented data using parquet-cpp without buffering?

我基本上有 row-oriented/streaming 数据 (Netflow) 进入我的 C++ 应用程序,我想将数据写入 Parquet-gzip 文件。

看parquet-cpp项目中的sample reader-writer.cc program,好像只能以柱状的方式给parquet-cpp喂数据:

constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
...
// Append a RowGroup with a specific number of rows.
parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup(NUM_ROWS_PER_ROW_GROUP);

// Write the Bool column
for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
   bool_writer->WriteBatch(1, nullptr, nullptr, &value);
}
// Write the Int32 column
...
// Write the ... column

这似乎意味着我需要自己缓冲 NUM_ROWS_PER_ROW_GROUP 行,然后遍历它们,以一次一列将它们传输到 parquet-cpp。我希望有更好的方法,因为这看起来效率不高,因为数据需要复制两次:一次复制到我的缓冲区中,然后一次将数据输入到 parquet-cpp 中时再复制一列。

有没有一种方法可以将每一行的数据放入 parquet-cpp 而不必先缓冲一堆行? Apache Arrow 项目(parquet-cpp 使用)有 a tutorial that shows how to convert row-wise data into an Arrow table。对于每一行输入数据,代码附加到每个列构建器:

for (const data_row& row : rows) {
   ARROW_RETURN_NOT_OK(id_builder.Append(row.id));
   ARROW_RETURN_NOT_OK(cost_builder.Append(row.cost));

我想用 parquet-cpp 做类似的事情。这可能吗?

您将永远无法完全没有缓冲,因为我们需要从行式表示转换为列式表示。在撰写本文时,最好的可能路径是构建 Apache Arrow 表,然后将其输入 parquet-cpp

parquet-cpp 提供了特殊的 Arrow APIs,可以直接对这些表进行操作,大多数情况下不需要任何额外的数据副本。您可以在 parquet/arrow/reader.hparquet/arrow/writer.h 中找到 API。

最佳但尚未实施的解决方案可以通过执行以下操作节省一些字节:

  • 在新的 parquet-cpp 中逐行摄取 API
  • 使用指定的编码和压缩设置直接对每列的这些值进行编码
  • 只在内存中缓冲这个
  • 在行组的末尾,逐列写出

虽然这个最佳解决方案可能会为您节省一些内存,但仍有一些步骤需要由某人来实施(请随意贡献或寻求帮助来实施这些步骤),您可能很擅长使用 Apache基于箭头 API.

我按照 使用箭头 API 在数据到达时填充箭头 table,然后使用 parquet-cpp 的 [= 写出 table 16=] 方法。我将 GZIP 设置为默认压缩,但为第二个字段指定了 SNAPPY。

#include <iostream>
#include "arrow/builder.h"
#include "arrow/table.h"
#include "arrow/io/file.h"
#include <parquet/arrow/writer.h>
#include <parquet/properties.h>

main() {
    arrow::Int32Builder sip_builder(arrow::default_memory_pool());
    arrow::Int32Builder dip_builder(arrow::default_memory_pool());
    for(size_t i=0; i < 1000; i++) {  // simulate row-oriented incoming data
        sip_builder.Append(i*100);
        dip_builder.Append(i*10 + i);
    }
    std::shared_ptr<arrow::Array> sip_array;
    sip_builder.Finish(&sip_array);
    std::shared_ptr<arrow::Array> dip_array;
    dip_builder.Finish(&dip_array);
    std::vector<std::shared_ptr<arrow::Field>> schema_definition = {
        arrow::field("sip", arrow::int32(), false /* don't allow null; makes field required */),
        arrow::field("dip", arrow::int32(), false)
    };
    auto schema = std::make_shared<arrow::Schema>(schema_definition);
    std::shared_ptr<arrow::Table> arrow_table;
    MakeTable(schema, {sip_array, dip_array}, &arrow_table);

    std::shared_ptr<arrow::io::FileOutputStream> file_output_stream;
    arrow::io::FileOutputStream::Open("test.parquet", &file_output_stream);
    parquet::WriterProperties::Builder props_builder;
    props_builder.compression(parquet::Compression::GZIP);
    props_builder.compression("dip", parquet::Compression::SNAPPY);
    auto props = props_builder.build();
    parquet::arrow::WriteTable(*arrow_table, ::arrow::default_memory_pool(),
       file_output_stream, sip_array->length(), props);
    std::cout << "done" << std::endl;
}
$ g++ -std=c++11 -I/opt/parquet-cpp/build/release/include -lparquet -larrow arrow-test.cc; ./a.out
done
$ /opt/parquet-cpp/build/release/parquet_reader --only-metadata test.parquet
File Name: test.parquet
Version: 0
Created By: parquet-cpp version 1.2.1-SNAPSHOT
Total rows: 1000
Number of RowGroups: 1            <<----------
Number of Real Columns: 2
Number of Columns: 2
Number of Selected Columns: 2
Column 0: sip (INT32)
Column 1: dip (INT32)
--- Row Group 0 ---
--- Total Bytes 8425 ---
  Rows: 1000---
Column 0
, Values: 1000, Null Values: 0, Distinct Values: 0
  Max: 99900, Min: 0
  Compression: GZIP, Encodings: PLAIN_DICTIONARY PLAIN RLE
  Uncompressed Size: 5306, Compressed Size: 3109
Column 1
, Values: 1000, Null Values: 0, Distinct Values: 0
  Max: 10989, Min: 0
  Compression: SNAPPY, Encodings: PLAIN_DICTIONARY PLAIN RLE
  Uncompressed Size: 5306, Compressed Size: 5316

上面的代码为整个 table/file 写出一个行组。根据您拥有的数据行数,这可能并不理想,因为太多行会导致 "fallback to plain encoding"(请参阅 Ryan Blue presentation,幻灯片 31-34)。要为每个 table/file 编写多个行组,请将 chunk_size 参数设置得更小(下面我除以 2 以获得每个 table/file 两个行组):

parquet::arrow::WriteTable(*arrow_table, ::arrow::default_memory_pool(),
       fileOutputStream, sip_array->length()/2, props);

这仍然不理想。在调用 parquet::arrow::WriteTable() 之前,文件的所有数据必须在箭头 table 中 buffered/stored,因为该函数会打开和关闭文件。我想为每个文件写入多个行组,但我只想在内存中一次 buffer/store 一两个行组数据。下面的代码实现了这一点。它基于 parquet/arrow/writer.cc:

中的代码
#include <parquet/util/memory.h>
...
auto arrow_output_stream = std::make_shared<parquet::ArrowOutputStream>(file_output_stream);
std::unique_ptr<parquet::arrow::FileWriter> writer;
parquet::arrow::FileWriter::Open(*(arrow_table->schema()), ::arrow::default_memory_pool(), 
     arrow_output_stream, props, parquet::arrow::default_arrow_writer_properties(), 
     &writer);
// write two row groups for the first table
writer->WriteTable(*arrow_table, sip_array->length()/2);
// ... code here would generate a new table ...
// for now, we'll just write out the same table again, to
// simulate writing more data to the same file, this
// time as one row group
writer->WriteTable(*arrow_table, sip_array->length());
writer->Close();
$ /opt/parquet-cpp/build/release/parquet_reader --only-metadata test.parquet                        File Name: test.parquet
Version: 0
Created By: parquet-cpp version 1.2.1-SNAPSHOT
Total rows: 2000000
Number of RowGroups: 3   <<--------
...
--- Row Group 0 ---
--- Total Bytes 2627115 ---
  Rows: 500000---
...
--- Row Group 1 ---
--- Total Bytes 2626873 ---
  Rows: 500000---
...
--- Row Group 2 ---
--- Total Bytes 4176371 ---
  Rows: 1000000---