Error sending message to DLQ in Spring Cloud Stream with Kafka
pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-stream</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    </dependency>
</dependencies>
<dependencyManagement>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-dependencies</artifactId>
            <version>Edgware.SR3</version>
            <type>pom</type>
            <scope>import</scope>
        </dependency>
    </dependencies>
</dependencyManagement>
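import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

// IMessageProcessor, OrderEventSink and MessageProcessingFailedException
// are application classes that are not shown in the question.
@Component
public class QueueConsumer {

    /** The Constant LOG. */
    public static final Logger LOG = LoggerFactory.getLogger(QueueConsumer.class);

    /** The processor. */
    @Autowired
    private IMessageProcessor processor;

    /**
     * Consume.
     *
     * @param message the message
     */
    @StreamListener(value = OrderEventSink.ORDER_EVENT)
    public void consume(Message<String> message) {
        try {
            processor.process(message);
        } catch (MessageProcessingFailedException e) {
            // Log, then rethrow so the binder can route the failed message to the DLQ.
            LOG.error("Error Code " + e.getCode().getCode() + " " + e.getCode().getDescription(), e);
            throw e;
        }
    }
}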
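- I am using Spring Cloud Stream to read messages from a Kafka topic. Messages are read from the queue and processed; if processing fails, the message should be sent to the configured error queue (DLQ), but I get the error below instead.
- The exception is thrown while extracting headers from the message. What is the best way to fix this?
- The Kafka version is 1.0 and the Kafka client is 2.11-1.0.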
application.properties
spring.cloud.stream.bindings.orderEvent.destination=orderEvents
spring.cloud.stream.bindings.orderEvent.content-type=application/json
spring.cloud.stream.bindings.orderEvent.group=orderEvents-consumer
spring.cloud.stream.bindings.orderEvent.consumer.back-off-multiplier=5
spring.cloud.stream.bindings.orderEvent.consumer.back-off-initial-interval=60000
spring.cloud.stream.bindings.orderEvent.consumer.max-attempts=1
spring.cloud.stream.bindings.orderEvent.consumer.headerMode=raw
spring.cloud.stream.bindings.kafka.binder.brokers=localhost
spring.cloud.stream.bindings.kafka.binder.defaultBrokerPort=9092
spring.cloud.stream.bindings.kafka.binder.zkNodes=localhost
spring.cloud.stream.bindings.kafka.binder.defaultZkPort=2181
spring.cloud.stream.kafka.bindings.orderEvent.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.orderEvent.consumer.dlqName=dead-queue
spring.cloud.stream.kafka.bindings.orderEvent.consumer.dlqProducerProperties.configuration.key.serializer=org.apache.kafka.common.serialization.StringSerializer
spring.cloud.stream.kafka.bindings.orderEvent.consumer.dlqProducerProperties.configuration.value.serializer=org.apache.kafka.common.serialization.StringSerializer
org.springframework.messaging.MessageDeliveryException: failed to send Message to channel 'scm-orderEvents.scm-orderEvents-consumer.errors'; nested exception is java.lang.RuntimeException: java.lang.StringIndexOutOfBoundsException: String index out of range: 4297
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:451) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105) ~[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendErrorMessageIfNecessary(MessageProducerSupport.java:207) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:191) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:63) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:372) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:352) ~[spring-integration-kafka-2.1.2.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:794) [spring-kafka-1.1.6.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738) [spring-kafka-1.1.6.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.access00(KafkaMessageListenerContainer.java:245) [spring-kafka-1.1.6.RELEASE.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer$ListenerInvoker.run(KafkaMessageListenerContainer.java:1031) [spring-kafka-1.1.6.RELEASE.jar:na]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_162]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_162]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_162]
Caused by: java.lang.RuntimeException: java.lang.StringIndexOutOfBoundsException: String index out of range: 4297
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.handleMessage(KafkaMessageChannelBinder.java:380) ~[spring-cloud-stream-binder-kafka-1.3.2.RELEASE.jar:1.3.2.RELEASE]
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:185) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) ~[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
... 16 common frames omitted
Caused by: java.lang.StringIndexOutOfBoundsException: String index out of range: 4297
at java.lang.String.checkBounds(String.java:385) ~[na:1.8.0_162]
at java.lang.String.<init>(String.java:425) ~[na:1.8.0_162]
at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.oldExtractHeaders(EmbeddedHeaderUtils.java:154) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.extractHeaders(EmbeddedHeaderUtils.java:115) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.extractHeaders(EmbeddedHeaderUtils.java:107) ~[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.handleMessage(KafkaMessageChannelBinder.java:368) ~[spring-cloud-stream-binder-kafka-1.3.2.RELEASE.jar:1.3.2.RELEASE]
... 20 common frames omitted
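This is a bug in the Kafka binder 1.3.2.RELEASE; it is fixed on master (1.3.3.BUILD-SNAPSHOT).
That said, the best solution is to use Spring Boot 2.0.1 with SCSt Elmhurst.RELEASE (pulled in by Spring Cloud Finchley, which is currently at milestone M9).
Those versions support Kafka 1.0 natively.
You might also have success moving to the kafka11 binder artifact (1.3.0), which is compatible with SCSt 1.3.x, as discussed on the Wiki.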
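To illustrate the failure mode, the StringIndexOutOfBoundsException raised under EmbeddedHeaderUtils.oldExtractHeaders is consistent with payload bytes being decoded as if they began with length-prefixed embedded headers. The sketch below is only an illustration under assumptions: the class HeaderDecodeSketch, its methods, and the one-header layout are hypothetical and are not the binder's actual wire format or code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HeaderDecodeSketch {

    /**
     * Hypothetical, simplified layout (NOT the binder's real format):
     * [4-byte big-endian name length][name bytes][payload bytes].
     */
    public static byte[] encode(String headerName, byte[] payload) {
        byte[] name = headerName.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(4 + name.length + payload.length)
                .putInt(name.length)
                .put(name)
                .put(payload)
                .array();
    }

    /** Reads the length prefix and decodes that many bytes as the header name. */
    public static String decodeHeaderName(byte[] record) {
        int length = ByteBuffer.wrap(record).getInt();
        // For a raw payload, 'length' is just the first four payload bytes
        // reinterpreted as an int, so it can point far past the end of the array.
        return new String(record, 4, length, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] json = "{\"orderId\":42}".getBytes(StandardCharsets.UTF_8);

        // Record that really carries an embedded header: the name round-trips.
        System.out.println(decodeHeaderName(encode("contentType", json)));

        // Raw record without embedded headers: the bogus length is out of range.
        try {
            decodeHeaderName(json);
        } catch (IndexOutOfBoundsException e) {
            System.out.println("raw payload rejected: " + e.getClass().getSimpleName());
        }
    }
}
```

If that is what happens on the DLQ path, it would explain why a consumer configured with headerMode=raw can still fail while headers are being extracted; moving to a binder version that contains the fix avoids the problem without changing the listener code.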