RabbitMQ Spring Boot AMQP - consume with concurrent threads

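I want my application to process multiple messages received from RabbitMQ concurrently. I have probably tried every solution on the first page of Google results, but none of them worked. Here is my setup: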

pom.xml:

<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>1.5.10.RELEASE</version>
</parent>
.
.
.
<dependency>
    <groupId>org.springframework.amqp</groupId>
    <artifactId>spring-rabbit</artifactId>
</dependency>

application.properties:

#############################
#         RabbitMQ          #
#############################
#AMQP RabbitMQ configuration 
spring.rabbitmq.host=zzzzzzzz
spring.rabbitmq.port=5672
spring.rabbitmq.username=zzzzzzz
spring.rabbitmq.password=zzzzzzz
#Rabbit component names
com.cp.neworder.queue.name = new-order-queue-stg
com.cp.neworder.queue.exchange = new-order-exchange-stg
com.cp.completedorder.queue.name = completed-order-queue
com.cp.completedorder.queue.exchange = completed-order-exchange
#RabbitMQ concurrent consumers config
spring.rabbitmq.listener.simple.concurrency=3
spring.rabbitmq.listener.simple.retry.initial-interval=3000

Configuration class:

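import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration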
public class RabbitMQConfig {

    @Value("${com.cp.neworder.queue.name}")
    private String newOrderQueueName;
    @Value("${com.cp.neworder.queue.exchange}")
    private String newOrderExchangeName;

    @Bean
    Queue queue() {
        return new Queue(newOrderQueueName, true);
    }

    @Bean
    TopicExchange exchange() {
        return new TopicExchange(newOrderExchangeName);
    }

    @Bean
    Binding binding(Queue queue, TopicExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(newOrderQueueName);
    }

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(newOrderQueueName);
        container.setMessageListener(listenerAdapter);
        return container;
    }

    @Bean
    MessageListenerAdapter listenerAdapter(OrderMessageListener receiver) {
        return new MessageListenerAdapter(receiver, "receiveOrder");
    }

}

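My consumer class works as expected, except that it processes only one message at a time. How do I know?

  1. I save the processing state of each asynchronous request in the database, so I can query how many are currently being processed. It is always exactly 1.
  2. In the RabbitMQ management console, I can see the messages being dequeued one at a time.

What is wrong with my setup? How can I get it to work?

Thanks.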

SimpleMessageListenerContainer lets you configure concurrent consumers. Its setConcurrentConsumers method sets the number of consumers.

    @Bean
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.setQueueNames(newOrderQueueName);
        container.setMessageListener(listenerAdapter);
        // Start 10 consumers on the queue instead of the default of 1
        container.setConcurrentConsumers(10);
        return container;
    }

With this configuration, you will see multiple consumers in the RabbitMQ management console when you start the application.
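To illustrate what the consumer count changes, here is a framework-free sketch. It is not Spring AMQP code: a fixed thread pool stands in for the container's consumers, and the class name `ConsumerSimulation`, the method `maxInFlight`, and the 50 ms sleep are all made up for illustration. It measures the same thing the question measures through the database, namely how many messages are in progress at once.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConsumerSimulation {

    /**
     * Pushes `messages` fake messages through `consumers` worker threads and
     * returns the highest number that were being processed at the same time.
     */
    public static int maxInFlight(int consumers, int messages) {
        // Hypothetical stand-in for the container's consumer threads
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger max = new AtomicInteger();

        for (int i = 0; i < messages; i++) {
            pool.execute(() -> {
                int now = inFlight.incrementAndGet();
                max.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(50); // stand-in for real message handling
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    inFlight.decrementAndGet();
                }
            });
        }

        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return max.get();
    }

    public static void main(String[] args) {
        System.out.println("1 consumer  -> max in flight: " + maxInFlight(1, 6));
        System.out.println("3 consumers -> max in flight: " + maxInFlight(3, 6));
    }
}
```

With a single worker the in-flight count can never exceed 1, which matches the behavior described in the question. With more workers it can rise up to the worker count, which is the effect setConcurrentConsumers is meant to produce on the real container.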

You are not using Boot to create the container, so the Boot properties are not applied.

Try:

@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, MessageListenerAdapter listenerAdapter,  
               RabbitProperties properties) {

    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
    container.setConnectionFactory(connectionFactory);
    container.setQueueNames(newOrderQueueName);
    container.setMessageListener(listenerAdapter);

    // Reads spring.rabbitmq.listener.simple.concurrency (an Integer, null if the property is unset)
    container.setConcurrentConsumers(properties.getListener().getSimple().getConcurrency());

    return container;
}
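Another option, sketched here under some assumptions, is to let Boot build the container itself by annotating the listener method with `@RabbitListener`. Boot's auto-configured listener container factory then applies the `spring.rabbitmq.listener.simple.*` properties. This sketch assumes the hand-written `SimpleMessageListenerContainer` and `MessageListenerAdapter` beans are removed, and that `receiveOrder` takes a `String` payload. The original `OrderMessageListener` is not shown in the question, so its real signature may differ.

```java
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class OrderMessageListener {

    // The container for this listener is created by Boot's auto-configured
    // factory, so spring.rabbitmq.listener.simple.concurrency is applied to it.
    @RabbitListener(queues = "${com.cp.neworder.queue.name}")
    public void receiveOrder(String order) {
        // Assumed String payload; printing the thread name is one way to
        // confirm that several consumer threads are active.
        System.out.println(Thread.currentThread().getName() + " received " + order);
    }
}
```

The `Queue`, `TopicExchange`, and `Binding` beans from the question's configuration class can stay as they are. Only the manual container wiring is replaced.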