Dataflow 将 GenericRecords 的 pCollection 写入 Parquet 文件
Dataflow writing a pCollection of GenericRecords to Parquet files
在 apache beam 步骤中,我有一个 KV<String, Iterable<KV<Long, GenericRecord>>>>
的 PCollection。
我想将 iterable 中的所有记录写入同一个镶木地板文件。我的代码片段如下
p.apply(ParDo.of(new MapWithAvroSchemaAndConvertToGenericRecord())) // PCollection<GenericRecord>
.apply(ParDo.of(new MapKafkaGenericRecordValue(formatter, options.getFileNameDelimiter()))) //PCollection<KV<String, KV<Long, GenericRecord>>>
.apply(GroupByKey.create()) //PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>>
现在我想把Iterable中的所有Records都写在同一个parquet文件中(通过KV键导出文件名)。
我找到了问题的解决方案。
在这一步 -
apply(GroupByKey.create()) //PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>>
我将应用另一个转换,它将 return 仅将 Iterable 作为输出 pCollection。
`.apply(ParDo.of(new GetIterable())) //PCollection>>
其中 key 是我必须写入的文件的名称。
那么剩下的片段是
.apply(Flatten.iterables())
.apply(
FileIO.<String, KV<String, GenericRecord>>writeDynamic()
.by((SerializableFunction<KV<String, GenericRecord>, String>) KV::getKey)
.via(
Contextful.fn(
(SerializableFunction<KV<String, GenericRecord>, GenericRecord>) KV::getValue
),
ParquetIO.sink(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
)
.withTempDirectory("/tmp/temp-beam")
.to(options.getGCSBucketUrl())
.withNumShards(1)
.withDestinationCoder(StringUtf8Coder.of())
)
在 apache beam 步骤中,我有一个 KV<String, Iterable<KV<Long, GenericRecord>>>>
的 PCollection。
我想将 iterable 中的所有记录写入同一个镶木地板文件。我的代码片段如下
p.apply(ParDo.of(new MapWithAvroSchemaAndConvertToGenericRecord())) // PCollection<GenericRecord>
.apply(ParDo.of(new MapKafkaGenericRecordValue(formatter, options.getFileNameDelimiter()))) //PCollection<KV<String, KV<Long, GenericRecord>>>
.apply(GroupByKey.create()) //PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>>
现在我想把Iterable中的所有Records都写在同一个parquet文件中(通过KV键导出文件名)。
我找到了问题的解决方案。 在这一步 -
apply(GroupByKey.create()) //PCollection<KV<String, Iterable<KV<Long, GenericRecord>>>>>
我将应用另一个转换,它将 return 仅将 Iterable 作为输出 pCollection。 `.apply(ParDo.of(new GetIterable())) //PCollection>> 其中 key 是我必须写入的文件的名称。 那么剩下的片段是
.apply(Flatten.iterables())
.apply(
FileIO.<String, KV<String, GenericRecord>>writeDynamic()
.by((SerializableFunction<KV<String, GenericRecord>, String>) KV::getKey)
.via(
Contextful.fn(
(SerializableFunction<KV<String, GenericRecord>, GenericRecord>) KV::getValue
),
ParquetIO.sink(schema)
.withCompressionCodec(CompressionCodecName.SNAPPY)
)
.withTempDirectory("/tmp/temp-beam")
.to(options.getGCSBucketUrl())
.withNumShards(1)
.withDestinationCoder(StringUtf8Coder.of())
)