Kafka Streams:我们是否应该提前每个键的流时间来测试 Windowed 抑制?

Kafka Streams: Should we advance stream time per key to test Windowed suppression?

我从 This blog and this tutorial 了解到,为了用事件时间语义测试抑制,应该发送虚拟记录以提前流时间。 我试图通过这样做来提前时间。但这似乎不起作用,除非特定键的时间提前。

我有一个自定义 TimestampExtractor,它将我首选的“流媒体时间”与记录相关联。 我的流拓扑伪代码如下(我使用的是 Kafka Streams DSL API):

    source.mapValues(someProcessingLambda)
          .flatMap(flattenRecordsLambda)
          .groupByKey(Grouped.with(Serdes.ByteArray(), Serdes.ByteArray()))
          .windowedBy(TimeWindows.of(Duration.ofMinutes(10)).grace(Duration.ZERO))
          .aggregate(()->null, aggregationLambda)
          .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

我的输入格式如下:

   1 - {"stream_time":"2019-04-09T11:08:36.000-04:00", id:"1", data:"..."}
   2 - {"stream_time":"2019-04-09T11:09:36.000-04:00", id:"1", data:"..."}
   3 - {"stream_time":"2019-04-09T11:18:36.000-04:00", id:"2", data:"..."}
   4 - {"stream_time":"2019-04-09T11:19:36.000-04:00", id:"2", data:"..."}
    .
    .

现在记录12属于一个10分钟window根据stream_time34属于另一个. 在 window 中,记录按照 id 聚合。 我预计记录 3 会发出流已经前进的信号并导致抑制发出对应于第一个 window 的数据。 但是,在我发送带有 id:1 的虚拟记录以提前该密钥的流时间之前,不会发出数据。

我对测试说明的理解有误吗?这是预期的行为吗?虚拟记录的密钥重要吗?

很抱歉给您带来麻烦。这确实是一个棘手的问题。我有一些想法可以添加一些操作来支持这种集成测试,但是在不破坏基本的流处理时间语义的情况下很难做到。

听起来您正在测试“真正的”KafkaStreams 应用程序,而不是使用 TopologyTestDriver 进行测试。我的第一个建议是,如果 TopologyTestDriver 满足您的需求,您将有更好的时间来验证您的应用程序语义。

在我看来,您的输入主题(以及您的应用程序)中可能有多个分区。如果密钥 1 进入一个分区,而密钥 3 进入另一个分区,您将看到您所观察到的情况。应用程序的每个分区独立地跟踪流时间。 TopologyTestDriver 工作得很好,因为它只使用一个分区,还因为它同步处理数据。否则,你将不得不制作你的“虚拟”时间推进消息,以进入与你试图清除的密钥相同的分区。

这将特别棘手,因为您的“flatMap().groupByKey()”将对数据进行重新分区。您必须制作虚拟消息,以便它在重新分区后进入正确的分区。或者您可以尝试将您的虚拟消息直接写入重新分区主题。

如果您确实需要使用 KafkaStreams 而不是 TopologyTestDriver 进行测试,我想最简单的方法就是为每个键编写一条“时间推进”消息,正如您在问题中所建议的那样。不是因为它是绝对必要的,而是因为它是满足所有这些警告的最简单方法。 我还要提到的是,我们正在对 Kafka Streams 中的流时间处理进行一些一般性改进,这应该会大大简化这种情况,但当然,现在这对你没有帮助。