将 ObjectMapper 注入 Spring Kafka serialiser/deserialiser
Inject ObjectMapper into Spring Kafka serialiser/deserialiser
我正在使用 Spring Kafka 1.1.2-RELEASE 和 Spring Boot 1.5.0 RC,并配置了自定义的值序列化器/反序列化器(serialiser/deserialiser)类,它们分别继承自 org.springframework.kafka.support.serializer.JsonSerializer 和 org.springframework.kafka.support.serializer.JsonDeserializer。这些类内部使用 Jackson ObjectMapper,并且可以通过构造函数传入。
是否可以注入我的 Spring 上下文中的 ObjectMapper?我已经配置好了一个 ObjectMapper,希望在序列化器/反序列化器中重用它。
您可以将 JsonSerializer 和 JsonDeserializer 配置为 @Bean,向它们注入所需的 ObjectMapper,然后在 DefaultKafkaProducerFactory 和 DefaultKafkaConsumerFactory 的 bean 定义中使用这些 bean:
/**
 * Exposes the Kafka producer factory as a bean.
 *
 * <p>The factory is created from {@code producerConfigs()} and its value
 * serializer is replaced with the instance returned by {@code jsonSerializer()}.
 * NOTE(review): both helpers are defined outside this snippet; presumably
 * {@code jsonSerializer()} is a {@code @Bean} built with the shared
 * ObjectMapper - confirm against the enclosing configuration class.
 *
 * @return the producer factory with the value serializer applied
 */
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    final DefaultKafkaProducerFactory<Integer, String> factory =
            new DefaultKafkaProducerFactory<>(producerConfigs());
    // Use the serializer instance directly rather than a class name from the config map.
    factory.setValueSerializer(jsonSerializer());
    return factory;
}
/**
 * Producer-factory customizer that installs a {@code JsonSerializer} backed by
 * the injected {@code ObjectMapper} as the value serializer.
 */
@Component
public class ObjectMapperProducerFactoryCustomizer implements DefaultKafkaProducerFactoryCustomizer {

    /** Mapper handed to every serializer this customizer creates. */
    private final ObjectMapper mapper;

    /**
     * @param objectMapper the mapper to pass to the JSON value serializer
     */
    public ObjectMapperProducerFactoryCustomizer(ObjectMapper objectMapper) {
        this.mapper = objectMapper;
    }

    /**
     * Sets a new {@code JsonSerializer} using the stored mapper as the value
     * serializer of the given factory. A {@code null} factory is ignored.
     *
     * @param producerFactory the factory to customize; may be {@code null}
     */
    @Override
    public void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {
        if (producerFactory == null) {
            return;
        }
        producerFactory.setValueSerializer(new JsonSerializer<>(mapper));
    }
}
对于反应式非阻塞 kafka 客户端,配置如下:
/**
 * Configuration for a reactive Kafka producer whose values are serialized with
 * a {@code JsonSerializer} built from the injected {@code ObjectMapper}.
 */
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfig {

    /**
     * Creates the reactive producer template.
     *
     * @param p            Kafka properties; {@code buildProducerProperties()} supplies the sender config
     * @param objectMapper mapper passed to the JSON value serializer
     * @return a template built on sender options carrying the JSON value serializer
     */
    @Bean
    public ReactiveKafkaProducerTemplate<String, Object> kafkaProducerTemplate(KafkaProperties p,
            ObjectMapper objectMapper) {
        SenderOptions<String, Object> baseOptions =
                SenderOptions.<String, Object>create(p.buildProducerProperties());
        // Keep the returned instance: only the result of withValueSerializer is passed on.
        SenderOptions<String, Object> jsonOptions =
                baseOptions.withValueSerializer(new JsonSerializer<>(objectMapper));
        return new ReactiveKafkaProducerTemplate<>(jsonOptions);
    }
}
以上配置假定您已引入 io.projectreactor.kafka 依赖:
dependencies {
    // 'implementation' replaces the 'compile' configuration, which was deprecated
    // and then removed in Gradle 7.
    // No version is given here, so it has to come from dependency management
    // (e.g. a BOM) - NOTE(review): confirm for your build.
    implementation "io.projectreactor.kafka:reactor-kafka"
}
我正在使用 Spring Kafka 1.1.2-RELEASE 和 Spring Boot 1.5.0 RC,并配置了自定义的值序列化器/反序列化器(serialiser/deserialiser)类,它们分别继承自 org.springframework.kafka.support.serializer.JsonSerializer 和 org.springframework.kafka.support.serializer.JsonDeserializer。这些类内部使用 Jackson ObjectMapper,并且可以通过构造函数传入。
是否可以注入我的 Spring 上下文中的 ObjectMapper?我已经配置好了一个 ObjectMapper,希望在序列化器/反序列化器中重用它。
您可以将 JsonSerializer 和 JsonDeserializer 配置为 @Bean,向它们注入所需的 ObjectMapper,然后在 DefaultKafkaProducerFactory 和 DefaultKafkaConsumerFactory 的 bean 定义中使用这些 bean:
/**
 * Exposes the Kafka producer factory as a bean.
 *
 * <p>The factory is created from {@code producerConfigs()} and its value
 * serializer is replaced with the instance returned by {@code jsonSerializer()}.
 * NOTE(review): both helpers are defined outside this snippet; presumably
 * {@code jsonSerializer()} is a {@code @Bean} built with the shared
 * ObjectMapper - confirm against the enclosing configuration class.
 *
 * @return the producer factory with the value serializer applied
 */
