每个集群节点上的 IBM MQ 和轮询
IBM MQ and Polling on each cluster node
去年我开发了一个队列监视器,它使用 System.Reactive.Linq 检查 IBM MQ 总线上是否有消息
代码如下
public class QueueMonitor : IObservable<Message>, IDisposable
{
private string queueName;
private readonly MQQueue mqQueue;
private readonly MQQueueManager queueManager;
private readonly IDisposable timer;
private readonly object lockObj = new object();
private bool isChecking;
private readonly List<IObserver<Message>> observers;
public QueueMonitor(MQQueueManager queueManager, string queueName)
{
this.queueName = queueName;
this.queueManager = queueManager;
observers = new List<IObserver<Message>>();
mqQueue = queueManager.AccessQueue(queueName,
MQC.MQOO_INPUT_AS_Q_DEF // open queue for input
+ MQC.MQOO_FAIL_IF_QUIESCING); // but not if MQM stopping
timer = Observable.Interval(TimeSpan.FromSeconds(5)).Subscribe(_ =>
{
lock (lockObj)
{
if (!isChecking)
{
isChecking = true;
var mqMsg = new MQMessage();
var mqGetMsgOpts = new MQGetMessageOptions {WaitInterval = 1};
// 15 second limit for waiting
mqGetMsgOpts.Options |= MQC.MQGMO_WAIT;
try
{
mqQueue.Get(mqMsg, mqGetMsgOpts);
if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
{
var text = mqMsg.ReadString(mqMsg.MessageLength);
System.Console.WriteLine(text);
Message message = new Message { Content = text };
foreach (var observer in observers)
observer.OnNext(message);
}
else
{
System.Console.WriteLine("Non-text message");
}
}
catch (MQException ex)
{
if ((ex.Message == "MQRC_NO_MSG_AVAILABLE"))
{
//nothing to do, emtpy queue
}
else
{
//log
}
}
finally
{
isChecking = false;
}
}
}
});
}
public IDisposable Subscribe(IObserver<Message> observer)
{
if (!observers.Contains(observer))
observers.Add(observer);
return new Unsubscriber(observers, observer);
}
public void Dispose()
{
((IDisposable)mqQueue)?.Dispose();
((IDisposable)queueManager)?.Dispose();
timer?.Dispose();
}
}
public class Unsubscriber : IDisposable
{
private readonly List<IObserver<Message>> _observers;
private readonly IObserver<Message> _observer;
public Unsubscriber(List<IObserver<Message>> observers, IObserver<Message> observer)
{
this._observers = observers;
this._observer = observer;
}
public void Dispose()
{
if (_observer != null) _observers.Remove(_observer);
}
}
这工作了将近一年,但现在有两件事需要解决,希望你能帮助我把它做好。
1)如果重启IBMMQ,目前QueueMonitor没有收到新的传入消息,需要重启。
我该如何处理?不知道Monitor端有没有重启IBM MQ
2)更复杂。我们正在迁移到新的平衡 IBMMQ 集群。它有 4 个活动节点配置为活动。它们都在负载平衡器后面,所以当我在总线上放置一条消息时,我将它发送到一个地址。
发送消息,很简单。我遇到的问题是何时需要从队列中读取。因为有 4 个不同的 IBMMQ 节点,有 4 个 IP。我怎么知道总线上已经发送了一条消息?我不能简单地听平衡器,因为它不会有通知。我应该 ping 4 个节点吗?
平衡器是netscaler。
提前致谢
在 IBM MQ 集群中,您可以连接到集群中的任何队列管理器并推送消息,MQ 确保消息到达其目标队列,即使该队列未在您连接的队列管理器上本地定义在执行放置操作时。当应用程序从队列中执行获取消息操作时,您必须连接到队列管理器,该队列管理器在本地定义了目标队列以选取消息。所以,是的,在你的情况下,你必须连接到每个单独的 4 个队列管理器来选择消息。
由于您正在迁移到新的 MQ 架构,我建议您查看具有 RDQM 概念的 IBM MQ v9.1 Advanced。 RDQM(复制数据队列管理器)是一种高可用性解决方案,目前可在 Linux 平台上使用。
在 link 下方找到:
https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.1.0/com.ibm.mq.con.doc/q130280_.htm
去年我开发了一个队列监视器,它使用 System.Reactive.Linq 检查 IBM MQ 总线上是否有消息
代码如下
public class QueueMonitor : IObservable<Message>, IDisposable
{
private string queueName;
private readonly MQQueue mqQueue;
private readonly MQQueueManager queueManager;
private readonly IDisposable timer;
private readonly object lockObj = new object();
private bool isChecking;
private readonly List<IObserver<Message>> observers;
public QueueMonitor(MQQueueManager queueManager, string queueName)
{
this.queueName = queueName;
this.queueManager = queueManager;
observers = new List<IObserver<Message>>();
mqQueue = queueManager.AccessQueue(queueName,
MQC.MQOO_INPUT_AS_Q_DEF // open queue for input
+ MQC.MQOO_FAIL_IF_QUIESCING); // but not if MQM stopping
timer = Observable.Interval(TimeSpan.FromSeconds(5)).Subscribe(_ =>
{
lock (lockObj)
{
if (!isChecking)
{
isChecking = true;
var mqMsg = new MQMessage();
var mqGetMsgOpts = new MQGetMessageOptions {WaitInterval = 1};
// 15 second limit for waiting
mqGetMsgOpts.Options |= MQC.MQGMO_WAIT;
try
{
mqQueue.Get(mqMsg, mqGetMsgOpts);
if (mqMsg.Format.CompareTo(MQC.MQFMT_STRING) == 0)
{
var text = mqMsg.ReadString(mqMsg.MessageLength);
System.Console.WriteLine(text);
Message message = new Message { Content = text };
foreach (var observer in observers)
observer.OnNext(message);
}
else
{
System.Console.WriteLine("Non-text message");
}
}
catch (MQException ex)
{
if ((ex.Message == "MQRC_NO_MSG_AVAILABLE"))
{
//nothing to do, emtpy queue
}
else
{
//log
}
}
finally
{
isChecking = false;
}
}
}
});
}
public IDisposable Subscribe(IObserver<Message> observer)
{
if (!observers.Contains(observer))
observers.Add(observer);
return new Unsubscriber(observers, observer);
}
public void Dispose()
{
((IDisposable)mqQueue)?.Dispose();
((IDisposable)queueManager)?.Dispose();
timer?.Dispose();
}
}
public class Unsubscriber : IDisposable
{
private readonly List<IObserver<Message>> _observers;
private readonly IObserver<Message> _observer;
public Unsubscriber(List<IObserver<Message>> observers, IObserver<Message> observer)
{
this._observers = observers;
this._observer = observer;
}
public void Dispose()
{
if (_observer != null) _observers.Remove(_observer);
}
}
这工作了将近一年,但现在有两件事需要解决,希望你能帮助我把它做好。
1)如果重启IBMMQ,目前QueueMonitor没有收到新的传入消息,需要重启。
我该如何处理?不知道Monitor端有没有重启IBM MQ
2)更复杂。我们正在迁移到新的平衡 IBMMQ 集群。它有 4 个活动节点配置为活动。它们都在负载平衡器后面,所以当我在总线上放置一条消息时,我将它发送到一个地址。
发送消息,很简单。我遇到的问题是何时需要从队列中读取。因为有 4 个不同的 IBMMQ 节点,有 4 个 IP。我怎么知道总线上已经发送了一条消息?我不能简单地听平衡器,因为它不会有通知。我应该 ping 4 个节点吗?
平衡器是netscaler。
提前致谢
在 IBM MQ 集群中,您可以连接到集群中的任何队列管理器并推送消息,MQ 确保消息到达其目标队列,即使该队列未在您连接的队列管理器上本地定义在执行放置操作时。当应用程序从队列中执行获取消息操作时,您必须连接到队列管理器,该队列管理器在本地定义了目标队列以选取消息。所以,是的,在你的情况下,你必须连接到每个单独的 4 个队列管理器来选择消息。
由于您正在迁移到新的 MQ 架构,我建议您查看具有 RDQM 概念的 IBM MQ v9.1 Advanced。 RDQM(复制数据队列管理器)是一种高可用性解决方案,目前可在 Linux 平台上使用。
在 link 下方找到: https://www.ibm.com/support/knowledgecenter/SSFKSJ_9.1.0/com.ibm.mq.con.doc/q130280_.htm