使用 Polly 处理 CosmosDb Mongo 429 错误的正确方法
Proper way to handle CosmosDb Mongo 429 error with Polly
我在 Azure Web 应用程序上部署了一个应用程序,该应用程序使用 .netCore 3 中的 mongoDb 驱动程序与 CosmosDB 数据库进行交互。
根据此文档,我必须设置重试策略,以便在 RU/s 不足时处理 429 错误代码。但由于发生 429 错误时没有明确的错误信息可供判断,我找不到用 Polly 编写该策略的正确方法。
我找到的唯一可行的方法是使用以下策略:
// Retry up to 3 times, waiting a fixed 1 second between attempts, whenever a
// MongoCommandException whose message contains "Request rate is large" is thrown.
_retryPolicy = Policy
    .Handle<MongoCommandException>(ex => ex.Message.Contains("Request rate is large"))
    .WaitAndRetry(retryCount: 3, sleepDurationProvider: attempt => TimeSpan.FromSeconds(1));
下面是 Polly 策略的使用:
/// <summary>
/// Counts the documents whose ProjectNumber equals <paramref name="projectNumber"/>,
/// executing the query through the retry policy.
/// </summary>
/// <param name="projectNumber">Project number to match.</param>
/// <returns>The number of matching documents.</returns>
public long CountProjetByProjectNumber(string projectNumber)
{
    // Let the policy return the count directly rather than writing to a captured local.
    return _retryPolicy.Execute<long>(
        () => _mongoCollection.CountDocuments(doc => doc.ProjectNumber == projectNumber));
}
有人在通过 MongoDB 驱动程序使用 CosmosDB 时,遇到 429 异常能得到明确的错误信息吗?或者有人可以告诉我应该如何正确处理它?
为了正确处理速率限制和超时,您实际上还需要处理更多异常——特别是在使用较新的 MongoDB V3.6 端点(而不是旧的 V3.2 端点)时。
- For V3.2 Endpoints: The two exceptions you care about are MongoCommandException and MongoExecutionTimeoutException. The MongoCommandException includes a BsonDocument property in its Result field. This document has a StatusCode you can use to detect 429. That said, from my testing, I also found that I had to handle Http Service Unavailable (1) and Operation Exceeded Time Limit (50) status codes.
- For V3.6 Endpoints: You probably also want to handle MongoWriteException and MongoBulkWriteException. These exceptions include a RetryAfterMs= value in the exception message (not always though!). Unfortunately, this value does not seem to be directly exposed via a class property - most likely because this is a CosmosDB specific feature and thus does not map to the MongoDB driver defined exceptions.
以下代码是在 .NET Standard 2.0 中实现的,应该能为您提供一个良好的起点。您肯定会希望根据您的情况和测试调整一些常量。
/// <summary>
/// Polly retry policies for rate-limiting and timeout errors raised by the MongoDB driver
/// when talking to a CosmosDB MongoDB endpoint.
/// </summary>
public static class Policies
{
    /// <summary>"StatusCode" value in the command result that is treated as throttling.</summary>
    public const int HttpThrottleErrorCode = 429;

    /// <summary>"StatusCode" value in the command result treated as "service unavailable".</summary>
    public const int HttpServiceIsUnavailable = 1;

    /// <summary>Status/error code treated as "operation exceeded time limit".</summary>
    public const int HttpOperationExceededTimeLimit = 50;

    /// <summary>Driver error code that the policies below treat as rate limiting.</summary>
    public const int RateLimitCode = 16500;

    /// <summary>Token that precedes the suggested wait (in milliseconds) inside exception messages.</summary>
    public const string RetryAfterToken = "RetryAfterMs=";

    /// <summary>Maximum number of retries used by every policy in this class.</summary>
    public const int MaxRetries = 10;

    /// <summary>Length of <see cref="RetryAfterToken"/>.</summary>
    public static readonly int RetryAfterTokenLength = RetryAfterToken.Length;

    private static readonly Random JitterSeed = new Random();

    // System.Random is not thread-safe and the sleep-duration delegates may run
    // concurrently, so every access to JitterSeed goes through this lock.
    private static readonly object JitterLock = new object();

