由于更改消息属性,带有 AzureStorageQueues 的 NServiceBus 未从输入队列中删除有毒消息
NServiceBus with AzureStorageQueues not removing poison messages from input queue due to changing message properties
我正在试验一个新的 NServiceBus 项目,该项目利用 Azure 存储队列进行消息传输和 JSON 使用自定义消息解包逻辑进行序列化,参见此处:
var jsonSerializer = new Newtonsoft.Json.JsonSerializer();
transportExtensions.UnwrapMessagesWith(cloudQueueMessage =>
{
using (var stream = new MemoryStream(cloudQueueMessage.AsBytes))
using (var streamReader = new StreamReader(stream))
using (var textReader = new JsonTextReader(streamReader))
{
try
{
var jObject = JObject.Load(textReader);
using (var jsonReader = jObject.CreateReader())
{
// Try deserialize to a NServiceBus envelope first
var wrapper = jsonSerializer.Deserialize<MessageWrapper>(jsonReader);
if (wrapper.MessageIntent != default)
{
// This was a envelope message
return wrapper;
}
}
// Otherwise this was an EventGrid event
using (var jsonReader = jObject.CreateReader())
{
var @event = jsonSerializer.Deserialize<EventGridEvent>(jsonReader);
var wrapper = new MessageWrapper
{
Id = @event.Id,
Headers = new Dictionary<string, string>
{
{ "NServiceBus.EnclosedMessageTypes", @event.EventType },
{ "NServiceBus.MessageIntent", "Publish" },
{ "EventGrid.topic", @event.Topic },
{ "EventGrid.subject", @event.Subject },
{ "EventGrid.eventTime", @event.EventTime.ToString("u") },
{ "EventGrid.dataVersion", @event.DataVersion },
{ "EventGrid.metadataVersion", @event.MetadataVersion },
},
Body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(@event.Data)),
MessageIntent = MessageIntentEnum.Publish
};
return wrapper;
}
}
catch
{
logger.Error("Message deserialization failed, sending message to error queue");
throw;
}
}
});
自定义消息解包逻辑对于格式正确的 JSON 消息可以正常工作,当格式不正确的 JSON 消息被放入输入队列时,自定义消息解包逻辑将在第一行出错在我创建 jObject 的使用中,这是预期的行为。但是,当自定义消息展开逻辑失败时,错误将被 MessageRetrieved class 中的逻辑捕获,它是 NServiceBus.Azure.Transports.WindowsAzureStorageQueues NuGet 包 (v8.2.0) 的一部分,如下所示:
public async Task<MessageWrapper> Unwrap()
{
try
{
Logger.DebugFormat("Unwrapping message with native ID: '{0}'", rawMessage.Id);
return unwrapper.Unwrap(rawMessage);
}
catch (Exception ex)
{
await errorQueue.AddMessageAsync(rawMessage).ConfigureAwait(false);
await inputQueue.DeleteMessageAsync(rawMessage).ConfigureAwait(false);
throw new SerializationException($"Failed to deserialize message envelope for message with id {rawMessage.Id}. Make sure the configured serializer is used across all endpoints or configure the message wrapper serializer for this endpoint using the `SerializeMessageWrapperWith` extension on the transport configuration. Please refer to the Azure Storage Queue Transport configuration documentation for more details.", ex);
}
}
try catch 的第一行运行正确,将消息添加到配置的错误队列中,但是,当它这样做时,它似乎正在更改原始消息的消息 ID 和 popreceipt,如下所示:
Initial Message Values
Updated Message Values
然后当下一行试图从输入队列中删除原始消息时,它无法找到它,因为根据这篇文章https://docs.microsoft.com/en-us/rest/api/storageservices/delete-message2#remarks它需要原始消息 ID 和 pop 收据,现在已经改变了导致抛出以下错误:
2020-04-20 14:17:58,603 WARN : Azure Storage Queue transport failed pushing a message through pipeline
Type: Microsoft.WindowsAzure.Storage.StorageException
Message: The remote server returned an error: (404) Not Found.
Source: Microsoft.WindowsAzure.Storage
StackTrace:
at Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndExecuteAsync[T](IAsyncResult result) in c:\Program Files (x86)\Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Executor\Executor.cs:line 50
at Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions.<>c__DisplayClass7.<CreateCallbackVoid>b__5(IAsyncResult ar) in c:\Program Files (x86)\Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Util\AsyncExtensions.cs:line 121
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.AzureStorageQueues.MessageRetrieved.<Unwrap>d__3.MoveNext() in C:\BuildAgent\workc19e2a032c05076\src\Transport\MessageRetrieved.cs:line 40
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.AzureStorageQueues.MessagePump.<InnerReceive>d__7.MoveNext() in C:\BuildAgent\workc19e2a032c05076\src\Transport\MessagePump.cs:line 153
TargetSite: T EndExecuteAsync[T](System.IAsyncResult)
这是 NServiceBus 包逻辑的问题,还是我的自定义消息解包逻辑中的某些内容导致这些值发生变化?
这是一个错误。当解包失败时,消息尚未通过处理管道。因此,正常的可恢复性不适用。 CloudQueueMessage
需要是 "cloned" 并且克隆要发送到错误队列,而原始消息用于将其从输入队列中删除。我在 GitHub 中提出了一个 bug issue,你可以在那里跟踪过程。
我正在试验一个新的 NServiceBus 项目,该项目利用 Azure 存储队列进行消息传输和 JSON 使用自定义消息解包逻辑进行序列化,参见此处:
var jsonSerializer = new Newtonsoft.Json.JsonSerializer();
transportExtensions.UnwrapMessagesWith(cloudQueueMessage =>
{
using (var stream = new MemoryStream(cloudQueueMessage.AsBytes))
using (var streamReader = new StreamReader(stream))
using (var textReader = new JsonTextReader(streamReader))
{
try
{
var jObject = JObject.Load(textReader);
using (var jsonReader = jObject.CreateReader())
{
// Try deserialize to a NServiceBus envelope first
var wrapper = jsonSerializer.Deserialize<MessageWrapper>(jsonReader);
if (wrapper.MessageIntent != default)
{
// This was a envelope message
return wrapper;
}
}
// Otherwise this was an EventGrid event
using (var jsonReader = jObject.CreateReader())
{
var @event = jsonSerializer.Deserialize<EventGridEvent>(jsonReader);
var wrapper = new MessageWrapper
{
Id = @event.Id,
Headers = new Dictionary<string, string>
{
{ "NServiceBus.EnclosedMessageTypes", @event.EventType },
{ "NServiceBus.MessageIntent", "Publish" },
{ "EventGrid.topic", @event.Topic },
{ "EventGrid.subject", @event.Subject },
{ "EventGrid.eventTime", @event.EventTime.ToString("u") },
{ "EventGrid.dataVersion", @event.DataVersion },
{ "EventGrid.metadataVersion", @event.MetadataVersion },
},
Body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(@event.Data)),
MessageIntent = MessageIntentEnum.Publish
};
return wrapper;
}
}
catch
{
logger.Error("Message deserialization failed, sending message to error queue");
throw;
}
}
});
自定义消息解包逻辑对于格式正确的 JSON 消息可以正常工作,当格式不正确的 JSON 消息被放入输入队列时,自定义消息解包逻辑将在第一行出错在我创建 jObject 的使用中,这是预期的行为。但是,当自定义消息展开逻辑失败时,错误将被 MessageRetrieved class 中的逻辑捕获,它是 NServiceBus.Azure.Transports.WindowsAzureStorageQueues NuGet 包 (v8.2.0) 的一部分,如下所示:
public async Task<MessageWrapper> Unwrap()
{
try
{
Logger.DebugFormat("Unwrapping message with native ID: '{0}'", rawMessage.Id);
return unwrapper.Unwrap(rawMessage);
}
catch (Exception ex)
{
await errorQueue.AddMessageAsync(rawMessage).ConfigureAwait(false);
await inputQueue.DeleteMessageAsync(rawMessage).ConfigureAwait(false);
throw new SerializationException($"Failed to deserialize message envelope for message with id {rawMessage.Id}. Make sure the configured serializer is used across all endpoints or configure the message wrapper serializer for this endpoint using the `SerializeMessageWrapperWith` extension on the transport configuration. Please refer to the Azure Storage Queue Transport configuration documentation for more details.", ex);
}
}
try catch 的第一行运行正确,将消息添加到配置的错误队列中,但是,当它这样做时,它似乎正在更改原始消息的消息 ID 和 popreceipt,如下所示:
Initial Message Values
Updated Message Values
然后当下一行试图从输入队列中删除原始消息时,它无法找到它,因为根据这篇文章https://docs.microsoft.com/en-us/rest/api/storageservices/delete-message2#remarks它需要原始消息 ID 和 pop 收据,现在已经改变了导致抛出以下错误:
2020-04-20 14:17:58,603 WARN : Azure Storage Queue transport failed pushing a message through pipeline
Type: Microsoft.WindowsAzure.Storage.StorageException
Message: The remote server returned an error: (404) Not Found.
Source: Microsoft.WindowsAzure.Storage
StackTrace:
at Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndExecuteAsync[T](IAsyncResult result) in c:\Program Files (x86)\Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Executor\Executor.cs:line 50
at Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions.<>c__DisplayClass7.<CreateCallbackVoid>b__5(IAsyncResult ar) in c:\Program Files (x86)\Jenkins\workspace\release_dotnet_master\Lib\ClassLibraryCommon\Core\Util\AsyncExtensions.cs:line 121
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.AzureStorageQueues.MessageRetrieved.<Unwrap>d__3.MoveNext() in C:\BuildAgent\workc19e2a032c05076\src\Transport\MessageRetrieved.cs:line 40
--- End of stack trace from previous location where exception was thrown ---
at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
at NServiceBus.Transport.AzureStorageQueues.MessagePump.<InnerReceive>d__7.MoveNext() in C:\BuildAgent\workc19e2a032c05076\src\Transport\MessagePump.cs:line 153
TargetSite: T EndExecuteAsync[T](System.IAsyncResult)
这是 NServiceBus 包逻辑的问题,还是我的自定义消息解包逻辑中的某些内容导致这些值发生变化?
这是一个错误。当解包失败时,消息尚未通过处理管道。因此,正常的可恢复性不适用。 CloudQueueMessage
需要是 "cloned" 并且克隆要发送到错误队列,而原始消息用于将其从输入队列中删除。我在 GitHub 中提出了一个 bug issue,你可以在那里跟踪过程。