如何在 RabbitMQ (Spring AMQP) 中注册多个消息监听器
How to register multiple message listeners in RabbitMQ (Spring AMQP)
我是 rabbitmq 的新手。我正在使用 spring-rabbit 1.3.5 版本。
我想注册多个消息监听器。怎么做?
我可以注册一个消息监听器。
这是我的代码:
1) 扩展 MessageListener 接口的接口:
/**
 * Contract for a component that manages queues and publishes messages while
 * also acting as an AMQP {@link MessageListener}.
 */
public interface MessageQueueManager extends MessageListener {

    /**
     * Creates a queue for the given name.
     *
     * @param queueName name requested for the queue
     * @return the queue name reported back by the implementation
     */
    String createQueue(String queueName);

    /**
     * Sends a text message addressed to the given queue.
     *
     * @param message              text payload to send
     * @param destinationQueueName name of the queue the message is addressed to
     * @throws Exception if the implementation fails to send the message
     */
    void sendMessage(String message, String destinationQueueName) throws Exception;
}
2) 下面是实现:
/**
 * {@link MessageQueueManager} implementation that declares queues at runtime,
 * binds them to the exchange used for publishing, and attaches them to the
 * injected listener container.
 */
@Service("messageQueueManager")
public class MessageQueueManagerImpl implements MessageQueueManager {

    /**
     * Exchange that {@link #sendMessage} publishes to. Queues created by
     * {@link #createQueue} are bound to this same exchange, so that a message
     * published with the queue name as routing key actually reaches the queue.
     */
    private static final String EXCHANGE_NAME = "directExchange";

    @Autowired
    private AmqpAdmin admin;

    @Autowired
    private AmqpTemplate template;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private SimpleMessageListenerContainer container;

    /**
     * Callback invoked for each delivered message. Intentionally empty here:
     * different messages can be handled differently.
     */
    @Override
    public void onMessage(Message message) {
        // Different message can behave differently.
    }

    /**
     * Declares a durable, non-exclusive, non-auto-delete queue, binds it to
     * {@link #EXCHANGE_NAME} with the queue name as binding key, and adds it
     * to the listener container.
     *
     * @param queueName name requested for the queue
     * @return the queue name returned by {@code AmqpAdmin.declareQueue}
     */
    @Override
    public String createQueue(String queueName) {
        // durable = true: survive a server restart
        // exclusive = false
        // autoDelete = false: keep the queue even if nobody is using it
        Queue newQueue = new Queue(queueName, true, false, false);
        String declaredName = admin.declareQueue(newQueue);

        // Bind on the exchange sendMessage() publishes to. The previous code
        // bound on the default exchange, which never routes messages that are
        // published to "directExchange" (and the default exchange does not
        // accept explicit bindings in the first place).
        // NOTE(review): assumes the "directExchange" exchange is declared
        // elsewhere (e.g. in the application context) - confirm.
        Binding binding = BindingBuilder
                .bind(newQueue)
                .to(new DirectExchange(EXCHANGE_NAME))
                .with(declaredName);
        admin.declareBinding(binding);

        // Attach the queue to the listener container, and start the container
        // only if it is not running yet (this method may be called repeatedly).
        container.addQueues(newQueue);
        if (!container.isRunning()) {
            container.start();
        }
        return declaredName;
    }

    /**
     * Publishes the text to {@link #EXCHANGE_NAME}, using the destination
     * queue name as routing key.
     *
     * @param message              text payload; encoded with the platform default charset
     * @param destinationQueueName routing key, i.e. the name of the target queue
     * @throws Exception declared by the interface
     */
    @Override
    public void sendMessage(String message, String destinationQueueName)
            throws Exception {
        template.convertAndSend(EXCHANGE_NAME, destinationQueueName,
                MessageBuilder.withBody(message.getBytes()).build());
    }
}
3)applicationContext.xml 文件中的监听器注册
<!-- Listener container with 5 concurrent consumers that delivers messages to the
     messageQueueManager bean. autoStartup is false, so the container is started
     programmatically. -->
<bean id="simpleMessageListenerContainer"
      class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <constructor-arg index="0" ref="connectionFactory"/>
    <property name="messageListener" ref="messageQueueManager"/>
    <property name="concurrentConsumers" value="5"/>
    <property name="missingQueuesFatal" value="false"/>
    <property name="autoStartup" value="false"/>