    /// <summary>A policy that executes the delegate without any retry handling.</summary>
    public static readonly IAsyncPolicy NoPolicy = Policy.NoOpAsync();

    /// <summary>Returns a random jitter in milliseconds in [minInclusive, maxExclusive).</summary>
    private static int NextJitterMs(int minInclusive, int maxExclusive)
    {
        lock (JitterLock)
        {
            return JitterSeed.Next(minInclusive, maxExclusive);
        }
    }

    /// <summary>
    /// Builds a sleep-duration provider with exponential back-off plus random jitter.
    /// </summary>
    /// <param name="exponentialBackoffInSeconds">Base of the exponent; the wait is base^retryAttempt seconds.</param>
    /// <param name="maxBackoffTimeInSeconds">Upper bound, in seconds, for the exponential part.</param>
    /// <returns>A function mapping the retry attempt number to the time to wait.</returns>
    public static Func<int, TimeSpan> SleepDurationProviderWithJitter(double exponentialBackoffInSeconds, int maxBackoffTimeInSeconds) =>
        retryAttempt =>
            // exponential back-off: base^1, base^2, base^3 ... capped at the maximum
            TimeSpan.FromSeconds(Math.Min(Math.Pow(exponentialBackoffInSeconds, retryAttempt), maxBackoffTimeInSeconds))
            // plus some jitter: up to 1 second
            + TimeSpan.FromMilliseconds(NextJitterMs(0, 1000));

    /// <summary>Default back-off: base 1.5 seconds, exponential part capped at 23 seconds.</summary>
    public static readonly Func<int, TimeSpan> DefaultSleepDurationProviderWithJitter =
        SleepDurationProviderWithJitter(1.5, 23);

