如何将 GenericMessageJacksonDeserializer 与 Spring Kafka 和 Spring 集成一起使用?

How to use GenericMessageJacksonDeserializer with Spring Kafka and Spring Integration?

如果我的 kafka 消息是 GenericMessage 类型,我想知道如何将 GenericMessageJacksonDeserializer 与 spring kafka 消费者工厂一起使用?

@Bean
public ConsumerFactory<?, GenericMessage> consumerFactory(KafkaProperties kafkaProperties) {
    final Map<String, Object> consumerProperties = kafkaProperties.buildConsumerProperties();
    final DefaultKafkaConsumerFactory<?, GenericMessage> consumerFactory = new DefaultKafkaConsumerFactory<>(consumerProperties);
    consumerFactory.setValueDeserializer(GenericMessageJacksonDeserializer.class);
    return consumerFactory;
}
  1. consumerFactory.setValueDeserializer()是针对org.apache.kafka.common.serialization.Deserializer对象的,甚至不是针对class。

  2. 我猜你的意思是org.springframework.integration.support.json.GenericMessageJacksonDeserializer 这正是 ObjectMapper 配置。您应该使用 JacksonJsonUtils.messagingAwareMapper(),即,错误...,已经真正意识到 GenericMessageJacksonDeserializer

  3. 当你得到那个 ObjectMapper 时,你应该将它注入到...自定义 Deserializer 实现中。并且已经有那个用途 consumerFactory.setValueDeserializer()messagingAwareMapper 不会像预期的那样与 org.springframework.kafka.support.serializer.JsonDeserializer 一起工作,因为这个确实需要一个特定的类型来反序列化。但是我们可以尝试这样的事情:

    consumerFactory.setValueDeserializer(new JsonDeserializer(GenericMessage.class, JacksonJsonUtils.messagingAwareMapper()));