</bean>
所以这里的 SimpleMessageListenerContainer 类只能设置一个 MessageListener。我是否需要声明多个 SimpleMessageListenerContainer 实例来注册不同的 MessageListener?
我想将此 class 注册为消息侦听器。
/**
 * Message listener that logs each delivery: first the message itself, then
 * its body converted to text.
 */
@Service("myMessageListener")
public class MessageHandler implements MessageListener {

    // NOTE(review): `log` is not declared in this snippet - presumably a
    // logger field that was omitted; confirm.

    /**
     * Logs the received message and its body decoded with the platform
     * default charset.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message) {
        log.info("Received message: " + message);
        final String text = new String(message.getBody());
        log.info("Text: " + text);
    }
}
1) 注册您的队列:
<!-- Two durable, non-exclusive, non-auto-delete queues; bean id and queue name are identical. -->
<rabbit:queue id="spring.queue" name="spring.queue"
              durable="true" exclusive="false" auto-delete="false"/>
<rabbit:queue id="user.login.notification" name="user.login.notification"
              durable="true" exclusive="false" auto-delete="false"/>
2) 声明绑定:
<!-- Direct exchange: each queue is bound with its own name as the binding key,
     which is the routing key publishers use when sending to "directExchange". -->
<rabbit:direct-exchange name="directExchange" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="spring.queue" key="spring.queue" />
        <!-- The key must match the routing key exactly; stray text in the key
             would prevent any message from reaching this queue. -->
        <rabbit:binding queue="user.login.notification" key="user.login.notification" />
    </rabbit:bindings>
</rabbit:direct-exchange>
3) 告诉容器:当任一队列收到消息时,调用对应监听器的 onMessage(Message message) 方法:
<!-- Each <rabbit:listener> pairs a listener bean with the queue it consumes from. -->
<rabbit:listener-container connection-factory="connectionFactory"
                           acknowledge="auto"
                           concurrency="10"
                           requeue-rejected="true">
    <rabbit:listener ref="myMessageListener" queues="spring.queue" />
    <rabbit:listener ref="messageQueueManager" queues="user.login.notification" />
</rabbit:listener-container>
4) 从 MessageQueueManagerImpl 类中删除 private SimpleMessageListenerContainer container; 字段(以及 createQueue 中对它的调用)。
现在应该可以了。
我是 rabbitmq 的新手。我正在使用 spring-rabbit 1.3.5 版本。 我想注册多个消息监听器。怎么做?
我可以注册一个消息监听器。
这是我的代码:
1) 扩展 MessageListener 接口的接口:
/**
 * Contract for a component that manages queues and publishes messages while
 * also acting as an AMQP {@link MessageListener}.
 */
public interface MessageQueueManager extends MessageListener {

    /**
     * Creates a queue for the given name.
     *
     * @param queueName name requested for the queue
     * @return the queue name reported back by the implementation
     */
    String createQueue(String queueName);

    /**
     * Sends a text message addressed to the given queue.
     *
     * @param message              text payload to send
     * @param destinationQueueName name of the queue the message is addressed to
     * @throws Exception if the implementation fails to send the message
     */
    void sendMessage(String message, String destinationQueueName) throws Exception;
}
2) 下面是实现:
/**
 * {@link MessageQueueManager} implementation that declares queues at runtime,
 * binds them to the exchange used for publishing, and attaches them to the
 * injected listener container.
 */
@Service("messageQueueManager")
public class MessageQueueManagerImpl implements MessageQueueManager {

    /**
     * Exchange that {@link #sendMessage} publishes to. Queues created by
     * {@link #createQueue} are bound to this same exchange, so that a message
     * published with the queue name as routing key actually reaches the queue.
     */
    private static final String EXCHANGE_NAME = "directExchange";

    @Autowired
    private AmqpAdmin admin;

    @Autowired
    private AmqpTemplate template;

    @Autowired
    private ConnectionFactory connectionFactory;

    @Autowired
    private SimpleMessageListenerContainer container;

    /**
     * Callback invoked for each delivered message. Intentionally empty here:
     * different messages can be handled differently.
     */
    @Override
    public void onMessage(Message message) {
        // Different message can behave differently.
    }

