如何解决 运行 时间异常:尝试从非全局 WindowFn 获取 GlobalWindow 的侧输入 window
How do solve run time exception: Attempted to get side input window for GlobalWindow from non-global WindowFn
我正在努力弄清楚如何解决我在这个数据流作业中遇到的问题。我在 apache beam archives question thread 上看到了类似的帖子,但我不太明白如何使用这些信息。
本质上,数据正在流式传输到 Big Query(有效),我正在尝试将这些 BQ 行写入同一个数据流作业中的 spanner,这会引发以下运行时异常:
java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn.getSideInputWindow(PartitioningWindowFn.java:47) ....
可以在这里看到数据流图的相关部分data flow graph,我用来写入 spanner 的代码在这里:
sensorReports
.apply("WindowSensorReportByMonth",
Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(5))).withAllowedLateness(Duration.ZERO).discardingFiredPanes()
.triggering(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1)))
.discardingFiredPanes())
.apply("CreateSensorReportMutation", ParDo.of(new RowToMutationTransform()))
.apply("Write to Spanner",
SpannerIO.write()
.withDatabaseId(propertiesUtils.getSpannerDBId())
.withInstanceId(propertiesUtils.getSpannerInstanceId())
.withProjectId(propertiesUtils.getSpannerProjectId())
.withBatchSizeBytes(0));
SpannerIO.write() 使用全局 window 在内部读取数据库模式并将其用作辅助输入,因此您的非全局-windowed 突变与它发生冲突.
在传递给 Spanner.IO.write()
之前,您可以将所有突变放入全局 window
.apply("To Global Window", Window.into(new GlobalWindows()))
但在 BEAM 版本 2.5-2.8 中,这将导致错误或没有任何内容被写入(因为 SpannerIO 从不支持流式管道)。
编辑后的答案:
但是BEAM 2.9.0之前的版本不支持streaming pipelines。 V2.4 和更早版本可以,前提是您不向其传递 windowed PCollection。
你会很高兴听到一切都是 fixed in version 2.9 (release in progress) where the SpannerIO both supports streaming writes and handles the windowing correctly。
我正在努力弄清楚如何解决我在这个数据流作业中遇到的问题。我在 apache beam archives question thread 上看到了类似的帖子,但我不太明白如何使用这些信息。
本质上,数据正在流式传输到 Big Query(有效),我正在尝试将这些 BQ 行写入同一个数据流作业中的 spanner,这会引发以下运行时异常:
java.lang.IllegalArgumentException: Attempted to get side input window for GlobalWindow from non-global WindowFn
org.apache.beam.sdk.transforms.windowing.PartitioningWindowFn.getSideInputWindow(PartitioningWindowFn.java:47) ....
可以在这里看到数据流图的相关部分data flow graph,我用来写入 spanner 的代码在这里:
sensorReports
.apply("WindowSensorReportByMonth",
Window.<TableRow>into(FixedWindows.of(Duration.standardMinutes(5))).withAllowedLateness(Duration.ZERO).discardingFiredPanes()
.triggering(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1)))
.discardingFiredPanes())
.apply("CreateSensorReportMutation", ParDo.of(new RowToMutationTransform()))
.apply("Write to Spanner",
SpannerIO.write()
.withDatabaseId(propertiesUtils.getSpannerDBId())
.withInstanceId(propertiesUtils.getSpannerInstanceId())
.withProjectId(propertiesUtils.getSpannerProjectId())
.withBatchSizeBytes(0));
SpannerIO.write() 使用全局 window 在内部读取数据库模式并将其用作辅助输入,因此您的非全局-windowed 突变与它发生冲突.
在传递给 Spanner.IO.write()
.apply("To Global Window", Window.into(new GlobalWindows()))
但在 BEAM 版本 2.5-2.8 中,这将导致错误或没有任何内容被写入(因为 SpannerIO 从不支持流式管道)。
编辑后的答案:
但是BEAM 2.9.0之前的版本不支持streaming pipelines。 V2.4 和更早版本可以,前提是您不向其传递 windowed PCollection。
你会很高兴听到一切都是 fixed in version 2.9 (release in progress) where the SpannerIO both supports streaming writes and handles the windowing correctly。