如何避免过多的 IBM MQ 通道实例?

How to avoid too many IBM MQ channel instances?

我构建了一个 Java 小程序,它使用 MQ 客户端 类 与 Websphere MQ 服务器和其上的队列通信。我想要的是定期查询某个队列的大小、错误深度,例如每 10 秒一次。

我所做的,示意性的:

这很有魅力!

出于懒惰,我编写的代码每次都要执行上述序列的所有 6 个步骤。所以我最终每 10 秒重复一次这个循环。在大约半小时内,监控服务器的人员告诉我,我在服务器上打开了 153 个实例。显然,仅在队列上执行 close() 和 QM 不足以清理我自己。

我将进行明显的修复并在程序的生命周期内保留 QM 和队列。但是有人能告诉我为什么我以前的方法会泄漏资源吗?如果我选择沿着这条路我能做些什么?


更新 (2017-01-25)

时机接受

Roger 提供了一些漂亮的代码和一些关于性能和未提交消息主题的有用解释。这很酷,但 Eugene 的回答完全(并且最低限度)足以解决我的问题,而且他的回答更快一些。在我根据 Eugene 的建议将 close() 更改为 disconnect() 之后,我的小应用程序现在正在做我想要的事情。现在我有点纠结谁应该得到复选标记。向你们道歉!

我得到了我想要的东西,我想我已经完成了这个问题,但我应该补充一点,我感谢 Roger 深思熟虑的回答,因为它为这个问题的任何可能的未来其他 reader 提供了有用的信息.希望其他人会听他的,而不是跟随我的领导。

性能

老实说,我根本不在乎性能。服务器比它当前正在做的事情需要的要胖得多,而且我的监控客户端(这个问题的主题)在通常是空的开发机器上运行。我希望它能在一两天内发挥作用,然后我会把它扔掉。与此同时,如果每 10 秒连接一次导致性能问题,他们应该将负载平衡到 Raspberry Pi 或其他东西。

未提交的消息

我认为这也不是问题。发送应用程序在几毫秒内以大约 1-5 条消息的小批量排队并立即提交。接收应用程序(我写的一个客户端,它很可能有问题)不断地监听队列,接收并确认(提交)每条消息。所以我有理由相信永远不会超过 1 个,可能是 5 个未确认的消息。与...

的参数相比,这个数字是微不足道的

我的实际问题,

这涉及每天早上大约同一时间处理 5 到 20 分钟之间的少量到数百条消息。我怀疑我的接收客户端被 "hanging" 网络 IO 操作停滞了,除了 MQS。在探索这一点时,我想采取的一个步骤是证明 MQS 上确实有一个等待消息队列,并且没有被拾取。我想看看这个队列何时开始增加,它在那里停留了多长时间,以及一旦我的客户再次醒来它会多快消退。

此连接每天接收 2,000 到 10,000 条消息,峰值对应于下午和晚上的批处理,但延迟仅在早上发生,通常实际通过的数量很少。我想查看队列大小的日志,以便更好地了解正在发生的事情。下一步,我可能需要在我的接收客户端中进行更多检测。

环境

就其价值而言,服务器是 运行 WSMQ 6,所以我的客户端使用阻塞和等待队列获取,而不是我更喜欢使用的漂亮的异步通知过程。我的接收客户端是虚拟机中 RHEL 下的独立 C 应用程序 运行。

我认为你真的应该在 finally 块中使用 disconnect 而不是关闭。

哇!不是很有效的代码。 connect 是 FAR 最昂贵的 MQ API 调用。另外,获取队列深度是没有意义的。它为您提供队列中所有未提交和已提交的消息。即队列深度可能表示队列中有 5 条消息,但您的程序可能只检索 3 条!!您会说 MQ 已损坏,但实际上有 2 条消息未提交,因此对消费者不可用。

如果您真的想每 10 秒获取并记录一次信息,请保持连接。

这里有一个更好的方法:

boolean working = true;
int openOptions  = CMQC.MQOO_INQUIRE + CMQC.MQOO_FAIL_IF_QUIESCING;
int depth = 0;
MQQueueManager qm = null;
MQQueue q = null;

try
{
   qm = new MQQueueManager("MQA1");

   while (working)
   {
      try
      {
         q = qm.accessQueue("TEST.Q1", openOptions);
         depth = q.getCurrentDepth();
//         (do something with depth)
      }
      catch (MQException e)
      {
         System.out.println("CC = " + e.completionCode + " : RC=" + e.reasonCode);
         System.out.println(e.getLocalizedMessage() );
         working = false;
      }
      finally
      {
         if (q != null)
         {
            q.close();
            q = null;
         }
      }

      try
      {
         if (working)
            Thread.sleep(10*1000); // time in milliseconds
      }
      catch (InterruptedException ie) {}
   }
}
catch (MQException e)
{
   System.out.println("CC = " + e.completionCode + " : RC=" + e.reasonCode);
   System.out.println(e.getLocalizedMessage() );
}
finally
{
   try
   {
      if (q != null)
         q.close();
   }
   catch (MQException e)
   {
      System.out.println("CC = " + e.completionCode + " : RC=" + e.reasonCode);
      System.out.println(e.getLocalizedMessage() );
   }

   try
   {
      if (qm != null)
         qm.disconnect();
   }
   catch (MQException e)
   {
      System.out.println("CC = " + e.completionCode + " : RC=" + e.reasonCode);
      System.out.println(e.getLocalizedMessage() );
   }
}