Azure QueueClient MaxConcurrentCalls 不工作
Azure QueueClient MaxConcurrentCalls Not Working
我的任务是提高处理来自队列的消息的辅助角色的性能。
它使用 queueclient.OnMessage
模型,MaxConcurrentCalls
设置为 1。
在处理过程中有 thread.sleep 5 分钟,这会影响整体性能,即如果队列中有 10 条消息,它至少需要 45 分钟才能处理第 10 条消息
我想如果我将 MaxConcurrentCalls 更改为 5,那么它应该并行处理 5 条消息并将等待时间减少 25 分钟,但这不起作用:(
我还尝试将 OnMessageAsync 与 MaxConcurrentCalls 一起使用,但没有成功。下面是我试过的片段,
.OnMessageAsync( async (brokeredMessage) =>
{
bool shouldAbandon = false;
try
{
logger.Debug("Rcvd:" + brokeredMessage.SequenceNumber);
SomeTask(brokeredMessage);
await brokeredMessage.CompleteAsync();
}
catch (Exception ex)
{
logger.Error(String.Format("An Error occured {0}", ex.ToString()));
shouldAbandon = true;
}
if (shouldAbandon)
{
await brokeredMessage.AbandonAsync();
}
}, new OnMessageOptions { AutoComplete = false, MaxConcurrentCalls = 10 });
private void SomeTask(BrokeredMessage bm)
{
logger.Debug("id: " + bm.MessageId + "on thread: " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(30 * 1000);
logger.Debug("seq: " + bm.SequenceNumber);
logger.Debug("body: " + bm.GetBody<string>());
}
我能想到的最后一个选项是在 OnMessage 事件上启动一个新任务。有几件事我需要处理(确保已完成的任务从 collection/main 线程中删除并从 BrokeredMessage 传递一个对象,因为 BrokeredMessage 已被处理,所以我不能在任务线程上使用)和我也对其进行了理智测试,但我不相信这是最好的解决方案。
将 SomeTask
实施替换为以下内容:
async Task SomeTask(BrokeredMessage bm)
{
logger.Debug("id: " + bm.MessageId + "on thread: " + Thread.CurrentThread.ManagedThreadId);
await Task.Delay(30 * 1000).ConfigureAwait(false);
logger.Debug("seq: " + bm.SequenceNumber);
logger.Debug("body: " + bm.GetBody<string>());
}
当 Thread.Sleep
与 async
在同一线程上混合时(在您的情况下),它会导致线程 'stall' 所有任务。由于没有 UI 工作,更喜欢在异步操作上使用 .ConfigureAwait(false)
以允许调度程序管理要在哪个线程上执行。
我的任务是提高处理来自队列的消息的辅助角色的性能。
它使用 queueclient.OnMessage
模型,MaxConcurrentCalls
设置为 1。
在处理过程中有 thread.sleep 5 分钟,这会影响整体性能,即如果队列中有 10 条消息,它至少需要 45 分钟才能处理第 10 条消息
我想如果我将 MaxConcurrentCalls 更改为 5,那么它应该并行处理 5 条消息并将等待时间减少 25 分钟,但这不起作用:(
我还尝试将 OnMessageAsync 与 MaxConcurrentCalls 一起使用,但没有成功。下面是我试过的片段,
.OnMessageAsync( async (brokeredMessage) =>
{
bool shouldAbandon = false;
try
{
logger.Debug("Rcvd:" + brokeredMessage.SequenceNumber);
SomeTask(brokeredMessage);
await brokeredMessage.CompleteAsync();
}
catch (Exception ex)
{
logger.Error(String.Format("An Error occured {0}", ex.ToString()));
shouldAbandon = true;
}
if (shouldAbandon)
{
await brokeredMessage.AbandonAsync();
}
}, new OnMessageOptions { AutoComplete = false, MaxConcurrentCalls = 10 });
private void SomeTask(BrokeredMessage bm)
{
logger.Debug("id: " + bm.MessageId + "on thread: " + Thread.CurrentThread.ManagedThreadId);
Thread.Sleep(30 * 1000);
logger.Debug("seq: " + bm.SequenceNumber);
logger.Debug("body: " + bm.GetBody<string>());
}
我能想到的最后一个选项是在 OnMessage 事件上启动一个新任务。有几件事我需要处理(确保已完成的任务从 collection/main 线程中删除并从 BrokeredMessage 传递一个对象,因为 BrokeredMessage 已被处理,所以我不能在任务线程上使用)和我也对其进行了理智测试,但我不相信这是最好的解决方案。
将 SomeTask
实施替换为以下内容:
async Task SomeTask(BrokeredMessage bm)
{
logger.Debug("id: " + bm.MessageId + "on thread: " + Thread.CurrentThread.ManagedThreadId);
await Task.Delay(30 * 1000).ConfigureAwait(false);
logger.Debug("seq: " + bm.SequenceNumber);
logger.Debug("body: " + bm.GetBody<string>());
}
当 Thread.Sleep
与 async
在同一线程上混合时(在您的情况下),它会导致线程 'stall' 所有任务。由于没有 UI 工作,更喜欢在异步操作上使用 .ConfigureAwait(false)
以允许调度程序管理要在哪个线程上执行。