将 StateRestoreListener 与 Spring Cloud Kafka Streams 活页夹结合使用

Using StateRestoreListener with Spring Cloud Kafka Streams binder

我打算将 StateRestoreListener 与 Spring Cloud Kafka Streams 活页夹一起使用。 我需要监控我的应用程序的容错状态存储的恢复进度。 confluent https://docs.confluent.io/current/streams/monitoring.html#streams-monitoring-runtime-status 中有示例。

In order to observe the restoration of all state stores you provide your application an instance of the org.apache.kafka.streams.processor.StateRestoreListener interface. You set the org.apache.kafka.streams.processor.StateRestoreListener by calling the KafkaStreams#setGlobalStateRestoreListener method.

第一个问题是从应用程序获取 Kafka Streams。我使用

解决了这个问题
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();

第二个问题是将 StateRestoreListener 设置为 KafkaStreams,因为我收到错误

java.lang.IllegalStateException: Can only set GlobalStateRestoreListener in CREATED state. Current state is: RUNNING

是否可以在 Spring Cloud Kafka Streams 活页夹中使用 StateRestoreListener? 谢谢

您可以使用 StreamsBuilderFactoryBeanCustomizer 来实现这一点,该 StreamsBuilderFactoryBeanCustomizer 让您可以访问基础 KafkaStreams 对象。如果您使用的是活页夹版本 3.0 或更高版本,这是推荐的方法。例如,您可以在您的应用程序中提供以下 bean 并使用 GlobalStateRestoreListener 对其进行自定义。

@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
    return factoryBean -> {
        factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
            @Override
            public void customize(KafkaStreams kafkaStreams) {
                kafkaStreams.setGlobalStateRestoreListener(...);
            }
        });
    };
}

This blog 有关于此策略的更多详细信息。