我必须处理 Polly 中的异常,我需要将结果集中的数据与 Cosmos 数据库中的数据进行比较,然后重试

I have to handle an exception in Polly where I need to Compare the data in the result set with the data in Cosmos Database and then Retry

对于我们从 gremlin 查询中收到的每 200 个状态代码,我想检查结果顶点是否与需要更新的顶点匹配。如果不匹配,则重试。

public async Task<string> ExecuteUpdateQueryAsync(string query, bool ignoreConflict = true)
{
    try
    {
        var repsonse = await Policy
                          .Handle<ResponseException>(x => (long)x.StatusAttributes["x-ms-status-code"] != (long)HttpStatusCode.Conflict)
                          .OrResult<ResultSet<dynamic>>(r => r.StatusAttributes["x-ms-status-code"] == 200)
                          .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)))
                          .ExecuteAsync(() => gremlinClient.SubmitAsync<dynamic>(query));

        logger.LogDebug(query + JsonConvert.SerializeObject(repsonse.StatusAttributes));
        if (repsonse.Count > 0)
        {
            return JsonConvert.SerializeObject(repsonse);
        }
        return default;
    }
    ...
}

编辑:

我们从 Gremlin 客户端得到的响应类似于:(希望这有帮助)

g.V(['1111|2222|3333','1111|2222|3333']).Property('Transaction_Quantity',4500).Property('Transaction_Type','Repack'){"x-ms-status-code":200,"x-ms-activity-id":"a4af8faf-4aa9-4ae2-8dd8-797bdbd80a97","x-ms-request-charge":34.64,"x-ms-total-request-charge":34.64,"x-ms-server-time-ms":7.6626,"x-ms-total-server-time-ms":7.6626}

我希望能够将此响应中的交易数量与我们尝试更新的交易数量进行比较。

免责声明:我不熟悉 CosmosDb 的 Gremlin API,因此我对 API 用法的建议可能不正确。我试图在我的 post.

中专注于 Polly 方面

因此,根据我的理解,您想决定是否需要根据响应执行进一步的重试尝试。如果响应的字段之一与某个值匹配,那么你不应该否则你应该。

对于 Polly,HandleHandleResultOrOrResult 策略构建器函数预期同步谓词。换句话说,它们旨在执行简单的断言(例如:状态代码检查、存在性检查、内部异常类型检查等...)因此,大多数时候它们都是用简单的 lambda 表达式定义的。

如果您需要执行更复杂/异步的逻辑,那么您应该将该逻辑放在 to-be-decorated 方法调用旁边。换句话说,逻辑应该属于 to-be-retried 函数。


正如我在免责声明中所说,我对 Gremlin API 并不熟悉。根据 ,您可以通过首先序列化 ResultSet<dynamic> 然后将其反序列化为 YourDto.

来将响应反序列化为 DTO
private async Task<ResultSet<dynamic>> IssueGremlinQueryAsync(string query)
{
    ResultSet<dynamic> results = gremlinClient.SubmitAsync<dynamic>(query);
    var parsedResult = JsonConvert.DeserializeObject<YourDto>(JsonConvert.SerializeObject(results));
    //TODO: perform assertion against the parsedResult
    return results;
}

这里有几个选项可以将进一步重试的需要传播回策略:

  • 使用其中一项是标志的 ValueTuple
  • 使用自定义异常
  • 等等

值元组

private async Task<(bool, ResultSet<dynamic>)> IssueGremlinQueryAsync(string query)
{
    ResultSet<dynamic> results = gremlinClient.SubmitAsync<dynamic>(query);
    var parsedResult = JsonConvert.DeserializeObject<YourDto>(JsonConvert.SerializeObject(results));

    //Replace this with your own assertion 
    bool shouldRetry = parsedResult.TransactionQuantity % 2 == 0; 
    return (shouldRetry, results);
}

元组中的第一项是标志,而第二项是未解析的 results

使用此方法,您可以像这样重构 ExecuteUpdateQueryAsync 方法的相关部分:

public async Task<string> ExecuteUpdateQueryAsync(string query, bool ignoreConflict = true)
{
    //...

    var retryPolicy = Policy
                        .Handle<ResponseException>(x => (long)x.StatusAttributes["x-ms-status-code"] != (long)HttpStatusCode.Conflict)
                        .OrResult<(bool ShouldRetry, ResultSet<dynamic> Response)>(
                            x => x.ShouldRetry || (long)x.Response.StatusAttributes["x-ms-status-code"] == 200)
                        .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));

    var response = await retryPolicy.ExecuteAsync(async () => await IssueGremlinQueryAsync(query));
    
    //...
}

异常

private async Task<ResultSet<dynamic>> IssueGremlinQueryAsync(string query)
{
    ResultSet<dynamic> results = gremlinClient.SubmitAsync<dynamic>(query);
    var parsedResult = JsonConvert.DeserializeObject<YourDto>(JsonConvert.SerializeObject(results));

    //Replace this with your own assertion 
    bool shouldRetry = parsedResult.TransactionQuantity % 2 == 0;
    return !shouldRetry ? results : throw new OperationFailedRetryNeededException("...");
}

如果我们不应该重试,那么我们 return 使用未解析的 results 否则我们抛出自定义异常。

使用此方法,您可以像这样重构 ExecuteUpdateQueryAsync 方法的相关部分:

public async Task<string> ExecuteUpdateQueryAsync(string query, bool ignoreConflict = true)
{    
    //...

    var retryPolicy = Policy
                        .Handle<ResponseException>(x => (long)x.StatusAttributes["x-ms-status-code"] != (long)HttpStatusCode.Conflict)
                        .Or<OperationFailedRetryNeededException>()
                        .OrResult<ResultSet<dynamic>>( x => (long)x.StatusAttributes["x-ms-status-code"] == 200)
                        .WaitAndRetryAsync(3, retryAttempt => TimeSpan.FromSeconds(Math.Pow(2, retryAttempt)));

    var response = await retryPolicy.ExecuteAsync(async () => await IssueGremlinQueryAsync(query));
    
    //...
}