如何将 GenericMessageJacksonDeserializer 与 Spring Kafka 和 Spring 集成一起使用?
How to use GenericMessageJacksonDeserializer with Spring Kafka and Spring Integration?
如果我的 kafka 消息是 GenericMessage 类型,我想知道如何将 GenericMessageJacksonDeserializer 与 spring kafka 消费者工厂一起使用?
@Bean
public ConsumerFactory<?, GenericMessage> consumerFactory(KafkaProperties kafkaProperties) {
final Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
final DefaultKafkaConsumerFactory<?, GenericMessage> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
consumerFactory.setValueDeserializer(GenericMessageJacksonDeserializer.class);
return consumerFactory;
}
consumerFactory.setValueDeserializer()
是针对org.apache.kafka.common.serialization.Deserializer
对象的,甚至不是针对class。
我猜你的意思是org.springframework.integration.support.json.GenericMessageJacksonDeserializer
这正是 ObjectMapper
配置。您应该使用 JacksonJsonUtils.messagingAwareMapper()
,即,错误...,已经真正意识到 GenericMessageJacksonDeserializer
。
当你得到那个 ObjectMapper
时,你应该将它注入到...自定义 Deserializer
实现中。并且已经有那个用途 consumerFactory.setValueDeserializer()
。 messagingAwareMapper
不会像预期的那样与 org.springframework.kafka.support.serializer.JsonDeserializer
一起工作,因为这个确实需要一个特定的类型来反序列化。但是我们可以尝试这样的事情:
consumerFactory.setValueDeserializer(new JsonDeserializer(GenericMessage.class, JacksonJsonUtils.messagingAwareMapper()));
如果我的 kafka 消息是 GenericMessage 类型,我想知道如何将 GenericMessageJacksonDeserializer 与 spring kafka 消费者工厂一起使用?
@Bean
public ConsumerFactory<?, GenericMessage> consumerFactory(KafkaProperties kafkaProperties) {
final Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
final DefaultKafkaConsumerFactory<?, GenericMessage> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
consumerFactory.setValueDeserializer(GenericMessageJacksonDeserializer.class);
return consumerFactory;
}
consumerFactory.setValueDeserializer()
是针对org.apache.kafka.common.serialization.Deserializer
对象的,甚至不是针对class。我猜你的意思是
org.springframework.integration.support.json.GenericMessageJacksonDeserializer
这正是ObjectMapper
配置。您应该使用JacksonJsonUtils.messagingAwareMapper()
,即,错误...,已经真正意识到GenericMessageJacksonDeserializer
。当你得到那个
ObjectMapper
时,你应该将它注入到...自定义Deserializer
实现中。并且已经有那个用途consumerFactory.setValueDeserializer()
。messagingAwareMapper
不会像预期的那样与org.springframework.kafka.support.serializer.JsonDeserializer
一起工作,因为这个确实需要一个特定的类型来反序列化。但是我们可以尝试这样的事情:consumerFactory.setValueDeserializer(new JsonDeserializer(GenericMessage.class, JacksonJsonUtils.messagingAwareMapper()));