IBM MQQueue 获取所有消息的最安全方式

IBM MQQueue Safest way to get all messages

我有一个简单的 java 程序可以使用 IBM MQ 将消息从队列 A 传输到队列 B。

我的程序运行良好,但我担心的是丢失消息。我知道 .get() 从队列 A 中删除了消息。当然,有一小段时间我有 "got" 来自队列 A 的消息,但我还没有将它放入队列 B。如果我的程序在这段时间崩溃了——消息会丢失。

为了解决这个问题,我将当前消息写入日志。然后,如果程序崩溃,我们可以手动将消息重新输入队列。

但是 - 如果程序因 IOException 崩溃怎么办?现在消息已从队列 A 消失,尚未 .put() 进入队列 B,也未写入日志。

依我看,我有两个选择:

先浏览消息:我知道我可以在 "getting" 之前浏览消息,虽然我有点困惑这会如何影响消息的数量队列中的消息,以及它是否创建重复等

将消息写回队列A:理论上,如果我们"get"队列A的消息,应该没有问题"putting" 它回到队列 A,如果由于某种原因我们无法连接到队列 B。

有人可以首先阐明浏览消息的正确方式 - 或者建议我没有想到的第三种选择吗?

while (true) {

  try {

    // Clear the MQMessage
    theMessage.messageId = MQConstants.MQMI_NONE;
    theMessage.correlationId = MQConstants.MQCI_NONE;

    // Get the message from queue A
    queueA.get(theMessage, gmo);

    // Read the message from queue A
    byte[] messageBytes = new byte[theMessage.getMessageLength()];
    theMessage.readFully(messageBytes);
    String messageText = new String(messageBytes);

    // Store the message to the logs in case of crash

    // Put the message in queue B
    queueB.put(theMessage);

  } catch (MQException e) {

    // Break the loop if we get an MQException
    // Hopefully, it is a reason code 2033 (out of messages)

  } catch (IOException e) {

    // Something went wrong reading the message

  }
}

一般来说,如果您想跟踪已读和已写的消息,您应该使用事务性 reading/writing。

MQGetMessageOptions gmo = new MQGetMessageOptions();   
gmo.waitInterval = 1000;
gmo.options = MQGMO_WAIT;
gmo.options += MQGMO_FAIL_IF_QUIESCING;
gmo.options += MQGMO_SYNCPOINT;

MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options += MQPMO_SYNCPOINT;

// create message instance
MQMessage message = new MQMessage();
message.correlationId = MQCI_NONE;
message.messageId = MQMI_NONE;

// read message
queueA.get(message, gmo);

// write message
queueB.put(message, pmo);

// commit transaction
qmgr.commit();

在这种情况下,如果不提交事务,所有读取的消息将return到源队列,所有写入的消息将从目标队列中消失。最好不要提交每条消息,而是每 10 条或 100 条消息提交一次,具体取决于消息的数量。

如果您不打算使用分布式事务(例如,将一些信息从 MQ 消息保存到数据库),这就足够了。否则我建议切换到 JMS,因为它有更好的事务支持。