消费者回调:哪个队列是空的?

Consumer callback: which queue is empty?

我正在使用 MQCB 函数为我正在读取的队列添加消息使用者回调函数。我正在尝试从同一连接上的两个队列中读取数据,它在接收消息时似乎工作正常:我的回调函数获取从中接收消息的队列的对象句柄。

但是,当我收到一个 MQRC_NO_MSG_AVAILABLE 事件时(因为我在我的消费者上设置了 MQGMO_WAIT),对象句柄是 MQHO_NONE,所以我无法分辨哪个队列事件指的是。我可以通过将对象句柄放在回调上下文中来解决这个问题,但这是应该完成的方式吗?还是我在这里遗漏了一些明显的东西?

我正在 Linux 上连接到队列管理器 运行 8.0.0.2 版,使用 C 客户端库 8.0.0.5 版,同样在 Linux 上。这是我的示例程序的输出,显示对象句柄为 0:

Opened queue 'AMQ.5A55ED982D616602                            ' with handle 101
Opened queue 'AMQ.5A55ED982D616603                            ' with handle 102
Completion code MQCC_FAILED, reason MQRC_NO_MSG_AVAILABLE, object handle 0
Completion code MQCC_FAILED, reason MQRC_NO_MSG_AVAILABLE, object handle 0

以及程序本身:

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <cmqc.h>
#include <cmqxc.h>
#include <cmqstrc.h>

void my_message_consumer(MQHCONN, PMQMD, PMQGMO, PMQVOID, PMQCBC);

volatile unsigned events_received = 0;

void
check_fail(const char *action, MQLONG comp_code, MQLONG reason)
{
  if (comp_code != MQCC_OK) {
    fprintf(stderr, "%s failed with %s %s\n",
            action, MQCC_STR(comp_code), MQRC_STR(reason));
    exit(1);
  }
}

int
main()
{
  MQHCONN hconn;
  MQHOBJ hobj1, hobj2;
  MQOD od = {MQOD_DEFAULT};
  char queue_name[MQ_Q_NAME_LENGTH + 1];
  MQLONG c, r;

  MQCONN("", &hconn, &c, &r);
  check_fail("MQCONN", c, r);

  /* Open two dynamic queues */
  strcpy(od.ObjectName, "SYSTEM.DEFAULT.MODEL.QUEUE");
  MQOPEN(hconn, &od, MQOO_INPUT_EXCLUSIVE, &hobj1, &c, &r);
  check_fail("MQOPEN", c, r);

  strncpy(queue_name, od.ObjectName, MQ_Q_NAME_LENGTH);
  queue_name[MQ_Q_NAME_LENGTH] = '[=11=]';
  printf("Opened queue '%48s' with handle %d\n", queue_name, hobj1);

  strcpy(od.ObjectName, "SYSTEM.DEFAULT.MODEL.QUEUE");
  MQOPEN(hconn, &od, MQOO_INPUT_EXCLUSIVE, &hobj2, &c, &r);
  check_fail("MQOPEN", c, r);

  strncpy(queue_name, od.ObjectName, MQ_Q_NAME_LENGTH);
  queue_name[MQ_Q_NAME_LENGTH] = '[=11=]';
  printf("Opened queue '%48s' with handle %d\n", queue_name, hobj2);

  /* Add a callback with zero WaitInterval for both queues */
  MQMD md = {MQMD_DEFAULT};
  MQGMO gmo = {MQGMO_DEFAULT};
  MQCBD cbd = {MQCBD_DEFAULT};
  gmo.Options = MQGMO_NO_SYNCPOINT | MQGMO_WAIT;
  gmo.WaitInterval = 0;
  cbd.CallbackType = MQCBT_MESSAGE_CONSUMER;
  cbd.CallbackFunction = &my_message_consumer;

  MQCB(hconn, MQOP_REGISTER, &cbd, hobj1, &md, &gmo, &c, &r);
  check_fail("MQCB", c, r);
  MQCB(hconn, MQOP_REGISTER, &cbd, hobj2, &md, &gmo, &c, &r);
  check_fail("MQCB", c, r);

  /* Start consuming */
  MQCTLO ctlo = {MQCTLO_DEFAULT};
  MQCTL(hconn, MQOP_START, &ctlo, &c, &r);
  check_fail("MQCTL start", c, r);

  /* Wait until events received */
  while (events_received < 2)
    sleep(1);

  return 0;
}

