spring jms 如何在持久主题侦听器之间分发消息?
How does spring jms distribute messages among durable topic listeners?
原始 jms 代码:
TopicSubscriber durSubscriber1 = receiverSession.createDurableSubscriber(topic,"subscription_1");
durSubscriber1.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
RMQTextMessage rmqTextMessage = ((RMQTextMessage) message);
try {
System.out.println("sub_1:" + rmqTextMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
TopicSubscriber durSubscriber2 = receiverSession.createDurableSubscriber(topic,"subscription_2");
durSubscriber2.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
RMQTextMessage rmqTextMessage = ((RMQTextMessage) message);
try {
System.out.println("sub_2:" + rmqTextMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
当我使用以下代码时,每个侦听器都会收到所有消息。如果我使用相同的订阅名称 - 应用程序不会启动。
另一方面,我编写了使用 spring jms:
的代码
配置:
@Bean
public JmsListenerContainerFactory<?> myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory, including the message converter
configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
factory.setPubSubDomain(true);
factory.setSubscriptionDurable(true);
return factory;
}
听众:
@JmsListener(destination = "my_topic_new", containerFactory = "myFactory")
public void receiveTopic(Email email) {
System.out.println("list_1:" + email);
}
@JmsListener(destination = "my_topic_new", containerFactory = "myFactory")
public void receiveTopicDup(Email email) {
System.out.println("list_2:" + email);
}
在这种情况下,两个听众将消息分开。我的意思是,如果生产者发送 10 条消息,那么 listener_1 将收到 N 条消息,而 listener_2 将收到 M 条消息。
M+N=10
请解释 2 个代码片段的区别。
能否提供spring-jms版本对应的jms代码?
这是由于 rabbit JMS 客户端将 JMS 映射到本机 AMQP 的方式。您必须给每个听众一个订阅名称;否则他们将竞争同一个队列中的消息。
@JmsListener(destination = "my_topic_new", containerFactory = "myFactory", subscription = "foo")
public void receiveTopic(String email) {
System.out.println("list_1:" + email);
}
@JmsListener(destination = "my_topic_new", containerFactory = "myFactory", subscription = "bar")
public void receiveTopicDup(String email) {
System.out.println("list_2:" + email);
}
只有持久订阅才需要。
原始 jms 代码:
TopicSubscriber durSubscriber1 = receiverSession.createDurableSubscriber(topic,"subscription_1");
durSubscriber1.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
RMQTextMessage rmqTextMessage = ((RMQTextMessage) message);
try {
System.out.println("sub_1:" + rmqTextMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
TopicSubscriber durSubscriber2 = receiverSession.createDurableSubscriber(topic,"subscription_2");
durSubscriber2.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
RMQTextMessage rmqTextMessage = ((RMQTextMessage) message);
try {
System.out.println("sub_2:" + rmqTextMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
当我使用以下代码时,每个侦听器都会收到所有消息。如果我使用相同的订阅名称 - 应用程序不会启动。
另一方面,我编写了使用 spring jms:
的代码配置:
@Bean
public JmsListenerContainerFactory<?> myFactory(DefaultJmsListenerContainerFactoryConfigurer configurer, ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
// This provides all boot's default to this factory, including the message converter
configurer.configure(factory, connectionFactory);
// You could still override some of Boot's default if necessary.
factory.setPubSubDomain(true);
factory.setSubscriptionDurable(true);
return factory;
}
听众:
@JmsListener(destination = "my_topic_new", containerFactory = "myFactory")
public void receiveTopic(Email email) {
System.out.println("list_1:" + email);
}
@JmsListener(destination = "my_topic_new", containerFactory = "myFactory")
public void receiveTopicDup(Email email) {
System.out.println("list_2:" + email);
}
在这种情况下,两个听众将消息分开。我的意思是,如果生产者发送 10 条消息,那么 listener_1 将收到 N 条消息,而 listener_2 将收到 M 条消息。
M+N=10
请解释 2 个代码片段的区别。 能否提供spring-jms版本对应的jms代码?
这是由于 rabbit JMS 客户端将 JMS 映射到本机 AMQP 的方式。您必须给每个听众一个订阅名称;否则他们将竞争同一个队列中的消息。
@JmsListener(destination = "my_topic_new", containerFactory = "myFactory", subscription = "foo")
public void receiveTopic(String email) {
System.out.println("list_1:" + email);
}
@JmsListener(destination = "my_topic_new", containerFactory = "myFactory", subscription = "bar")
public void receiveTopicDup(String email) {
System.out.println("list_2:" + email);
}
只有持久订阅才需要。