    /// <summary>
    /// Retries <c>MongoCommandException</c>s whose code is <see cref="RateLimitCode"/> and whose
    /// result document carries one of the handled status codes.
    /// </summary>
    public static readonly IAsyncPolicy MongoCommandExceptionPolicy = Policy
        .Handle<MongoCommandException>(e =>
        {
            if (e.Code != RateLimitCode || !(e.Result is BsonDocument bsonDocument))
            {
                return false;
            }

            if (bsonDocument.TryGetValue("StatusCode", out var statusCode) && statusCode.IsInt32)
            {
                switch (statusCode.AsInt32)
                {
                    case HttpThrottleErrorCode:
                    case HttpServiceIsUnavailable:
                    case HttpOperationExceededTimeLimit:
                        return true;
                    default:
                        return false;
                }
            }

            if (bsonDocument.TryGetValue("IsValid", out var isValid) && isValid.IsBoolean)
            {
                return isValid.AsBoolean;
            }

            // Rate-limit code without further detail: retry.
            return true;
        })
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            sleepDurationProvider: DefaultSleepDurationProviderWithJitter
        );

    /// <summary>
    /// Retries <c>MongoExecutionTimeoutException</c>s whose code is <see cref="RateLimitCode"/>
    /// or <see cref="HttpOperationExceededTimeLimit"/>.
    /// </summary>
    public static readonly IAsyncPolicy ExecutionTimeoutPolicy = Policy
        .Handle<MongoExecutionTimeoutException>(e =>
            e.Code == RateLimitCode || e.Code == HttpOperationExceededTimeLimit
        )
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            sleepDurationProvider: DefaultSleepDurationProviderWithJitter
        );

    /// <summary>
    /// Retries <c>MongoWriteException</c>s whose write error, or whose inner bulk-write errors,
    /// carry <see cref="RateLimitCode"/>. Waits for the RetryAfterMs value found in the exception
    /// message (or the inner exception's message) when present, otherwise uses the default back-off.
    /// </summary>
    public static readonly IAsyncPolicy MongoWriteExceptionPolicy = Policy
        .Handle<MongoWriteException>(e =>
        {
            return e.WriteError?.Code == RateLimitCode
                || (e.InnerException is MongoBulkWriteException bulkException &&
                    bulkException.WriteErrors.Any(error => error.Code == RateLimitCode));
        })
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            sleepDurationProvider: (retryAttempt, e, ctx) =>
            {
                var timeToWait = ExtractTimeToWait(e.Message);
                if (!timeToWait.HasValue && e.InnerException != null)
                {
                    timeToWait = ExtractTimeToWait(e.InnerException.Message);
                }

                return timeToWait ?? DefaultSleepDurationProviderWithJitter(retryAttempt);
            },
            // No-op hook; replace with logging if needed.
            onRetryAsync: (e, ts, i, ctx) => Task.CompletedTask
        );

    /// <summary>
    /// Retries <c>MongoBulkWriteException</c>s in which any write error carries
    /// <see cref="RateLimitCode"/>. Waits for the RetryAfterMs value found in the exception
    /// message when present, otherwise uses the default back-off.
    /// </summary>
    public static readonly IAsyncPolicy MongoBulkWriteExceptionPolicy = Policy
        .Handle<MongoBulkWriteException>(e =>
        {
            return e.WriteErrors.Any(error => error.Code == RateLimitCode);
        })
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            sleepDurationProvider: (retryAttempt, e, ctx) =>
            {
                var timeToWait = ExtractTimeToWait(e.Message);
                return timeToWait ?? DefaultSleepDurationProviderWithJitter(retryAttempt);
            },
            // No-op hook; replace with logging if needed.
            onRetryAsync: (e, ts, i, ctx) => Task.CompletedTask
        );

    /// <summary>
    /// It doesn't seem like RetryAfterMs is a property value - so unfortunately, we have to
    /// extract it from the exception message.
    /// </summary>
    /// <param name="messageToParse">Exception message that may contain "RetryAfterMs=&lt;n&gt;"; may be null.</param>
    /// <returns>
    /// The parsed wait plus 100-999 ms of jitter, or <c>null</c> when the message is null/empty,
    /// has no token, or the value is not a non-negative integer.
    /// </returns>
    private static TimeSpan? ExtractTimeToWait(string messageToParse)
    {
        if (string.IsNullOrEmpty(messageToParse))
        {
            return null;
        }

        var tokenPos = messageToParse.IndexOf(RetryAfterToken, StringComparison.OrdinalIgnoreCase);
        if (tokenPos < 0)
        {
            return null;
        }

        var valueStart = tokenPos + RetryAfterTokenLength;

        // The value normally ends at the next comma; when the token is the last item in the
        // message there is no comma, so read to the end of the string instead of giving up.
        var valueEnd = messageToParse.IndexOf(',', valueStart);
        if (valueEnd < 0)
        {
            valueEnd = messageToParse.Length;
        }

        var valueText = messageToParse.Substring(valueStart, valueEnd - valueStart);

        // Machine-generated text: parse culture-invariantly. A negative value would produce an
        // invalid delay, so it is treated as "no value" and the caller falls back to back-off.
        if (!int.TryParse(
                valueText,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out int timeToWaitInMs)
            || timeToWaitInMs < 0)
        {
            return null;
        }

        return TimeSpan.FromMilliseconds(timeToWaitInMs)
            + TimeSpan.FromMilliseconds(NextJitterMs(100, 1000));
    }

    /// <summary>
    /// Use this policy if your CosmosDB MongoDB endpoint is V3.2
    /// </summary>
    public static readonly IAsyncPolicy DefaultPolicyForMongo3_2 = Policy.WrapAsync(MongoCommandExceptionPolicy, ExecutionTimeoutPolicy);

    /// <summary>
    /// Use this policy if your CosmosDB MongoDB endpoint is V3.6 or V3.2
    /// </summary>
    public static readonly IAsyncPolicy DefaultPolicyForMongo3_6 = Policy.WrapAsync(MongoCommandExceptionPolicy, ExecutionTimeoutPolicy, MongoWriteExceptionPolicy, MongoBulkWriteExceptionPolicy);
}
/// <summary>
/// Policy used by default; initialised to <see cref="Policies.DefaultPolicyForMongo3_6"/>
/// and replaceable through the setter.
/// </summary>
public static IAsyncPolicy DefaultPolicy { get; set; } = Policies.DefaultPolicyForMongo3_6;
我在 Azure Web 应用程序上部署了一个应用程序,该应用程序使用 .netCore 3 中的 mongoDb 驱动程序与 CosmosDB 数据库进行交互。
根据此文档,我必须设置重试策略,以便在 RU/s 不足时处理 429 错误代码。但由于发生 429 错误时没有明确的错误信息可供判断,我找不到用 Polly 编写该策略的正确方法。
我找到的唯一可行的方法是使用以下策略:
// Retry up to 3 times, waiting a fixed 1 second between attempts, whenever a
// MongoCommandException whose message contains "Request rate is large" is thrown.
_retryPolicy = Policy
    .Handle<MongoCommandException>(ex => ex.Message.Contains("Request rate is large"))
    .WaitAndRetry(retryCount: 3, sleepDurationProvider: attempt => TimeSpan.FromSeconds(1));
