当异常不可重试时向 DLQ 发送消息
Sending message to DLQ when Exception is not Retriable
我正在使用 Spring Cloud Streams 和默认 Spring 重试机制,仅使用属性。
我目前有一个应用程序可以在我的侦听器中抛出 MyCustomException 时正确重试(最多重试次数)。然后,它被发送到 DLQ - 这件作品在下面的配置下按预期工作。
问题是:我想将另一个 "non-retriable exceptions" 也提交给 DLQ(例如,当我们抛出 AnotherCustomException 甚至 RuntimeException 时)。
现在他们没有被重试(这是正确的并且按预期工作)但我的问题是他们没有被重定向到 myDeadLetterQueue。
属性文件中使用的配置如下。
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq = true
spring.cloud.stream.kafka.bindings.input.consumer.dlq-name = myDeadLetterQueue
spring.cloud.stream.bindings.input.consumer.defaultRetryable = false
spring.cloud.stream.bindings.input.consumer.retryableExceptions.com.packagename.common.kafka.exception.MyCustomException = true
此场景是否缺少任何其他配置?
but the issue for my case is that they are not being redirected to myDeadLetterQueue.
这没有意义;这就是它的工作原理;不可重发的异常被直接发送到 DLQ 主题而不被重试;我刚测试过没有任何问题:
@SpringBootApplication
@EnableBinding(Sink.class)
public class So58390786Application {
public static void main(String[] args) {
SpringApplication.run(So58390786Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
System.out.println(in);
if ("state".equals(in)) {
throw new IllegalStateException("ise");
}
throw new IllegalArgumentException("iae");
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
template.send("so58390786", "state".getBytes());
template.send("so58390786", "arg".getBytes());
template.send("so58390786", "state".getBytes());
};
}
@KafkaListener(id = "so58390786slq", topics = "so58390786dlq")
public void listenDlq(String in) {
System.out.println("dlq:" + in);
}
}
和
spring.cloud.stream.bindings.input.destination=so58390786
spring.cloud.stream.bindings.input.group=so58390786
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.input.consumer.dlq-name=so58390786dlq
spring.cloud.stream.bindings.input.consumer.defaultRetryable = false
spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=true
logging.level.root=warn
spring.kafka.consumer.auto-offset-reset=earliest
和
state
state
state
2019-10-15 09:22:46.765 ERROR 57129 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking So58390786Application#listen[1 args]; nested exception is java.lang.IllegalStateException: ise, failedMessage=GenericMessage [payload=byte[5], headers={kafka_offset=12, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c959af1, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=so58390786, kafka_receivedTimestamp=1571145763737, contentType=application/json, kafka_groupId=so58390786}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:75)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage[=12=](RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1487)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1401)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1165)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: ise
at com.example.demo.So58390786Application.listen(So58390786Application.java:25)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 33 more
arg
2019-10-15 09:22:46.776 ERROR 57129 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking So58390786Application#listen[1 args]; nested exception is java.lang.IllegalArgumentException: iae, failedMessage=GenericMessage [payload=byte[3], headers={kafka_offset=13, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c959af1, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=so58390786, kafka_receivedTimestamp=1571145763741, contentType=application/json, kafka_groupId=so58390786}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:75)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage[=12=](RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1487)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1401)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1165)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: iae
at com.example.demo.So58390786Application.listen(So58390786Application.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 33 more
state
dlq:state
dlq:arg
state
state
2019-10-15 09:22:49.784 ERROR 57129 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking So58390786Application#listen[1 args]; nested exception is java.lang.IllegalStateException: ise, failedMessage=GenericMessage [payload=byte[5], headers={kafka_offset=14, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c959af1, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=so58390786, kafka_receivedTimestamp=1571145763741, contentType=application/json, kafka_groupId=so58390786}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:75)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage[=12=](RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1487)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1401)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1165)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: ise
at com.example.demo.So58390786Application.listen(So58390786Application.java:25)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 33 more
dlq:state
编辑
打开调试日志记录以重试以查看您看到的内容;
logging.level.org.springframework.retry=debug
我明白了...
2019-10-15 09:35:38.856 DEBUG 60391 --- [ main] o.s.retry.support.RetryTemplate : Retry: count=0
2019-10-15 09:35:38.967 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Retry: count=0
state
2019-10-15 09:35:38.973 DEBUG 60391 --- [container-0-C-1] o.s.r.backoff.ExponentialBackOffPolicy : Sleeping for 1000
2019-10-15 09:35:39.977 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Checking for rethrow: count=1
2019-10-15 09:35:39.978 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Retry: count=1
state
2019-10-15 09:35:39.979 DEBUG 60391 --- [container-0-C-1] o.s.r.backoff.ExponentialBackOffPolicy : Sleeping for 2000
2019-10-15 09:35:41.982 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Checking for rethrow: count=2
2019-10-15 09:35:41.982 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Retry: count=2
state
2019-10-15 09:35:41.983 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Checking for rethrow: count=3
2019-10-15 09:35:41.983 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Retry failed last attempt: count=3
...
2019-10-15 09:35:41.994 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Retry: count=0
arg
2019-10-15 09:35:41.994 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Checking for rethrow: count=1
m2019-10-15 09:35:41.994 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Retry failed last attempt: count=1
我正在使用 Spring Cloud Streams 和默认 Spring 重试机制,仅使用属性。
我目前有一个应用程序可以在我的侦听器中抛出 MyCustomException 时正确重试(最多重试次数)。然后,它被发送到 DLQ - 这件作品在下面的配置下按预期工作。
问题是:我想将另一个 "non-retriable exceptions" 也提交给 DLQ(例如,当我们抛出 AnotherCustomException 甚至 RuntimeException 时)。
现在他们没有被重试(这是正确的并且按预期工作)但我的问题是他们没有被重定向到 myDeadLetterQueue。
属性文件中使用的配置如下。
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq = true
spring.cloud.stream.kafka.bindings.input.consumer.dlq-name = myDeadLetterQueue
spring.cloud.stream.bindings.input.consumer.defaultRetryable = false
spring.cloud.stream.bindings.input.consumer.retryableExceptions.com.packagename.common.kafka.exception.MyCustomException = true
此场景是否缺少任何其他配置?
but the issue for my case is that they are not being redirected to myDeadLetterQueue.
这没有意义;这就是它的工作原理;不可重发的异常被直接发送到 DLQ 主题而不被重试;我刚测试过没有任何问题:
@SpringBootApplication
@EnableBinding(Sink.class)
public class So58390786Application {
public static void main(String[] args) {
SpringApplication.run(So58390786Application.class, args);
}
@StreamListener(Sink.INPUT)
public void listen(String in) {
System.out.println(in);
if ("state".equals(in)) {
throw new IllegalStateException("ise");
}
throw new IllegalArgumentException("iae");
}
@Bean
public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
template.send("so58390786", "state".getBytes());
template.send("so58390786", "arg".getBytes());
template.send("so58390786", "state".getBytes());
};
}
@KafkaListener(id = "so58390786slq", topics = "so58390786dlq")
public void listenDlq(String in) {
System.out.println("dlq:" + in);
}
}
和
spring.cloud.stream.bindings.input.destination=so58390786
spring.cloud.stream.bindings.input.group=so58390786
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.input.consumer.dlq-name=so58390786dlq
spring.cloud.stream.bindings.input.consumer.defaultRetryable = false
spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=true
logging.level.root=warn
spring.kafka.consumer.auto-offset-reset=earliest
和
state
state
state
2019-10-15 09:22:46.765 ERROR 57129 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking So58390786Application#listen[1 args]; nested exception is java.lang.IllegalStateException: ise, failedMessage=GenericMessage [payload=byte[5], headers={kafka_offset=12, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c959af1, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=so58390786, kafka_receivedTimestamp=1571145763737, contentType=application/json, kafka_groupId=so58390786}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:75)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage[=12=](RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1487)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1401)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1165)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: ise
at com.example.demo.So58390786Application.listen(So58390786Application.java:25)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 33 more
arg
2019-10-15 09:22:46.776 ERROR 57129 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking So58390786Application#listen[1 args]; nested exception is java.lang.IllegalArgumentException: iae, failedMessage=GenericMessage [payload=byte[3], headers={kafka_offset=13, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c959af1, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=so58390786, kafka_receivedTimestamp=1571145763741, contentType=application/json, kafka_groupId=so58390786}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:75)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage[=12=](RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1487)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1401)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1165)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: iae
at com.example.demo.So58390786Application.listen(So58390786Application.java:27)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 33 more
state
dlq:state
dlq:arg
state
state
2019-10-15 09:22:49.784 ERROR 57129 --- [container-0-C-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessagingException: Exception thrown while invoking So58390786Application#listen[1 args]; nested exception is java.lang.IllegalStateException: ise, failedMessage=GenericMessage [payload=byte[5], headers={kafka_offset=14, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c959af1, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=so58390786, kafka_receivedTimestamp=1571145763741, contentType=application/json, kafka_groupId=so58390786}]
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:75)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage[=12=](RetryingMessageListenerAdapter.java:120)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1487)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1401)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1165)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:884)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: ise
at com.example.demo.So58390786Application.listen(So58390786Application.java:25)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
... 33 more
dlq:state
编辑
打开调试日志记录以重试以查看您看到的内容;
logging.level.org.springframework.retry=debug
我明白了...
2019-10-15 09:35:38.856 DEBUG 60391 --- [ main] o.s.retry.support.RetryTemplate : Retry: count=0
2019-10-15 09:35:38.967 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Retry: count=0
state
2019-10-15 09:35:38.973 DEBUG 60391 --- [container-0-C-1] o.s.r.backoff.ExponentialBackOffPolicy : Sleeping for 1000
2019-10-15 09:35:39.977 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Checking for rethrow: count=1
2019-10-15 09:35:39.978 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Retry: count=1
state
2019-10-15 09:35:39.979 DEBUG 60391 --- [container-0-C-1] o.s.r.backoff.ExponentialBackOffPolicy : Sleeping for 2000
2019-10-15 09:35:41.982 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Checking for rethrow: count=2
2019-10-15 09:35:41.982 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Retry: count=2
state
2019-10-15 09:35:41.983 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Checking for rethrow: count=3
2019-10-15 09:35:41.983 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Retry failed last attempt: count=3
...
2019-10-15 09:35:41.994 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Retry: count=0
arg
2019-10-15 09:35:41.994 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Checking for rethrow: count=1
m2019-10-15 09:35:41.994 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate : Retry failed last attempt: count=1