当多个具有相同密钥的消息同时到达时,Kafka Ktable 更新日志(使用 toStream())缺少一些 ktable 更新

Kafka Ktable changelog (using toStream()) is missing some ktable updates when several messages with the same key arrive at the same time

我有一个输入流,我用它来创建一个 ktable。然后我使用 toStream() 方法使用 ktable changelog 创建一个输出流。问题是 toStream() 方法创建的流不包含来自已更新我的 KTable 的输入流的所有消息。这是我的代码:

final KTable<String, event> KTable = inputStream.groupByKey().aggregate(() -> null,
      aggregateKtableMethod,
      storageConf);

KStream<String, event> outputStream = KTable.toStream();

对于 inputStream 中的每条消息,我想在 outputStream 中获取一条消息。对于大多数消息,它运行良好,但在特定情况下我丢失了一些事件:如果我在很短的时间间隔(小于 5 秒)内收到 2 条具有相同密钥的消息。在这种情况下,我只在 outputStream 中收到第二个事件。

我想是因为Ktable的更新是通过一些批操作来完成的,但是我找不到任何相关的配置或文档。这是这些丢失事件的原因吗?你知道如何更改配置,这样我就不会丢失任何消息吗?

我找到了解决方案。问题出在我用来创建 ktable 的“storageConf”中,缓存可以。我只需要使用以下功能禁用它:

storageConf.withCachingDisabled();

final KTable<String, event> KTable = inputStream.groupByKey().aggregate(() -> null,
  aggregateKtableMethod,
  storageConf);

现在我的所有事件都在输出流中。