JMS 2.0 - 如何使用共享消费者接收来自主题的消息?
JMS 2.0 - How to receive messages from topic with shared consumers?
我正在使用 ActiveMQ Artemis 和 JMS 2.0 与共享消费者一起阅读主题消息。我有两个问题:
- 有什么方法可以使用 xml 格式的配置。
- 当我在消费者上设置消息侦听器时,是否必须使用
while
循环?如果我不使用 while (true)
循环程序将在主题没有消息时终止。
SharedConsumer.java
public class SharedConsumer {
@Resource(lookup = "java:comp/DefaultJMSConnectionFactory")
ConnectionFactory connectionFactory;
public String maxConnectionForJSON;
public void readFromTopicAndSendToQueue()throws Exception{
Context initialContext = null;
JMSContext jmsContext = null;
int maxConnectionCount = 0;
maxConnectionForJSON = "30";
if (!StringUtils.isBlank(maxConnectionForJSON)){
try{
maxConnectionCount = Integer.parseInt(maxConnectionForJSON);
}catch (Exception e){
//logging
}
}
if (maxConnectionCount != 0) {
try {
List<JMSConsumer> jmsConsumerList = new ArrayList<>();
initialContext = new InitialContext();
Topic topic = (Topic) initialContext.lookup("topic/exampleTopic");
ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
jmsContext = cf.createContext("admin", "admin");
for (int i = 0; i < maxConnectionCount; i++){
JMSConsumer jmsConsumer = jmsContext.createSharedDurableConsumer(topic, "ct");
MessageListener listener = new Listener();
jmsConsumer.setMessageListener(listener);
}
while (true) {
Thread.sleep(30000);
}
} catch (Exception e) {
System.err.println(e.getMessage());
} finally {
if (initialContext != null) {
initialContext.close();
}
if (jmsContext != null) {
jmsContext.close();
}
}
}
}
public static void main(final String[] args) throws Exception {
SharedConsumer sharedConsumer = new SharedConsumer();
sharedConsumer.readFromTopicAndSendToQueue();
}
}
SharedConsumerListener.java
public class Listener implements MessageListener {
public static int count = 0;
@Override
public void onMessage(Message message) {
System.out.println(message.toString() + "\ncount :" + count);
count++;
}
}
我可以使用 xml 文件读取 JMS 1.1 (ActiveMQ) 中的队列。我认为我们可以在 JMS 2.0 Artemis 中使用如下配置文件,但我错了。非常感谢 Justin Bertram 的帮助。
在 JMS 1.1 配置文件中
<bean id="brokerUrl" class="java.lang.String">
<constructor-arg value="#{appProperties.queueUrl}"/>
</bean>
<amq:connectionFactory id="amqConnectionFactory" brokerURL="#brokerUrl" dispatchAsync="true"/>
<bean id="connectionFactory1" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<constructor-arg ref="amqConnectionFactory"/>
<property name="maxConnections" value="#{appProperties.maxConnections}"/>
<property name="idleTimeout" value="#{appProperties.idleTimeout}"/>
<property name="maximumActiveSessionPerConnection" value = "10"/>
</bean>
<bean id="jmsForQueue" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory1"/>
</bean>
<bean id="jSONQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="#{appProperties.queueName}"/>
</bean>
<task:executor id="mainExecutorForJSON" pool-size="#{appProperties.mainExecutorForJSONPoolSize}"
queue-capacity="0" rejection-policy="CALLER_RUNS"/>
<int:channel id="jmsInChannelForJSON" >
<int:dispatcher task-executor="mainExecutorForJSON"/>
</int:channel>
<int-jms:message-driven-channel-adapter id="jmsInForJSON" destination="jSONNrtQueue" channel="jmsInChannelForJSON"
concurrent-consumers="#{appProperties.concurrentConsumerCountForJSON}" />
<int:service-activator input-channel="jmsInChannelForJSON" ref="dataServiceJMS" />
简而言之,一旦您设置了 JMS 消费者的消息侦听器,防止程序终止是正常的。
当您创建 JMS 使用者并设置其消息侦听器时,JMS 客户端实现将在后台创建新线程以异步侦听来自创建使用者并设置侦听器的线程的消息。因此,创建消费者和设置监听器的线程将继续运行。在您的情况下,您需要以某种方式停止线程退出并终止应用程序,因此您需要 while
循环。
我正在使用 ActiveMQ Artemis 和 JMS 2.0 与共享消费者一起阅读主题消息。我有两个问题:
- 有什么方法可以使用 xml 格式的配置。
- 当我在消费者上设置消息侦听器时,是否必须使用
while
循环?如果我不使用while (true)
循环程序将在主题没有消息时终止。
SharedConsumer.java
public class SharedConsumer {
@Resource(lookup = "java:comp/DefaultJMSConnectionFactory")
ConnectionFactory connectionFactory;
public String maxConnectionForJSON;
public void readFromTopicAndSendToQueue()throws Exception{
Context initialContext = null;
JMSContext jmsContext = null;
int maxConnectionCount = 0;
maxConnectionForJSON = "30";
if (!StringUtils.isBlank(maxConnectionForJSON)){
try{
maxConnectionCount = Integer.parseInt(maxConnectionForJSON);
}catch (Exception e){
//logging
}
}
if (maxConnectionCount != 0) {
try {
List<JMSConsumer> jmsConsumerList = new ArrayList<>();
initialContext = new InitialContext();
Topic topic = (Topic) initialContext.lookup("topic/exampleTopic");
ConnectionFactory cf = (ConnectionFactory) initialContext.lookup("ConnectionFactory");
jmsContext = cf.createContext("admin", "admin");
for (int i = 0; i < maxConnectionCount; i++){
JMSConsumer jmsConsumer = jmsContext.createSharedDurableConsumer(topic, "ct");
MessageListener listener = new Listener();
jmsConsumer.setMessageListener(listener);
}
while (true) {
Thread.sleep(30000);
}
} catch (Exception e) {
System.err.println(e.getMessage());
} finally {
if (initialContext != null) {
initialContext.close();
}
if (jmsContext != null) {
jmsContext.close();
}
}
}
}
public static void main(final String[] args) throws Exception {
SharedConsumer sharedConsumer = new SharedConsumer();
sharedConsumer.readFromTopicAndSendToQueue();
}
}
SharedConsumerListener.java
public class Listener implements MessageListener {
public static int count = 0;
@Override
public void onMessage(Message message) {
System.out.println(message.toString() + "\ncount :" + count);
count++;
}
}
我可以使用 xml 文件读取 JMS 1.1 (ActiveMQ) 中的队列。我认为我们可以在 JMS 2.0 Artemis 中使用如下配置文件,但我错了。非常感谢 Justin Bertram 的帮助。
在 JMS 1.1 配置文件中
<bean id="brokerUrl" class="java.lang.String">
<constructor-arg value="#{appProperties.queueUrl}"/>
</bean>
<amq:connectionFactory id="amqConnectionFactory" brokerURL="#brokerUrl" dispatchAsync="true"/>
<bean id="connectionFactory1" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
<constructor-arg ref="amqConnectionFactory"/>
<property name="maxConnections" value="#{appProperties.maxConnections}"/>
<property name="idleTimeout" value="#{appProperties.idleTimeout}"/>
<property name="maximumActiveSessionPerConnection" value = "10"/>
</bean>
<bean id="jmsForQueue" class="org.springframework.jms.core.JmsTemplate">
<constructor-arg ref="connectionFactory1"/>
</bean>
<bean id="jSONQueue" class="org.apache.activemq.command.ActiveMQQueue">
<constructor-arg value="#{appProperties.queueName}"/>
</bean>
<task:executor id="mainExecutorForJSON" pool-size="#{appProperties.mainExecutorForJSONPoolSize}"
queue-capacity="0" rejection-policy="CALLER_RUNS"/>
<int:channel id="jmsInChannelForJSON" >
<int:dispatcher task-executor="mainExecutorForJSON"/>
</int:channel>
<int-jms:message-driven-channel-adapter id="jmsInForJSON" destination="jSONNrtQueue" channel="jmsInChannelForJSON"
concurrent-consumers="#{appProperties.concurrentConsumerCountForJSON}" />
<int:service-activator input-channel="jmsInChannelForJSON" ref="dataServiceJMS" />
简而言之,一旦您设置了 JMS 消费者的消息侦听器,防止程序终止是正常的。
当您创建 JMS 使用者并设置其消息侦听器时,JMS 客户端实现将在后台创建新线程以异步侦听来自创建使用者并设置侦听器的线程的消息。因此,创建消费者和设置监听器的线程将继续运行。在您的情况下,您需要以某种方式停止线程退出并终止应用程序,因此您需要 while
循环。