映射类型和列表

Mapping Types and List

在我的消费者中,我在处理消息后回复了一些主题

@KafkaListener(...)
@SendTo
public Message<List<Foo>> consumeOdr(ConsumerRecord<String, String> message,
        @Header(required = false, value = KafkaHeaders.RECEIVED_MESSAGE_KEY) String key,
        @Header(required = false, value = KafkaHeaders.REPLY_TOPIC) byte[] topicReply,
        @Header(required = false, value = KafkaHeaders.CORRELATION_ID) byte[] correlationId,
        @Header(required = false, value = KafkaHeaders.REPLY_PARTITION) byte[] partitionReply) {

    ....

    MessageBuilder<List<Foo>> builder = MessageBuilder.withPayload(fooList);
    builder
            .setHeader(KafkaHeaders.MESSAGE_KEY, key)
                .setHeader(KafkaHeaders.CORRELATION_ID, correlationId)
                .setHeader(KafkaHeaders.TOPIC, topicReply)
                .setHeader(KafkaHeaders.PARTITION_ID, new BigInteger(partitionReply).intValue());
    return builder.build();
}

映射配置

spring.kafka.producer.properties.spring.json.type.mapping: >
  fooResponse: java.util.ArrayList

当我检查生成的消息时 headers

__TypeId__ -> fooResponse
__ContentTypeId__ -> java.lang.Object

  1. 为什么在__ContentTypeId__ Spring Kafka中使用java.lang.Object 而不是 Foo 完全限定的 class 名称 ?
  2. 如果我有另一个 Bar 类型的列表,我针对某个主题生成了,如何 我可以将 spring.json.type.mapping 属性 与 barResponse 以及现有的 fooResponse
  3. 一起正确设置吗
  4. 任何增强现有消息消费和生成方法的建议或模式都将受到欢迎

这是由于类型擦除。使用消费端的方法来判断类型。

https://docs.spring.io/spring-kafka/docs/current/reference/html/#serdes-type-methods

Starting with version 2.5, you can now configure the deserializer, via properties, to invoke a method to determine the target type. If present, this will override any of the other techniques discussed above. This can be useful if the data is published by an application that does not use the Spring serializer and you need to deserialize to different types depending on the data, or other headers. Set these properties to the method name - a fully qualified class name followed by the method name, separated by a period.

示例:

public static JavaType returnType(byte[] data, Headers headers) {
    return TypeFactory.defaultInstance()
            .constructCollectionLikeType(List.class, Foo.class);
}
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.So63058608Application.returnType

我对这个问题的回答中的完整示例