Spring AOP - Properly Configuring Retry Advice

I am new to Spring AOP and have experimented with it a bit.

I am trying to set up retry and a rate limiter via Spring AOP for one of my projects. The use case is as follows:

  1. Check whether TPS is available. If not, throw a ThrottlingException.
  2. If a ThrottlingException is thrown, retry.

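The problem I am running into is that this combination of throttling and retry runs into an infinite loop (if TPS = 0). That is, the retry does not stop after 'x' attempts.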

My throttling interceptor looks like this (at a high level):

@Before("<pointcut>")
public void invoke() throws ThrottlingException {
    if (throttler.isThrottled(throttleKey)) {
        throw new ThrottlingException("Call Throttled");
    }
}

My retry interceptor looks like this:

@AfterThrowing(pointcut="execution(* com.company.xyz.method())", throwing="exception")
public Object invoke(JoinPoint jp, ThrottlingException exception) throws Throwable {
    return RetryingCallable.newRetryingCallable(new Callable<Object>() {

        @Override
        public Object call() throws Exception {
            MethodSignature signature = (MethodSignature) jp.getSignature();
            Method method = signature.getMethod();
            return method.invoke(jp.getThis(), (Object[]) null);
        }

    }, retryPolicy).call();
}

Here RetryingCallable is a simple implementation (an internal library written by someone at my company) that takes a RetryAdvice and applies it.

My relevant Spring config is as follows:

<bean id="retryInterceptor" class="com.company.xyz.RetryInterceptor">
    <constructor-arg index="0"><ref bean="retryPolicy"/></constructor-arg> 
</bean>


<bean id="throttlingInterceptor" class="com.company.xyz.ThrottlingInterceptor">
    <constructor-arg><value>throttleKey</value></constructor-arg> 
</bean>
<context:component-scan base-package="com.company.xyz">
  <context:include-filter type="annotation" expression="org.aspectj.lang.annotation.Aspect"/>
</context:component-scan>
<aop:aspectj-autoproxy/>

As I see it, the problem is that a new retry advice is applied on every ThrottlingException, instead of the previous one staying in effect.

Any input on how to fix this?

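Disclaimer: I am not a Spring user, so I am presenting a pure AspectJ solution here. It should work the same way in Spring AOP. The only thing you need to change is to switch the aspect precedence configuration from @DeclarePrecedence to @Order, as described in the Spring AOP manual.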

Driver application:

package de.scrum_master.app;

public class Application {
    public static void main(String[] args) {
        new Application().doSomething();
    }

    public void doSomething() {
        System.out.println("Doing something");
    }
}

Throttling exception class:

package de.scrum_master.app;

public class ThrottlingException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    public ThrottlingException(String arg0) {
        super(arg0);
    }
}

Throttling interceptor:

To simulate the throttling situation, I created a helper method isThrottled() which randomly returns true in 2 out of 3 cases.

package de.scrum_master.aspect;

import java.util.Random;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;

import de.scrum_master.app.ThrottlingException;

@Aspect
public class ThrottlingInterceptor {
    private static final Random RANDOM = new Random();

    @Before("execution(* doSomething())")
    public void invoke(JoinPoint thisJoinPoint) throws ThrottlingException {
        System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint);
        if (isThrottled()) {
            throw new ThrottlingException("call throttled");
        }
    }

    private boolean isThrottled() {
        return RANDOM.nextInt(3) > 0;
    }
}

Retry interceptor:

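Please note that the AspectJ annotation @DeclarePrecedence("RetryInterceptor, *") says that this interceptor is to be executed before any other one. In Spring AOP, replace it with @Order annotations on both interceptor classes. Otherwise the @Around advice cannot catch exceptions thrown by the throttling interceptor.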
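To illustrate why the precedence matters, here is a minimal plain-Java model of the two advices as nested wrappers. It is only a sketch, not Spring or AspectJ code: the class PrecedenceModel and the methods throttled() and retrying() are made up for this illustration, and the comments name the annotations I would expect to use in Spring AOP (lower @Order values have higher precedence).

```java
import java.util.function.Supplier;

public class PrecedenceModel {
    static class ThrottlingException extends RuntimeException {
        ThrottlingException(String message) { super(message); }
    }

    /** Models the @Before throttling advice: rejects calls while failures remain. */
    static Supplier<String> throttled(Supplier<String> next, int[] remainingFailures) {
        return () -> {
            if (remainingFailures[0]-- > 0) {
                throw new ThrottlingException("call throttled");
            }
            return next.get();
        };
    }

    /** Models the @Around retry advice: calls 'next' at most maxTries times. */
    static Supplier<String> retrying(Supplier<String> next, int maxTries) {
        return () -> {
            ThrottlingException last = null;
            for (int i = 1; i <= maxTries; i++) {
                try {
                    return next.get();
                } catch (ThrottlingException e) {
                    last = e;
                }
            }
            throw last;
        };
    }

    public static void main(String[] args) {
        Supplier<String> target = () -> "Doing something";

        // Retry outermost (in Spring AOP: e.g. @Order(1) on RetryInterceptor,
        // @Order(2) on ThrottlingInterceptor). The throttling exception is
        // thrown inside the retry loop, so it can be caught and retried.
        Supplier<String> retryOutside = retrying(throttled(target, new int[] {2}), 5);
        System.out.println(retryOutside.get());

        // Throttling outermost: the exception is thrown before the retry
        // wrapper is even entered, so nothing is retried.
        Supplier<String> throttleOutside = throttled(retrying(target, 5), new int[] {2});
        try {
            throttleOutside.get();
        } catch (ThrottlingException e) {
            System.out.println("not retried: " + e.getMessage());
        }
    }
}
```

In the first chain the call succeeds on the third try. In the second chain the very first ThrottlingException escapes to the caller.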

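Also worth mentioning: this interceptor does not need any reflection to implement the retry logic. It uses the joinpoint directly inside the retry loop and simply retries thisJoinPoint.proceed(). This can easily be factored out into a helper method or helper class implementing different kinds of retry behaviour. Just make sure to use the ProceedingJoinPoint as a parameter instead of a Callable.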
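Such a helper class could look roughly like the sketch below. The names RetryHelper and Invocation are hypothetical. Invocation is a small functional interface with the same shape as ProceedingJoinPoint.proceed(), which keeps the sketch free of AspectJ dependencies. In the demo I use IllegalStateException as a stand-in for ThrottlingException.

```java
public class RetryHelper {
    /** Hypothetical stand-in with the same shape as ProceedingJoinPoint.proceed(). */
    @FunctionalInterface
    public interface Invocation {
        Object proceed() throws Throwable;
    }

    private final int maxTries;
    private final long waitMillis;
    private final Class<? extends Throwable> retryOn;

    public RetryHelper(int maxTries, long waitMillis, Class<? extends Throwable> retryOn) {
        if (maxTries < 1) {
            throw new IllegalArgumentException("maxTries must be >= 1");
        }
        this.maxTries = maxTries;
        this.waitMillis = waitMillis;
        this.retryOn = retryOn;
    }

    /** Calls the invocation at most maxTries times, retrying only on 'retryOn'. */
    public Object call(Invocation invocation) throws Throwable {
        Throwable last = null;
        for (int attempt = 1; attempt <= maxTries; attempt++) {
            try {
                return invocation.proceed();
            } catch (Throwable t) {
                if (!retryOn.isInstance(t)) {
                    throw t; // not a retryable failure, pass it on unchanged
                }
                last = t;
                if (attempt < maxTries) {
                    Thread.sleep(waitMillis); // no need to wait after the last try
                }
            }
        }
        throw last;
    }

    public static void main(String[] args) throws Throwable {
        // IllegalStateException stands in for ThrottlingException here
        RetryHelper helper = new RetryHelper(5, 10, IllegalStateException.class);
        int[] calls = {0};
        Object result = helper.call(() -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("call throttled");
            }
            return "Doing something";
        });
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

Inside the @Around advice the call would then presumably shrink to something like `return helper.call(thisJoinPoint::proceed);`. Because the loop is bounded by maxTries, the retry also stops when every single call is throttled (the TPS = 0 case).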

package de.scrum_master.aspect;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.DeclarePrecedence;

import de.scrum_master.app.ThrottlingException;

@Aspect
@DeclarePrecedence("RetryInterceptor, *")
public class RetryInterceptor {
    private static final int MAX_TRIES = 5;
    private static final int WAIT_MILLIS_BETWEEN_TRIES = 1000;

    @Around("execution(* doSomething())")
    public Object invoke(ProceedingJoinPoint thisJoinPoint) throws Throwable {
        System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint);
        ThrottlingException throttlingException = null;
        for (int i = 1; i <= MAX_TRIES; i++) {
            try {
                return thisJoinPoint.proceed();
            }
            catch (ThrottlingException e) {
                throttlingException = e;
                System.out.println("  Throttled during try #" + i);
                Thread.sleep(WAIT_MILLIS_BETWEEN_TRIES);
            }
        }
        throw throttlingException;
    }
}

Console log for a successful retry:

RetryInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
  Throttled during try #1
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
  Throttled during try #2
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
Doing something

Console log for a failed retry:

RetryInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
  Throttled during try #1
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
  Throttled during try #2
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
  Throttled during try #3
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
  Throttled during try #4
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
  Throttled during try #5
Exception in thread "main" de.scrum_master.app.ThrottlingException: call throttled
    at de.scrum_master.aspect.ThrottlingInterceptor.invoke(ThrottlingInterceptor.aj:19)
    at de.scrum_master.app.Application.doSomething_aroundBody0(Application.java:9)
    at de.scrum_master.app.Application.doSomething_aroundBody1$advice(Application.java:22)
    at de.scrum_master.app.Application.doSomething(Application.java:1)
    at de.scrum_master.app.Application.main(Application.java:5)

Feel free to ask any follow-up questions related to my answer.


Update: I do not know how your RetryingCallable and RetryPolicy classes/interfaces work; you did not tell me much about them. But I made something up and got it working like this:

package de.scrum_master.app;

import java.util.concurrent.Callable;

public interface RetryPolicy<V> {
    V apply(Callable<V> callable) throws Exception;
}

package de.scrum_master.app;

import java.util.concurrent.Callable;

public class DefaultRetryPolicy<V> implements RetryPolicy<V> {
    private static final int MAX_TRIES = 5;
    private static final int WAIT_MILLIS_BETWEEN_TRIES = 1000;

    @Override
    public V apply(Callable<V> callable) throws Exception {
        Exception throttlingException = null;
        for (int i = 1; i <= MAX_TRIES; i++) {
            try {
                return callable.call();
            }
            catch (ThrottlingException e) {
                throttlingException = e;
                System.out.println("  Throttled during try #" + i);
                Thread.sleep(WAIT_MILLIS_BETWEEN_TRIES);
            }
        }
        throw throttlingException;
    }
}

package de.scrum_master.app;

import java.util.concurrent.Callable;

public class RetryingCallable<V> {
    private RetryPolicy<V> retryPolicy;
    private Callable<V> callable;

    public RetryingCallable(Callable<V> callable, RetryPolicy<V> retryPolicy) {
        this.callable = callable;
        this.retryPolicy = retryPolicy;
    }

    public static <V> RetryingCallable<V> newRetryingCallable(Callable<V> callable, RetryPolicy<V> retryPolicy) {
        return new RetryingCallable<V>(callable, retryPolicy);
    }

    public V call() throws Exception {
        return retryPolicy.apply(callable);
    }
}

Now change the retry interceptor like this:

package de.scrum_master.aspect;

import java.util.concurrent.Callable;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.DeclarePrecedence;

import de.scrum_master.app.DefaultRetryPolicy;
import de.scrum_master.app.RetryPolicy;
import de.scrum_master.app.RetryingCallable;

@Aspect
@DeclarePrecedence("RetryInterceptor, *")
public class RetryInterceptor {
    private RetryPolicy<Object> retryPolicy = new DefaultRetryPolicy<>();

    @Around("execution(* doSomething())")
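    public Object invoke(final ProceedingJoinPoint thisJoinPoint) throws Throwable {
        System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint);
        return RetryingCallable.newRetryingCallable(
            new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    try {
                        return thisJoinPoint.proceed();
                    }
                    catch (Exception | Error e) {
                        throw e;
                    }
                    catch (Throwable t) {
                        // proceed() declares Throwable, call() only allows Exception
                        throw new IllegalStateException(t);
                    }
                }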
            },
            retryPolicy
        ).call();
    }
}

The log output is very similar to what you saw before. This works nicely for me.