QueueUtil 为 MessageHandlers 生成了太多线程,async/await 问题
QueueUtil spawns too many threads for MessageHandlers, async/await problem
更新:
错误在于将 Action<QueueMessage> onSuccess
用于实际任务并在没有 await
的情况下调用它。我们实际上将 async
回调作为 onSuccess 传递,因此它不会阻塞,只是在不等待它的情况下被解雇。将类型更改为 Func<QueueMessage, Task>
并通过 await onSuccess(messageObj)
调用修复了行为。
我正在接收消息,每条消息都由涉及 IO 和 CPU 密集例程的长 运行 作业处理。我的问题是我注册的消息处理程序被调用了太多次(基本上与消息一样多),这使我的进程吃掉了 100% 的 CPU 资源,一切都变得非常慢。
我有一个围绕 QueueClient 的包装器,我想我因此遇到了麻烦。
我知道我的代码有误,但我对 async/await 如何在 .NET 中工作缺乏足够的了解,无法自行修复它。
这是包装器代码,它向 Azure 存储获取 json 消息或 link,因此它处理此逻辑并将其传递给回调。
public static void RegisterReceiver(string queueName,
Action<QueueMessage> onSuccess, Action<ExceptionReceivedEventArgs> onError)
_queueClient.RegisterMessageHandler(
async (message, cancellationToken) =>
{
var messageObj = new QueueMessage(message);
var regex = new Regex(@"\[blob%([^\]]+)\](.+)");
if (regex.IsMatch(messageObj.Body))
{
// Pulling blob data from storage
var match = regex.Match(messageObj.Body);
messageObj.ContentType = match.Groups[1].Value;
messageObj.Body = await BlobUtil.GetAndDeleteBlobAsync(match.Groups[2].Value, settings.QueueBlobStorageName);
}
onSuccess(messageObj);
},
new MessageHandlerOptions((exceptionArgs) =>
{
onError(exceptionArgs);
return Task.CompletedTask;
})
{
MaxConcurrentCalls = 1
}
);
这是带有最终回调的调用代码:
QueueUtil.RegisterReceiver(QueueUtil.QUEUE_NAME,
async message =>
{
await processService.RunJobs(message);
},
exceptionArgs =>
{
throw new Exception("Exception occurred while receiving message from " + QueueUtil.QUEUE_NAME,
exceptionArgs.Exception);
}
);
所以不知何故,所有这些 async/await 东西,我的接收者只是在消息到达后立即生成线程。
错误在于将 Action<QueueMessage> onSuccess
用于实际任务并在没有 await
的情况下调用它。我们实际上将 async
回调作为 onSuccess 传递,因此它不会阻塞,只是在不等待它的情况下被解雇。将类型更改为 Func<QueueMessage, Task>
并通过 await onSuccess(messageObj)
调用修复了行为。
更新:
错误在于将 Action<QueueMessage> onSuccess
用于实际任务并在没有 await
的情况下调用它。我们实际上将 async
回调作为 onSuccess 传递,因此它不会阻塞,只是在不等待它的情况下被解雇。将类型更改为 Func<QueueMessage, Task>
并通过 await onSuccess(messageObj)
调用修复了行为。
我正在接收消息,每条消息都由涉及 IO 和 CPU 密集例程的长 运行 作业处理。我的问题是我注册的消息处理程序被调用了太多次(基本上与消息一样多),这使我的进程吃掉了 100% 的 CPU 资源,一切都变得非常慢。
我有一个围绕 QueueClient 的包装器,我想我因此遇到了麻烦。 我知道我的代码有误,但我对 async/await 如何在 .NET 中工作缺乏足够的了解,无法自行修复它。
这是包装器代码,它向 Azure 存储获取 json 消息或 link,因此它处理此逻辑并将其传递给回调。
public static void RegisterReceiver(string queueName,
Action<QueueMessage> onSuccess, Action<ExceptionReceivedEventArgs> onError)
_queueClient.RegisterMessageHandler(
async (message, cancellationToken) =>
{
var messageObj = new QueueMessage(message);
var regex = new Regex(@"\[blob%([^\]]+)\](.+)");
if (regex.IsMatch(messageObj.Body))
{
// Pulling blob data from storage
var match = regex.Match(messageObj.Body);
messageObj.ContentType = match.Groups[1].Value;
messageObj.Body = await BlobUtil.GetAndDeleteBlobAsync(match.Groups[2].Value, settings.QueueBlobStorageName);
}
onSuccess(messageObj);
},
new MessageHandlerOptions((exceptionArgs) =>
{
onError(exceptionArgs);
return Task.CompletedTask;
})
{
MaxConcurrentCalls = 1
}
);
这是带有最终回调的调用代码:
QueueUtil.RegisterReceiver(QueueUtil.QUEUE_NAME,
async message =>
{
await processService.RunJobs(message);
},
exceptionArgs =>
{
throw new Exception("Exception occurred while receiving message from " + QueueUtil.QUEUE_NAME,
exceptionArgs.Exception);
}
);
所以不知何故,所有这些 async/await 东西,我的接收者只是在消息到达后立即生成线程。
错误在于将 Action<QueueMessage> onSuccess
用于实际任务并在没有 await
的情况下调用它。我们实际上将 async
回调作为 onSuccess 传递,因此它不会阻塞,只是在不等待它的情况下被解雇。将类型更改为 Func<QueueMessage, Task>
并通过 await onSuccess(messageObj)
调用修复了行为。