Spring Kafka - 在没有类型信息的情况下反序列化 Pojo
Spring Kafka - deserialise Pojos without type information
我正在开发一个使用 Kafka 进行内部通信的分布式微服务应用程序。应用程序通过主题交换 POJO。生产者向消费者发送消息时,默认添加一个header表示payload中object的包名和class名称。然后,消费者应用程序使用此信息反序列化有效负载。但这需要我在两个应用程序的同一个包中定义完全相同的 class,这对我来说不是一个好的设计。如果我将生产者端的配置 (JsonSerializer.ADD_TYPE_INFO_HEADERS) 设置为不发送 header 中的类型,则会导致消费者端出错。此外,我不想在消费者应用程序上使用默认类型,因为它有多个侦听器,它们需要不同类型的 object。为什么 kafkalistener 不能简单地将 json 有效载荷反序列化为参数中给定的 object 类型,为什么它需要 header?
为了解决这个问题,我在消费者应用程序上定义了一个带有 'BytesDeserialser' 的 consumerFactory 和一个带有 'BytesJsonMessageConverter' 的 KafkaListenerContainerFactory。有了它,它就可以在消费者方面发挥作用,但是我不确定如何在使用回复 KafkaTemplate 并反序列化消费者的回复时在生产者方面做到这一点。
以下是我的配置 -
//生产者配置
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.common.adapter.model.response.AccountResponse");
return props;
}
@Bean
public ProducerFactory<String, Object> replyProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> replyTemplate() {
return new KafkaTemplate<>(replyProducerFactory());
}
//consumer configs
@Bean
public ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate() {
ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate =
new ReplyingKafkaTemplate<>(requestProducerFactory(), replyListenerContainer());
replyingKafkaTemplate.setReplyTimeout(10000);
replyingKafkaTemplate.setMessageConverter(converter());
return replyingKafkaTemplate;
}
@Bean
public KafkaMessageListenerContainer<String, Object> replyListenerContainer() {
ContainerProperties containerProperties = new ContainerProperties(replyTopic);
return new KafkaMessageListenerContainer<>(replyConsumerFactory(), containerProperties);
}
@Bean
public ConsumerFactory<String, Object> replyConsumerFactory() {
JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.addTrustedPackages("*");
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.trader.account.model.response.AccountResponse");
return props;
}
您可以使用 type mapping.
生产者将com.acme.Foo
映射到foo
,消费者将foo
映射到com.other.Bar
。
类型必须在 JSON 级别兼容。
如果您只收到一种类型,您可以将反序列化器配置为使用该类型,而不是使用类型信息查找 headers。
https://docs.spring.io/spring-kafka/docs/2.5.2.RELEASE/reference/html/#serdes-json-config
JsonDeserializer.KEY_DEFAULT_TYPE
:如果不存在 header 信息,则用于密钥反序列化的后备类型。
JsonDeserializer.VALUE_DEFAULT_TYPE
:如果不存在 header 信息,则值反序列化的后备类型。
从 2.5 版开始,您可以添加一个将由反序列化器调用的函数,以便您可以内省数据以确定类型。
参见 Using Methods to Determine Types。
这(和类型映射)是在回复模板中处理多种类型的唯一方法。在消费者方面,我们可以根据方法参数推断类型(这是在那里使用的正确机制 - 它不是 "work around")。
我正在开发一个使用 Kafka 进行内部通信的分布式微服务应用程序。应用程序通过主题交换 POJO。生产者向消费者发送消息时,默认添加一个header表示payload中object的包名和class名称。然后,消费者应用程序使用此信息反序列化有效负载。但这需要我在两个应用程序的同一个包中定义完全相同的 class,这对我来说不是一个好的设计。如果我将生产者端的配置 (JsonSerializer.ADD_TYPE_INFO_HEADERS) 设置为不发送 header 中的类型,则会导致消费者端出错。此外,我不想在消费者应用程序上使用默认类型,因为它有多个侦听器,它们需要不同类型的 object。为什么 kafkalistener 不能简单地将 json 有效载荷反序列化为参数中给定的 object 类型,为什么它需要 header?
为了解决这个问题,我在消费者应用程序上定义了一个带有 'BytesDeserialser' 的 consumerFactory 和一个带有 'BytesJsonMessageConverter' 的 KafkaListenerContainerFactory。有了它,它就可以在消费者方面发挥作用,但是我不确定如何在使用回复 KafkaTemplate 并反序列化消费者的回复时在生产者方面做到这一点。
以下是我的配置 - //生产者配置
@Bean
public Map<String, Object> producerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
props.put(JsonSerializer.TYPE_MAPPINGS, "cat:com.common.adapter.model.response.AccountResponse");
return props;
}
@Bean
public ProducerFactory<String, Object> replyProducerFactory() {
return new DefaultKafkaProducerFactory<>(producerConfigs());
}
@Bean
public KafkaTemplate<String, Object> replyTemplate() {
return new KafkaTemplate<>(replyProducerFactory());
}
//consumer configs
@Bean
public ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate() {
ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate =
new ReplyingKafkaTemplate<>(requestProducerFactory(), replyListenerContainer());
replyingKafkaTemplate.setReplyTimeout(10000);
replyingKafkaTemplate.setMessageConverter(converter());
return replyingKafkaTemplate;
}
@Bean
public KafkaMessageListenerContainer<String, Object> replyListenerContainer() {
ContainerProperties containerProperties = new ContainerProperties(replyTopic);
return new KafkaMessageListenerContainer<>(replyConsumerFactory(), containerProperties);
}
@Bean
public ConsumerFactory<String, Object> replyConsumerFactory() {
JsonDeserializer<Object> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.addTrustedPackages("*");
return new DefaultKafkaConsumerFactory<>(consumerConfigs(), new StringDeserializer(), jsonDeserializer);
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(JsonDeserializer.TYPE_MAPPINGS, "cat:com.trader.account.model.response.AccountResponse");
return props;
}
您可以使用 type mapping.
生产者将com.acme.Foo
映射到foo
,消费者将foo
映射到com.other.Bar
。
类型必须在 JSON 级别兼容。
如果您只收到一种类型,您可以将反序列化器配置为使用该类型,而不是使用类型信息查找 headers。
https://docs.spring.io/spring-kafka/docs/2.5.2.RELEASE/reference/html/#serdes-json-config
JsonDeserializer.KEY_DEFAULT_TYPE
:如果不存在 header 信息,则用于密钥反序列化的后备类型。JsonDeserializer.VALUE_DEFAULT_TYPE
:如果不存在 header 信息,则值反序列化的后备类型。
从 2.5 版开始,您可以添加一个将由反序列化器调用的函数,以便您可以内省数据以确定类型。
参见 Using Methods to Determine Types。
这(和类型映射)是在回复模板中处理多种类型的唯一方法。在消费者方面,我们可以根据方法参数推断类型(这是在那里使用的正确机制 - 它不是 "work around")。