Spring Cloud Stream 3.0 StreamsBuilderFactoryBeanCustomizer
I am unable to configure custom handlers for my stream consumer using StreamsBuilderFactoryBeanCustomizer.
@Bean
public StreamsBuilderFactoryBeanCustomizer customizer() {
    return fb -> {
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomDeserializationHandler.class);
        fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomEventErrorHandler.class);
        fb.getStreamsConfiguration().forEach((k, v) -> System.err.println("Key , Value " + k + " , " + v));
    };
}
After defining the above in my Spring Boot @Configuration class and starting the application, I still see the default values under StreamsConfig. According to the documentation:
"customizer will be invoked by the binder right before the factory bean is started."
However, the binder does not seem to invoke the StreamsBuilderFactoryBeanCustomizer.
Is this a known issue? I am using spring-cloud-stream-binder-kafka-streams with Spring Cloud 2020.0.1 (spring-cloud.version) and Spring Boot 2.4.2.
2021-03-11 11:52:04.386 [main] INFO o.apache.kafka.streams.StreamsConfig - StreamsConfig values:
acceptable.recovery.lag = 10000
application.id = service-stream
application.server =
bootstrap.servers = [localhost:9092]
buffered.records.per.partition = 1000
built.in.metrics.version = latest
cache.max.bytes.buffering = 10485760
client.id =
commit.interval.ms = 1000
connections.max.idle.ms = 540000
default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
max.task.idle.ms = 0
max.warmup.replicas = 2
It looks like there are two interfaces with the same name: one from Spring Kafka and another in Spring Boot. The binder only takes into consideration the one from Spring Kafka. Please make sure that you are implementing that one. We have filed an issue against Boot to address this inconsistency.
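As a rough sketch of that advice, the bean from the question can stay as it is once the import is made explicit. The package names below come from memory of the Spring Kafka and Spring Boot sources, not from this thread, so check them against the versions on your classpath. StreamsHandlerConfig is a hypothetical class name, and CustomDeserializationHandler and CustomEventErrorHandler are the question's own classes, assumed to be in the same package.

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// Assumption: this is the Spring Kafka interface the binder looks for.
// The same-named Spring Boot type is believed to live in
// org.springframework.boot.autoconfigure.kafka and would not be picked up.
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;

@Configuration
public class StreamsHandlerConfig { // hypothetical name

    @Bean
    public StreamsBuilderFactoryBeanCustomizer customizer() {
        return fb -> {
            // getStreamsConfiguration() may return null if no properties were set
            Properties props = fb.getStreamsConfiguration();
            if (props != null) {
                // Handler classes are the ones from the question (assumed to exist)
                props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                        CustomDeserializationHandler.class);
                props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                        CustomEventErrorHandler.class);
            }
        };
    }
}
```

If the IDE auto-imported the Spring Boot type, the code still compiles, which is probably why the mismatch is easy to miss.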
This was updated in Spring Cloud release 2020.0.3.
You can use org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer instead.
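For Spring Cloud 2020.0.3 and later, the same configuration might look like the sketch below. It assumes StreamsBuilderFactoryBeanConfigurer can be written as a lambda over the factory bean, as in Spring Kafka versions I am aware of. StreamsConfigurerConfig is a hypothetical class name, and the two handler classes are again the question's own.

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;

@Configuration
public class StreamsConfigurerConfig { // hypothetical name

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return fb -> {
            // Mutate the properties before the factory bean starts
            Properties props = fb.getStreamsConfiguration();
            if (props != null) {
                props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                        CustomDeserializationHandler.class);
                props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                        CustomEventErrorHandler.class);
            }
        };
    }
}
```

After a restart, the "StreamsConfig values" log block should list the custom classes for default.deserialization.exception.handler and default.production.exception.handler if the configurer was applied.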