为什么我无法使用 spring-boot-starter-amqp 在消费者端获取 correlationId?

Why can't I get the correlationId on the consumer side by using the spring-boot-starter-amqp?

我将 RabbitTemplate 发送消息与我自己生成的 CorrelationData 一起使用。我已经在 cofirmCallBack 中收到了 correlationId,但我无法在消费者端收到它。

我用2.0.3.RELEASE和2.1.0.RELEASE测试了这个问题,结果和上面的描述一致。

rabbitmq 配置

@Configuration
public class RabbitMQConfig {

    @Value("${mq.rabbit.addresses}")
    private String addresses;

    @Value("${mq.rabbit.username}")
    private String username;

    @Value("${mq.rabbit.password}")
    private String password;

    @Value("${mq.rabbit.virtualHost}")
    private String virtualHost;

    @Value("${mq.rabbit.sessionCacheSize}")
    private int sessionCacheSize;

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setAddresses(addresses);// addresses list of addresses with form "host[:port],..."
        connectionFactory.setVirtualHost(virtualHost);
        connectionFactory.setPublisherConfirms(true);
        connectionFactory.setPublisherReturns(true);
        connectionFactory.setChannelCacheSize(sessionCacheSize);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        return connectionFactory;
    }

    @Bean
    @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
    public RabbitTemplate rabbitTemplate(MessageConverter messageConverter) {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setMessageConverter(messageConverter);
        template.setMandatory(true);
        template.setConfirmCallback(new ConfirmCallbackListener());
        template.setReturnCallback(new ReturnCallBackListener());
        return template;
    }

    @Bean
    public MessageConverter messageConverter(ObjectMapper customMapper) {
        return new Jackson2JsonMessageConverter(customMapper);
    }

    @Bean
    public Queue testQueue() {
        return new Queue("test-queue", true);
    }

    @Bean
    public TopicExchange defaultExchange() {
        return new TopicExchange("test-exchange", true, false);
    }

    @Bean
    public Binding bindingExchangeCommon(Queue testQueue, TopicExchange defaultExchange) {
        return BindingBuilder.bind(testQueue).to(defaultExchange).with("test");
    }

    @Bean
    public SimpleMessageListenerContainer testMessageContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames("test-queue");
        container.setExposeListenerChannel(true);
        container.setPrefetchCount(250);
        container.setMaxConcurrentConsumers(20);
        container.setConcurrentConsumers(10);
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(new TestMessageListener());
        return container;
    }
}

确认回调

public class ConfirmCallbackListener implements RabbitTemplate.ConfirmCallback {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if(!ack) {
            logger.info("send message ack failed: " + cause + " -> ID: " + String.valueOf(correlationData));
        }else {
            logger.info("send message ack success -> ID: " + String.valueOf(correlationData));
        }
    }
}

return回调

public class ReturnCallBackListener implements RabbitTemplate.ReturnCallback{

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        logger.info("send message failed...");
    }

}

消息监听器

public class TestMessageListener implements ChannelAwareMessageListener {

    private Logger logger = LoggerFactory.getLogger(this.getClass());

    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        try {
            // Here: get CorrelationId is always null
            logger.info("handle message: {} -> ID: {}" , new String(message.getBody(), "UTF-8"), 
                message.getMessageProperties().getCorrelationId());
            if(true) {
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                logger.info("listener ack message completed");
            }else {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            }
        } catch (Exception e) {
            logger.error("handle test message error", e);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
    }

}

发送消息

@RestController
@RequestMapping("/rabbitmq")
public class RabbitmqCtrl {

    private AtomicLong atoId = new AtomicLong();

    @Resource
    private RabbitTemplate rabbitTemplate;

    @PostMapping("sendMsg")
    public String sendMsg(@RequestBody String content) {
        Message message = new Message();
        message.setId(String.valueOf(atoId.incrementAndGet()));
        message.setContent(content);
        rabbitTemplate.convertAndSend("test-exchange", "test", message, new CorrelationData(String.valueOf(atoId.get())));
        return "success";
    }
}

我尝试将 CorrelationDataPostProcessor 设置为 RabbitTemplate,如下所示:

template.setCorrelationDataPostProcessor(new CorrelationDataPostProcessor() {
            @Override
            public CorrelationData postProcess(Message message, CorrelationData correlationData) {
                if(correlationData != null) {
                    message.getMessageProperties().setCorrelationId(correlationData.getId());
                }
                return correlationData;
            }
        });

这样我就可以得到CorelationID了,不过我觉得既然我在发消息的时候就已经设置好ID了,那我应该就不用了。还有什么更合理的解释吗?

CorrelationData 不会通过网络发送,除非您使用自定义 MessagePostProcessor 明确说明,就像您对 CorrelationDataPostProcessor 所做的那样。 默认实现是这样的:

default Message postProcessMessage(Message message, Correlation correlation) {
    return postProcessMessage(message);
}

如您所见,correlation 被完全忽略。

因此,要向消费者端发送相关性,我们确实必须提供自定义 MessagePostProcessor 并将其注入 RabbitTemplate.