如何暂停和恢复 JMS 消息的异步消费
How to pause and resume asynchronous consumption of JMS messages
我正在使用 activeMQ 构建一个应用程序,其中有一个生产者和一个消费者。
在消费者中,我使用 MessageListener 异步侦听来自生产者的消息,这是使用名为 onMessage(Message message) 的方法完成的。
但在使用消息之前,我想执行条件检查然后使用消息。
我不想使用消息的同步消费,因为它会违反我的设计。
void initialize() throws JMSException {
this.connection = this.connectionFactory.createConnection();
this.connection.start();
final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Destination destination = session.createQueue("testQ");
this.consumer = session.createConsumer(destination);
this.consumer.setMessageListener(this);
}
在此处检查条件,例如检测互联网连接等
public void onMessage(final Message message) {
Preconditions.checkNotNull(message);
if (!(message instanceof TextMessage)) {
_LOG.error("The message is not of type TextMessage but of type {} so we could not process", message.getClass().getSimpleName());
throw new IllegalArgumentException("This type '" + message.getClass().getSimpleName() + "' of message could not been handled");
}
try {
final String messageType = message.getStringProperty("messageType");
Preconditions.checkNotNull(messageType);
_LOG.info("The MessageType is {}", messageType);
final String msg = ((TextMessage) message).getText();
Preconditions.checkNotNull(msg);
_LOG.debug(msg);
process(messageType, msg);
} catch (final JMSException e) {
_LOG.error("We could not read the message", e);
}
}
任何代码示例都很棒。
不清楚您为什么要进行检查,因为如果连接有任何问题,您的应用程序将收到此类错误的通知。您可以为 JMS 连接设置一个 ExceptionListener,如果连接出现问题,它将被调用。
如果存在连接问题,将不会调用 onMessage。
此外,我会在为消费者设置消息侦听器后推送 connection.start() 方法调用。 connection.start() 调用向消息传递提供程序指示应用程序已准备好接收消息。
有 connection.stop() 到 pause/stop 消息传递。您可以再次发出 connection.start() 以恢复消息传递。
根据您的回复更新
我建议您使用带有客户端确认模式的会话。
final Session session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)
然后在onMessage
方法中,如果没有互联网连接,则不确认消息。如果消息未过期,将重新发送。
在异步消息传递中,您始终可以在处理消息之前检查条件。一旦您的侦听器代码(即 OnMessage 方法)收到 JMS 消息,您就可以给出进一步的指示。我已经解决了这样的问题,我想用我的数据库内容检查 JMS 消息。
通过url连接到网络服务url来检查互联网连接是否正常:
public static boolean isReachable(String targetUrl) throws IOException
{
try {
HttpURLConnection httpUrlConnection = (HttpURLConnection) new URL(
targetUrl).openConnection();
httpUrlConnection.setRequestMethod("HEAD");
int responseCode = httpUrlConnection.getResponseCode();
return responseCode == HttpURLConnection.HTTP_OK;
} catch (UnknownHostException noInternetConnection)
{
return false;
}
}
然后在您的 onMessage 方法中将方法调用为
public void onMessage(final Message message) {
if(isReachable){
Preconditions.checkNotNull(message);
if (!(message instanceof TextMessage)) {
_LOG.error("The message is not of type TextMessage but of type {} so we could not process", message.getClass().getSimpleName());
throw new IllegalArgumentException("This type '" + message.getClass().getSimpleName() + "' of message could not been handled");
}
try {
final String messageType = message.getStringProperty("messageType");
Preconditions.checkNotNull(messageType);
_LOG.info("The MessageType is {}", messageType);
final String msg = ((TextMessage) message).getText();
Preconditions.checkNotNull(msg);
_LOG.debug(msg);
process(messageType, msg);
} catch (final JMSException e) {
_LOG.error("We could not read the message", e);
}
}
}
我正在使用 activeMQ 构建一个应用程序,其中有一个生产者和一个消费者。 在消费者中,我使用 MessageListener 异步侦听来自生产者的消息,这是使用名为 onMessage(Message message) 的方法完成的。 但在使用消息之前,我想执行条件检查然后使用消息。 我不想使用消息的同步消费,因为它会违反我的设计。
void initialize() throws JMSException {
this.connection = this.connectionFactory.createConnection();
this.connection.start();
final Session session = this.connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
final Destination destination = session.createQueue("testQ");
this.consumer = session.createConsumer(destination);
this.consumer.setMessageListener(this);
}
在此处检查条件,例如检测互联网连接等
public void onMessage(final Message message) {
Preconditions.checkNotNull(message);
if (!(message instanceof TextMessage)) {
_LOG.error("The message is not of type TextMessage but of type {} so we could not process", message.getClass().getSimpleName());
throw new IllegalArgumentException("This type '" + message.getClass().getSimpleName() + "' of message could not been handled");
}
try {
final String messageType = message.getStringProperty("messageType");
Preconditions.checkNotNull(messageType);
_LOG.info("The MessageType is {}", messageType);
final String msg = ((TextMessage) message).getText();
Preconditions.checkNotNull(msg);
_LOG.debug(msg);
process(messageType, msg);
} catch (final JMSException e) {
_LOG.error("We could not read the message", e);
}
}
任何代码示例都很棒。
不清楚您为什么要进行检查,因为如果连接有任何问题,您的应用程序将收到此类错误的通知。您可以为 JMS 连接设置一个 ExceptionListener,如果连接出现问题,它将被调用。
如果存在连接问题,将不会调用 onMessage。
此外,我会在为消费者设置消息侦听器后推送 connection.start() 方法调用。 connection.start() 调用向消息传递提供程序指示应用程序已准备好接收消息。
有 connection.stop() 到 pause/stop 消息传递。您可以再次发出 connection.start() 以恢复消息传递。
根据您的回复更新
我建议您使用带有客户端确认模式的会话。
final Session session = this.connection.createSession(false, Session.CLIENT_ACKNOWLEDGE)
然后在onMessage
方法中,如果没有互联网连接,则不确认消息。如果消息未过期,将重新发送。
在异步消息传递中,您始终可以在处理消息之前检查条件。一旦您的侦听器代码(即 OnMessage 方法)收到 JMS 消息,您就可以给出进一步的指示。我已经解决了这样的问题,我想用我的数据库内容检查 JMS 消息。
通过url连接到网络服务url来检查互联网连接是否正常:
public static boolean isReachable(String targetUrl) throws IOException
{
try {
HttpURLConnection httpUrlConnection = (HttpURLConnection) new URL(
targetUrl).openConnection();
httpUrlConnection.setRequestMethod("HEAD");
int responseCode = httpUrlConnection.getResponseCode();
return responseCode == HttpURLConnection.HTTP_OK;
} catch (UnknownHostException noInternetConnection)
{
return false;
}
}
然后在您的 onMessage 方法中将方法调用为
public void onMessage(final Message message) {
if(isReachable){
Preconditions.checkNotNull(message);
if (!(message instanceof TextMessage)) {
_LOG.error("The message is not of type TextMessage but of type {} so we could not process", message.getClass().getSimpleName());
throw new IllegalArgumentException("This type '" + message.getClass().getSimpleName() + "' of message could not been handled");
}
try {
final String messageType = message.getStringProperty("messageType");
Preconditions.checkNotNull(messageType);
_LOG.info("The MessageType is {}", messageType);
final String msg = ((TextMessage) message).getText();
Preconditions.checkNotNull(msg);
_LOG.debug(msg);
process(messageType, msg);
} catch (final JMSException e) {
_LOG.error("We could not read the message", e);
}
}
}