是否可以选择创建仅接受 RabbitMQ 和 Spring 中具有高优先级消息的消费者?

Is there an option to create consumers that accept only messages with high priority in RabbitMQ and Spring?

我正在创建一个应用程序,该应用程序使用 RabbitMQ 向消费者发送消息以进行耗时处理。但是,我需要确定消息的优先级。当一个高优先级的消息到达时,即使所有的消费者实例都在处理其他消息,也必须处理它。

据我所知,在 Spring Boot 和 RabbitMQ 中无法抢占处理低优先级消息并切换到处理高优先级消息。

是否可以创建只接受高优先级消息的消费者,或者在所有其他消费者都忙且高优先级消息到达时,运行 即时增加一组消费者?

我尝试添加带有 x-max-priority=10 标志的队列并增加消费者数量,但这并没有解决我的问题。

假设我们 运行 50 个消费者并发送 50 条低优先级消息。正在执行耗时处理时,一条新消息以高优先级到达但无法立即处理,因为所有 50 个消费者都在忙。

有一部分配置可以设置消费者数量

@Bean
public SimpleRabbitListenerContainerFactory
rabbitListenerContainerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
                               @Qualifier("rabbitConnectionFactory") ConnectionFactory connectionFactory) {
  SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
  configurer.configure(factory, connectionFactory);
  factory.setConcurrentConsumers(50);
  factory.setMaxConcurrentConsumers(100);
  return factory;
}

有没有办法创建一组接受高优先级消息(例如高于 0)的消费者,或者为高优先级消息即时创建消费者?

AMQP 协议中没有"select" 来自队列的消息的机制。

您可能要考虑使用具有专用消费者的离散队列。

顺便说一句,这与 spring 无关;有关 RabbitMQ 的一般问题应提交至 rabbitmq-users Google 组。

我不知道有什么方法可以实施您描述的先发制人策略,但您可以考虑多种替代方案。

优先级设置

首先要考虑的是 priority RabbitMQ 本身的支持。

考虑 Gavin M. Roy 的《深入了解 RabbitMQ》中的这段摘录:

“As of RabbitMQ 3.5.0, the priority field has been implemented as per the AMQP specification. It’s defined as an integer with possible values of 0 through 9 to be used for message prioritization in queues. As specified, if a message with a priority of 9 is published, and subsequently a message with a priority of 0 is published, a newly connected consumer would receive the message with the priority of 0 before the message with a priority of 9”.

例如

rabbitTemplate.convertAndSend("Hello World!", message -> {
  MessageProperties properties = MessagePropertiesBuilder.newInstance()
                                                         .setPriority(0)
                                                         .build();
  return MessageBuilder.fromMessage(message)
                       .andProperties(properties)
                       .build();
});

基于优先级的交换

第二种选择是定义主题交换并定义考虑优先级的路由键。

例如,考虑使用模式 EventName.Priority 的路由密钥交换 events,例如OrderPlaced.HighOrderPlaced.NormalOrderPlaced.Low

基于此,您可以将队列绑定到仅具有高优先级的订单,即 OrderPlaced.High 和一些专用于该队列的消费者。

例如

String routingKey = String.format("%s.%s", event.name(), event.priority());
rabbit.convertAndSend(routingKey, event);

使用如下所示的侦听器,其中队列 high-priority-orders 绑定到 events 使用路由键 [=16] 交换事件 OrderPlaced 和优先级 High =].

@Component
@RabbitListener(queues = "high-priority-orders", containerFactory="orders")
public class HighPriorityOrdersListener {

 @RabbitHandler
 public void onOrderPlaced(OrderPlacedEvent orderPlaced) {
   //...
 }
}

显然,您需要一个专用线程池(在上面的 orders 容器工厂中)来处理高优先级请求。