How to register multiple message listeners in RabbitMQ (Spring AMQP)

I am new to RabbitMQ and am using spring-rabbit 1.3.5. I want to register multiple message listeners. How can I do that?

I can register a single message listener.

Here is my code:

1) An interface that extends the MessageListener interface

public interface MessageQueueManager extends MessageListener{
        public String createQueue(String queueName);

        public void sendMessage(String message, String destinationQueueName) throws Exception;
}

2) The implementation:

    @Service("messageQueueManager")
    public class MessageQueueManagerImpl implements MessageQueueManager {

        @Autowired
        private AmqpAdmin admin;
        @Autowired
        private AmqpTemplate template;
        @Autowired
        private ConnectionFactory connectionFactory;
        @Autowired
        private SimpleMessageListenerContainer container;

        @Override
        public void onMessage(Message message) {
                // Different message can behave differently.    

        }

        @Override
        public String createQueue(String queueName) {

            // survive a server restart
            boolean durable = true;
            // keep it even if nobody is using it
            boolean autoDelete = false;
            boolean exclusive = false;
            // create queue
            Queue newQueue = new Queue(queueName, durable, exclusive, autoDelete);
            queueName = admin.declareQueue(newQueue);

            // create binding with exchange
            // Producer sends to an Exchange and a Consumer receives from a Queue, the bindings that connect Queues to Exchanges are critical for connecting those producers and consumers via messaging.
            /*admin.declareBinding(new Binding(queueName, DestinationType.QUEUE,
                    "directExchange", queueName, new HashMap<String, Object>()));*/
            Binding binding = BindingBuilder.bind(newQueue).to(DirectExchange.DEFAULT).with(queueName);
            admin.declareBinding(binding);

            // add queue to listener
            container.addQueues(newQueue);

            // start listener
            container.start();
            return queueName;
        }

        @Override
        public void sendMessage(String message, String destinationQueueName)
                throws Exception {
            template.convertAndSend("directExchange", destinationQueueName,
                    MessageBuilder.withBody(message.getBytes()).build());

        }
    }

3) Listener registration in applicationContext.xml

<!-- Listener container for setting up concurrent listeners for queues -->
    <bean id="simpleMessageListenerContainer"
        class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
        <constructor-arg index="0" ref="connectionFactory" />
        <property name="missingQueuesFatal" value="false" />
        <property name="concurrentConsumers" value="5" />
        <property name="autoStartup" value="false" />
        <property name="messageListener" ref="messageQueueManager" />
    </bean>

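So the SimpleMessageListenerContainer class here can take only one MessageListener. Do I need to declare multiple SimpleMessageListenerContainer instances to register different MessageListeners?

I want to register this class as a message listener: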

@Service("myMessageListener")
public class MessageHandler  implements MessageListener {
    @Override
    public void onMessage(Message message) {
        log.info("Received message: " + message);
        log.info("Text: " + new String(message.getBody()));
    }

}

1) Register your queues:

<rabbit:queue id="spring.queue" auto-delete="false"  durable="true" exclusive="false" name="spring.queue"/>
<rabbit:queue id="user.login.notification" auto-delete="false"  durable="true" exclusive="false" name="user.login.notification"/>

2) Declare the bindings:

<rabbit:direct-exchange name="directExchange" auto-delete="false">
        <rabbit:bindings>
            <rabbit:binding queue="spring.queue" key="spring.queue" />
            <rabbit:binding queue="user.login.notification" key="user.login.notification" />
        </rabbit:bindings>
    </rabbit:direct-exchange>
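
If you prefer Java configuration over the XML namespace, the queue and binding declarations above could look roughly like the sketch below. It assumes a RabbitAdmin (AmqpAdmin) bean is present in the context so the declarations are sent to the broker, that the exchange is durable, and that each routing key is simply the queue name, which is what sendMessage in the question passes. The class name QueueConfig and the bean method names are made up for illustration.

```java
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical Java-config counterpart of the <rabbit:queue> and
// <rabbit:direct-exchange> elements; names are illustrative only.
@Configuration
public class QueueConfig {

    @Bean
    public Queue springQueue() {
        // Arguments: name, durable, exclusive, autoDelete
        return new Queue("spring.queue", true, false, false);
    }

    @Bean
    public Queue loginNotificationQueue() {
        return new Queue("user.login.notification", true, false, false);
    }

    @Bean
    public DirectExchange directExchange() {
        // Arguments: name, durable (assumed true here), autoDelete
        return new DirectExchange("directExchange", true, false);
    }

    @Bean
    public Binding springQueueBinding() {
        // Assumption: the routing key is the queue name
        return BindingBuilder.bind(springQueue())
                .to(directExchange())
                .with("spring.queue");
    }

    @Bean
    public Binding loginNotificationBinding() {
        return BindingBuilder.bind(loginNotificationQueue())
                .to(directExchange())
                .with("user.login.notification");
    }
}
```

With a direct exchange, a message reaches a queue only when its routing key matches the binding key exactly, so the keys here need to match whatever the sender passes to convertAndSend.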

3) Tell the container to call onMessage(Message message) on the matching listener whenever a message arrives on either queue:

<rabbit:listener-container
        connection-factory="connectionFactory" acknowledge="auto" concurrency="10"
        requeue-rejected="true">
        <rabbit:listener ref="myMessageListener" queues="spring.queue" />
        <rabbit:listener ref="messageQueueManager" queues="user.login.notification" />
    </rabbit:listener-container>
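
This also answers the original question: as far as I know, each <rabbit:listener> element ends up as its own SimpleMessageListenerContainer, so you do get one container per listener, just without declaring them by hand. A rough Java-config sketch of the same idea follows. The class name ListenerConfig, the helper containerFor and the bean method names are hypothetical, and it assumes the container field has already been removed from MessageQueueManagerImpl (step 4), since two container beans would otherwise make that @Autowired field ambiguous.

```java
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

// Hypothetical Java-config counterpart of <rabbit:listener-container>:
// one SimpleMessageListenerContainer per listener/queue pair.
@Configuration
public class ListenerConfig {

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    @Qualifier("myMessageListener")
    private MessageListener myMessageListener;

    @Autowired
    @Qualifier("messageQueueManager")
    private MessageListener messageQueueManager;

    // Builds a container with the same settings as the XML attributes
    // (acknowledge="auto", concurrency="10", requeue-rejected="true").
    private SimpleMessageListenerContainer containerFor(
            MessageListener listener, String queueName) {
        SimpleMessageListenerContainer container =
                new SimpleMessageListenerContainer(connectionFactory);
        container.setQueueNames(queueName);
        container.setMessageListener(listener);
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setConcurrentConsumers(10);
        container.setDefaultRequeueRejected(true);
        return container;
    }

    @Bean
    public SimpleMessageListenerContainer springQueueContainer() {
        return containerFor(myMessageListener, "spring.queue");
    }

    @Bean
    public SimpleMessageListenerContainer loginNotificationContainer() {
        return containerFor(messageQueueManager, "user.login.notification");
    }
}
```

Containers defined as beans start automatically with the application context by default, so no explicit container.start() call should be needed.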

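4) Remove private SimpleMessageListenerContainer container; from the MessageQueueManagerImpl class, together with the container.addQueues(newQueue) and container.start() calls in createQueue that depend on it.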

It should work now.