在第一次重试时执行操作以将消息保存在数据库中
Perform an action in the first retry attempt to save the message on the Database
我正在尝试将进入 RabbitMQ 队列的每条消息保存在数据库中,仅用于日志记录。之后需要正常处理消息
问题是:这个队列配置了RetryOperationsInterceptor
的重试策略,每次在处理消息时出现错误,都会重新排队并重新处理消息。保存消息的逻辑在读取队列的监听器中,所以我有 3
(我配置的重试次数)而不是只在数据库中保存一条消息。
看我的RetryOperationsInterceptor
:
@Bean
public RetryOperationsInterceptor defaultRetryOperationsInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.backOffOptions(2000, 2.0, 10000)
.build();
}
容器工厂:
@Bean(name = FACTORY_CONTAINER_NAME)
public SimpleRabbitListenerContainerFactory factoryQueueExample(ConnectionFactory connectionFactory,
RetryOperationsInterceptor defaultRetryOperationsInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(getMessageConverter());
factory.setDefaultRequeueRejected(false);
Advice[] adviceChain = {defaultRetryOperationsInterceptor};
factory.setAdviceChain(adviceChain);
return factory;
}
队列侦听器:
@Slf4j
@Component
@AllArgsConstructor
public class MessageListener {
private final MessageRepository messageRepository;
private final MessageService messageService;
@RabbitListener(queues = MessageConfiguration.QUEUE,
containerFactory = MessageConfiguration.FACTORY_CONTAINER_NAME)
public void process(SomeMessage someMessage) {
messageRepository.save(someMessage); // transform to a entity and save on database
messageService.process(someMessage); // process message
}
}
我不知道是否有相关信息,但此队列也有关联的 DLQ。重试后,消息进入 DLQ 队列。
我的想法是在重试拦截器中找到一些可以在第一次尝试时调用服务的东西,只保存一次消息。
我也对解决此问题的其他想法持开放态度,例如将尝试编号与消息一起保存,只是为了表明这不是保存在数据库中的重复消息,而是不同尝试中的相同消息.
注意如何应用重试:
Advice[] adviceChain = {defaultRetryOperationsInterceptor};
factory.setAdviceChain(adviceChain);
您可以用同样的方式编写自己的 MethodInterceptor
来写入数据库。
当您按照 before that defaultRetryOperationsInterceptor
的顺序定义此自定义建议时,数据库保存将仅被调用一次,只是因为这样的操作将在重试之外发生,甚至之前。
在文档中查看有关 AOP 的更多信息:https://docs.spring.io/spring/docs/5.0.6.RELEASE/spring-framework-reference/core.html#aop
是否至少有四种解法:
Advice
解决方案(参见 Garry 的 answer)
- 来自
RetrySynchronizationManager
的计数器
messageId
解决方案
- 使用全状态
RetryOperationsInterceptor
RetrySynchronizationManager
选项中的计数器可以很容易地用于我的情况:
int attempts = RetrySynchronizationManager.getContext().getRetryCount();
boolean canSaveOnDatabase = attempts == 0;
messageId
选项对于发件人发送一个唯一标识该消息的 ID 是必需的。因此,我可以在数据库中保存并使用该信息来了解消息是否已保存。这可以在从 RabbitTemplate
.
调用某些 send
方法时配置 by the MessagePostProcessor
最后一个选项,有状态 RetryOperationsInterceptor
我可以在 RabbitListener
中寻找 header @Header(AmqpHeaders.REDELIVERED) boolean redelivered
。如果您使用无状态 RetryOperationsInterceptor
(我的情况),则此选项不起作用,并且此选项还需要 messageId
.
我正在尝试将进入 RabbitMQ 队列的每条消息保存在数据库中,仅用于日志记录。之后需要正常处理消息
问题是:这个队列配置了RetryOperationsInterceptor
的重试策略,每次在处理消息时出现错误,都会重新排队并重新处理消息。保存消息的逻辑在读取队列的监听器中,所以我有 3
(我配置的重试次数)而不是只在数据库中保存一条消息。
看我的RetryOperationsInterceptor
:
@Bean
public RetryOperationsInterceptor defaultRetryOperationsInterceptor() {
return RetryInterceptorBuilder.stateless()
.maxAttempts(3)
.backOffOptions(2000, 2.0, 10000)
.build();
}
容器工厂:
@Bean(name = FACTORY_CONTAINER_NAME)
public SimpleRabbitListenerContainerFactory factoryQueueExample(ConnectionFactory connectionFactory,
RetryOperationsInterceptor defaultRetryOperationsInterceptor) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setMessageConverter(getMessageConverter());
factory.setDefaultRequeueRejected(false);
Advice[] adviceChain = {defaultRetryOperationsInterceptor};
factory.setAdviceChain(adviceChain);
return factory;
}
队列侦听器:
@Slf4j
@Component
@AllArgsConstructor
public class MessageListener {
private final MessageRepository messageRepository;
private final MessageService messageService;
@RabbitListener(queues = MessageConfiguration.QUEUE,
containerFactory = MessageConfiguration.FACTORY_CONTAINER_NAME)
public void process(SomeMessage someMessage) {
messageRepository.save(someMessage); // transform to a entity and save on database
messageService.process(someMessage); // process message
}
}
我不知道是否有相关信息,但此队列也有关联的 DLQ。重试后,消息进入 DLQ 队列。
我的想法是在重试拦截器中找到一些可以在第一次尝试时调用服务的东西,只保存一次消息。
我也对解决此问题的其他想法持开放态度,例如将尝试编号与消息一起保存,只是为了表明这不是保存在数据库中的重复消息,而是不同尝试中的相同消息.
注意如何应用重试:
Advice[] adviceChain = {defaultRetryOperationsInterceptor};
factory.setAdviceChain(adviceChain);
您可以用同样的方式编写自己的 MethodInterceptor
来写入数据库。
当您按照 before that defaultRetryOperationsInterceptor
的顺序定义此自定义建议时,数据库保存将仅被调用一次,只是因为这样的操作将在重试之外发生,甚至之前。
在文档中查看有关 AOP 的更多信息:https://docs.spring.io/spring/docs/5.0.6.RELEASE/spring-framework-reference/core.html#aop
是否至少有四种解法:
Advice
解决方案(参见 Garry 的 answer)- 来自
RetrySynchronizationManager
的计数器
messageId
解决方案- 使用全状态
RetryOperationsInterceptor
RetrySynchronizationManager
选项中的计数器可以很容易地用于我的情况:
int attempts = RetrySynchronizationManager.getContext().getRetryCount();
boolean canSaveOnDatabase = attempts == 0;
messageId
选项对于发件人发送一个唯一标识该消息的 ID 是必需的。因此,我可以在数据库中保存并使用该信息来了解消息是否已保存。这可以在从 RabbitTemplate
.
send
方法时配置 by the MessagePostProcessor
最后一个选项,有状态 RetryOperationsInterceptor
我可以在 RabbitListener
中寻找 header @Header(AmqpHeaders.REDELIVERED) boolean redelivered
。如果您使用无状态 RetryOperationsInterceptor
(我的情况),则此选项不起作用,并且此选项还需要 messageId
.