使用 setMessageListener 注册监听器服务

Registering a listener service with setMessageListener

我有一个 spring amqp 应用程序,我想使用它,但我似乎不知道如何使用它来注册我的侦听器服务。这是我基于注释的配置文件。

package com.jerry.configuration;

import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;


@Configuration
public class SpringAmqpConfiguration {

    protected final String helloWorldQueueName = "hello.world.queue";

    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory("localhost");
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }

    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        //The routing key is set to the name of the queue by the broker for the default exchange.
        template.setRoutingKey(this.helloWorldQueueName);
        //Where we will synchronously receive messages from
        template.setQueue(this.helloWorldQueueName);
        return template;
    }

    @Bean
    // Every queue is bound to the default direct exchange
    public Queue helloWorldQueue() {
        return new Queue(this.helloWorldQueueName);
    }

    @Bean 
    public Binding binding() {
        return declare(new Binding(helloWorldQueue(), defaultDirectExchange()));
    }

    //https://docs.spring.io/spring-amqp/docs/1.5.0.BUILD-SNAPSHOT/reference/html/_reference.html#async-annotation-driven

    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        //If you want a fixed number of consumers, omit the max.
        factory.setMaxConcurrentConsumers(10);
        //Set message listener : container.setMessageListener(this.myFirstListener);
        return factory;
    }


    /**
    * Listener Bean
    * Threadpool 
    *     
    <bean id="messageListener" class="com.spring.rabbitmq.service.MessageListenerService" />

    <rabbit:listener-container connection-factory="connectionFactory">
        <rabbit:listener ref="messageListener" queues="my.first.queue" />
    </rabbit:listener-container>

    */


}

这是我希望使用的侦听器服务

package com.jerry.services;

import org.slf4j.Logger;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;


public class MessageListenerService implements MessageListener {

    private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MessageListenerService.class);

    @Override
    public void onMessage(Message message) {
        LOGGER.info(message.getMessageProperties().toString());;
        LOGGER.info(new String(message.getBody()));
    }
}

如何在此 bean 中注册侦听器服务以及如何注册多个侦听不同队列的侦听器?

@Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        //If you want a fixed number of consumers, omit the max.
        factory.setMaxConcurrentConsumers(10);
        //Set message listener : container.setMessageListener(this.myFirstListener);
        return factory;
    }

对于你的听众class你应该这样做:

@EnableRabbit
@Service
public class MessageListenerService {

   private static final Logger LOGGER = org.slf4j.LoggerFactory.getLogger(MessageListenerService.class);

  @RabbitListener(queues = { Constant.QUEUE_NAME })
  public void onMessage(Message message) {
    LOGGER.info(message.getMessageProperties().toString());;
    LOGGER.info(new String(message.getBody()));
  }
}

How do i register the listener service in this bean

MessageListenerService 定义 @Bean 并创建 SimpleMessageListenerContainer @Bean 并使用 setMessageListener().

容器工厂不适用于用户定义的侦听器,它适用于 @RabbitListener pojo 注释。

and how would i go about registering multiple listeners listening to different queues?

如果想让同一个监听器监听多个队列,那么在容器中设置队列名称即可。

如果您希望每个队列有不同的侦听器,则每个队列都需要一个 SimpleMessageListenerContainer

或者,改用 @RabbitListener 注释。