Kafka Stateful Stream processor with statestore:幕后
Kafka Stateful Stream processor with statestore: Behind the scenes
我正在努力理解 Stateful
Stream processor
。
据我所知,在这种类型的流处理器中,它使用 State Store
.
维持某种状态
我了解到,实现 State Store
的方法之一是使用 RocksDB
。假设以下 topology
(并且只有一个处理器是 stateful
)
A->B->C ;处理器 B 有状态 local state store 和 changelog
enabled。我使用的是低级别 API.
假设 sp 监听单个 kafka 主题,假设 topic-1
有 10 个分区。
我观察到,当应用程序启动时(不同物理机器中的 2 个实例和 num.stream.threads
= 5),然后 state store
它创建目录结构
有如下内容:
0_0 , 0_1, 0_2..... 0_9 (每台机器有5个所以总共10个分区).
我在网上浏览了一些 material,上面说我们应该创建一个 StoreBuilder
并使用 addStateStore()
附加拓扑 的 在处理器中创建状态存储。
喜欢:
topology.addStateStore(storeBuilder,"processorName")
Ref also: org.apache.kafka.streams.state.Store
我不明白将 storeBuilder 附加到拓扑与在处理器中实际创建状态存储有什么区别。它们有什么区别?
第二部分:对于 statestore,它创建的目录如:0_0、0_1 等。它是由谁以及如何创建的? kafka 主题(sp 正在监听)和为 State Store
创建的目录数量之间是否存在某种 1:1 映射?
I didn't understand what is the difference in attaching a storeBuilder to topology vs actually creating a statestore within processor. What is the differences between them?
为了让Kafka Streams 为你管理store(容错、迁移),Kafka Streams 需要知道store。因此,您给 Kafka Streams StoreBuilder
,Kafka Streams 会为您创建和管理商店。
如果您只是在处理器内部创建一个存储,Kafka Streams 不知道该存储并且该存储不会容错。
For statestore it creates directory like: 0_0, 0_1 etc. Who and how it gets created? Is there some sort of 1:1 mapping between the kafka topics (at which sp is listening) ande the number of directories that gets created for State Store?
是的,有一个映射。该存储在输入主题分区的数量上是共享的。每个分区还有一个 "task",任务目录名称为 y_z
,其中 y
是子拓扑编号,z
是分区编号。对于您的简单拓扑,您看到的所有目录只有一个子拓扑,具有相同的 0_
前缀。
因此,您的逻辑存储有 10 个物理分片。当相应的输入主题分区分配给不同的实例时,这种分片允许 Kafka Streams 迁移状态。总体而言,您最多可以 运行 10 个实例,每个实例将处理一个分区,并托管您商店的一个分片。
我正在努力理解 Stateful
Stream processor
。
据我所知,在这种类型的流处理器中,它使用 State Store
.
我了解到,实现 State Store
的方法之一是使用 RocksDB
。假设以下 topology
(并且只有一个处理器是 stateful
)
A->B->C ;处理器 B 有状态 local state store 和 changelog
enabled。我使用的是低级别 API.
假设 sp 监听单个 kafka 主题,假设 topic-1
有 10 个分区。
我观察到,当应用程序启动时(不同物理机器中的 2 个实例和 num.stream.threads
= 5),然后 state store
它创建目录结构
有如下内容:
0_0 , 0_1, 0_2..... 0_9 (每台机器有5个所以总共10个分区).
我在网上浏览了一些 material,上面说我们应该创建一个 StoreBuilder
并使用 addStateStore()
附加拓扑 的 在处理器中创建状态存储。
喜欢:
topology.addStateStore(storeBuilder,"processorName")
Ref also: org.apache.kafka.streams.state.Store
我不明白将 storeBuilder 附加到拓扑与在处理器中实际创建状态存储有什么区别。它们有什么区别?
第二部分:对于 statestore,它创建的目录如:0_0、0_1 等。它是由谁以及如何创建的? kafka 主题(sp 正在监听)和为 State Store
创建的目录数量之间是否存在某种 1:1 映射?
I didn't understand what is the difference in attaching a storeBuilder to topology vs actually creating a statestore within processor. What is the differences between them?
为了让Kafka Streams 为你管理store(容错、迁移),Kafka Streams 需要知道store。因此,您给 Kafka Streams StoreBuilder
,Kafka Streams 会为您创建和管理商店。
如果您只是在处理器内部创建一个存储,Kafka Streams 不知道该存储并且该存储不会容错。
For statestore it creates directory like: 0_0, 0_1 etc. Who and how it gets created? Is there some sort of 1:1 mapping between the kafka topics (at which sp is listening) ande the number of directories that gets created for State Store?
是的,有一个映射。该存储在输入主题分区的数量上是共享的。每个分区还有一个 "task",任务目录名称为 y_z
,其中 y
是子拓扑编号,z
是分区编号。对于您的简单拓扑,您看到的所有目录只有一个子拓扑,具有相同的 0_
前缀。
因此,您的逻辑存储有 10 个物理分片。当相应的输入主题分区分配给不同的实例时,这种分片允许 Kafka Streams 迁移状态。总体而言,您最多可以 运行 10 个实例,每个实例将处理一个分区,并托管您商店的一个分片。