Spring Kafka - 在没有类型信息的情况下反序列化 Pojo

Spring Kafka - deserialise Pojos without type information

我正在开发一个使用 Kafka 进行内部通信的分布式微服务应用程序。应用程序通过主题交换 POJO。生产者向消费者发送消息时,默认添加一个header表示payload中object的包名和class名称。然后,消费者应用程序使用此信息反序列化有效负载。但这需要我在两个应用程序的同一个包中定义完全相同的 class,这对我来说不是一个好的设计。如果我将生产者端的配置 (JsonSerializer.ADD_TYPE_INFO_HEADERS) 设置为不发送 header 中的类型,则会导致消费者端出错。此外,我不想在消费者应用程序上使用默认类型,因为它有多个侦听器,它们需要不同类型的 object。为什么 kafkalistener 不能简单地将 json 有效载荷反序列化为参数中给定的 object 类型,为什么它需要 header?

为了解决这个问题,我在消费者应用程序上定义了一个带有 'BytesDeserialser' 的 consumerFactory 和一个带有 'BytesJsonMessageConverter' 的 KafkaListenerContainerFactory。有了它,它就可以在消费者方面发挥作用,但是我不确定如何在使用回复 KafkaTemplate 并反序列化消费者的回复时在生产者方面做到这一点。

以下是我的配置 - //生产者配置

@Bean
public Map<String, Object> producerConfigs() {
  Map<String, Object> props = new HashMap<>();
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
  props.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.common.adapter.model.response.AccountResponse");
  return props;
}

@Bean
public ProducerFactory<String, Object> replyProducerFactory() {
  return new DefaultKafkaProducerFactory<>(producerConfigs());
}

@Bean
public KafkaTemplate<String, Object> replyTemplate() {
  return new KafkaTemplate<>(replyProducerFactory());
}

//consumer configs
@Bean
public ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate() {
    ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate =
    new ReplyingKafkaTemplate<>(requestProducerFactory(), replyListenerContainer());
    replyingKafkaTemplate.setReplyTimeout(10000);
    replyingKafkaTemplate.setMessageConverter(converter());
    return replyingKafkaTemplate;
}

@Bean
public KafkaMessageListenerContainer<String, Object> replyListenerContainer() {
    ContainerProperties containerProperties = new ContainerProperties(replyTopic);
    return new KafkaMessageListenerContainer<>(replyConsumerFactory(), containerProperties);
}

@Bean
public ConsumerFactory<String, Object> replyConsumerFactory() {
    JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
    jsonDeserializer.addTrustedPackages("*");
    return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}

@Bean
public Map<String, Object> consumerConfigs() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
    props.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.trader.account.model.response.AccountResponse");
    return props;
}

您可以使用 type mapping.

生产者将com.acme.Foo映射到foo,消费者将foo映射到com.other.Bar

类型必须在 JSON 级别兼容。

如果您只收到一种类型,您可以将反序列化器配置为使用该类型,而不是使用类型信息查找 headers。

https://docs.spring.io/spring-kafka/docs/2.5.2.RELEASE/reference/html/#serdes-json-config

  • JsonDeserializer.KEY_DEFAULT_TYPE:如果不存在 header 信息,则用于密钥反序列化的后备类型。

  • JsonDeserializer.VALUE_DEFAULT_TYPE:如果不存在 header 信息,则值反序列化的后备类型。

从 2.5 版开始,您可以添加一个将由反序列化器调用的函数,以便您可以内省数据以确定类型。

参见 Using Methods to Determine Types

这(和类型映射)是在回复模板中处理多种类型的唯一方法。在消费者方面,我们可以根据方法参数推断类型(这是在那里使用的正确机制 - 它不是 "work around")。