在 Kafka Streams 中重建状态存储是否会将重复记录传播到下游主题?

Does rebuilding state stores in Kafka Streams propagate duplicate records to downstream topics?

我目前正在将 Kafka Streams 用于有状态应用程序。不过,状态 不是 存储在 Kafka 状态存储中,而是暂时存储在内存中。这意味着每当我重新启动应用程序时,所有状态都会丢失,并且必须通过从头开始处理所有记录来重建它。

在对 Kafka 状态存储进行了一些研究之后,这似乎正是我正在寻找的在应用程序重启之间保持状态(在内存中或磁盘上)的解决方案。然而,我发现网上的资源缺少一些非常重要的细节,所以我仍然有几个问题想知道这究竟是如何工作的:

状态存储使用它们自己的 changelog 主题,kafka-streams 状态存储负责从它们加载。如果您的状态存储未初始化,您的 kafka-streams 应用程序将使用 EARLIEST 从更新日志主题中恢复其本地状态存储,因为它必须读取每条记录。

这意味着全新实例的启动顺序大致为:

  • 观察到没有本地状态存储缓存
  • 通过使用 statestore 的更改日志主题加载本地状态存储(状态存储的主题名称是 <state-store-name>-changelog
  • 读取每条记录并相应地更新本地 rocksDB 实例
  • 不要发出任何东西,因为这是一个应用程序服务,而不是您的实际拓扑
  • 根据您配置拓扑的方式,使用 EARLIESTLATEST 读取您的消费者组偏移量。如果您的消费者群体还没有任何抵消,这不仅仅是一个问题
  • 处理东西,根据拓扑发出记录

将实际拓扑的 auto.offset.reset 设置为 LATEST 还是 EARLIEST 由您决定。如果它们丢失了,或者您创建了一个新组,它会在可能跳过的记录(LATEST)与处理旧记录的重新处理和重复数据删除(EARLIEST)、

之间取得平衡

长话短说:状态恢复不同于处理,由 kafka-streams 自行处理。

If the stream is set to start from offset latest, will the state still be (re)calculated from all the previous records?

如果您重新启动同一个应用程序(例如,在之前停止它之后),那么状态将不会被重新计算重新处理原始输入数据。相反,状态将从其 "backup" 恢复(每个状态存储或 KTable 都持久存储在 Kafka 主题中,即所谓的 "changelog topic" 那个 table/state 存储用于此目的)所以它的数据正是应用程序停止时的数据。此行为使您能够无缝地停止+重新启动您的应用程序,而无需跳过 "stop" 和 "restart" 之间到达的记录。

但是您需要注意一个不同的警告:设置偏移起点的配置(latestearliest)仅在您 运行 时使用您的 Kafka Streams 应用程序 第一次 。之后,每当您停止+重新启动您的应用程序时,它总是会从之前停止的地方继续。这是因为,如果应用程序至少 运行 一次,它已将其消费者偏移量信息存储在 Kafka 中,这使它知道一旦重新启动从何处自动恢复操作。

如果您需要始终(重新)开始的不同行为,例如latest 偏移量(因此可能会跳过停止应用程序和重新启动应用程序之间到达的记录),您必须 reset your Kafka Streams application。重置工具执行的步骤之一是从 Kafka 中删除应用程序的消费者偏移信息,这使得应用程序认为它以前从未启动过,可以这么说。

If previously already processed records need to be reprocessed in order to rebuild the state, will this propagate records through the rest of the Streams topology (e.g. InputTopic -> stateful processor -> OutputTopic, will this result in duplicated records in the OutputTopic because of rebuilding state)?

如上所述,默认情况下不会进行重新处理。在应用程序停止时,状态将自动重建为其之前的状态(双关语)。

仅当您手动重置您的应用程序(见上文)时才会重新处理,例如将应用程序配置为重新读取历史数据(例如在重置后将 auto.offset.reset 设置为 earliest)。