使用自定义 valueDeserializer 时,如何在 .properties/.yaml 中设置 ack-mode?
How to set ack-mode in .properties/.yaml, when custom valueDeserializer is used?
有一个 spring-boot 程序正在使用来自 kafka 的 JSON 中的数据。 Afaik 设置 JsonDeserializer 的唯一方法是使用 java 配置,即它不能在 .yaml(或 .properties)文件中设置,例如
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, DeserObject> consumerFactory( final KafkaProperties properties ) {
JsonDeserializer<DeserObject> jsonDeserializer = new JsonDeserializer<>( DeserObject.class );
return new DefaultKafkaConsumerFactory<>(
properties.buildConsumerProperties(), // so consumer could still be configured in .yaml
new StringDeserializer(), // key deserializer
jsonDeserializer // value deserializer
);
}
@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory( final ConsumerFactory<String, DeserObject> consumerFactory ) {
ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory( consumerFactory );
return factory;
}
}
现在我们自己创建 ListenerContainerFactory
,AckMode
无法从 .yaml 中设置:
spring:
kafka:
listener:
ack-mode: manual
因为它不会被拾取(因为 ListernerContainerFactory 不是由引导自动配置的,因为我们正在创建它)。
所以问题是:是否有 任何 方法从 yaml 配置 ackMode,或者我们必须从 java 进行配置?:
@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory( final ConsumerFactory<String, DeserObject> consumerFactory ) {
ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory( consumerFactory );
factory.getContainerProperties().setAckMode( MANUAL ); // !! AckMode needs to be set in Java
return factory;
}
我想这也可以通过 KafkaProperties
注入,但这感觉很老套:
@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory( final KafkaProperties properties, final ConsumerFactory<String, DeserObject> consumerFactory ) {
ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory( consumerFactory );
factory.getContainerProperties().setAckMode( properties.getListener().getAckMode() );
return factory;
}
您需要使用 Boot 的 ConcurrentKafkaListenerContainerFactoryConfigurer
来填充启动属性。这是您不提供时使用的标准 bean 定义...
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, kafkaConsumerFactory);
return factory;
}
有一个 spring-boot 程序正在使用来自 kafka 的 JSON 中的数据。 Afaik 设置 JsonDeserializer 的唯一方法是使用 java 配置,即它不能在 .yaml(或 .properties)文件中设置,例如
@Configuration
public class KafkaConfig {
@Bean
public ConsumerFactory<String, DeserObject> consumerFactory( final KafkaProperties properties ) {
JsonDeserializer<DeserObject> jsonDeserializer = new JsonDeserializer<>( DeserObject.class );
return new DefaultKafkaConsumerFactory<>(
properties.buildConsumerProperties(), // so consumer could still be configured in .yaml
new StringDeserializer(), // key deserializer
jsonDeserializer // value deserializer
);
}
@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory( final ConsumerFactory<String, DeserObject> consumerFactory ) {
ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory( consumerFactory );
return factory;
}
}
现在我们自己创建 ListenerContainerFactory
,AckMode
无法从 .yaml 中设置:
spring:
kafka:
listener:
ack-mode: manual
因为它不会被拾取(因为 ListernerContainerFactory 不是由引导自动配置的,因为我们正在创建它)。
所以问题是:是否有 任何 方法从 yaml 配置 ackMode,或者我们必须从 java 进行配置?:
@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory( final ConsumerFactory<String, DeserObject> consumerFactory ) {
ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory( consumerFactory );
factory.getContainerProperties().setAckMode( MANUAL ); // !! AckMode needs to be set in Java
return factory;
}
我想这也可以通过 KafkaProperties
注入,但这感觉很老套:
@Bean
public KafkaListenerContainerFactory kafkaListenerContainerFactory( final KafkaProperties properties, final ConsumerFactory<String, DeserObject> consumerFactory ) {
ConcurrentKafkaListenerContainerFactory<String, DeserObject> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory( consumerFactory );
factory.getContainerProperties().setAckMode( properties.getListener().getAckMode() );
return factory;
}
您需要使用 Boot 的 ConcurrentKafkaListenerContainerFactoryConfigurer
来填充启动属性。这是您不提供时使用的标准 bean 定义...
@Bean
@ConditionalOnMissingBean(name = "kafkaListenerContainerFactory")
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<Object, Object>();
configurer.configure(factory, kafkaConsumerFactory);
return factory;
}