AsyncInterceptor with Polly(AOP 重试风格)
AsyncInterceptor with Polly (AOP retry style)
对于我系统的某些部分,我需要为数据库读取添加重试逻辑。我有许多无法修改的存储库,它们同时包含异步和同步的读取方法。我找到了一个简单的解决方案——使用 AsyncInterceptor 拦截所有读取方法,并借助 Polly 添加读取重试策略:当捕获到数据库异常时,Polly 会按设定的时间间隔重试读取。
拦截器代码:
/// <summary>
/// Interceptor that runs intercepted calls under a Polly wait-and-retry policy
/// which retries when a <see cref="DatabaseException"/> is thrown.
/// </summary>
/// <remarks>
/// The accompanying text reports that the synchronous path waits between retries as
/// configured, while the generic asynchronous path retries without waiting for the
/// intervals to elapse. This listing keeps that behaviour as-is.
/// </remarks>
public class RetriableReadAsyncInterceptor : IAsyncInterceptor
{
    /// <summary>Delays (1s, 5s, 10s, 15s) handed to the retry policies; a new array is built on every read.</summary>
    private IEnumerable<TimeSpan> RetryIntervals
    {
        get
        {
            return new[]
            {
                TimeSpan.FromSeconds(1),
                TimeSpan.FromSeconds(5),
                TimeSpan.FromSeconds(10),
                TimeSpan.FromSeconds(15)
            };
        }
    }

    /// <summary>Handles synchronous calls by storing the retried result as the invocation's return value.</summary>
    public void InterceptSynchronous(IInvocation invocation)
    {
        object retriedResult = InternalInterceptSync(invocation);
        invocation.ReturnValue = retriedResult;
    }

    /// <summary>Not supported: this overload always throws.</summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    public void InterceptAsynchronous(IInvocation invocation)
    {
        throw new NotImplementedException();
    }

    /// <summary>Handles result-returning asynchronous calls by replacing the return value with the retrying task.</summary>
    public void InterceptAsynchronous<TResult>(IInvocation invocation)
    {
        Task<TResult> retriedTask = InternalInterceptAsync<TResult>(invocation);
        invocation.ReturnValue = retriedTask;
    }

    /// <summary>
    /// Proceeds with the invocation under a blocking wait-and-retry policy and
    /// returns whatever return value the invocation ends up with.
    /// </summary>
    private object InternalInterceptSync(IInvocation invocation)
    {
        Action<Exception, TimeSpan> reportRetry = (exception, timeSpan) =>
        {
            Console.WriteLine($"Exception {timeSpan}");
        };

        Func<object> attempt = () =>
        {
            invocation.Proceed();
            return invocation.ReturnValue;
        };

        var retryPolicy = Policy
            .Handle<DatabaseException>()
            .WaitAndRetry(RetryIntervals, reportRetry);

        return retryPolicy.Execute(attempt);
    }

    /// <summary>
    /// Proceeds with the invocation under an asynchronous wait-and-retry policy and
    /// awaits the task produced by the invocation.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>invocation.Proceed()</c> is called from inside the retried delegate,
    /// so on retries it runs after an await; the accompanying text identifies this path as
    /// the one that does not honour the retry intervals.
    /// </remarks>
    private async Task<TResult> InternalInterceptAsync<TResult>(IInvocation invocation)
    {
        Action<Exception, TimeSpan> reportRetry = (exception, timeSpan) =>
        {
            Console.WriteLine($"Exception {timeSpan}");
        };

        Func<Task<TResult>> attempt = async () =>
        {
            invocation.Proceed();
            var pending = (Task<TResult>)invocation.ReturnValue;
            return await pending;
        };

        var retryPolicy = Policy
            .Handle<DatabaseException>()
            .WaitAndRetryAsync(RetryIntervals, reportRetry);

        var outcome = await retryPolicy.ExecuteAsync(attempt);
        return outcome;
    }
}
存储库代码:
/// <summary>
/// Sample repository used to exercise the retry interceptor: the first three read calls
/// (counted across both methods, since they share one counter) throw
/// <see cref="DatabaseException"/>; later calls return an entity carrying the requested id.
/// </summary>
public class Repository : IRepository
{
    // Number of simulated failures thrown so far.
    private int _failuresThrown;

