spring-kafka 序列化期间发生 MessageConversionException?

MessageConversionException during serialization in spring-kafka?

为了表达我的意思,我做了一个简单的项目。

依赖关系:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
        <exclusions>
            <exclusion>
                <groupId>org.junit.vintage</groupId>
                <artifactId>junit-vintage-engine</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>

这就是代码中的所有kafka配置:

@Configuration
public class KafkaSerializationConfig implements SmartInitializingSingleton {

    private final KafkaProperties kafkaProperties;

    public KafkaSerializationConfig(KafkaProperties kafkaProperties) {
        this.kafkaProperties = kafkaProperties;
    }

    @Override
    public void afterSingletonsInstantiated() {
        AdminClient client = AdminClient.create(kafkaProperties.buildAdminProperties());

        List<NewTopic> newTopics = new ArrayList<>();
        newTopics.add(new NewTopic("demo", 2, (short) 2));
        client.createTopics(newTopics);

        client.close();
    }

    private static ObjectMapper mapper = new ObjectMapper()
        .registerModules(new Jdk8Module(), new JavaTimeModule());

    public static class KafkaSerializer extends JsonSerializer<Object> {
        public KafkaSerializer() {
            super(mapper);
        }
    }

    public static class KafkaDeserializer extends JsonDeserializer<Object> {
        public KafkaDeserializer() {
            super(mapper);
        }
    }

}

application.yml 文件:

    spring:
  kafka:
    bootstrap-servers: localhost:9092
    consumer:
      group-id: sample-cutter
      auto-offset-reset: earliest
      properties.spring.json.trusted.packages: "*"
      key-deserializer: org.apache.kafka.common.serialization.LongDeserializer
      value-deserializer: com.example.springkafkasimpledemo.config.KafkaSerializationConfig.KafkaDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.LongSerializer
      value-serializer: com.example.springkafkasimpledemo.config.KafkaSerializationConfig.KafkaSerializer

所以这个应用程序可以正常运行了。但是让我们尝试使用它。假设我们有两个服务:服务器和客户端。所以我们有两个 类 具有相同的字段:

客户的 DTO:

public class GettingUser {

    private String firstName;
    private String lastName;

    public GettingUser() {
    }

    public GettingUser(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}

服务器的 DTO:

public class SendingUser {

    private String firstName;
    private String lastName;

    public SendingUser() {
    }

    public SendingUser(String firstName, String lastName) {
        this.firstName = firstName;
        this.lastName = lastName;
    }

    public String getFirstName() {
        return firstName;
    }

    public void setFirstName(String firstName) {
        this.firstName = firstName;
    }

    public String getLastName() {
        return lastName;
    }

    public void setLastName(String lastName) {
        this.lastName = lastName;
    }
}

最后让我们尝试使用它:

@RestController
public class SpringSerializationDemoController {

    private final KafkaTemplate<Long, SendingUser> template;
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public SpringSerializationDemoController(KafkaTemplate<Long, SendingUser> template) {
        this.template = template;
    }

    @GetMapping("/start-demo")
    public String startDemo() {
        SendingUser user = new SendingUser("John", "Smith");
        template.send("demo", user);
        return "OK";
    }

    @KafkaListener(topics = "demo")
    public void consumeCutSample(GettingUser user) {
        logger.info("Got user: {}", user);
    }
}

我可以看到显示我的应用无法将 SendingUser 转换为 GettingUser 的异常。 异常:

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener method could not be invoked with the incoming message
Endpoint handler details:
Method [public void com.example.springkafkasimpledemo.controller.SpringSerializationDemoController.consumeCutSample(com.example.springkafkasimpledemo.domain.GettingUser)]
Bean [com.example.springkafkasimpledemo.controller.SpringSerializationDemoController@53601a9c]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2037) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeErrorHandler(KafkaMessageListenerContainer.java:2025) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1924) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1851) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1748) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1472) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1135) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1038) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
    at java.base/java.lang.Thread.run(Thread.java:835) ~[na:na]
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot handle message; nested exception is org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}], failedMessage=GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:340) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:86) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.adapter.RecordMessagingMessageListenerAdapter.onMessage(RecordMessagingMessageListenerAdapter.java:51) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1992) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1974) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1911) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    ... 8 common frames omitted
Caused by: org.springframework.messaging.converter.MessageConversionException: Cannot convert from [com.example.springkafkasimpledemo.domain.SendingUser] to [com.example.springkafkasimpledemo.domain.GettingUser] for GenericMessage [payload=com.example.springkafkasimpledemo.domain.SendingUser@36f41365, headers={kafka_offset=0, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@74a44b4c, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=1, kafka_receivedTopic=demo, kafka_receivedTimestamp=1604226244969, kafka_groupId=sample-cutter}]
    at org.springframework.messaging.handler.annotation.support.PayloadMethodArgumentResolver.resolveArgument(PayloadMethodArgumentResolver.java:145) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
    at org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor$KafkaNullAwarePayloadArgumentResolver.resolveArgument(KafkaListenerAnnotationBeanPostProcessor.java:910) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolverComposite.resolveArgument(HandlerMethodArgumentResolverComposite.java:117) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.getMethodArgumentValues(InvocableHandlerMethod.java:148) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:116) ~[spring-messaging-5.2.10.RELEASE.jar:5.2.10.RELEASE]
    at org.springframework.kafka.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:48) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    at org.springframework.kafka.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:329) ~[spring-kafka-2.5.7.RELEASE.jar:2.5.7.RELEASE]
    ... 13 common frames omitted

但它为什么要这样做呢?我的对象映射器不使用类型。我可以在我的控制台中看到它:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --from-beginning --topic demo
{"firstName":"John","lastName":"Smith"}

无法理解为什么 spring-kafka 尝试将 SendingUser 转换为 GettingUser。

如果您正在使用 Spring JSON(反)序列化器(两侧),您需要配置类型映射 - 请参阅 https://docs.spring.io/spring-kafka/docs/2.6.2/reference/html/#json-serde

将源 class 映射到发送端的令牌,并将令牌映射到接收端所需的 class。

或者您可以在反序列化器中禁用 headers 并配置默认类型。参见 setUseTypeHeaders()

编辑

对于更复杂的类型,例如仿制药,你应该 configure the deserializer to call a method that returns a JavaType.

例如List<Foo>:

public static JavaType returnType(String topic, byte[] data, Headers headers) {
    return TypeFactory.defaultInstance()
            .constructCollectionLikeType(List.class, Foo.class);
}
spring.kafka.consumer.properties.spring.json.value.type.method=com.example.demo.Application.returnType

为您的 Event<SourceFile> 使用 TypeReferenceconstructParametricType。参见 Spring Kafka Consumer consumed message as LinkedHashMap hence automatically converting BigDecimal to double