Use ImmediateRequeueMessageRecoverer in Spring Integration for AMQP?
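We've noticed that when our Spring Integration endpoints (fed from RabbitMQ) receive a malformed message, the message is not retried. If there is a problem in our business code (i.e. the "service method" that receives the message) and an exception is thrown, the retry happens as expected.
This is our configuration: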
    var myService = ...
    IntegrationFlows.from(Amqp.inboundAdapter(connectionFactory, queueName)
                    .id(integrationFlowId)
                    .autoStartup(autoStartup)
                    .configureContainer(c -> c.acknowledgeMode(MANUAL)
                            .prefetchCount(10)
                            .concurrentConsumers(1)
                            .maxConcurrentConsumers(3))
                    .messageConverter(messageConverter))
            .aggregate(...)
            .handle(myService, "myMethod", e -> e.advice(myAdvice()))
            .get();
The myAdvice method is implemented like this:
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(200L);
    backOffPolicy.setMultiplier(2);
    backOffPolicy.setMaxInterval(5000L);

    RetryTemplate retryTemplate = new RetryTemplate();
    retryTemplate.setRetryPolicy(new SimpleRetryPolicy(MAX_VALUE));
    retryTemplate.setBackOffPolicy(backOffPolicy);
    retryTemplate.registerListener(new RetryListenerSupport() {
        @Override
        public <T, E extends Throwable> void onError(RetryContext ctx, RetryCallback<T, E> callback, Throwable e) {
            log.error("Caught {} due to {} (count = {})", e.getClass().getSimpleName(), e.getMessage(), ctx.getRetryCount(), e);
        }
    });

    StatelessRetryOperationsInterceptorFactoryBean bean = new StatelessRetryOperationsInterceptorFactoryBean();
    bean.setRetryOperations(retryTemplate);
    bean.setMessageRecoverer(new ImmediateRequeueMessageRecoverer());
    return bean.getObject();
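For reference, the back-off settings above imply a fixed series of wait times between attempts. The following is a plain-Java sketch of that arithmetic only; it does not use Spring Retry, the names BackOffSchedule and intervals are hypothetical, and it assumes the first wait equals the initial interval and each later wait is multiplied until it reaches the maximum.

```java
import java.util.ArrayList;
import java.util.List;

final class BackOffSchedule {

    /** Returns the first {@code attempts} wait times in milliseconds. */
    public static List<Long> intervals(long initial, double multiplier, long max, int attempts) {
        List<Long> result = new ArrayList<>();
        long current = initial;
        for (int i = 0; i < attempts; i++) {
            // Never wait longer than the configured maximum.
            result.add(Math.min(current, max));
            // Stop growing once the cap has been reached.
            if (current < max) {
                current = (long) (current * multiplier);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the configuration above: 200 ms initial, x2, 5000 ms cap.
        System.out.println(intervals(200L, 2.0, 5000L, 8));
        // prints [200, 400, 800, 1600, 3200, 5000, 5000, 5000]
    }
}
```

With these values the wait grows from 200 ms to 3200 ms and then stays at the 5000 ms cap for every further attempt.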
The problem is that if we, for example, publish a message that the org.springframework.amqp.support.converter.MessageConverter cannot convert to the DTO (e.g. { "yo" : "MTV Raps" }), the message is not retried:
[my-service-97c696799-6xs26] org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:146)
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1436)
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1720)
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1495)
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:967)
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913)
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1288)
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1194)
[my-service-97c696799-6xs26] at java.base/java.lang.Thread.run(Thread.java:831)
[my-service-97c696799-6xs26] Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener threw exception
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1746)
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1636)
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1551)
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1539)
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1530)
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1474)
[my-service-97c696799-6xs26] ... 6 common frames omitted
[my-service-97c696799-6xs26] Caused by: org.springframework.amqp.support.converter.MessageConversionException: Don't know how to convert (Body:'{ "yo" : "MTV Raps" }' MessageProperties [headers={content_type=application/json}, contentType=application/json, contentLength=0, receivedDeliveryMode=NON_PERSISTENT, redelivered=false, receivedExchange=, receivedRoutingKey=myservice.routingkey, deliveryTag=1, consumerTag=amq.ctag-9De2w0uuQxnve_9k6HZ7tw, consumerQueue=myservice.myqueue]) to an object because no event type was found
[my-service-97c696799-6xs26] at com.mycompany.RabbitMQEventMessageConverter.fromMessage(RabbitMQEventMessageConverter.java:47)
[my-service-97c696799-6xs26] at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.convertPayload(AmqpInboundChannelAdapter.java:361)
[my-service-97c696799-6xs26] at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createMessageFromAmqp(AmqpInboundChannelAdapter.java:342)
[my-service-97c696799-6xs26] at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:334)
[my-service-97c696799-6xs26] at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:299)
[my-service-97c696799-6xs26] at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1632)
[my-service-97c696799-6xs26] ... 10 common frames omitted
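It seems that the ImmediateRequeueMessageRecoverer specified in the myAdvice() method is not used; instead, the default AmqpRejectAndDontRequeueException is thrown. As I see it, the most likely reason is that the Spring infrastructure has not yet reached the point where myAdvice() is invoked. I've tried to find a way to switch the message recoverer in configureContainer, but I can't seem to find one.
Does anyone know how to re-queue/retry messages that fail before Spring Integration invokes the "service method"?
We're using Spring Integration 5.4.6 and Spring Boot 2.4.6.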
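The conversion is performed before the message is created.
Conversion errors are generally considered fatal: there is no point in retrying, because the same message will fail again.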
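To illustrate why advice on the handler never sees such a failure, here is a toy model in plain Java. It is not Spring code: AdviceScopeDemo, retrying and handlerCallsFor are hypothetical names, Integer::parseInt stands in for the message converter, and a simple loop stands in for the retry advice. It only demonstrates that a wrapper around one step cannot retry a failure that happens in an earlier step.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class AdviceScopeDemo {

    /** Wraps a handler so it is tried up to maxAttempts times; counts every invocation. */
    public static <I, O> Function<I, O> retrying(Function<I, O> handler, int maxAttempts, AtomicInteger calls) {
        return input -> {
            RuntimeException last = new IllegalStateException("no attempts made");
            for (int attempt = 0; attempt < maxAttempts; attempt++) {
                calls.incrementAndGet();
                try {
                    return handler.apply(input);
                } catch (RuntimeException e) {
                    last = e; // remember the failure and try again
                }
            }
            throw last;
        };
    }

    /** Runs convert-then-handle and returns how often the advised handler was invoked. */
    public static int handlerCallsFor(String body) {
        AtomicInteger calls = new AtomicInteger();
        // Stand-in for the converter: runs BEFORE the advised handler.
        Function<String, Integer> converter = Integer::parseInt;
        // Stand-in for the service method: fails for negative numbers.
        Function<Integer, Integer> handler = n -> {
            if (n < 0) {
                throw new IllegalStateException("business failure");
            }
            return n * 2;
        };
        Function<Integer, Integer> advised = retrying(handler, 3, calls);
        try {
            advised.apply(converter.apply(body));
        } catch (RuntimeException e) {
            // Ignored: only the invocation count matters for this demo.
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(handlerCallsFor("42"));           // prints 1 (success on first try)
        System.out.println(handlerCallsFor("-1"));           // prints 3 (handler failure is retried)
        System.out.println(handlerCallsFor("not a number")); // prints 0 (conversion fails first)
    }
}
```

The third case mirrors the situation in the question: the conversion throws before the advised handler is ever reached, so the retry logic attached to it is never exercised.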
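Add an .errorChannel to the inbound adapter; its downstream flow will receive an ErrorMessage for the conversion error.
However, it will also receive error messages from the downstream flow, so you will have to handle all error types there.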
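EDIT
You can add an error channel and handle the conversion exception on its flow. Bear in mind, though, that the message will be redelivered over and over again, with no delay.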
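    import org.springframework.amqp.ImmediateRequeueAmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.core.MessageProperties;
    import org.springframework.amqp.rabbit.connection.ConnectionFactory;
    import org.springframework.amqp.support.converter.MessageConversionException;
    import org.springframework.amqp.support.converter.MessageConverter;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.amqp.dsl.Amqp;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.messaging.support.ErrorMessage;

    @SpringBootApplication
    public class So67801807Application {

        public static void main(String[] args) {
            SpringApplication.run(So67801807Application.class, args);
        }

        @Bean
        IntegrationFlow flow(ConnectionFactory cf) {
            return IntegrationFlows.from(Amqp.inboundAdapter(cf, "foo")
                            .messageConverter(new MC())
                            .errorChannel("errors"))
                    .handle(...)
                    .get();
        }

        // Receives conversion errors as well as errors thrown by the downstream flow.
        @Bean
        IntegrationFlow errorFlow() {
            return IntegrationFlows.from("errors")
                    .handle(msg -> {
                        if (((ErrorMessage) msg).getPayload().getCause() instanceof MessageConversionException) {
                            throw new ImmediateRequeueAmqpException("Requeuing due to conversion");
                        }
                        else {
                            // handle some other exception thrown by the downstream flow
                        }
                    })
                    .get();
        }

    }

    // Test converter that always fails on inbound conversion.
    class MC implements MessageConverter {

        @Override
        public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
            return null;
        }

        @Override
        public Object fromMessage(Message message) throws MessageConversionException {
            throw new MessageConversionException("test");
        }

    }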
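The check above looks only at the direct cause of the error payload. If the conversion exception might be nested more deeply, one option is to walk the whole cause chain instead. A minimal sketch in plain Java, where CauseChain and hasCause are hypothetical names and IllegalArgumentException merely stands in for MessageConversionException so that the example needs no Spring dependency:

```java
final class CauseChain {

    // Arbitrary depth limit, used only as a guard against cyclic cause chains.
    private static final int MAX_DEPTH = 32;

    /** Returns true if the error itself or any nested cause is an instance of the given type. */
    public static boolean hasCause(Throwable error, Class<? extends Throwable> type) {
        Throwable current = error;
        for (int depth = 0; current != null && depth < MAX_DEPTH; depth++) {
            if (type.isInstance(current)) {
                return true;
            }
            current = current.getCause();
        }
        return false;
    }

    public static void main(String[] args) {
        Throwable nested = new RuntimeException("listener failed",
                new IllegalStateException("wrapper",
                        new IllegalArgumentException("cannot convert payload")));
        System.out.println(hasCause(nested, IllegalArgumentException.class)); // prints true
        System.out.println(hasCause(new RuntimeException("business failure"),
                IllegalArgumentException.class)); // prints false
    }
}
```

In an error flow, a helper like this could replace the single getCause() check, at the cost of also matching conversion exceptions raised further downstream.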
Alternatively, you can add a custom error handler to the container. The default error handler treats conversion exceptions as fatal.
https://docs.spring.io/spring-amqp/docs/current/reference/html/#exception-handling