queueBrowser.GetEnumerator() 没有收到任何消息 | IBM MQ XMS C# 客户端

queueBrowser.GetEnumerator() doesnt get any messages | IBM MQ XMS C# client

我们正在使用 IBM MQ XMS C# 客户端版本 9.0(在 .NET 4.6.2 框架上)与 IBM MQ 一起工作。我只需要知道给定队列中的所有消息,而无需将它们从队列中删除。

我们还为队列设置了消费者。需要消费者和浏览器协同工作。浏览器不应删除消息,但仍需要获取所有消息。


所以我设置了如下所示的 QueueBrowser,但是 queueBrowser.GetEnumerator() 根本收不到消息。

使用相同的代码,如果创建一个 MessageConsumer 并附加一个侦听器,它将获取发布到队列中的消息。所以仅针对 QueueBrowser 问题。

谁能指出为什么会这样。为什么 queueEnumerator.MoveNext() 总是 return false,表示队列中没有消息。

       XMSFactoryFactory  xMSFactoryFactory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);

        // Create WMQ Connection Factory.
        IConnectionFactory  connectionFactory = xMSFactoryFactory.CreateConnectionFactory();

        connectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, "hostname");
        connectionFactory.SetIntProperty(XMSC.WMQ_PORT, portNumber);
        connectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, "channelName");
        connectionFactory.SetIntProperty(XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
        connectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "QueueManagerName");

        // Create connection.
        connectionWMQ = connectionFactory.CreateConnection();
        connectionWMQ.ExceptionListener = new ExceptionListener(OnXMSException);

        // Create session
        ISession sessionWMQ = connectionWMQ.CreateSession(false, AcknowledgeMode.AutoAcknowledge);

        IDestination destination = sessionWMQ.CreateQueue("QueueName"); 


       IQueueBrowser  queueBrowser = sessionWMQ.CreateBrowser(destination);

        connectionWMQ.Start();

         Thread thread = new Thread(KeepBrowsingMessaegs);
            thread.Start();

   --end of the method

   private void KeepBrowsingMessaegs()
    {
      IEnumerator queueEnumerator = queueBrowser.GetEnumerator();
        while (!cancellationTokenSource.IsCancellationRequested)
        {

            if (queueEnumerator.MoveNext())
            {
                ITextMessage textMessage = queueEnumerator.Current as ITextMessage;
                if (textMessage != null)
                {
                    System.Diagnostics.Trace.Write(textMessage);
                }
            }
        }
    }

OP 在评论中提到了以下内容:我们有消费者 运行 将消息作为业务案例的一部分阅读,并希望设置浏览器以便我们只获取消息的副本我们可以在单独的应用程序中跟踪它以记录收到的所有消息。

我在下面提供了一些选项。


选项 1

不要单独的应用程序只是为了记录收到的消息,而是让您的 "business case" 应用程序记录消息作为处理的一部分。


选项 2

设置两个队列,第一个将接收入站消息并由记录消息的应用程序使用,然后将其副本放入第二个队列,该队列将由您的 "business case" 应用程序使用.


选项 3

使用 IBM MQ pub/sub 功能创建消息的两个副本,其中一个将由记录消息的应用程序使用,另一个将由您的 "business case" 应用程序使用.请注意,这只会复制消息正文,不会复制 MQ 消息描述符 (MQMD),它是有关消息的元数据(例如放置时间、放置日期、消息 ID、用户 ID)。许多应用程序不查看 MQMD,因此这对您来说可能不是问题。

设置如下所示:

为入站消息创建一个QALIAS,这个别名将通过指定目标是一个主题对象来指向一个主题字符串:

DEFINE QALIAS(INBOUND.QUEUE) TARGET(INBOUND.TOPIC) TARGTYPE(TOPIC)

定义 TOPIC 对象:

DEFINE TOPIC(INBOUND.TOPIC) TOPICSTR(INBOUND/TOPIC)

创建两个 QLOCAL 对象,一个供记录消息的应用程序使用,另一个供您的 "business case" 应用程序使用:

DEFINE QLOCAL(INBOUND.QUEUE.LOGGER)
DEFINE QLOCAL(INBOUND.QUEUE.PROCESSOR)

定义两个管理 SUBSCRIPTION 对象以将两个队列订阅到主题主题字符串:

