如何在 StatefulRetryOperationsInterceptor 中使用 messageKeyGenerator

How to use messageKeyGenerator in StatefulRetryOperationsInterceptor

我正在尝试使用 Spring 的 StatefulRetryOperationsInterceptor 为 RabbitMQ 实现示例重试机制。

如文档中所述,由于缺少消息 ID,我需要设置消息密钥生成器。我不明白的是每条消息生成的唯一 ID 的实际用法。即,当我使用下面的实现时,重试没有任何问题:

StatefulRetryOperationsInterceptor interceptor = 
RetryInterceptorBuilder.stateful()
           .maxAttempts(3)
           .backOffOptions(2000, 1, 2000)
           .messageKeyGenerator(
            new MessageKeyGenerator() {
              @Override
              public Object getKey(Message message) {
                return 1;
              }
           );
container.setAdviceChain(new Advice[] {interceptor});

有状态重试需要原始消息在某种程度上是唯一的 - 因此可以确定消息的重试 "state" - 最简单的方法是让消息发布者添加一个唯一的消息 ID header .

但是,您的消息 body 或其他 header 中的某些内容可能用于唯一标识该消息。输入用于确定唯一 ID 的 MessageKeyGenerator

使用常量(在您的情况下为 1)将不起作用,因为每条消息都具有相同的消息密钥,并且都将被视为同一消息的传送(从重试的角度来看)。

该框架确实提供了一个 MissingMessageIdAdvice,它可以为状态重试提供有限的支持(如果在重试建议之前添加到通知链)。它将 messageId 添加到传入消息中。

"Limited" 表示完全状态重试支持不可用 - 只允许一次重新投递尝试。

如果重新投递失败,邮件将被拒绝,这会导致邮件被丢弃或路由到 DLX/DLQ(如果已配置)。在所有情况下,"temporary" 状态都会从缓存中移除。

一般来说,如果需要完全重试支持并且没有 messageId 属性 并且无法使用 MessageKeyGenerator 生成唯一密钥,我建议使用无状态重试。