来自 EmbeddedHeadersMessageConverter 的 StringIndexOutOfBoundsException
StringIndexOutOfBoundsException from EmbeddedHeadersMessageConverter
我正在将一个简单的 Kafka 消费者应用程序从现有框架中移出,我觉得 spring-cloud-stream 是一种简单的方法。我使用 Initializr bootstrap 应用程序,它现在使用 Spring-Boot v1.3.3 和 Spring-Cloud-Stream v1.0.0-RC1。该应用程序非常简单,它所要做的就是从 Kafka 中挑选一条消息,反序列化 JSON 编码的 object 并将其传递给我们现有的库。开始时我只使用了 LogSink 示例,因为最终我不会做太多其他事情(只是反序列化并将 object 传递给不同的方法)。
一切都很好:它连接到 Kafka,接收消息并将其(作为 byte[])传递到我的接收器。但是,EmbeddedHeadersMessageConverter 会记录 StringIndexOutOfBoundsException:
2016-04-11 10:06:50.287 ERROR 11464 --- [pool-1-thread-1] fkaMessageChannelBinder$ReceivingHandler : Could not convert message: 7B2267656E65726174696F6E223A3 [...]
java.lang.StringIndexOutOfBoundsException: String index out of range: 2009
at java.lang.String.checkBounds(String.java:373) ~[na:1.8.0_25]
at java.lang.String.<init>(String.java:413) ~[na:1.8.0_25]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:131) ~[spring-cloud-stream-1.0.0.RC1.jar:1.0.0.RC1]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:104) ~[spring-cloud-stream-1.0.0.RC1.jar:1.0.0.RC1]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ReceivingHandler.handleRequestMessage(KafkaMessageChannelBinder.java:583) ~[spring-cloud-stream-binder-kafka-1.0.0.RC1.jar:1.0.0.RC1]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:43) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67) [reactor-core-2.0.7.RELEASE.jar:na]
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789) [reactor-core-2.0.7.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_25]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_25]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_25]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_25]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
https://github.com/spring-cloud/spring-cloud-stream/issues/209 似乎表明问题是缺少 Kafka headers,是的,没有。但是那里提到的解决方案是添加
spring.cloud.stream.binder.kafka.mode=raw
到我的应用程序配置。不幸的是,这对我不起作用。此外,STS 实际上有 auto-completion 的各个属性,它建议
spring.cloud.stream.kafka.binder.mode=raw
两者(单独或组合)都没有任何区别,异常仍在记录中。
我已经使用 Spring 多年,但这将是我的第一个 Spring-Boot/Spring-Cloud 应用程序。
申请代码如下:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
@SpringBootApplication
public class UpdateApplication {
private static Logger logger = LoggerFactory.getLogger(UpdateApplication.class);
public static void main(String[] args) {
SpringApplication.run(UpdateApplication.class, args);
}
@EnableBinding(Sink.class)
public static class UpdateHandler {
@StreamListener(Sink.INPUT)
//@ServiceActivator(inputChannel=Sink.INPUT)
public void loggerSink(Object payload) {
logger.info("Received: " + payload);
}
}
}
我都试过了,@ServiceActivator 和@StreamListener 注释,在这种情况下似乎没有什么区别。
我的 application.properties 看起来像这样:
spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.input.destination=updates
spring.cloud.stream.bindings.input.group=update-client
spring.cloud.stream.kafka.binder.brokers=brokerName
spring.cloud.stream.kafka.binder.zkNodes=zookeeperName
spring.cloud.stream.kafka.binder.mode=raw
如能帮助消除此错误,我们将不胜感激。
附带说明:因为我刚开始尝试 spring-cloud-stream,所以我添加了
spring.cloud.stream.bindings.updates.consumer.resetOffsets=true
spring.cloud.stream.bindings.updates.consumer.startOffset=earlist
到配置以避免每次重新启动时都必须发送新消息,但这没有用。
由于 RC
该选项已移至 .consumer.
配置选项。
所以,现在你必须这样做:
spring.cloud.stream.bindings.input.consumer.mode=raw
在 Reference Manual 中查看更多信息。
spring.cloud.stream.bindings.input.consumer.headerMode=raw
适用于版本 1.1。0.RELEASE。
我正在将一个简单的 Kafka 消费者应用程序从现有框架中移出,我觉得 spring-cloud-stream 是一种简单的方法。我使用 Initializr bootstrap 应用程序,它现在使用 Spring-Boot v1.3.3 和 Spring-Cloud-Stream v1.0.0-RC1。该应用程序非常简单,它所要做的就是从 Kafka 中挑选一条消息,反序列化 JSON 编码的 object 并将其传递给我们现有的库。开始时我只使用了 LogSink 示例,因为最终我不会做太多其他事情(只是反序列化并将 object 传递给不同的方法)。
一切都很好:它连接到 Kafka,接收消息并将其(作为 byte[])传递到我的接收器。但是,EmbeddedHeadersMessageConverter 会记录 StringIndexOutOfBoundsException:
2016-04-11 10:06:50.287 ERROR 11464 --- [pool-1-thread-1] fkaMessageChannelBinder$ReceivingHandler : Could not convert message: 7B2267656E65726174696F6E223A3 [...]
java.lang.StringIndexOutOfBoundsException: String index out of range: 2009
at java.lang.String.checkBounds(String.java:373) ~[na:1.8.0_25]
at java.lang.String.<init>(String.java:413) ~[na:1.8.0_25]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.oldExtractHeaders(EmbeddedHeadersMessageConverter.java:131) ~[spring-cloud-stream-1.0.0.RC1.jar:1.0.0.RC1]
at org.springframework.cloud.stream.binder.EmbeddedHeadersMessageConverter.extractHeaders(EmbeddedHeadersMessageConverter.java:104) ~[spring-cloud-stream-1.0.0.RC1.jar:1.0.0.RC1]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$ReceivingHandler.handleRequestMessage(KafkaMessageChannelBinder.java:583) ~[spring-cloud-stream-binder-kafka-1.0.0.RC1.jar:1.0.0.RC1]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:69) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.channel.FixedSubscriberChannel.send(FixedSubscriberChannel.java:63) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) [spring-messaging-4.2.5.RELEASE.jar:4.2.5.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:105) [spring-integration-core-4.2.5.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:43) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$AutoAcknowledgingChannelForwardingMessageListener.doOnMessage(KafkaMessageDrivenChannelAdapter.java:171) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.listener.AbstractDecodingMessageListener.onMessage(AbstractDecodingMessageListener.java:50) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:221) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at org.springframework.integration.kafka.listener.QueueingMessageListenerInvoker$KafkaMessageDispatchingSubscriber.onNext(QueueingMessageListenerInvoker.java:209) [spring-integration-kafka-1.3.0.RELEASE.jar:na]
at reactor.core.processor.util.RingBufferSubscriberUtils.route(RingBufferSubscriberUtils.java:67) [reactor-core-2.0.7.RELEASE.jar:na]
at reactor.core.processor.RingBufferProcessor$BatchSignalProcessor.run(RingBufferProcessor.java:789) [reactor-core-2.0.7.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_25]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_25]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_25]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_25]
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_25]
https://github.com/spring-cloud/spring-cloud-stream/issues/209 似乎表明问题是缺少 Kafka headers,是的,没有。但是那里提到的解决方案是添加
spring.cloud.stream.binder.kafka.mode=raw
到我的应用程序配置。不幸的是,这对我不起作用。此外,STS 实际上有 auto-completion 的各个属性,它建议
spring.cloud.stream.kafka.binder.mode=raw
两者(单独或组合)都没有任何区别,异常仍在记录中。
我已经使用 Spring 多年,但这将是我的第一个 Spring-Boot/Spring-Cloud 应用程序。
申请代码如下:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.integration.annotation.ServiceActivator;
@SpringBootApplication
public class UpdateApplication {
private static Logger logger = LoggerFactory.getLogger(UpdateApplication.class);
public static void main(String[] args) {
SpringApplication.run(UpdateApplication.class, args);
}
@EnableBinding(Sink.class)
public static class UpdateHandler {
@StreamListener(Sink.INPUT)
//@ServiceActivator(inputChannel=Sink.INPUT)
public void loggerSink(Object payload) {
logger.info("Received: " + payload);
}
}
}
我都试过了,@ServiceActivator 和@StreamListener 注释,在这种情况下似乎没有什么区别。
我的 application.properties 看起来像这样:
spring.cloud.stream.bindings.input.binder=kafka
spring.cloud.stream.bindings.input.destination=updates
spring.cloud.stream.bindings.input.group=update-client
spring.cloud.stream.kafka.binder.brokers=brokerName
spring.cloud.stream.kafka.binder.zkNodes=zookeeperName
spring.cloud.stream.kafka.binder.mode=raw
如能帮助消除此错误,我们将不胜感激。
附带说明:因为我刚开始尝试 spring-cloud-stream,所以我添加了
spring.cloud.stream.bindings.updates.consumer.resetOffsets=true
spring.cloud.stream.bindings.updates.consumer.startOffset=earlist
到配置以避免每次重新启动时都必须发送新消息,但这没有用。
由于 RC
该选项已移至 .consumer.
配置选项。
所以,现在你必须这样做:
spring.cloud.stream.bindings.input.consumer.mode=raw
在 Reference Manual 中查看更多信息。
spring.cloud.stream.bindings.input.consumer.headerMode=raw
适用于版本 1.1。0.RELEASE。