Spring Cloud Stream 3.0 StreamsBuilderFactoryBeanCustomizer

I am unable to configure custom handlers for my stream consumer using StreamsBuilderFactoryBeanCustomizer.

    @Bean
    public StreamsBuilderFactoryBeanCustomizer customizer() {
        return fb -> {
            fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomDeserializationHandler.class);
            fb.getStreamsConfiguration().put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomEventErrorHandler.class);
            fb.getStreamsConfiguration().forEach((k, v) -> System.err.println("Key , Value " + k + " , " + v));
        };
    }

After adding the above to my Spring Boot @Configuration class and starting the application, I still see the default values under StreamsConfig. According to the documentation:

> customizer will be invoked by the binder right before the factory bean is started.

But the binder does not appear to invoke the StreamsBuilderFactoryBeanCustomizer. Is this a known issue? I am using spring-cloud-stream-binder-kafka-streams with spring-cloud.version 2020.0.1 and Spring Boot 2.4.2.

021-03-11 11:52:04.386 [main] INFO  o.apache.kafka.streams.StreamsConfig - StreamsConfig values: 
    acceptable.recovery.lag = 10000
    application.id = service-stream
    application.server = 
    bootstrap.servers = [localhost:9092]
    buffered.records.per.partition = 1000
    built.in.metrics.version = latest
    cache.max.bytes.buffering = 10485760
    client.id = 
    commit.interval.ms = 1000
    connections.max.idle.ms = 540000
    default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
    default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    default.production.exception.handler = class org.apache.kafka.streams.errors.DefaultProductionExceptionHandler
    default.timestamp.extractor = class org.apache.kafka.streams.processor.FailOnInvalidTimestamp
    default.value.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
    max.task.idle.ms = 0
    max.warmup.replicas = 2

It looks like there are two interfaces with the same name: one from Spring Kafka and another in Spring Boot. The binder only takes into consideration the one from Spring Kafka. Please make sure that you are implementing that one. We have filed an issue against Boot to resolve this inconsistency.
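
In practice the difference usually comes down to a single import line. The fragment below is a minimal sketch, not a verified fix: it assumes the Spring Kafka interface is `org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer` and the Spring Boot one is `org.springframework.boot.autoconfigure.kafka.StreamsBuilderFactoryBeanCustomizer` (check both against the versions on your classpath). The class name `StreamsHandlerConfig` is hypothetical, and the two handler classes are the ones from the question.

```java
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
// Spring Kafka's interface, which is the one the binder looks for.
// Assumption: the same-named Boot interface lives in
// org.springframework.boot.autoconfigure.kafka and must NOT be imported here.
import org.springframework.kafka.config.StreamsBuilderFactoryBeanCustomizer;

@Configuration
public class StreamsHandlerConfig { // hypothetical class name

    @Bean
    public StreamsBuilderFactoryBeanCustomizer customizer() {
        return fb -> {
            // CustomDeserializationHandler and CustomEventErrorHandler are the
            // asker's own classes from the question, assumed to be on the classpath.
            fb.getStreamsConfiguration().put(
                    StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    CustomDeserializationHandler.class);
            fb.getStreamsConfiguration().put(
                    StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    CustomEventErrorHandler.class);
        };
    }
}
```

If the import is right, the two handler entries in the StreamsConfig startup log should show the custom classes instead of the defaults.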

This has been updated in Spring Cloud release 2020.0.3. org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer can be used instead.
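
For Spring Cloud 2020.0.3 and later, the same configuration could be written against the newer interface, roughly as follows. This is a sketch under assumptions: it relies on `StreamsBuilderFactoryBeanConfigurer` being a functional interface with a single `configure(StreamsBuilderFactoryBean)` method, the class name `StreamsHandlerConfigurerConfig` is hypothetical, and the handler classes are again the ones from the question.

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.StreamsBuilderFactoryBeanConfigurer;

@Configuration
public class StreamsHandlerConfigurerConfig { // hypothetical class name

    @Bean
    public StreamsBuilderFactoryBeanConfigurer configurer() {
        return factoryBean -> {
            // Properties that will be used to build StreamsConfig when the factory bean starts.
            // Assumed to be non-null here because the binder populates them beforehand.
            Properties props = factoryBean.getStreamsConfiguration();
            // CustomDeserializationHandler and CustomEventErrorHandler are the
            // asker's own classes from the question, assumed to be on the classpath.
            props.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    CustomDeserializationHandler.class);
            props.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG,
                    CustomEventErrorHandler.class);
        };
    }
}
```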