Read without removing message from JMS queue
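How can I read a message from WebSphere MQ without removing the original message from the queue?
I have a Spring application that reads messages from WebSphere MQ.
After reading a message, I pass the data retrieved from the queue to a processing method.
Step 1: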
response = jmsTemplate.receive();
//Message automatically removed from queue.
Step 2:
process(response);
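The processing method may throw an exception. If it does, I need the message to stay on the queue.
Is that possible? Is there any way to remove the message only on user acknowledgement?
I tried adding the following: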
jmsTemplate.setSessionAcknowledgeMode(javax.jms.Session.CLIENT_ACKNOWLEDGE);
...but the message is still removed.
JmsTemplate creation code snippet:
JndiConnectionFactorySupport connectionFactoryBean = new JndiConnectionFactorySupport();
connectionFactoryBean.setBindingsDir(this.bindingDir);
connectionFactoryBean
.setConnectionFactoryName(connectionFactoryName);
connectionFactoryBean.afterPropertiesSet();
jmsTemplate.setConnectionFactory(connectionFactoryBean.getObject());
JndiDestinationResolver destinationResolver = new JndiDestinationResolver();
destinationResolver.setJndiTemplate(connectionFactoryBean
.getJndiTemplate());
jmsTemplate.setDestinationResolver(destinationResolver);
jmsTemplate.setReceiveTimeout(20000);
jmsTemplate.setDefaultDestinationName(this.defaultDestinationName);
I also tried the jmsTemplate.execute() method, as follows:
@SuppressWarnings({ "unused", "unchecked" })
Message responseMessage = (Message) jmsTemplate.execute(
new SessionCallback() {
public Object doInJms(Session session)
throws JMSException {
MessageConsumer consumer = session
.createConsumer(jmsTemplate.getDestinationResolver().resolveDestinationName(session, "QUEUE_NAME", false));
Message response = consumer.receive(1);
try {
testMethod();//this method will throw exception.
response.acknowledge();
consumer.close();
} catch(Exception e){
consumer.close();//control will come here.
}
return response;
}
}, true);
You can add transaction handling for JMS messages. See the example.
Your listener should be "transacted", like this:
<jms:listener-container connection-factory="connectionFactory" acknowledge="transacted">
<jms:listener ref="notificationProcessor" destination="incoming.queue"/>
</jms:listener-container>
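You cannot do this with the receive() method, because the operation is complete (from the session's perspective) when receive() returns.
You need to run the code that might fail within the scope of the session, for example with JmsTemplate.execute() and a SessionCallback, like this: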
this.jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
this.jmsTemplate.convertAndSend("foo", "bar");
try {
String value = this.jmsTemplate.execute(session -> {
MessageConsumer consumer = session.createConsumer(
this.jmsTemplate.getDestinationResolver().resolveDestinationName(session, "foo", false));
String result;
try {
Message received = consumer.receive(5000);
result = (String) this.jmsTemplate.getMessageConverter().fromMessage(received);
// Do some stuff that might throw an exception
received.acknowledge();
}
finally {
consumer.close();
}
return result;
}, true);
System.out.println(value);
}
catch (Exception e) {
e.printStackTrace();
}
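To make the difference concrete, here is a minimal sketch in plain Java. It is a toy in-memory model, not the JMS or WebSphere MQ API: `ToyQueue`, `receiveAndProcess` and `ToyQueueDemo` are hypothetical names invented for this illustration. It only shows the idea behind the answer above: if processing happens before the removal step, a failure leaves the message where it was, whereas a plain receive has already removed it by the time processing starts.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Toy in-memory stand-in for a queue; NOT the JMS or WebSphere MQ API.
class ToyQueue {
    private final Deque<String> messages = new ArrayDeque<>();

    void send(String body) {
        messages.addLast(body);
    }

    int depth() {
        return messages.size();
    }

    // Mirrors a plain receive(): the message is gone as soon as the call returns.
    String receive() {
        return messages.pollFirst();
    }

    // Mirrors processing inside the session scope: the head message is removed
    // only if the handler returns normally (the "acknowledge" step).
    boolean receiveAndProcess(Consumer<String> handler) {
        String head = messages.peekFirst();
        if (head == null) {
            return false; // nothing to process
        }
        try {
            handler.accept(head);
        } catch (RuntimeException e) {
            return false; // processing failed: the message stays on the queue
        }
        messages.removeFirst(); // processing succeeded: remove it now
        return true;
    }
}

class ToyQueueDemo {
    public static void main(String[] args) {
        ToyQueue queue = new ToyQueue();
        queue.send("bar");

        // First attempt fails, so the message is kept.
        boolean ok = queue.receiveAndProcess(body -> {
            throw new IllegalStateException("simulated processing failure");
        });
        System.out.println(ok + " " + queue.depth());

        // Second attempt succeeds, so the message is removed.
        ok = queue.receiveAndProcess(body -> System.out.println("processed " + body));
        System.out.println(ok + " " + queue.depth());
    }
}
```

With a real broker the "keep on failure" step is done by the session (not acknowledging, recovering, or rolling back), so treat this only as a picture of the ordering, not of how redelivery actually behaves.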
You have to browse the queue.
A real code example, run against WebSphere MQ:
public void browseMessagesAndJiraCreation(String jiraUserName, String jiraPassword) {
int counterMessages = jmsTemplate.browse(destinationQueueName, new BrowserCallback<Integer>() {
@Override
public Integer doInJms(final Session session, final QueueBrowser queueBrowser) throws JMSException {
Enumeration<TextMessage> enumeration = queueBrowser.getEnumeration();
int counterMessages = 0;
while (enumeration.hasMoreElements()) {
counterMessages += 1;
TextMessage msg = enumeration.nextElement();
logger.info("Found : {}", msg.getText());
JiraId jiraId = jiraManager.createIssue(jiraUserName, jiraPassword);
jiraManager.attachFileToJira(jiraId, msg.getText(), jiraUserName, jiraPassword);
}
return counterMessages;
}
});
logger.info("{}:messages were browsed and processed from queue:{}.", counterMessages, destinationQueueName);
}
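Explanation:
- It uses the Spring Framework JmsTemplate
- You pass the String destinationQueueName (for example destinationQueueName=QL.PREFCNTR.USER.REPLY)
- The browser returns a Java Enumeration of TextMessage objects
- counterMessages is the counter of processed messages
- No messages are consumed!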