将无界集合写入 GCS
Writing an an unbounded collection to GCS
我看到很多关于同一主题的问题。但是,我在写入 GCS 时仍然遇到问题。我正在阅读 pubsub 的主题并尝试将其推送到 GCS。我提到了 。但是,在最新的光束包中找不到 IOChannelUtils。
PCollection<String> details = pipeline
.apply(PubsubIO.readStrings().fromTopic("/topics/<project>/sampleTopic"));
PCollection<KV<String, String>> keyedStream = details.apply(WithKeys.of(new SerializableFunction<String, String>() {
public String apply(String s) {
return "constant";
}
}));
PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream.apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_MIN)).withAllowedLateness(ONE_DAY)
.triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(10))
.withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(10),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_SECONDS))))
.discardingFiredPanes()).apply(GroupByKey.create());
PCollection<Iterable<String>> windows = keyedWindows.apply(Values.create());
这是我从堆栈溢出中的许多其他类似主题中获取的。现在,我明白了,TextIO 确实支持使用 withWindowedWrites 和 withNumShards 的无界 PCollection 写入选项。
参考:
但是,我不明白,我应该怎么做。
我正在尝试按如下方式写入 GCS。
FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
details.apply(TextIO.write().to("gs://<bucket>/topicfile").withWindowedWrites()
.withFilenamePolicy(policy).withNumShards(4));
我没有足够的积分来对 Stack Overflow 中的这些主题添加评论,因此我将其作为另一个问题提出。
查看此 Pub/Sub to GCS Pipeline,它提供了将窗口文件写入 GCS 的完整示例。
我可以通过如下所示修改窗口来解决这个问题
PCollection<String> streamedDataWindows = streamedData.apply(Window.<String>into(new GlobalWindows())
.triggering(Repeatedly
.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(30))
)).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes());
streamedDataWindows.apply(TextIO.write().to(CLOUD_STORAGE).withWindowedWrites().withNumShards(1).withFilenamePolicy(new PerWindowFiles()));
public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {
public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
// OVERRIDE THE FILE NAME CREATION
}
}
虽然我可以这样解决,但我仍然不确定这里的窗口概念。当我找到它时,我会添加更多细节。如果有人有更多的了解,请添加更多详细信息。
谢谢
我看到很多关于同一主题的问题。但是,我在写入 GCS 时仍然遇到问题。我正在阅读 pubsub 的主题并尝试将其推送到 GCS。我提到了
PCollection<String> details = pipeline
.apply(PubsubIO.readStrings().fromTopic("/topics/<project>/sampleTopic"));
PCollection<KV<String, String>> keyedStream = details.apply(WithKeys.of(new SerializableFunction<String, String>() {
public String apply(String s) {
return "constant";
}
}));
PCollection<KV<String, Iterable<String>>> keyedWindows = keyedStream.apply(Window.<KV<String, String>>into(FixedWindows.of(ONE_MIN)).withAllowedLateness(ONE_DAY)
.triggering(AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(10))
.withLateFirings(AfterFirst.of(AfterPane.elementCountAtLeast(10),
AfterProcessingTime.pastFirstElementInPane().plusDelayOf(TEN_SECONDS))))
.discardingFiredPanes()).apply(GroupByKey.create());
PCollection<Iterable<String>> windows = keyedWindows.apply(Values.create());
这是我从堆栈溢出中的许多其他类似主题中获取的。现在,我明白了,TextIO 确实支持使用 withWindowedWrites 和 withNumShards 的无界 PCollection 写入选项。
参考:
但是,我不明白,我应该怎么做。
我正在尝试按如下方式写入 GCS。
FilenamePolicy policy = DefaultFilenamePolicy.constructUsingStandardParameters(
StaticValueProvider.of(outputDirectory), DefaultFilenamePolicy.DEFAULT_SHARD_TEMPLATE, "");
details.apply(TextIO.write().to("gs://<bucket>/topicfile").withWindowedWrites()
.withFilenamePolicy(policy).withNumShards(4));
我没有足够的积分来对 Stack Overflow 中的这些主题添加评论,因此我将其作为另一个问题提出。
查看此 Pub/Sub to GCS Pipeline,它提供了将窗口文件写入 GCS 的完整示例。
我可以通过如下所示修改窗口来解决这个问题
PCollection<String> streamedDataWindows = streamedData.apply(Window.<String>into(new GlobalWindows())
.triggering(Repeatedly
.forever(AfterProcessingTime
.pastFirstElementInPane()
.plusDelayOf(Duration.standardSeconds(30))
)).withAllowedLateness(Duration.standardDays(1)).discardingFiredPanes());
streamedDataWindows.apply(TextIO.write().to(CLOUD_STORAGE).withWindowedWrites().withNumShards(1).withFilenamePolicy(new PerWindowFiles()));
public static class PerWindowFiles extends FileBasedSink.FilenamePolicy {
public ResourceId windowedFilename(ResourceId outputDirectory, WindowedContext context, String extension) {
// OVERRIDE THE FILE NAME CREATION
}
}
虽然我可以这样解决,但我仍然不确定这里的窗口概念。当我找到它时,我会添加更多细节。如果有人有更多的了解,请添加更多详细信息。 谢谢