在 Spring RecoveryCallback 中获取消息详细信息

Fetch message details in Spring RecoveryCallback

我正在将消息发布到 RabbitMQ 中,我想在 RabbitMQ 关闭时跟踪错误,为此我添加了一个 RetryTemplate 与恢复回调,但恢复回调仅提供此方法 getLastThrowable() 而且我不确定如何提供 RabbitMQ 关闭时失败消息的详细信息。 (根据文档“RecoveryCallback 在某种程度上受到限制,因为重试上下文仅包含 lastThrowable 字段。对于更复杂的用例,您应该使用外部 RetryTemplate 以便您可以通过以下方式向 RecoveryCallback 传达更多信息 上下文的属性”),但我不知道该怎么做,如果有人能帮我举一个很棒的例子。

兔子模板

public RabbitTemplate rabbitMqTemplate(RecoveryCallback publisherRecoveryCallback) {
    RabbitTemplate r = new RabbitTemplate(rabbitConnectionFactory);
    r.setExchange(exchangeName);
    r.setRoutingKey(routingKey);
    r.setConnectionFactory(rabbitConnectionFactory);
    r.setMessageConverter(jsonMessageConverter());

    RetryTemplate retryTemplate = new RetryTemplate();
    ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
    backOffPolicy.setInitialInterval(500);
    backOffPolicy.setMultiplier(10.0);
    backOffPolicy.setMaxInterval(10000);
    retryTemplate.setBackOffPolicy(backOffPolicy);
    r.setRetryTemplate(retryTemplate);
    r.setRecoveryCallback(publisherRecoveryCallback);
    return r;
    }

恢复回调

@Component
public class PublisherRecoveryCallback implements RecoveryCallback<AssortmentEvent> {
    @Override
    public AssortmentEvent recover(RetryContext context) throws Exception {
        log.error("Error publising event",context.getLastThrowable());
        //how to get message details here??
        return null;
    }
}

AMQP 出站适配器

return IntegrationFlows.from("eventsChannel") .split() .handle(Amqp.outboundAdapter(rabbitMqTemplate) .exchangeName(exchangeName) .confirmCorrelationExpression("payload") .confirmAckChannel(ackChannel) .confirmNackChannel(nackChannel) ) .get();

这是不可能的,因为函数 RabbitTemplate.execute() 已经不知道您发送的消息,因为它可以从任何其他方法执行,我们可能没有要处理的消息:

return this.retryTemplate.execute(
                    (RetryCallback<T, Exception>) context -> RabbitTemplate.this.doExecute(action, connectionFactory),
                    (RecoveryCallback<T>) this.recoveryCallback);

我建议你做的是在发送之前将消息存储到 ThreadLocal,然后从你的自定义 RecoveryCallback.

那里获取它