为什么我必须使用 Kafka Streams 配置状态存储
Why do I have to configure a state store with Kafka Streams
目前我有以下设置:
StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("kafka.topics.table"),
new SomeKeySerde(),
new SomeValueSerde());
streamsBuilder.addStateStore(storeBuilder);
final KStream<byte[], SomeClass> requestsStream = streamsBuilder
.stream("myTopic", Consumed.with(Serdes.ByteArray(), theSerde));
requestsStream
.filter((key, request) -> Objects.nonNull(request))
.process(() -> new SomeClassUpdater("kafka.topics.table", maxNumMatches), "kafka.topics.table");
Properties streamsConfiguration = loadConfiguration();
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsConfiguration);
streams.start()
为什么我需要本地状态存储,因为我没有用它做任何其他计算,而且数据也存储在 kafka 变更日志中?另外,它在什么时候存储在本地存储中,是否存储并提交到变更日志?
我面临的问题是我在本地存储并及时 运行 内存问题,尤其是当它经常重新分区时。因为旧分区仍然存在并填满了内存。
所以我的问题是,为什么我们需要 rocksdb 的持久性,因为:
- 数据保存在 kafka 变更日志中
- 当容器消失时,ramdisk 无论如何都消失了。
在单个线程上,我们可以有多个任务等于没有。主题的分区。每个分区都有自己的状态存储,这些状态存储将数据保存到 Changelog,这是 Kafka 的内部主题。 分区的每个状态存储还维护状态存储的副本其他分区,以恢复任务可能失败的分区的数据。
如果你不使用状态存储,并且你的一个任务失败了,它将转到内部主题,即更新日志,然后将为分区获取数据,这对于 CPU.因此,维护状态存储减少了任务可能失败的时间,并立即从另一个任务状态存储中获取数据。
目前我有以下设置:
StoreBuilder storeBuilder = Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore("kafka.topics.table"),
new SomeKeySerde(),
new SomeValueSerde());
streamsBuilder.addStateStore(storeBuilder);
final KStream<byte[], SomeClass> requestsStream = streamsBuilder
.stream("myTopic", Consumed.with(Serdes.ByteArray(), theSerde));
requestsStream
.filter((key, request) -> Objects.nonNull(request))
.process(() -> new SomeClassUpdater("kafka.topics.table", maxNumMatches), "kafka.topics.table");
Properties streamsConfiguration = loadConfiguration();
KafkaStreams streams = new KafkaStreams(streamsBuilder.build(), streamsConfiguration);
streams.start()
为什么我需要本地状态存储,因为我没有用它做任何其他计算,而且数据也存储在 kafka 变更日志中?另外,它在什么时候存储在本地存储中,是否存储并提交到变更日志?
我面临的问题是我在本地存储并及时 运行 内存问题,尤其是当它经常重新分区时。因为旧分区仍然存在并填满了内存。 所以我的问题是,为什么我们需要 rocksdb 的持久性,因为:
- 数据保存在 kafka 变更日志中
- 当容器消失时,ramdisk 无论如何都消失了。
在单个线程上,我们可以有多个任务等于没有。主题的分区。每个分区都有自己的状态存储,这些状态存储将数据保存到 Changelog,这是 Kafka 的内部主题。 分区的每个状态存储还维护状态存储的副本其他分区,以恢复任务可能失败的分区的数据。
如果你不使用状态存储,并且你的一个任务失败了,它将转到内部主题,即更新日志,然后将为分区获取数据,这对于 CPU.因此,维护状态存储减少了任务可能失败的时间,并立即从另一个任务状态存储中获取数据。