每个集群节点上的 IBM MQ 和轮询

IBM MQ and Polling on each cluster node

去年我开发了一个队列监视器,它使用 System.Reactive.Linq 检查 IBM MQ 总线上是否有消息

代码如下

 public class QueueMonitor : IObservable<Message>, IDisposable
{
    private string queueName;
    private readonly MQQueue mqQueue;
    private readonly MQQueueManager queueManager;

    private readonly IDisposable timer;
    private readonly object lockObj = new object();
    private bool isChecking;

    private readonly List<IObserver<Message>> observers;

    public QueueMonitor(MQQueueManager queueManager, string queueName)
    {
        this.queueName = queueName;
        this.queueManager = queueManager;

        observers = new List<IObserver<Message>>();

        mqQueue = queueManager.AccessQueue(queueName,
            MQC.MQOO_INPUT_AS_Q_DEF // open queue for input
            + MQC.MQOO_FAIL_IF_QUIESCING); // but not if MQM stopping

        timer = Observable.Interval(TimeSpan.FromSeconds(5)).Subscribe(_ =>
        {
            lock (lockObj)
            {
                if (!isChecking)
                {
                    isChecking = true;

                    var mqMsg = new MQMessage();
                    var mqGetMsgOpts = new MQGetMessageOptions {WaitInterval = 1};

                    // 15 second limit for waiting
                    mqGetMsgOpts.Options |= MQC.MQGMO_WAIT;
                    try
                    {
                        mqQueue.Get(mqMsg, mqGetMsgOpts);
                        if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
                        {
                            var text = mqMsg.ReadString(mqMsg.MessageLength);
                            System.Console.WriteLine(text);

                            Message message = new Message { Content = text };
                            foreach (var observer in observers)
                                observer.OnNext(message);
                        }
                        else
                        {
                            System.Console.WriteLine("Non-text message");
                        }
                    }
                    catch (MQException ex)
                    {
                        if ((ex.Message == "MQRC_NO_MSG_AVAILABLE"))
                        {
                            //nothing to do, emtpy queue
                        }
                        else
                        {
                            //log
                        }
                    }
                    finally
                    {
                        isChecking = false;
                    }
                }
            }
        });

    }

    public IDisposable Subscribe(IObserver<Message> observer)
    {
        if (!observers.Contains(observer))
            observers.Add(observer);

        return new Unsubscriber(observers, observer);
    }

    public void Dispose()
    {
        ((IDisposable)mqQueue)?.Dispose();
        ((IDisposable)queueManager)?.Dispose();

        timer?.Dispose();
    }
}

public class Unsubscriber : IDisposable
{
    private readonly List<IObserver<Message>> _observers;
    private readonly IObserver<Message> _observer;

    public Unsubscriber(List<IObserver<Message>> observers, IObserver<Message> observer)
    {
        this._observers = observers;
        this._observer = observer;
    }

    public void Dispose()
    {
        if (_observer != null) _observers.Remove(_observer);
    }
}

这工作了将近一年,但现在有两件事需要解决,希望你能帮助我把它做好。

1)如果重启IBMMQ,目前QueueMonitor没有收到新的传入消息,需要重启。

我该如何处理?不知道Monitor端有没有重启IBM MQ

2)更复杂。我们正在迁移到新的平衡 IBMMQ 集群。它有 4 个活动节点配置为活动。它们都在负载平衡器后面,所以当我在总线上放置一条消息时,我将它发送到一个地址。

发送消息,很简单。我遇到的问题是何时需要从队列中读取。因为有 4 个不同的 IBMMQ 节点,有 4 个 IP。我怎么知道总线上已经发送了一条消息?我不能简单地听平衡器,因为它不会有通知。我应该 ping 4 个节点吗?

平衡器是netscaler。

提前致谢

在 IBM MQ 集群中,您可以连接到集群中的任何队列管理器并推送消息,MQ 确保消息到达其目标队列,即使该队列未在您连接的队列管理器上本地定义在执行放置操作时。当应用程序从队列中执行获取消息操作时,您必须连接到队列管理器,该队列管理器在本地定义了目标队列以选取消息。所以,是的,在你的情况下,你必须连接到每个单独的 4 个队列管理器来选择消息。

由于您正在迁移到新的 MQ 架构,我建议您查看具有 RDQM 概念的 IBM MQ v9.1 Advanced。 RDQM(复制数据队列管理器)是一种高可用性解决方案,目前可在 Linux 平台上使用。

在 link 下方找到: https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.1.0/com.ibm.mq.con.doc/q130280_.htm