    /// <summary>Returns an entity with the given id, or throws while simulated failures remain.</summary>
    /// <exception cref="DatabaseException">Thrown while fewer than three failures have been raised.</exception>
    public Entity GetById(int id)
    {
        ThrowWhileFailuresRemain();

        //read from db
        var entity = new Entity { Id = id };
        return entity;
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="GetById"/>. Because the method is <c>async</c>,
    /// a simulated failure surfaces through the returned task.
    /// </summary>
    public async Task<Entity> GetByIdAsync(int id)
    {
        ThrowWhileFailuresRemain();

        //read from db
        var entity = new Entity { Id = id };
        return await Task.FromResult(entity);
    }

    // Raises a DatabaseException until three failures have been thrown, counting each one.
    private void ThrowWhileFailuresRemain()
    {
        if (_failuresThrown > 2)
        {
            return;
        }

        _failuresThrown++;
        throw new DatabaseException();
    }
}
GetById 的同步版本按预期工作(间隔重试):
Exception 00:00:01
Exception 00:00:05
Exception 00:00:10
异步版本的 GetById 重试但不等待时间间隔过去:
Exception 00:00:01
Exception 00:00:01
Exception 00:00:01
我不明白问题出在哪里。如果您有任何想法 - 请分享。
可以找到完整示例 here.
好的,这是我天真的重试实现:
/// <summary>
/// Minimal retry helper for asynchronous operations.
/// </summary>
public class Retry
{
    /// <summary>
    /// Runs <paramref name="action"/> and retries it when it fails with
    /// <typeparamref name="TException"/>, waiting <paramref name="retryInterval"/> between attempts.
    /// </summary>
    /// <typeparam name="TResult">Result type produced by the operation.</typeparam>
    /// <typeparam name="TException">Exception type that triggers a retry; other exceptions propagate immediately.</typeparam>
    /// <param name="action">Factory that starts one attempt of the operation.</param>
    /// <param name="retryInterval">Delay between a failed attempt and the next one.</param>
    /// <param name="maxAttemptCount">Total number of attempts (including the first); must be at least 1.</param>
    /// <returns>The result of the first successful attempt.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxAttemptCount"/> is less than 1.</exception>
    /// <exception cref="Exception">
    /// The <typeparamref name="TException"/> raised by the last attempt, rethrown with its original stack trace.
    /// </exception>
    public static async Task<TResult> DoAsync<TResult, TException>(
        Func<Task<TResult>> action,
        TimeSpan retryInterval,
        int maxAttemptCount = 3)
        where TException : Exception
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        // Without at least one attempt there is neither a result nor an exception to report.
        if (maxAttemptCount < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maxAttemptCount), maxAttemptCount, "At least one attempt is required.");
        }

        var startDateTime = DateTime.UtcNow;
        for (var attempted = 1; ; attempted++)
        {
            try
            {
                return await action().ConfigureAwait(false);
            }
            catch (TException)
            {
                Console.WriteLine($"Exception {DateTime.UtcNow - startDateTime}");

                // Last attempt: rethrow in place so the original stack trace is kept,
                // and do not wait for an interval that no further attempt would use.
                if (attempted >= maxAttemptCount)
                {
                    throw;
                }
            }

            // NOTE: the author reports that, when this helper is driven from the interceptor,
            // this non-blocking delay appears not to be observed, whereas a blocking
            // Thread.Sleep(retryInterval) is.
            await Task.Delay(retryInterval).ConfigureAwait(false);
        }
    }
}
和拦截器:
/// <summary>
/// Variant of the asynchronous interception path that uses <c>Retry.DoAsync</c>:
/// up to 4 attempts, 3 seconds apart, retrying on <see cref="DatabaseException"/>.
/// </summary>
/// <remarks>
/// NOTE(review): <c>invocation.Proceed()</c> is called inside the retried delegate, so on
/// retries it runs after an await; the accompanying text reports that the delay is not
/// observed with this variant.
/// </remarks>
private async Task<TResult> InternalInterceptAsync<TResult>(IInvocation invocation)
{
    Func<Task<TResult>> attempt = async () =>
    {
        invocation.Proceed();
        var pending = (Task<TResult>) invocation.ReturnValue;
        return await pending.ConfigureAwait(false);
    };

    var interval = TimeSpan.FromSeconds(3);
    var outcome = await Retry.DoAsync<TResult, DatabaseException>(attempt, interval, 4);
    return outcome;
}
使用阻塞式 Thread.Sleep 的实现效果很好,但使用 Task.Delay 的实现却不行。
这是一种 'chicken and egg' 问题:invocation.Proceed() 依赖拦截管道的当前状态,在第一次 await(例如重试前的等待)之后再调用它就不再可靠。现在可以使用 Castle.Core 的较新版本(我试过 4.4.0 版),利用 invocation.CaptureProceedInfo 方法事先捕获这一状态,之后在每次重试时调用 Invoke() 来解决:
/// <summary>
/// Asynchronous interception path that proceeds through a captured proceed-info object
/// instead of calling <c>invocation.Proceed()</c> inside the retried delegate.
/// </summary>
/// <returns>The task produced by executing the invocation under the wait-and-retry policy.</returns>
private Task<TResult> InternalInterceptAsync<TResult>(IInvocation invocation)
{
    // Captured once, before the policy runs; every attempt proceeds through this object.
    var proceedInfo = invocation.CaptureProceedInfo();

    Action<Exception, TimeSpan> reportRetry = (exception, timeSpan) =>
    {
        Console.WriteLine($"Exception {timeSpan}");
    };

    Func<Task<TResult>> attempt = async () =>
    {
        proceedInfo.Invoke();
        var pending = (Task<TResult>)invocation.ReturnValue;
        return await pending;
    };

    var retryPolicy = Policy
        .Handle<DatabaseException>()
        .WaitAndRetryAsync(RetryIntervals, reportRetry);

    return retryPolicy.ExecuteAsync(attempt);
}
对于我系统的某些部分,我需要为数据库读取添加重试逻辑。我有许多无法修改的存储库,它们同时包含异步和同步的读取方法。我找到了一个简单的解决方案——使用 AsyncInterceptor 拦截所有读取方法,并借助 Polly 添加读取重试策略:当捕获到数据库异常时,Polly 会按设定的时间间隔重试读取。
拦截器代码:
/// <summary>
/// Interceptor that runs intercepted calls under a Polly wait-and-retry policy
/// which retries when a <see cref="DatabaseException"/> is thrown.
/// </summary>
/// <remarks>
/// The accompanying text reports that the synchronous path waits between retries as
/// configured, while the generic asynchronous path retries without waiting for the
/// intervals to elapse. This listing keeps that behaviour as-is.
/// </remarks>
public class RetriableReadAsyncInterceptor : IAsyncInterceptor
{
    /// <summary>Delays (1s, 5s, 10s, 15s) handed to the retry policies; a new array is built on every read.</summary>
    private IEnumerable<TimeSpan> RetryIntervals
    {
        get
        {
            return new[]
            {
                TimeSpan.FromSeconds(1),
                TimeSpan.FromSeconds(5),
                TimeSpan.FromSeconds(10),
                TimeSpan.FromSeconds(15)
            };
        }
    }

