使用 Spring AMQP 延迟发送给监听器的消息
Delay message to send to listener using Spring AMQP
我需要在一定时间后向 MessageListener 发送消息,那么有什么方法可以使用 Spring AMQP 来实现。
例如。
生产者产生消息,消息发送到 RabbitMQ Q,消息被接收到监听器立即监听那个 Q,我想在一些配置参数说 1000 毫秒后延迟消息在消费者端接收
RabbitMQ 为此提供了 Delayed Exchange 功能。
从 1.6 版本开始 Spring AMQP 也提供了关于此事的高级 API:http://docs.spring.io/spring-amqp/reference/html/_reference.html#delayed-message-exchange:
<rabbit:topic-exchange name="topic" delayed="true" />
MessageProperties properties = new MessageProperties();
properties.setDelay(15000);
template.send(exchange, routingKey,
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());
更新
在 Spring AMQP 1.6
之前你应该这样做:
@Bean
CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
return new CustomExchange("my-exchange", "x-delayed-message", true, false, args);
}
...
MessageProperties properties = new MessageProperties();
properties.setHeader("x-delay", 15000);
template.send(exchange, routingKey,
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());
另请参阅此问题及其答案:
如果使用spring开机,可以这样:
@Bean
Queue queue() {
return QueueBuilder.durable(queueName)
.withArgument("x-dead-letter-exchange", dlx)
.withArgument("x-dead-letter-routing-key", dlq)
.build();
}
@Bean
TopicExchange exchange() {
return (TopicExchange) ExchangeBuilder.topicExchange(topicExchangeName)
.delayed()
.build();
@Bean
Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with(queueName);
}
我需要在一定时间后向 MessageListener 发送消息,那么有什么方法可以使用 Spring AMQP 来实现。
例如。 生产者产生消息,消息发送到 RabbitMQ Q,消息被接收到监听器立即监听那个 Q,我想在一些配置参数说 1000 毫秒后延迟消息在消费者端接收
RabbitMQ 为此提供了 Delayed Exchange 功能。
从 1.6 版本开始 Spring AMQP 也提供了关于此事的高级 API:http://docs.spring.io/spring-amqp/reference/html/_reference.html#delayed-message-exchange:
<rabbit:topic-exchange name="topic" delayed="true" />
MessageProperties properties = new MessageProperties();
properties.setDelay(15000);
template.send(exchange, routingKey,
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());
更新
在 Spring AMQP 1.6
之前你应该这样做:
@Bean
CustomExchange delayExchange() {
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
return new CustomExchange("my-exchange", "x-delayed-message", true, false, args);
}
...
MessageProperties properties = new MessageProperties();
properties.setHeader("x-delay", 15000);
template.send(exchange, routingKey,
MessageBuilder.withBody("foo".getBytes()).andProperties(properties).build());
另请参阅此问题及其答案:
如果使用spring开机,可以这样:
@Bean
Queue queue() {
return QueueBuilder.durable(queueName)
.withArgument("x-dead-letter-exchange", dlx)
.withArgument("x-dead-letter-routing-key", dlq)
.build();
}
@Bean
TopicExchange exchange() {
return (TopicExchange) ExchangeBuilder.topicExchange(topicExchangeName)
.delayed()
.build();
@Bean
Binding binding() {
return BindingBuilder.bind(queue()).to(exchange()).with(queueName);
}