XmlMessageFormatter 正在生成正文中全为零的消息

XmlMessageFormatter is generating messages with all zeros in the body

实现简单;单进程写入(虽然多个任务可能异步写入)单进程读取。

大多数时候它似乎工作正常,但偶尔我们会收到一条消息,指出 Body size 是合理的,但如果你在计算机管理工具中查看它,它只不过是“0” .这会导致 reader 上的 XmlMessageFormatter 失败。

我们添加了可以让我们更好地处理有毒邮件的代码,但我们需要这些邮件,因此仅此一项是不可接受的。

对象:

public class SubscriptionData
{
  public Guid SubscriptionInstanceId { get; set; }
  public SubscriptionEntityTypes SubscriptionEntityType { get; set; }
  public List<int> Positions { get; set; }
  public List<EventInformation> Events { get; set; }
  public int SubscriptionId { get; set; }

  public SubscriptionData() { }

  public SubscriptionData(SubscriptionEntityTypes entityType, List<int> positions, List<EventInformation> events, int subscriptionId)
  {
    SubscriptionEntityType = entityType;
    Positions = positions;
    Events = events;
    SubscriptionId = subscriptionId;
    SubscriptionInstanceId = Guid.NewGuid();
  }

  public override string ToString()
  {
    return $"Entity Type: {SubscriptionEntityType}, Instance Id: {SubscriptionInstanceId}, Events: {string.Join("/", Events)}, SubsId: {SubscriptionId}";
  }
}

作者:

private static void ConstructMessageQueue()
{
  _messageQueue = MessageQueue.Exists(Queue) ?
    new MessageQueue(Queue) : MessageQueue.Create(Queue);
  _messageQueue.Label = QueueName;
}

private static void EnqueueSubscriptionData(SubscriptionEntityTypes entityType, List<int> positions, List<EventInformation> events, int subscriptionId)
{
  Task.Run(() =>
   {
     var subsData = new SubscriptionData(entityType, positions, events, subscriptionId);
     _logger.Info(ErrorLevel.Normal, $"Enqueuing subscription: {subsData}");
     _messageQueue.Send(subsData);
   });
}

Reader:

private void HandleNotifications()
    {
        var mq = new MessageQueue(Queue);
        mq.Formatter = new XmlMessageFormatter(new Type[] { typeof(SubscriptionData) });

        while (!_cancellationToken.IsCancellationRequested)
        {
            Message message = null;
            try
            {
                message = mq.Peek(TimeSpan.FromSeconds(5));

                if (message != null)
                {
                    var subsData = message.Body as SubscriptionData;
                    if (subsData == null)
                        continue;
                    _logger.Info(ErrorLevel.Normal, $"Processing subscription: {subsData}");

                    //  Process the notification here

                    mq.Receive();
                }
            }
            catch (MessageQueueException t) when (t.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
            {
                _logger.Info(ErrorLevel.Normal, $"Message Queue Peek Timeout");
                continue;
            }
            catch (MessageQueueException t)
            {
                _logger.Exception(t, "MessageQueueException while processing message queue for notifications");
                throw;
            }
            catch (Exception t)
            {
                _logger.Exception(t, "Exception while processing message queue for notifications");
            }
        }
    }

如果你很好奇,我听说我们会在成功后查看和接收,这样我们就不会丢失消息,但是在阅读这篇文章以尝试帮助我的同事时,看起来有交易。

The bad messages we get look like this.

Send 方法似乎不是线程安全的,您不应该在多个线程之间共享 MessageQueue 对象(您的 _messageQueue 静态变量)。在这里讨论:

Is MSMQ thread safe?