Better way to wait to receive async messages in ActiveMQ

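I am using ActiveMQ to send and receive messages asynchronously.

I am having trouble deciding on the best way to wait for messages. Sleeping the thread in a loop is one option, but it does not look right to me.

Can anyone suggest a better approach?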

AsyncReceiver.java

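import java.util.Properties;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;

public class AsyncReceiver implements MessageListener, ExceptionListener{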

    public static void main(String[] args) throws Exception{

        Properties env = new Properties();                                  
        env.put(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
        env.put("queue.queueSampleQueue","MyNewQueue");

        InitialContext ctx = new InitialContext(env);
        Queue queue = (Queue) ctx.lookup("queueSampleQueue");
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
        QueueConnection queueConn = connFactory.createQueueConnection();
        QueueSession queueSession = queueConn.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);

        QueueReceiver queueReceiver = queueSession.createReceiver(queue);
        AsyncReceiver asyncReceiver = new AsyncReceiver();
        queueReceiver.setMessageListener(asyncReceiver);
        queueConn.setExceptionListener(asyncReceiver);
        queueConn.start();

        // Waiting for messages
        System.out.println("waiting for messages");
        for (int i = 0; i < 10; i++) {
            Thread.sleep(1000);
        }

        queueConn.close();
    }

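    public void onMessage(Message message){
        // Guard the cast: a queue can also carry non-text messages
        if (!(message instanceof TextMessage)) {
            System.err.println("ignoring non-text message: " + message);
            return;
        }
        TextMessage msg = (TextMessage) message;
        try {
            System.out.println("received: " + msg.getText());
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }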

    public void onException(JMSException exception){
        System.err.println("an error occurred: " + exception);
    }
}

Sender.java

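import java.util.Properties;
import javax.jms.*;
import javax.naming.Context;
import javax.naming.InitialContext;

public class Sender{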

    public static void main(String[] args) throws Exception{

        Properties env = new Properties();
        env.put(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
        env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
        env.put("queue.queueSampleQueue", "MyNewQueue");

        InitialContext ctx = new InitialContext(env);
        Queue queue = (Queue) ctx.lookup("queueSampleQueue");
        QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
        QueueConnection queueConn = connFactory.createQueueConnection();
        QueueSession queueSession = queueConn.createQueueSession(false,Session.DUPS_OK_ACKNOWLEDGE);

        QueueSender queueSender = queueSession.createSender(queue);
        queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
        TextMessage message = queueSession.createTextMessage("Hello");
        queueSender.send(message);
        System.out.println("sent: " + message.getText());

        queueConn.close();
    }
}

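There are two ways to process/consume messages from a queue.

  1. Check the queue for new messages periodically. This is a good fit if your program runs on a schedule, e.g. twice a day or once a day. You can do it with a loop that sleeps the thread between checks.

  2. Register a consumer on the queue (using a MessageListener). You can do this as in the following example.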

Consumer.java

        // URL and queueName are supplied by the caller,
        // e.g. "tcp://localhost:61616" and "MyNewQueue" from the question
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
        javax.jms.Connection connection = connectionFactory.createConnection();

        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Destination destination = session.createQueue(queueName);

        // Register the listener before starting the connection,
        // so that no message is delivered before the listener is in place
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new YourClass());
        connection.start();

YourClass.java

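import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.TextMessage;

public class YourClass implements MessageListener {
    @Override
    public void onMessage(Message message) {
        try {
            TextMessage textMessage = (TextMessage) message;
            String inputJsonString = textMessage.getText();
            // do whatever you want with inputJsonString
            // Only has an effect with Session.CLIENT_ACKNOWLEDGE; the call is
            // ignored under AUTO_ACKNOWLEDGE, where the session acknowledges itself
            message.acknowledge();
        } catch (JMSException ex) {
            ex.printStackTrace();
        }
    }
}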
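
With a listener registered, the main thread still needs something to wait on if it is meant to shut down cleanly once the work is done. One common alternative to the sleep loop is a java.util.concurrent.CountDownLatch that the listener counts down. The following is only a minimal sketch of that idea, not part of the original answer: to keep it runnable without a broker, TextListener is a hypothetical stand-in for javax.jms.MessageListener, a single-thread executor plays the role of the provider's delivery thread, and the names LatchWaitSketch, CountingListener and awaitMessages are made up for illustration.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LatchWaitSketch {

    /** Hypothetical stand-in for javax.jms.MessageListener (assumption, not the JMS API). */
    interface TextListener {
        void onMessage(String text);
    }

    /** Stores each message and counts the latch down once per message. */
    static class CountingListener implements TextListener {
        final List<String> received = new CopyOnWriteArrayList<>();
        private final CountDownLatch latch;

        CountingListener(CountDownLatch latch) {
            this.latch = latch;
        }

        @Override
        public void onMessage(String text) {
            received.add(text);
            latch.countDown();
        }
    }

    /**
     * Simulates delivery of `expected` messages on a background thread and
     * blocks the caller until all of them have arrived or the timeout elapses.
     * Returns whatever has been received by then.
     */
    public static List<String> awaitMessages(int expected, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(expected);
        CountingListener listener = new CountingListener(latch);

        // Stand-in for the JMS provider's delivery thread
        ExecutorService deliveryThread = Executors.newSingleThreadExecutor();
        for (int i = 1; i <= expected; i++) {
            String text = "Hello " + i;
            deliveryThread.submit(() -> listener.onMessage(text));
        }

        try {
            // No sleep loop: await() returns true as soon as the count reaches
            // zero, or false if the timeout expires first
            boolean all = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            if (!all) {
                System.err.println("timed out, " + latch.getCount() + " message(s) missing");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        } finally {
            deliveryThread.shutdown(); // in a real receiver: close the connection here
        }
        return listener.received;
    }

    public static void main(String[] args) {
        for (String text : awaitMessages(3, 5000)) {
            System.out.println("received: " + text);
        }
    }
}
```

Applied to AsyncReceiver from the question, the same pattern would presumably mean creating the latch in main, handing it to the listener, calling countDown() in onMessage, and replacing the for/sleep loop with latch.await(...) before queueConn.close(). How many messages to wait for, and for how long, depends on the application.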