下面是 Polly 策略的使用:
/// <summary>
/// Counts the documents whose ProjectNumber equals <paramref name="projectNumber"/>,
/// executing the query through the retry policy.
/// </summary>
/// <param name="projectNumber">Project number to match.</param>
/// <returns>The number of matching documents.</returns>
public long CountProjetByProjectNumber(string projectNumber)
{
    // Let the policy return the count directly rather than writing to a captured local.
    return _retryPolicy.Execute<long>(
        () => _mongoCollection.CountDocuments(doc => doc.ProjectNumber == projectNumber));
}
有人在通过 MongoDB 驱动程序使用 CosmosDB 时,遇到 429 异常能得到明确的错误信息吗?或者有人可以告诉我应该如何正确处理它?
为了正确处理速率限制和超时,您实际上还需要处理更多异常——特别是在使用较新的 MongoDB V3.6 端点(而不是旧的 V3.2 端点)时。
- For V3.2 Endpoints: The two exceptions you care about are MongoCommandException and MongoExecutionTimeoutException. The MongoCommandException includes a BsonDocument property in its Result field. This document has a StatusCode you can use to detect 429. That said, from my testing, I also found that I had to handle Http Service Unavailable (1) and Operation Exceeded Time Limit (50) status codes.
- For V3.6 Endpoints: You probably also want to handle MongoWriteException and MongoBulkWriteException. These exceptions include a RetryAfterMs= value in the exception message (not always though!). Unfortunately, this value does not seem to be directly exposed via a class property - most likely because this is a CosmosDB specific feature and thus does not map to the MongoDB driver defined exceptions.
以下代码是在 .NET Standard 2.0 中实现的,应该能为您提供一个良好的起点。您肯定会希望根据您的情况和测试调整一些常量。
/// <summary>
/// Polly retry policies for rate-limiting and timeout errors raised by the MongoDB driver
/// when talking to a CosmosDB MongoDB endpoint.
/// </summary>
public static class Policies
{
    /// <summary>"StatusCode" value in the command result that is treated as throttling.</summary>
    public const int HttpThrottleErrorCode = 429;

    /// <summary>"StatusCode" value in the command result treated as "service unavailable".</summary>
    public const int HttpServiceIsUnavailable = 1;

    /// <summary>Status/error code treated as "operation exceeded time limit".</summary>
    public const int HttpOperationExceededTimeLimit = 50;

    /// <summary>Driver error code that the policies below treat as rate limiting.</summary>
    public const int RateLimitCode = 16500;

    /// <summary>Token that precedes the suggested wait (in milliseconds) inside exception messages.</summary>
    public const string RetryAfterToken = "RetryAfterMs=";

    /// <summary>Maximum number of retries used by every policy in this class.</summary>
    public const int MaxRetries = 10;

    /// <summary>Length of <see cref="RetryAfterToken"/>.</summary>
    public static readonly int RetryAfterTokenLength = RetryAfterToken.Length;

    private static readonly Random JitterSeed = new Random();

    // System.Random is not thread-safe and the sleep-duration delegates may run
    // concurrently, so every access to JitterSeed goes through this lock.
    private static readonly object JitterLock = new object();

    /// <summary>A policy that executes the delegate without any retry handling.</summary>
    public static readonly IAsyncPolicy NoPolicy = Policy.NoOpAsync();

