IBM MQQueue 获取所有消息的最安全方式
IBM MQQueue Safest way to get all messages
我有一个简单的 java 程序可以使用 IBM MQ 将消息从队列 A 传输到队列 B。
我的程序运行良好,但我担心的是丢失消息。我知道 .get()
从队列 A 中删除了消息。当然,有一小段时间我有 "got" 来自队列 A 的消息,但我还没有将它放入队列 B。如果我的程序在这段时间崩溃了——消息会丢失。
为了解决这个问题,我将当前消息写入日志。然后,如果程序崩溃,我们可以手动将消息重新输入队列。
但是 - 如果程序因 IOException
崩溃怎么办?现在消息已从队列 A 消失,尚未 .put()
进入队列 B,也未写入日志。
依我看,我有两个选择:
先浏览消息:我知道我可以在 "getting" 之前浏览消息,虽然我有点困惑这会如何影响消息的数量队列中的消息,以及它是否创建重复等
将消息写回队列A:理论上,如果我们"get"队列A的消息,应该没有问题"putting" 它回到队列 A,如果由于某种原因我们无法连接到队列 B。
有人可以首先阐明浏览消息的正确方式 - 或者建议我没有想到的第三种选择吗?
while (true) {
try {
// Clear the MQMessage
theMessage.messageId = MQConstants.MQMI_NONE;
theMessage.correlationId = MQConstants.MQCI_NONE;
// Get the message from queue A
queueA.get(theMessage, gmo);
// Read the message from queue A
byte[] messageBytes = new byte[theMessage.getMessageLength()];
theMessage.readFully(messageBytes);
String messageText = new String(messageBytes);
// Store the message to the logs in case of crash
// Put the message in queue B
queueB.put(theMessage);
} catch (MQException e) {
// Break the loop if we get an MQException
// Hopefully, it is a reason code 2033 (out of messages)
} catch (IOException e) {
// Something went wrong reading the message
}
}
一般来说,如果您想跟踪已读和已写的消息,您应该使用事务性 reading/writing。
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.waitInterval = 1000;
gmo.options = MQGMO_WAIT;
gmo.options += MQGMO_FAIL_IF_QUIESCING;
gmo.options += MQGMO_SYNCPOINT;
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options += MQPMO_SYNCPOINT;
// create message instance
MQMessage message = new MQMessage();
message.correlationId = MQCI_NONE;
message.messageId = MQMI_NONE;
// read message
queueA.get(message, gmo);
// write message
queueB.put(message, pmo);
// commit transaction
qmgr.commit();
在这种情况下,如果不提交事务,所有读取的消息将return到源队列,所有写入的消息将从目标队列中消失。最好不要提交每条消息,而是每 10 条或 100 条消息提交一次,具体取决于消息的数量。
如果您不打算使用分布式事务(例如,将一些信息从 MQ 消息保存到数据库),这就足够了。否则我建议切换到 JMS,因为它有更好的事务支持。
我有一个简单的 java 程序可以使用 IBM MQ 将消息从队列 A 传输到队列 B。
我的程序运行良好,但我担心的是丢失消息。我知道 .get()
从队列 A 中删除了消息。当然,有一小段时间我有 "got" 来自队列 A 的消息,但我还没有将它放入队列 B。如果我的程序在这段时间崩溃了——消息会丢失。
为了解决这个问题,我将当前消息写入日志。然后,如果程序崩溃,我们可以手动将消息重新输入队列。
但是 - 如果程序因 IOException
崩溃怎么办?现在消息已从队列 A 消失,尚未 .put()
进入队列 B,也未写入日志。
依我看,我有两个选择:
先浏览消息:我知道我可以在 "getting" 之前浏览消息,虽然我有点困惑这会如何影响消息的数量队列中的消息,以及它是否创建重复等
将消息写回队列A:理论上,如果我们"get"队列A的消息,应该没有问题"putting" 它回到队列 A,如果由于某种原因我们无法连接到队列 B。
有人可以首先阐明浏览消息的正确方式 - 或者建议我没有想到的第三种选择吗?
while (true) {
try {
// Clear the MQMessage
theMessage.messageId = MQConstants.MQMI_NONE;
theMessage.correlationId = MQConstants.MQCI_NONE;
// Get the message from queue A
queueA.get(theMessage, gmo);
// Read the message from queue A
byte[] messageBytes = new byte[theMessage.getMessageLength()];
theMessage.readFully(messageBytes);
String messageText = new String(messageBytes);
// Store the message to the logs in case of crash
// Put the message in queue B
queueB.put(theMessage);
} catch (MQException e) {
// Break the loop if we get an MQException
// Hopefully, it is a reason code 2033 (out of messages)
} catch (IOException e) {
// Something went wrong reading the message
}
}
一般来说,如果您想跟踪已读和已写的消息,您应该使用事务性 reading/writing。
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.waitInterval = 1000;
gmo.options = MQGMO_WAIT;
gmo.options += MQGMO_FAIL_IF_QUIESCING;
gmo.options += MQGMO_SYNCPOINT;
MQPutMessageOptions pmo = new MQPutMessageOptions();
pmo.options += MQPMO_SYNCPOINT;
// create message instance
MQMessage message = new MQMessage();
message.correlationId = MQCI_NONE;
message.messageId = MQMI_NONE;
// read message
queueA.get(message, gmo);
// write message
queueB.put(message, pmo);
// commit transaction
qmgr.commit();
在这种情况下,如果不提交事务,所有读取的消息将return到源队列,所有写入的消息将从目标队列中消失。最好不要提交每条消息,而是每 10 条或 100 条消息提交一次,具体取决于消息的数量。
如果您不打算使用分布式事务(例如,将一些信息从 MQ 消息保存到数据库),这就足够了。否则我建议切换到 JMS,因为它有更好的事务支持。