在 ActiveMQ 中等待接收异步消息的更好方法
Better way to wait to receive Async messages in ActiveMQ
我已经使用 ActiveMQ 异步发送和接收消息。
我在确定等待消息的最佳方式时遇到了问题。
在循环中休眠线程是一种选择。但是我觉得不好看
任何人都可以为此提出更好的方法。
AsyncReceiver.java
public class AsyncReceiver implements MessageListener, ExceptionListener{
public static void main(String[] args) throws Exception{
Properties env = new Properties();
env.put(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
env.put("queue.queueSampleQueue","MyNewQueue");
InitialContext ctx = new InitialContext(env);
Queue queue = (Queue) ctx.lookup("queueSampleQueue");
QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
QueueConnection queueConn = connFactory.createQueueConnection();
QueueSession queueSession = queueConn.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
AsyncReceiver asyncReceiver = new AsyncReceiver();
queueReceiver.setMessageListener(asyncReceiver);
queueConn.setExceptionListener(asyncReceiver);
queueConn.start();
// Waiting for messages
System.out.print("waiting for messages");
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
}
queueConn.close();
}
public void onMessage(Message message){
TextMessage msg = (TextMessage) message;
try {
System.out.println("received: " + msg.getText());
} catch (JMSException ex) {
ex.printStackTrace();
}
}
public void onException(JMSException exception){
System.err.println("an error occurred: " + exception);
}
}
Sender.java
public class Sender{
public static void main(String[] args) throws Exception{
Properties env = new Properties();
env.put(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
env.put("queue.queueSampleQueue", "MyNewQueue");
InitialContext ctx = new InitialContext(env);
Queue queue = (Queue) ctx.lookup("queueSampleQueue");
QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
QueueConnection queueConn = connFactory.createQueueConnection();
QueueSession queueSession = queueConn.createQueueSession(false,Session.DUPS_OK_ACKNOWLEDGE);
QueueSender queueSender = queueSession.createSender(queue);
queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage message = queueSession.createTextMessage("Hello");
queueSender.send(message);
System.out.println("sent: " + message.getText());
queueConn.close();
}
}
有两种方法可以process/consume队列中的消息。
定期检查队列中的新消息 - 如果您定期 运行 您的程序,这很适合。您可以通过实现一个带有一些线程休眠的循环来做到这一点。前任。一天两次,一天一次等等
向队列注册消费者(使用 MessageListener)。您可以按照以下示例执行此操作。
Consumer.java
javax.jms.Connection connection = null;
Session session = null;
Destination destination = null;
MessageConsumer consumer = null;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(queueName);
consumer = session.createConsumer(destination);
consumer.setMessageListener(new YourClass());
YourClass.java
public class YourClass implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
inputJsonString = textMessage.getText();
//do what ever you want with inputJsonString
message.acknowledge();
}
}
我已经使用 ActiveMQ 异步发送和接收消息。
我在确定等待消息的最佳方式时遇到了问题。 在循环中休眠线程是一种选择。但是我觉得不好看
任何人都可以为此提出更好的方法。
AsyncReceiver.java
public class AsyncReceiver implements MessageListener, ExceptionListener{
public static void main(String[] args) throws Exception{
Properties env = new Properties();
env.put(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
env.put("queue.queueSampleQueue","MyNewQueue");
InitialContext ctx = new InitialContext(env);
Queue queue = (Queue) ctx.lookup("queueSampleQueue");
QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
QueueConnection queueConn = connFactory.createQueueConnection();
QueueSession queueSession = queueConn.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
AsyncReceiver asyncReceiver = new AsyncReceiver();
queueReceiver.setMessageListener(asyncReceiver);
queueConn.setExceptionListener(asyncReceiver);
queueConn.start();
// Waiting for messages
System.out.print("waiting for messages");
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
}
queueConn.close();
}
public void onMessage(Message message){
TextMessage msg = (TextMessage) message;
try {
System.out.println("received: " + msg.getText());
} catch (JMSException ex) {
ex.printStackTrace();
}
}
public void onException(JMSException exception){
System.err.println("an error occurred: " + exception);
}
}
Sender.java
public class Sender{
public static void main(String[] args) throws Exception{
Properties env = new Properties();
env.put(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
env.put("queue.queueSampleQueue", "MyNewQueue");
InitialContext ctx = new InitialContext(env);
Queue queue = (Queue) ctx.lookup("queueSampleQueue");
QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
QueueConnection queueConn = connFactory.createQueueConnection();
QueueSession queueSession = queueConn.createQueueSession(false,Session.DUPS_OK_ACKNOWLEDGE);
QueueSender queueSender = queueSession.createSender(queue);
queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage message = queueSession.createTextMessage("Hello");
queueSender.send(message);
System.out.println("sent: " + message.getText());
queueConn.close();
}
}
有两种方法可以process/consume队列中的消息。
定期检查队列中的新消息 - 如果您定期 运行 您的程序,这很适合。您可以通过实现一个带有一些线程休眠的循环来做到这一点。前任。一天两次,一天一次等等
向队列注册消费者(使用 MessageListener)。您可以按照以下示例执行此操作。
Consumer.java
javax.jms.Connection connection = null;
Session session = null;
Destination destination = null;
MessageConsumer consumer = null;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(queueName);
consumer = session.createConsumer(destination);
consumer.setMessageListener(new YourClass());
YourClass.java
public class YourClass implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
inputJsonString = textMessage.getText();
//do what ever you want with inputJsonString
message.acknowledge();
}
}