    /// <summary>Returns a random jitter in milliseconds in [minInclusive, maxExclusive).</summary>
    private static int NextJitterMs(int minInclusive, int maxExclusive)
    {
        lock (JitterLock)
        {
            return JitterSeed.Next(minInclusive, maxExclusive);
        }
    }

    /// <summary>
    /// Builds a sleep-duration provider with exponential back-off plus random jitter.
    /// </summary>
    /// <param name="exponentialBackoffInSeconds">Base of the exponent; the wait is base^retryAttempt seconds.</param>
    /// <param name="maxBackoffTimeInSeconds">Upper bound, in seconds, for the exponential part.</param>
    /// <returns>A function mapping the retry attempt number to the time to wait.</returns>
    public static Func<int, TimeSpan> SleepDurationProviderWithJitter(double exponentialBackoffInSeconds, int maxBackoffTimeInSeconds) =>
        retryAttempt =>
            // exponential back-off: base^1, base^2, base^3 ... capped at the maximum
            TimeSpan.FromSeconds(Math.Min(Math.Pow(exponentialBackoffInSeconds, retryAttempt), maxBackoffTimeInSeconds))
            // plus some jitter: up to 1 second
            + TimeSpan.FromMilliseconds(NextJitterMs(0, 1000));

    /// <summary>Default back-off: base 1.5 seconds, exponential part capped at 23 seconds.</summary>
    public static readonly Func<int, TimeSpan> DefaultSleepDurationProviderWithJitter =
        SleepDurationProviderWithJitter(1.5, 23);