DEFINE SUB(INBOUND.QUEUE.LOGGER.SUB) TOPICSTR(INBOUND/TOPIC) DEST(INBOUND.QUEUE.LOGGER)
DEFINE SUB(INBOUND.QUEUE.PROCESSOR.SUB) TOPICSTR(INBOUND/TOPIC) DEST(INBOUND.QUEUE.PROCESSOR)

上述设置的结果是,每条放入名为 INBOUND.QUEUE 的队列的消息都会有一个副本发布到两个队列 INBOUND.QUEUE.LOGGERINBOUND.QUEUE.PROCESSOR


选项 4

您可以设置三个 QLOCAL 对象,一个是入站消息的队列,您可以让一个程序从该队列中读取消息,然后将消息的副本写入其他两个队列。 Capitalware 维护了一个名为 Message Multiplexer (MMX) 的开源工具,它可以从源队列读取并写入一个或多个队列,这确实复制了 MQMD 以及消息正文。

设置如下所示:

创建三个 QLOCAL 对象,第一个将被一个应用程序使用,该应用程序将复制到第二个将被记录消息的应用程序使用,第三个将被您的 "business case" 申请:

DEFINE QLOCAL(INBOUND.QUEUE)
DEFINE QLOCAL(INBOUND.QUEUE.LOGGER)
DEFINE QLOCAL(INBOUND.QUEUE.PROCESSOR)

选项 5

有一种名为 MQ Message Replication (MQMR) 的商业产品可以使用 MQ API 出口将发送到 "business case" 应用程序队列的消息精确复制到一个或多个其他队列.我没有亲自使用过它,但设置将只有两个 QLOCAL 队列,第一个将由您的 "business case" 应用程序使用,第二个将由记录消息的应用程序使用。

DEFINE QLOCAL(INBOUND.QUEUE)
DEFINE QLOCAL(INBOUND.QUEUE.LOGGER)

我只是稍微调整了你的代码,我可以浏览消息了。

namespace XmsBrowser
{
    class BrowseMessages
    {
        IConnection connectionWMQ;
        IQueueBrowser queueBrowser;

    static void Main(string[] args)
    {
        BrowseMessages pgm = new BrowseMessages();
        pgm.browseMessage();
    }

    private void browseMessage()
    {
        try
        {
            XMSFactoryFactory xMSFactoryFactory = XMSFactoryFactory.GetInstance(XMSC.CT_WMQ);

            // Create WMQ Connection Factory.
            IConnectionFactory connectionFactory = xMSFactoryFactory.CreateConnectionFactory();

            connectionFactory.SetStringProperty(XMSC.WMQ_HOST_NAME, "localhost");
            connectionFactory.SetIntProperty(XMSC.WMQ_PORT, 1414);
            connectionFactory.SetStringProperty(XMSC.WMQ_CHANNEL, "QM_SVRCONN_CHANNEL");
            connectionFactory.SetIntProperty(XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
            connectionFactory.SetStringProperty(XMSC.WMQ_QUEUE_MANAGER, "QMDEMO");

            // Create connection.
            connectionWMQ = connectionFactory.CreateConnection();
            //connectionWMQ.ExceptionListener = new ExceptionListener(OnXMSException);

            // Create session
            ISession sessionWMQ = connectionWMQ.CreateSession(false, AcknowledgeMode.AutoAcknowledge);

            IDestination destination = sessionWMQ.CreateQueue("Q1");

            queueBrowser = sessionWMQ.CreateBrowser(destination);

            Thread thread = new Thread(KeepBrowsingMessaegs);
            thread.Start();
            connectionWMQ.Start();
            thread.Join();
        }
        catch (Exception ex)
        {
            Console.WriteLine(ex);
        }
        Console.ReadKey();
    }

    private void KeepBrowsingMessaegs()
    {
        IEnumerator queueEnumerator = queueBrowser.GetEnumerator();
        while (true)
        {
            if (queueEnumerator.MoveNext())
            {
                ITextMessage textMessage = queueEnumerator.Current as ITextMessage;
                if (textMessage != null)
                {
                    System.Diagnostics.Trace.Write(textMessage);
                }
            }else
            {
                break;
            }
        }
       }
    }
}