有没有办法在 Beam 的 ParDo 转换中创建 SpecificRecord 列表以编写 Parquet 文件?
Is there a way to create a list of SpecificRecord in a ParDo transformation in Beam for writing Parquet files?
我正在尝试在 Beam/Java 中编写数据流作业来处理来自 Pub/Sub 的一系列事件并写入 Parquet。 Pub/Sub中的事件采用JSON格式,每个事件可以生成一行或多行。我能够编写一个非常简单的示例,编写 ParDo 转换 returning 仅 1 条记录。 ParDo 看起来像这样
static class GenerateRecords extends DoFn<String, GenericRecord> {
@ProcessElement
public void processElement(ProcessContext context) {
final GenericData.Record record = new GenericData.Record(schema);
String msg = context.element();
com.tsp.de.schema.mschema pRecord = GenerateParquetRecord(msg);
context.output(pRecord);
}
}
和管道的写入部分
.apply("Write to file",
FileIO.<GenericRecord>
write()
.via(
ParquetIO.sink(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
)
.to(options.getOutputDirectory())
.withNumShards(options.getNumShards())
.withSuffix("pfile")
);
我的问题是,如何将这个 ParDo 转换推广到 return 记录列表?我试过 List 但这不起作用,ParquetIO.sink(schema) 在 "cannot resolve method via".
吠叫
您可以根据需要在 DoFn
中多次调用 context.output()
。因此,如果您知道在何种情况下需要发出多条记录的业务逻辑,那么您只需为每条输出记录调用 context.output(record)
。它应该比拥有 PCollection
个容器更简单。
PS:顺便说一句,我有一个 simple example 如何用 ParquetIO
和 AvroCoder
编写 GenericRecord
的方法,这也许会有所帮助。
我正在尝试在 Beam/Java 中编写数据流作业来处理来自 Pub/Sub 的一系列事件并写入 Parquet。 Pub/Sub中的事件采用JSON格式,每个事件可以生成一行或多行。我能够编写一个非常简单的示例,编写 ParDo 转换 returning 仅 1 条记录。 ParDo 看起来像这样
static class GenerateRecords extends DoFn<String, GenericRecord> {
@ProcessElement
public void processElement(ProcessContext context) {
final GenericData.Record record = new GenericData.Record(schema);
String msg = context.element();
com.tsp.de.schema.mschema pRecord = GenerateParquetRecord(msg);
context.output(pRecord);
}
}
和管道的写入部分
.apply("Write to file",
FileIO.<GenericRecord>
write()
.via(
ParquetIO.sink(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
)
.to(options.getOutputDirectory())
.withNumShards(options.getNumShards())
.withSuffix("pfile")
);
我的问题是,如何将这个 ParDo 转换推广到 return 记录列表?我试过 List 但这不起作用,ParquetIO.sink(schema) 在 "cannot resolve method via".
吠叫您可以根据需要在 DoFn
中多次调用 context.output()
。因此,如果您知道在何种情况下需要发出多条记录的业务逻辑,那么您只需为每条输出记录调用 context.output(record)
。它应该比拥有 PCollection
个容器更简单。
PS:顺便说一句,我有一个 simple example 如何用 ParquetIO
和 AvroCoder
编写 GenericRecord
的方法,这也许会有所帮助。