    /**
     * Declares a durable, non-exclusive, non-auto-delete queue, binds it to
     * {@link #EXCHANGE_NAME} with the queue name as binding key, and adds it
     * to the listener container.
     *
     * @param queueName name requested for the queue
     * @return the queue name returned by {@code AmqpAdmin.declareQueue}
     */
    @Override
    public String createQueue(String queueName) {
        // durable = true: survive a server restart
        // exclusive = false
        // autoDelete = false: keep the queue even if nobody is using it
        Queue newQueue = new Queue(queueName, true, false, false);
        String declaredName = admin.declareQueue(newQueue);

        // Bind on the exchange sendMessage() publishes to. The previous code
        // bound on the default exchange, which never routes messages that are
        // published to "directExchange" (and the default exchange does not
        // accept explicit bindings in the first place).
        // NOTE(review): assumes the "directExchange" exchange is declared
        // elsewhere (e.g. in the application context) - confirm.
        Binding binding = BindingBuilder
                .bind(newQueue)
                .to(new DirectExchange(EXCHANGE_NAME))
                .with(declaredName);
        admin.declareBinding(binding);

        // Attach the queue to the listener container, and start the container
        // only if it is not running yet (this method may be called repeatedly).
        container.addQueues(newQueue);
        if (!container.isRunning()) {
            container.start();
        }
        return declaredName;
    }

    /**
     * Publishes the text to {@link #EXCHANGE_NAME}, using the destination
     * queue name as routing key.
     *
     * @param message              text payload; encoded with the platform default charset
     * @param destinationQueueName routing key, i.e. the name of the target queue
     * @throws Exception declared by the interface
     */
    @Override
    public void sendMessage(String message, String destinationQueueName)
            throws Exception {
        template.convertAndSend(EXCHANGE_NAME, destinationQueueName,
                MessageBuilder.withBody(message.getBytes()).build());
    }
}
3)applicationContext.xml 文件中的监听器注册
<!-- Listener container with 5 concurrent consumers that delivers messages to the
     messageQueueManager bean. autoStartup is false, so the container is started
     programmatically. -->
<bean id="simpleMessageListenerContainer"
      class="org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer">
    <constructor-arg index="0" ref="connectionFactory"/>
    <property name="messageListener" ref="messageQueueManager"/>
    <property name="concurrentConsumers" value="5"/>
    <property name="missingQueuesFatal" value="false"/>
    <property name="autoStartup" value="false"/>
</bean>
所以这里的 SimpleMessageListenerContainer 类只能设置一个 MessageListener。我是否需要声明多个 SimpleMessageListenerContainer 实例来注册不同的 MessageListener?
我想将此 class 注册为消息侦听器。
/**
 * Message listener that logs each delivery: first the message itself, then
 * its body converted to text.
 */
@Service("myMessageListener")
public class MessageHandler implements MessageListener {

    // NOTE(review): `log` is not declared in this snippet - presumably a
    // logger field that was omitted; confirm.

    /**
     * Logs the received message and its body decoded with the platform
     * default charset.
     *
     * @param message the delivered message
     */
    @Override
    public void onMessage(Message message) {
        log.info("Received message: " + message);
        final String text = new String(message.getBody());
        log.info("Text: " + text);
    }
}
1) 注册您的队列:
<!-- Two durable, non-exclusive, non-auto-delete queues; bean id and queue name are identical. -->
<rabbit:queue id="spring.queue" name="spring.queue"
              durable="true" exclusive="false" auto-delete="false"/>
<rabbit:queue id="user.login.notification" name="user.login.notification"
              durable="true" exclusive="false" auto-delete="false"/>
2) 声明绑定:
<!-- Direct exchange: each queue is bound with its own name as the binding key,
     which is the routing key publishers use when sending to "directExchange". -->
<rabbit:direct-exchange name="directExchange" auto-delete="false">
    <rabbit:bindings>
        <rabbit:binding queue="spring.queue" key="spring.queue" />
        <!-- The key must match the routing key exactly; stray text in the key
             would prevent any message from reaching this queue. -->
        <rabbit:binding queue="user.login.notification" key="user.login.notification" />
    </rabbit:bindings>
</rabbit:direct-exchange>
3) 告诉容器:当任一队列收到消息时,调用对应监听器的 onMessage(Message message) 方法:
<!-- Each <rabbit:listener> pairs a listener bean with the queue it consumes from. -->
<rabbit:listener-container connection-factory="connectionFactory"
                           acknowledge="auto"
                           concurrency="10"
                           requeue-rejected="true">
    <rabbit:listener ref="myMessageListener" queues="spring.queue" />
    <rabbit:listener ref="messageQueueManager" queues="user.login.notification" />
</rabbit:listener-container>
4) 从 MessageQueueManagerImpl 类中删除 private SimpleMessageListenerContainer container; 字段(以及 createQueue 中对它的调用)。
现在应该可以了。