Spring 与 Kafka 集成抛出 ClassCastException

Spring Integration with Kafka throwing ClassCastException

我有一个案例,我想从 Kafka Producer 发布消息,我的消息只是一个 POJO 对象,例如 CreateRequest。所以为了消费我添加了下面的代码

@Bean
@InboundChannelAdapter(channel = "inputchannel", poller=@Poller(fixedDelay = "5000",errorChannel = "errorChannel"))
    public KafkaMessageSource consumeMsg() {

        ConsumerFactory<String, String> cf = consumerFactory();
        KafkaMessageSource kafkaMessageSource = new KafkaMessageSource (cf, new ConsumerProperties("Kafka_Topic"));
        kafkaMessageSource.getConsumerProperties().setGroupId("group_id");
        kafkaMessageSource.getConsumerProperties().setClientId("clientid");
        kafkaMessageSource.setMessageConverter(messageConverter());
        kafkaMessageSource.setPayloadType(CreateResponse.class);
        return kafkaMessageSource;
    }

    @Bean
    public ConsumerFactory consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,60000);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG,10);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,"30000");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, CreateResponse.class);
        return new DefaultKafkaConsumerFactory(props);
    }

  @Bean
    RecordMessageConverter messageConverter(){
        return new StringJsonMessageConverter();
    }

此外,我已经添加了 setMessageConverter 和 setPayloadType 来获取 CreateResponse 类型的响应,但我仍然收到类型为 KafkaMessageSource 的响应,它正在抛出 java.lang.ClassCastException cannot cast KafkaMessageSource to type CreateResponse

kafkaMessageSource.setMessageConverter(messageConverter());
kafkaMessageSource.setPayloadType(CreateResponse.class);

堆栈跟踪:-

org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@6e6d85cd]; nested exception is java.lang.ClassCastException: org.springframework.integration.kafka.inbound.KafkaMessageSource cannot be cast to com.kafka.response.domain.CreateResponse
        at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:111) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:104) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:196) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:444) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:428) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:376) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null(AbstractPollingEndpoint.java:323) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute[=13=](ErrorHandlingTaskExecutor.java:57) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.2.19.RELEASE.jar!/:5.2.19.RELEASE]
        at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller(AbstractPollingEndpoint.java:320) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93) [spring-context-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_311]
        at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_311]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access1(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_311]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_311]
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_311]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_311]
        at java.lang.Thread.run(Thread.java:748) [?:1.8.0_311]
Caused by: java.lang.ClassCastException: org.springframework.integration.kafka.inbound.KafkaMessageSource cannot be cast to com.kafka.response.domain.CreateResponse
        at com.kafka.response.service.MessageConsumerQueue.consume(MessageConsumerQueue.java:55) ~[classes!/:1.0-SNAPSHOT]
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_311]
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_311]
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_311]
        at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_311]
        at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:129) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:112) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at org.springframework.expression.spel.ast.MethodReference.access[=13=]0(MethodReference.java:55) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:387) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:92) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:117) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:375) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
        at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:171) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:156) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:637) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:630) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:614) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:585) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:477) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:355) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:108) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
        ... 31 more

有人能告诉我哪里错了吗?

这是一个简单的 Spring 启动应用程序,它证明上述配置的意图是正确的:

@SpringBootApplication
public class So71806313Application {

    public static void main(String[] args) {
        SpringApplication.run(So71806313Application.class, args);
    }

    @Bean
    RecordMessageConverter messageConverter(){
        return new StringJsonMessageConverter();
    }

    @Bean
    @InboundChannelAdapter(channel = "inputChannel", poller=@Poller(fixedDelay = "5000"))
    public KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
            RecordMessageConverter messageConverter) {

        KafkaMessageSource<String, String> kafkaMessageSource = new KafkaMessageSource<>(consumerFactory,
                new ConsumerProperties("Kafka_Topic"));
        kafkaMessageSource.getConsumerProperties().setGroupId("group_id");
        kafkaMessageSource.getConsumerProperties().setClientId("clientid");
        kafkaMessageSource.setMessageConverter(messageConverter);
        kafkaMessageSource.setPayloadType(CreateResponse.class);
        return kafkaMessageSource;
    }

    @Bean
    QueueChannel inputChannel() {
        return new QueueChannel();
    }

}

我不确定你的CreateResponse是什么,所以我就这么简单:

public class CreateResponse {

    private String name;

    public void setName(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

}

然后我有一个集成测试来针对嵌入式 Kafka 进行验证:

@SpringBootTest
@DirtiesContext
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers", topics = "Kafka_Topic")
class So71806313ApplicationTests {

    @Autowired
    QueueChannel inputChannel;

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void contextLoads() {
        this.kafkaTemplate.send("Kafka_Topic", "{\"name\" : \"foo\"}");

        Message<?> receive = this.inputChannel.receive(10_000);
        assertThat(receive).isNotNull();
        assertThat(receive.getPayload())
                .isInstanceOf(CreateResponse.class)
                .extracting("name")
                .isEqualTo("foo");
    }

}

如果我把consumeMsg()改成这个(注意注释@Bean):

@Autowired
ConsumerFactory<String, String> consumerFactory;

// @Bean @InboundChannelAdapter(channel = "inputChannel", poller=@Poller(fixedDelay = "5000")) public KafkaMessageSource consumeMsg() {

    KafkaMessageSource kafkaMessageSource = new KafkaMessageSource(consumerFactory,
            new ConsumerProperties("Kafka_Topic"));
    kafkaMessageSource.getConsumerProperties().setGroupId("group_id");
    kafkaMessageSource.getConsumerProperties().setClientId("clientid");
    kafkaMessageSource.setMessageConverter(new StringJsonMessageConverter());
    kafkaMessageSource.setPayloadType(CreateResponse.class);
    return kafkaMessageSource;
}

然后它确实失败了:

java.lang.AssertionError: 
Expecting actual:
  org.springframework.integration.kafka.inbound.KafkaMessageSource@305289b3
to be an instance of:
  org.springframework.integration.Whosebug.so71806313.CreateResponse
but was instance of:
  org.springframework.integration.kafka.inbound.KafkaMessageSource

所以,请修改您这边的配置。