连接失败时重新连接到 IBM MQ Queue
Reconnecting to IBM MQ Queue on connection failure
以下代码片段包含我连接并订阅 IBM MQ 队列的逻辑。当连接失败时,我通过 IConnection.ExceptionListener 委托重新建立到队列的连接并重新订阅消息。但问题是,这样做之后我会看到同一队列上存在多个打开的句柄。如果连接因网络问题或 MQ 服务器重启而中断,我该如何确保先关闭之前的连接句柄,再建立新连接?
// Cancellation token field; none of the methods shown in this snippet read it.
private CancellationToken _cancellationToken;
// Current XMS connection; assigned in CreateWebsphereQueueConnection and started in Subscribe.
private IConnection _connection;
// Factory that CreateWebsphereQueueConnection uses to create the connection.
private IConnectionFactory _connectionfactory;
// Consumer created on _destination; Subscribe attaches the message listener to it.
private IMessageConsumer _consumer;
// Queue destination created from the session in CreateWebsphereQueueConnection.
private IDestination _destination;
// Not referenced by the methods shown in this snippet.
private MessageFormat _msgFormat;
// Not referenced by the methods shown in this snippet.
private IMessageProducer _producer;
// Session created from _connection in CreateWebsphereQueueConnection.
private ISession _session;
/// <summary>
/// Builds the connection, session, queue destination and consumer used to receive messages.
/// A connection left over from an earlier call is closed first, so that reconnecting does
/// not leave an additional handle open on the queue.
/// </summary>
private void CreateWebsphereQueueConnection () {
    // Release the previous connection (if any) before the field is overwritten.
    // Closing is best-effort: the old connection may already be broken, so a failure
    // here is only traced and must not prevent the new connection from being created.
    IConnection previousConnection = _connection;
    if (previousConnection != null) {
        try {
            previousConnection.Close ();
        } catch (Exception closeError) {
            System.Diagnostics.Trace.TraceWarning ("Closing the previous MQ connection failed: " + closeError.Message);
        }
    }

    // SetConnectionFactory only returns the factory; store the result, otherwise
    // _connectionfactory stays null and CreateConnection below cannot be called.
    _connectionfactory = SetConnectionFactory ();

    //Connection
    _connection = _connectionfactory.CreateConnection (null, null);
    _connection.ExceptionListener = new ExceptionListener (OnConnectionException);

    //Session
    _session = _connection.CreateSession (false, AcknowledgeMode.AutoAcknowledge);

    //Destination
    _destination = _session.CreateQueue ("queue://My.Queue.Name");
    _destination.SetIntProperty (XMSC.DELIVERY_MODE, 2);
    _destination.SetIntProperty (XMSC.WMQ_TARGET_CLIENT, 0);

    //Consumer
    _consumer = _session.CreateConsumer (_destination);
}
/// <summary>
/// Creates a WebSphere MQ connection factory and applies the values from ConnectionSettings to it.
/// </summary>
/// <returns>The configured connection factory.</returns>
private IConnectionFactory SetConnectionFactory () {
    IConnectionFactory factory = XMSFactoryFactory.GetInstance (XMSC.CT_WMQ).CreateConnectionFactory ();

    // Channel, connection mode and quiesce behaviour.
    factory.SetStringProperty (XMSC.WMQ_CHANNEL, ConnectionSettings.Channel);
    factory.SetIntProperty (XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
    factory.SetIntProperty (XMSC.WMQ_FAIL_IF_QUIESCE, XMSC.WMQ_FIQ_YES);

    // Queue manager and the list of hosts to connect to.
    factory.SetStringProperty (XMSC.WMQ_QUEUE_MANAGER, ConnectionSettings.QueueManager);
    factory.SetStringProperty (XMSC.WMQ_CONNECTION_NAME_LIST, ConnectionSettings.ConnectionList);

    // Client reconnect settings.
    factory.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, ConnectionSettings.ReconnectTimeout);
    factory.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, ConnectionSettings.ReconnectOptions);

    // Provider version and syncpoint behaviour.
    factory.SetStringProperty (XMSC.WMQ_PROVIDER_VERSION, XMSC.WMQ_PROVIDER_VERSION_DEFAULT);
    factory.SetBooleanProperty (XMSC.WMQ_SYNCPOINT_ALL_GETS, true);

    return factory;
}
/// <summary>
/// Attaches <paramref name="onMessageReceived"/> to the consumer and starts the connection.
/// When the connection reports an exception, the failed connection is closed, a new
/// connection is created and the subscription is set up again.
/// </summary>
/// <typeparam name="T">Type of message the handler accepts.</typeparam>
/// <param name="onMessageReceived">Callback invoked for each received message of type T.</param>
public override void Subscribe<T> (Action<T> onMessageReceived) {
    try {
        _connection.ExceptionListener = delegate (Exception connectionException) {
            // Close the failed connection before it is replaced so its queue handle is
            // released. Close can throw on a broken connection, so the call is guarded
            // and a failure is only traced; reconnecting continues either way.
            IConnection failedConnection = _connection;
            if (failedConnection != null) {
                try {
                    failedConnection.Close ();
                } catch (Exception closeError) {
                    System.Diagnostics.Trace.TraceWarning ("Closing the failed MQ connection failed: " + closeError.Message);
                }
            }

            CreateWebsphereQueueConnection ();
            Subscribe (onMessageReceived);
        };

        MessageListener messageListener = new MessageListener ((msg) => {
            // The listener parameter is `msg`; the original referenced an undefined `message`.
            // NOTE(review): assumes T is the XMS message type the handler wants (for example
            // IMessage or ITextMessage). If T is a payload type, convert msg to T here - TODO confirm.
            if (msg is T) {
                onMessageReceived ((T) (object) msg);
            } else {
                System.Diagnostics.Trace.TraceWarning ("Received a message that is not of type " + typeof (T).Name + "; it was not passed to the handler.");
            }
        });
        _consumer.MessageListener = messageListener;

        // Start the connection
        _connection.Start ();
    } catch (Exception ex) {
        // Still best-effort (no rethrow), but the failure is no longer silent.
        System.Diagnostics.Trace.TraceError ("Subscribing to the MQ queue failed: " + ex);
    }
}
您已经在连接工厂上设置了重新连接选项。当与队列管理器的连接中断时,XMS 库将自动重新连接,除非队列管理器在没有 -r 或 -s 选项的情况下关闭。因此您的应用程序不需要显式重新连接。拥有异常侦听器将有助于了解重新连接过程的情况。
在我的服务中,我将 CreateWebsphereQueueConnection() 和 Subscribe<T>() 合并为一个 Connect() 方法,其中包含以下设置:
// Reconnect settings and connection mode applied to the factory before connecting.
const int reconnectTimeout = 3600;
connectionFactory.SetIntProperty(XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, XMSC.WMQ_CLIENT_RECONNECT);
connectionFactory.SetIntProperty(XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, reconnectTimeout);
connectionFactory.SetIntProperty(XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT_UNMANAGED);

