当异常不可重试时向 DLQ 发送消息

Sending message to DLQ when Exception is not Retriable

我正在使用 Spring Cloud Streams 和默认 Spring 重试机制,仅使用属性。

我目前有一个应用程序可以在我的侦听器中抛出 MyCustomException 时正确重试(最多重试次数)。然后,它被发送到 DLQ - 这件作品在下面的配置下按预期工作。

问题是:我想将另一个 "non-retriable exceptions" 也提交给 DLQ(例如,当我们抛出 AnotherCustomException 甚至 RuntimeException 时)。

现在他们没有被重试(这是正确的并且按预期工作)但我的问题是他们没有被重定向到 myDeadLetterQueue。

属性文件中使用的配置如下。

spring.cloud.stream.kafka.bindings.input.consumer.enableDlq = true
spring.cloud.stream.kafka.bindings.input.consumer.dlq-name = myDeadLetterQueue

spring.cloud.stream.bindings.input.consumer.defaultRetryable = false
spring.cloud.stream.bindings.input.consumer.retryableExceptions.com.packagename.common.kafka.exception.MyCustomException = true

此场景是否缺少任何其他配置?

but the issue for my case is that they are not being redirected to myDeadLetterQueue.

这没有意义;这就是它的工作原理;不可重发的异常被直接发送到 DLQ 主题而不被重试;我刚测试过没有任何问题:

@SpringBootApplication
@EnableBinding(Sink.class)
public class So58390786Application {

    public static void main(String[] args) {
        SpringApplication.run(So58390786Application.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void listen(String in) {
        System.out.println(in);
        if ("state".equals(in)) {
            throw new IllegalStateException("ise");
        }
        throw new IllegalArgumentException("iae");
    }


    @Bean
    public ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
        return args -> {
            template.send("so58390786", "state".getBytes());
            template.send("so58390786", "arg".getBytes());
            template.send("so58390786", "state".getBytes());
        };
    }


    @KafkaListener(id = "so58390786slq", topics = "so58390786dlq")
    public void listenDlq(String in) {
        System.out.println("dlq:" + in);
    }

}

spring.cloud.stream.bindings.input.destination=so58390786
spring.cloud.stream.bindings.input.group=so58390786
spring.cloud.stream.kafka.bindings.input.consumer.enableDlq=true
spring.cloud.stream.kafka.bindings.input.consumer.dlq-name=so58390786dlq

spring.cloud.stream.bindings.input.consumer.defaultRetryable = false
spring.cloud.stream.bindings.input.consumer.retryable-exceptions.java.lang.IllegalStateException=true

logging.level.root=warn

spring.kafka.consumer.auto-offset-reset=earliest

state
state
state
2019-10-15 09:22:46.765 ERROR 57129 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking So58390786Application#listen[1 args]; nested exception is java.lang.IllegalStateException: ise, failedMessage=GenericMessage [payload=byte[5], headers={kafka_offset=12, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c959af1, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=so58390786, kafka_receivedTimestamp=1571145763737, contentType=application/json, kafka_groupId=so58390786}]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:75)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage[=12=](RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1487)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1401)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1165)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: ise
    at com.example.demo.So58390786Application.listen(So58390786Application.java:25)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    ... 33 more

arg
2019-10-15 09:22:46.776 ERROR 57129 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking So58390786Application#listen[1 args]; nested exception is java.lang.IllegalArgumentException: iae, failedMessage=GenericMessage [payload=byte[3], headers={kafka_offset=13, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c959af1, deliveryAttempt=1, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=so58390786, kafka_receivedTimestamp=1571145763741, contentType=application/json, kafka_groupId=so58390786}]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:75)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage[=12=](RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1487)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1401)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1165)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalArgumentException: iae
    at com.example.demo.So58390786Application.listen(So58390786Application.java:27)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    ... 33 more

state
dlq:state
dlq:arg
state
state
2019-10-15 09:22:49.784 ERROR 57129 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessagingException: Exception thrown while invoking So58390786Application#listen[1 args]; nested exception is java.lang.IllegalStateException: ise, failedMessage=GenericMessage [payload=byte[5], headers={kafka_offset=14, kafka_consumer=org.apache.kafka.clients.consumer.KafkaConsumer@5c959af1, deliveryAttempt=3, kafka_timestampType=CREATE_TIME, kafka_receivedMessageKey=null, kafka_receivedPartitionId=0, kafka_receivedTopic=so58390786, kafka_receivedTimestamp=1571145763741, contentType=application/json, kafka_groupId=so58390786}]
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:64)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:127)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:177)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:403)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:198)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:384)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access0(KafkaMessageDrivenChannelAdapter.java:75)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:443)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:417)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage[=12=](RetryingMessageListenerAdapter.java:120)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:287)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:211)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:114)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:40)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:1592)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:1575)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:1534)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:1487)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:1401)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1165)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:949)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:884)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: ise
    at com.example.demo.So58390786Application.listen(So58390786Application.java:25)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
    at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
    at org.springframework.cloud.stream.binding.StreamListenerMessageHandler.handleRequestMessage(StreamListenerMessageHandler.java:55)
    ... 33 more

dlq:state

编辑

打开调试日志记录以重试以查看您看到的内容;

logging.level.org.springframework.retry=debug

我明白了...

2019-10-15 09:35:38.856 DEBUG 60391 --- [           main] o.s.retry.support.RetryTemplate          : Retry: count=0
2019-10-15 09:35:38.967 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Retry: count=0
state
2019-10-15 09:35:38.973 DEBUG 60391 --- [container-0-C-1] o.s.r.backoff.ExponentialBackOffPolicy   : Sleeping for 1000
2019-10-15 09:35:39.977 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Checking for rethrow: count=1
2019-10-15 09:35:39.978 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Retry: count=1
state
2019-10-15 09:35:39.979 DEBUG 60391 --- [container-0-C-1] o.s.r.backoff.ExponentialBackOffPolicy   : Sleeping for 2000
2019-10-15 09:35:41.982 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Checking for rethrow: count=2
2019-10-15 09:35:41.982 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Retry: count=2
state
2019-10-15 09:35:41.983 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Checking for rethrow: count=3
2019-10-15 09:35:41.983 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Retry failed last attempt: count=3

...

2019-10-15 09:35:41.994 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Retry: count=0
arg
2019-10-15 09:35:41.994 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Checking for rethrow: count=1
m2019-10-15 09:35:41.994 DEBUG 60391 --- [container-0-C-1] o.s.retry.support.RetryTemplate          : Retry failed last attempt: count=1