使用 Reactive Extensions 重试异步任务代码
Retry async Task code using Reactive Extensions
我的数据访问中包含以下代码 class。
public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null,
CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null)
{
using (var connection = Connection)
{
var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault);
Task<IEnumerable<TEntity>> queryTask =
connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction,
commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token));
IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false);
connection.Close();
connection.Dispose();
tokenSource.Dispose();
return data;
}
}
我想在抛出一个SqlExeption
的时候重试一次。请记住,我不能将 RX 应用于应用程序,只能在这段代码中使用。
我尝试了下面的代码,看起来它执行正确并且 Do
正在登录控制台输出但没有真正调用 Catch
处理程序,我不确定是否Retry
处理程序也被执行。
public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null,
CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null)
{
return await Observable.Defer(async () =>
{
using (var connection = Connection)
{
var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault);
Task<IEnumerable<TEntity>> queryTask =
connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction,
commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token));
IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false);
connection.Close();
connection.Dispose();
tokenSource.Dispose();
return Observable.Return(data);
}
})
.Catch<IEnumerable<TEntity>, SqlException>(source =>
{
Debug.WriteLine($"QueryAsync Exception {source}");
return Observable.Return(new List<TEntity>());
})
.Throttle(TimeSpan.FromMilliseconds(500))
.Retry(1)
.Do(_ => Debug.WriteLine("Do QueryAsync"));
}
我发现您的代码有几个潜在问题:
- 将重试逻辑与主逻辑分开,例如在一个名为
QueryWithRetryAsync
的方法中。这只是一个设计问题,但仍然是一个问题
- 不要
Catch
直到 Retry
之后。否则 SqlException
将导致一个空列表,并且 Retry
运算符将永远不会看到异常
- 我认为
Throttle
根本没有必要,因为您只希望通过管道获得一个值
Retry(1)
并不像您想象的那样(这也让我感到惊讶)。似乎 "retry" 的定义包括第一次调用,所以你需要 Retry(2)
这是一个独立的示例,它的行为符合您的要求:
class Program
{
static void Main(string[] args)
{
var pipeline = Observable
.Defer(() => DoSomethingAsync().ToObservable())
.Retry(2)
.Catch<string, InvalidOperationException>(ex => Observable.Return("default"));
pipeline
.Do(Console.WriteLine)
.Subscribe();
Console.ReadKey();
}
private static int invocationCount = 0;
private static async Task<string> DoSomethingAsync()
{
Console.WriteLine("Attempting DoSomethingAsync");
await Task.Delay(TimeSpan.FromSeconds(2));
++invocationCount;
if (invocationCount == 2)
{
return "foo";
}
throw new InvalidOperationException();
}
}
我的数据访问中包含以下代码 class。
public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null,
CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null)
{
using (var connection = Connection)
{
var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault);
Task<IEnumerable<TEntity>> queryTask =
connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction,
commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token));
IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false);
connection.Close();
connection.Dispose();
tokenSource.Dispose();
return data;
}
}
我想在抛出一个SqlExeption
的时候重试一次。请记住,我不能将 RX 应用于应用程序,只能在这段代码中使用。
我尝试了下面的代码,看起来它执行正确并且 Do
正在登录控制台输出但没有真正调用 Catch
处理程序,我不确定是否Retry
处理程序也被执行。
public async Task<IEnumerable<TEntity>> QueryAsync(string sql, object param = null,
CommandType commandType = CommandType.Text, int? commandTimeout = null, IDbTransaction transaction = null)
{
return await Observable.Defer(async () =>
{
using (var connection = Connection)
{
var tokenSource = GetCancellationTokenSource(commandTimeout ?? CommandTimeoutDefault);
Task<IEnumerable<TEntity>> queryTask =
connection.QueryAsync<TEntity>(new CommandDefinition(sql, param, transaction,
commandTimeout ?? CommandTimeoutDefault, commandType, cancellationToken: tokenSource.Token));
IEnumerable<TEntity> data = await queryTask.ConfigureAwait(false);
connection.Close();
connection.Dispose();
tokenSource.Dispose();
return Observable.Return(data);
}
})
.Catch<IEnumerable<TEntity>, SqlException>(source =>
{
Debug.WriteLine($"QueryAsync Exception {source}");
return Observable.Return(new List<TEntity>());
})
.Throttle(TimeSpan.FromMilliseconds(500))
.Retry(1)
.Do(_ => Debug.WriteLine("Do QueryAsync"));
}
我发现您的代码有几个潜在问题:
- 将重试逻辑与主逻辑分开,例如在一个名为
QueryWithRetryAsync
的方法中。这只是一个设计问题,但仍然是一个问题 - 不要
Catch
直到Retry
之后。否则SqlException
将导致一个空列表,并且Retry
运算符将永远不会看到异常 - 我认为
Throttle
根本没有必要,因为您只希望通过管道获得一个值 Retry(1)
并不像您想象的那样(这也让我感到惊讶)。似乎 "retry" 的定义包括第一次调用,所以你需要Retry(2)
这是一个独立的示例,它的行为符合您的要求:
class Program
{
static void Main(string[] args)
{
var pipeline = Observable
.Defer(() => DoSomethingAsync().ToObservable())
.Retry(2)
.Catch<string, InvalidOperationException>(ex => Observable.Return("default"));
pipeline
.Do(Console.WriteLine)
.Subscribe();
Console.ReadKey();
}
private static int invocationCount = 0;
private static async Task<string> DoSomethingAsync()
{
Console.WriteLine("Attempting DoSomethingAsync");
await Task.Delay(TimeSpan.FromSeconds(2));
++invocationCount;
if (invocationCount == 2)
{
return "foo";
}
throw new InvalidOperationException();
}
}