XmlMessageFormatter 正在生成正文中全为零的消息
XmlMessageFormatter is generating messages with all zeros in the body
实现简单;单进程写入(虽然多个任务可能异步写入)单进程读取。
大多数时候它似乎工作正常,但偶尔我们会收到一条消息,指出 Body size 是合理的,但如果你在计算机管理工具中查看它,它只不过是“0” .这会导致 reader 上的 XmlMessageFormatter 失败。
我们添加了可以让我们更好地处理有毒邮件的代码,但我们需要这些邮件,因此仅此一项是不可接受的。
对象:
public class SubscriptionData
{
public Guid SubscriptionInstanceId { get; set; }
public SubscriptionEntityTypes SubscriptionEntityType { get; set; }
public List<int> Positions { get; set; }
public List<EventInformation> Events { get; set; }
public int SubscriptionId { get; set; }
public SubscriptionData() { }
public SubscriptionData(SubscriptionEntityTypes entityType, List<int> positions, List<EventInformation> events, int subscriptionId)
{
SubscriptionEntityType = entityType;
Positions = positions;
Events = events;
SubscriptionId = subscriptionId;
SubscriptionInstanceId = Guid.NewGuid();
}
public override string ToString()
{
return $"Entity Type: {SubscriptionEntityType}, Instance Id: {SubscriptionInstanceId}, Events: {string.Join("/", Events)}, SubsId: {SubscriptionId}";
}
}
作者:
private static void ConstructMessageQueue()
{
_messageQueue = MessageQueue.Exists(Queue) ?
new MessageQueue(Queue) : MessageQueue.Create(Queue);
_messageQueue.Label = QueueName;
}
private static void EnqueueSubscriptionData(SubscriptionEntityTypes entityType, List<int> positions, List<EventInformation> events, int subscriptionId)
{
Task.Run(() =>
{
var subsData = new SubscriptionData(entityType, positions, events, subscriptionId);
_logger.Info(ErrorLevel.Normal, $"Enqueuing subscription: {subsData}");
_messageQueue.Send(subsData);
});
}
Reader:
private void HandleNotifications()
{
var mq = new MessageQueue(Queue);
mq.Formatter = new XmlMessageFormatter(new Type[] { typeof(SubscriptionData) });
while (!_cancellationToken.IsCancellationRequested)
{
Message message = null;
try
{
message = mq.Peek(TimeSpan.FromSeconds(5));
if (message != null)
{
var subsData = message.Body as SubscriptionData;
if (subsData == null)
continue;
_logger.Info(ErrorLevel.Normal, $"Processing subscription: {subsData}");
// Process the notification here
mq.Receive();
}
}
catch (MessageQueueException t) when (t.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
{
_logger.Info(ErrorLevel.Normal, $"Message Queue Peek Timeout");
continue;
}
catch (MessageQueueException t)
{
_logger.Exception(t, "MessageQueueException while processing message queue for notifications");
throw;
}
catch (Exception t)
{
_logger.Exception(t, "Exception while processing message queue for notifications");
}
}
}
如果你很好奇,我听说我们会在成功后查看和接收,这样我们就不会丢失消息,但是在阅读这篇文章以尝试帮助我的同事时,看起来有交易。
The bad messages we get look like this.
Send 方法似乎不是线程安全的,您不应该在多个线程之间共享 MessageQueue 对象(您的 _messageQueue 静态变量)。在这里讨论:
Is MSMQ thread safe?
实现简单;单进程写入(虽然多个任务可能异步写入)单进程读取。
大多数时候它似乎工作正常,但偶尔我们会收到一条消息,指出 Body size 是合理的,但如果你在计算机管理工具中查看它,它只不过是“0” .这会导致 reader 上的 XmlMessageFormatter 失败。
我们添加了可以让我们更好地处理有毒邮件的代码,但我们需要这些邮件,因此仅此一项是不可接受的。
对象:
public class SubscriptionData
{
public Guid SubscriptionInstanceId { get; set; }
public SubscriptionEntityTypes SubscriptionEntityType { get; set; }
public List<int> Positions { get; set; }
public List<EventInformation> Events { get; set; }
public int SubscriptionId { get; set; }
public SubscriptionData() { }
public SubscriptionData(SubscriptionEntityTypes entityType, List<int> positions, List<EventInformation> events, int subscriptionId)
{
SubscriptionEntityType = entityType;
Positions = positions;
Events = events;
SubscriptionId = subscriptionId;
SubscriptionInstanceId = Guid.NewGuid();
}
public override string ToString()
{
return $"Entity Type: {SubscriptionEntityType}, Instance Id: {SubscriptionInstanceId}, Events: {string.Join("/", Events)}, SubsId: {SubscriptionId}";
}
}
作者:
private static void ConstructMessageQueue()
{
_messageQueue = MessageQueue.Exists(Queue) ?
new MessageQueue(Queue) : MessageQueue.Create(Queue);
_messageQueue.Label = QueueName;
}
private static void EnqueueSubscriptionData(SubscriptionEntityTypes entityType, List<int> positions, List<EventInformation> events, int subscriptionId)
{
Task.Run(() =>
{
var subsData = new SubscriptionData(entityType, positions, events, subscriptionId);
_logger.Info(ErrorLevel.Normal, $"Enqueuing subscription: {subsData}");
_messageQueue.Send(subsData);
});
}
Reader:
private void HandleNotifications()
{
var mq = new MessageQueue(Queue);
mq.Formatter = new XmlMessageFormatter(new Type[] { typeof(SubscriptionData) });
while (!_cancellationToken.IsCancellationRequested)
{
Message message = null;
try
{
message = mq.Peek(TimeSpan.FromSeconds(5));
if (message != null)
{
var subsData = message.Body as SubscriptionData;
if (subsData == null)
continue;
_logger.Info(ErrorLevel.Normal, $"Processing subscription: {subsData}");
// Process the notification here
mq.Receive();
}
}
catch (MessageQueueException t) when (t.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
{
_logger.Info(ErrorLevel.Normal, $"Message Queue Peek Timeout");
continue;
}
catch (MessageQueueException t)
{
_logger.Exception(t, "MessageQueueException while processing message queue for notifications");
throw;
}
catch (Exception t)
{
_logger.Exception(t, "Exception while processing message queue for notifications");
}
}
}
如果你很好奇,我听说我们会在成功后查看和接收,这样我们就不会丢失消息,但是在阅读这篇文章以尝试帮助我的同事时,看起来有交易。
The bad messages we get look like this.
Send 方法似乎不是线程安全的,您不应该在多个线程之间共享 MessageQueue 对象(您的 _messageQueue 静态变量)。在这里讨论:
Is MSMQ thread safe?