Kafka Streams 时间戳提取器

Kafka Streams TimestampExtractor

大家好,我有一个关于 TimestampExtractor 和 Kafka Streams 的问题....

在我们的应用程序中,有可能接收到无序事件,因此我喜欢根据有效负载内的业务日期而不是它们放置在主题中的时间点对事件进行排序。

为此,我编写了一个自定义 TimestampExtractor,以便能够从有效负载中提取时间戳。直到我在这里告诉的一切都完美地工作但是当我为这个主题构建 KTable 时,我发现我收到的事件乱序(从业务角度来看它不是最后一个事件但它在最后收到)显示为对象的最后状态,而 ConsumerRecord 具有来自有效负载的时间戳。

我不知道可能是我错误地认为 Kafka Stream 会用 TimestampExtractor 解决这个乱序问题。

然后在调试过程中,我看到如果 TimestampExtractor returns -1 结果是 Kafka Streams 忽略了消息,并且 TimestampExtractor 还提供了最后一个接受事件的时间戳,所以我构建了一个逻辑来实现以下检查 (payloadTimestamp < previousTimestamp) return -1,这实现了我想要的逻辑,但我不确定我是否在危险水域航行。

我可以处理这样的逻辑吗,或者还有什么其他方法可以处理 Kafka 流中的乱序事件....

感谢解答..

目前 (Kafka 2.0),KTables 在更新时不考虑时间戳,因为假设输入主题中没有乱序数据。这个假设的原因是 "single writer principle"——假设对于压缩的 KTable 输入主题,每个键只有一个生产者,因此,不会有任何关于单键。

这是一个已知问题:https://issues.apache.org/jira/browse/KAFKA-6521

针对您的修复:这样做并非 100% 正确或安全 "hack":

  • 首先,假设您有两个不同的消息和两个不同的密钥 <key1, value1, 5>, <key2, value2, 3>。与时间戳为 5 的第一条记录相比,时间戳为 3 的第二条记录要晚一些。但是,两者具有不同的键,因此,您实际上想将第二条记录放入 KTable。仅当您有两个具有相同键的记录时,您才想删除迟到的数据 IHMO。
  • 其次,如果您有两条记录具有相同的键,而第二条记录乱序,并且您在处理第二条记录之前崩溃了,TimestampExtractor 会丢失第一条记录的时间戳。因此在重启时,它不会丢弃乱序记录。

要做到这一点,您需要在应用程序逻辑中过滤 "manually",而不是无状态和密钥不可知的 TimestampExtractor。除了通过 builder#table() 读取数据之外,您还可以将其作为流读取,然后应用 .groupByKey().reduce() 来构建 KTable。在你的Reducer逻辑中,你将新旧记录的时间戳与return具有较大时间戳的记录进行比较。