Kafka Streams GlobalKTable 同步到应用程序

Kafka Streams GlobalKTable sychronization to applications

使用正常的 k-streams,kafka 将每个应用程序的偏移量存储在其内部偏移量主题上。在应用程序重新启动时,应用程序会根据 auto.offset.reset 策略重新处理主题。这个确实有解释here.

我正在使用 kafka-stream 的 GlobalKTable 通过应用程序复制数据。但是,我对应用程序的重启感到有点困惑,因为 它没有填充在 id (StreamsConfig.APPLICATION_ID_CONFIG) 在重启后没有改变 的应用程序上(由于部署或崩溃) .每当我使用新 ID 启动流应用程序的新实例时,都会填充 GlobalKTable

A GlobalKTable 没有什么不同,只是启用了日志压缩功能的主题。创建 StreamsBuilder#globalTable 的 javadoc 指出:

streamsBuilder.globalTable("some-topic", Materialized.as("kglobaltable-store"))

Note that GlobalKTable always applies "auto.offset.reset" strategy "earliest" regardless of the specified value in StreamsConfig.

因此我预计,不管应用程序 ID,我的流应用程序从一开始就读取 kglobaltable-store 主题并像这样在本地填充存储 github issue .似乎 javadoc 引用的主题是 some-topic 而不是 kglobaltable-store.

这是 GlobalKTable 的预期行为吗?此外,是否有关于支持 GlobalKTables?

的主题的保留政策

当我们在 some-topic 上有保留策略时,此行为还会导致 kglobaltable-store 主题上的数据过时。示例如下:

在时间 t0,让;

一些主题:(1, a) -> (2, b) -> (1, c)

kglobaltable-store: [(1, c), (2, b)]

经过一段时间 (2, b) 保留后,我启动我的流应用程序(使用新 ID)并且我的 GlobalKTable 仅存储记录 (1, c) 如果是这种情况.

编辑: 我正在使用 InMemoryKeyValueStore.

因为您正在使用 InMemoryKeyValueStore 我假设您遇到了这个错误:https://issues.apache.org/jira/browse/KAFKA-6711

作为解决方法,您可以删除全局存储的本地检查点文件 (cf )——这将在重新启动时触发引导。或者您切换回默认 RocksDB 商店。

顺便说一句:如果您直接将主题阅读为 table 或 global-table,Kafka Streams 不会为 fault-tolerance 创建额外的更新日志主题,而是使用原始输入主题为此目的(这减少了 Kafka 集群内的存储需求)。因此,这些输入主题应该启用日志压缩。