RabbitMQ 重试最大尝试次数结束后如何触发功能? (Spring 集成 - RabbitMQ 侦听器)
How to trigger a functionality after RabbitMQ retry max attempts are over? (Spring Integration - RabbitMQ Listener)
我想在 RabbitMQ 侦听器重试结束后触发一封电子邮件,如果处理过程失败仍然如此。
重试逻辑正在使用以下代码。但是如何在最大重试次数结束后触发该功能(电子邮件触发)。
@Bean
public SimpleMessageListenerContainer container() {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(myQueue());
container.setDefaultRequeueRejected(false);
Advice[] adviceArray = new Advice[]{interceptor()};
container.setAdviceChain(adviceArray);
return container;
}
@Bean
public IntegrationFlow inboundFlow() {
return IntegrationFlows.from(
Amqp.inboundAdapter(container()))
.log()
.handle(listenerBeanName, listenerMethodName)
.get();
}
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(retryMaxAttempts)
.backOffOptions(initialInterval, multiplier, maxInterval)
//.recoverer(new RejectAndDontRequeueRecoverer())
.recoverer(new CustomRejectAndRecoverer())
.build();
}
添加 CustomeRecover 代码
@Service
public class CustomRejectAndRecoverer implements MessageRecoverer {
@Autowired
private EmailGateway emailgateway;
@Override
public void recover(Message message, Throwable cause) {
// INSERT CODE HERE.... HOW TO CALL GATEWAY
// emailgateway.sendMail(cause);
throw new ListenerExecutionFailedException("Retry Policy Exhausted",
new AmqpRejectAndDontRequeueException(cause), message);
} }
这正是 RetryInterceptorBuilder
中的 .recoverer()
的用途。
你现在在那里使用 RejectAndDontRequeueRecoverer
,但没有人阻止你实施你自己的 MessageRecoverer
并委托给 RejectAndDontRequeueRecoverer
并向某些人发送消息 MessageChannel
使用发送电子邮件逻辑。
我想在 RabbitMQ 侦听器重试结束后触发一封电子邮件,如果处理过程失败仍然如此。
重试逻辑正在使用以下代码。但是如何在最大重试次数结束后触发该功能(电子邮件触发)。
@Bean
public SimpleMessageListenerContainer container() {
SimpleMessageListenerContainer container =
new SimpleMessageListenerContainer(connectionFactory());
container.setQueues(myQueue());
container.setDefaultRequeueRejected(false);
Advice[] adviceArray = new Advice[]{interceptor()};
container.setAdviceChain(adviceArray);
return container;
}
@Bean
public IntegrationFlow inboundFlow() {
return IntegrationFlows.from(
Amqp.inboundAdapter(container()))
.log()
.handle(listenerBeanName, listenerMethodName)
.get();
}
@Bean
RetryOperationsInterceptor interceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(retryMaxAttempts)
.backOffOptions(initialInterval, multiplier, maxInterval)
//.recoverer(new RejectAndDontRequeueRecoverer())
.recoverer(new CustomRejectAndRecoverer())
.build();
}
添加 CustomeRecover 代码
@Service
public class CustomRejectAndRecoverer implements MessageRecoverer {
@Autowired
private EmailGateway emailgateway;
@Override
public void recover(Message message, Throwable cause) {
// INSERT CODE HERE.... HOW TO CALL GATEWAY
// emailgateway.sendMail(cause);
throw new ListenerExecutionFailedException("Retry Policy Exhausted",
new AmqpRejectAndDontRequeueException(cause), message);
} }
这正是 RetryInterceptorBuilder
中的 .recoverer()
的用途。
你现在在那里使用 RejectAndDontRequeueRecoverer
,但没有人阻止你实施你自己的 MessageRecoverer
并委托给 RejectAndDontRequeueRecoverer
并向某些人发送消息 MessageChannel
使用发送电子邮件逻辑。