    /// <summary>Handles synchronous calls by storing the retried result as the invocation's return value.</summary>
    public void InterceptSynchronous(IInvocation invocation)
    {
        object retriedResult = InternalInterceptSync(invocation);
        invocation.ReturnValue = retriedResult;
    }

    /// <summary>Not supported: this overload always throws.</summary>
    /// <exception cref="NotImplementedException">Always.</exception>
    public void InterceptAsynchronous(IInvocation invocation)
    {
        throw new NotImplementedException();
    }

    /// <summary>Handles result-returning asynchronous calls by replacing the return value with the retrying task.</summary>
    public void InterceptAsynchronous<TResult>(IInvocation invocation)
    {
        Task<TResult> retriedTask = InternalInterceptAsync<TResult>(invocation);
        invocation.ReturnValue = retriedTask;
    }

    /// <summary>
    /// Proceeds with the invocation under a blocking wait-and-retry policy and
    /// returns whatever return value the invocation ends up with.
    /// </summary>
    private object InternalInterceptSync(IInvocation invocation)
    {
        Action<Exception, TimeSpan> reportRetry = (exception, timeSpan) =>
        {
            Console.WriteLine($"Exception {timeSpan}");
        };

        Func<object> attempt = () =>
        {
            invocation.Proceed();
            return invocation.ReturnValue;
        };

