IBM MQ commit and rollback with syncpoint

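Infrastructure overview:

I have a setup where I read a set of messages from IBM MQ, process them in a k8s cluster environment, and send them to a target host.

Problem:

I have observed that sometimes, when the message flow is heavy, our pod fails and restarts before the messages have been sent to the target host. Because we follow a read-and-delete approach, we lose all of the messages that had already been read from IBM MQ.

Expected solution:

I am looking for a solution where we do not lose track of these messages before they have been sent to the target host.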

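What I have tried:

IBM MQ has the concept of a unit of work, but because we cannot tolerate delays in reading and processing, I cannot wait for one message to be processed before reading the next one, as that could be a major performance setback.

Language:

Node.js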

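As mentioned in the comments, there are several ways to skin this cat, but you will need to use transactions.

As soon as you create the connection with the transaction option, the transaction scope begins. When you commit or roll back, that closes the current transaction and begins the next one.

So you should process messages in batches of a size that makes sense for your application, and commit once the batch is complete. If your application gets killed by k8s, all of the uncommitted message reads are rolled back, and a backout queue process stops poison messages.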
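The batch-then-commit idea can be sketched as follows. This is a minimal sketch, not the ibmmq API: `processBatch` and the `getMessage`, `handle`, `commit` and `rollback` callbacks are hypothetical names. In a real app they would wrap a get under syncpoint, your send-to-target logic, `mq.Cmit` and `mq.Back`.

```javascript
// Sketch only: all four callbacks are hypothetical and supplied by the caller.
// getMessage resolves to the next message read under syncpoint, or null when
// the queue is empty; commit and rollback end the current unit of work.
async function processBatch({ getMessage, handle, commit, rollback, batchSize }) {
  let processed = 0;
  try {
    while (processed < batchSize) {
      const msg = await getMessage();
      if (msg === null) break;           // nothing left to read
      await handle(msg);                 // e.g. forward to the target host
      processed += 1;
    }
    if (processed > 0) await commit();   // the batch is now removed from the queue
    return { processed, committed: processed > 0 };
  } catch (err) {
    await rollback();                    // every uncommitted get goes back to the queue
    return { processed: 0, committed: false, error: err };
  }
}
```

A larger `batchSize` means fewer commits, but more messages are redelivered if the pod dies in the middle of a batch.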

Section added to show sample code, and explanation of backout queues.

In your normal processing, if an app gets stopped before it has had time to process the message, you will want that message returned to the queue, so that it is still available to be processed.

To enable this rollback you need to OR MQC.MQGMO_SYNCPOINT into the get message options:

    gmo.Options |= MQC.MQGMO_SYNCPOINT;

Then if all goes well, you can commit.

  mq.Cmit(hConn, function(err) {
    if (err) {
      debug_warn('Error on commit', err);
    } else {
      debug_info('Commit was successful');
    }
  });

or rollback

  mq.Back(hConn, function(err) {
    if (err) {
      debug_warn('Error on rollback', err);
    } else {
      debug_info('rollback was successful');
    }
  });
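If the rest of your app uses promises, the two calls above can be wrapped in one helper. This is a sketch: `endUnitOfWork` is a hypothetical name, and `mq` is passed in as a parameter (in a real app it would be the ibmmq module) so the helper relies only on the `(hConn, callback)` form of `Cmit` and `Back` shown above.

```javascript
// Commit when success is true, otherwise roll back.
// Resolves with a short status string, rejects with the MQ error.
function endUnitOfWork(mq, hConn, success) {
  const verb = success ? mq.Cmit : mq.Back;
  return new Promise((resolve, reject) => {
    verb.call(mq, hConn, (err) => {
      if (err) {
        reject(err);
      } else {
        resolve(success ? 'committed' : 'rolled back');
      }
    });
  });
}
```

A caller could then write `await endUnitOfWork(mq, hConn, allSent)` once per batch.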

If you roll back, the message goes back to the queue, which means it is also the next message that your app will read. This can generate a poison message loop. So you should also set up a backout queue, with pass-all-context permissions for your app user, and a backout threshold.

Say you set the threshold to 5. The message can then be read and rolled back 5 times. Your app needs to check the backout count against the threshold, decide that it is a poison message, and move it off the queue.

To check the backout threshold (and the backout queue name) you can use the following code:


    // Remember to OR the inquire option into the open options
    openOptions |= MQC.MQOO_INQUIRE;

    ...

    // Attributes to inquire: the backout threshold and the backout queue name
    const attrs = [ new mq.MQAttr(MQC.MQIA_BACKOUT_THRESHOLD),
                    new mq.MQAttr(MQC.MQCA_BACKOUT_REQ_Q_NAME) ];

    mq.Inq(hObj, attrs, (err, selectors) => {
      if (err) {
        debug_warn('Error retrieving backout threshold', err);
      } else {
        debug_info('Attributes have been found');

        selectors.forEach((s) => {
          switch (s.selector) {
            case MQC.MQIA_BACKOUT_THRESHOLD:
              debug_info('Threshold is ', s.value);
              break;
            case MQC.MQCA_BACKOUT_REQ_Q_NAME:
              debug_info('Backout queue is ', s.value);
              break;
          }
        });
      }
    });

When getting the message, your app can use mqmd.BackoutCount to check how often the message has been rolled back:

    if (mqmd.BackoutCount >= threshold) {
      ...
    }
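What goes inside that check depends on your app. One possible sketch of the decision is below. `decideAction` and the action names are hypothetical, not part of the ibmmq API. Treating a threshold of 0 as "no threshold configured" and discarding when no backout queue is named are assumptions of this sketch.

```javascript
// Decide what to do with a message that was read under syncpoint.
// threshold and backoutQueue are the values returned by the inquire;
// backoutCount comes from mqmd.BackoutCount.
function decideAction(backoutCount, threshold, backoutQueue) {
  // Assumption: 0 (or less) means no threshold has been set on the queue.
  if (threshold <= 0 || backoutCount < threshold) {
    return 'process';
  }
  // Queue names may come back blank-padded, so trim before testing.
  const name = (backoutQueue || '').trim();
  return name ? 'move-to-backout-queue' : 'discard';
}
```

Whichever branch is taken for a poison message, the put to the backout queue (or the discard) should be in the same unit of work as the get and then committed, so the message leaves the input queue exactly once.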

What I have noticed is that if the same application instance repeatedly calls rollback on the same message, then at the threshold an MQRC_HOBJ_ERROR error is thrown, which your app can check for and then discard the message. If it's a different app instance, it doesn't get the MQRC_HOBJ_ERROR error, so it has to check the backout threshold itself and discard the message, remembering to commit the discard action.

See https://github.com/ibm-messaging/mq-dev-patterns/tree/master/transactions/JMS/SE for more information.

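As an alternative, you can use KEDA - https://keda.sh - which works with k8s to monitor your queue depth and scale based on the number of messages waiting to be processed, rather than on CPU / memory consumption. That way you can scale up when there are lots of messages waiting to be processed, and then slowly scale down as the queue becomes manageable. Here is a link to get started - https://github.com/ibm-messaging/mq-dev-patterns/tree/master/Go-K8s - the sample is for a Go application, but it applies equally to Node.js.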
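To get a feel for how queue-depth scaling behaves, here is a simplified model of the arithmetic. It is a sketch, not KEDA's actual code: `desiredReplicas` and its parameter names are hypothetical, and real scaling also involves polling intervals and cooldown periods.

```javascript
// Roughly one replica per targetPerReplica waiting messages,
// clamped to the configured minimum and maximum replica counts.
function desiredReplicas(queueDepth, targetPerReplica, minReplicas, maxReplicas) {
  const wanted = Math.ceil(queueDepth / targetPerReplica);
  return Math.min(maxReplicas, Math.max(minReplicas, wanted));
}
```

For example, with a target of 5 messages per replica and limits of 1 to 10 replicas, a queue depth of 23 asks for 5 replicas.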