Apache Beam GCP 在动态创建的目录中上传 Avro
Apache Beam GCP upload Avro in dynamically created directories
我想在 GCP 中创建一个流式 Apache Beam 管道,它从 Google Pub/Sub 读取数据并将其推送到 GCS。我可以从 Pub/Sub 读取数据。
我当前的代码看起来像那样(从 GCP Apache Beam 模板之一中提取)
pipeline.apply("Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
.apply("Map to Archive", ParDo.of(new PubsubMessageToArchiveDoFn()))
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
.apply(
"Write File(s)",
AvroIO.write(AdEvent.class)
.to(
new WindowedFilenamePolicy(
options.getOutputDirectory(),
options.getOutputFilenamePrefix(),
options.getOutputShardTemplate(),
options.getOutputFilenameSuffix()))
.withTempDirectory(NestedValueProvider.of(
options.getAvroTempDirectory(),
(SerializableFunction<String, ResourceId>) input ->
FileBasedSink.convertToFileResourceIfPossible(input)))
.withWindowedWrites()
.withNumShards(options.getNumShards()));
它可以生成如下所示的文件
windowed-file2020-04-28T09:00:00.000Z-2020-04-28T09:02:00.000Z-pane-0-last-00-of-01.avro
我想将 GCS 中的数据存储在动态创建的目录中。在以下目录 2020-04-28/01
、2020-04-28/02
等中 - 01
和 02
是子目录,表示数据流流管道处理数据的时间。
示例:
gs://data/2020-04-28/01/0000000.avro
gs://data/2020-04-28/01/0000001.avro
gs://data/2020-04-28/01/....
gs://data/2020-04-28/02/0000000.avro
gs://data/2020-04-28/02/0000001.avro
gs://data/2020-04-28/02/....
gs://data/2020-04-28/03/0000000.avro
gs://data/2020-04-28/03/0000001.avro
gs://data/2020-04-28/03/....
...
0000000、0000001 等是我用来说明的简单文件名,我不希望这些文件是按顺序命名的。
您认为这在 GCP 数据流流设置中可行吗?
您可以实现自己的 FilenamePolicy (perhaps using WindowedFilenamePolicy
as a starting point) to use your own logic for defining output paths. You can use /
characters in your file paths as you wish (by the way, GCS buckets are "flat",它们实际上没有目录)。要获得 dates/times,windowedFilename
方法将 window 信息作为参数,因此您可以在 return 值中使用它,但您认为合适。
您需要使用 writeDynamic
而不是 Write
。不幸的是,如前所述 here,AvroIO 本身不支持 writeDynamic,您需要使用 FileIO。
下面是一个使用 Scio 在 Scala 中实现的示例
val dynamicOutput: FileIO.Write[String, GenericRecord] = FileIO
.writeDynamic[String, GenericRecord]()
.by((input: GenericRecord) => {
input.get("id").toString.toUpperCase + "/"
})
.withDestinationCoder(StringUtf8Coder.of())
.withNumShards(1) // Since input is small, restrict to one file per bucket
.withNaming(
new SerializableFunction[String, FileNaming] {
override def apply(partitionCol: String): FileNaming = {
FileIO.Write.defaultNaming(s"Id=$partitionCol", ".parquet")
}
}
)
.via(Contextful.fn[GenericRecord,GenericRecord](
new SerializableFunction[GenericRecord,GenericRecord]{
override def apply(input: GenericRecord): GenericRecord = {
val r = new GenericData.Record(outputSchema)
r.put("amount",input.get("amount"))
r.put("name",input.get("name"))
r.put("type",input.get("type"))
r
}
}
),
ParquetIO.sink(outputSchema)
)
.to("gs://bucket-name/table-name")
在上面的示例中,我使用了 GenericRecord 类型并指定了架构并创建了动态分区并以 Parquet 格式写入文件。您可以选择以任何格式写入数据。
您可以使用 Pub/Sub to Cloud Storage Avro template 是一个流式管道,它从 Pub/Sub 主题读取数据并将 Avro 文件写入指定的 Cloud Storage 存储桶。此管道支持可选的用户提供的 window 持续时间,用于执行 windowed 写入。
我想在 GCP 中创建一个流式 Apache Beam 管道,它从 Google Pub/Sub 读取数据并将其推送到 GCS。我可以从 Pub/Sub 读取数据。 我当前的代码看起来像那样(从 GCP Apache Beam 模板之一中提取)
pipeline.apply("Read PubSub Events",
PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
.apply("Map to Archive", ParDo.of(new PubsubMessageToArchiveDoFn()))
.apply(
options.getWindowDuration() + " Window",
Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
.apply(
"Write File(s)",
AvroIO.write(AdEvent.class)
.to(
new WindowedFilenamePolicy(
options.getOutputDirectory(),
options.getOutputFilenamePrefix(),
options.getOutputShardTemplate(),
options.getOutputFilenameSuffix()))
.withTempDirectory(NestedValueProvider.of(
options.getAvroTempDirectory(),
(SerializableFunction<String, ResourceId>) input ->
FileBasedSink.convertToFileResourceIfPossible(input)))
.withWindowedWrites()
.withNumShards(options.getNumShards()));
它可以生成如下所示的文件
windowed-file2020-04-28T09:00:00.000Z-2020-04-28T09:02:00.000Z-pane-0-last-00-of-01.avro
我想将 GCS 中的数据存储在动态创建的目录中。在以下目录 2020-04-28/01
、2020-04-28/02
等中 - 01
和 02
是子目录,表示数据流流管道处理数据的时间。
示例:
gs://data/2020-04-28/01/0000000.avro
gs://data/2020-04-28/01/0000001.avro
gs://data/2020-04-28/01/....
gs://data/2020-04-28/02/0000000.avro
gs://data/2020-04-28/02/0000001.avro
gs://data/2020-04-28/02/....
gs://data/2020-04-28/03/0000000.avro
gs://data/2020-04-28/03/0000001.avro
gs://data/2020-04-28/03/....
...
0000000、0000001 等是我用来说明的简单文件名,我不希望这些文件是按顺序命名的。 您认为这在 GCP 数据流流设置中可行吗?
您可以实现自己的 FilenamePolicy (perhaps using WindowedFilenamePolicy
as a starting point) to use your own logic for defining output paths. You can use /
characters in your file paths as you wish (by the way, GCS buckets are "flat",它们实际上没有目录)。要获得 dates/times,windowedFilename
方法将 window 信息作为参数,因此您可以在 return 值中使用它,但您认为合适。
您需要使用 writeDynamic
而不是 Write
。不幸的是,如前所述 here,AvroIO 本身不支持 writeDynamic,您需要使用 FileIO。
下面是一个使用 Scio 在 Scala 中实现的示例
val dynamicOutput: FileIO.Write[String, GenericRecord] = FileIO
.writeDynamic[String, GenericRecord]()
.by((input: GenericRecord) => {
input.get("id").toString.toUpperCase + "/"
})
.withDestinationCoder(StringUtf8Coder.of())
.withNumShards(1) // Since input is small, restrict to one file per bucket
.withNaming(
new SerializableFunction[String, FileNaming] {
override def apply(partitionCol: String): FileNaming = {
FileIO.Write.defaultNaming(s"Id=$partitionCol", ".parquet")
}
}
)
.via(Contextful.fn[GenericRecord,GenericRecord](
new SerializableFunction[GenericRecord,GenericRecord]{
override def apply(input: GenericRecord): GenericRecord = {
val r = new GenericData.Record(outputSchema)
r.put("amount",input.get("amount"))
r.put("name",input.get("name"))
r.put("type",input.get("type"))
r
}
}
),
ParquetIO.sink(outputSchema)
)
.to("gs://bucket-name/table-name")
在上面的示例中,我使用了 GenericRecord 类型并指定了架构并创建了动态分区并以 Parquet 格式写入文件。您可以选择以任何格式写入数据。
您可以使用 Pub/Sub to Cloud Storage Avro template 是一个流式管道,它从 Pub/Sub 主题读取数据并将 Avro 文件写入指定的 Cloud Storage 存储桶。此管道支持可选的用户提供的 window 持续时间,用于执行 windowed 写入。