Detect reconnect in an IBM MQ client application that uses auto-reconnect

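I am using the IBM MQ classes for JMS (IBM MQ version 8.0.0.4) with automatic reconnect configured. According to the documentation, the reconnect happens implicitly. I would like to write a simple log statement whenever a reconnect takes place, so I need to be notified somehow when it happens.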

On the Application Recovery page of the IBM documentation I stumbled across the section "Detecting failover", which says:

Reconnection aware: Register an MQCBT_EVENT_HANDLER event handler with the queue manager. The event handler is posted with MQRC_RECONNECTING when the client starts to try to reconnect to the server, and MQRC_RECONNECTED after a successful reconnection. You can then run a routine to reestablish a predictable state so that the client application is able to continue processing.

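Unfortunately, I did not find a Java/JMS code example that shows how and where to register such an event handler. I don't know whether this is supported in my situation at all. Can anyone point me in the right direction or even provide a code sample? Many thanks.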

Question update from February 5, 2020:

After receiving the initial answer from Sashi on January 27, 2020, I added the following code example, which I wrote myself.

public static void main(String[] args) {
    Connection connection = null;
    Session session = null;
    Object destination = null;
    MessageProducer producer = null;

    try {
        JmsFactoryFactory jmsFactoryFactory = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
        JmsConnectionFactory cf = jmsFactoryFactory.createConnectionFactory();

        cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, HOST);
        cf.setIntProperty(WMQConstants.WMQ_PORT, PORT);
        cf.setStringProperty(WMQConstants.WMQ_CHANNEL, CHANNEL);
        cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
        cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, QM_NAME);
        cf.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_OPTIONS, WMQConstants.WMQ_CLIENT_RECONNECT);
        cf.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_TIMEOUT, RECONNECT_TIMEOUT);

        connection = cf.createConnection();
        connection.setExceptionListener(new MQExceptionListener());
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        destination = session.createQueue(QUEUE);
        producer = session.createProducer((Destination)destination);
        connection.start();
    } catch (JMSException ex) {
        LOGGER.error(ex.toString());
    }
}

public class MQExceptionListener implements ExceptionListener {
    public void onException(JMSException e) {
        System.out.println(e);
        if(e.getLinkedException() != null)
            System.out.println(e.getLinkedException());
    }
}

This is what I get in the log:

ERROR [Main.main:57] (main) com.ibm.msg.client.jms.DetailedIllegalStateException: JMSWMQ0018: Failed to connect to queue manager '<hostname>' with connection mode 'Client' and host name '<hostname>(<port>)'.
Check the queue manager is started and if running in client mode, check there is a listener running. Please see the linked exception for more information.
ERROR [Main.main:61] (main) Inner exceptions:
ERROR [Main.main:65] (main) com.ibm.mq.MQException: JMSCMQ0001: IBM MQ call failed with compcode '2' ('MQCC_FAILED') reason '2538' ('MQRC_HOST_NOT_AVAILABLE').
ERROR [Main.main:65] (main) com.ibm.mq.jmqi.JmqiException: CC=2;RC=2538;AMQ9204: Connection to host '<hostname>(<port>)' rejected. [1=com.ibm.mq.jmqi.JmqiException[CC=2;RC=2538;AMQ9204: Connection to host '<hostname>/<ip>:<port>' rejected. [1=java.net.ConnectException[Connection refused: connect],3=<hostname>/<ip>:<port>,4=TCP,5=Socket.connect]],3=<hostname>(<port>),5=RemoteTCPConnection.bindAndConnectSocket]
ERROR [Main.main:65] (main) com.ibm.mq.jmqi.JmqiException: CC=2;RC=2538;AMQ9204: Connection to host '<hostname>/<ip>:<port>' rejected. [1=java.net.ConnectException[Connection refused: connect],3=<hostname>/<ip>:<port>,4=TCP,5=Socket.connect]
ERROR [Main.main:65] (main) java.net.ConnectException: Connection refused: connect

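Question update from February 11, 2020:

I added this based on the feedback I received from Sashi on February 5, 2020.

I tried to build a minimal application that connects to an IBM MQ instance. Here is the code: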

Application.java

public class Application {
    private static final Logger LOGGER = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        new Application().run();
    }

    private void run() {
        MQWriter writer = new MQWriter();
        int i = 1;
        while (true) {
            String message = "Hello Testing " + i;
            LOGGER.info("Sending message {} to MQ server...", message);
            writer.write(message);
            i++;
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

MQConnectionDetails.java

public class MQConnectionDetails {
    public static final String HOST = "XXX.XXX.XXX.XXX";
    public static final int PORT = 1414;
    public static final String QM_NAME = "QM1";
    public static final String CHANNEL = "DEV.APP.SVRCONN";
    public static final String QUEUE = "DEV.QUEUE.1";
    public static final int RECONNECT_TIMEOUT = 60; // 1 minute
}

MQWriter.java

public class MQWriter {

    private static final Logger LOGGER = LoggerFactory.getLogger(MQWriter.class);

    private Connection connection = null;
    private Session session = null;
    private Object destination = null;
    private MessageProducer producer = null;

    public MQWriter() {
        try {
            JmsFactoryFactory jff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
            JmsConnectionFactory jcf = jff.createConnectionFactory();
            jcf.setStringProperty(WMQConstants.WMQ_HOST_NAME, MQConnectionDetails.HOST);
            jcf.setIntProperty(WMQConstants.WMQ_PORT, MQConnectionDetails.PORT);
            jcf.setStringProperty(WMQConstants.WMQ_CHANNEL, MQConnectionDetails.CHANNEL);
            jcf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
            jcf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, MQConnectionDetails.QM_NAME);
            jcf.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_OPTIONS, WMQConstants.WMQ_CLIENT_RECONNECT);
            jcf.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_TIMEOUT, MQConnectionDetails.RECONNECT_TIMEOUT);

            LOGGER.info("Initializing connection to write queue {} on {}:{}...",
                    MQConnectionDetails.QUEUE,
                    MQConnectionDetails.HOST,
                    MQConnectionDetails.PORT);
            connection = jcf.createConnection();
            connection.setExceptionListener(new MQExceptionListener());
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(MQConnectionDetails.QUEUE);
            producer = session.createProducer((Destination)destination);
            connection.start();
        } catch (JMSException ex) {
            LOGGER.error("Error initializing connection to write queue", ex);
        }
    }

    public void write(String message) {
        try {
            TextMessage textMessage = session.createTextMessage(message);
            producer.send(textMessage);
        } catch (Exception ex) {
            LOGGER.error("Error sending message to write queue", ex);
        }
    }
}

MQExceptionListener.java

public class MQExceptionListener implements ExceptionListener {
    private static final Logger LOGGER = LoggerFactory.getLogger(MQExceptionListener.class);

    public void onException(JMSException ex) {
        LOGGER.error("=====");
        LOGGER.error(ex.toString());
        if (ex.getLinkedException() != null) {
            LOGGER.error(ex.getLinkedException().toString());
        }
        LOGGER.error("=====");
    }
}

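The test scenario I ran looks like this:

  1. Make sure IBM MQ is available on TCP port 1414 (an IBM MQ Docker container running on Amazon EC2).
  2. Run the application above (Application.java) and make sure it sends messages to the queue.
  3. Change the firewall configuration of the Amazon EC2 security group by changing the port from 1414 to 1415, which makes IBM MQ unreachable for the client.

This is what I observed:

Here is a summary of the exceptions that were thrown:

Console exceptions: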

2020-02-11 09:50:16,155 INFO [Application.run:21] (main) Sending message Hello Testing 13 to MQ server...
2020-02-11 09:50:17,285 INFO [Application.run:21] (main) Sending message Hello Testing 14 to MQ server...
2020-02-11 09:50:18,413 INFO [Application.run:21] (main) Sending message Hello Testing 15 to MQ server...
2020-02-11 09:50:19,555 INFO [Application.run:21] (main) Sending message Hello Testing 16 to MQ server...
2020-02-11 09:51:45,966 ERROR [MQExceptionListener.onException:14] (JMSCCThreadPoolWorker-6) =====
2020-02-11 09:51:45,966 ERROR [MQExceptionListener.onException:15] (JMSCCThreadPoolWorker-6) com.ibm.msg.client.jms.DetailedJMSException: JMSWMQ1107: A problem with this connection has occurred.
An error has occurred with the IBM MQ JMS connection.
Use the linked exception to determine the cause of this error.
2020-02-11 09:51:45,966 ERROR [MQExceptionListener.onException:17] (JMSCCThreadPoolWorker-6) com.ibm.mq.MQException: MQ delivered an asynchronous event with completion code '2', and reason '2009'.
2020-02-11 09:51:45,966 ERROR [MQExceptionListener.onException:19] (JMSCCThreadPoolWorker-6) =====
2020-02-11 09:51:45,967 ERROR [MQWriter.write:52] (main) Error sending message to write queue
com.ibm.msg.client.jms.DetailedJMSException: JMSWMQ2007: Failed to send a message to destination 'DEV.QUEUE.1'.
JMS attempted to perform an MQPUT or MQPUT1; however IBM MQ reported an error.
Use the linked exception to determine the cause of this error.
    at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:595)
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:215)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.checkJmqiCallSuccess(WMQMessageProducer.java:1288)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.checkJmqiCallSuccess(WMQMessageProducer.java:1245)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.access0(WMQMessageProducer.java:76)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer$SpiIdentifiedProducerShadow.sendInternal(WMQMessageProducer.java:906)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer$ProducerShadow.send(WMQMessageProducer.java:566)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.send(WMQMessageProducer.java:1428)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.sendMessage(JmsMessageProducerImpl.java:855)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.synchronousSendInternal(JmsMessageProducerImpl.java:2055)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.sendInternal(JmsMessageProducerImpl.java:1993)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.send(JmsMessageProducerImpl.java:1486)
    at com.ibm.mq.jms.MQMessageProducer.send(MQMessageProducer.java:293)
    at org.example.MQWriter.write(MQWriter.java:50)
    at org.example.Application.run(Application.java:22)
    at org.example.Application.main(Application.java:13)
Caused by: com.ibm.mq.MQException: JMSCMQ0001: IBM MQ call failed with compcode '2' ('MQCC_FAILED') reason '2009' ('MQRC_CONNECTION_BROKEN').
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:203)
    ... 14 more
Caused by: com.ibm.mq.jmqi.JmqiException: CC=2;RC=2009
    at com.ibm.mq.jmqi.remote.api.RemoteHconn$ReconnectionState.recordFailure(RemoteHconn.java:4931)
    at com.ibm.mq.jmqi.remote.api.RemoteHconn.setReconnectionFailureInner(RemoteHconn.java:2650)
    at com.ibm.mq.jmqi.remote.api.RemoteParentHconn.setReconnectionFailure(RemoteParentHconn.java:152)
    at com.ibm.mq.jmqi.remote.impl.RemoteReconnectThread.bestHconn(RemoteReconnectThread.java:265)
    at com.ibm.mq.jmqi.remote.impl.RemoteReconnectThread.run(RemoteReconnectThread.java:115)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueItem.runTask(WorkQueueItem.java:319)
    at com.ibm.msg.client.commonservices.workqueue.SimpleWorkQueueItem.runItem(SimpleWorkQueueItem.java:99)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueItem.run(WorkQueueItem.java:343)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueManager.runWorkQueueItem(WorkQueueManager.java:312)
    at com.ibm.msg.client.commonservices.j2se.workqueue.WorkQueueManagerImplementation$ThreadPoolWorker.run(WorkQueueManagerImplementation.java:1227)
2020-02-11 09:51:46,969 INFO [Application.run:21] (main) Sending message Hello Testing 17 to MQ server...
2020-02-11 09:51:46,972 ERROR [MQWriter.write:52] (main) Error sending message to write queue
com.ibm.msg.client.jms.DetailedJMSException: JMSWMQ2007: Failed to send a message to destination 'DEV.QUEUE.1'.
JMS attempted to perform an MQPUT or MQPUT1; however IBM MQ reported an error.
Use the linked exception to determine the cause of this error.
    at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:595)
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:215)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.checkJmqiCallSuccess(WMQMessageProducer.java:1288)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.checkJmqiCallSuccess(WMQMessageProducer.java:1245)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.access0(WMQMessageProducer.java:76)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer$SpiIdentifiedProducerShadow.sendInternal(WMQMessageProducer.java:906)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer$ProducerShadow.send(WMQMessageProducer.java:566)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.send(WMQMessageProducer.java:1428)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.sendMessage(JmsMessageProducerImpl.java:855)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.synchronousSendInternal(JmsMessageProducerImpl.java:2055)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.sendInternal(JmsMessageProducerImpl.java:1993)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.send(JmsMessageProducerImpl.java:1486)
    at com.ibm.mq.jms.MQMessageProducer.send(MQMessageProducer.java:293)
    at org.example.MQWriter.write(MQWriter.java:50)
    at org.example.Application.run(Application.java:22)
    at org.example.Application.main(Application.java:13)
Caused by: com.ibm.mq.MQException: JMSCMQ0001: IBM MQ call failed with compcode '2' ('MQCC_FAILED') reason '2009' ('MQRC_CONNECTION_BROKEN').
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:203)
    ... 14 more
Caused by: com.ibm.mq.jmqi.JmqiException: CC=2;RC=2009
    at com.ibm.mq.jmqi.remote.api.RemoteHconn$ReconnectionState.recordFailure(RemoteHconn.java:4931)
    at com.ibm.mq.jmqi.remote.api.RemoteHconn.setReconnectionFailureInner(RemoteHconn.java:2650)
    at com.ibm.mq.jmqi.remote.api.RemoteParentHconn.setReconnectionFailure(RemoteParentHconn.java:152)
    at com.ibm.mq.jmqi.remote.impl.RemoteReconnectThread.bestHconn(RemoteReconnectThread.java:265)
    at com.ibm.mq.jmqi.remote.impl.RemoteReconnectThread.run(RemoteReconnectThread.java:115)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueItem.runTask(WorkQueueItem.java:319)
    at com.ibm.msg.client.commonservices.workqueue.SimpleWorkQueueItem.runItem(SimpleWorkQueueItem.java:99)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueItem.run(WorkQueueItem.java:343)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueManager.runWorkQueueItem(WorkQueueManager.java:312)
    at com.ibm.msg.client.commonservices.j2se.workqueue.WorkQueueManagerImplementation$ThreadPoolWorker.run(WorkQueueManagerImplementation.java:1227)

Question update from February 12, 2020

Added this sample and my findings based on JoshMc's answer from February 11, 2020. My comments on this sample:

MQWriter2.java

public class MQWriter2 {

    private static final Logger LOGGER = LoggerFactory.getLogger(MQWriter2.class);

    private Connection connection = null;
    private Session session = null;
    private Queue destination = null;
    private MessageProducer producer = null;

    public MQWriter2() {
        try {
            MQConnectionFactory factory = new MQConnectionFactory();
            factory.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            factory.setConnectionNameList("XXX.XXX.XXX.XXX(1414)");
            factory.setQueueManager(MQConnectionDetails.QM_NAME);
            factory.setChannel(MQConnectionDetails.CHANNEL);
            factory.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT);
            factory.setClientReconnectTimeout(MQConnectionDetails.RECONNECT_TIMEOUT);

            LOGGER.info("Initializing connection to write queue {} on {}:{}...",
                    MQConnectionDetails.QUEUE,
                    MQConnectionDetails.HOST,
                    MQConnectionDetails.PORT);
            connection = factory.createConnection();
            connection.setExceptionListener(new MQExceptionListener());
            session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            destination = session.createQueue(MQConnectionDetails.QUEUE);
            producer = session.createProducer(destination);
            connection.start();
        } catch (JMSException ex) {
            LOGGER.error("Error initializing connection to write queue", ex);
        }
    }

    public void write(String message) {
        try {
            TextMessage textMessage = session.createTextMessage(message);
            producer.send(textMessage);
        } catch (Exception ex) {
            LOGGER.error("Error sending message to write queue", ex);
        }
    }
}

Console output

2020-02-12 08:39:11,628 INFO [MQWriter2.<init>:29] (main) Initializing connection to write queue DEV.QUEUE.1 on 54.161.121.207:1414...
2020-02-12 08:39:14,552 INFO [Application.run:19] (main) Sending message Hello Testing 1 to MQ server...
2020-02-12 08:39:15,710 INFO [Application.run:19] (main) Sending message Hello Testing 2 to MQ server...
2020-02-12 08:39:16,841 INFO [Application.run:19] (main) Sending message Hello Testing 3 to MQ server...
...
2020-02-12 08:39:41,973 INFO [Application.run:19] (main) Sending message Hello Testing 25 to MQ server...
2020-02-12 08:41:27,314 ERROR [MQExceptionListener.onException:14] (JMSCCThreadPoolWorker-10) =====
2020-02-12 08:41:27,314 ERROR [MQExceptionListener.onException:15] (JMSCCThreadPoolWorker-10) com.ibm.msg.client.jms.DetailedJMSException: JMSWMQ1107: A problem with this connection has occurred.
An error has occurred with the IBM MQ JMS connection.
Use the linked exception to determine the cause of this error.
2020-02-12 08:41:27,314 ERROR [MQWriter2.write:49] (main) Error sending message to write queue
com.ibm.msg.client.jms.DetailedJMSException: JMSWMQ2007: Failed to send a message to destination 'DEV.QUEUE.1'.
JMS attempted to perform an MQPUT or MQPUT1; however IBM MQ reported an error.
Use the linked exception to determine the cause of this error.
    at com.ibm.msg.client.wmq.common.internal.Reason.reasonToException(Reason.java:595)
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:215)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.checkJmqiCallSuccess(WMQMessageProducer.java:1288)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.checkJmqiCallSuccess(WMQMessageProducer.java:1245)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.access0(WMQMessageProducer.java:76)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer$SpiIdentifiedProducerShadow.sendInternal(WMQMessageProducer.java:906)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer$ProducerShadow.send(WMQMessageProducer.java:566)
    at com.ibm.msg.client.wmq.internal.WMQMessageProducer.send(WMQMessageProducer.java:1428)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.sendMessage(JmsMessageProducerImpl.java:855)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.synchronousSendInternal(JmsMessageProducerImpl.java:2055)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.sendInternal(JmsMessageProducerImpl.java:1993)
    at com.ibm.msg.client.jms.internal.JmsMessageProducerImpl.send(JmsMessageProducerImpl.java:1486)
    at com.ibm.mq.jms.MQMessageProducer.send(MQMessageProducer.java:293)
    at org.example.MQWriter2.write(MQWriter2.java:47)
    at org.example.Application.run(Application.java:20)
    at org.example.Application.main(Application.java:11)
Caused by: com.ibm.mq.MQException: JMSCMQ0001: IBM MQ call failed with compcode '2' ('MQCC_FAILED') reason '2009' ('MQRC_CONNECTION_BROKEN').
    at com.ibm.msg.client.wmq.common.internal.Reason.createException(Reason.java:203)
    ... 14 more
Caused by: com.ibm.mq.jmqi.JmqiException: CC=2;RC=2009
    at com.ibm.mq.jmqi.remote.api.RemoteHconn$ReconnectionState.recordFailure(RemoteHconn.java:4931)
    at com.ibm.mq.jmqi.remote.api.RemoteHconn.setReconnectionFailureInner(RemoteHconn.java:2650)
    at com.ibm.mq.jmqi.remote.api.RemoteParentHconn.setReconnectionFailure(RemoteParentHconn.java:152)
    at com.ibm.mq.jmqi.remote.impl.RemoteReconnectThread.bestHconn(RemoteReconnectThread.java:265)
    at com.ibm.mq.jmqi.remote.impl.RemoteReconnectThread.run(RemoteReconnectThread.java:115)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueItem.runTask(WorkQueueItem.java:319)
    at com.ibm.msg.client.commonservices.workqueue.SimpleWorkQueueItem.runItem(SimpleWorkQueueItem.java:99)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueItem.run(WorkQueueItem.java:343)
    at com.ibm.msg.client.commonservices.workqueue.WorkQueueManager.runWorkQueueItem(WorkQueueManager.java:312)
    at com.ibm.msg.client.commonservices.j2se.workqueue.WorkQueueManagerImplementation$ThreadPoolWorker.run(WorkQueueManagerImplementation.java:1227)

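You can set an ExceptionListener on the connection object after creating the connection. The onException method of the ExceptionListener is called when a reconnect is attempted. Here is an example: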

    ExceptionListener exceptionListener = new ExceptionListener(){
        @Override
        public void onException(JMSException e) {
            System.out.println(e);
            if(e.getLinkedException() != null)
                System.out.println(e.getLinkedException());
        }
    };
    MQQueueConnection connection = (MQQueueConnection) cf.createQueueConnection();
    connection.setExceptionListener(exceptionListener);

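A few points that might clear things up for you.

The line below sets how long (in seconds) MQ keeps trying to reconnect to the queue manager once it has noticed that the connection is lost.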

jcf.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_TIMEOUT, MQConnectionDetails.RECONNECT_TIMEOUT);

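How long it takes the client to notice that the connection is broken depends on the type of failure, but in the situation you describe it is based on the HBINT setting of the SVRCONN channel on the queue manager.

A heartbeat is sent every HBINT seconds when no other normal traffic flows across the channel. The channel's TIMEOUT is derived from HBINT: if HBINT is less than 60 seconds, TIMEOUT is twice HBINT; if HBINT is 60 seconds or more, TIMEOUT is HBINT plus 60 seconds. The timeout counts from the last traffic or heartbeat that was sent, not from the moment you changed the firewall configuration, although in this case you appear to send a message every second or so, so the two should have been close.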
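The timeout rule above can be written down as a small helper. This is only a sketch of the arithmetic as stated in this answer, not MQ code; the class and method names are made up for illustration, and rejecting a HBINT of 0 (which disables heartbeats) is an assumption of the sketch.

```java
/** Illustrative sketch of the HBINT-to-TIMEOUT rule described above (all values in seconds). */
final class HeartbeatTimeout {

    /** Returns the timeout derived from HBINT, following the rule quoted in the answer. */
    static int timeoutSeconds(int hbintSeconds) {
        // Assumption of this sketch: HBINT 0 means "no heartbeats", so the rule does not apply.
        if (hbintSeconds <= 0) {
            throw new IllegalArgumentException("HBINT must be positive for this rule");
        }
        // Below 60 seconds the timeout is twice HBINT; from 60 seconds up it is HBINT + 60.
        return hbintSeconds < 60 ? hbintSeconds * 2 : hbintSeconds + 60;
    }

    public static void main(String[] args) {
        for (int hbint : new int[] {30, 45, 60, 300}) {
            System.out.println("HBINT=" + hbint + "s -> timeout=" + timeoutSeconds(hbint) + "s");
        }
    }
}
```

For example, a HBINT of 45 gives a timeout of 90 seconds, and the default-style value of 300 gives 360 seconds.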

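Based on the logs, I am not sure the RECONNECT option is being picked up the way I would expect. When the reconnect timeout expires, you should get one of these errors rather than MQRC_CONNECTION_BROKEN: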

  • MQRC_RECONNECT_FAILED
  • MQRC_RECONNECT_TIMED_OUT

In my sample I set it like this; maybe try this instead of the way you currently set it:

jcf.setClientReconnectOptions(WMQConstants.WMQ_CLIENT_RECONNECT);
jcf.setClientReconnectTimeout(MQConnectionDetails.RECONNECT_TIMEOUT);

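Based on what I see, I would guess that HBINT on your SVRCONN channel is set to 45 seconds, and that the connection simply times out with MQRC_CONNECTION_BROKEN after 90 seconds and never attempts to reconnect.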
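To make the distinction between these outcomes visible in a log, an exception listener could map the MQ reason code to a short label. The sketch below is library-free and only shows the mapping; in a real listener the reason code would be read from the linked com.ibm.mq.MQException. The value 2009 appears in the logs above; the other numeric values are given from memory and should be checked against the constants shipped with your MQ client (com.ibm.mq.constants.CMQC). The class and method names are hypothetical.

```java
/** Illustrative mapping of reconnect-related MQ reason codes to log labels. */
final class ReconnectReason {

    // 2009 is taken from the logs in the question.
    static final int MQRC_CONNECTION_BROKEN = 2009;
    // Assumed values: verify against com.ibm.mq.constants.CMQC before relying on them.
    static final int MQRC_RECONNECTING = 2544;
    static final int MQRC_RECONNECTED = 2545;
    static final int MQRC_RECONNECT_FAILED = 2548;
    static final int MQRC_RECONNECT_TIMED_OUT = 2556;

    /** Returns a human-readable label for the given reason code. */
    static String describe(int reasonCode) {
        switch (reasonCode) {
            case MQRC_RECONNECTING:
                return "reconnect in progress";
            case MQRC_RECONNECTED:
                return "reconnect succeeded";
            case MQRC_RECONNECT_FAILED:
                return "reconnect failed";
            case MQRC_RECONNECT_TIMED_OUT:
                return "reconnect timed out";
            case MQRC_CONNECTION_BROKEN:
                return "connection broken, no reconnect attempted";
            default:
                return "other reason code " + reasonCode;
        }
    }

    public static void main(String[] args) {
        // 2009 is the code seen in the question's stack traces.
        System.out.println(describe(2009));
    }
}
```

Seeing only the "connection broken" label, as in the logs above, would be consistent with the reconnect option not having taken effect.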

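I ran into the same problem as the original poster. After spending several hours sifting through the information available on the internet, talking to colleagues, and trying to get reconnection to work, I gave up and decided to solve the problem by emulating the incomprehensible reconnect feature myself. I hope it helps others who are struggling with IBM MQ. The class I wrote basically does two things: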

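  1. Repeatedly tries to connect to IBM MQ, with an increasing interval between attempts.
  2. Once connected, sets an error handler that IBM MQ triggers when something goes wrong with the connection (it reconnects using the same logic).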
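The increasing interval from step 1 boils down to doubling the delay until a cap is reached. The following library-free sketch isolates just that calculation; the class and method names are hypothetical, and the 6000 ms start and 60000 ms cap are used only as example values.

```java
/** Illustrative sketch of a capped, doubling retry delay. */
final class BackoffSchedule {

    /** Returns the delay to use after a failed attempt: double the current one, capped at the maximum. */
    static int nextDelayMs(int currentDelayMs, int maxDelayMs) {
        // Use long arithmetic so that doubling a large delay cannot overflow int.
        long doubled = (long) currentDelayMs * 2;
        return (int) Math.min(doubled, (long) maxDelayMs);
    }

    public static void main(String[] args) {
        int delayMs = 6000;
        // Print the delay that would be used before each of six consecutive retries.
        for (int attempt = 1; attempt <= 6; attempt++) {
            System.out.println("attempt " + attempt + ": wait " + delayMs + " ms");
            delayMs = nextDelayMs(delayMs, 60000);
        }
    }
}
```

With these example values the waits are 6000, 12000, 24000, 48000, 60000 and 60000 ms, after which the delay stays at the cap.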

First, here is the class itself:

package com.raiks.mqclient;

import javax.jms.Destination;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;

import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.wmq.WMQConstants;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.raiks.mqclient.IbmMqMessageListener;

/**
 * This class implements the reconnection logic for JMS brokers that don't support it
 * In particular, it does it for IBM MQ with its incomprehensible reconnection algorithm
 * It's expected that each connection manager receives a separate connection factory
 * and a message listener - it's not guaranteed for those to be thread safe
 */
public final class IbmMqJmsConnectionManager {
    private static final Logger LOGGER = LoggerFactory.getLogger(IbmMqJmsConnectionManager.class);
    private static final int INITIAL_RECONNECTION_DELAY_MS = 6000;
    private static final int MAX_RECONNECTION_DELAY_MS = 60000;
    private static final String QUEUE_PREIX = "queue:///";

    private final String connectorName;
    private final JmsConnectionFactory connectionFactory;
    private final String queueName;

    private final IbmMqMessageListener messageListener;

    private final int initialReconnectionDelayMs;
    private final int maxReconnectionDelayMs;

    public IbmMqJmsConnectionManager(
        String connectorName,
        JmsConnectionFactory connectionFactory,
        String queueName,
        IbmMqMessageListener messageListener,
        int initialReconnectionDelayMs,
        int maxReconnectionDelayMs
    ) {
        this.connectorName = connectorName;
        this.connectionFactory = connectionFactory;
        this.queueName = queueName;
        this.messageListener = messageListener;
        this.initialReconnectionDelayMs = initialReconnectionDelayMs;
        this.maxReconnectionDelayMs = maxReconnectionDelayMs;
    }

    /**
     * Attempts to connect to a JMS broker and makes continuous retries with an increasing interval if fails
     * When the maximum interval is reached, issues an error and keeps on trying
     * Sets the exception listener (a callback) in the created JMSContext which calls this method when the
     * connection with the broker goes down due to network issue or intentional connection termination
     */
    public void connectToBrokerWithRetries() {
        String connectionDetails = formatConnectionDetails();
        LOGGER.info("Attempting to connect to JMS broker '{}'. Connection details = {}", connectorName, connectionDetails);

        JMSContext context = null;
        // Start from the delay passed to the constructor rather than the class-level default
        int sleepingTimeMs = initialReconnectionDelayMs;
        int accumulatedSleepingTimeMs = 0;

        // Try to reconnect until we succeed. IMPORTANT! This is a blocking loop that never ends so it must be run in a separate thread
        while (context == null) {
            try {
                context = connectionFactory.createContext(JMSContext.AUTO_ACKNOWLEDGE);
                LOGGER.info("Successfully connected to the JMS broker '{}'. Connection details = {}", connectorName, connectionDetails);

                boolean hadUnsuccessfulConnectionAttempts = accumulatedSleepingTimeMs > 0;
                if (hadUnsuccessfulConnectionAttempts) {
                    LOGGER.warn(
                        "Before this successful attempt, I spent {} ms repeatedly trying to connect to '{}'. Please check the broker's health. Connection details = {}",
                        accumulatedSleepingTimeMs, connectorName, connectionDetails
                    );
                }

                Destination destination = context.createQueue(QUEUE_PREIX + queueName);
                JMSConsumer jmsConsumer = context.createConsumer(destination);
                jmsConsumer.setMessageListener(messageListener);
                LOGGER.info("Successfully connected to the queue '{}' at '{}'. Connection details = {}", queueName, connectorName, connectionDetails);

                // Sets a callback that will be invoked when something happens with a connection to a broker
                context.setExceptionListener(
                    jmsException -> {
                        LOGGER.warn("Something bad happened to JMS broker connection to '{}'. I will try to reconnect. Connection details = {}", connectorName, connectionDetails);
                        connectToBrokerWithRetries();
                    }
                );
            } catch (Exception e) {
                LOGGER.warn(
                    "Failed to create a JMS context for '{}'. I will wait for {} ms and then make a reconnection attempt. Connection details = {}",
                    connectorName, sleepingTimeMs, connectionDetails, e
                );
                context = null;
                try {
                    Thread.sleep(sleepingTimeMs);
                    accumulatedSleepingTimeMs += sleepingTimeMs;
                    int doubledSleepingTime = sleepingTimeMs * 2;
                    // We double the sleeping time on each subsequent attempt until we hit the limit
                    // Then we just keep on reconnecting forever using the limit value
                    // Cap the delay at the maximum passed to the constructor
                    boolean nextReconnectionDelayWillExceedMaxDelay = doubledSleepingTime >= maxReconnectionDelayMs;
                    if (nextReconnectionDelayWillExceedMaxDelay) {
                        sleepingTimeMs = maxReconnectionDelayMs;
                        LOGGER.error(
                            "Repeatedly failed to create a JMS context for {} ms. I will keep on trying every {} ms but please check the broker availability. Connection details = {}",
                            accumulatedSleepingTimeMs, sleepingTimeMs, connectionDetails
                        );
                    } else {
                        sleepingTimeMs = doubledSleepingTime;
                    }
                } catch (InterruptedException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }
    }

    private String formatConnectionDetails() {
        String connectionDetails = "[]";
        try {
            connectionDetails = String.format(
                "[ host = %s, port = %d, queueManager = %s, channel = %s, user = %s ]",
                connectionFactory.getStringProperty(WMQConstants.WMQ_HOST_NAME),
                connectionFactory.getIntProperty(WMQConstants.WMQ_PORT),
                connectionFactory.getStringProperty(WMQConstants.WMQ_QUEUE_MANAGER),
                connectionFactory.getStringProperty(WMQConstants.WMQ_CHANNEL),
                connectionFactory.getStringProperty(WMQConstants.USERID)
            );
        } catch (Exception e) {
            LOGGER.warn("Failed to get the connection details. This is not critical, but the details will be unavailable");
        }
        return connectionDetails;
    }
}

Here is how you use it:

LOGGER.info("Starting the initial connection thread");
Thread cftInitialConnectionThread = new Thread(cftConnectionManager::connectToBrokerWithRetries);
cftInitialConnectionThread.start();

Check this code; I am using IBM WMQ 9.2.3 in a 3-node IBM WMQ multi-instance setup with Pacemaker on CentOS 8.

package com.ibm.mq.samples.jms;

import javax.jms.Destination;
import javax.jms.JMSConsumer;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.JMSProducer;
import javax.jms.TextMessage;

import com.ibm.msg.client.jms.JmsConnectionFactory;
import com.ibm.msg.client.jms.JmsFactoryFactory;
import com.ibm.msg.client.wmq.WMQConstants;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class JmsPutGet {

    // System exit status value (assume unset value to be 1)
    private static int status = 1;

    // Create variables for the connection to MQ
    private static final String HOST = "192.168.49.140"; // Host name or IP address
    private static final int PORT = 10200; // Listener port for your queue manager
    private static final String CHANNEL = "CHANNEL1"; // Channel name
    private static final String QMGR = "HAQM1"; // Queue manager name
    private static final String APP_USER = ""; // User name that application uses to connect to MQ
    private static final String APP_PASSWORD = ""; // Password that the application uses to connect to MQ
    private static final String QUEUE_NAME = "SOURCE"; // Queue that the application uses to put and get messages to and from
    private static final int RECONNECT_TIMEOUT = 60; // 1 minute
    private static JMSContext context = null;
    private static  Destination destination = null;

    public static void main(String[] args) {

        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("uuuu/MM/dd HH:mm:ss");

        setupResources();

        // Put and get one message per second until the counter overflows
        for (int i = 0; i >= 0; i++) {
            try {
                long uniqueNumber = System.currentTimeMillis() % 1000;
                TextMessage message = context.createTextMessage("Your lucky number today is " + uniqueNumber);

                JMSProducer producer = context.createProducer();
                producer.send(destination, message);
                //System.out.println("Sent message:\n " + i + " " + message);
                System.out.println("\nMessage sent:\n " + i);
                System.out.println(dtf.format(LocalDateTime.now()));

                // JMSConsumer is AutoCloseable; close it every iteration so consumers do not pile up
                try (JMSConsumer consumer = context.createConsumer(destination)) {
                    String receivedMessage = consumer.receiveBody(String.class, 15000); // in ms or 15 seconds
                    //System.out.println("\nReceived message:\n " + i + " " + receivedMessage);
                    System.out.println("\nMessage received:\n " + i);
                    System.out.println(dtf.format(LocalDateTime.now()));
                }
                Thread.sleep(1000);
            } catch (Exception ex) {
                // Automatic reconnect gave up or another error occurred: rebuild the JMS
                // objects and keep looping instead of falling out of main()
                recordFailure(ex);
                System.out.println("DETECTING ERROR... RECONNECTING");
                if (context != null) {
                    try { context.close(); } catch (RuntimeException ignored) { } // release the broken connection
                }
                setupResources();
            }
        }
        context.close();
        recordSuccess();

    } // end main()
        
    private static void setupResources() { 

        boolean connected = false; 
        while (!connected) { 
            try { 
                // Create a connection factory
                JmsFactoryFactory ff = JmsFactoryFactory.getInstance(WMQConstants.WMQ_PROVIDER);
                JmsConnectionFactory cf = ff.createConnectionFactory();             
                
                // Set the properties
                cf.setStringProperty(WMQConstants.WMQ_HOST_NAME, HOST);
                cf.setIntProperty(WMQConstants.WMQ_PORT, PORT);             
                //cf.setStringProperty(WMQConstants.WMQ_CONNECTION_NAME_LIST, "192.168.49.140(10200),192.168.49.131(10200),192.168.49.132(10200)");
                cf.setStringProperty(WMQConstants.WMQ_CHANNEL, CHANNEL);
                cf.setIntProperty(WMQConstants.WMQ_CONNECTION_MODE, WMQConstants.WMQ_CM_CLIENT);
                cf.setStringProperty(WMQConstants.WMQ_QUEUE_MANAGER, QMGR);
                cf.setStringProperty(WMQConstants.WMQ_APPLICATIONNAME, "JmsPutGet (JMS)");
                cf.setBooleanProperty(WMQConstants.USER_AUTHENTICATION_MQCSP, true);
                cf.setStringProperty(WMQConstants.USERID, APP_USER);
                cf.setStringProperty(WMQConstants.PASSWORD, APP_PASSWORD);
                cf.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_TIMEOUT, RECONNECT_TIMEOUT);
                cf.setIntProperty(WMQConstants.WMQ_CLIENT_RECONNECT_OPTIONS, WMQConstants.WMQ_CLIENT_RECONNECT);
                //cf.setStringProperty(WMQConstants.WMQ_SSL_CIPHER_SUITE, "*TLS12");                

                // Create JMS objects
                context = cf.createContext();
                destination = context.createQueue("queue:///" + QUEUE_NAME);
                // no exception? then we connected ok 
                connected = true; 
                System.out.println("CONNECTED");
            } 
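            catch (JMSException | javax.jms.JMSRuntimeException je) { 
                // createContext() reports failures as the unchecked JMSRuntimeException,
                // so catch it too; then sleep and have another attempt
                System.out.println("RECONNECTING");
                try {Thread.sleep(30*1000);} catch (InterruptedException ie) {} 
            } 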
        } 
    }

    private static void recordSuccess() {
        System.out.println("SUCCESS");
        status = 0;
        return;
    }

    private static void recordFailure(Exception ex) {
        if (ex != null) {
            if (ex instanceof JMSException) {
                processJMSException((JMSException) ex);
            } else {
                System.out.println(ex);
            }
        }
        System.out.println("FAILURE");
        status = -1;
        return;
    }

    private static void processJMSException(JMSException jmsex) {
        System.out.println(jmsex);
        Throwable innerException = jmsex.getLinkedException();
        if (innerException != null) {
            System.out.println("Inner exception(s):");
        }
        while (innerException != null) {
            System.out.println(innerException);
            innerException = innerException.getCause();
        }
        return;
    }
}

I have added a link to the code on GitHub for reference: https://github.com/fintecheando/IBMMQSample
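
The retry loop in setupResources() above never gives up and always waits a fixed 30 seconds. As a rough sketch of an alternative, the helper below retries a connect action a bounded number of times with a capped, doubling delay. It deliberately has no IBM MQ dependency: the class name RetryingConnector, the method connectWithRetries, and all of its parameters are hypothetical names made up for this illustration, not part of any IBM MQ or JMS API.

```java
import java.util.concurrent.Callable;

public class RetryingConnector {

    /**
     * Hypothetical helper: calls connect until it succeeds or maxAttempts
     * is reached, doubling the wait between attempts up to maxDelayMillis.
     * Rethrows the last failure if every attempt fails.
     */
    public static <T> T connectWithRetries(Callable<T> connect, int maxAttempts,
                                           long initialDelayMillis, long maxDelayMillis)
            throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        long delay = initialDelayMillis;
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return connect.call();
            } catch (InterruptedException ie) {
                // Do not retry after an interrupt; restore the flag and propagate
                Thread.currentThread().interrupt();
                throw ie;
            } catch (Exception e) {
                last = e;
                System.out.println("Attempt " + attempt + " failed: " + e);
                if (attempt < maxAttempts) {
                    Thread.sleep(delay);
                    delay = Math.min(delay * 2, maxDelayMillis);
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Exception {
        // Simulated connect action that fails twice before succeeding
        int[] calls = {0};
        String result = connectWithRetries(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("broker unavailable");
            }
            return "CONNECTED";
        }, 5, 10, 100);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

In the sample above, the connect action would presumably be something like `() -> cf.createContext()`, with the factory properties set once before the first attempt. Whether a bounded retry is preferable to retrying forever depends on how the application is supervised.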