如何在 MessageConversionException 上中止 sendAndReceive

How to abort sendAndReceive on MessageConversionException

我正在使用 spring-amqp(最新版本)rabbitTemplate.sendAndReceive(exchange, routingKey, message) 发送消息的方法。 RabbitTemplate 配置有 Jackson2JsonMessageConverter.

如果我发送格式错误的 json,我可以看到消息转换失败 org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content,正如预期的那样。

但是,sendAndReceive 方法不会中止并继续执行,直到达到 replyTimeout。 比较一下,如果我的 @RabbitListener 注释方法有错误,那么 sendAndReceive 会立即中止并且 returns 异常。

有什么方法可以告诉 spring 在转换异常的情况下中止 sendAndReceive 吗?

2021-05-18 17:07:38.188 WARN  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#8-1] [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler] :: Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:342)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:325)
    at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:207)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:134)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
    ... 10 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('s' (code 115)): was expecting comma to separate Object entries
 at [Source: (String)"{sd}"; line: 1, column: 81]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:707)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:632)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipComma(ReaderBasedJsonParser.java:2324)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextFieldName(ReaderBasedJsonParser.java:921)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:525)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:377)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4526)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3468)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:351)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertContent(AbstractJackson2MessageConverter.java:321)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:291)
    ... 18 common frames omitted
2021-05-18 17:07:38.188 WARN  [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#8-1] [org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler$DefaultExceptionStrategy] :: Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'{sd}' MessageProperties [headers={centreId=0, deliveryTypeId=null, path=/somePath, privileges=[], salespointId=0, insuranceId=null, promoter=null, userName=null, parameters={}, userId=null, superCentreId=0}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g1h2AA5yZXBseUA4NzA3MTY3MAAAaScAAAABYJ9TZA==.YoDsH6nkpNiLYvA4h8716g==, contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=someExchange, receivedRoutingKey=someKey, deliveryTag=1, consumerTag=amq.ctag-lHGwtgImO4ohGFCAHuKkLg, consumerQueue=someQueue])
2021-05-18 17:07:38.188 ERROR [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#8-1] [org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer] :: Execution of Rabbit message listener failed, and the error handler threw an exception
org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1436)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1720)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1495)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access00(SimpleMessageListenerContainer.java:83)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
    at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
    ... 6 common frames omitted
Caused by: org.springframework.amqp.support.converter.MessageConversionException: Failed to convert Message content
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:294)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:271)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.fromMessage(AbstractJackson2MessageConverter.java:251)
    at org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.extractMessage(AbstractAdaptableMessageListener.java:342)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter$MessagingMessageConverterAdapter.extractPayload(MessagingMessageListenerAdapter.java:325)
    at org.springframework.amqp.support.converter.MessagingMessageConverter.fromMessage(MessagingMessageConverter.java:132)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.toMessagingMessage(MessagingMessageListenerAdapter.java:207)
    at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:134)
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
    ... 10 common frames omitted
Caused by: com.fasterxml.jackson.core.JsonParseException: Unexpected character ('s' (code 115)): was expecting comma to separate Object entries
 at [Source: (String)"{sd}"; line: 1, column: 81]
    at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1851)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:707)
    at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:632)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser._skipComma(ReaderBasedJsonParser.java:2324)
    at com.fasterxml.jackson.core.json.ReaderBasedJsonParser.nextFieldName(ReaderBasedJsonParser.java:921)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer._readAndBindStringKeyMap(MapDeserializer.java:525)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:377)
    at com.fasterxml.jackson.databind.deser.std.MapDeserializer.deserialize(MapDeserializer.java:29)
    at com.fasterxml.jackson.databind.ObjectMapper._readMapAndClose(ObjectMapper.java:4526)
    at com.fasterxml.jackson.databind.ObjectMapper.readValue(ObjectMapper.java:3468)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertBytesToObject(AbstractJackson2MessageConverter.java:351)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.convertContent(AbstractJackson2MessageConverter.java:321)
    at org.springframework.amqp.support.converter.AbstractJackson2MessageConverter.doFromMessage(AbstractJackson2MessageConverter.java:291)
    ... 18 common frames omitted

编辑 1: 这似乎是 spring-amqp 中的错误。您可以关注此问题的状态here

编辑 2: 固定在 spring-amqp 2.3.9.

好的。我明白发生了什么事。我们在 AbstractMessageListenerContainer 上失败了,它还不知道我们的 MessageListener 是关于请求-回复行为的。因此,它通过其默认值 ConditionalRejectingErrorHandler 静默处理异常,而默认值又会抛出 AmqpRejectAndDontRequeueException,仅此而已。侦听器容器返回到下一条消息的主循环。

您可能必须实施自己的 ConditionalRejectingErrorHandler 覆盖其:

/**
 * Called when a message with a fatal exception has an {@code x-death} header, prior
 * to discarding the message. Subclasses can override this method to perform some
 * action, such as sending the message to a parking queue.
 * @param failed the failed message.
 * @since 2.3
 */
protected void handleDiscarded(Message failed) {

发回错误消息。

欢迎提出GitHub问题,我们一起思考如何在共同层面上处理这种情况。