Azure 服务总线 - 将 OnMessageAsync 中的消息读取到队列末尾
Azure Service Bus - Readd message in OnMessageAsync to the end of the queue
我们正在使用 Microsoft Azure 服务总线发送命令消息,我们正在使用 OnMessage 方法从总线获取它们。在我们 IMessageSessionAsyncHandler.OnMessageAsync 的实现中,可能是我们意识到消息尚未准备好进行处理。所以我想从总线上获取消息并将它们读到队列的末尾。 get/readd 操作必须是原子的。我怎样才能做到这一点?
这是我目前的解决方案(高度抽象),但我担心非原子性。
queueClient.RegisterSessionHandlerFactory(new CommandSessionHandlerFactory(queueClient), ...);
internal class CommandSessionHandlerFactory : IMessageSessionAsyncHandlerFactory
{
private readonly QueueClient _queueClient;
public CommandSessionHandlerFactory(QueueClient queueClient)
{
_queueClient = queueClient;
}
public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message)
{
return new CommandSessionHandlerAsync(_queueClient);
}
}
internal class CommandSessionHandlerAsync : MessageSessionAsyncHandler
{
private readonly QueueClient _queueClient;
public CommandSessionHandlerAsync(QueueClient queueClient)
{
_queueClient = queueClient;
}
protected override async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
{
if (!messageReadyForProcessing)
{
// How to get the following code transactional safe?
var clonedMessage = message.Clone();
await message.CompleteAsync();
await _queueClient.SendAsync(clonedMessage);
}
}
}
那么重复检测呢?我们是否必须更改克隆消息的 MessageId 以确保服务总线重复检测不会丢弃克隆消息?
And what about dublicate detection? Do we have to change the MessageId
of the cloned message to be sure that the service bus duplicate
detection does not drop the cloned message?
如果启用重复检测。您的消息将从队列中删除。因为您使用相同的 MessageId 并且在给定时间段(默认 10 分钟)中重复检测跟踪 messageId。
if (!messageReadyForProcessing)
{
// How to get the following code transactional safe?
var clonedMessage = message.Clone();
await message.CompleteAsync();
await _queueClient.SendAsync(clonedMessage);
}
你可以改成这样:
if (!messageReadyForProcessing)
{
await message.CompleteAsync();
message.MessageId = Guid.NewGuid().ToString();
await _queueClient.SendAsync(message);
}
但是还是有问题。如果消息成功完成,但发送消息失败怎么办?您将丢失消息。重试策略可以解决这个问题。然而它很脏,不能100%保证。
您可以增加 Max DeliveryCount。除了再次发送消息,只需放弃(释放)它再次使用。
if (!messageReadyForProcessing)
{
await message.AbandonAsyncy();
}
这样更好。你会确定,它是交易性的。即使超过max delivery count,也会进入dead queue
但是还是有一个缺点。真实的定位消息会发生什么?因此,如果您有一条永远不会被处理的消息,它就会定位您的消费者。因为我们增加了最大送货数量。
简而言之:OnMessage 不适合您的解决方案。当您准备好处理它时,只需从队列中获取消息。我认为这是最适合您的解决方案。
if (messageReadyForProcessing)
{
var mySession=QueueClient.AcceptMessageSession();
var message = mySession.Receive();
}
编辑:
您可以使用 TransactionScope with service bus。您应该在范围内处理相同的队列。
Ps: 事务范围不支持放弃。
所以你可以应用这个:
if (!messageReadyForProcessing)
{
using (TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await message.CompleteAsync();
message.MessageId = Guid.NewGuid().ToString();
await _queueClient.SendAsync(message);
scope.Complete();
}
}
勾选Brokered Messaging: Transactions
我不确定这是否适用于异步。所以你可以测试和检查 Asynchronous Transactions with Service Bus,如果它不成功。
Ps:他们说 net framework 上有一个关于 transactionscope 和异步的错误。他们建议您使用 4.5.1 或更高版本。检查 here 更多。
我们正在使用 Microsoft Azure 服务总线发送命令消息,我们正在使用 OnMessage 方法从总线获取它们。在我们 IMessageSessionAsyncHandler.OnMessageAsync 的实现中,可能是我们意识到消息尚未准备好进行处理。所以我想从总线上获取消息并将它们读到队列的末尾。 get/readd 操作必须是原子的。我怎样才能做到这一点?
这是我目前的解决方案(高度抽象),但我担心非原子性。
queueClient.RegisterSessionHandlerFactory(new CommandSessionHandlerFactory(queueClient), ...);
internal class CommandSessionHandlerFactory : IMessageSessionAsyncHandlerFactory
{
private readonly QueueClient _queueClient;
public CommandSessionHandlerFactory(QueueClient queueClient)
{
_queueClient = queueClient;
}
public IMessageSessionAsyncHandler CreateInstance(MessageSession session, BrokeredMessage message)
{
return new CommandSessionHandlerAsync(_queueClient);
}
}
internal class CommandSessionHandlerAsync : MessageSessionAsyncHandler
{
private readonly QueueClient _queueClient;
public CommandSessionHandlerAsync(QueueClient queueClient)
{
_queueClient = queueClient;
}
protected override async Task OnMessageAsync(MessageSession session, BrokeredMessage message)
{
if (!messageReadyForProcessing)
{
// How to get the following code transactional safe?
var clonedMessage = message.Clone();
await message.CompleteAsync();
await _queueClient.SendAsync(clonedMessage);
}
}
}
那么重复检测呢?我们是否必须更改克隆消息的 MessageId 以确保服务总线重复检测不会丢弃克隆消息?
And what about dublicate detection? Do we have to change the MessageId of the cloned message to be sure that the service bus duplicate detection does not drop the cloned message?
如果启用重复检测。您的消息将从队列中删除。因为您使用相同的 MessageId 并且在给定时间段(默认 10 分钟)中重复检测跟踪 messageId。
if (!messageReadyForProcessing)
{
// How to get the following code transactional safe?
var clonedMessage = message.Clone();
await message.CompleteAsync();
await _queueClient.SendAsync(clonedMessage);
}
你可以改成这样:
if (!messageReadyForProcessing)
{
await message.CompleteAsync();
message.MessageId = Guid.NewGuid().ToString();
await _queueClient.SendAsync(message);
}
但是还是有问题。如果消息成功完成,但发送消息失败怎么办?您将丢失消息。重试策略可以解决这个问题。然而它很脏,不能100%保证。
您可以增加 Max DeliveryCount。除了再次发送消息,只需放弃(释放)它再次使用。
if (!messageReadyForProcessing)
{
await message.AbandonAsyncy();
}
这样更好。你会确定,它是交易性的。即使超过max delivery count,也会进入dead queue
但是还是有一个缺点。真实的定位消息会发生什么?因此,如果您有一条永远不会被处理的消息,它就会定位您的消费者。因为我们增加了最大送货数量。
简而言之:OnMessage 不适合您的解决方案。当您准备好处理它时,只需从队列中获取消息。我认为这是最适合您的解决方案。
if (messageReadyForProcessing)
{
var mySession=QueueClient.AcceptMessageSession();
var message = mySession.Receive();
}
编辑:
您可以使用 TransactionScope with service bus。您应该在范围内处理相同的队列。
Ps: 事务范围不支持放弃。
所以你可以应用这个:
if (!messageReadyForProcessing)
{
using (TransactionScope scope = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
{
await message.CompleteAsync();
message.MessageId = Guid.NewGuid().ToString();
await _queueClient.SendAsync(message);
scope.Complete();
}
}
勾选Brokered Messaging: Transactions
我不确定这是否适用于异步。所以你可以测试和检查 Asynchronous Transactions with Service Bus,如果它不成功。
Ps:他们说 net framework 上有一个关于 transactionscope 和异步的错误。他们建议您使用 4.5.1 或更高版本。检查 here 更多。