    /// <summary>
    /// Retries <c>MongoCommandException</c>s whose code is <see cref="RateLimitCode"/> and whose
    /// result document carries one of the handled status codes.
    /// </summary>
    public static readonly IAsyncPolicy MongoCommandExceptionPolicy = Policy
        .Handle<MongoCommandException>(e =>
        {
            if (e.Code != RateLimitCode || !(e.Result is BsonDocument bsonDocument))
            {
                return false;
            }

            if (bsonDocument.TryGetValue("StatusCode", out var statusCode) && statusCode.IsInt32)
            {
                switch (statusCode.AsInt32)
                {
                    case HttpThrottleErrorCode:
                    case HttpServiceIsUnavailable:
                    case HttpOperationExceededTimeLimit:
                        return true;
                    default:
                        return false;
                }
            }

            if (bsonDocument.TryGetValue("IsValid", out var isValid) && isValid.IsBoolean)
            {
                return isValid.AsBoolean;
            }

            // Rate-limit code without further detail: retry.
            return true;
        })
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            sleepDurationProvider: DefaultSleepDurationProviderWithJitter
        );

    /// <summary>
    /// Retries <c>MongoExecutionTimeoutException</c>s whose code is <see cref="RateLimitCode"/>
    /// or <see cref="HttpOperationExceededTimeLimit"/>.
    /// </summary>
    public static readonly IAsyncPolicy ExecutionTimeoutPolicy = Policy
        .Handle<MongoExecutionTimeoutException>(e =>
            e.Code == RateLimitCode || e.Code == HttpOperationExceededTimeLimit
        )
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            sleepDurationProvider: DefaultSleepDurationProviderWithJitter
        );

    /// <summary>
    /// Retries <c>MongoWriteException</c>s whose write error, or whose inner bulk-write errors,
    /// carry <see cref="RateLimitCode"/>. Waits for the RetryAfterMs value found in the exception
    /// message (or the inner exception's message) when present, otherwise uses the default back-off.
    /// </summary>
    public static readonly IAsyncPolicy MongoWriteExceptionPolicy = Policy
        .Handle<MongoWriteException>(e =>
        {
            return e.WriteError?.Code == RateLimitCode
                || (e.InnerException is MongoBulkWriteException bulkException &&
                    bulkException.WriteErrors.Any(error => error.Code == RateLimitCode));
        })
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            sleepDurationProvider: (retryAttempt, e, ctx) =>
            {
                var timeToWait = ExtractTimeToWait(e.Message);
                if (!timeToWait.HasValue && e.InnerException != null)
                {
                    timeToWait = ExtractTimeToWait(e.InnerException.Message);
                }

                return timeToWait ?? DefaultSleepDurationProviderWithJitter(retryAttempt);
            },
            // No-op hook; replace with logging if needed.
            onRetryAsync: (e, ts, i, ctx) => Task.CompletedTask
        );

    /// <summary>
    /// Retries <c>MongoBulkWriteException</c>s in which any write error carries
    /// <see cref="RateLimitCode"/>. Waits for the RetryAfterMs value found in the exception
    /// message when present, otherwise uses the default back-off.
    /// </summary>
    public static readonly IAsyncPolicy MongoBulkWriteExceptionPolicy = Policy
        .Handle<MongoBulkWriteException>(e =>
        {
            return e.WriteErrors.Any(error => error.Code == RateLimitCode);
        })
        .WaitAndRetryAsync(
            retryCount: MaxRetries,
            sleepDurationProvider: (retryAttempt, e, ctx) =>
            {
                var timeToWait = ExtractTimeToWait(e.Message);
                return timeToWait ?? DefaultSleepDurationProviderWithJitter(retryAttempt);
            },
            // No-op hook; replace with logging if needed.
            onRetryAsync: (e, ts, i, ctx) => Task.CompletedTask
        );

    /// <summary>
    /// It doesn't seem like RetryAfterMs is a property value - so unfortunately, we have to
    /// extract it from the exception message.
    /// </summary>
    /// <param name="messageToParse">Exception message that may contain "RetryAfterMs=&lt;n&gt;"; may be null.</param>
    /// <returns>
    /// The parsed wait plus 100-999 ms of jitter, or <c>null</c> when the message is null/empty,
    /// has no token, or the value is not a non-negative integer.
    /// </returns>
    private static TimeSpan? ExtractTimeToWait(string messageToParse)
    {
        if (string.IsNullOrEmpty(messageToParse))
        {
            return null;
        }

        var tokenPos = messageToParse.IndexOf(RetryAfterToken, StringComparison.OrdinalIgnoreCase);
        if (tokenPos < 0)
        {
            return null;
        }

        var valueStart = tokenPos + RetryAfterTokenLength;

        // The value normally ends at the next comma; when the token is the last item in the
        // message there is no comma, so read to the end of the string instead of giving up.
        var valueEnd = messageToParse.IndexOf(',', valueStart);
        if (valueEnd < 0)
        {
            valueEnd = messageToParse.Length;
        }

        var valueText = messageToParse.Substring(valueStart, valueEnd - valueStart);

        // Machine-generated text: parse culture-invariantly. A negative value would produce an
        // invalid delay, so it is treated as "no value" and the caller falls back to back-off.
        if (!int.TryParse(
                valueText,
                System.Globalization.NumberStyles.Integer,
                System.Globalization.CultureInfo.InvariantCulture,
                out int timeToWaitInMs)
            || timeToWaitInMs < 0)
        {
            return null;
        }

        return TimeSpan.FromMilliseconds(timeToWaitInMs)
            + TimeSpan.FromMilliseconds(NextJitterMs(100, 1000));
    }

    /// <summary>
    /// Use this policy if your CosmosDB MongoDB endpoint is V3.2
    /// </summary>
    public static readonly IAsyncPolicy DefaultPolicyForMongo3_2 = Policy.WrapAsync(MongoCommandExceptionPolicy, ExecutionTimeoutPolicy);

    /// <summary>
    /// Use this policy if your CosmosDB MongoDB endpoint is V3.6 or V3.2
    /// </summary>
    public static readonly IAsyncPolicy DefaultPolicyForMongo3_6 = Policy.WrapAsync(MongoCommandExceptionPolicy, ExecutionTimeoutPolicy, MongoWriteExceptionPolicy, MongoBulkWriteExceptionPolicy);
}
/// <summary>
/// Policy used by default; initialised to <see cref="Policies.DefaultPolicyForMongo3_6"/>
/// and replaceable through the setter.
/// </summary>
public static IAsyncPolicy DefaultPolicy { get; set; } = Policies.DefaultPolicyForMongo3_6;