映射类型和列表
Mapping Types and List
在我的消费者中,我在处理消息后回复了一些主题
@KafkaListener(...)
@SendTo
public Message<List<Foo>> consumeOdr(ConsumerRecord<String, String> message,
@Header(required = false, value = KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(required = false, value = KafkaHeaders.REPLY_TOPIC) byte[] topicReply,
@Header(required = false, value = KafkaHeaders.CORRELATION_ID) byte[] correlationId,
@Header(required = false, value = KafkaHeaders.REPLY_PARTITION) byte[] partitionReply) {
....
MessageBuilder<List<Foo>> builder = MessageBuilder.withPayload(fooList);
builder
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.setHeader(KafkaHeaders.CORRELATION_ID, correlationId)
.setHeader(KafkaHeaders.TOPIC, topicReply)
.setHeader(KafkaHeaders.PARTITION_ID, new BigInteger(partitionReply).intValue());
return builder.build();
}
映射配置
spring.kafka.producer.properties.spring.json.type.mapping: >
fooResponse: java.util.ArrayList
当我检查生成的消息时 headers
__TypeId__
-> fooResponse
__ContentTypeId__
-> java.lang.Object
- 为什么在
__ContentTypeId__
Spring Kafka中使用java.lang.Object
而不是 Foo
完全限定的 class 名称 ?
- 如果我有另一个
Bar
类型的列表,我针对某个主题生成了,如何
我可以将 spring.json.type.mapping
属性 与 barResponse
以及现有的 fooResponse
一起正确设置吗
- 任何增强现有消息消费和生成方法的建议或模式都将受到欢迎
这是由于类型擦除。使用消费端的方法来判断类型。
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-type-methods
Starting with version 2.5, you can now configure the deserializer, via properties, to invoke a method to determine the target type. If present, this will override any of the other techniques discussed above. This can be useful if the data is published by an application that does not use the Spring serializer and you need to deserialize to different types depending on the data, or other headers. Set these properties to the method name - a fully qualified class name followed by the method name, separated by a period.
示例:
public static JavaType returnType(byte[] data, Headers headers) {
return TypeFactory.defaultInstance()
.constructCollectionLikeType(List.class, Foo.class);
}
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType
我对这个问题的回答中的完整示例
在我的消费者中,我在处理消息后回复了一些主题
@KafkaListener(...)
@SendTo
public Message<List<Foo>> consumeOdr(ConsumerRecord<String, String> message,
@Header(required = false, value = KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
@Header(required = false, value = KafkaHeaders.REPLY_TOPIC) byte[] topicReply,
@Header(required = false, value = KafkaHeaders.CORRELATION_ID) byte[] correlationId,
@Header(required = false, value = KafkaHeaders.REPLY_PARTITION) byte[] partitionReply) {
....
MessageBuilder<List<Foo>> builder = MessageBuilder.withPayload(fooList);
builder
.setHeader(KafkaHeaders.MESSAGE_KEY, key)
.setHeader(KafkaHeaders.CORRELATION_ID, correlationId)
.setHeader(KafkaHeaders.TOPIC, topicReply)
.setHeader(KafkaHeaders.PARTITION_ID, new BigInteger(partitionReply).intValue());
return builder.build();
}
映射配置
spring.kafka.producer.properties.spring.json.type.mapping: >
fooResponse: java.util.ArrayList
当我检查生成的消息时 headers
__TypeId__
-> fooResponse
__ContentTypeId__
-> java.lang.Object
- 为什么在
__ContentTypeId__
Spring Kafka中使用java.lang.Object
而不是Foo
完全限定的 class 名称 ? - 如果我有另一个
Bar
类型的列表,我针对某个主题生成了,如何 我可以将spring.json.type.mapping
属性 与barResponse
以及现有的fooResponse
一起正确设置吗
- 任何增强现有消息消费和生成方法的建议或模式都将受到欢迎
这是由于类型擦除。使用消费端的方法来判断类型。
见https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-type-methods
Starting with version 2.5, you can now configure the deserializer, via properties, to invoke a method to determine the target type. If present, this will override any of the other techniques discussed above. This can be useful if the data is published by an application that does not use the Spring serializer and you need to deserialize to different types depending on the data, or other headers. Set these properties to the method name - a fully qualified class name followed by the method name, separated by a period.
示例:
public static JavaType returnType(byte[] data, Headers headers) {
return TypeFactory.defaultInstance()
.constructCollectionLikeType(List.class, Foo.class);
}
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType
我对这个问题的回答中的完整示例