        var retryPolicy = Policy
            .Handle<DatabaseException>()
            .WaitAndRetry(RetryIntervals, reportRetry);

        return retryPolicy.Execute(attempt);
    }

    /// <summary>
    /// Proceeds with the invocation under an asynchronous wait-and-retry policy and
    /// awaits the task produced by the invocation.
    /// </summary>
    /// <remarks>
    /// NOTE(review): <c>invocation.Proceed()</c> is called from inside the retried delegate,
    /// so on retries it runs after an await; the accompanying text identifies this path as
    /// the one that does not honour the retry intervals.
    /// </remarks>
    private async Task<TResult> InternalInterceptAsync<TResult>(IInvocation invocation)
    {
        Action<Exception, TimeSpan> reportRetry = (exception, timeSpan) =>
        {
            Console.WriteLine($"Exception {timeSpan}");
        };

        Func<Task<TResult>> attempt = async () =>
        {
            invocation.Proceed();
            var pending = (Task<TResult>)invocation.ReturnValue;
            return await pending;
        };

        var retryPolicy = Policy
            .Handle<DatabaseException>()
            .WaitAndRetryAsync(RetryIntervals, reportRetry);

        var outcome = await retryPolicy.ExecuteAsync(attempt);
        return outcome;
    }
}
存储库代码:
/// <summary>
/// Sample repository used to exercise the retry interceptor: the first three read calls
/// (counted across both methods, since they share one counter) throw
/// <see cref="DatabaseException"/>; later calls return an entity carrying the requested id.
/// </summary>
public class Repository : IRepository
{
    // Number of simulated failures thrown so far.
    private int _failuresThrown;

    /// <summary>Returns an entity with the given id, or throws while simulated failures remain.</summary>
    /// <exception cref="DatabaseException">Thrown while fewer than three failures have been raised.</exception>
    public Entity GetById(int id)
    {
        ThrowWhileFailuresRemain();

        //read from db
        var entity = new Entity { Id = id };
        return entity;
    }

    /// <summary>
    /// Asynchronous counterpart of <see cref="GetById"/>. Because the method is <c>async</c>,
    /// a simulated failure surfaces through the returned task.
    /// </summary>
    public async Task<Entity> GetByIdAsync(int id)
    {
        ThrowWhileFailuresRemain();

        //read from db
        var entity = new Entity { Id = id };
        return await Task.FromResult(entity);
    }

