Spring 集成(重试策略)
Spring Integration ( Retry Strategy)
我想通过 Spring 集成创建一个简单的 IntegrationFlow,但我遇到了困难。
我想创建一个集成流,从 Rabbit Mq 中的队列获取消息并将消息发布到端点 Rest。
我遇到的问题是,当一个请求失败时,它会不断地重试,请问如何在这段代码中实现重试策略?
例如,我要重试 3 次,1 秒后重试第一次,5 秒后重试第二次,1 分钟后重试第三次。
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate restTemplate = new RestTemplate();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(BOUTIQUE_QUEUE_NAME);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
return IntegrationFlows.from(Amqp.inboundAdapter(container)) /* Get Message from RabbitMQ */
.handle(msg ->
{
String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
System.out.println(msgString);
})
.get();
}
向侦听器容器的建议链添加重试拦截器。参见 https://docs.spring.io/spring-amqp/docs/2.2.10.RELEASE/reference/html/#retry and https://docs.spring.io/spring-amqp/docs/2.2.10.RELEASE/reference/html/#async-listeners
编辑
@SpringBootApplication
public class So63596805Application {
private static final Logger LOG = LoggerFactory.getLogger(So63596805Application.class);
public static void main(String[] args) {
SpringApplication.run(So63596805Application.class, args);
}
@Bean
IntegrationFlow flow(SimpleRabbitListenerContainerFactory factory, RabbitTemplate template) {
SimpleMessageListenerContainer container = factory.createListenerContainer();
container.setQueueNames("foo");
container.setAdviceChain(RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000)
.recoverer((msg, cause) -> LOG.info("Retries exhausted for " + msg))
.build());
return IntegrationFlows.from(Amqp.inboundAdapter(container))
.handle(msg -> {
LOG.info(msg.getPayload().toString());
throw new RuntimeException("test");
})
.get();
}
}
这使用指数回退策略。
如果你使用
.maxAttempts(4)
.backOffOptions(1000, 5.0, 60000)
您将在 1、5 和 25 秒后重试 3 次。
1000, 8.0, 60000
会给你 1、8 和 60 秒。
如果您必须有规格(1、5、60),您将需要自定义 BackOffPolicy。
我想通过 Spring 集成创建一个简单的 IntegrationFlow,但我遇到了困难。
我想创建一个集成流,从 Rabbit Mq 中的队列获取消息并将消息发布到端点 Rest。
我遇到的问题是,当一个请求失败时,它会不断地重试,请问如何在这段代码中实现重试策略? 例如,我要重试 3 次,1 秒后重试第一次,5 秒后重试第二次,1 分钟后重试第三次。
HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
RestTemplate restTemplate = new RestTemplate();
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames(BOUTIQUE_QUEUE_NAME);
container.setAcknowledgeMode(AcknowledgeMode.AUTO);
return IntegrationFlows.from(Amqp.inboundAdapter(container)) /* Get Message from RabbitMQ */
.handle(msg ->
{
String msgString = new String((byte[]) msg.getPayload(), StandardCharsets.UTF_8);
HttpEntity<String> requestBody = new HttpEntity<String>(msgString, headers);
restTemplate.postForObject(ENDPOINT_LOCAL_URL, requestBody, String.class);
System.out.println(msgString);
})
.get();
}
向侦听器容器的建议链添加重试拦截器。参见 https://docs.spring.io/spring-amqp/docs/2.2.10.RELEASE/reference/html/#retry and https://docs.spring.io/spring-amqp/docs/2.2.10.RELEASE/reference/html/#async-listeners
编辑
@SpringBootApplication
public class So63596805Application {
private static final Logger LOG = LoggerFactory.getLogger(So63596805Application.class);
public static void main(String[] args) {
SpringApplication.run(So63596805Application.class, args);
}
@Bean
IntegrationFlow flow(SimpleRabbitListenerContainerFactory factory, RabbitTemplate template) {
SimpleMessageListenerContainer container = factory.createListenerContainer();
container.setQueueNames("foo");
container.setAdviceChain(RetryInterceptorBuilder.stateless()
.maxAttempts(5)
.backOffOptions(1000, 2.0, 10000)
.recoverer((msg, cause) -> LOG.info("Retries exhausted for " + msg))
.build());
return IntegrationFlows.from(Amqp.inboundAdapter(container))
.handle(msg -> {
LOG.info(msg.getPayload().toString());
throw new RuntimeException("test");
})
.get();
}
}
这使用指数回退策略。
如果你使用
.maxAttempts(4)
.backOffOptions(1000, 5.0, 60000)
您将在 1、5 和 25 秒后重试 3 次。
1000, 8.0, 60000
会给你 1、8 和 60 秒。
如果您必须有规格(1、5、60),您将需要自定义 BackOffPolicy。