在 IHostBuilder 托管服务中延迟和重新接收延迟的消息
Deferring and re-receiving a deferred message in an IHostBuilder hosted service
如果 Azure 服务总线消息的处理依赖于另一个资源,例如API 或数据库服务,并且此资源不可用,不调用 CompleteMessageAsync()
不是一个选项,因为消息将立即再次接收,直到达到最大传递计数,然后放入DLQ。如果 API 停机维护,我们想稍等片刻再重试。
的答案之一包含延迟和接收延迟消息的一般步骤。这比 Microsoft 的文档好一点,但不足以让我理解 API 的意图,以及如何在基本上整天坐在 ServiceBusProcessor.StartProcessingAsync 中的托管服务中实现它。
这是我的服务的基本结构:
public class ServiceBusWatcher : IHostedService, IDisposable
{
public Task StartAsync(CancellationToken stoppingToken)
{
ReceiveMessagesAsync();
return Task.CompletedTask;
}
private async void ReceiveMessagesAsync()
{
ServiceBusClient client = new ServiceBusClient(connectionString);
processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions());
processor.ProcessMessageAsync += MessageHandler;
await processor.StartProcessingAsync();
}
async Task MessageHandler(ProcessMessageEventArgs args)
{
// a dependency is not available that allows me to process a message. so:
await args.DeferMessageAsync(args.Message);
一旦消息被延迟,我的理解是处理器将不会再处理它(或者会吗?)。相反,我必须使用 ReceiveDeferredMessageAsync() 来接收它,以及最初收到的消息的序列号。
就我而言,等待几分钟或几小时再重试是有意义的。
这可以通过使用计时器和显式调用 ReceiveDeferredMessageAsync()
的单独服务来完成,而不是使用 ServiceBusProcessor。我还假设延迟的消息序列号必须保存在非易失性存储中,这样它们就不会丢失。
这听起来像是一种可行的方法吗?我不喜欢必须记住它的序列号,以便稍后可以收到消息。它首先违背了使用消息队列给 table 带来的一切。
或者,我可以 post 一条带有序列号的新“内部”消息,然后使用 ScheduledEnqueueTimeUtc
属性 延迟接收它,而不是延迟接收。收到此消息后,我可以使用该序列号调用 ReceiveDeferredMessageAsync()
以获取原始消息。这在表面上看起来很优雅,但如果依赖项的中断时间更长,消息可能会迅速增加。
另一个可以在没有其他服务的情况下工作的想法:我可以完成并重新post消息的有效负载并将ScheduledEnqueueTimeUtc
设置为将来的某个时间,如对[的另一个答案中所述=17=]。假设这行得通(微软的文档没有提到这个 属性 是做什么用的),它看起来简单干净,我喜欢简单。
你是怎么解决这个问题的?有没有一种 better/preferred 方法可以在不需要大量代码的情况下平衡低复杂度和高健壮性?
当您知道稍后要检索什么消息并且您的接收者将保存消息序列号以检索延迟的消息时,延迟消息就会起作用。如果接收方没有能力保存消息序列号,延迟消息是更好的选择。延迟消息意味着将原始消息数据复制到新安排的消息中并完成原始消息。这样,消费者既不必保留消息序列号,也不必启动特定消息的检索。
如果 Azure 服务总线消息的处理依赖于另一个资源,例如API 或数据库服务,并且此资源不可用,不调用 CompleteMessageAsync()
不是一个选项,因为消息将立即再次接收,直到达到最大传递计数,然后放入DLQ。如果 API 停机维护,我们想稍等片刻再重试。
这是我的服务的基本结构:
public class ServiceBusWatcher : IHostedService, IDisposable
{
public Task StartAsync(CancellationToken stoppingToken)
{
ReceiveMessagesAsync();
return Task.CompletedTask;
}
private async void ReceiveMessagesAsync()
{
ServiceBusClient client = new ServiceBusClient(connectionString);
processor = client.CreateProcessor(queueName, new ServiceBusProcessorOptions());
processor.ProcessMessageAsync += MessageHandler;
await processor.StartProcessingAsync();
}
async Task MessageHandler(ProcessMessageEventArgs args)
{
// a dependency is not available that allows me to process a message. so:
await args.DeferMessageAsync(args.Message);
一旦消息被延迟,我的理解是处理器将不会再处理它(或者会吗?)。相反,我必须使用 ReceiveDeferredMessageAsync() 来接收它,以及最初收到的消息的序列号。
就我而言,等待几分钟或几小时再重试是有意义的。
这可以通过使用计时器和显式调用 ReceiveDeferredMessageAsync()
的单独服务来完成,而不是使用 ServiceBusProcessor。我还假设延迟的消息序列号必须保存在非易失性存储中,这样它们就不会丢失。
这听起来像是一种可行的方法吗?我不喜欢必须记住它的序列号,以便稍后可以收到消息。它首先违背了使用消息队列给 table 带来的一切。
或者,我可以 post 一条带有序列号的新“内部”消息,然后使用 ScheduledEnqueueTimeUtc
属性 延迟接收它,而不是延迟接收。收到此消息后,我可以使用该序列号调用 ReceiveDeferredMessageAsync()
以获取原始消息。这在表面上看起来很优雅,但如果依赖项的中断时间更长,消息可能会迅速增加。
另一个可以在没有其他服务的情况下工作的想法:我可以完成并重新post消息的有效负载并将ScheduledEnqueueTimeUtc
设置为将来的某个时间,如对[的另一个答案中所述=17=]。假设这行得通(微软的文档没有提到这个 属性 是做什么用的),它看起来简单干净,我喜欢简单。
你是怎么解决这个问题的?有没有一种 better/preferred 方法可以在不需要大量代码的情况下平衡低复杂度和高健壮性?
当您知道稍后要检索什么消息并且您的接收者将保存消息序列号以检索延迟的消息时,延迟消息就会起作用。如果接收方没有能力保存消息序列号,延迟消息是更好的选择。延迟消息意味着将原始消息数据复制到新安排的消息中并完成原始消息。这样,消费者既不必保留消息序列号,也不必启动特定消息的检索。