使用动态日期时间从 Dataflow 写入 Cloud Storage

Write to Cloud Storage from Dataflow with dynamic datetime

我们正在构建写入云存储的 Apache Beam (Java SDK) 管道。我们正在使用 TextIO.write() 转换写入存储。在此操作中,我们希望根据当前日期时间动态更改存储文件的子目录。

这是流式传输管道的一部分。理想情况下,我们希望部署它并让 Beam 作业根据处理日期时间动态更改保存文件的文件夹子目录。

我们当前的管道转换如下所示:

DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");

myPCollection.apply(TextIO.write().to("gs://my-bucket/%s", dtfOut.print(new DateTime()));

此代码的问题在于,函数返回的 DateTime 值停留在与将管道部署到 Google Cloud Dataflow 时相同的值。我们希望在处理传入消息时根据日期时间动态更改子目录结构。

我打算:

  1. 从 ParDo 函数获取日期时间。
  2. 创建一个新的 ParDo 函数并将消息用作主要输入,并将来自另一个 ParDo 函数的日期时间作为辅助输入传递。

这是最好的方法吗? Apache Beam 中是否有内置工具可以解决我们的用例?

FileIO 提供了 writeDynamic() 方法,允许将 pCollection 的每个项目定向到给定项目本身内容的不同目录或文件。

我在下面放置了一个我创建的简单示例,只是为了演示:

public class ExamplePipeline {
public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
    Pipeline pipeline = Pipeline.create(options);


    Create.Values<String> sampleData = Create.of("ABC","DEF", "GHI");

    pipeline.apply(sampleData)
            .apply("WritingDynamic", FileIO.<PartitionData, String>writeDynamic()
                    .by(event -> new PartitionData())
                    .withDestinationCoder(AvroCoder.of(PartitionData.class))
                    .via(Contextful.fn(event -> event), TextIO.sink())
                    .to("gs://my-bucket/")
                    .withNaming(partitionData -> FileIO.Write.defaultNaming(partitionData.getPath(), ".txt")));

    pipeline.run().waitUntilFinish();
}

public static class PartitionData implements Serializable {
    private static final long serialVersionUID = 4549174774076944665L;

    public String getPath() {
        LocalDateTime writtingMoment = LocalDateTime.now(ZoneOffset.UTC);
        int year = writtingMoment.getYear();
        int month = writtingMoment.getMonthValue();
        int day = writtingMoment.getDayOfMonth();

        return String.format("%d/%02d/%02d/", year, month, day);
    }
}

上面的代码将保存到一个结构中: gs://my-bucket/${year}/%{month}/%{day}/... .txt

by() 方法中,我使用了我的数据分区器,我称之为 PartitionData

via()应该return你要写的内容。

to() 将成为您路径的基础部分。

withNaming 中,您真正构建了路径的最后一部分。

通常我会在事件本身上加上时间戳,告诉它们在现实中发生的时间,然后您可以从事件中获取而不是使用 LocalDateTime.now