如果服务关闭一段时间,如何在 C# 中重新连接到 MSMQ
How to reconnect to MSMQ in C# if the service is down for a period
是否可以在服务关闭后重新连接到 MSMQ,而无需重新启动我的应用程序?如果是那又怎样?
我的代码是:
void tmrDelay_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
tmrDelay.Stop();
while (isConnected)
{
try
{
if (ImportFromMSMQ.HasMessages())
{
MSMQ_ServiceLog.WriteEntry(string.Format("Transfering data from MSMQ to ActiveMQ started at {0}", DateTime.Now.ToString()));
while (ImportFromMSMQ.HasMessages())
{
ImportFromMSMQ.Run();
MSMQ_ServiceLog.WriteEntry(string.Format("Transfering data from MSMQ to ActiveMQ completed at {0}", DateTime.Now.ToString()));
}
}
else
{
MSMQ_ServiceLog.WriteEntry(string.Format("MSMQ is empty {0}", DateTime.Now.ToString()));
}
}
catch (Exception ex)
{
logger.Error(string.Format(" Error in data transfer MSMQ to ActiveMQ {0}", ex.Message));
MSMQ_ServiceLog.WriteEntry(string.Format(" Error in data transfer MSMQ to ActiveMQ {0}", ex.Message), EventLogEntryType.Error, -1);
}
}
tmrDelay.Start();
}
while循环是我加的,还没有测试,疑似无限循环。如果关闭 MSMQ 服务,我们将在 catch
子句中结束,并且我们需要在再次启动 MSMQ 时重新启动整个应用程序。但是我怎样才能重试连接到 MSMQ 并在等待它再次启动时继续这样做呢?
public class ImportFromMSMQ
{
private static readonly ILog logger = LogManager.GetLogger(typeof(ImportFromMSMQ));
//Max records per records
private static int _maxMessagesPerTransaction = ConstantHelper.MaxMessagesPerTransaction;
private static bool _isNotCompleted = true;
private static int counter = 0;
private static MessageQueue mq;
private static long _maxMessageBodySizeLimitInBytes = ConstantHelper.MaxMessageBodySizeLimitInBytes;
// Start import
public static void Run()
{
Start();
}
public static bool HasMessages()
{
var _mqName = ConstantHelper.SourceQueue;
mq = new System.Messaging.MessageQueue(_mqName);
long _totalmsg = MessagesCounter.GetMessageCount(mq);
if (_totalmsg > 0)
{
logger.Info(string.Format(" {0} messages found in the {1} Queue ", _totalmsg.ToString(), _mqName));
}
else
{
logger.Info(string.Format(" There are no messages in the {1} Queue ", _totalmsg.ToString(), _mqName));
}
return _totalmsg > 0;
}
private static void Start()
{
logger.Info(string.Format("Data transfer starting at {0}", DateTime.Now.ToString()));
long _currentMessageBodySizeLimitInBytes = 0;
ArrayList messageList = new ArrayList();
System.Messaging.Message mes;
System.IO.Stream bodyStream = null;
// Create a transaction.
MessageQueueTransaction trans = new MessageQueueTransaction();
List<ConventionalData> objectList = new List<ConventionalData>();
IMessageContainer _container = new MessageContainer();
// Begin the transaction.
trans.Begin();
try
{
while (_isNotCompleted && counter < _maxMessagesPerTransaction)
{
try
{
counter++;
mes = mq.Receive(new TimeSpan(0, 0, 3), trans);
if (mes != null)
{
bodyStream = mes.BodyStream;
_currentMessageBodySizeLimitInBytes = _currentMessageBodySizeLimitInBytes + bodyStream.Length;
}
VisionAir.Messaging.ConventionalData data = ProtoBuf.Serializer.Deserialize<ConventionalData>(mes.BodyStream);
objectList.Add(data);
_isNotCompleted = _currentMessageBodySizeLimitInBytes <= _maxMessageBodySizeLimitInBytes;
}
catch
{
_isNotCompleted = false;
}
}
if (objectList.Count != 0)
{
logger.Info(string.Format("Starting transfer of {0} messages", objectList.Count));
_container.MQMessages = objectList;
ExportToActiveMQ _export = new ExportToActiveMQ(_container, (ExportOption) Enum.Parse(typeof(ExportOption),ConstantHelper.ExportFormat));
_export.Export();
}
logger.Info(string.Format("Transfer of {0} messages is completed at {1}",objectList.Count, DateTime.Now.ToString()));
// Commit transaction
trans.Commit();
counter = 0; //ResetF
_isNotCompleted = true; //Reset
}
catch (Exception ex)
{
logger.Error(ex);
// Roll back the transaction.
trans.Abort();
counter = 0;
}
}
}
我看过这个问题Service not receiving messages after Message Queuing service restarted,但似乎没有完全理解它,所以如果有人能详细说明,那就太好了。
我是否必须在 catch
子句或其他内容中递归调用我的 tmrDelay_Elapsed()
方法?
我找到了解决方案,当我第一次意识到我们第一次访问队列是在HasMessages()
方法中时很简单,所以我将其更改为以下内容:
public static bool HasMessages()
{
var _mqName = ConstantHelper.SourceQueue;
long _totalmsg = 0;
int countAttempts = 0;
queueExists = false;
while (!queueExists)
{
if (MessageQueue.Exists(_mqName))
{
queueExists = true;
mq = new System.Messaging.MessageQueue(_mqName);
_totalmsg = MessagesCounter.GetMessageCount(mq);
if (_totalmsg > 0)
{
logger.Info(string.Format(" {0} messages found in the {1} Queue ", _totalmsg.ToString(), _mqName));
}
else
{
logger.Info(string.Format(" There are no messages in the {1} Queue ", _totalmsg.ToString(), _mqName));
}
}
else
{
logger.Info(string.Format("No Queue named {0} found, trying again.", _mqName));
countAttempts++;
if(countAttempts % 10 == 0)
{
logger.Info(string.Format("Still no queue found, have you checked Services that Message Queuing(Micrsoft Message Queue aka. MSMQ) is up and running"));
}
Thread.Sleep(5000);
}
}
return _totalmsg > 0;
}
我检查 MessageQueue.Exists
是否存在,如果不存在则继续尝试直到找到队列。当我找到它时,我将 queueExists 设置为 true,以打破 while 循环以继续处理接收到的数据(如果收到任何数据)。我在一些 post 中读到使用 Thread.Sleep(n)
是不好的,但现在我将使用这个解决方案。
是否可以在服务关闭后重新连接到 MSMQ,而无需重新启动我的应用程序?如果是那又怎样?
我的代码是:
void tmrDelay_Elapsed(object sender, System.Timers.ElapsedEventArgs e)
{
tmrDelay.Stop();
while (isConnected)
{
try
{
if (ImportFromMSMQ.HasMessages())
{
MSMQ_ServiceLog.WriteEntry(string.Format("Transfering data from MSMQ to ActiveMQ started at {0}", DateTime.Now.ToString()));
while (ImportFromMSMQ.HasMessages())
{
ImportFromMSMQ.Run();
MSMQ_ServiceLog.WriteEntry(string.Format("Transfering data from MSMQ to ActiveMQ completed at {0}", DateTime.Now.ToString()));
}
}
else
{
MSMQ_ServiceLog.WriteEntry(string.Format("MSMQ is empty {0}", DateTime.Now.ToString()));
}
}
catch (Exception ex)
{
logger.Error(string.Format(" Error in data transfer MSMQ to ActiveMQ {0}", ex.Message));
MSMQ_ServiceLog.WriteEntry(string.Format(" Error in data transfer MSMQ to ActiveMQ {0}", ex.Message), EventLogEntryType.Error, -1);
}
}
tmrDelay.Start();
}
while循环是我加的,还没有测试,疑似无限循环。如果关闭 MSMQ 服务,我们将在 catch
子句中结束,并且我们需要在再次启动 MSMQ 时重新启动整个应用程序。但是我怎样才能重试连接到 MSMQ 并在等待它再次启动时继续这样做呢?
public class ImportFromMSMQ
{
private static readonly ILog logger = LogManager.GetLogger(typeof(ImportFromMSMQ));
//Max records per records
private static int _maxMessagesPerTransaction = ConstantHelper.MaxMessagesPerTransaction;
private static bool _isNotCompleted = true;
private static int counter = 0;
private static MessageQueue mq;
private static long _maxMessageBodySizeLimitInBytes = ConstantHelper.MaxMessageBodySizeLimitInBytes;
// Start import
public static void Run()
{
Start();
}
public static bool HasMessages()
{
var _mqName = ConstantHelper.SourceQueue;
mq = new System.Messaging.MessageQueue(_mqName);
long _totalmsg = MessagesCounter.GetMessageCount(mq);
if (_totalmsg > 0)
{
logger.Info(string.Format(" {0} messages found in the {1} Queue ", _totalmsg.ToString(), _mqName));
}
else
{
logger.Info(string.Format(" There are no messages in the {1} Queue ", _totalmsg.ToString(), _mqName));
}
return _totalmsg > 0;
}
private static void Start()
{
logger.Info(string.Format("Data transfer starting at {0}", DateTime.Now.ToString()));
long _currentMessageBodySizeLimitInBytes = 0;
ArrayList messageList = new ArrayList();
System.Messaging.Message mes;
System.IO.Stream bodyStream = null;
// Create a transaction.
MessageQueueTransaction trans = new MessageQueueTransaction();
List<ConventionalData> objectList = new List<ConventionalData>();
IMessageContainer _container = new MessageContainer();
// Begin the transaction.
trans.Begin();
try
{
while (_isNotCompleted && counter < _maxMessagesPerTransaction)
{
try
{
counter++;
mes = mq.Receive(new TimeSpan(0, 0, 3), trans);
if (mes != null)
{
bodyStream = mes.BodyStream;
_currentMessageBodySizeLimitInBytes = _currentMessageBodySizeLimitInBytes + bodyStream.Length;
}
VisionAir.Messaging.ConventionalData data = ProtoBuf.Serializer.Deserialize<ConventionalData>(mes.BodyStream);
objectList.Add(data);
_isNotCompleted = _currentMessageBodySizeLimitInBytes <= _maxMessageBodySizeLimitInBytes;
}
catch
{
_isNotCompleted = false;
}
}
if (objectList.Count != 0)
{
logger.Info(string.Format("Starting transfer of {0} messages", objectList.Count));
_container.MQMessages = objectList;
ExportToActiveMQ _export = new ExportToActiveMQ(_container, (ExportOption) Enum.Parse(typeof(ExportOption),ConstantHelper.ExportFormat));
_export.Export();
}
logger.Info(string.Format("Transfer of {0} messages is completed at {1}",objectList.Count, DateTime.Now.ToString()));
// Commit transaction
trans.Commit();
counter = 0; //ResetF
_isNotCompleted = true; //Reset
}
catch (Exception ex)
{
logger.Error(ex);
// Roll back the transaction.
trans.Abort();
counter = 0;
}
}
}
我看过这个问题Service not receiving messages after Message Queuing service restarted,但似乎没有完全理解它,所以如果有人能详细说明,那就太好了。
我是否必须在 catch
子句或其他内容中递归调用我的 tmrDelay_Elapsed()
方法?
我找到了解决方案,当我第一次意识到我们第一次访问队列是在HasMessages()
方法中时很简单,所以我将其更改为以下内容:
public static bool HasMessages()
{
var _mqName = ConstantHelper.SourceQueue;
long _totalmsg = 0;
int countAttempts = 0;
queueExists = false;
while (!queueExists)
{
if (MessageQueue.Exists(_mqName))
{
queueExists = true;
mq = new System.Messaging.MessageQueue(_mqName);
_totalmsg = MessagesCounter.GetMessageCount(mq);
if (_totalmsg > 0)
{
logger.Info(string.Format(" {0} messages found in the {1} Queue ", _totalmsg.ToString(), _mqName));
}
else
{
logger.Info(string.Format(" There are no messages in the {1} Queue ", _totalmsg.ToString(), _mqName));
}
}
else
{
logger.Info(string.Format("No Queue named {0} found, trying again.", _mqName));
countAttempts++;
if(countAttempts % 10 == 0)
{
logger.Info(string.Format("Still no queue found, have you checked Services that Message Queuing(Micrsoft Message Queue aka. MSMQ) is up and running"));
}
Thread.Sleep(5000);
}
}
return _totalmsg > 0;
}
我检查 MessageQueue.Exists
是否存在,如果不存在则继续尝试直到找到队列。当我找到它时,我将 queueExists 设置为 true,以打破 while 循环以继续处理接收到的数据(如果收到任何数据)。我在一些 post 中读到使用 Thread.Sleep(n)
是不好的,但现在我将使用这个解决方案。