    // Raises a DatabaseException until three failures have been thrown, counting each one.
    private void ThrowWhileFailuresRemain()
    {
        if (_failuresThrown > 2)
        {
            return;
        }

        _failuresThrown++;
        throw new DatabaseException();
    }
}
GetById 的同步版本按预期工作(间隔重试):
Exception 00:00:01
Exception 00:00:05
Exception 00:00:10
异步版本的 GetById 重试但不等待时间间隔过去:
Exception 00:00:01
Exception 00:00:01
Exception 00:00:01
我不明白问题出在哪里。如果您有任何想法 - 请分享。 可以找到完整示例 here.
好的,这是我天真的重试实现:
/// <summary>
/// Minimal retry helper for asynchronous operations.
/// </summary>
public class Retry
{
    /// <summary>
    /// Runs <paramref name="action"/> and retries it when it fails with
    /// <typeparamref name="TException"/>, waiting <paramref name="retryInterval"/> between attempts.
    /// </summary>
    /// <typeparam name="TResult">Result type produced by the operation.</typeparam>
    /// <typeparam name="TException">Exception type that triggers a retry; other exceptions propagate immediately.</typeparam>
    /// <param name="action">Factory that starts one attempt of the operation.</param>
    /// <param name="retryInterval">Delay between a failed attempt and the next one.</param>
    /// <param name="maxAttemptCount">Total number of attempts (including the first); must be at least 1.</param>
    /// <returns>The result of the first successful attempt.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="maxAttemptCount"/> is less than 1.</exception>
    /// <exception cref="Exception">
    /// The <typeparamref name="TException"/> raised by the last attempt, rethrown with its original stack trace.
    /// </exception>
    public static async Task<TResult> DoAsync<TResult, TException>(
        Func<Task<TResult>> action,
        TimeSpan retryInterval,
        int maxAttemptCount = 3)
        where TException : Exception
    {
        if (action == null)
        {
            throw new ArgumentNullException(nameof(action));
        }

        // Without at least one attempt there is neither a result nor an exception to report.
        if (maxAttemptCount < 1)
        {
            throw new ArgumentOutOfRangeException(
                nameof(maxAttemptCount), maxAttemptCount, "At least one attempt is required.");
        }

        var startDateTime = DateTime.UtcNow;
        for (var attempted = 1; ; attempted++)
        {
            try
            {
                return await action().ConfigureAwait(false);
            }
            catch (TException)
            {
                Console.WriteLine($"Exception {DateTime.UtcNow - startDateTime}");

                // Last attempt: rethrow in place so the original stack trace is kept,
                // and do not wait for an interval that no further attempt would use.
                if (attempted >= maxAttemptCount)
                {
                    throw;
                }
            }

            // NOTE: the author reports that, when this helper is driven from the interceptor,
            // this non-blocking delay appears not to be observed, whereas a blocking
            // Thread.Sleep(retryInterval) is.
            await Task.Delay(retryInterval).ConfigureAwait(false);
        }
    }
}
和拦截器:
/// <summary>
/// Variant of the asynchronous interception path that uses <c>Retry.DoAsync</c>:
/// up to 4 attempts, 3 seconds apart, retrying on <see cref="DatabaseException"/>.
/// </summary>
/// <remarks>
/// NOTE(review): <c>invocation.Proceed()</c> is called inside the retried delegate, so on
/// retries it runs after an await; the accompanying text reports that the delay is not
/// observed with this variant.
/// </remarks>
private async Task<TResult> InternalInterceptAsync<TResult>(IInvocation invocation)
{
    Func<Task<TResult>> attempt = async () =>
    {
        invocation.Proceed();
        var pending = (Task<TResult>) invocation.ReturnValue;
        return await pending.ConfigureAwait(false);
    };

    var interval = TimeSpan.FromSeconds(3);
    var outcome = await Retry.DoAsync<TResult, DatabaseException>(attempt, interval, 4);
    return outcome;
}
使用阻塞式 Thread.Sleep 的实现效果很好,但使用 Task.Delay 的实现却不行。
这是一种 'chicken and egg' 问题:invocation.Proceed() 依赖拦截管道的当前状态,在第一次 await(例如重试前的等待)之后再调用它就不再可靠。现在可以使用 Castle.Core 的较新版本(我试过 4.4.0 版),利用 invocation.CaptureProceedInfo 方法事先捕获这一状态,之后在每次重试时调用 Invoke() 来解决:
/// <summary>
/// Asynchronous interception path that proceeds through a captured proceed-info object
/// instead of calling <c>invocation.Proceed()</c> inside the retried delegate.
/// </summary>
/// <returns>The task produced by executing the invocation under the wait-and-retry policy.</returns>
private Task<TResult> InternalInterceptAsync<TResult>(IInvocation invocation)
{
    // Captured once, before the policy runs; every attempt proceeds through this object.
    var proceedInfo = invocation.CaptureProceedInfo();

    Action<Exception, TimeSpan> reportRetry = (exception, timeSpan) =>
    {
        Console.WriteLine($"Exception {timeSpan}");
    };

    Func<Task<TResult>> attempt = async () =>
    {
        proceedInfo.Invoke();
        var pending = (Task<TResult>)invocation.ReturnValue;
        return await pending;
    };

    var retryPolicy = Policy
        .Handle<DatabaseException>()
        .WaitAndRetryAsync(RetryIntervals, reportRetry);

    return retryPolicy.ExecuteAsync(attempt);
}