Cloud Dataflow window 触发器覆盖已关闭 window 的值

Cloud Dataflow window trigger overwrites value from closed window

我正在编写一个从 Pub/Sub 读取的数据流(Beam SDK 2.0.0),对 window 中的元素进行计数,然后将计数作为时间序列存储在 BigTable 中。 windows 固定为 1 分钟的持续时间。

我的意图是使用触发器每秒更新当前 window 的值,以便获得当前时间 window 的实时更新。

但这似乎不起作用。该值每秒都会正确更新,但是一旦数据流在下一分钟开始工作,第一个值就会更新为零。所以基本上只有我的最后一个值是正确的,其余的都是零。

Pipeline pipeline = Pipeline.create(options);

PCollection<String> live = pipeline
        .apply("Read from PubSub", PubsubIO.readStrings()
        .fromSubscription("projects/..."))
        .apply("Window per minute",
            Window
                .<String>into(FixedWindows.of(Duration.standardMinutes(1)))
                .triggering(Repeatedly
                    .forever(AfterProcessingTime
                        .pastFirstElementInPane()
                        .plusDelayOf(Duration.standardSeconds(1)))                                         
                    .orFinally(AfterWatermark.pastEndOfWindow()))
                .accumulatingFiredPanes()
                .withAllowedLateness(Duration.ZERO)
            );

我试过使用触发代码,但无济于事。我现在唯一的选择是删除整个 .trigger 块。有没有人经历过类似的行为?

将我的问题报告给 Google 后,他们发现 Beam SDK 中的一些问题导致了此问题。有关这些链接的更多详细信息:

当 EOW 和 GC 计时器一起触发时(非零允许延迟)我们没有注意到它是最后一个窗格:https://issues.apache.org/jira/browse/BEAM-2505

如果处理时间计时器与 GC 计时器一起出现,则它们不会被正确忽略:https://issues.apache.org/jira/browse/BEAM-2502

处理时间计时器只是被解释为 GC 计时器,完全错误地比较来自不同时域的时间戳:https://issues.apache.org/jira/browse/BEAM-2504