Spring Integration with Kafka throwing ClassCastException
I have a use case where I want to publish messages from a Kafka producer; my message is just a POJO, for example CreateRequest. To consume it, I added the code below:
@Bean
@InboundChannelAdapter(channel = "inputchannel", poller = @Poller(fixedDelay = "5000", errorChannel = "errorChannel"))
public KafkaMessageSource consumeMsg() {
    ConsumerFactory<String, String> cf = consumerFactory();
    KafkaMessageSource kafkaMessageSource = new KafkaMessageSource(cf, new ConsumerProperties("Kafka_Topic"));
    kafkaMessageSource.getConsumerProperties().setGroupId("group_id");
    kafkaMessageSource.getConsumerProperties().setClientId("clientid");
    kafkaMessageSource.setMessageConverter(messageConverter());
    kafkaMessageSource.setPayloadType(CreateResponse.class);
    return kafkaMessageSource;
}

@Bean
public ConsumerFactory consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, CreateResponse.class);
    return new DefaultKafkaConsumerFactory(props);
}

@Bean
RecordMessageConverter messageConverter() {
    return new StringJsonMessageConverter();
}
In addition, I have added setMessageConverter and setPayloadType to get a response of type CreateResponse, but I still receive a response of type KafkaMessageSource, and it throws java.lang.ClassCastException: cannot cast KafkaMessageSource to type CreateResponse
kafkaMessageSource.setMessageConverter(messageConverter());
kafkaMessageSource.setPayloadType(CreateResponse.class);
Stack trace:
org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@6e6d85cd]; nested exception is java.lang.ClassCastException: org.springframework.integration.kafka.inbound.KafkaMessageSource cannot be cast to com.kafka.response.domain.CreateResponse
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:192) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:111) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:104) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:134) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:62) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:570) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:520) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:196) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:444) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:428) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:376) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$null(AbstractPollingEndpoint.java:323) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50) ~[spring-core-5.2.19.RELEASE.jar!/:5.2.19.RELEASE]
at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:55) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller(AbstractPollingEndpoint.java:320) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54) [spring-context-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:93) [spring-context-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [?:1.8.0_311]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [?:1.8.0_311]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180) [?:1.8.0_311]
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) [?:1.8.0_311]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) [?:1.8.0_311]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) [?:1.8.0_311]
at java.lang.Thread.run(Thread.java:748) [?:1.8.0_311]
Caused by: java.lang.ClassCastException: org.springframework.integration.kafka.inbound.KafkaMessageSource cannot be cast to com.kafka.response.domain.CreateResponse
at com.kafka.response.service.MessageConsumerQueue.consume(MessageConsumerQueue.java:55) ~[classes!/:1.0-SNAPSHOT]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_311]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_311]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_311]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_311]
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:129) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:112) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:55) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:387) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:92) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:117) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:375) ~[spring-expression-5.2.9.RELEASE.jar!/:5.2.9.RELEASE]
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:171) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:156) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeExpression(MessagingMethodInvokerHelper.java:637) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.fallbackToInvokeExpression(MessagingMethodInvokerHelper.java:630) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInvokeExceptionAndFallbackToExpressionIfAny(MessagingMethodInvokerHelper.java:614) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.invokeHandlerMethod(MessagingMethodInvokerHelper.java:585) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:477) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.handler.support.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:355) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:108) ~[spring-integration-core-5.3.2.RELEASE.jar!/:5.3.2.RELEASE]
... 31 more
Can anyone tell me what is wrong here?
Here is a simple Spring Boot application that proves the intent of the configuration above is correct:
@SpringBootApplication
public class So71806313Application {

    public static void main(String[] args) {
        SpringApplication.run(So71806313Application.class, args);
    }

    @Bean
    RecordMessageConverter messageConverter() {
        return new StringJsonMessageConverter();
    }

    @Bean
    @InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
    public KafkaMessageSource<String, String> consumeMsg(ConsumerFactory<String, String> consumerFactory,
            RecordMessageConverter messageConverter) {
        KafkaMessageSource<String, String> kafkaMessageSource = new KafkaMessageSource<>(consumerFactory,
                new ConsumerProperties("Kafka_Topic"));
        kafkaMessageSource.getConsumerProperties().setGroupId("group_id");
        kafkaMessageSource.getConsumerProperties().setClientId("clientid");
        kafkaMessageSource.setMessageConverter(messageConverter);
        kafkaMessageSource.setPayloadType(CreateResponse.class);
        return kafkaMessageSource;
    }

    @Bean
    QueueChannel inputChannel() {
        return new QueueChannel();
    }

}
I am not sure what your CreateResponse is, so I kept it this simple:
public class CreateResponse {

    private String name;

    public void setName(String name) {
        this.name = name;
    }

    public String getName() {
        return name;
    }

}
Then I have an integration test to verify it against an embedded Kafka:
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(bootstrapServersProperty = "spring.kafka.bootstrap-servers", topics = "Kafka_Topic")
class So71806313ApplicationTests {

    @Autowired
    QueueChannel inputChannel;

    @Autowired
    KafkaTemplate<String, String> kafkaTemplate;

    @Test
    void contextLoads() {
        this.kafkaTemplate.send("Kafka_Topic", "{\"name\" : \"foo\"}");
        Message<?> receive = this.inputChannel.receive(10_000);
        assertThat(receive).isNotNull();
        assertThat(receive.getPayload())
                .isInstanceOf(CreateResponse.class)
                .extracting("name")
                .isEqualTo("foo");
    }

}
If I change consumeMsg() to this (note the commented-out @Bean):
@Autowired
ConsumerFactory<String, String> consumerFactory;

// @Bean
@InboundChannelAdapter(channel = "inputChannel", poller = @Poller(fixedDelay = "5000"))
public KafkaMessageSource consumeMsg() {
    KafkaMessageSource kafkaMessageSource = new KafkaMessageSource(consumerFactory,
            new ConsumerProperties("Kafka_Topic"));
    kafkaMessageSource.getConsumerProperties().setGroupId("group_id");
    kafkaMessageSource.getConsumerProperties().setClientId("clientid");
    kafkaMessageSource.setMessageConverter(new StringJsonMessageConverter());
    kafkaMessageSource.setPayloadType(CreateResponse.class);
    return kafkaMessageSource;
}
Then it does fail:
java.lang.AssertionError:
Expecting actual:
org.springframework.integration.kafka.inbound.KafkaMessageSource@305289b3
to be an instance of:
org.springframework.integration.stackoverflow.so71806313.CreateResponse
but was instance of:
org.springframework.integration.kafka.inbound.KafkaMessageSource
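The difference between the two variants can be illustrated with a simplified plain-Java model. This is only a sketch and not Spring Integration's actual code: Source, FakeKafkaSource, pollWhenBean and pollWhenPlainMethod are hypothetical stand-ins invented for illustration. The idea it models is that when @InboundChannelAdapter sits on a @Bean method returning a MessageSource, the framework polls that source, whereas on a plain method the method itself is invoked on every poll and whatever it returns becomes the payload.

```java
import java.util.function.Supplier;

public class InboundAdapterSketch {

    /** Hypothetical stand-in for a MessageSource: receive() yields a payload. */
    interface Source<T> {
        T receive();
    }

    /** Stand-in for the POJO expected as the message payload. */
    static class CreateResponse {
        final String name;

        CreateResponse(String name) {
            this.name = name;
        }
    }

    /** Hypothetical stand-in for KafkaMessageSource: polling yields a converted record. */
    static class FakeKafkaSource implements Source<CreateResponse> {
        @Override
        public CreateResponse receive() {
            return new CreateResponse("foo");
        }
    }

    /** Models the annotated consumeMsg() method from the configuration. */
    static FakeKafkaSource consumeMsg() {
        return new FakeKafkaSource();
    }

    /** With @Bean: the returned object is used as the source, so receive() is polled. */
    static Object pollWhenBean() {
        Source<?> source = consumeMsg();
        return source.receive();
    }

    /** Without @Bean: the method is called on each poll and its return value is the payload. */
    static Object pollWhenPlainMethod() {
        Supplier<Object> methodAsSource = InboundAdapterSketch::consumeMsg;
        return methodAsSource.get();
    }

    public static void main(String[] args) {
        System.out.println(pollWhenBean().getClass().getSimpleName());
        System.out.println(pollWhenPlainMethod().getClass().getSimpleName());
    }
}
```

In this model the second call hands the source object itself downstream, which mirrors the KafkaMessageSource payload seen in the assertion failure above.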
So, please revise the configuration on your side.