检查 StateStore 是否已完全填充
Check if StateStore is fully populated
我有一个紧凑的主题,大约有 30Mio Keys。
我的 App
将这个主题具体化为 KeyValueStore
。
如何检查 KeyValueStore
是否已完全填充?如果我通过 InteractiveQuery
查找密钥,我需要知道密钥是否不存在,因为 StateStore
尚未准备好,或者密钥是否确实不存在。
我是这样实现 StateStore 的:
@Bean
public Consumer<KTable<Key, Value>> process() {
return stream -> stream.filter((k, v) -> v != null,
Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as("stateStore")
.withKeySerde(new KeySerde())
.withValueSerde(new ValueSerde()));
}
已更新:我误解了 OP 从 "how to check if the Topology has finished materialized the input topic to state store" 到 "state store restore process"
的问题
只有当 KafkaStreams 的状态从 REBALANCING
变为 RUNNING
状态时,您才能从 KafkaStreams 实例获取 KeyValueStore。
您可以使用 StreamsBuilderFactoryBeanCustomizer
检查此状态转换以访问底层 KafkaStreams 实例。
如果您只想检查所有状态存储何时已完全填充以及 kafka 流线程何时准备就绪以便您可以获得 KeyValueStore
您可以在 StateListener
:
上收听
@Bean
public StreamsBuilderFactoryBeanCustomizer onKafkaStateChangeFromRebalanceToRunning() {
return factoryBean -> factoryBean.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
// set flag that `stateStore` store of current KafkaStreams has been fully restore
// then you can get
}
}
}
或者如果您想从 KafkaStreams
实例
获取商店
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> factoryBean.setKafkaStreamsCustomizer((KafkaStreamsCustomizer) kafkaStreams -> {
kafkaStreams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
//get and assign your store using kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
//and set flag that `stateStore` store of current KafkaStreams has been fully restore
}
});
});
}
请注意,StreamsBuilderFactoryBeanCustomizer 应该只有一个实例。
一般来说,没有"fully loaded"这样的东西,因为在应用程序启动后的任何时间点,新数据都可能写入输入主题,并且会读取这些新数据以更新相应的主题table.
您可以做的是监控消费者延迟:在您的应用程序中 KafkaStreams#metrics()
允许您访问所有客户端(即 consumer/producer)和 Kafka Streams 指标。消费者公开了一个名为 records-lag-max
的指标,这可能会有所帮助。
当然,在正常处理过程中(假设一直有新数据写入input topic)consumer lag会一直上下波动。
我有一个紧凑的主题,大约有 30Mio Keys。
我的 App
将这个主题具体化为 KeyValueStore
。
如何检查 KeyValueStore
是否已完全填充?如果我通过 InteractiveQuery
查找密钥,我需要知道密钥是否不存在,因为 StateStore
尚未准备好,或者密钥是否确实不存在。
我是这样实现 StateStore 的:
@Bean
public Consumer<KTable<Key, Value>> process() {
return stream -> stream.filter((k, v) -> v != null,
Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as("stateStore")
.withKeySerde(new KeySerde())
.withValueSerde(new ValueSerde()));
}
已更新:我误解了 OP 从 "how to check if the Topology has finished materialized the input topic to state store" 到 "state store restore process"
的问题只有当 KafkaStreams 的状态从 REBALANCING
变为 RUNNING
状态时,您才能从 KafkaStreams 实例获取 KeyValueStore。
您可以使用 StreamsBuilderFactoryBeanCustomizer
检查此状态转换以访问底层 KafkaStreams 实例。
如果您只想检查所有状态存储何时已完全填充以及 kafka 流线程何时准备就绪以便您可以获得 KeyValueStore
您可以在 StateListener
:
@Bean
public StreamsBuilderFactoryBeanCustomizer onKafkaStateChangeFromRebalanceToRunning() {
return factoryBean -> factoryBean.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
// set flag that `stateStore` store of current KafkaStreams has been fully restore
// then you can get
}
}
}
或者如果您想从 KafkaStreams
实例
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> factoryBean.setKafkaStreamsCustomizer((KafkaStreamsCustomizer) kafkaStreams -> {
kafkaStreams.setStateListener((newState, oldState) -> {
if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
//get and assign your store using kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
//and set flag that `stateStore` store of current KafkaStreams has been fully restore
}
});
});
}
请注意,StreamsBuilderFactoryBeanCustomizer 应该只有一个实例。
一般来说,没有"fully loaded"这样的东西,因为在应用程序启动后的任何时间点,新数据都可能写入输入主题,并且会读取这些新数据以更新相应的主题table.
您可以做的是监控消费者延迟:在您的应用程序中 KafkaStreams#metrics()
允许您访问所有客户端(即 consumer/producer)和 Kafka Streams 指标。消费者公开了一个名为 records-lag-max
的指标,这可能会有所帮助。
当然,在正常处理过程中(假设一直有新数据写入input topic)consumer lag会一直上下波动。