AsyncInterceptor with Polly (AOP retry style)

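For some parts of my system I need to add retry logic for reads from the database. I have a number of repositories with async and sync read methods that I can't change. I found a simple solution: intercept all read methods with AsyncInterceptor and add a retry read policy with Polly that triggers when a database exception is caught. Polly retries the read after a set of increasing intervals.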

Interceptor code:

public class RetriableReadAsyncInterceptor : IAsyncInterceptor
{
    public void InterceptSynchronous(IInvocation invocation)
    {
        invocation.ReturnValue = InternalInterceptSync(invocation);
    }

    public void InterceptAsynchronous(IInvocation invocation)
    {
        throw new NotImplementedException();
    }

    public void InterceptAsynchronous<TResult>(IInvocation invocation)
    {
        invocation.ReturnValue = InternalInterceptAsync<TResult>(invocation);
    }

    private IEnumerable<TimeSpan> RetryIntervals =>
        new[]
        {
            TimeSpan.FromSeconds(1),
            TimeSpan.FromSeconds(5),
            TimeSpan.FromSeconds(10),
            TimeSpan.FromSeconds(15)
        };

    private object InternalInterceptSync(IInvocation invocation)
    {
        return Policy
            .Handle<DatabaseException>()
            .WaitAndRetry(RetryIntervals, (exception, timeSpan) =>
            {
                Console.WriteLine($"Exception {timeSpan}");
            })
            .Execute(() =>
            {
                invocation.Proceed();
                return invocation.ReturnValue;
            });
    }

    private async Task<TResult> InternalInterceptAsync<TResult>(IInvocation invocation)
    {
        return await Policy
            .Handle<DatabaseException>()
            .WaitAndRetryAsync(RetryIntervals, (exception, timeSpan) =>
            {
                Console.WriteLine($"Exception {timeSpan}");
            })
            .ExecuteAsync(async () =>
            {
                invocation.Proceed();
                var task = (Task<TResult>)invocation.ReturnValue;
                return await task;
            });
    }
}

Repository code:

public class Repository : IRepository
{
    // Counts simulated failures; the first three calls throw.
    private int _exceptionsCounter;

    public Entity GetById(int id)
    {
        if (_exceptionsCounter <= 2)
        {
            _exceptionsCounter++;
            throw new DatabaseException();
        }

        // read from db
        return new Entity {Id = id};
    }

    public async Task<Entity> GetByIdAsync(int id)
    {
        if (_exceptionsCounter <= 2)
        {
            _exceptionsCounter++;
            throw new DatabaseException();
        }

        // read from db
        return await Task.FromResult(new Entity { Id = id });
    }
}
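
The post does not show IRepository, Entity, DatabaseException, or how the proxy is created. A minimal sketch of what they might look like, assuming the Castle.Core.AsyncInterceptor package and its ToInterceptor() extension. The type shapes and the Program class below are assumptions for illustration, not code from the linked example:

```csharp
using System;
using System.Threading.Tasks;
using Castle.DynamicProxy;

// Assumed shapes: the post uses these types but does not show them.
public class DatabaseException : Exception { }

public class Entity
{
    public int Id { get; set; }
}

public interface IRepository
{
    Entity GetById(int id);
    Task<Entity> GetByIdAsync(int id);
}

// Hypothetical entry point that wires the interceptor around the repository.
public static class Program
{
    public static async Task Main()
    {
        var generator = new ProxyGenerator();

        // ToInterceptor() adapts an IAsyncInterceptor to DynamicProxy's IInterceptor.
        IRepository repository = generator.CreateInterfaceProxyWithTarget<IRepository>(
            new Repository(),
            new RetriableReadAsyncInterceptor().ToInterceptor());

        // Every call on the proxy now passes through the retry interceptor.
        Entity entity = await repository.GetByIdAsync(42);
        Console.WriteLine(entity.Id);
    }
}
```

Wrapping the repository in an interface proxy leaves the repository classes themselves untouched, which is the constraint stated above.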

The synchronous version of GetById works as expected (retries with intervals):

Exception 00:00:01
Exception 00:00:05
Exception 00:00:10

The async version of GetById retries, but does not wait for the interval to elapse:

Exception 00:00:01
Exception 00:00:01
Exception 00:00:01

I can't figure out where the problem is. If you have any ideas, please share. The full example can be found here.

OK, here is my naive retry implementation:

public class Retry
{
    public static async Task<TResult> DoAsync<TResult, TException>(
        Func<Task<TResult>> action,
        TimeSpan retryInterval,
        int maxAttemptCount = 3)
        where TException : Exception
    {
        TException exception = null;
        var startDateTime = DateTime.UtcNow;

        for (var attempted = 0; attempted < maxAttemptCount; attempted++)
        {
            try
            {
                return await action().ConfigureAwait(false);
            }
            catch (TException ex)
            {
                exception = ex;
                Console.WriteLine($"Exception {DateTime.UtcNow - startDateTime}");
                await Task.Delay(retryInterval); // doesn't work
                // Thread.Sleep(retryInterval); // works!
            }
        }

        throw exception;
    }
}

And the interceptor:

private async Task<TResult> InternalInterceptAsync<TResult>(IInvocation invocation)
{
    return await Retry.DoAsync<TResult, DatabaseException>(async () =>
        {
            invocation.Proceed();
            var task = (Task<TResult>) invocation.ReturnValue;
            return await task.ConfigureAwait(false);
        },
        TimeSpan.FromSeconds(3),
        4);
}

The implementation with the blocking Thread.Sleep works well, but the one with Task.Delay does not.

This is a kind of 'chicken and egg' problem. It can now be solved with newer versions of Castle.Core (I tried version 4.4.0) by using the invocation.CaptureProceedInfo method:

private Task<TResult> InternalInterceptAsync<TResult>(IInvocation invocation)
{
    var capture = invocation.CaptureProceedInfo();

    return Policy
        .Handle<DatabaseException>()
        .WaitAndRetryAsync(RetryIntervals, (exception, timeSpan) =>
        {
            Console.WriteLine($"Exception {timeSpan}");
        })
        .ExecuteAsync(async () =>
        {
            capture.Invoke();
            var task = (Task<TResult>)invocation.ReturnValue;
            return await task;
        });
}
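
As far as I understand it, invocation.Proceed() is only reliable while the interceptor is still on the call stack. Once an await actually suspends (as Task.Delay does, and Thread.Sleep does not), control has already returned to the proxy, and a later Proceed() no longer continues to the target. Capturing the proceed info up front avoids that. The same idea can be applied to the non-generic InterceptAsynchronous that the question leaves unimplemented. Below is a consolidated sketch of the whole interceptor under that assumption; DatabaseException is the exception type from the question, and the Task-returning InternalInterceptAsync overload is my own addition, not part of the original code:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Castle.DynamicProxy;
using Polly;

public class RetriableReadAsyncInterceptor : IAsyncInterceptor
{
    private static readonly IEnumerable<TimeSpan> RetryIntervals = new[]
    {
        TimeSpan.FromSeconds(1),
        TimeSpan.FromSeconds(5),
        TimeSpan.FromSeconds(10),
        TimeSpan.FromSeconds(15)
    };

    public void InterceptSynchronous(IInvocation invocation)
    {
        // Synchronous retries never leave the interceptor's call stack,
        // so calling Proceed() repeatedly is fine here.
        Policy
            .Handle<DatabaseException>()
            .WaitAndRetry(RetryIntervals, (exception, timeSpan) =>
            {
                Console.WriteLine($"Exception {timeSpan}");
            })
            .Execute(() => invocation.Proceed());
    }

    public void InterceptAsynchronous(IInvocation invocation)
    {
        invocation.ReturnValue = InternalInterceptAsync(invocation);
    }

    public void InterceptAsynchronous<TResult>(IInvocation invocation)
    {
        invocation.ReturnValue = InternalInterceptAsync<TResult>(invocation);
    }

    // Assumed addition: handles methods that return a plain Task.
    private Task InternalInterceptAsync(IInvocation invocation)
    {
        // Capture before the first await so retries can still reach the target.
        var capture = invocation.CaptureProceedInfo();

        return Policy
            .Handle<DatabaseException>()
            .WaitAndRetryAsync(RetryIntervals, (exception, timeSpan) =>
            {
                Console.WriteLine($"Exception {timeSpan}");
            })
            .ExecuteAsync(async () =>
            {
                capture.Invoke();
                await (Task)invocation.ReturnValue;
            });
    }

    private Task<TResult> InternalInterceptAsync<TResult>(IInvocation invocation)
    {
        var capture = invocation.CaptureProceedInfo();

        return Policy
            .Handle<DatabaseException>()
            .WaitAndRetryAsync(RetryIntervals, (exception, timeSpan) =>
            {
                Console.WriteLine($"Exception {timeSpan}");
            })
            .ExecuteAsync(async () =>
            {
                capture.Invoke();
                return await (Task<TResult>)invocation.ReturnValue;
            });
    }
}
```

The hand-written Retry.DoAsync helper should be fixable the same way: call invocation.CaptureProceedInfo() before invoking DoAsync and use capture.Invoke() inside the delegate instead of invocation.Proceed().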