检查 StateStore 是否已完全填充

Check if StateStore is fully populated

我有一个紧凑的主题,大约有 30Mio Keys。 我的 App 将这个主题具体化为 KeyValueStore

如何检查 KeyValueStore 是否已完全填充?如果我通过 InteractiveQuery 查找密钥,我需要知道密钥是否不存在,因为 StateStore 尚未准备好,或者密钥是否确实不存在。

我是这样实现 StateStore 的:


  @Bean
  public Consumer<KTable<Key, Value>> process() {
    return stream -> stream.filter((k, v) -> v != null,
        Materialized.<Key, Value, KeyValueStore<Bytes, byte[]>>as("stateStore")
            .withKeySerde(new KeySerde())
            .withValueSerde(new ValueSerde()));
  }

已更新:我误解了 OP 从 "how to check if the Topology has finished materialized the input topic to state store" 到 "state store restore process"

的问题

只有当 KafkaStreams 的状态从 REBALANCING 变为 RUNNING 状态时,您才能从 KafkaStreams 实例获取 KeyValueStore。 您可以使用 StreamsBuilderFactoryBeanCustomizer 检查此状态转换以访问底层 KafkaStreams 实例。 如果您只想检查所有状态存储何时已完全填充以及 kafka 流线程何时准备就绪以便您可以获得 KeyValueStore 您可以在 StateListener:

上收听
@Bean
public StreamsBuilderFactoryBeanCustomizer onKafkaStateChangeFromRebalanceToRunning() {
    return factoryBean -> factoryBean.setStateListener((newState, oldState) -> {
        if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
            // set flag that `stateStore` store of current KafkaStreams has been fully restore
            // then you can get
        }
    }
}

或者如果您想从 KafkaStreams 实例

获取商店
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> factoryBean.setKafkaStreamsCustomizer((KafkaStreamsCustomizer) kafkaStreams -> {
        kafkaStreams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.RUNNING && oldState == KafkaStreams.State.REBALANCING) {
                //get and assign your store using kafkaStreams.store("stateStore", QueryableStoreTypes.keyValueStore());
                //and set flag that `stateStore` store of current KafkaStreams has been fully restore
            }
        });
    });
}

Read more in the docs.

请注意,StreamsBuilderFactoryBeanCustomizer 应该只有一个实例。

一般来说,没有"fully loaded"这样的东西,因为在应用程序启动后的任何时间点,新数据都可能写入输入主题,并且会读取这些新数据以更新相应的主题table.

您可以做的是监控消费者延迟:在您的应用程序中 KafkaStreams#metrics() 允许您访问所有客户端(即 consumer/producer)和 Kafka Streams 指标。消费者公开了一个名为 records-lag-max 的指标,这可能会有所帮助。

当然,在正常处理过程中(假设一直有新数据写入input topic)consumer lag会一直上下波动。