是否可以选择创建仅接受 RabbitMQ 和 Spring 中具有高优先级消息的消费者?
Is there an option to create consumers that accept only messages with high priority in RabbitMQ and Spring?
我正在创建一个应用程序,该应用程序使用 RabbitMQ 向消费者发送消息以进行耗时处理。但是,我需要确定消息的优先级。当一个高优先级的消息到达时,即使所有的消费者实例都在处理其他消息,也必须处理它。
据我所知,在 Spring Boot 和 RabbitMQ 中无法抢占处理低优先级消息并切换到处理高优先级消息。
是否可以创建只接受高优先级消息的消费者,或者在所有其他消费者都忙且高优先级消息到达时,运行 即时增加一组消费者?
我尝试添加带有 x-max-priority=10 标志的队列并增加消费者数量,但这并没有解决我的问题。
假设我们 运行 50 个消费者并发送 50 条低优先级消息。正在执行耗时处理时,一条新消息以高优先级到达但无法立即处理,因为所有 50 个消费者都在忙。
有一部分配置可以设置消费者数量
@Bean
public SimpleRabbitListenerContainerFactory
rabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("rabbitConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setConcurrentConsumers(50);
factory.setMaxConcurrentConsumers(100);
return factory;
}
有没有办法创建一组接受高优先级消息(例如高于 0)的消费者,或者为高优先级消息即时创建消费者?
AMQP 协议中没有"select" 来自队列的消息的机制。
您可能要考虑使用具有专用消费者的离散队列。
顺便说一句,这与 spring 无关;有关 RabbitMQ 的一般问题应提交至 rabbitmq-users Google 组。
我不知道有什么方法可以实施您描述的先发制人策略,但您可以考虑多种替代方案。
优先级设置
首先要考虑的是 priority
RabbitMQ 本身的支持。
考虑 Gavin M. Roy 的《深入了解 RabbitMQ》中的这段摘录:
“As of RabbitMQ 3.5.0, the priority field has been implemented as per the AMQP specification. It’s defined as an integer with possible values of 0 through 9 to be used for message prioritization in queues. As specified, if a message with a priority of 9 is published, and subsequently a message with a priority of 0 is published, a newly connected consumer would receive the message with the priority of 0 before the message with a priority of 9”.
例如
rabbitTemplate.convertAndSend("Hello World!", message -> {
MessageProperties properties = MessagePropertiesBuilder.newInstance()
.setPriority(0)
.build();
return MessageBuilder.fromMessage(message)
.andProperties(properties)
.build();
});
基于优先级的交换
第二种选择是定义主题交换并定义考虑优先级的路由键。
例如,考虑使用模式 EventName.Priority
的路由密钥交换 events
,例如OrderPlaced.High
、OrderPlaced.Normal
或 OrderPlaced.Low
。
基于此,您可以将队列绑定到仅具有高优先级的订单,即 OrderPlaced.High
和一些专用于该队列的消费者。
例如
String routingKey = String.format("%s.%s", event.name(), event.priority());
rabbit.convertAndSend(routingKey, event);
使用如下所示的侦听器,其中队列 high-priority-orders
绑定到 events
使用路由键 [=16] 交换事件 OrderPlaced
和优先级 High
=].
@Component
@RabbitListener(queues = "high-priority-orders", containerFactory="orders")
public class HighPriorityOrdersListener {
@RabbitHandler
public void onOrderPlaced(OrderPlacedEvent orderPlaced) {
//...
}
}
显然,您需要一个专用线程池(在上面的 orders
容器工厂中)来处理高优先级请求。
我正在创建一个应用程序,该应用程序使用 RabbitMQ 向消费者发送消息以进行耗时处理。但是,我需要确定消息的优先级。当一个高优先级的消息到达时,即使所有的消费者实例都在处理其他消息,也必须处理它。
据我所知,在 Spring Boot 和 RabbitMQ 中无法抢占处理低优先级消息并切换到处理高优先级消息。
是否可以创建只接受高优先级消息的消费者,或者在所有其他消费者都忙且高优先级消息到达时,运行 即时增加一组消费者?
我尝试添加带有 x-max-priority=10 标志的队列并增加消费者数量,但这并没有解决我的问题。
假设我们 运行 50 个消费者并发送 50 条低优先级消息。正在执行耗时处理时,一条新消息以高优先级到达但无法立即处理,因为所有 50 个消费者都在忙。
有一部分配置可以设置消费者数量
@Bean
public SimpleRabbitListenerContainerFactory
rabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
@Qualifier("rabbitConnectionFactory") ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
configurer.configure(factory, connectionFactory);
factory.setConcurrentConsumers(50);
factory.setMaxConcurrentConsumers(100);
return factory;
}
有没有办法创建一组接受高优先级消息(例如高于 0)的消费者,或者为高优先级消息即时创建消费者?
AMQP 协议中没有"select" 来自队列的消息的机制。
您可能要考虑使用具有专用消费者的离散队列。
顺便说一句,这与 spring 无关;有关 RabbitMQ 的一般问题应提交至 rabbitmq-users Google 组。
我不知道有什么方法可以实施您描述的先发制人策略,但您可以考虑多种替代方案。
优先级设置
首先要考虑的是 priority
RabbitMQ 本身的支持。
考虑 Gavin M. Roy 的《深入了解 RabbitMQ》中的这段摘录:
“As of RabbitMQ 3.5.0, the priority field has been implemented as per the AMQP specification. It’s defined as an integer with possible values of 0 through 9 to be used for message prioritization in queues. As specified, if a message with a priority of 9 is published, and subsequently a message with a priority of 0 is published, a newly connected consumer would receive the message with the priority of 0 before the message with a priority of 9”.
例如
rabbitTemplate.convertAndSend("Hello World!", message -> {
MessageProperties properties = MessagePropertiesBuilder.newInstance()
.setPriority(0)
.build();
return MessageBuilder.fromMessage(message)
.andProperties(properties)
.build();
});
基于优先级的交换
第二种选择是定义主题交换并定义考虑优先级的路由键。
例如,考虑使用模式 EventName.Priority
的路由密钥交换 events
,例如OrderPlaced.High
、OrderPlaced.Normal
或 OrderPlaced.Low
。
基于此,您可以将队列绑定到仅具有高优先级的订单,即 OrderPlaced.High
和一些专用于该队列的消费者。
例如
String routingKey = String.format("%s.%s", event.name(), event.priority());
rabbit.convertAndSend(routingKey, event);
使用如下所示的侦听器,其中队列 high-priority-orders
绑定到 events
使用路由键 [=16] 交换事件 OrderPlaced
和优先级 High
=].
@Component
@RabbitListener(queues = "high-priority-orders", containerFactory="orders")
public class HighPriorityOrdersListener {
@RabbitHandler
public void onOrderPlaced(OrderPlacedEvent orderPlaced) {
//...
}
}
显然,您需要一个专用线程池(在上面的 orders
容器工厂中)来处理高优先级请求。