Scheduled/Delay Spring AMQP RabbitMq 中的消息传递
Scheduled/Delay messaging in Spring AMQP RabbitMq
我正在努力寻找 Spring AMQP/Rabbit MQ 中 scheduled/Delaying 消息的方式。
经过大量搜索我仍然无法找到在 Spring AMQP 中执行此操作。有人可以告诉我如何在 Spring AMQP 中执行 x-delay。
如果消费者端发生某些异常,我想延迟消息。 RabbitMQ 说要添加 x-delay 并安装我已经完成的插件,但消息仍然立即没有任何延迟
我在消息中收到这个
收到 <(正文:'[B@60a4ae5f(byte[26])'MessageProperties [headers={x-delay=15000}
@Bean
ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory=new CachingConnectionFactory("127.0.0.1");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPort(1500);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
@Bean
Binding binding(@Qualifier("queue")Queue queue, DirectExchange exchange) {
return new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), queue.getName(), null);
//return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
DirectExchange exchange() {
DirectExchange exchange=new DirectExchange("delay-exchange");
return exchange;
}
消费者---
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("Received <" + message+ ">" +rabbitTemplate);
if(i==1){
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
Map<String,Object> headers = message.getMessageProperties().getHeaders();
headers.put("x-delay", 15000);
props.headers(headers);
i++;
channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(),
props.build(), message.getBody());
}
}
首先,您似乎没有关注 Scheduling Messages with RabbitMQ 文章:
To use the Delayed Message Exchange you just need to declare an exchange providing the "x-delayed-message" exchange type as follows:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
我想说 Spring AMQP 也可以达到同样的效果:
@Bean
CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
return new CustomExchange("my-exchange", "x-delayed-message", true, false, args);
}
另一个问题是您真的应该向 delay-exchange
发布消息,而不是任何其他消息。再次:无论如何,该文档中都提到了这一点。
更新
自 Spring AMQP 1.6 起,延迟消息作为开箱即用的功能得到支持:
https://spring.io/blog/2016/02/16/spring-amqp-1-6-0-milestone-1-and-1-5-4-available.
我正在努力寻找 Spring AMQP/Rabbit MQ 中 scheduled/Delaying 消息的方式。
经过大量搜索我仍然无法找到在 Spring AMQP 中执行此操作。有人可以告诉我如何在 Spring AMQP 中执行 x-delay。
如果消费者端发生某些异常,我想延迟消息。 RabbitMQ 说要添加 x-delay 并安装我已经完成的插件,但消息仍然立即没有任何延迟
我在消息中收到这个
收到 <(正文:'[B@60a4ae5f(byte[26])'MessageProperties [headers={x-delay=15000}
@Bean
ConnectionFactory connectionFactory(){
CachingConnectionFactory connectionFactory=new CachingConnectionFactory("127.0.0.1");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
connectionFactory.setPort(1500);
connectionFactory.setPublisherReturns(true);
return connectionFactory;
}
@Bean
Binding binding(@Qualifier("queue")Queue queue, DirectExchange exchange) {
return new Binding(queue.getName(), Binding.DestinationType.QUEUE, exchange.getName(), queue.getName(), null);
//return BindingBuilder.bind(queue).to(exchange).with(queueName);
}
@Bean
DirectExchange exchange() {
DirectExchange exchange=new DirectExchange("delay-exchange");
return exchange;
}
消费者---
@Override
public void onMessage(Message message, Channel channel) throws Exception {
System.out.println("Received <" + message+ ">" +rabbitTemplate);
if(i==1){
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
Map<String,Object> headers = message.getMessageProperties().getHeaders();
headers.put("x-delay", 15000);
props.headers(headers);
i++;
channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(),
props.build(), message.getBody());
}
}
首先,您似乎没有关注 Scheduling Messages with RabbitMQ 文章:
To use the Delayed Message Exchange you just need to declare an exchange providing the "x-delayed-message" exchange type as follows:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
我想说 Spring AMQP 也可以达到同样的效果:
@Bean
CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
return new CustomExchange("my-exchange", "x-delayed-message", true, false, args);
}
另一个问题是您真的应该向 delay-exchange
发布消息,而不是任何其他消息。再次:无论如何,该文档中都提到了这一点。
更新
自 Spring AMQP 1.6 起,延迟消息作为开箱即用的功能得到支持: https://spring.io/blog/2016/02/16/spring-amqp-1-6-0-milestone-1-and-1-5-4-available.