// Create the connection and route its exceptions to OnException.
IConnection queueConnection = connectionFactory.CreateConnection();
queueConnection.ExceptionListener = new ExceptionListener(OnException);
然后我这样处理异常:
/// <summary>
/// Exception listener for the connection. Runs CreateWebsphereQueueConnection under a
/// retry-forever policy that logs a warning for each failed attempt and uses a
/// five-second wait between attempts.
/// </summary>
/// <param name="exception">Exception reported by the connection; it is not inspected.</param>
private void OnException(Exception exception)
{
    var retryForever = Policy
        .Handle<Exception>()
        .WaitAndRetryForever(
            attempt => TimeSpan.FromSeconds(5),
            (ex, delay) =>
            {
                _logger.Warning($"Unable to connect: {ex.Message}.");
            });

    retryForever.Execute(CreateWebsphereQueueConnection);
}
这里的重试很重要,因为您无法预知重新连接需要多长时间。
当 MQ 发生故障转移或使用 -r 开关重新启动时,IBM.XMS.dll 会负责自动重新连接。但是,如果队列管理器重新启动时没有指示已连接的客户端重新连接,XMS 库将不会尝试重新连接,客户端应用程序必须自行处理这种情况,正如 @Shashi 和 @JoshMc 所指出的那样。
我就遇到了这种情况,按如下方式修改连接的 ExceptionListener 后问题得以解决:
// Cancellation token field; none of the methods shown in this snippet read it.
private CancellationToken _cancellationToken;
// Current XMS connection; assigned in CreateWebsphereQueueConnection, closed and started in Subscribe.
private IConnection _connection;
// Factory that CreateWebsphereQueueConnection uses to create the connection.
private IConnectionFactory _connectionfactory;
// Consumer created on _destination; Subscribe attaches the message listener to it.
private IMessageConsumer _consumer;
// Queue destination created from the session in CreateWebsphereQueueConnection.
private IDestination _destination;
// Not referenced by the methods shown in this snippet.
private MessageFormat _msgFormat;
// Not referenced by the methods shown in this snippet.
private IMessageProducer _producer;
// Session created from _connection in CreateWebsphereQueueConnection.
private ISession _session;
// Set to true by the exception listener in Subscribe while it rebuilds the connection, then reset to false.
private bool _reConnectOnConnectionBreak = false;
// Set to true once CreateWebsphereQueueConnection has created the consumer; set to false when an attempt throws.
private bool _connected = false;
/// <summary>
/// Creates the connection, session, queue destination and consumer, retrying until one
/// attempt succeeds or cancellation is requested through _cancellationToken.
/// Does nothing when already connected and no reconnect has been requested.
/// </summary>
private void CreateWebsphereQueueConnection () {
    const int retryDelayMilliseconds = 5000;

    // Same entry condition as before: work is only needed when there is no connection
    // yet or when the exception listener asked for a reconnect.
    if (_connected && !_reConnectOnConnectionBreak) {
        return;
    }

    // SetConnectionFactory only returns the factory; store the result, otherwise
    // _connectionfactory stays null and CreateConnection below cannot be called.
    _connectionfactory = SetConnectionFactory ();

    // Loop on the outcome of the attempt itself. The previous condition also tested
    // _reConnectOnConnectionBreak, which stays true for the whole reconnect, so the
    // loop never ended and kept opening new connections.
    bool attemptSucceeded = false;
    while (!attemptSucceeded && !_cancellationToken.IsCancellationRequested) {
        IConnection candidate = null;
        try {
            //Connection
            candidate = _connectionfactory.CreateConnection (null, null);
            candidate.ExceptionListener = new ExceptionListener (OnConnectionException);
            //Session
            ISession session = candidate.CreateSession (false, AcknowledgeMode.AutoAcknowledge);
            //Destination
            IDestination destination = session.CreateQueue ("queue://My.Queue.Name");
            destination.SetIntProperty (XMSC.DELIVERY_MODE, 2);
            destination.SetIntProperty (XMSC.WMQ_TARGET_CLIENT, 0);
            //Consumer
            IMessageConsumer consumer = session.CreateConsumer (destination);

            // Publish to the fields only once every object was created successfully.
            _connection = candidate;
            _session = session;
            _destination = destination;
            _consumer = consumer;
            _connected = true;
            attemptSucceeded = true;
        } catch (Exception ex) {
            _connected = false;
            System.Diagnostics.Trace.TraceWarning ("Creating the MQ connection failed, retrying: " + ex.Message);

            // Do not leave a half-built connection (and its handle) behind.
            if (candidate != null) {
                try {
                    candidate.Close ();
                } catch (Exception closeError) {
                    System.Diagnostics.Trace.TraceWarning ("Closing the failed MQ connection failed: " + closeError.Message);
                }
            }

            // Pause between attempts instead of retrying in a tight loop.
            System.Threading.Thread.Sleep (retryDelayMilliseconds);
        }
    }
}
/// <summary>
/// Creates a WebSphere MQ connection factory and applies the values from ConnectionSettings to it.
/// </summary>
/// <returns>The configured connection factory.</returns>
private IConnectionFactory SetConnectionFactory () {
    IConnectionFactory factory = XMSFactoryFactory.GetInstance (XMSC.CT_WMQ).CreateConnectionFactory ();

    // Channel, connection mode and quiesce behaviour.
    factory.SetStringProperty (XMSC.WMQ_CHANNEL, ConnectionSettings.Channel);
    factory.SetIntProperty (XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
    factory.SetIntProperty (XMSC.WMQ_FAIL_IF_QUIESCE, XMSC.WMQ_FIQ_YES);

    // Queue manager and the list of hosts to connect to.
    factory.SetStringProperty (XMSC.WMQ_QUEUE_MANAGER, ConnectionSettings.QueueManager);
    factory.SetStringProperty (XMSC.WMQ_CONNECTION_NAME_LIST, ConnectionSettings.ConnectionList);

    // Client reconnect settings.
    factory.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, ConnectionSettings.ReconnectTimeout);
    factory.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, ConnectionSettings.ReconnectOptions);

    // Provider version and syncpoint behaviour.
    factory.SetStringProperty (XMSC.WMQ_PROVIDER_VERSION, XMSC.WMQ_PROVIDER_VERSION_DEFAULT);
    factory.SetBooleanProperty (XMSC.WMQ_SYNCPOINT_ALL_GETS, true);

    return factory;
}
/// <summary>
/// Attaches <paramref name="onMessageReceived"/> to the consumer and starts the connection.
/// When the connection reports MQRC_Q_MGR_QUIESCING or MQRC_CONNECTION_BROKEN, the
/// connection is closed, rebuilt and the subscription is set up again.
/// </summary>
/// <typeparam name="T">Type of message the handler accepts.</typeparam>
/// <param name="onMessageReceived">Callback invoked for each received message of type T.</param>
public override void Subscribe<T> (Action<T> onMessageReceived) {
    try {
        _connection.ExceptionListener = delegate (Exception connectionException) {
            // Only an XMSException that links to an MQException carries a reason code.
            // Anything else is traced and ignored instead of failing on a cast.
            XMSException xmsError = connectionException as XMSException;
            IBM.WMQ.MQException mqError = xmsError == null ? null : xmsError.LinkedException as IBM.WMQ.MQException;
            if (mqError == null) {
                System.Diagnostics.Trace.TraceWarning ("MQ connection exception without a reason code: " + connectionException);
                return;
            }

            int reasonCode = mqError.ReasonCode;
            if (reasonCode == MQC.MQRC_Q_MGR_QUIESCING || reasonCode == MQC.MQRC_CONNECTION_BROKEN) {
                _reConnectOnConnectionBreak = true;
                try {
                    // Closing a broken connection can throw; that must not stop the reconnect.
                    try {
                        _connection.Close ();
                    } catch (Exception closeError) {
                        System.Diagnostics.Trace.TraceWarning ("Closing the failed MQ connection failed: " + closeError.Message);
                    }
                    CreateWebsphereQueueConnection ();
                    Subscribe (onMessageReceived);
                } finally {
                    // Reset the flag even if reconnecting throws.
                    _reConnectOnConnectionBreak = false;
                }
            }
        };

        MessageListener messageListener = new MessageListener ((msg) => {
            // The listener parameter is `msg`; the original referenced an undefined `message`.
            // NOTE(review): assumes T is the XMS message type the handler wants (for example
            // IMessage or ITextMessage). If T is a payload type, convert msg to T here - TODO confirm.
            if (msg is T) {
                onMessageReceived ((T) (object) msg);
            } else {
                System.Diagnostics.Trace.TraceWarning ("Received a message that is not of type " + typeof (T).Name + "; it was not passed to the handler.");
            }
        });
        _consumer.MessageListener = messageListener;

        // Start the connection
        _connection.Start ();
    } catch (Exception ex) {
        // Still best-effort (no rethrow), but the failure is no longer silent.
        System.Diagnostics.Trace.TraceError ("Subscribing to the MQ queue failed: " + ex);
    }
}
在 IBM MQ 版本 8 中,没有更好的方法来检查 IConnection 连接的状态,因此我不得不依靠原因代码。在 IBM MQ 版本 9 中,可以使用服务器公开的 REST API 来检查连接状态。
以下代码片段包含我连接并订阅 IBM MQ 队列的逻辑。当连接失败时,我通过 IConnection.ExceptionListener 委托重新建立到队列的连接并重新订阅消息。但问题是,这样做之后我会看到同一队列上存在多个打开的句柄。如果连接因网络问题或 MQ 服务器重启而中断,我该如何确保先关闭之前的连接句柄,再建立新连接?
// Cancellation token field; none of the methods shown in this snippet read it.
private CancellationToken _cancellationToken;
// Current XMS connection; assigned in CreateWebsphereQueueConnection and started in Subscribe.
private IConnection _connection;
// Factory that CreateWebsphereQueueConnection uses to create the connection.
private IConnectionFactory _connectionfactory;
// Consumer created on _destination; Subscribe attaches the message listener to it.
private IMessageConsumer _consumer;
// Queue destination created from the session in CreateWebsphereQueueConnection.
private IDestination _destination;
// Not referenced by the methods shown in this snippet.
private MessageFormat _msgFormat;
// Not referenced by the methods shown in this snippet.
private IMessageProducer _producer;
// Session created from _connection in CreateWebsphereQueueConnection.
private ISession _session;
/// <summary>
/// Builds the connection, session, queue destination and consumer used to receive messages.
/// A connection left over from an earlier call is closed first, so that reconnecting does
/// not leave an additional handle open on the queue.
/// </summary>
private void CreateWebsphereQueueConnection () {
    // Release the previous connection (if any) before the field is overwritten.
    // Closing is best-effort: the old connection may already be broken, so a failure
    // here is only traced and must not prevent the new connection from being created.
    IConnection previousConnection = _connection;
    if (previousConnection != null) {
        try {
            previousConnection.Close ();
        } catch (Exception closeError) {
            System.Diagnostics.Trace.TraceWarning ("Closing the previous MQ connection failed: " + closeError.Message);
        }
    }

    // SetConnectionFactory only returns the factory; store the result, otherwise
    // _connectionfactory stays null and CreateConnection below cannot be called.
    _connectionfactory = SetConnectionFactory ();

    //Connection
    _connection = _connectionfactory.CreateConnection (null, null);
    _connection.ExceptionListener = new ExceptionListener (OnConnectionException);

    //Session
    _session = _connection.CreateSession (false, AcknowledgeMode.AutoAcknowledge);

    //Destination
    _destination = _session.CreateQueue ("queue://My.Queue.Name");
    _destination.SetIntProperty (XMSC.DELIVERY_MODE, 2);
    _destination.SetIntProperty (XMSC.WMQ_TARGET_CLIENT, 0);

    //Consumer
    _consumer = _session.CreateConsumer (_destination);
}
/// <summary>
/// Creates a WebSphere MQ connection factory and applies the values from ConnectionSettings to it.
/// </summary>
/// <returns>The configured connection factory.</returns>
private IConnectionFactory SetConnectionFactory () {
    IConnectionFactory factory = XMSFactoryFactory.GetInstance (XMSC.CT_WMQ).CreateConnectionFactory ();

    // Channel, connection mode and quiesce behaviour.
    factory.SetStringProperty (XMSC.WMQ_CHANNEL, ConnectionSettings.Channel);
    factory.SetIntProperty (XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
    factory.SetIntProperty (XMSC.WMQ_FAIL_IF_QUIESCE, XMSC.WMQ_FIQ_YES);

    // Queue manager and the list of hosts to connect to.
    factory.SetStringProperty (XMSC.WMQ_QUEUE_MANAGER, ConnectionSettings.QueueManager);
    factory.SetStringProperty (XMSC.WMQ_CONNECTION_NAME_LIST, ConnectionSettings.ConnectionList);

    // Client reconnect settings.
    factory.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, ConnectionSettings.ReconnectTimeout);
    factory.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, ConnectionSettings.ReconnectOptions);

    // Provider version and syncpoint behaviour.
    factory.SetStringProperty (XMSC.WMQ_PROVIDER_VERSION, XMSC.WMQ_PROVIDER_VERSION_DEFAULT);
    factory.SetBooleanProperty (XMSC.WMQ_SYNCPOINT_ALL_GETS, true);

    return factory;
}
/// <summary>
/// Attaches <paramref name="onMessageReceived"/> to the consumer and starts the connection.
/// When the connection reports an exception, the failed connection is closed, a new
/// connection is created and the subscription is set up again.
/// </summary>
/// <typeparam name="T">Type of message the handler accepts.</typeparam>
/// <param name="onMessageReceived">Callback invoked for each received message of type T.</param>
public override void Subscribe<T> (Action<T> onMessageReceived) {
    try {
        _connection.ExceptionListener = delegate (Exception connectionException) {
            // Close the failed connection before it is replaced so its queue handle is
            // released. Close can throw on a broken connection, so the call is guarded
            // and a failure is only traced; reconnecting continues either way.
            IConnection failedConnection = _connection;
            if (failedConnection != null) {
                try {
                    failedConnection.Close ();
                } catch (Exception closeError) {
                    System.Diagnostics.Trace.TraceWarning ("Closing the failed MQ connection failed: " + closeError.Message);
                }
            }

            CreateWebsphereQueueConnection ();
            Subscribe (onMessageReceived);
        };

        MessageListener messageListener = new MessageListener ((msg) => {
            // The listener parameter is `msg`; the original referenced an undefined `message`.
            // NOTE(review): assumes T is the XMS message type the handler wants (for example
            // IMessage or ITextMessage). If T is a payload type, convert msg to T here - TODO confirm.
            if (msg is T) {
                onMessageReceived ((T) (object) msg);
            } else {
                System.Diagnostics.Trace.TraceWarning ("Received a message that is not of type " + typeof (T).Name + "; it was not passed to the handler.");
            }
        });
        _consumer.MessageListener = messageListener;

        // Start the connection
        _connection.Start ();
    } catch (Exception ex) {
        // Still best-effort (no rethrow), but the failure is no longer silent.
        System.Diagnostics.Trace.TraceError ("Subscribing to the MQ queue failed: " + ex);
    }
}
您已经在连接工厂上设置了重新连接选项。当与队列管理器的连接中断时,XMS 库将自动重新连接,除非队列管理器在没有 -r 或 -s 选项的情况下关闭。因此您的应用程序不需要显式重新连接。拥有异常侦听器将有助于了解重新连接过程的情况。
在我的服务中,我将 CreateWebsphereQueueConnection() 和 Subscribe<T>() 合并为一个 Connect() 方法,其中包含以下设置:
// Reconnect settings and connection mode applied to the factory before connecting.
const int reconnectTimeout = 3600;
connectionFactory.SetIntProperty(XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, XMSC.WMQ_CLIENT_RECONNECT);
connectionFactory.SetIntProperty(XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, reconnectTimeout);
connectionFactory.SetIntProperty(XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT_UNMANAGED);

