使用自定义 valueDeserializer 时,如何在 .properties/.yaml 中设置 ack-mode?

How to set ack-mode in .properties/.yaml, when custom valueDeserializer is used?

有一个 spring-boot 程序正在使用来自 kafka 的 JSON 中的数据。 Afaik 设置 JsonDeserializer 的唯一方法是使用 java 配置,即它不能在 .yaml(或 .properties)文件中设置,例如

@Configuration
public class KafkaConfig {

    @Bean
    public ConsumerFactory<String, DeserObject> consumerFactory( final KafkaProperties properties ) {
        JsonDeserializer<DeserObject> jsonDeserializer = new JsonDeserializer<>( DeserObject.class );
        return new DefaultKafkaConsumerFactory<>(
                properties.buildConsumerProperties(),  // so consumer could still be configured in .yaml
                new StringDeserializer(),              // key deserializer
                jsonDeserializer                       // value deserializer
        );
    }

    @Bean
    public KafkaListenerContainerFactory kafkaListenerContainerFactory( final ConsumerFactory<String, DeserObject> consumerFactory ) {
        ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory( consumerFactory );
        return factory;
    }
}

现在我们自己创建 ListenerContainerFactoryAckMode 无法从 .yaml 中设置:

spring:
  kafka:
    listener:
      ack-mode: manual

因为它不会被拾取(因为 ListernerContainerFactory 不是由引导自动配置的,因为我们正在创建它)。

所以问题是:是否有 任何 方法从 yaml 配置 ackMode,或者我们必须从 java 进行配置?:

@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory( final ConsumerFactory<String, DeserObject> consumerFactory ) {
    ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory( consumerFactory );
    factory.getContainerProperties().setAckMode( MANUAL );  // !! AckMode needs to be set in Java
    return factory;
}

我想这也可以通过 KafkaProperties 注入,但这感觉很老套:

@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory( final KafkaProperties properties, final ConsumerFactory<String, DeserObject> consumerFactory ) {
    ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory( consumerFactory );
    factory.getContainerProperties().setAckMode( properties.getListener().getAckMode() );
    return factory;
}

您需要使用 Boot 的 ConcurrentKafkaListenerContainerFactoryConfigurer 来填充启动属性。这是您不提供时使用的标准 bean 定义...

@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
        ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
        ConsumerFactory<Object, Object> kafkaConsumerFactory) {
    ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
    configurer.configure(factory, kafkaConsumerFactory);
    return factory;
}