具有天蓝色存储 queues 的 NServiceBus 将无限期地处理没有 headers 的消息

NServiceBus with azure storage queues will process messages without headers indefinitely

我正在尝试使用 Azure 存储 Queues 进行消息传输和 JSON 序列化的新 NServiceBus 项目。我注意到当我 运行 通过 queue 缺少 NServiceBus headers 的消息时,例如一个空的 JSON 消息:{ } 它会抛出以下警告留言:

2020-02-06 17:46:35.587 WARN  NServiceBus.Transport.AzureStorageQueues.MessagePump Azure Storage Queue transport failed pushing a message through pipeline
System.ArgumentNullException: Value cannot be null.
Parameter name: nativeMessageId
   at NServiceBus.Transport.IncomingMessage..ctor(String nativeMessageId, Dictionary`2 headers, Byte[] body)
   at NServiceBus.Transport.ErrorContext..ctor(Exception exception, Dictionary`2 headers, String transportMessageId, Byte[] body, TransportTransaction transportTransaction, Int32 immediateProcessingFailures)
   at NServiceBus.Transport.AzureStorageQueues.ReceiveStrategy.CreateErrorContext(MessageRetrieved retrieved, MessageWrapper message, Exception ex, Byte[] body)
   at NServiceBus.Transport.AzureStorageQueues.AtLeastOnceReceiveStrategy.<Receive>d__1.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at NServiceBus.Transport.AzureStorageQueues.MessagePump.<InnerReceive>d__7.MoveNext()

之后它似乎停止处理邮件,但将其留在 queue 中。然后,等待配置的消息不可见时间后,消息再次在queue中可见,NServiceBus将无限期地重复'warning and stop processing'过程。有什么方法可以改变 NServiceBus 处理这种情况的方式,以便它在无法解析 header 信息时将消息抛出到配置的错误 queue 而不是无限期地尝试处理消息?

NServiceBus 存储队列传输假设消息以正确的信封到达。如果找不到该信封,您将得到上面看到的异常。对于不是由 NServiceBus 构造或带有自定义信封的消息,请参阅文档 here。简而言之,您需要一个自定义信封拆包器。

自定义 unrapper(回调)负责的是反序列化消息并构建 NServiceBus 期望使用的MessageWrapper

var transport = endpointConfiguration.UseTransport<AzureStorageQueueTransport>();

transport.UnwrapMessagesWith(cloudQueueMessage =>
{
    using (var stream = new MemoryStream(cloudQueueMessage.AsBytes))
    using (var streamReader = new StreamReader(stream))
    using (var textReader = new JsonTextReader(streamReader))
    {
        //try deserialize to a NServiceBus envelope first
        var wrapper = jsonSerializer.Deserialize<MessageWrapper>(textReader);

        if (wrapper.Id != null)
        {
            //this was a envelope message
            return wrapper;
        }

        //this was a native message just return the body as is with no headers
        return new MessageWrapper
        {
            Id = cloudQueueMessage.Id,
            Headers = new Dictionary<string, string>(),
            Body = cloudQueueMessage.AsBytes
        };
    }
});