Exception while consuming data from a Kafka source in Spring Cloud Data Flow
When I try to use a Kafka topic as a source, I get a Kafka exception.
This is how I create the stream:
stream create --definition ":myDestination > log" --name ingest_from_broker
stream deploy ingest_from_broker --properties "spring.cloud.stream.bindings.input.consumer.headerMode=raw"
When I run it, I get this exception in the log file:
java.lang.StringIndexOutOfBoundsException: String index out of range: 113
at java.lang.String.checkBounds(String.java:385) ~[na:1.8.0_66]
at java.lang.String.<init>(String.java:425) ~[na:1.8.0_66]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:132) ~[spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:105) ~[spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binder.AbstractBinder.extractMessageValues(AbstractBinder.java:153) ~[spring-cloud-stream-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ReceivingHandler.handleRequestMessage(KafkaMessageChannelBinder.java:698) [spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) [spring-integration-core-4.2.4.RELEASE.jar!/:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) [spring-integration-core-4.2.4.RELEASE.jar!/:na]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69) [spring-integration-core-4.2.4.RELEASE.jar!/:na]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63) [spring-integration-core-4.2.4.RELEASE.jar!/:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) [spring-messaging-4.2.7.RELEASE.jar!/:4.2.7.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) [spring-messaging-4.2.7.RELEASE.jar!/:4.2.7.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) [spring-messaging-4.2.7.RELEASE.jar!/:4.2.7.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) [spring-integration-core-4.2.4.RELEASE.jar!/:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:43) [spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171) [spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50) [spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.doWithRetry(KafkaMessageChannelBinder.java:516) [spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:276) [spring-retry-1.1.3.RELEASE.jar!/:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:157) [spring-retry-1.1.3.RELEASE.jar!/:na]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.onMessage(KafkaMessageChannelBinder.java:513) [spring-cloud-stream-binder-kafka-1.0.2.RELEASE.jar!/:1.0.2.RELEASE]
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221) [spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209) [spring-integration-kafka-1.3.1.RELEASE.jar!/:na]
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67) [reactor-core-2.0.8.RELEASE.jar!/:na]
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789) [reactor-core-2.0.8.RELEASE.jar!/:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_66]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_66]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_66]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_66]
I have seen some posts saying that setting the property 'spring.cloud.stream.bindings.input.consumer.headerMode=raw' would help, but for some reason it has no effect here.
Try this:
stream create --definition ":myDestination > log --spring.cloud.stream.bindings.input.consumer.headerMode=raw" --name ingest_from_broker
or
stream deploy ingest_from_broker --properties "apps.log.spring.cloud.stream.bindings.input.consumer.headerMode=raw"
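That is, the property must either be specified directly on the application in the stream definition or, if it is supplied at deployment time, it must indicate which application it applies to.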
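If you build the deployment properties from a script, the scoping rule can be sketched roughly like this. This is only an illustration: `qualify_for_app` is a hypothetical helper, not part of Spring Cloud Data Flow, and the `apps` prefix is simply copied from the deploy command above, so check the exact spelling against the documentation for your Data Flow version.

```python
def qualify_for_app(app_name, properties, prefix="apps"):
    """Prefix each property key so a deploy-time setting targets a single app.

    Hypothetical helper for illustration only. The default prefix mirrors the
    deploy command in this thread and may differ in other Data Flow versions.
    """
    return ",".join(
        f"{prefix}.{app_name}.{key}={value}" for key, value in properties.items()
    )


# The binding property from the question, as it would be passed to the app itself.
props = {"spring.cloud.stream.bindings.input.consumer.headerMode": "raw"}

# Scope it to the "log" app of the stream before handing it to "stream deploy".
deploy_properties = qualify_for_app("log", props)
print(f'stream deploy ingest_from_broker --properties "{deploy_properties}"')
```

The unqualified form used in the question names no target application, which is why the setting never reached the log sink.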