JMS 消费者使用 Glassfish 和 OpenMQ 同步接收来自远程生产者的消息
JMS Consumer to Synchronously receive Message from Remote Producer using Glassfish and OpenMQ
我正在为远程消费者实例使用 Glassfish 和 OpenMQ 来查找消息生产者的队列并同步处理请求。
根据 McIntosh 在 Synchronous Consumer with JMS Queue 上的回答,
同步消息接收可以通过调度来处理。我打算这样做,但我只看到通过异步消息驱动 Bean (MDB) 连接到消息队列的示例,如下所示:
import javax.jms.MessageListener;
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.ejb.MessageDrivenContext;
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "addressList", propertyValue = "mq://localhost:54020/"), //found in Producer server's domain.xml as JMS_PROVIDER_PORT
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "jms/ProducerRequestMessageQueue"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")})
public class ConsumerNode extends Node implements MessageListener {
@Resource
private MessageDrivenContext _mdc;
public ConsumerNode() {
super();
}
@Override
public void onMessage(Message message) {
//process message...
}
}
如何在不实施 MessageListener
和设置为消息驱动 Bean 的情况下连接到远程生产者队列?
终于想通了。我需要 imq.jar mq 库位于:
%GLASSFISH_HOME%/mq/lib/
以下代码回答了我的问题,核心部分是 init()
和 onMessage()
方法:
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.ejb.Singleton;
import javax.ejb.Schedule;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Connection;
import javax.jms.Queue;
//from imq.jar
import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.ConnectionFactory;
@Singleton
public class SyncNode {
private ConnectionFactory _producerRequestFactory;
private Connection _connection;
private Session _session;
private Queue _producerRequestMessageQueue;
private MessageConsumer _consumer;
@PostConstruct
void init() {
try {
_producerRequestFactory = new ConnectionFactory();
_producerRequestFactory.setProperty(ConnectionConfiguration.imqBrokerHostName, "localhost");
_producerRequestFactory.setProperty(ConnectionConfiguration.imqBrokerHostPort, "56527"); //56527 is JMS_PROVIDER_PORT found in producer's domain.xml in domain config directory
_connection = _producerRequestFactory.createConnection();
_session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
_producerRequestMessageQueue = _session.createQueue("ProducerRequestMessageQueue"); //name of the queue that the producer sends messages to.
_consumer = _session.createConsumer(_producerRequestMessageQueue);
_connection.start();
} catch (JMSException ex) {
//handle exception
}
}
@PreDestroy
void cleanup() {
try {
_consumer.close();
_session.close();
_connection.close();
} catch (JMSException ex) {
//handle exception
}
}
@Schedule(hour = "*", minute = "*", second = "*/10", persistent = false)
public void onMessage() {
try {
_connection.start();
Message message = _consumer.receive();
//handle message
} catch (JMSException ex) {
//handle exception
}
}
}
两个代码示例帮助了我:
- This answer 教会了我使用 main 方法转义的正确注解。
- This tutorial 帮助点击如何连接到另一台服务器,或者在我的例子中是端口。
我正在为远程消费者实例使用 Glassfish 和 OpenMQ 来查找消息生产者的队列并同步处理请求。
根据 McIntosh 在 Synchronous Consumer with JMS Queue 上的回答, 同步消息接收可以通过调度来处理。我打算这样做,但我只看到通过异步消息驱动 Bean (MDB) 连接到消息队列的示例,如下所示:
import javax.jms.MessageListener;
import javax.annotation.Resource;
import javax.ejb.ActivationConfigProperty;
import javax.ejb.MessageDriven;
import javax.ejb.MessageDrivenContext;
@MessageDriven(activationConfig = {
@ActivationConfigProperty(propertyName = "addressList", propertyValue = "mq://localhost:54020/"), //found in Producer server's domain.xml as JMS_PROVIDER_PORT
@ActivationConfigProperty(propertyName = "destinationLookup", propertyValue = "jms/ProducerRequestMessageQueue"),
@ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue")})
public class ConsumerNode extends Node implements MessageListener {
@Resource
private MessageDrivenContext _mdc;
public ConsumerNode() {
super();
}
@Override
public void onMessage(Message message) {
//process message...
}
}
如何在不实施 MessageListener
和设置为消息驱动 Bean 的情况下连接到远程生产者队列?
终于想通了。我需要 imq.jar mq 库位于:
%GLASSFISH_HOME%/mq/lib/
以下代码回答了我的问题,核心部分是 init()
和 onMessage()
方法:
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import javax.ejb.Singleton;
import javax.ejb.Schedule;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;
import javax.jms.Connection;
import javax.jms.Queue;
//from imq.jar
import com.sun.messaging.ConnectionConfiguration;
import com.sun.messaging.ConnectionFactory;
@Singleton
public class SyncNode {
private ConnectionFactory _producerRequestFactory;
private Connection _connection;
private Session _session;
private Queue _producerRequestMessageQueue;
private MessageConsumer _consumer;
@PostConstruct
void init() {
try {
_producerRequestFactory = new ConnectionFactory();
_producerRequestFactory.setProperty(ConnectionConfiguration.imqBrokerHostName, "localhost");
_producerRequestFactory.setProperty(ConnectionConfiguration.imqBrokerHostPort, "56527"); //56527 is JMS_PROVIDER_PORT found in producer's domain.xml in domain config directory
_connection = _producerRequestFactory.createConnection();
_session = _connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
_producerRequestMessageQueue = _session.createQueue("ProducerRequestMessageQueue"); //name of the queue that the producer sends messages to.
_consumer = _session.createConsumer(_producerRequestMessageQueue);
_connection.start();
} catch (JMSException ex) {
//handle exception
}
}
@PreDestroy
void cleanup() {
try {
_consumer.close();
_session.close();
_connection.close();
} catch (JMSException ex) {
//handle exception
}
}
@Schedule(hour = "*", minute = "*", second = "*/10", persistent = false)
public void onMessage() {
try {
_connection.start();
Message message = _consumer.receive();
//handle message
} catch (JMSException ex) {
//handle exception
}
}
}
两个代码示例帮助了我:
- This answer 教会了我使用 main 方法转义的正确注解。
- This tutorial 帮助点击如何连接到另一台服务器,或者在我的例子中是端口。