如何在 SpringBoot:RabbitMQ 中为每个队列只配置一个消费者?
How to configure only one consumer per queue in SpringBoot:RabbitMQ?
应用概览:
可以创建多个动态队列。每个队列应该只有一个消费者。只有当消费者处理完一条消息时,消费者才应该拿起另一条消息。
这是我的配置:
@EnableRabbit
@Configuration
public class RabbitMqSenderConfig {
private static final Logger logger = LoggerFactory.getLogger(RabbitMqSenderConfig.class);
@Value("${spring.rabbitmq.addresses}")
private String addressURL;
@Bean
public ConnectionFactory connectionFactory() throws URISyntaxException {
return new CachingConnectionFactory(new URI(addressURL));
}
/**
* Required for executing adminstration functions against an AMQP Broker
*/
@Bean
public AmqpAdmin amqpAdmin() throws URISyntaxException {
return new RabbitAdmin(connectionFactory());
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate rabbitTemplate() throws URISyntaxException {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
}
听众:
public class RabbitMqConsumer extends SimpleMessageListenerContainer {
public void startConsumers() throws Exception {
super.doStart();
}
}
处理消息方法:
public void handleMessage(DeploymentJob deploymentJob) {
// Deployment running, takes almost 10-15 minutes each
try {
System.out.println("deploymentJob.getSocketHandler().getSessions() -> "+deploymentJob.getSocketHandler().getSessions());
} catch (Exception e) {
e.printStackTrace();
}
}
每次创建拉取请求时,我都会将详细信息发送到以 target
分支命名的队列中的 rabbitMQ,以便所有拉取请求都在队列中得到验证,然后我创建了一个这样的消费者。
rabbitTemplate.convertAndSend(repoName, queue_name, deploymentJob);
RabbitMqConsumer container = new RabbitMqConsumer();
container.setConnectionFactory(rabbitMqSenderConfig.connectionFactory());
container.setQueueNames(queue_name);
container.setConcurrentConsumers(1);
container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(), new Jackson2JsonMessageConverter()));
container.startConsumers();
这里的问题是它正在为队列中的每条消息创建新的消费者,我想避免这种情况。
而且我没有找到一种方法来限制每个队列只有一个消费者。或者检查消费者是否不存在,然后不要创建 RabbitMqConsumer
的新实例。这可能吗?
消费者部分
只需使用由队列名称键入的 ConcurrentHashMap
来确定您已经有消费者的队列;当您创建一个新的消费者时,将其添加到地图中。
应用概览: 可以创建多个动态队列。每个队列应该只有一个消费者。只有当消费者处理完一条消息时,消费者才应该拿起另一条消息。
这是我的配置:
@EnableRabbit
@Configuration
public class RabbitMqSenderConfig {
private static final Logger logger = LoggerFactory.getLogger(RabbitMqSenderConfig.class);
@Value("${spring.rabbitmq.addresses}")
private String addressURL;
@Bean
public ConnectionFactory connectionFactory() throws URISyntaxException {
return new CachingConnectionFactory(new URI(addressURL));
}
/**
* Required for executing adminstration functions against an AMQP Broker
*/
@Bean
public AmqpAdmin amqpAdmin() throws URISyntaxException {
return new RabbitAdmin(connectionFactory());
}
@Bean
public MessageConverter jsonMessageConverter() {
return new Jackson2JsonMessageConverter();
}
@Bean
public AmqpTemplate rabbitTemplate() throws URISyntaxException {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
rabbitTemplate.setMessageConverter(jsonMessageConverter());
return rabbitTemplate;
}
}
听众:
public class RabbitMqConsumer extends SimpleMessageListenerContainer {
public void startConsumers() throws Exception {
super.doStart();
}
}
处理消息方法:
public void handleMessage(DeploymentJob deploymentJob) {
// Deployment running, takes almost 10-15 minutes each
try {
System.out.println("deploymentJob.getSocketHandler().getSessions() -> "+deploymentJob.getSocketHandler().getSessions());
} catch (Exception e) {
e.printStackTrace();
}
}
每次创建拉取请求时,我都会将详细信息发送到以 target
分支命名的队列中的 rabbitMQ,以便所有拉取请求都在队列中得到验证,然后我创建了一个这样的消费者。
rabbitTemplate.convertAndSend(repoName, queue_name, deploymentJob);
RabbitMqConsumer container = new RabbitMqConsumer();
container.setConnectionFactory(rabbitMqSenderConfig.connectionFactory());
container.setQueueNames(queue_name);
container.setConcurrentConsumers(1);
container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(), new Jackson2JsonMessageConverter()));
container.startConsumers();
这里的问题是它正在为队列中的每条消息创建新的消费者,我想避免这种情况。
而且我没有找到一种方法来限制每个队列只有一个消费者。或者检查消费者是否不存在,然后不要创建 RabbitMqConsumer
的新实例。这可能吗?
消费者部分
只需使用由队列名称键入的 ConcurrentHashMap
来确定您已经有消费者的队列;当您创建一个新的消费者时,将其添加到地图中。