RabbitMQ 延迟消息不起作用

RabbitMQ delayed message not working

我正在尝试使用延迟交换插件,但不知何故它对我不起作用并且消息被立即收到。

我尝试了以下操作:

a) 成功启用 rabbitmq_delayed_message_exchange 并在 ubuntu-16.04 上重新启动了 rabbitmq 服务器。

b) 声明交换

Map<String,Object> props = new HashMap<String,Object>();

props.put("x-delayed-type", "direct");

this.automationExchange = new DirectExchange(exchangeName,true,false, props);

c) 将消息推送为

DefaultClassMapper typeMapper = QueueUtils.classMapper;
typeMapper.setDefaultType(type);

Jackson2JsonMessageConverter converter = QueueUtils.converter;                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                              
converter.setClassMapper(typeMapper);

RabbitTemplate template = AMQPRabbitMQTemplate.getAMQPTemplate();
template.setMessageConverter(converter);

template.convertAndSend(routingKey, message, new MessagePostProcessor() {

    @Override
    public Message postProcessMessage(Message m) throws AmqpException {
        m.getMessageProperties().setDelay(delayMiliSeconds);
        m.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
        return m;
    }
});

现在打印消息时

public void onMessage(Message message, Channel channel) throws Exception{
    System.out.println(message.getMessageProperties().getDelay());
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

它正在为 getDelay 打印 null,根据 https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq.

理想情况下应该为设定值的负值

如果我做错了什么,请告诉我。 我正在为 spring-amqp 和 spring-rabbit.

使用 1.6.8.RELEASE 版本

为了避免 headers 从入站消息意外传播到出站消息,MessageProperties.getReceived... 方法提供了入站消息的某些 headers。

在这种情况下,header 在 MessageProperties.getReceivedDelay() 中。

在向管理员声明之前,您还需要在 automationExchangesetDelayed(true)

我想你也在 RabbitTemplate 中将交换设置为默认值。