RabbitMQ 延迟消息不起作用
RabbitMQ delayed message not working
我正在尝试使用延迟交换插件,但不知何故它对我不起作用并且消息被立即收到。
我尝试了以下操作:
a) 成功启用 rabbitmq_delayed_message_exchange 并在 ubuntu-16.04 上重新启动了 rabbitmq 服务器。
b) 声明交换
Map<String,Object> props = new HashMap<String,Object>();
props.put("x-delayed-type", "direct");
this.automationExchange = new DirectExchange(exchangeName,true,false, props);
c) 将消息推送为
DefaultClassMapper typeMapper = QueueUtils.classMapper;
typeMapper.setDefaultType(type);
Jackson2JsonMessageConverter converter = QueueUtils.converter;
converter.setClassMapper(typeMapper);
RabbitTemplate template = AMQPRabbitMQTemplate.getAMQPTemplate();
template.setMessageConverter(converter);
template.convertAndSend(routingKey, message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message m) throws AmqpException {
m.getMessageProperties().setDelay(delayMiliSeconds);
m.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return m;
}
});
现在打印消息时
public void onMessage(Message message, Channel channel) throws Exception{
System.out.println(message.getMessageProperties().getDelay());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
它正在为 getDelay 打印 null,根据 https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq.
理想情况下应该为设定值的负值
如果我做错了什么,请告诉我。
我正在为 spring-amqp 和 spring-rabbit.
使用 1.6.8.RELEASE 版本
为了避免 headers 从入站消息意外传播到出站消息,MessageProperties.getReceived...
方法提供了入站消息的某些 headers。
在这种情况下,header 在 MessageProperties.getReceivedDelay()
中。
在向管理员声明之前,您还需要在 automationExchange
上 setDelayed(true)
。
我想你也在 RabbitTemplate
中将交换设置为默认值。
我正在尝试使用延迟交换插件,但不知何故它对我不起作用并且消息被立即收到。
我尝试了以下操作:
a) 成功启用 rabbitmq_delayed_message_exchange 并在 ubuntu-16.04 上重新启动了 rabbitmq 服务器。
b) 声明交换
Map<String,Object> props = new HashMap<String,Object>();
props.put("x-delayed-type", "direct");
this.automationExchange = new DirectExchange(exchangeName,true,false, props);
c) 将消息推送为
DefaultClassMapper typeMapper = QueueUtils.classMapper;
typeMapper.setDefaultType(type);
Jackson2JsonMessageConverter converter = QueueUtils.converter;
converter.setClassMapper(typeMapper);
RabbitTemplate template = AMQPRabbitMQTemplate.getAMQPTemplate();
template.setMessageConverter(converter);
template.convertAndSend(routingKey, message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message m) throws AmqpException {
m.getMessageProperties().setDelay(delayMiliSeconds);
m.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return m;
}
});
现在打印消息时
public void onMessage(Message message, Channel channel) throws Exception{
System.out.println(message.getMessageProperties().getDelay());
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
它正在为 getDelay 打印 null,根据 https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq.
理想情况下应该为设定值的负值如果我做错了什么,请告诉我。 我正在为 spring-amqp 和 spring-rabbit.
使用 1.6.8.RELEASE 版本为了避免 headers 从入站消息意外传播到出站消息,MessageProperties.getReceived...
方法提供了入站消息的某些 headers。
在这种情况下,header 在 MessageProperties.getReceivedDelay()
中。
在向管理员声明之前,您还需要在 automationExchange
上 setDelayed(true)
。
我想你也在 RabbitTemplate
中将交换设置为默认值。