Apache Beam GCP 在动态创建的目录中上传 Avro

Apache Beam GCP upload Avro in dynamically created directories

我想在 GCP 中创建一个流式 Apache Beam 管道,它从 Google Pub/Sub 读取数据并将其推送到 GCS。我可以从 Pub/Sub 读取数据。 我当前的代码看起来像那样(从 GCP Apache Beam 模板之一中提取)

pipeline.apply("Read PubSub Events",
  PubsubIO.readMessagesWithAttributes().fromTopic(options.getInputTopic()))
                .apply("Map to Archive", ParDo.of(new PubsubMessageToArchiveDoFn()))
                .apply(
                        options.getWindowDuration() + " Window",
                        Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))
                .apply(
                        "Write File(s)",
                        AvroIO.write(AdEvent.class)
                                .to(
                                        new WindowedFilenamePolicy(
                                                options.getOutputDirectory(),
                                                options.getOutputFilenamePrefix(),
                                                options.getOutputShardTemplate(),
                                                options.getOutputFilenameSuffix()))
                                .withTempDirectory(NestedValueProvider.of(
                                        options.getAvroTempDirectory(),
                                        (SerializableFunction<String, ResourceId>) input ->
                                                FileBasedSink.convertToFileResourceIfPossible(input)))
                                .withWindowedWrites()
                                .withNumShards(options.getNumShards()));

它可以生成如下所示的文件 windowed-file2020-04-28T09:00:00.000Z-2020-04-28T09:02:00.000Z-pane-0-last-00-of-01.avro

我想将 GCS 中的数据存储在动态创建的目录中。在以下目录 2020-04-28/012020-04-28/02 等中 - 0102 是子目录,表示数据流流管道处理数据的时间。

示例:

gs://data/2020-04-28/01/0000000.avro
gs://data/2020-04-28/01/0000001.avro
gs://data/2020-04-28/01/....

gs://data/2020-04-28/02/0000000.avro
gs://data/2020-04-28/02/0000001.avro
gs://data/2020-04-28/02/....

gs://data/2020-04-28/03/0000000.avro
gs://data/2020-04-28/03/0000001.avro
gs://data/2020-04-28/03/....
...

0000000、0000001 等是我用来说明的简单文件名,我不希望这些文件是按顺序命名的。 您认为这在 GCP 数据流流设置中可行吗?

您可以实现自己的 FilenamePolicy (perhaps using WindowedFilenamePolicy as a starting point) to use your own logic for defining output paths. You can use / characters in your file paths as you wish (by the way, GCS buckets are "flat",它们实际上没有目录)。要获得 dates/times,windowedFilename 方法将 window 信息作为参数,因此您可以在 return 值中使用它,但您认为合适。

您需要使用 writeDynamic 而不是 Write。不幸的是,如前所述 here,AvroIO 本身不支持 writeDynamic,您需要使用 FileIO。

下面是一个使用 Scio 在 Scala 中实现的示例

    val dynamicOutput: FileIO.Write[String, GenericRecord] = FileIO
      .writeDynamic[String, GenericRecord]()
      .by((input: GenericRecord) => {
        input.get("id").toString.toUpperCase  + "/"
      })
      .withDestinationCoder(StringUtf8Coder.of())
      .withNumShards(1) // Since input is small, restrict to one file per bucket
      .withNaming(
        new SerializableFunction[String, FileNaming] {
          override def apply(partitionCol: String): FileNaming = {
            FileIO.Write.defaultNaming(s"Id=$partitionCol", ".parquet")
          }
        }
      )
      .via(Contextful.fn[GenericRecord,GenericRecord](
          new SerializableFunction[GenericRecord,GenericRecord]{
            override def apply(input: GenericRecord): GenericRecord = {
              val r = new GenericData.Record(outputSchema)
              r.put("amount",input.get("amount"))
              r.put("name",input.get("name"))
              r.put("type",input.get("type"))
              r
            }
          }
        ),
        ParquetIO.sink(outputSchema)
      )
      .to("gs://bucket-name/table-name")

在上面的示例中,我使用了 GenericRecord 类型并指定了架构并创建了动态分区并以 Parquet 格式写入文件。您可以选择以任何格式写入数据。

您可以使用 Pub/Sub to Cloud Storage Avro template 是一个流式管道,它从 Pub/Sub 主题读取数据并将 Avro 文件写入指定的 Cloud Storage 存储桶。此管道支持可选的用户提供的 window 持续时间,用于执行 windowed 写入。