Azure 辅助角色多线程队列处理
Azure Worker Role multithread queue processing
我有一个具有经典配置 WebRole + Worker 角色的 Azure 云服务。
Worker 从队列中获取消息,处理它而不是一次删除它。
我的代码是这样的:
public override void Run()
{
Trace.TraceInformation("Worker is running");
try
{
this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}
public override bool OnStart()
{
ServicePointManager.DefaultConnectionLimit = 500;
bool result = base.OnStart();
Trace.TraceInformation("WorkerAnalytics has been started");
return result;
}
private async Task RunAsync(CancellationToken cancellationToken)
{
var queue = ....//omitted info for brevity
CloudQueueMessage retrievedMessage = null;
while (!cancellationToken.IsCancellationRequested)
{
try
{
retrievedMessage = await queue.GetMessageAsync();
if (retrievedMessage != null)
{
await ProcessMessage(retrievedMessage);
}
else
{
System.Threading.Thread.Sleep(500);
}
}
catch (Exception e)
{
System.Threading.Thread.Sleep(500);
}
}
}
}
现在这很完美,但是 CPU 非常低,只有 3%,它一次只处理一个元素(每次大约 1 秒),但是队列每秒有大约 1000 个新元素并且还不够。
如何使用机器的所有 CPU 能力同时处理更多队列消息,而不会使此代码过于复杂?
ServicePointManager.DefaultConnectionLimit还有什么用?
我搜索了几个小时来寻找一个有效的 Worker Roles 多线程解决方案,但现在所有的 WebJobs 或旧框架都使事情变得复杂。
谢谢
您可以尝试 运行 多个 RunAsync()
任务。
var tasks = new List<Task>();
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token));
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token));
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token));
Task.WaitAll(tasks.ToArray());
我有一个具有经典配置 WebRole + Worker 角色的 Azure 云服务。 Worker 从队列中获取消息,处理它而不是一次删除它。
我的代码是这样的:
public override void Run()
{
Trace.TraceInformation("Worker is running");
try
{
this.RunAsync(this.cancellationTokenSource.Token).Wait();
}
finally
{
this.runCompleteEvent.Set();
}
}
public override bool OnStart()
{
ServicePointManager.DefaultConnectionLimit = 500;
bool result = base.OnStart();
Trace.TraceInformation("WorkerAnalytics has been started");
return result;
}
private async Task RunAsync(CancellationToken cancellationToken)
{
var queue = ....//omitted info for brevity
CloudQueueMessage retrievedMessage = null;
while (!cancellationToken.IsCancellationRequested)
{
try
{
retrievedMessage = await queue.GetMessageAsync();
if (retrievedMessage != null)
{
await ProcessMessage(retrievedMessage);
}
else
{
System.Threading.Thread.Sleep(500);
}
}
catch (Exception e)
{
System.Threading.Thread.Sleep(500);
}
}
}
}
现在这很完美,但是 CPU 非常低,只有 3%,它一次只处理一个元素(每次大约 1 秒),但是队列每秒有大约 1000 个新元素并且还不够。
如何使用机器的所有 CPU 能力同时处理更多队列消息,而不会使此代码过于复杂?
ServicePointManager.DefaultConnectionLimit还有什么用?
我搜索了几个小时来寻找一个有效的 Worker Roles 多线程解决方案,但现在所有的 WebJobs 或旧框架都使事情变得复杂。
谢谢
您可以尝试 运行 多个 RunAsync()
任务。
var tasks = new List<Task>();
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token));
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token));
tasks.Add(this.RunAsync(this.cancellationTokenSource.Token));
Task.WaitAll(tasks.ToArray());