在 Apache Beam 中使用 defaultNaming 进行动态窗口写入
Using defaultNaming for dynamic windowed writes in Apache Beam
我正在关注 this post and the documentation 的答案,以便在管道末端对我的数据执行动态窗口写入。这是我目前所拥有的:
static void applyWindowedWrite(PCollection<String> stream) {
stream.apply(
FileIO.<String, String>writeDynamic()
.by(Event::getKey)
.via(TextIO.sink())
.to("gs://some_bucket/events/")
.withNaming(key -> defaultNaming(key, ".json")));
}
但是 NetBeans 在最后一行警告我语法错误:
FileNaming is not public in Write; cannot be accessed outside package
如何使 defaultNaming
可用于我的管道,以便我可以将其用于动态写入。或者,如果那不可能,我应该怎么做?
发布我的发现,以防其他人遇到此问题。
我之前尝试使用 writeDynamic()
时遇到了三个问题。
- 以前我一直在使用 Beam 2.3.0 版,它确实将
FileNaming
描述为 FileIO.Write
内部的 class。 Beam 2.4.0 将 FileNaming
定义为 public static interface
使其在外部可用。
- 完全 resolving/importing
defaultNaming
。而不是直接调用 defaultNaming
- 正如示例文档中调用的那样 - 它必须作为 FileIO.Write.defaultNaming
调用,因为 FileIO
是我实际导入的包。
- 执行动态写入还需要添加
withDestinationCoder
。
最终的解决方案是这样的。
static void applyWindowedWrite(PCollection<String> stream) {
stream.apply(FileIO.<String, String>writeDynamic()
.by(Event::getKey)
.via(TextIO.sink())
.to("gs://some_bucket/events/")
.withDestinationCoder(StringUtf8Coder.of())
.withNumShards(1)
.withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
}
其中 Event::getKey
是在具有签名 public static String getKey(String event)
.
的同一包中定义的静态函数
这会执行 windowed 写入,每个 window 写入一个文件(由 .withNumShards(1)
方法定义)。这假定 window 已在上一步中定义。 GroupByKey
在写入之前不需要,因为只要明确定义分片数量,它就会在写入过程中完成。有关详细信息,请参阅 FileIO documentation "Writing files -> How many shards are generated per pane"。
我正在关注 this post and the documentation 的答案,以便在管道末端对我的数据执行动态窗口写入。这是我目前所拥有的:
static void applyWindowedWrite(PCollection<String> stream) {
stream.apply(
FileIO.<String, String>writeDynamic()
.by(Event::getKey)
.via(TextIO.sink())
.to("gs://some_bucket/events/")
.withNaming(key -> defaultNaming(key, ".json")));
}
但是 NetBeans 在最后一行警告我语法错误:
FileNaming is not public in Write; cannot be accessed outside package
如何使 defaultNaming
可用于我的管道,以便我可以将其用于动态写入。或者,如果那不可能,我应该怎么做?
发布我的发现,以防其他人遇到此问题。
我之前尝试使用 writeDynamic()
时遇到了三个问题。
- 以前我一直在使用 Beam 2.3.0 版,它确实将
FileNaming
描述为FileIO.Write
内部的 class。 Beam 2.4.0 将FileNaming
定义为public static interface
使其在外部可用。 - 完全 resolving/importing
defaultNaming
。而不是直接调用defaultNaming
- 正如示例文档中调用的那样 - 它必须作为FileIO.Write.defaultNaming
调用,因为FileIO
是我实际导入的包。 - 执行动态写入还需要添加
withDestinationCoder
。
最终的解决方案是这样的。
static void applyWindowedWrite(PCollection<String> stream) {
stream.apply(FileIO.<String, String>writeDynamic()
.by(Event::getKey)
.via(TextIO.sink())
.to("gs://some_bucket/events/")
.withDestinationCoder(StringUtf8Coder.of())
.withNumShards(1)
.withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
}
其中 Event::getKey
是在具有签名 public static String getKey(String event)
.
这会执行 windowed 写入,每个 window 写入一个文件(由 .withNumShards(1)
方法定义)。这假定 window 已在上一步中定义。 GroupByKey
在写入之前不需要,因为只要明确定义分片数量,它就会在写入过程中完成。有关详细信息,请参阅 FileIO documentation "Writing files -> How many shards are generated per pane"。