QueueUtil 为 MessageHandlers 生成了太多线程,async/await 问题

QueueUtil spawns too many threads for MessageHandlers, async/await problem

更新: 错误在于将 Action<QueueMessage> onSuccess 用于实际任务并在没有 await 的情况下调用它。我们实际上将 async 回调作为 onSuccess 传递,因此它不会阻塞,只是在不等待它的情况下被解雇。将类型更改为 Func<QueueMessage, Task> 并通过 await onSuccess(messageObj) 调用修复了行为。

我正在接收消息,每条消息都由涉及 IO 和 CPU 密集例程的长 运行 作业处理。我的问题是我注册的消息处理程序被调用了太多次(基本上与消息一样多),这使我的进程吃掉了 100% 的 CPU 资源,一切都变得非常慢。

我有一个围绕 QueueClient 的包装器,我想我因此遇到了麻烦。 我知道我的代码有误,但我对 async/await 如何在 .NET 中工作缺乏足够的了解,无法自行修复它。

这是包装器代码,它向 Azure 存储获取 json 消息或 link,因此它处理此逻辑并将其传递给回调。

public static void RegisterReceiver(string queueName,
        Action<QueueMessage> onSuccess, Action<ExceptionReceivedEventArgs> onError)
        _queueClient.RegisterMessageHandler(
                async (message, cancellationToken) =>
                {
                    var messageObj = new QueueMessage(message);
                    var regex = new Regex(@"\[blob%([^\]]+)\](.+)");
                    if (regex.IsMatch(messageObj.Body))
                    {
                        // Pulling blob data from storage
                        var match = regex.Match(messageObj.Body);
                        messageObj.ContentType = match.Groups[1].Value;
                        messageObj.Body = await BlobUtil.GetAndDeleteBlobAsync(match.Groups[2].Value, settings.QueueBlobStorageName);
                    }
                    onSuccess(messageObj);
                }, 
                new MessageHandlerOptions((exceptionArgs) =>
                {
                    onError(exceptionArgs);
                    return Task.CompletedTask;
                })
                {
                    MaxConcurrentCalls = 1
                }
            );

这是带有最终回调的调用代码:

QueueUtil.RegisterReceiver(QueueUtil.QUEUE_NAME, 
                async message =>
                {
                    await processService.RunJobs(message);
                }, 
                exceptionArgs =>
                {
                    throw new Exception("Exception occurred while receiving message from " + QueueUtil.QUEUE_NAME, 
                        exceptionArgs.Exception);
                }
            );

所以不知何故,所有这些 async/await 东西,我的接收者只是在消息到达后立即生成线程。

错误在于将 Action<QueueMessage> onSuccess 用于实际任务并在没有 await 的情况下调用它。我们实际上将 async 回调作为 onSuccess 传递,因此它不会阻塞,只是在不等待它的情况下被解雇。将类型更改为 Func<QueueMessage, Task> 并通过 await onSuccess(messageObj) 调用修复了行为。