在处理消息的过程中正确处理 Azure 服务总线客户端

Properly disposing of Azure Service Bus Client while a message is in the middle of being handled

使用 async/await 时,我关心的是在处理消息的过程中处理客户端。考虑以下因素:

  1. 初始化队列客户端 queueClient 并将对它的引用存储在 class

  2. 的全局范围内
  3. 队列客户端处理一条消息并调用一些应用程序代码来处理它,这可能最终会执行一些异步数据库工作或调用远程 api.

  4. 考虑该应用程序是一个 windows 服务,带有一个 CloseAsync 方法,该方法在服务应该关闭时发出信号。在此方法中,我调用 queueClient.CloseAsync()

  5. 第 2 步中完成的工作完成并调用 message.Complete()。此时我假设 queueClient 已关闭并且消息将被视为失败。

确保队列客户端不再处理消息并等待关闭直到所有当前处理的消息完成的最佳做法是什么?

您可以使用 CancellationToken 取消步骤 2 的工作 and/or 在等待调用 queueClient.CloseAsync() 之前等待步骤 4 中的异步消息处理代码。我想你很熟悉 Tasks and Cancellation.

等待消息处理任务

  1. 初始化队列客户端 queueClient 并将对它的引用存储在 class

    的全局范围内
  2. 队列客户端处理一条消息并调用一些应用程序代码来处理它,这可能最终会执行一些异步数据库工作或调用远程 api,例如 public Task HandleMessageAsync() {..} .在 class 的全局范围内存储对此任务的引用。例如private Task messageHandleTask;

  3. 考虑该应用程序是一个 windows 服务,带有一个 CloseAsync 方法,该方法在服务应该关闭时发出信号。在此方法中,我先调用 await messageHandleTask 然后调用 await queueClient.CloseAsync()

  4. 我们都长寿又幸福。

在这种情况下,在消息处理完成之前,服务不会完全停止。

取消消息处理任务

  1. 初始化队列客户端 queueClient 并将对它的引用存储在 class

    的全局范围内
  2. 队列客户端处理消息并调用一些应用程序代码来处理它,传递 CancellationToken,这可能最终会执行一些异步数据库工作或调用远程 api,例如 public Task HandleMessageAsync(CancellationToken token) {..}。在 class.

    的全局范围内存储对此任务的引用
  3. 考虑该应用程序是一个 windows 服务,带有一个 CloseAsync 方法,该方法在服务应该关闭时发出信号。在此方法中,我先调用 cancellationTokenSource.Cancel(),然后调用 await messageHandleTask,最后调用 await queueClient.CloseAsync()

  4. 我们都长寿又幸福。

在这种情况下,在消息处理代码中,就在调用 message.Complete(). 之前检查是否有任何取消:token.ThrowIfCancellationRequested。在这种情况下,当服务关闭时,消息永远不会达到完成状态,并将在稍后处理。 (请注意,我不知道所涉及的代码,因此如果在取消发生之前工作已经部分完成,这种情况可能会很复杂)请务必处理任何 OperationCanceledException.

并发消息处理

在并发处理多条消息的情况下,涉及更多的逻辑。流程将是这样的:

  1. 当 windows 服务即将关闭时,我们不得不停止处理更多消息
  2. 该进程应等待当时正在处理的消息完成
  3. 现在我们可以调用 queueClient.CloseAsync().

不幸的是,没有停止接受更多消息的标准机制,所以我们必须自己构建它。有一个 Azure Feedback item 请求这个,但它仍然是开放的。

基于此 documentation example

,我提出了以下实现上述流程的解决方案
QueueClient queueClient;
CancellationTokenSource cts = new CancellationTokenSource();
ActionBlock<Message> actionBlock;

async Task Main()
{
    // Define message processing pipeline
    actionBlock = new ActionBlock<Message>(ProcessMessagesAsync, new ExecutionDataflowBlockOptions
    {
        BoundedCapacity = 10,
        MaxDegreeOfParallelism = 10
    });
    
    queueClient = new QueueClient("Endpoint=sb:xxx", "test");

    RegisterOnMessageHandlerAndReceiveMessages(cts.Token);

    Console.WriteLine("Press [Enter] to stop processing messages");
    Console.ReadLine();
    
    // Signal the message handler to stop processing messages, step 1 of the flow
    cts.Cancel();
    
    // Signal the processing pipeline that no more message will come in,  step 1 of the flow
    actionBlock.Complete();
    
    // Wait for all messages to be done before closing the client, step 2 of the flow
    await actionBlock.Completion;
        
    await queueClient.CloseAsync(); // step 3 of the flow
    Console.ReadLine();
}

void RegisterOnMessageHandlerAndReceiveMessages(CancellationToken stoppingToken)
{
    // Configure the message handler options in terms of exception handling, number of concurrent messages to deliver, etc.
    var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler)
    {
        // Maximum number of concurrent calls to the callback ProcessMessagesAsync(), set to 1 for simplicity.
        // Set it according to how many messages the application wants to process in parallel.
        MaxConcurrentCalls = 10,

        // Indicates whether the message pump should automatically complete the messages after returning from user callback.
        // False below indicates the complete operation is handled by the user callback as in ProcessMessagesAsync().
        AutoComplete = false
    };

    // Register the function that processes messages.
    queueClient.RegisterMessageHandler(async (msg, token) =>
    {
        // When the stop signal is given, do not accept more messages for processing
        if(stoppingToken.IsCancellationRequested)
            return;
            
        await actionBlock.SendAsync(msg);
        
    }, messageHandlerOptions);
}

async Task ProcessMessagesAsync(Message message)
{
    Console.WriteLine($"Received message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");

    // Process the message.
    await Task.Delay(5000);
    
    // Complete the message so that it is not received again.
    // This can be done only if the queue Client is created in ReceiveMode.PeekLock mode (which is the default).
    await queueClient.CompleteAsync(message.SystemProperties.LockToken);

    Console.WriteLine($"Completed message: SequenceNumber:{message.SystemProperties.SequenceNumber} Body:{Encoding.UTF8.GetString(message.Body)}");
}

Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs)
{
    Console.WriteLine($"Message handler encountered an exception {exceptionReceivedEventArgs.Exception}.");
    var context = exceptionReceivedEventArgs.ExceptionReceivedContext;
    Console.WriteLine("Exception context for troubleshooting:");
    Console.WriteLine($"- Endpoint: {context.Endpoint}");
    Console.WriteLine($"- Entity Path: {context.EntityPath}");
    Console.WriteLine($"- Executing Action: {context.Action}");
    return Task.CompletedTask;
}