Spring Cloud Streams 应用程序抛出嵌套异常是 java.lang.ClassCastException 领域模型 class

Spring Cloud Streams application throwing nested exception is java.lang.ClassCastException for a domain model class

您好,我正在试用最新的 Spring Kafka 云流框架。但是,对于 String 和 Double 它工作正常但是当我尝试发送 Java POJO class 时,它会抛出以下异常。

我尝试了各种序列化和反序列化配置,但似乎没有任何效果。 我能够生成来自供应商的消息,如 json 但由于错误,消费者无法处理它。

如果您对此问题有任何建议,我们将不胜感激。 谢谢

org.springframework.kafka.listener.ListenerExecutionFailedException: Listener failed; nested exception is org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder@25625f0f]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class com.msg.bt.Grade ([B is in module java.base of loader 'bootstrap'; com.msg.bt.Grade is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @475530b9), failedMessage=GenericMessage [payload=byte[72], headers={kafka_offset=804, scst_nativeHeadersPresent=true, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@24ee2e4c, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedPartitionId=0, contentType=application/json, kafka_receivedTopic=random-topic-2, kafka_receivedTimestamp=1624778145945, kafka_groupId=mypublish-reader-group, target-protocol=kafka}]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.decorateException(KafkaMessageListenerContainer.java:2371) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2339) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2300) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2214) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2139) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2021) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1703) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1272) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1264) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1161) ~[spring-kafka-2.7.3.jar!/:2.7.3]
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515) ~[na:na]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]
        at java.base/java.lang.Thread.run(Thread.java:829) ~[na:na]
Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder@25625f0f]; nested exception is java.lang.ClassCastException: class [B cannot be cast to class com.msg.bt.Grade ([B is in module java.base of loader 'bootstrap'; com.msg.bt.Grade is in unnamed module of loader org.springframework.boot.loader.LaunchedURLClassLoader @475530b9)
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.8.jar!/:5.3.8]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.8.jar!/:5.3.8]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.8.jar!/:5.3.8]
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.8.jar!/:5.3.8]
        at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208) ~[spring-integration-core-5.5.1.jar!/:5.5.1]
        at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:398) ~[spring-integration-kafka-5.5.1.jar!/:5.5.1]

下面是POM.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.5.2</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.sample</groupId>
    <artifactId>message</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>message</name>
    <description>Spring messaging sample</description>
    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>2020.0.3</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-binder-kafka-streams</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

下面是申请

@Bean
public Supplier<Grade> random() {
    return () -> {
        System.out.println("=========In Supplier --------------------");
        return new Grade(UUID.randomUUID().toString(),Math.random()*100);
    };
}

@Bean
public Function<Grade, String> publish() {
    System.out.println("@@@@@@@@@@@@@@In Function user--------------------");
    return user -> "User:: " + user.toString();
}

@Bean
public Consumer<String> log() {
    System.out.println("++++++++++++In log --------------------");
    return s -> {
        System.out.println("Received>> " + s);
    };
}

application.yml

spring:
  application:
    name: kafka-messaging
  json:
    value:
      default:
        type: com.msg.Grade
    trusted:
      packages: com.msg

  cloud:
    function:
      definition: random;publish;log
    stream:
      bindings:
        random-out-0:
          destination: random-topic-2

        publish-in-0:
          destination: random-topic-2
          group: mypublish-reader-group
        publish-out-0:
          destination: message-topic-2

        log-in-0:
          destination: message-topic-2
          group: message-reader-group

      kafka:
        binder:
          brokers: localhost:9092

终于找到问题所在了。 感觉非常愚蠢,因为问题是模型 class 中没有默认的无参数构造函数。添加无参数构造函数后,一切都按预期工作。