Kafka Stateful Stream processor with statestore:幕后

Kafka Stateful Stream processor with statestore: Behind the scenes

我正在努力理解 Stateful Stream processor

据我所知,在这种类型的流处理器中,它使用 State Store.

维持某种状态

我了解到,实现 State Store 的方法之一是使用 RocksDB。假设以下 topology(并且只有一个处理器是 stateful

A->B->C ;处理器 B 有状态 local state store 和 changelog enabled。我使用的是低级别 API.

假设 sp 监听单个 kafka 主题,假设 topic-1 有 10 个分区。

我观察到,当应用程序启动时(不同物理机器中的 2 个实例和 num.stream.threads = 5),然后 state store 它创建目录结构 有如下内容:

0_0 , 0_1, 0_2..... 0_9 (每台机器有5个所以总共10个分区).

我在网上浏览了一些 material,上面说我们应该创建一个 StoreBuilder 并使用 addStateStore() 附加拓扑 在处理器中创建状态存储。

喜欢:

topology.addStateStore(storeBuilder,"processorName")

Ref also: org.apache.kafka.streams.state.Store

我不明白将 storeBuilder 附加到拓扑与在处理器中实际创建状态存储有什么区别。它们有什么区别?

第二部分:对于 statestore,它创建的目录如:0_0、0_1 等。它是由谁以及如何创建的? kafka 主题(sp 正在监听)和为 State Store 创建的目录数量之间是否存在某种 1:1 映射?

I didn't understand what is the difference in attaching a storeBuilder to topology vs actually creating a statestore within processor. What is the differences between them?

为了让Kafka Streams 为你管理store(容错、迁移),Kafka Streams 需要知道store。因此,您给 Kafka Streams StoreBuilder,Kafka Streams 会为您创建和管理商店。

如果您只是在处理器内部创建一个存储,Kafka Streams 不知道该存储并且该存储不会容错。

For statestore it creates directory like: 0_0, 0_1 etc. Who and how it gets created? Is there some sort of 1:1 mapping between the kafka topics (at which sp is listening) ande the number of directories that gets created for State Store?

是的,有一个映射。该存储在输入主题分区的数量上是共享的。每个分区还有一个 "task",任务目录名称为 y_z,其中 y 是子拓扑编号,z 是分区编号。对于您的简单拓扑,您看到的所有目录只有一个子拓扑,具有相同的 0_ 前缀。

因此,您的逻辑存储有 10 个物理分片。当相应的输入主题分区分配给不同的实例时,这种分片允许 Kafka Streams 迁移状态。总体而言,您最多可以 运行 10 个实例,每个实例将处理一个分区,并托管您商店的一个分片。