Kafka Streams GlobalKTable 同步到应用程序
Kafka Streams GlobalKTable sychronization to applications
使用正常的 k-streams,kafka 将每个应用程序的偏移量存储在其内部偏移量主题上。在应用程序重新启动时,应用程序会根据 auto.offset.reset
策略重新处理主题。这个确实有解释here.
我正在使用 kafka-stream 的 GlobalKTable
通过应用程序复制数据。但是,我对应用程序的重启感到有点困惑,因为 它没有填充在 id (StreamsConfig.APPLICATION_ID_CONFIG
) 在重启后没有改变 的应用程序上(由于部署或崩溃) .每当我使用新 ID 启动流应用程序的新实例时,都会填充 GlobalKTable
。
A GlobalKTable
没有什么不同,只是启用了日志压缩功能的主题。创建 StreamsBuilder#globalTable
的 javadoc 指出:
streamsBuilder.globalTable("some-topic", Materialized.as("kglobaltable-store"))
Note that GlobalKTable
always applies "auto.offset.reset" strategy
"earliest" regardless of the specified value in StreamsConfig
.
因此我预计,不管应用程序 ID,我的流应用程序从一开始就读取 kglobaltable-store
主题并像这样在本地填充存储 github issue .似乎 javadoc 引用的主题是 some-topic
而不是 kglobaltable-store
.
这是 GlobalKTable
的预期行为吗?此外,是否有关于支持 GlobalKTables
?
的主题的保留政策
当我们在 some-topic
上有保留策略时,此行为还会导致 kglobaltable-store
主题上的数据过时。示例如下:
在时间 t0,让;
一些主题:(1, a) -> (2, b) -> (1, c)
kglobaltable-store: [(1, c), (2, b)]
经过一段时间 (2, b) 保留后,我启动我的流应用程序(使用新 ID)并且我的 GlobalKTable
仅存储记录 (1, c) 如果是这种情况.
编辑: 我正在使用 InMemoryKeyValueStore
.
因为您正在使用 InMemoryKeyValueStore
我假设您遇到了这个错误:https://issues.apache.org/jira/browse/KAFKA-6711
作为解决方法,您可以删除全局存储的本地检查点文件 (cf )——这将在重新启动时触发引导。或者您切换回默认 RocksDB
商店。
顺便说一句:如果您直接将主题阅读为 table 或 global-table,Kafka Streams 不会为 fault-tolerance 创建额外的更新日志主题,而是使用原始输入主题为此目的(这减少了 Kafka 集群内的存储需求)。因此,这些输入主题应该启用日志压缩。
使用正常的 k-streams,kafka 将每个应用程序的偏移量存储在其内部偏移量主题上。在应用程序重新启动时,应用程序会根据 auto.offset.reset
策略重新处理主题。这个确实有解释here.
我正在使用 kafka-stream 的 GlobalKTable
通过应用程序复制数据。但是,我对应用程序的重启感到有点困惑,因为 它没有填充在 id (StreamsConfig.APPLICATION_ID_CONFIG
) 在重启后没有改变 的应用程序上(由于部署或崩溃) .每当我使用新 ID 启动流应用程序的新实例时,都会填充 GlobalKTable
。
A GlobalKTable
没有什么不同,只是启用了日志压缩功能的主题。创建 StreamsBuilder#globalTable
的 javadoc 指出:
streamsBuilder.globalTable("some-topic", Materialized.as("kglobaltable-store"))
Note that
GlobalKTable
always applies "auto.offset.reset" strategy "earliest" regardless of the specified value inStreamsConfig
.
因此我预计,不管应用程序 ID,我的流应用程序从一开始就读取 kglobaltable-store
主题并像这样在本地填充存储 github issue .似乎 javadoc 引用的主题是 some-topic
而不是 kglobaltable-store
.
这是 GlobalKTable
的预期行为吗?此外,是否有关于支持 GlobalKTables
?
当我们在 some-topic
上有保留策略时,此行为还会导致 kglobaltable-store
主题上的数据过时。示例如下:
在时间 t0,让;
一些主题:(1, a) -> (2, b) -> (1, c)
kglobaltable-store: [(1, c), (2, b)]
经过一段时间 (2, b) 保留后,我启动我的流应用程序(使用新 ID)并且我的 GlobalKTable
仅存储记录 (1, c) 如果是这种情况.
编辑: 我正在使用 InMemoryKeyValueStore
.
因为您正在使用 InMemoryKeyValueStore
我假设您遇到了这个错误:https://issues.apache.org/jira/browse/KAFKA-6711
作为解决方法,您可以删除全局存储的本地检查点文件 (cf RocksDB
商店。
顺便说一句:如果您直接将主题阅读为 table 或 global-table,Kafka Streams 不会为 fault-tolerance 创建额外的更新日志主题,而是使用原始输入主题为此目的(这减少了 Kafka 集群内的存储需求)。因此,这些输入主题应该启用日志压缩。