如何在 Spring 引导中实现循环队列消费者
How to implement a round-robin queue consumer in Spring boot
我正在 spring 中构建消息驱动服务,它将 运行 在集群中,需要以循环方式从 RabbitMQ 队列中拉取消息。该实现目前正在以先到先得的方式从队列中拉出消息,导致一些服务器得到备份而其他服务器空闲。
当前的 QueueConsumerConfiguration.java 看起来像:
@Configuration
public class QueueConsumerConfiguration extends RabbitMqConfiguration {
private Logger LOG = LoggerFactory.getLogger(QueueConsumerConfiguration.class);
private static final int DEFAULT_CONSUMERS=2;
@Value("${eventservice.inbound}")
protected String inboudEventQueue;
@Value("${eventservice.consumers}")
protected int queueConsumers;
@Autowired
private EventHandler eventtHandler;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.inboudEventQueue);
template.setQueue(this.inboudEventQueue);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public Queue inboudEventQueue() {
return new Queue(this.inboudEventQueue);
}
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueNames(this.inboudEventQueue);
container.setMessageListener(messageListenerAdapter());
if (this.queueConsumers > 0) {
LOG.info("Starting queue consumers:" + this.queueConsumers );
container.setMaxConcurrentConsumers(this.queueConsumers);
container.setConcurrentConsumers(this.queueConsumers);
} else {
LOG.info("Starting default queue consumers:" + DEFAULT_CONSUMERS);
container.setMaxConcurrentConsumers(DEFAULT_CONSUMERS);
container.setConcurrentConsumers(DEFAULT_CONSUMERS);
}
return container;
}
@Bean
public MessageListenerAdapter messageListenerAdapter() {
return new MessageListenerAdapter(this.eventtHandler, jsonMessageConverter());
}
}
是否只是添加
的情况
container.setChannelTransacted(true);
要配置吗?
RabbitMQ 对所有消费者一视同仁——它知道一个容器中的多个消费者与容器中的多个消费者之间没有区别。多个容器中的一个消费者(例如在不同的主机上)。从 Rabbit 的角度来看,每个人都是消费者。
如果你想更好地控制服务器亲和力,你需要使用多个队列,每个容器监听自己的队列。
然后您可以控制制作方的分发 - 例如使用主题或直接交换和特定路由键将消息路由到特定队列。
这将生产者与消费者紧密绑定(他必须知道有多少)。
或者您可以让您的制作人使用路由键 rk.0, rk.1, ..., rk.29
(重复,当达到 30 时重置为 0)。
然后您可以使用多个绑定来绑定消费者队列 -
消费者 1 获得 rk.0 到 rk.9,消费者 2 获得 rk.10 到 rk.19,等等
如果您随后决定增加消费者数量,只需适当重构绑定以重新分配工作。
容器会按需扩展到 maxConcurrentConsumers,但实际上,只有当整个容器空闲一段时间后才会缩减。
我正在 spring 中构建消息驱动服务,它将 运行 在集群中,需要以循环方式从 RabbitMQ 队列中拉取消息。该实现目前正在以先到先得的方式从队列中拉出消息,导致一些服务器得到备份而其他服务器空闲。
当前的 QueueConsumerConfiguration.java 看起来像:
@Configuration
public class QueueConsumerConfiguration extends RabbitMqConfiguration {
private Logger LOG = LoggerFactory.getLogger(QueueConsumerConfiguration.class);
private static final int DEFAULT_CONSUMERS=2;
@Value("${eventservice.inbound}")
protected String inboudEventQueue;
@Value("${eventservice.consumers}")
protected int queueConsumers;
@Autowired
private EventHandler eventtHandler;
@Bean
public RabbitTemplate rabbitTemplate() {
RabbitTemplate template = new RabbitTemplate(connectionFactory());
template.setRoutingKey(this.inboudEventQueue);
template.setQueue(this.inboudEventQueue);
template.setMessageConverter(jsonMessageConverter());
return template;
}
@Bean
public Queue inboudEventQueue() {
return new Queue(this.inboudEventQueue);
}
@Bean
public SimpleMessageListenerContainer listenerContainer() {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory());
container.setQueueNames(this.inboudEventQueue);
container.setMessageListener(messageListenerAdapter());
if (this.queueConsumers > 0) {
LOG.info("Starting queue consumers:" + this.queueConsumers );
container.setMaxConcurrentConsumers(this.queueConsumers);
container.setConcurrentConsumers(this.queueConsumers);
} else {
LOG.info("Starting default queue consumers:" + DEFAULT_CONSUMERS);
container.setMaxConcurrentConsumers(DEFAULT_CONSUMERS);
container.setConcurrentConsumers(DEFAULT_CONSUMERS);
}
return container;
}
@Bean
public MessageListenerAdapter messageListenerAdapter() {
return new MessageListenerAdapter(this.eventtHandler, jsonMessageConverter());
}
}
是否只是添加
的情况container.setChannelTransacted(true);
要配置吗?
RabbitMQ 对所有消费者一视同仁——它知道一个容器中的多个消费者与容器中的多个消费者之间没有区别。多个容器中的一个消费者(例如在不同的主机上)。从 Rabbit 的角度来看,每个人都是消费者。
如果你想更好地控制服务器亲和力,你需要使用多个队列,每个容器监听自己的队列。
然后您可以控制制作方的分发 - 例如使用主题或直接交换和特定路由键将消息路由到特定队列。
这将生产者与消费者紧密绑定(他必须知道有多少)。
或者您可以让您的制作人使用路由键 rk.0, rk.1, ..., rk.29
(重复,当达到 30 时重置为 0)。
然后您可以使用多个绑定来绑定消费者队列 -
消费者 1 获得 rk.0 到 rk.9,消费者 2 获得 rk.10 到 rk.19,等等
如果您随后决定增加消费者数量,只需适当重构绑定以重新分配工作。
容器会按需扩展到 maxConcurrentConsumers,但实际上,只有当整个容器空闲一段时间后才会缩减。