正确的 Apache Beam 窗口策略用于连接两个数据流

Correct Apache beam windowing strategy for joining two streams of data

我有一个用例,我需要从两个流中读取数据并通过一个键将它们连接起来。一个流包含发送记录,另一个流包含对此发送的确认。所以我会在 stream1 中找到 1 个键为 123 的元素,在 stream2 中找到 1 个键为 123 的元素。发送和确认可能随时到达任一流,但我可以假设发送出现在确认之前。然而,ack 可能会丢失,发送永远不会被系统丢失。我想将等待 ack 的时间限制为 3 分钟(以简化)。

在这种情况下,最好的 window 策略是什么?

  1. 如果我使用固定 windows,发送和确认可能位于两个不同的 windows 中,我将无法进行连接。我可能会使用延迟 API,但在这种情况下我应该累积窗格吗?

  2. 我尝试使用间隔持续时间为 3 分钟的会话 window,但在 3 分钟结束时我没有看到触发器。加入发生在 6 分钟后。这是我的代码:

    加入代码:

    private static <T> Window<T> window() {
            return Window.<T>into(Sessions.withGapDuration(Duration.standardMinutes(3)))
                .triggering(AfterWatermark.pastEndOfWindow())
                .withAllowedLateness(Duration.ZERO)
                .discardingFiredPanes();
          }
    
    
    PCollection<KV<String, Record>> sendStream = KafkaIO.readRecord()
                .withTopic(pipelineOptions.getInputTopic())
                .window();
    
    PCollection<KV<String, Record>> ackStream = KafkaIO.readRecord()
                .withTopic(pipelineOptions.getInputTopic())
                .window();
    
    PCollection<KV<String,String>> joins =
           sendStream.apply("Joining Streams", Joins.innerJoin(ackStream))
                .apply(...);
    

    输出:

    19-01-2022 11:41:19 Send: 1mtz7n-kxi88e7a-89
    19-01-2022 11:42:19 Ack: 1mtz7n-kxi88e7a-89
    19-01-2022 11:48:33 JoinedByKey: 1mtz7n-kxi88e7a-89

我发现有 6 到 7 分钟的延迟。我的印象是会话应该只持续 3 分钟。

我还想在两个流中都收到事件时触发。如果我使用 earlyFiringTriggers,我会看到连接的数据在两个元素连接时输出一次,下一个在 window 的末尾输出。我也想避免这种情况,但我无法正确配置我的 window 方法。

有什么建议吗?

这个连接最好在全局中实现 window 使用状态来缓冲连接的两侧和计时器来清理状态。它可以被推广,因此在 https://issues.apache.org/jira/browse/BEAM-7386. There is an implementation under review/development at https://github.com/apache/beam/pull/15275 中被作为功能请求进行跟踪。您也许能够使用错误的详细信息和拉取请求来完成为您简化和定制的实施。