// Create the connection and route its exceptions to OnException.
IConnection queueConnection = connectionFactory.CreateConnection();
queueConnection.ExceptionListener = new ExceptionListener(OnException);
然后我这样处理异常:
/// <summary>
/// Exception listener for the connection. Runs CreateWebsphereQueueConnection under a
/// retry-forever policy that logs a warning for each failed attempt and uses a
/// five-second wait between attempts.
/// </summary>
/// <param name="exception">Exception reported by the connection; it is not inspected.</param>
private void OnException(Exception exception)
{
    var retryForever = Policy
        .Handle<Exception>()
        .WaitAndRetryForever(
            attempt => TimeSpan.FromSeconds(5),
            (ex, delay) =>
            {
                _logger.Warning($"Unable to connect: {ex.Message}.");
            });

    retryForever.Execute(CreateWebsphereQueueConnection);
}
这里的重试很重要,因为您无法预知重新连接需要多长时间。
当 MQ 发生故障转移或使用 -r 开关重新启动时,IBM.XMS.dll 会负责自动重新连接。但是,如果队列管理器重新启动时没有指示已连接的客户端重新连接,XMS 库将不会尝试重新连接,客户端应用程序必须自行处理这种情况,正如 @Shashi 和 @JoshMc 所指出的那样。
我就遇到了这种情况,按如下方式修改连接的 ExceptionListener 后问题得以解决:
// Cancellation token field; none of the methods shown in this snippet read it.
private CancellationToken _cancellationToken;
// Current XMS connection; assigned in CreateWebsphereQueueConnection, closed and started in Subscribe.
private IConnection _connection;
// Factory that CreateWebsphereQueueConnection uses to create the connection.
private IConnectionFactory _connectionfactory;
// Consumer created on _destination; Subscribe attaches the message listener to it.
private IMessageConsumer _consumer;
// Queue destination created from the session in CreateWebsphereQueueConnection.
private IDestination _destination;
// Not referenced by the methods shown in this snippet.
private MessageFormat _msgFormat;
// Not referenced by the methods shown in this snippet.
private IMessageProducer _producer;
// Session created from _connection in CreateWebsphereQueueConnection.
private ISession _session;
// Set to true by the exception listener in Subscribe while it rebuilds the connection, then reset to false.
private bool _reConnectOnConnectionBreak = false;
// Set to true once CreateWebsphereQueueConnection has created the consumer; set to false when an attempt throws.
private bool _connected = false;
/// <summary>
/// Creates the connection, session, queue destination and consumer, retrying until one
/// attempt succeeds or cancellation is requested through _cancellationToken.
/// Does nothing when already connected and no reconnect has been requested.
/// </summary>
private void CreateWebsphereQueueConnection () {
    const int retryDelayMilliseconds = 5000;

    // Same entry condition as before: work is only needed when there is no connection
    // yet or when the exception listener asked for a reconnect.
    if (_connected && !_reConnectOnConnectionBreak) {
        return;
    }

    // SetConnectionFactory only returns the factory; store the result, otherwise
    // _connectionfactory stays null and CreateConnection below cannot be called.
    _connectionfactory = SetConnectionFactory ();

    // Loop on the outcome of the attempt itself. The previous condition also tested
    // _reConnectOnConnectionBreak, which stays true for the whole reconnect, so the
    // loop never ended and kept opening new connections.
    bool attemptSucceeded = false;
    while (!attemptSucceeded && !_cancellationToken.IsCancellationRequested) {
        IConnection candidate = null;
        try {
            //Connection
            candidate = _connectionfactory.CreateConnection (null, null);
            candidate.ExceptionListener = new ExceptionListener (OnConnectionException);
            //Session
            ISession session = candidate.CreateSession (false, AcknowledgeMode.AutoAcknowledge);
            //Destination
            IDestination destination = session.CreateQueue ("queue://My.Queue.Name");
            destination.SetIntProperty (XMSC.DELIVERY_MODE, 2);
            destination.SetIntProperty (XMSC.WMQ_TARGET_CLIENT, 0);
            //Consumer
            IMessageConsumer consumer = session.CreateConsumer (destination);

            // Publish to the fields only once every object was created successfully.
            _connection = candidate;
            _session = session;
            _destination = destination;
            _consumer = consumer;
            _connected = true;
            attemptSucceeded = true;
        } catch (Exception ex) {
            _connected = false;
            System.Diagnostics.Trace.TraceWarning ("Creating the MQ connection failed, retrying: " + ex.Message);

            // Do not leave a half-built connection (and its handle) behind.
            if (candidate != null) {
                try {
                    candidate.Close ();
                } catch (Exception closeError) {
                    System.Diagnostics.Trace.TraceWarning ("Closing the failed MQ connection failed: " + closeError.Message);
                }
            }

            // Pause between attempts instead of retrying in a tight loop.
            System.Threading.Thread.Sleep (retryDelayMilliseconds);
        }
    }
}
/// <summary>
/// Creates a WebSphere MQ connection factory and applies the values from ConnectionSettings to it.
/// </summary>
/// <returns>The configured connection factory.</returns>
private IConnectionFactory SetConnectionFactory () {
    IConnectionFactory factory = XMSFactoryFactory.GetInstance (XMSC.CT_WMQ).CreateConnectionFactory ();

    // Channel, connection mode and quiesce behaviour.
    factory.SetStringProperty (XMSC.WMQ_CHANNEL, ConnectionSettings.Channel);
    factory.SetIntProperty (XMSC.WMQ_CONNECTION_MODE, XMSC.WMQ_CM_CLIENT);
    factory.SetIntProperty (XMSC.WMQ_FAIL_IF_QUIESCE, XMSC.WMQ_FIQ_YES);

    // Queue manager and the list of hosts to connect to.
    factory.SetStringProperty (XMSC.WMQ_QUEUE_MANAGER, ConnectionSettings.QueueManager);
    factory.SetStringProperty (XMSC.WMQ_CONNECTION_NAME_LIST, ConnectionSettings.ConnectionList);

    // Client reconnect settings.
    factory.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_TIMEOUT, ConnectionSettings.ReconnectTimeout);
    factory.SetIntProperty (XMSC.WMQ_CLIENT_RECONNECT_OPTIONS, ConnectionSettings.ReconnectOptions);

    // Provider version and syncpoint behaviour.
    factory.SetStringProperty (XMSC.WMQ_PROVIDER_VERSION, XMSC.WMQ_PROVIDER_VERSION_DEFAULT);
    factory.SetBooleanProperty (XMSC.WMQ_SYNCPOINT_ALL_GETS, true);

    return factory;
}
/// <summary>
/// Attaches <paramref name="onMessageReceived"/> to the consumer and starts the connection.
/// When the connection reports MQRC_Q_MGR_QUIESCING or MQRC_CONNECTION_BROKEN, the
/// connection is closed, rebuilt and the subscription is set up again.
/// </summary>
/// <typeparam name="T">Type of message the handler accepts.</typeparam>
/// <param name="onMessageReceived">Callback invoked for each received message of type T.</param>
public override void Subscribe<T> (Action<T> onMessageReceived) {
    try {
        _connection.ExceptionListener = delegate (Exception connectionException) {
            // Only an XMSException that links to an MQException carries a reason code.
            // Anything else is traced and ignored instead of failing on a cast.
            XMSException xmsError = connectionException as XMSException;
            IBM.WMQ.MQException mqError = xmsError == null ? null : xmsError.LinkedException as IBM.WMQ.MQException;
            if (mqError == null) {
                System.Diagnostics.Trace.TraceWarning ("MQ connection exception without a reason code: " + connectionException);
                return;
            }

            int reasonCode = mqError.ReasonCode;
            if (reasonCode == MQC.MQRC_Q_MGR_QUIESCING || reasonCode == MQC.MQRC_CONNECTION_BROKEN) {
                _reConnectOnConnectionBreak = true;
                try {
                    // Closing a broken connection can throw; that must not stop the reconnect.
                    try {
                        _connection.Close ();
                    } catch (Exception closeError) {
                        System.Diagnostics.Trace.TraceWarning ("Closing the failed MQ connection failed: " + closeError.Message);
                    }
                    CreateWebsphereQueueConnection ();
                    Subscribe (onMessageReceived);
                } finally {
                    // Reset the flag even if reconnecting throws.
                    _reConnectOnConnectionBreak = false;
                }
            }
        };

        MessageListener messageListener = new MessageListener ((msg) => {
            // The listener parameter is `msg`; the original referenced an undefined `message`.
            // NOTE(review): assumes T is the XMS message type the handler wants (for example
            // IMessage or ITextMessage). If T is a payload type, convert msg to T here - TODO confirm.
            if (msg is T) {
                onMessageReceived ((T) (object) msg);
            } else {
                System.Diagnostics.Trace.TraceWarning ("Received a message that is not of type " + typeof (T).Name + "; it was not passed to the handler.");
            }
        });
        _consumer.MessageListener = messageListener;

        // Start the connection
        _connection.Start ();
    } catch (Exception ex) {
        // Still best-effort (no rethrow), but the failure is no longer silent.
        System.Diagnostics.Trace.TraceError ("Subscribing to the MQ queue failed: " + ex);
    }
}
在 IBM MQ 版本 8 中,没有更好的方法来检查 IConnection 连接的状态,因此我不得不依靠原因代码。在 IBM MQ 版本 9 中,可以使用服务器公开的 REST API 来检查连接状态。