当 RabbitMQ 无法路由消息时,发布者确认给出 ACK

Publisher Confirms gives ACK when message can not be routed by RabbitMQ

我正在做一个小项目来检查 Publisher 使用 Spring Cloud Stream 3.0.1 确认 RabbitMQ 的效果。

我有一个特殊情况,当一条消息被发送到 RabbitMQ 的交换器时,但是这个交换器不能路由这个消息。我预计此消息将由发布者作为错误处理,但令我惊讶的是它被作为 ACK 处理。

这是发布商的代码:

@Slf4j
@Timed
@Component
@RequiredArgsConstructor
public class TestPublisher {

    private final MessagingChannels messagingChannels;

    public boolean send(Message<Event<TestEvent>> message) {

        log.info("Message for Testing Publisher confirms sent: " + message);
        return messagingChannels.walletTest().send(message);
    }

    @ServiceActivator(inputChannel = TEST_ACK)
    public void acks(Message<?> ack) {
        log.info("Message ACK received for Test: " + ack);
    }

    @ServiceActivator(inputChannel = TEST_ERROR)
    public void errors(Message<?> error) {
        log.info("Message error for Test received: " + error);
    }
}

我可以在 org.springframework.integration.handler.LoggingHandler

的日志中看到这条消息
org.springframework.integration.amqp.support.ReturnedAmqpMessageException, failedMessage=GenericMessage [payload=byte[1009], headers={id=c9bc65bc-9256-1ce2-324f-167cc526a819, timestamp=1591962425292}] [amqpMessage=(Body:'{"payload":{"id":"f696668d-140e-4ba7-96c7-ea6decdfcccd","version":20000,"currency":"USD"},"eventName":"TEST_UPDATED",......contentType=application/json, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, deliveryTag=0]), replyCode=312, replyText=NO_ROUTE, .....]

如您所见,它被报告为 replyCode=312 和 replyText=NO_ROUTE.

的失败消息

但是,它是作为 ACK 而不是 NACK 接收的。我检查了一些文档,我认为这是启用 Publisher Confirms 的预期行为,但我想确认一下。如果是这种情况,是否有某种方法可以在 ACK 的上下文中检测到此消息无法被路由?因为在这个上下文中,我没有 replyCode 和 replyText,我只有发布者发送的正确消息。

发布者确认已使用以下属性正确激活:

spring:
  rabbitmq:
    publisher-confirm-type: correlated
    publisher-returns: true

ACK 通道和错误已使用 errorChannelEnabled 和 confirmAckChannel 属性正确配置。

这就是它的工作原理,在返回的消息之后发送 ack,而不是 nack。

the documentation

For unroutable messages, the broker will issue a confirm once the exchange verifies a message won't route to any queue (returns an empty list of queues). If the message is also published as mandatory, the basic.return is sent to the client before basic.ack.

您无法判断是在 ack 的上下文中返回的。

如果您直接使用 Spring AMQP 而不是通过 Spring Cloud Stream,则可以,因为 ack 回调获取 CorrelationData,它将包含返回的消息。