Read without removing message from JMS queue

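How can I read a message from WebSphere MQ without removing the original message from the queue?

I have a Spring application that reads messages from WebSphere MQ. After reading a message, I call a process method to handle the data retrieved from the queue.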

Step 1:

response = jmsTemplate.receive();
//Message automatically removed from queue.

Step 2:

process(response);

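The process method may throw an exception. If it does, I need the message to remain on the queue.

Is this possible? Is there any way to have the message removed only after the client acknowledges it?

I tried adding the following: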

jmsTemplate.setSessionAcknowledgeMode(javax.jms.Session.CLIENT_ACKNOWLEDGE);

...but the message is still removed.

JmsTemplate creation snippet:

JndiConnectionFactorySupport connectionFactoryBean = new JndiConnectionFactorySupport();
connectionFactoryBean.setBindingsDir(this.bindingDir);
connectionFactoryBean.setConnectionFactoryName(connectionFactoryName);
connectionFactoryBean.afterPropertiesSet();
jmsTemplate.setConnectionFactory(connectionFactoryBean.getObject());

JndiDestinationResolver destinationResolver = new JndiDestinationResolver();
destinationResolver.setJndiTemplate(connectionFactoryBean.getJndiTemplate());

jmsTemplate.setDestinationResolver(destinationResolver);
jmsTemplate.setReceiveTimeout(20000);
jmsTemplate.setDefaultDestinationName(this.defaultDestinationName);

            

I also tried the jmsTemplate.execute() method, as follows:

@SuppressWarnings({ "unused", "unchecked" })
Message responseMessage = (Message) jmsTemplate.execute(
    new SessionCallback() {
        public Object doInJms(Session session) throws JMSException {
            MessageConsumer consumer = session.createConsumer(
                jmsTemplate.getDestinationResolver().resolveDestinationName(session, "QUEUE_NAME", false));
            Message response = consumer.receive(1);
            try {
                testMethod(); // this method will throw an exception
                response.acknowledge();
                consumer.close();
            } catch (Exception e) {
                consumer.close(); // control will come here
            }
            return response;
        }
    }, true);

You can add transaction handling for the JMS messages. See the example.

Your listener should be "transacted", like this:

<jms:listener-container connection-factory="connectionFactory" acknowledge="transacted">
    <jms:listener ref="notificationProcessor" destination="incoming.queue"/>
</jms:listener-container>
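
The effect of acknowledge="transacted" can be pictured with a toy model in plain Java. This is only a sketch of the commit/rollback behaviour and uses no JMS or Spring classes; TransactedQueueModel, send, depth and deliver are hypothetical names invented for the illustration. The message is taken off the queue, handed to the listener, and put back if the listener throws.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Consumer;

// Toy model only: mimics commit/rollback of a transacted receive, not real JMS.
public class TransactedQueueModel {
    private final Deque<String> queue = new ArrayDeque<>();

    public void send(String message) {
        queue.addLast(message);
    }

    public int depth() {
        return queue.size();
    }

    // Delivers the head message to the listener.
    // Returns true if the listener finished normally ("commit"),
    // false if the queue was empty or the listener threw ("rollback").
    public boolean deliver(Consumer<String> listener) {
        String message = queue.pollFirst();
        if (message == null) {
            return false; // nothing to deliver
        }
        try {
            listener.accept(message);
            return true; // commit: the message stays removed
        } catch (RuntimeException e) {
            queue.addFirst(message); // rollback: the message is available again
            return false;
        }
    }

    public static void main(String[] args) {
        TransactedQueueModel model = new TransactedQueueModel();
        model.send("order-1");

        // First attempt fails, so the message goes back on the queue.
        boolean first = model.deliver(m -> { throw new IllegalStateException("processing failed"); });
        System.out.println(first + ", depth=" + model.depth());

        // Second attempt succeeds, so the message is gone for good.
        boolean second = model.deliver(m -> System.out.println("processed " + m));
        System.out.println(second + ", depth=" + model.depth());
    }
}
```

In the real listener container the same idea applies: an exception thrown from the listener rolls the session back, and the broker redelivers the message.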

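You cannot do this with the receive() method, because the operation is complete (from the session's perspective) when the receive method returns.

You need to run the code that might fail within the scope of the session, for example with JmsTemplate.execute() and a SessionCallback, like this: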

this.jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
this.jmsTemplate.convertAndSend("foo", "bar");
try {
    String value = this.jmsTemplate.execute(session -> {
        MessageConsumer consumer = session.createConsumer(
                this.jmsTemplate.getDestinationResolver().resolveDestinationName(session, "foo", false));
        String result;
        try {
            Message received = consumer.receive(5000);
            result = (String) this.jmsTemplate.getMessageConverter().fromMessage(received);
            // Do some stuff that might throw an exception
            received.acknowledge();
        }
        finally {
            consumer.close();
        }
        return result;
    }, true);
    System.out.println(value);
}
catch (Exception e) {
    e.printStackTrace();
}

You have to browse the queue.

Here is a real code example that was run against WebSphere MQ:

public void browseMessagesAndJiraCreation(String jiraUserName, String jiraPassword) {

    int counterMessages = jmsTemplate.browse(destinationQueueName, new BrowserCallback<Integer>() {

        @Override
        public Integer doInJms(final Session session, final QueueBrowser queueBrowser) throws JMSException {
            Enumeration<TextMessage> enumeration = queueBrowser.getEnumeration();
            int counterMessages = 0;
            while (enumeration.hasMoreElements()) {
                counterMessages += 1;
                TextMessage msg = enumeration.nextElement();
                logger.info("Found : {}", msg.getText());
                JiraId jiraId = jiraManager.createIssue(jiraUserName, jiraPassword);
                jiraManager.attachFileToJira(jiraId, msg.getText(), jiraUserName, jiraPassword);
            }
            return counterMessages;
        }
    });
    logger.info("{}:messages were browsed and processed from queue:{}.", counterMessages, destinationQueueName);
}

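Explanation:

  • It uses the Spring Framework JmsTemplate
  • You pass in the string destinationQueueName (for example destinationQueueName=QL.PREFCNTR.USER.REPLY)
  • The browser returns a Java Enumeration of TextMessage objects
  • counterMessages is the count of processed messages
  • The messages are not consumed!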
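
The difference between browsing and receiving can be sketched in plain Java. This is a toy model built on java.util.ArrayDeque rather than a real queue manager; BrowseVersusReceive, browse and receive are hypothetical names used only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

// Toy model only: contrasts a non-destructive browse with a destructive receive.
public class BrowseVersusReceive {
    // Like a QueueBrowser: copies out the messages without removing any.
    static List<String> browse(Deque<String> queue) {
        return new ArrayList<>(queue);
    }

    // Like receive(): removes and returns the head message, or null if empty.
    static String receive(Deque<String> queue) {
        return queue.pollFirst();
    }

    public static void main(String[] args) {
        Deque<String> queue = new ArrayDeque<>(Arrays.asList("m1", "m2"));
        // Browsing leaves the depth unchanged.
        System.out.println(browse(queue) + " depth=" + queue.size());
        // Receiving takes one message off the queue.
        System.out.println(receive(queue) + " depth=" + queue.size());
    }
}
```

Because a browsed message stays on the queue, the next browse will see it again. If each message should be handled only once, it still has to be consumed after successful processing.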