@Bean
public ProducerFactory<Integer, String> producerFactory() {
    final DefaultKafkaProducerFactory<Integer, String> factory =
            new DefaultKafkaProducerFactory<>(producerConfigs());
    // Use the serializer instance directly rather than a class name from the config map.
    factory.setValueSerializer(jsonSerializer());
    return factory;
}
/**
 * Producer-factory customizer that installs a {@code JsonSerializer} backed by
 * the injected {@code ObjectMapper} as the value serializer.
 */
@Component
public class ObjectMapperProducerFactoryCustomizer implements DefaultKafkaProducerFactoryCustomizer {

    /** Mapper handed to every serializer this customizer creates. */
    private final ObjectMapper mapper;

    /**
     * @param objectMapper the mapper to pass to the JSON value serializer
     */
    public ObjectMapperProducerFactoryCustomizer(ObjectMapper objectMapper) {
        this.mapper = objectMapper;
    }

    /**
     * Sets a new {@code JsonSerializer} using the stored mapper as the value
     * serializer of the given factory. A {@code null} factory is ignored.
     *
     * @param producerFactory the factory to customize; may be {@code null}
     */
    @Override
    public void customize(DefaultKafkaProducerFactory<?, ?> producerFactory) {
        if (producerFactory == null) {
            return;
        }
        producerFactory.setValueSerializer(new JsonSerializer<>(mapper));
    }
}
对于反应式非阻塞 kafka 客户端,配置如下:
/**
 * Configuration for a reactive Kafka producer whose values are serialized with
 * a {@code JsonSerializer} built from the injected {@code ObjectMapper}.
 */
@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
public class KafkaConfig {

    /**
     * Creates the reactive producer template.
     *
     * @param p            Kafka properties; {@code buildProducerProperties()} supplies the sender config
     * @param objectMapper mapper passed to the JSON value serializer
     * @return a template built on sender options carrying the JSON value serializer
     */
    @Bean
    public ReactiveKafkaProducerTemplate<String, Object> kafkaProducerTemplate(KafkaProperties p,
            ObjectMapper objectMapper) {
        SenderOptions<String, Object> baseOptions =
                SenderOptions.<String, Object>create(p.buildProducerProperties());
        // Keep the returned instance: only the result of withValueSerializer is passed on.
        SenderOptions<String, Object> jsonOptions =
                baseOptions.withValueSerializer(new JsonSerializer<>(objectMapper));
        return new ReactiveKafkaProducerTemplate<>(jsonOptions);
    }
}
以上配置假定您已引入 io.projectreactor.kafka 依赖:
dependencies {
    // 'implementation' replaces the 'compile' configuration, which was deprecated
    // and then removed in Gradle 7.
    // No version is given here, so it has to come from dependency management
    // (e.g. a BOM) - NOTE(review): confirm for your build.
    implementation "io.projectreactor.kafka:reactor-kafka"
}