void
my_message_consumer(MQHCONN hconn, PMQMD md, PMQGMO gmo,
                    PMQVOID buffer, PMQCBC context)
{
  printf("Completion code %s, reason %s, object handle %d\n",
     MQCC_STR(context->CompCode), MQRC_STR(context->Reason),
     context->Hobj);
  events_received++;
}

编译它:

gcc -o mq-no-msg mq-no-msg.c -g -Wall -I/opt/mqm/inc -L/opt/mqm/lib64 -lmqic_r -Wl,-rpath=/opt/mqm/lib64

并在运行之前设置MQSERVER环境变量。

我从几个来源找到了有关此主题的信息,将这些信息放在一起描绘了整个画面(不幸的是,IBM 的 MQ KC 至少可以说并没有很好地记录这一点)。

  1. 在 Capitalware 的 MQ 技术会议 v2.0.1.3 上,Morag Hughson 做了一个演讲WebSphere MQ V7 Enhanced Application Programming,其中包含一些有用的信息。

    在第六页上指出:

    • Your message consumer can also be called with CallType set to MQCBCT_EVENT_CALL (this is also the only way an Event handler can be called). The message consumer will be given events that are pertinent to the queue it is consuming from, for example, MQRC_GET_INHIBITED whereas the event handler gets connection wide events.
  2. 在 IBM MQ v8 KC 页面 MQCBC - Callback context > Fields for MQCBC > Hobj (MQHOBJ) 中指出:

    For an event handler, this value is MQHO_NONE

  3. IBM 提供的示例 amqscbf0.c 还演示了检查 MessageConsumer 中的 pContext->CallType,如果它是类型 MQCBCT_EVENT_CALL,它会打印 Reason,如果它是 MQCBCT_MSG_REMOVED 类型,它会打印消息。


根据以上信息,您看到的行为似乎是预期的行为。


建议的解决方法是为 CallbackArea field of each queue's MQCBD 设置一个唯一值,您可以使用该值来确定事件所指的队列。


在莫拉格演讲的第四页 "WebSphere MQ V7 Enhanced Application Programming" 中陈述如下:

  • MQGMO_WAIT with MQGMO.WaitInterval = 0 operates just like MQGMO_NO_WAIT when one uses on an MQGET, but in the case of asynchronous consumers we wish to avoid the consumer from polling in a busy loop in this case, so it operates more like a backstop marker to show when the end of a batch of messages has been reached.

在同一页面的 table 中,它在 MQGMO_WAIT with MQGMO.WaitInterval = 0 的异步消耗列下指出:

Only called with MQRC_NO_MSGS_AVAILABLE if just started or has had a message since last 2033

您的消费者不会持续收到通知它队列中没有消息的事件。只有在每次从队列中读取 (GET) ALL 消息后首次启动回调 and/or 时没有消息时才会生成事件。本质上,它让您知道在它至少阅读了一条消息后,当前没有更多消息可用。如果您期望成批消息并希望在从队列中读取一批中的所有消息后执行某些操作,这可能很有用。

  • Note that MQGMO_NO_WAIT, and MQGMO_WAIT with a WaitInterval of MQWI_UNLIMITED are quite different when passed to MQGET but with the MQCB call their behaviour is the same. The consumer will only be passed messages and events, it will never be passed the reason code indicating no messages. Effectively MQGMO_NO_WAIT will be treated as an indefinite wait. This is to prevent the consumer from endlessly being called with the no messages reason code.

如果您真的不需要 MQRC_NO_MSG_AVAILABLE 事件消息,那么 MQGMO_NO_WAIT 可能是正确的选择。

当您收到 MQRC_NO_MSG_AVAILABLE 作为事件时,这意味着 any 队列中没有消息(符合您的条件,如果您指定了任何条件)你注册了。因此在这种情况下无需为回调提供任何特定的 HObj。