使用 Reactive Extensions 重试异步任务代码

Retry async Task code using Reactive Extensions

我的数据访问中包含以下代码 class。

public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null,
            CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null)
        {
            using (var connection = Connection)
            {
                var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault);
                Task<IEnumerable<TEntity>> queryTask =
                    connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction,
                        commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token));
                IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false);
                connection.Close();
                connection.Dispose();
                tokenSource.Dispose();
                return data;
            }
        }

我想在抛出一个SqlExeption的时候重试一次。请记住,我不能将 RX 应用于应用程序,只能在这段代码中使用。

我尝试了下面的代码,看起来它执行正确并且 Do 正在登录控制台输出但没有真正调用 Catch 处理程序,我不确定是否Retry 处理程序也被执行。

public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null,
            CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null)
        {
            return await Observable.Defer(async () =>
            {
                using (var connection = Connection)
                {
                    var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault);
                    Task<IEnumerable<TEntity>> queryTask =
                        connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction,
                            commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token));
                    IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false);
                    connection.Close();
                    connection.Dispose();
                    tokenSource.Dispose();
                    return Observable.Return(data);
                }
            })
            .Catch<IEnumerable<TEntity>, SqlException>(source =>
           {
               Debug.WriteLine($"QueryAsync Exception {source}");
               return Observable.Return(new List<TEntity>());
           })
           .Throttle(TimeSpan.FromMilliseconds(500))
           .Retry(1)
           .Do(_ => Debug.WriteLine("Do QueryAsync"));
        }

我发现您的代码有几个潜在问题:

  • 将重试逻辑与主逻辑分开,例如在一个名为 QueryWithRetryAsync 的方法中。这只是一个设计问题,但仍然是一个问题
  • 不要 Catch 直到 Retry 之后。否则 SqlException 将导致一个空列表,并且 Retry 运算符将永远不会看到异常
  • 我认为 Throttle 根本没有必要,因为您只希望通过管道获得一个值
  • Retry(1) 并不像您想象的那样(这也让我感到惊讶)。似乎 "retry" 的定义包括第一次调用,所以你需要 Retry(2)

这是一个独立的示例,它的行为符合您的要求:

class Program
{
    static void Main(string[] args)
    {
        var pipeline = Observable
            .Defer(() => DoSomethingAsync().ToObservable())
            .Retry(2)
            .Catch<string, InvalidOperationException>(ex => Observable.Return("default"));

        pipeline
            .Do(Console.WriteLine)
            .Subscribe();

        Console.ReadKey();
    }

    private static int invocationCount = 0;

    private static async Task<string> DoSomethingAsync()
    {
        Console.WriteLine("Attempting DoSomethingAsync");

        await Task.Delay(TimeSpan.FromSeconds(2));

        ++invocationCount;

        if (invocationCount == 2)
        {
            return "foo";
        }

        throw new InvalidOperationException();
    }
}