spring-cloud-stream 和 kafka-clients 向后兼容
spring-cloud-stream and kafka-clients backward compatibility
我们有一个当前使用 spring-cloud-stream
Ditmars.RELEASE 的微服务,后者又使用 kafka-clients
0.10.1.1。
我们有兴趣升级到 spring-cloud-stream
2.0.0.RC3,后者又使用 kafka-clients
1.0.0 来解决我们遇到的问题:。
在我们将 只有一个 服务升级到 spring-boot
2.0.0.RELEASE 和 spring-cloud-stream
2.0.0.RC3 后,我们遇到了奇怪的行为:
被升级的服务(以后我称之为service-1)是某个主题的生产者send-enrollment-mail
。此外,我们还有另一个 (service-2),它使用旧版本的 spring-cloud-stream
和 kafka-clients
,它是该主题的消费者。当我们启动 service-1 时,它会使用适当的分区创建此主题,然后我们启动 service-2(使用新版本),最后从 service-1 生成一条消息,我们收到以下异常
org.springframework.messaging.MessageHandlingException: nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:406) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:725) ~[spring-cloud-stream-2.0.0.RC3.jar:2.0.0.RC3]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at com.watercorp.app.messaging.producer.MessageProducer.send(MessageProducer.java:39) ~[classes/:na]
at com.watercorp.app.service.EnrollmentServiceImpl.sendEnrollmentMail(EnrollmentServiceImpl.java:56) ~[classes/:na]
at com.watercorp.app.service.EnrollmentServiceImpl.enrollUsers(EnrollmentServiceImpl.java:50) ~[classes/:na]
at com.watercorp.app.messaging.handler.UsersEnrollmentMessageHandler.handleMessage(UsersEnrollmentMessageHandler.java:60) ~[classes/:na]
at com.watercorp.app.messaging.handler.UsersEnrollmentMessageHandler.handleMessage(UsersEnrollmentMessageHandler.java:24) ~[classes/:na]
at com.watercorp.app.messaging.consumer.MessageConsumer.lambda$handleMessageWithRetry[=20=](MessageConsumer.java:84) [classes/:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164) ~[spring-retry-1.2.2.RELEASE.jar:na]
at com.watercorp.app.messaging.consumer.MessageConsumer.handleMessageWithRetry(MessageConsumer.java:77) [classes/:na]
at com.watercorp.app.messaging.consumer.MessageConsumer.handleUsersEnrollmentMessage(MessageConsumer.java:65) [classes/:na]
at com.watercorp.app.messaging.consumer.MessageConsumer$$FastClassBySpringCGLIB$b03a437.invoke() [classes/:na]
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) [spring-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:747) [spring-aop-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.validation.beanvalidation.MethodValidationInterceptor.invoke(MethodValidationInterceptor.java:112) [spring-context-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) [spring-aop-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) [spring-aop-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at com.watercorp.app.messaging.consumer.MessageConsumer$$EnhancerBySpringCGLIB$$a3ae110f.handleUsersEnrollmentMessage() [classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) [spring-cloud-stream-2.0.0.RC3.jar:2.0.0.RC3]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:87) [spring-cloud-stream-2.0.0.RC3.jar:2.0.0.RC3]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:70) [spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387) [spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364) [spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_121]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_121]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]
Caused by: java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_121]
at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_121]
at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:133) ~[spring-core-5.0.4.RELEASE.jar:5.0.4.RELEASE]
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.processSendResult(KafkaProducerMessageHandler.java:507) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:398) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE]
... 62 common frames omitted
Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
at org.springframework.kafka.core.KafkaTemplate.onCompletion(KafkaTemplate.java:354) ~[spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE]
at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204) ~[kafka-clients-1.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187) ~[kafka-clients-1.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) ~[kafka-clients-1.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) ~[kafka-clients-1.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557) ~[kafka-clients-1.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481) ~[kafka-clients-1.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.access0(Sender.java:74) ~[kafka-clients-1.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.onComplete(Sender.java:692) ~[kafka-clients-1.0.0.jar:na]
at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) ~[kafka-clients-1.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482) ~[kafka-clients-1.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) ~[kafka-clients-1.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[kafka-clients-1.0.0.jar:na]
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) ~[kafka-clients-1.0.0.jar:na]
... 1 common frames omitted
Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
并且还打印了以下错误:
ERROR 22159 --- [ad | producer-5] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='{123, 34, 109, 101, 115, 115, 97, 103, 101, 84, 121, 112, 101, 34, 58, 34, 69, 78, 82, 79, 76, 76, 7...' to topic send-enrollment-mail and partition 7:
一些注意事项:
1. 消息被发送到 kafka 服务器并存储在那里——当我打开这个主题的控制台消费者时,我看到了 service-1 发送的消息。
2. 该消息没有被订阅该主题的service-2消费。
3. 一段时间后,service-2 开始每毫秒发出以下错误
WARN 11837 --- [ -C-1] o.a.k.c.consumer.internals.Fetcher : Unknown error fetching data for topic-partition send-enrollment-mail-5
4. 当我停止 service-2 然后从 service-1 生成另一条消息时,service-1 没有收到 UnknownServerException 错误。
5. 然后,当我启动 service-2 时,它会为每个至少包含一条从生产者发送的消息的分区吐出
WARN 11837 --- [ -C-1] o.a.k.c.consumer.internals.Fetcher : Unknown error fetching data for topic-partition send-enrollment-mail-5
。
对于此问题,我将不胜感激
您必须使用 headerMode=embeddedHeaders
或 none
才能与旧版 (Ditmars) SCSt 应用程序兼容(取决于这些应用程序使用的 header 个)。 2.0 应用程序的原生 header 模式是 native
- 因为 Kafka 现在支持 headers.
我们有一个当前使用 spring-cloud-stream
Ditmars.RELEASE 的微服务,后者又使用 kafka-clients
0.10.1.1。
我们有兴趣升级到 spring-cloud-stream
2.0.0.RC3,后者又使用 kafka-clients
1.0.0 来解决我们遇到的问题:
在我们将 只有一个 服务升级到 spring-boot
2.0.0.RELEASE 和 spring-cloud-stream
2.0.0.RC3 后,我们遇到了奇怪的行为:
被升级的服务(以后我称之为service-1)是某个主题的生产者send-enrollment-mail
。此外,我们还有另一个 (service-2),它使用旧版本的 spring-cloud-stream
和 kafka-clients
,它是该主题的消费者。当我们启动 service-1 时,它会使用适当的分区创建此主题,然后我们启动 service-2(使用新版本),最后从 service-1 生成一条消息,我们收到以下异常
org.springframework.messaging.MessageHandlingException: nested exception is java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:406) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE] at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.cloud.stream.binder.AbstractMessageChannelBinder$SendingHandler.handleMessageInternal(AbstractMessageChannelBinder.java:725) ~[spring-cloud-stream-2.0.0.RC3.jar:2.0.0.RC3] at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at com.watercorp.app.messaging.producer.MessageProducer.send(MessageProducer.java:39) ~[classes/:na] at com.watercorp.app.service.EnrollmentServiceImpl.sendEnrollmentMail(EnrollmentServiceImpl.java:56) ~[classes/:na] at com.watercorp.app.service.EnrollmentServiceImpl.enrollUsers(EnrollmentServiceImpl.java:50) ~[classes/:na] at com.watercorp.app.messaging.handler.UsersEnrollmentMessageHandler.handleMessage(UsersEnrollmentMessageHandler.java:60) ~[classes/:na] at com.watercorp.app.messaging.handler.UsersEnrollmentMessageHandler.handleMessage(UsersEnrollmentMessageHandler.java:24) ~[classes/:na] at com.watercorp.app.messaging.consumer.MessageConsumer.lambda$handleMessageWithRetry[=20=](MessageConsumer.java:84) [classes/:na] at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287) ~[spring-retry-1.2.2.RELEASE.jar:na] at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:164) ~[spring-retry-1.2.2.RELEASE.jar:na] at com.watercorp.app.messaging.consumer.MessageConsumer.handleMessageWithRetry(MessageConsumer.java:77) [classes/:na] at com.watercorp.app.messaging.consumer.MessageConsumer.handleUsersEnrollmentMessage(MessageConsumer.java:65) [classes/:na] at com.watercorp.app.messaging.consumer.MessageConsumer$$FastClassBySpringCGLIB$b03a437.invoke() [classes/:na] at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:204) [spring-core-5.0.4.RELEASE.jar:5.0.4.RELEASE] at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:747) [spring-aop-5.0.4.RELEASE.jar:5.0.4.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) [spring-aop-5.0.4.RELEASE.jar:5.0.4.RELEASE] at org.springframework.validation.beanvalidation.MethodValidationInterceptor.invoke(MethodValidationInterceptor.java:112) [spring-context-5.0.4.RELEASE.jar:5.0.4.RELEASE] at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:185) [spring-aop-5.0.4.RELEASE.jar:5.0.4.RELEASE] at org.springframework.aop.framework.CglibAopProxy$DynamicAdvisedInterceptor.intercept(CglibAopProxy.java:689) [spring-aop-5.0.4.RELEASE.jar:5.0.4.RELEASE] at com.watercorp.app.messaging.consumer.MessageConsumer$$EnhancerBySpringCGLIB$$a3ae110f.handleUsersEnrollmentMessage() [classes/:na] at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_121] at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_121] at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_121] at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_121] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:181) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE] at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:114) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE] at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55) [spring-cloud-stream-2.0.0.RC3.jar:2.0.0.RC3] at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:164) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.cloud.stream.binding.DispatchingStreamListenerMessageHandler.handleRequestMessage(DispatchingStreamListenerMessageHandler.java:87) [spring-cloud-stream-2.0.0.RC3.jar:2.0.0.RC3] at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:157) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:463) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:407) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE] at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE] at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) [spring-messaging-5.0.4.RELEASE.jar:5.0.4.RELEASE] at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:203) [spring-integration-core-5.0.3.RELEASE.jar:5.0.3.RELEASE] at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:70) [spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE] at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:387) [spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE] at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:364) [spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1001) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:981) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:932) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:801) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE] at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:689) [spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE] at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_121] at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_121] at java.lang.Thread.run(Thread.java:745) [na:1.8.0_121] Caused by: java.util.concurrent.ExecutionException: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request at java.util.concurrent.FutureTask.report(FutureTask.java:122) [na:1.8.0_121] at java.util.concurrent.FutureTask.get(FutureTask.java:206) [na:1.8.0_121] at org.springframework.util.concurrent.SettableListenableFuture.get(SettableListenableFuture.java:133) ~[spring-core-5.0.4.RELEASE.jar:5.0.4.RELEASE] at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.processSendResult(KafkaProducerMessageHandler.java:507) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE] at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleRequestMessage(KafkaProducerMessageHandler.java:398) ~[spring-integration-kafka-3.0.3.RELEASE.jar:3.0.3.RELEASE] ... 62 common frames omitted Caused by: org.springframework.kafka.core.KafkaProducerException: Failed to send; nested exception is org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request at org.springframework.kafka.core.KafkaTemplate.onCompletion(KafkaTemplate.java:354) ~[spring-kafka-2.1.4.RELEASE.jar:2.1.4.RELEASE] at org.apache.kafka.clients.producer.internals.ProducerBatch.completeFutureAndFireCallbacks(ProducerBatch.java:204) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.producer.internals.ProducerBatch.done(ProducerBatch.java:187) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:627) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender.failBatch(Sender.java:596) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender.completeBatch(Sender.java:557) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender.handleProduceResponse(Sender.java:481) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender.access0(Sender.java:74) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender.onComplete(Sender.java:692) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:101) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:482) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:474) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:239) ~[kafka-clients-1.0.0.jar:na] at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:163) ~[kafka-clients-1.0.0.jar:na] ... 1 common frames omitted Caused by: org.apache.kafka.common.errors.UnknownServerException: The server experienced an unexpected error when processing the request
并且还打印了以下错误:
ERROR 22159 --- [ad | producer-5] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='{123, 34, 109, 101, 115, 115, 97, 103, 101, 84, 121, 112, 101, 34, 58, 34, 69, 78, 82, 79, 76, 76, 7...' to topic send-enrollment-mail and partition 7:
一些注意事项:
1. 消息被发送到 kafka 服务器并存储在那里——当我打开这个主题的控制台消费者时,我看到了 service-1 发送的消息。
2. 该消息没有被订阅该主题的service-2消费。
3. 一段时间后,service-2 开始每毫秒发出以下错误
WARN 11837 --- [ -C-1] o.a.k.c.consumer.internals.Fetcher : Unknown error fetching data for topic-partition send-enrollment-mail-5
4. 当我停止 service-2 然后从 service-1 生成另一条消息时,service-1 没有收到 UnknownServerException 错误。
5. 然后,当我启动 service-2 时,它会为每个至少包含一条从生产者发送的消息的分区吐出
WARN 11837 --- [ -C-1] o.a.k.c.consumer.internals.Fetcher : Unknown error fetching data for topic-partition send-enrollment-mail-5。
对于此问题,我将不胜感激
您必须使用 headerMode=embeddedHeaders
或 none
才能与旧版 (Ditmars) SCSt 应用程序兼容(取决于这些应用程序使用的 header 个)。 2.0 应用程序的原生 header 模式是 native
- 因为 Kafka 现在支持 headers.