Spring boot和rabbitmq集成,组件故障如何恢复?
Spring boot and rabbitmq integration, how to recover on component failure?
我有一个 spring rest 应用程序,使用 rabbit 作为消息传递中间件。当应用程序接受传入的 rest 请求时,它会回复用户,同时它会异步执行一些额外的处理,生成一条新消息并将其放在 rabbit 上。一些消费者从那里读取消息并将其传送到外部系统。现在,有一种情况是rabbit服务器本身可能宕机了。
为了处理这种故障,我有一个阻塞队列(当然大小可调)
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
return executor;
}
在 rabbit 联机之前,队列将能够处理一些消息。但是如果它花费的时间太长并且请求超过队列大小,那么它将失败。现在的问题是,有没有办法可以改进它并使其更有效率?什么是行业最佳实践?该场景是否有任何模式?
我用了spring可重试。这足以满足要求。因为我可以重试,配置重试间隔、最大尝试次数等,以及实现自定义恢复功能。
@Async
@Retryable(value = { AmqpException.class },
maxAttempts=10, backoff=@Backoff(delay=100, maxDelay=300000,multiplier = 2))
public void sendEmailMessageToQueue(SimpleMailMessage email){
log.info("Sending message to queue"+Thread.currentThread().getName());
try {
rabbitTemplate.convertAndSend(queueName, email);
}catch (AmqpException e){
log.log(Level.WARNING,"Could not send message to queue, will retry.",e);
}
}
@Recover
public void connectionException(AmqpException e) {
log.log(Level.SEVERE,"Could not send message to queue, will save the message in db instead."+Thread.currentThread().getName(),e);
}
我有一个 spring rest 应用程序,使用 rabbit 作为消息传递中间件。当应用程序接受传入的 rest 请求时,它会回复用户,同时它会异步执行一些额外的处理,生成一条新消息并将其放在 rabbit 上。一些消费者从那里读取消息并将其传送到外部系统。现在,有一种情况是rabbit服务器本身可能宕机了。 为了处理这种故障,我有一个阻塞队列(当然大小可调)
@Bean
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(5);
executor.setMaxPoolSize(10);
executor.setQueueCapacity(25);
return executor;
}
在 rabbit 联机之前,队列将能够处理一些消息。但是如果它花费的时间太长并且请求超过队列大小,那么它将失败。现在的问题是,有没有办法可以改进它并使其更有效率?什么是行业最佳实践?该场景是否有任何模式?
我用了spring可重试。这足以满足要求。因为我可以重试,配置重试间隔、最大尝试次数等,以及实现自定义恢复功能。
@Async
@Retryable(value = { AmqpException.class },
maxAttempts=10, backoff=@Backoff(delay=100, maxDelay=300000,multiplier = 2))
public void sendEmailMessageToQueue(SimpleMailMessage email){
log.info("Sending message to queue"+Thread.currentThread().getName());
try {
rabbitTemplate.convertAndSend(queueName, email);
}catch (AmqpException e){
log.log(Level.WARNING,"Could not send message to queue, will retry.",e);
}
}
@Recover
public void connectionException(AmqpException e) {
log.log(Level.SEVERE,"Could not send message to queue, will save the message in db instead."+Thread.currentThread().getName(),e);
}