Spring AMQP - 如何确认消息已成功传递和路由?

Spring AMQP - How to confirm that a message is delivered and Routed successfully?

我正在寻找一种传递消息的方法,一旦成功传递(和路由)消息,我需要执行一些操作。

我已通过以下方式启用发布者确认和 returns:

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true

我已经配置return并在兔子模板上确认回调:

rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> {
  System.out.println("Message returned");
});
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
  System.out.println("confirm"); //correlationData.returnedMessage has the original message
});

这是我的发布代码:

CorrelationData crd = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend("X-ORDERS", "ORDER_PLACED", request, crd);

crd.getFuture().addCallback(new ListenableFutureCallback<Confirm>() {
  @Override
  public void onFailure(Throwable throwable) {
    log.info("Failure received");
  }

  @Override
  public void onSuccess(Confirm confirm) {
    if(confirm.isAck()){
    log.info("Success received");
    doSomethingAfterSuccess();
  }}
});

现在,当我发布无法路由消息的消息时:-

  1. rabbitTemplate 的 returnCallBack 和 confirmCallBack 也在

  2. correlationData 的 onSuccess(..) 仍然被调用 isAck() = true

那么,如何检查邮件是否已成功传递和路由?

编辑:找到解决方案。发布代码:

CorrelationData crd = new CorrelationData(UUID.randomUUID().toString());
    rabbitTemplate.convertAndSend("X-ORDERS", "ORDER_PLACED", request, crd);

    crd.getFuture().addCallback(new ListenableFutureCallback<Confirm>() {
      @Override
      public void onFailure(Throwable throwable) {
        log.info("Failure received");
      }

      @Override
      public void onSuccess(Confirm confirm) {
        if(confirm.isAck() && crd.getReturnedMessage == null){
        log.info("Success received");
        doSomethingAfterSuccess();
      }}
    });

基本上把onSuccess里面的条件改成了"confirm.isAck() && crd.getReturnedMessage == null"

根据 RabbitMQ 文档 - 您仍然会收到肯定的确认,但保证会在 return.

之后发送

因此只需检查 future.returnedMessage 是否不是 onSuccess() 中的 null

the documentation

In addition, when both confirms and returns are enabled, the CorrelationData is populated with the returned message. It is guaranteed that this occurs before the future is set with the ack.