Asynchronously posting to Azure Queues
I am trying to enqueue messages asynchronously in an Azure queue like this:
private async Task EnqueueItemsAsync(IEnumerable<string> messages) {
    var tasks = messages.Select(msg => _queue.AddMessageAsync(new CloudQueueMessage(msg),
        null, null, null, null));
    await Task.WhenAll(tasks);
}
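If I understand it correctly, this says "start enqueuing one item after the other without waiting for them to get posted, keep a reference to each task, and then wait until all of them get posted".
This code works fine in most cases, but for a large number of items (5000) it starts enqueuing and then throws a timeout exception (after enqueuing about 3500 items).
I solved it by waiting for each call to finish before continuing with the next one: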
private async Task EnqueueItemsAsync(IEnumerable<string> messages) {
    foreach (var message in messages) {
        await _queue.AddMessageAsync(new CloudQueueMessage(message), null, null, null, null);
    }
}
Can anyone explain why this happens?
The exception:
System.AggregateException
which wraps many such exceptions:
Microsoft.WindowsAzure.Storage.Core.Util.AsyncExtensions.<>c__DisplayClass4.<CreateCallbackVoid>b__3(IAsyncResult ar)
Request Information RequestID: RequestDate: StatusMessage: <---
---> (Inner Exception #1) Microsoft.WindowsAzure.Storage.StorageException: The client could not finish the operation within specified timeout.
---> System.TimeoutException: The client could not finish the operation within specified timeout.
--- End of inner exception stack trace ---
Microsoft.WindowsAzure.Storage.Core.Executor.Executor.EndExecuteAsync[T](IAsyncResult result)
A queue in Azure is designed for a throughput of up to 2,000 messages per second.
See: Azure Storage Scalability and Performance Targets
When your application reaches the limit of what a partition can handle for your workload, Azure Storage will begin to return error code 503 (Server Busy) or error code 500 (Operation Timeout) responses. When this occurs, the application should use an exponential backoff policy for retries. The exponential backoff allows the load on the partition to decrease, and to ease out spikes in traffic to that partition.
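The advice quoted above (limit the load and retry with exponential backoff) could be sketched roughly as follows. This is only an illustration, not the storage library's own mechanism: `ThrottledSender`, `SendAllAsync`, `WithBackoffAsync` and the default values for concurrency, attempts and delay are all hypothetical names and numbers chosen for the example. It also retries on any exception, whereas real code would probably retry only on the 503 and 500 responses mentioned in the quote.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: not part of the Azure Storage client library.
public static class ThrottledSender
{
    // Runs `send` for every item with at most `maxConcurrency` calls in flight,
    // retrying each failed call with exponentially growing delays.
    public static async Task SendAllAsync<T>(
        IEnumerable<T> items,
        Func<T, Task> send,
        int maxConcurrency = 32,   // assumed value, tune for your workload
        int maxAttempts = 5,       // assumed value
        int baseDelayMs = 200)     // assumed value
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = items.Select(async item =>
            {
                // Wait for a free slot before starting the request.
                await gate.WaitAsync();
                try
                {
                    await WithBackoffAsync(() => send(item), maxAttempts, baseDelayMs);
                }
                finally
                {
                    gate.Release();
                }
            }).ToList();

            // WhenAll completes only after every task has finished,
            // so the semaphore is not disposed while still in use.
            await Task.WhenAll(tasks);
        }
    }

    private static async Task WithBackoffAsync(Func<Task> action, int maxAttempts, int baseDelayMs)
    {
        for (var attempt = 1; ; attempt++)
        {
            try
            {
                await action();
                return;
            }
            catch (Exception) when (attempt < maxAttempts)
            {
                // The delay doubles after every failed attempt: base, 2x, 4x, ...
                // The last failed attempt is not caught and propagates to the caller.
                await Task.Delay(baseDelayMs * (1 << (attempt - 1)));
            }
        }
    }
}
```

With the code from the question, a call might look like `await ThrottledSender.SendAllAsync(messages, msg => _queue.AddMessageAsync(new CloudQueueMessage(msg)));`. This sits between the two versions in the question: requests still overlap, but never more than `maxConcurrency` at once.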
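It looks like you can build a more robust mechanism by passing a QueueRequestOptions to AddMessageAsync.
Before the request is sent, these options are applied to the command.
I would try passing a QueueRequestOptions with MaximumExecutionTime and ServerTimeout set to larger values.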
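A minimal sketch of that suggestion, assuming the same WindowsAzure.Storage client library used in the question. The class and method names (`QueueOptionsExample`, `CreateOptions`, `AddWithLongerTimeoutsAsync`) are made up for the example, and the timeout and retry values are arbitrary illustrations, not recommendations.

```csharp
using System;
using System.Threading.Tasks;
using Microsoft.WindowsAzure.Storage.Queue;
using Microsoft.WindowsAzure.Storage.RetryPolicies;

// Hypothetical wrapper class for illustration only.
public static class QueueOptionsExample
{
    public static QueueRequestOptions CreateOptions()
    {
        return new QueueRequestOptions
        {
            // Timeout the service applies to a single request (illustrative value).
            ServerTimeout = TimeSpan.FromSeconds(60),
            // Upper bound for the whole operation, including retries (illustrative value).
            MaximumExecutionTime = TimeSpan.FromMinutes(5),
            // Retry with exponential backoff: 2 second base delta, up to 5 attempts.
            RetryPolicy = new ExponentialRetry(TimeSpan.FromSeconds(2), 5)
        };
    }

    public static Task AddWithLongerTimeoutsAsync(CloudQueue queue, string text)
    {
        // The options go in the fourth argument, where the question passes null.
        return queue.AddMessageAsync(new CloudQueueMessage(text), null, null, CreateOptions(), null);
    }
}
```

Larger timeouts only give queued requests more time to finish. They do not raise the throughput limit of the queue, so it probably still makes sense to limit how many requests are started at once.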
This is how the request is populated before it is sent:
// Microsoft.WindowsAzure.Storage.Queue.QueueRequestOptions
internal void ApplyToStorageCommand<T>(RESTCommand<T> cmd)
{
    if (this.LocationMode.HasValue)
    {
        cmd.LocationMode = this.LocationMode.Value;
    }
    if (this.ServerTimeout.HasValue)
    {
        cmd.ServerTimeoutInSeconds = new int?((int)this.ServerTimeout.Value.TotalSeconds);
    }
    if (this.OperationExpiryTime.HasValue)
    {
        cmd.OperationExpiryTime = this.OperationExpiryTime;
        return;
    }
    if (this.MaximumExecutionTime.HasValue)
    {
        cmd.OperationExpiryTime = new DateTime?(DateTime.Now + this.MaximumExecutionTime.Value);
    }
}
And this is how it is sent:
rESTCommand.PreProcessResponse = delegate(RESTCommand<NullType> cmd, HttpWebResponse resp, Exception ex, OperationContext ctx)
{
    HttpResponseParsers.ProcessExpectedStatusCodeNoException<NullType>(HttpStatusCode.Created, resp, NullType.Value, cmd, ex);
    return NullType.Value;
};