从 Websphere MQ 7 读取消息的独占访问权,处理它然后删除它

Exclusive access to read a message from Websphere MQ 7, proccess it then delete it

我有一个队列表单,我希望它能够检索消息。然后我需要以某种方式(不在范围内)处理它们,然后将它们从队列中移除。

我尝试创建 2 个队列,一个用于浏览邮件,一个用于在处理完邮件后将其删除。

    MQQueue browseQueue = qMgr.AccessQueue(QUEUE_NAME, MQC.MQOO_BROWSE);
    MQGetMessageOptions browseOptions = new MQGetMessageOptions()
    {
        Options = MQC.MQGMO_WAIT | MQC.MQPMO_FAIL_IF_QUIESCING | MQC.MQGMO_BROWSE_NEXT,
        WaitInterval = MQC.MQWI_UNLIMITED
    };

    MQQueue acknowledgeQueue = qMgr.AccessQueue(QUEUE_NAME, MQC.MQOO_INPUT_AS_Q_DEF);
    MQGetMessageOptions acknowledgeOptions = new MQGetMessageOptions()
    {
        Options = MQC.MQGMO_WAIT | MQC.MQPMO_FAIL_IF_QUIESCING | MQC.MQMO_MATCH_MSG_ID,
        WaitInterval = MQC.MQWI_UNLIMITED
    };

    while (keepRunning.WaitOne(0))
    {
        MQMessage browseMessage = new MQMessage();

        try
        {
            browseQueue.Get(browseMessage, browseOptions);
        }
        catch (MQException mqexe)
        {
            throw;
        }

        if (browseMessage.MessageType != ShutDown.TYPE)
        {
            object o = browseMessage.ReadObject();
            Console.WriteLine("The message is: {0}", o);
        }

        browseMessage.ClearMessage();

        MQMessage acknowledgeMessage = new MQMessage()
        {
            MessageId = browseMessage.MessageId
        };
        acknowledgeQueue.Get(acknowledgeMessage, acknowledgeOptions);
    }

但我需要确保没有其他进程可以访问同一消息。因为,我依赖于使用 2 个队列,我不知道该怎么做。

您可以在使用 MQ 时以事务方式处理消息。 因此,您的要求应该通过使用 1 个输入队列并在同步点下从该队列读取消息来满足。

所以你的应用程序应该首先获取同步点下的消息(它是一个获取消息的选项),进行处理,如果成功它可以提交MQ事务,否则它可以回滚它,从而将消息放入返回队列不变,或者甚至更好地将消息移动到撤销队列并使用事务性来防止应用程序中出现意外错误。 此方法自动满足您的要求,即 1 条消息不应可用于并行读取。

无论如何,你提出的方法没有任何意义,1 条消息被放入 1 个队列,你如何将它送到你提出的两个输入队列,即使这样做之后,从队列中获取消息也没有'影响不同队列上的消息或浏览游标。

呸。你不懂MQ。正如 Attila 所说,您需要使用 MQ SyncPoint,如果您对消息感到满意,则提交它,否则将其撤消。另外,不要在程序中硬编码队列管理器或队列名称,您应该从 属性 文件中读取它们。

代码应如下所示:

MQQueueManager qMgr = new MQQueueManager(qMgrName);
MQQueue queue = qMgr.AccessQueue(qName, MQC.MQOO_INPUT_AS_Q_DEF + MQC.MQOO_FAIL_IF_QUIESCING);
MQGetMessageOptions gmo = new MQGetMessageOptions();
gmo.Options = MQC.MQGMO_WAIT | MQC.MQGMO_FAIL_IF_QUIESCING | MQC.MQGMO_SYNCPOINT;
gmo.WaitInterval = MQC.MQWI_UNLIMITED;
MQMessage receiveMsg;

while (keepRunning.WaitOne(0))
{
   receiveMsg = new MQMessage();

   try
   {
      queue.Get(receiveMsg, gmo);
      if (receiveMsg.MessageType != ShutDown.TYPE)
      {
          object o = receiveMsg.ReadObject();
          Console.WriteLine("The message is: {0}", o);
      }

      qMgr.Commit();
   }
   catch (MQException mqexe)
   {
      qMgr.Backout();
      throw mqexe;
   }
}