如何正确地限制从 WebJobs 访问 DocumentDb
How properly to throttle access to DocumentDb from WebJobs
我有一个带有 blob 和队列触发器的 Azure WebKob,用于将数据保存到 Azure DocumentDb。
有时我会收到错误消息:
Microsoft.Azure.Documents.RequestRateTooLargeException: Message: {"Errors":["Request rate is large"]}
目前我使用此代码限制请求。 WebJob 函数:
public async Task ParseCategoriesFromCsv(...)
{
double find = 2.23, add = 5.9, replace = 10.67;
double requestCharge = Math.Round(find + Math.Max(add, replace));
await categoryProvider.SaveCategories(requestCharge , categories);
}
用于操作文档数据库客户端的类别提供程序:
public async Task<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories)
{
var requestDelay = TimeSpan.FromSeconds(60.0 / (collectionOptions.RequestUnits / requestCharge));
var scheduler = new IntervalTaskScheduler(requestDelay, Scheduler.Default); // Rx
var client = new DocumentClient(endpoint, authorizationKey,
new ConnectionPolicy
{
ConnectionMode = documentDbOptions.ConnectionMode,
ConnectionProtocol = documentDbOptions.ConnectionProtocol
});
return await Task.WhenAll(documents.Select(async d =>
await scheduler.ScheduleTask(
() => client.PutDocumentToDb(collectionOptions.CollectionLink, d.SearchIndex, d))));
}
任务调度程序 throttle/measure/synchronize 请求:
private readonly Subject<Action> _requests = new Subject<Action>();
private readonly IDisposable _observable;
public IntervalTaskScheduler(TimeSpan requestDelay, IScheduler scheduler)
{
_observable = _requests.Select(i => Observable.Empty<Action>()
.Delay(requestDelay)
.StartWith(i))
.Concat()
.ObserveOn(scheduler)
.Subscribe(action => action());
}
public Task<T> ScheduleTask<T>(Func<Task<T>> request)
{
var tcs = new TaskCompletionSource<T>();
_requests.OnNext(async () =>
{
try
{
T result = await request();
tcs.SetResult(result);
}
catch (Exception ex)
{
tcs.SetException(ex);
}
});
return tcs.Task;
}
所以它基本上是 ResourceResponse<Document>.RequestCharge
中的一些常量但是:
- 当我触发 1 个队列时,它工作正常,但当触发 8 个队列时,它会抛出错误。
- 如果将请求费用增加 8 倍,则 8 个队列可以正常工作,但只有 1 个队列的工作速度比它慢 8 倍。
什么 throttling/measuring/synchronization 机制可以在这里很好地工作?
收到 429(请求率太大)时,响应会告诉您等待多长时间。有一个headerx-ms-retry-after。这是有价值的。以毫秒为单位等待该时间段。
catch (AggregateException ex) when (ex.InnerException is DocumentClientException)
{
DocumentClientException dce = (DocumentClientException)ex.InnerException;
switch ((int)dce.StatusCode)
{
case 429:
Thread.Sleep(dce.RetryAfter);
break;
default:
Console.WriteLine(" Failed: {0}", ex.InnerException.Message);
throw;
}
}
在我看来,您应该能够使用您的 SaveCategories
方法来做到这一点,以使其与 Rx 一起很好地工作:
public IObservable<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories)
{
var requestDelay = TimeSpan.FromSeconds(60.0 / (collectionOptions.RequestUnits / requestCharge));
var client = new DocumentClient(endpoint, authorizationKey,
new ConnectionPolicy
{
ConnectionMode = documentDbOptions.ConnectionMode,
ConnectionProtocol = documentDbOptions.ConnectionProtocol
});
return
Observable.Interval(requestDelay)
.Zip(documents, (delay, doc) => doc)
.SelectMany(doc => Observable.FromAsync(() => client.PutDocumentToDb(collectionOptions.CollectionLink, doc.SearchIndex, doc)))
.ToArray();
}
这完全摆脱了你的 IntervalTaskScheduler
class 并确保你将请求率限制为每个 requestDelay
时间跨度的一个请求,但允许响应花费尽可能长的时间如所须。当 observable 完成时,.ToArray()
调用将 returns 许多值的 IObservable<ResourceResponse<Document>>
变成 returns 单个值数组的 IObservable<ResourceResponse<Document>[]>
。
我无法测试你的代码,所以我测试了一个我认为模拟你的代码的示例:
var r = new Random();
var a = Enumerable.Range(0, 1000);
var i = Observable.Interval(TimeSpan.FromSeconds(2.0));
var sw = Stopwatch.StartNew();
var query =
i.Zip(a, (ii, aa) => aa)
.SelectMany(aa => Observable.Start(() =>
{
var x = sw.Elapsed.TotalMilliseconds;
Thread.Sleep(r.Next(0, 5000));
return x;
}))
.Select(x => new
{
started = x,
ended = sw.Elapsed.TotalMilliseconds
});
我得到了这样的结果,表明请求被限制了:
4026.2983 5259.7043
2030.1287 6940.2326
6027.0439 9664.1045
8027.9993 10207.0579
10028.1762 12301.4746
12028.3190 12711.4440
14040.7972 17433.1964
16040.9267 17574.5924
18041.0529 19077.5545
从.NET SDK 1.8.0开始,我们会在合理的范围内自动处理请求率过大的异常(默认重试9次,并在从服务器返回后接受重试以进行下一次重试)。
如果您需要更好的控制,您可以在传递给 DocumentClient 对象的 ConnectionPolicy 实例上配置 RetryOptions,我们将用它覆盖默认的重试策略。
因此您不再需要像上面那样在应用程序代码中添加任何自定义逻辑来处理 429 异常。
我有一个带有 blob 和队列触发器的 Azure WebKob,用于将数据保存到 Azure DocumentDb。
有时我会收到错误消息:
Microsoft.Azure.Documents.RequestRateTooLargeException: Message: {"Errors":["Request rate is large"]}
目前我使用此代码限制请求。 WebJob 函数:
public async Task ParseCategoriesFromCsv(...)
{
double find = 2.23, add = 5.9, replace = 10.67;
double requestCharge = Math.Round(find + Math.Max(add, replace));
await categoryProvider.SaveCategories(requestCharge , categories);
}
用于操作文档数据库客户端的类别提供程序:
public async Task<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories)
{
var requestDelay = TimeSpan.FromSeconds(60.0 / (collectionOptions.RequestUnits / requestCharge));
var scheduler = new IntervalTaskScheduler(requestDelay, Scheduler.Default); // Rx
var client = new DocumentClient(endpoint, authorizationKey,
new ConnectionPolicy
{
ConnectionMode = documentDbOptions.ConnectionMode,
ConnectionProtocol = documentDbOptions.ConnectionProtocol
});
return await Task.WhenAll(documents.Select(async d =>
await scheduler.ScheduleTask(
() => client.PutDocumentToDb(collectionOptions.CollectionLink, d.SearchIndex, d))));
}
任务调度程序 throttle/measure/synchronize 请求:
private readonly Subject<Action> _requests = new Subject<Action>();
private readonly IDisposable _observable;
public IntervalTaskScheduler(TimeSpan requestDelay, IScheduler scheduler)
{
_observable = _requests.Select(i => Observable.Empty<Action>()
.Delay(requestDelay)
.StartWith(i))
.Concat()
.ObserveOn(scheduler)
.Subscribe(action => action());
}
public Task<T> ScheduleTask<T>(Func<Task<T>> request)
{
var tcs = new TaskCompletionSource<T>();
_requests.OnNext(async () =>
{
try
{
T result = await request();
tcs.SetResult(result);
}
catch (Exception ex)
{
tcs.SetException(ex);
}
});
return tcs.Task;
}
所以它基本上是 ResourceResponse<Document>.RequestCharge
中的一些常量但是:
- 当我触发 1 个队列时,它工作正常,但当触发 8 个队列时,它会抛出错误。
- 如果将请求费用增加 8 倍,则 8 个队列可以正常工作,但只有 1 个队列的工作速度比它慢 8 倍。
什么 throttling/measuring/synchronization 机制可以在这里很好地工作?
收到 429(请求率太大)时,响应会告诉您等待多长时间。有一个headerx-ms-retry-after。这是有价值的。以毫秒为单位等待该时间段。
catch (AggregateException ex) when (ex.InnerException is DocumentClientException)
{
DocumentClientException dce = (DocumentClientException)ex.InnerException;
switch ((int)dce.StatusCode)
{
case 429:
Thread.Sleep(dce.RetryAfter);
break;
default:
Console.WriteLine(" Failed: {0}", ex.InnerException.Message);
throw;
}
}
在我看来,您应该能够使用您的 SaveCategories
方法来做到这一点,以使其与 Rx 一起很好地工作:
public IObservable<ResourceResponse<Document>[]> SaveCategories(double requestCharge, Category[] categories)
{
var requestDelay = TimeSpan.FromSeconds(60.0 / (collectionOptions.RequestUnits / requestCharge));
var client = new DocumentClient(endpoint, authorizationKey,
new ConnectionPolicy
{
ConnectionMode = documentDbOptions.ConnectionMode,
ConnectionProtocol = documentDbOptions.ConnectionProtocol
});
return
Observable.Interval(requestDelay)
.Zip(documents, (delay, doc) => doc)
.SelectMany(doc => Observable.FromAsync(() => client.PutDocumentToDb(collectionOptions.CollectionLink, doc.SearchIndex, doc)))
.ToArray();
}
这完全摆脱了你的 IntervalTaskScheduler
class 并确保你将请求率限制为每个 requestDelay
时间跨度的一个请求,但允许响应花费尽可能长的时间如所须。当 observable 完成时,.ToArray()
调用将 returns 许多值的 IObservable<ResourceResponse<Document>>
变成 returns 单个值数组的 IObservable<ResourceResponse<Document>[]>
。
我无法测试你的代码,所以我测试了一个我认为模拟你的代码的示例:
var r = new Random();
var a = Enumerable.Range(0, 1000);
var i = Observable.Interval(TimeSpan.FromSeconds(2.0));
var sw = Stopwatch.StartNew();
var query =
i.Zip(a, (ii, aa) => aa)
.SelectMany(aa => Observable.Start(() =>
{
var x = sw.Elapsed.TotalMilliseconds;
Thread.Sleep(r.Next(0, 5000));
return x;
}))
.Select(x => new
{
started = x,
ended = sw.Elapsed.TotalMilliseconds
});
我得到了这样的结果,表明请求被限制了:
4026.2983 5259.7043
2030.1287 6940.2326
6027.0439 9664.1045
8027.9993 10207.0579
10028.1762 12301.4746
12028.3190 12711.4440
14040.7972 17433.1964
16040.9267 17574.5924
18041.0529 19077.5545
从.NET SDK 1.8.0开始,我们会在合理的范围内自动处理请求率过大的异常(默认重试9次,并在从服务器返回后接受重试以进行下一次重试)。
如果您需要更好的控制,您可以在传递给 DocumentClient 对象的 ConnectionPolicy 实例上配置 RetryOptions,我们将用它覆盖默认的重试策略。
因此您不再需要像上面那样在应用程序代码中添加任何自定义逻辑来处理 429 异常。