如何在 SpringBoot:RabbitMQ 中为每个队列只配置一个消费者?

How to configure only one consumer per queue in SpringBoot:RabbitMQ?

应用概览: 可以创建多个动态队列。每个队列应该只有一个消费者。只有当消费者处理完一条消息时,消费者才应该拿起另一条消息。

这是我的配置:

@EnableRabbit
@Configuration
public class RabbitMqSenderConfig {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMqSenderConfig.class);

    @Value("${spring.rabbitmq.addresses}")
    private String addressURL;


    @Bean
    public ConnectionFactory connectionFactory() throws URISyntaxException {
        return new CachingConnectionFactory(new URI(addressURL));
    }

    /**
     * Required for executing adminstration functions against an AMQP Broker
     */
    @Bean
    public AmqpAdmin amqpAdmin() throws URISyntaxException {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public MessageConverter jsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }

    @Bean
    public AmqpTemplate rabbitTemplate() throws URISyntaxException {
        final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        rabbitTemplate.setMessageConverter(jsonMessageConverter());
        return rabbitTemplate;
    }


}

听众:

public class RabbitMqConsumer extends SimpleMessageListenerContainer {

    public void startConsumers() throws Exception {
        super.doStart();
    }
}

处理消息方法:

public void handleMessage(DeploymentJob deploymentJob) {
    // Deployment running, takes almost 10-15 minutes each
    try {
        System.out.println("deploymentJob.getSocketHandler().getSessions() -> "+deploymentJob.getSocketHandler().getSessions());
    } catch (Exception e) {
        e.printStackTrace();
    }
}

每次创建拉取请求时,我都会将详细信息发送到以 target 分支命名的队列中的 rabbitMQ,以便所有拉取请求都在队列中得到验证,然后我创建了一个这样的消费者。

    rabbitTemplate.convertAndSend(repoName, queue_name, deploymentJob);
    RabbitMqConsumer container = new RabbitMqConsumer();
    container.setConnectionFactory(rabbitMqSenderConfig.connectionFactory());
    container.setQueueNames(queue_name);
    container.setConcurrentConsumers(1);
    container.setMessageListener(new MessageListenerAdapter(new ConsumerHandler(), new Jackson2JsonMessageConverter()));
    container.startConsumers();

这里的问题是它正在为队列中的每条消息创建新的消费者,我想避免这种情况。 而且我没有找到一种方法来限制每个队列只有一个消费者。或者检查消费者是否不存在,然后不要创建 RabbitMqConsumer 的新实例。这可能吗?

消费者部分

只需使用由队列名称键入的 ConcurrentHashMap 来确定您已经有消费者的队列;当您创建一个新的消费者时,将其添加到地图中。