将 StateRestoreListener 与 Spring Cloud Kafka Streams 活页夹结合使用
Using StateRestoreListener with Spring Cloud Kafka Streams binder
我打算将 StateRestoreListener 与 Spring Cloud Kafka Streams 活页夹一起使用。
我需要监控我的应用程序的容错状态存储的恢复进度。
confluent https://docs.confluent.io/current/streams/monitoring.html#streams-monitoring-runtime-status 中有示例。
In order to observe the restoration of all state stores you provide
your application an instance of the
org.apache.kafka.streams.processor.StateRestoreListener interface. You
set the org.apache.kafka.streams.processor.StateRestoreListener by
calling the KafkaStreams#setGlobalStateRestoreListener method.
第一个问题是从应用程序获取 Kafka Streams。我使用
解决了这个问题
StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
第二个问题是将 StateRestoreListener 设置为 KafkaStreams,因为我收到错误
java.lang.IllegalStateException: Can only set
GlobalStateRestoreListener in CREATED state. Current state is: RUNNING
是否可以在 Spring Cloud Kafka Streams 活页夹中使用 StateRestoreListener?
谢谢
您可以使用 StreamsBuilderFactoryBeanCustomizer
来实现这一点,该 StreamsBuilderFactoryBeanCustomizer
让您可以访问基础 KafkaStreams
对象。如果您使用的是活页夹版本 3.0 或更高版本,这是推荐的方法。例如,您可以在您的应用程序中提供以下 bean
并使用 GlobalStateRestoreListener
对其进行自定义。
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setGlobalStateRestoreListener(...);
}
});
};
}
This blog 有关于此策略的更多详细信息。
我打算将 StateRestoreListener 与 Spring Cloud Kafka Streams 活页夹一起使用。 我需要监控我的应用程序的容错状态存储的恢复进度。 confluent https://docs.confluent.io/current/streams/monitoring.html#streams-monitoring-runtime-status 中有示例。
In order to observe the restoration of all state stores you provide your application an instance of the org.apache.kafka.streams.processor.StateRestoreListener interface. You set the org.apache.kafka.streams.processor.StateRestoreListener by calling the KafkaStreams#setGlobalStateRestoreListener method.
第一个问题是从应用程序获取 Kafka Streams。我使用
解决了这个问题StreamsBuilderFactoryBean streamsBuilderFactoryBean = context.getBean("&stream-builder-process", StreamsBuilderFactoryBean.class);
KafkaStreams kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
第二个问题是将 StateRestoreListener 设置为 KafkaStreams,因为我收到错误
java.lang.IllegalStateException: Can only set GlobalStateRestoreListener in CREATED state. Current state is: RUNNING
是否可以在 Spring Cloud Kafka Streams 活页夹中使用 StateRestoreListener? 谢谢
您可以使用 StreamsBuilderFactoryBeanCustomizer
来实现这一点,该 StreamsBuilderFactoryBeanCustomizer
让您可以访问基础 KafkaStreams
对象。如果您使用的是活页夹版本 3.0 或更高版本,这是推荐的方法。例如,您可以在您的应用程序中提供以下 bean
并使用 GlobalStateRestoreListener
对其进行自定义。
@Bean
public StreamsBuilderFactoryBeanCustomizer streamsBuilderFactoryBeanCustomizer() {
return factoryBean -> {
factoryBean.setKafkaStreamsCustomizer(new KafkaStreamsCustomizer() {
@Override
public void customize(KafkaStreams kafkaStreams) {
kafkaStreams.setGlobalStateRestoreListener(...);
}
});
};
}
This blog 有关于此策略的更多详细信息。