使用动态日期时间从 Dataflow 写入 Cloud Storage
Write to Cloud Storage from Dataflow with dynamic datetime
我们正在构建写入云存储的 Apache Beam (Java SDK) 管道。我们正在使用 TextIO.write()
转换写入存储。在此操作中,我们希望根据当前日期时间动态更改存储文件的子目录。
这是流式传输管道的一部分。理想情况下,我们希望部署它并让 Beam 作业根据处理日期时间动态更改保存文件的文件夹子目录。
我们当前的管道转换如下所示:
DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
myPCollection.apply(TextIO.write().to("gs://my-bucket/%s", dtfOut.print(new DateTime()));
此代码的问题在于,函数返回的 DateTime 值停留在与将管道部署到 Google Cloud Dataflow 时相同的值。我们希望在处理传入消息时根据日期时间动态更改子目录结构。
我打算:
- 从 ParDo 函数获取日期时间。
- 创建一个新的 ParDo 函数并将消息用作主要输入,并将来自另一个 ParDo 函数的日期时间作为辅助输入传递。
这是最好的方法吗? Apache Beam 中是否有内置工具可以解决我们的用例?
FileIO 提供了 writeDynamic() 方法,允许将 pCollection 的每个项目定向到给定项目本身内容的不同目录或文件。
我在下面放置了一个我创建的简单示例,只是为了演示:
public class ExamplePipeline {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline pipeline = Pipeline.create(options);
Create.Values<String> sampleData = Create.of("ABC","DEF", "GHI");
pipeline.apply(sampleData)
.apply("WritingDynamic", FileIO.<PartitionData, String>writeDynamic()
.by(event -> new PartitionData())
.withDestinationCoder(AvroCoder.of(PartitionData.class))
.via(Contextful.fn(event -> event), TextIO.sink())
.to("gs://my-bucket/")
.withNaming(partitionData -> FileIO.Write.defaultNaming(partitionData.getPath(), ".txt")));
pipeline.run().waitUntilFinish();
}
public static class PartitionData implements Serializable {
private static final long serialVersionUID = 4549174774076944665L;
public String getPath() {
LocalDateTime writtingMoment = LocalDateTime.now(ZoneOffset.UTC);
int year = writtingMoment.getYear();
int month = writtingMoment.getMonthValue();
int day = writtingMoment.getDayOfMonth();
return String.format("%d/%02d/%02d/", year, month, day);
}
}
上面的代码将保存到一个结构中:
gs://my-bucket/${year}/%{month}/%{day}/... .txt
在 by()
方法中,我使用了我的数据分区器,我称之为 PartitionData
。
via()
应该return你要写的内容。
to()
将成为您路径的基础部分。
在 withNaming
中,您真正构建了路径的最后一部分。
通常我会在事件本身上加上时间戳,告诉它们在现实中发生的时间,然后您可以从事件中获取而不是使用 LocalDateTime.now
。
我们正在构建写入云存储的 Apache Beam (Java SDK) 管道。我们正在使用 TextIO.write()
转换写入存储。在此操作中,我们希望根据当前日期时间动态更改存储文件的子目录。
这是流式传输管道的一部分。理想情况下,我们希望部署它并让 Beam 作业根据处理日期时间动态更改保存文件的文件夹子目录。
我们当前的管道转换如下所示:
DateTimeFormatter dtfOut = DateTimeFormat.forPattern("yyyy/MM/dd");
myPCollection.apply(TextIO.write().to("gs://my-bucket/%s", dtfOut.print(new DateTime()));
此代码的问题在于,函数返回的 DateTime 值停留在与将管道部署到 Google Cloud Dataflow 时相同的值。我们希望在处理传入消息时根据日期时间动态更改子目录结构。
我打算:
- 从 ParDo 函数获取日期时间。
- 创建一个新的 ParDo 函数并将消息用作主要输入,并将来自另一个 ParDo 函数的日期时间作为辅助输入传递。
这是最好的方法吗? Apache Beam 中是否有内置工具可以解决我们的用例?
FileIO 提供了 writeDynamic() 方法,允许将 pCollection 的每个项目定向到给定项目本身内容的不同目录或文件。
我在下面放置了一个我创建的简单示例,只是为了演示:
public class ExamplePipeline {
public static void main(String[] args) {
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
Pipeline pipeline = Pipeline.create(options);
Create.Values<String> sampleData = Create.of("ABC","DEF", "GHI");
pipeline.apply(sampleData)
.apply("WritingDynamic", FileIO.<PartitionData, String>writeDynamic()
.by(event -> new PartitionData())
.withDestinationCoder(AvroCoder.of(PartitionData.class))
.via(Contextful.fn(event -> event), TextIO.sink())
.to("gs://my-bucket/")
.withNaming(partitionData -> FileIO.Write.defaultNaming(partitionData.getPath(), ".txt")));
pipeline.run().waitUntilFinish();
}
public static class PartitionData implements Serializable {
private static final long serialVersionUID = 4549174774076944665L;
public String getPath() {
LocalDateTime writtingMoment = LocalDateTime.now(ZoneOffset.UTC);
int year = writtingMoment.getYear();
int month = writtingMoment.getMonthValue();
int day = writtingMoment.getDayOfMonth();
return String.format("%d/%02d/%02d/", year, month, day);
}
}
上面的代码将保存到一个结构中:
gs://my-bucket/${year}/%{month}/%{day}/... .txt
在 by()
方法中,我使用了我的数据分区器,我称之为 PartitionData
。
via()
应该return你要写的内容。
to()
将成为您路径的基础部分。
在 withNaming
中,您真正构建了路径的最后一部分。
通常我会在事件本身上加上时间戳,告诉它们在现实中发生的时间,然后您可以从事件中获取而不是使用 LocalDateTime.now
。