Spring AOP - Properly Configuring Retry Advice
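I am new to Spring AOP and have been experimenting a bit.
I am trying to set up a retry and a rate limiter through Spring AOP for one of my projects.
The use case is like this:
- Check if TPS is available. If not, throw ThrottlingException.
- If a ThrottlingException is thrown, retry.
The problem I am running into: this combination of throttling and retry runs into an infinite loop (if TPS = 0). That is, the retry does not stop after 'x' attempts.
My throttling interceptor (at a high level) looks like this: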
@Before("<pointcut>")
public void invoke() throws ThrottlingException {
    if (throttler.isThrottled(throttleKey)) {
        throw new ThrottlingException("Call Throttled");
    }
}
My retry interceptor looks like this:
@AfterThrowing(pointcut="execution(* com.company.xyz.method())", throwing="exception")
public Object invoke(JoinPoint jp, ThrottlingException exception) throws Throwable {
    return RetryingCallable.newRetryingCallable(new Callable<Object>() {
        @Override
        public Object call() throws Exception {
            MethodSignature signature = (MethodSignature) jp.getSignature();
            Method method = signature.getMethod();
            return method.invoke(jp.getThis(), (Object[]) null);
        }
    }, retryPolicy).call();
}
Here RetryingCallable is a simple implementation (an internal library written by someone at my company) that takes a RetryAdvice and applies it.
My relevant Spring config is as follows:
<bean id="retryInterceptor" class="com.company.xyz.RetryInterceptor">
    <constructor-arg index="0"><ref bean="retryPolicy"/></constructor-arg>
</bean>
<bean id="throttlingInterceptor" class="com.company.xyz.ThrottlingInterceptor">
    <constructor-arg><value>throttleKey</value></constructor-arg>
</bean>
<context:component-scan base-package="com.company.xyz">
    <context:include-filter type="annotation" expression="org.aspectj.lang.annotation.Aspect"/>
</context:component-scan>
<aop:aspectj-autoproxy/>
As I see it, the problem here is that a new retry advice is applied on each ThrottlingException instead of the previous one staying in effect.
Any input on how to fix this?
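Disclaimer: I am not a Spring user, so I am going to present a pure AspectJ solution here. It should work the same way in Spring AOP. The only thing you need to change is the aspect precedence configuration: switch from @DeclarePrecedence to @Order, as described in the Spring AOP manual.
Driver application: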
package de.scrum_master.app;

public class Application {
    public static void main(String[] args) {
        new Application().doSomething();
    }

    public void doSomething() {
        System.out.println("Doing something");
    }
}
Throttling exception class:
package de.scrum_master.app;

public class ThrottlingException extends RuntimeException {
    private static final long serialVersionUID = 1L;

    public ThrottlingException(String arg0) {
        super(arg0);
    }
}
Throttling interceptor:
In order to simulate the throttling situation, I created a helper method isThrottled() which randomly returns true in 2 out of 3 cases.
package de.scrum_master.aspect;

import java.util.Random;

import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;

import de.scrum_master.app.ThrottlingException;

@Aspect
public class ThrottlingInterceptor {
    private static final Random RANDOM = new Random();

    @Before("execution(* doSomething())")
    public void invoke(JoinPoint thisJoinPoint) throws ThrottlingException {
        System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint);
        if (isThrottled()) {
            throw new ThrottlingException("call throttled");
        }
    }

    // Returns true for 2 of the 3 possible random values (1 and 2, but not 0)
    private boolean isThrottled() {
        return RANDOM.nextInt(3) > 0;
    }
}
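The random isThrottled() above only simulates throttling. For the TPS check described in the question, a deterministic limiter could look roughly like the following sketch. FixedWindowThrottler, its constructor parameters and the injectable clock are hypothetical names of mine; the asker's real throttler API is unknown, so treat this as an illustration under those assumptions only.

```java
import java.util.function.LongSupplier;

// Hypothetical fixed-window limiter: at most maxPerSecond calls per one-second window.
public class FixedWindowThrottler {
    private final int maxPerSecond;
    private final LongSupplier clockMillis; // injectable so the example is deterministic
    private long windowStart;
    private int used;

    public FixedWindowThrottler(int maxPerSecond, LongSupplier clockMillis) {
        this.maxPerSecond = maxPerSecond;
        this.clockMillis = clockMillis;
        this.windowStart = clockMillis.getAsLong();
    }

    /** Returns true if the call must be rejected; otherwise consumes one permit. */
    public synchronized boolean isThrottled() {
        long now = clockMillis.getAsLong();
        if (now - windowStart >= 1000) {
            // A new one-second window has started, so the budget is reset.
            windowStart = now;
            used = 0;
        }
        if (used >= maxPerSecond) {
            return true;
        }
        used++;
        return false;
    }

    public static void main(String[] args) {
        long[] fakeNow = {0};
        FixedWindowThrottler throttler = new FixedWindowThrottler(2, () -> fakeNow[0]);
        System.out.println(throttler.isThrottled()); // first call in the window
        System.out.println(throttler.isThrottled()); // second call in the window
        System.out.println(throttler.isThrottled()); // third call exceeds the limit of 2
        fakeNow[0] = 1000;                           // move the fake clock to the next window
        System.out.println(throttler.isThrottled());
    }
}
```

With a limit of 0 this limiter rejects every call, which corresponds to the TPS = 0 case from the question where the retry logic has to give up after a bounded number of attempts.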
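Retry interceptor:
Please note that the AspectJ annotation @DeclarePrecedence("RetryInterceptor, *") says that this interceptor is to be executed before any other ones. In Spring AOP, replace it with @Order annotations on both interceptor classes. Otherwise the @Around advice cannot catch exceptions thrown by the throttling interceptor.
Also worth mentioning is that this interceptor does not need any reflection to implement the retry logic: it calls thisJoinPoint.proceed() directly inside a retry loop. This can easily be factored out into a helper method or helper class implementing different kinds of retry behaviour. Just make sure to use the ProceedingJoinPoint as a parameter instead of a Callable.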
package de.scrum_master.aspect;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.DeclarePrecedence;

import de.scrum_master.app.ThrottlingException;

@Aspect
@DeclarePrecedence("RetryInterceptor, *")
public class RetryInterceptor {
    private static final int MAX_TRIES = 5;
    private static final int WAIT_MILLIS_BETWEEN_TRIES = 1000;

    @Around("execution(* doSomething())")
    public Object invoke(ProceedingJoinPoint thisJoinPoint) throws Throwable {
        System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint);
        ThrottlingException throttlingException = null;
        for (int i = 1; i <= MAX_TRIES; i++) {
            try {
                // Re-runs the lower-precedence advice (the throttle check) and the target method
                return thisJoinPoint.proceed();
            }
            catch (ThrottlingException e) {
                throttlingException = e;
                System.out.println("  Throttled during try #" + i);
                Thread.sleep(WAIT_MILLIS_BETWEEN_TRIES);
            }
        }
        // All tries were throttled: give up and rethrow the last exception
        throw throttlingException;
    }
}
Console log for a successful retry:
RetryInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
Throttled during try #1
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
Throttled during try #2
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
Doing something
Console log for a failed retry:
RetryInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
Throttled during try #1
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
Throttled during try #2
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
Throttled during try #3
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
Throttled during try #4
ThrottlingInterceptor -> execution(void de.scrum_master.app.Application.doSomething())
Throttled during try #5
Exception in thread "main" de.scrum_master.app.ThrottlingException: call throttled
at de.scrum_master.aspect.ThrottlingInterceptor.invoke(ThrottlingInterceptor.aj:19)
at de.scrum_master.app.Application.doSomething_aroundBody0(Application.java:9)
at de.scrum_master.app.Application.doSomething_aroundBody1$advice(Application.java:22)
at de.scrum_master.app.Application.doSomething(Application.java:1)
at de.scrum_master.app.Application.main(Application.java:5)
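Why the precedence matters can be shown without any AOP framework. The following plain-Java sketch only imitates the two advices as nested wrappers; AdviceOrderSketch, withThrottle and withRetry are made-up names, and the throttle is hard-wired to reject the first two calls so that the behaviour is deterministic. It is an analogy for the advice chain, not how AspectJ or Spring AOP are implemented.

```java
import java.util.concurrent.Callable;

public class AdviceOrderSketch {
    /** Hypothetical stand-in for the ThrottlingException used above. */
    static class ThrottlingException extends RuntimeException {
        ThrottlingException(String message) { super(message); }
    }

    /** Counts how often the throttle check ran. */
    static int throttleChecks = 0;

    /** Plays the role of the @Before advice: rejects the first two calls. */
    static <V> Callable<V> withThrottle(Callable<V> next) {
        return () -> {
            if (++throttleChecks <= 2) {
                throw new ThrottlingException("call throttled");
            }
            return next.call();
        };
    }

    /** Plays the role of the @Around advice: bounded retry around whatever it wraps. */
    static <V> Callable<V> withRetry(Callable<V> next, int maxTries) {
        return () -> {
            ThrottlingException last = null;
            for (int i = 1; i <= maxTries; i++) {
                try {
                    return next.call();
                } catch (ThrottlingException e) {
                    last = e;
                }
            }
            throw last; // assumes maxTries >= 1
        };
    }

    public static void main(String[] args) throws Exception {
        Callable<String> target = () -> "Doing something";

        // Retry outermost: the throttle check is re-run on every attempt.
        throttleChecks = 0;
        System.out.println(withRetry(withThrottle(target), 5).call());

        // Throttle outermost: its exception never passes through the retry loop.
        throttleChecks = 0;
        try {
            withThrottle(withRetry(target, 5)).call();
        } catch (ThrottlingException e) {
            System.out.println("Not retried: " + e.getMessage());
        }
    }
}
```

In the first arrangement the call succeeds on the third attempt; in the second the very first throttle rejection propagates to the caller, which is the situation the precedence declaration is meant to avoid.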
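Feel free to ask any follow-up questions related to my answer.
Update: I do not know how your RetryingCallable and RetryPolicy classes/interfaces work, because you did not tell me much about them. But I made something up and got it working like this: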
package de.scrum_master.app;

import java.util.concurrent.Callable;

public interface RetryPolicy<V> {
    V apply(Callable<V> callable) throws Exception;
}

package de.scrum_master.app;

import java.util.concurrent.Callable;

public class DefaultRetryPolicy<V> implements RetryPolicy<V> {
    private static final int MAX_TRIES = 5;
    private static final int WAIT_MILLIS_BETWEEN_TRIES = 1000;

    @Override
    public V apply(Callable<V> callable) throws Exception {
        Exception throttlingException = null;
        for (int i = 1; i <= MAX_TRIES; i++) {
            try {
                return callable.call();
            }
            catch (ThrottlingException e) {
                throttlingException = e;
                System.out.println("  Throttled during try #" + i);
                Thread.sleep(WAIT_MILLIS_BETWEEN_TRIES);
            }
        }
        // All tries were throttled: give up and rethrow the last exception
        throw throttlingException;
    }
}

package de.scrum_master.app;

import java.util.concurrent.Callable;

public class RetryingCallable<V> {
    private RetryPolicy<V> retryPolicy;
    private Callable<V> callable;

    public RetryingCallable(Callable<V> callable, RetryPolicy<V> retryPolicy) {
        this.callable = callable;
        this.retryPolicy = retryPolicy;
    }

    public static <V> RetryingCallable<V> newRetryingCallable(Callable<V> callable, RetryPolicy<V> retryPolicy) {
        return new RetryingCallable<V>(callable, retryPolicy);
    }

    public V call() throws Exception {
        return retryPolicy.apply(callable);
    }
}
Now change the retry interceptor like this:
package de.scrum_master.aspect;

import java.util.concurrent.Callable;

import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.DeclarePrecedence;

import de.scrum_master.app.DefaultRetryPolicy;
import de.scrum_master.app.RetryPolicy;
import de.scrum_master.app.RetryingCallable;

@Aspect
@DeclarePrecedence("RetryInterceptor, *")
public class RetryInterceptor {
    private RetryPolicy<Object> retryPolicy = new DefaultRetryPolicy<>();

    @Around("execution(* doSomething())")
    public Object invoke(final ProceedingJoinPoint thisJoinPoint) throws Throwable {
        System.out.println(getClass().getSimpleName() + " -> " + thisJoinPoint);
        return RetryingCallable.newRetryingCallable(
            new Callable<Object>() {
                @Override
                public Object call() throws Exception {
                    try {
                        return thisJoinPoint.proceed();
                    }
                    catch (Throwable t) {
                        // proceed() declares Throwable, but call() may only throw Exception.
                        // Exceptions and errors are rethrown unchanged so that the retry
                        // policy still sees the original ThrottlingException.
                        if (t instanceof Exception) throw (Exception) t;
                        if (t instanceof Error) throw (Error) t;
                        throw new Exception(t);
                    }
                }
            },
            retryPolicy
        ).call();
    }
}
The log output is very similar to what you saw before. For me this works nicely.
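Because the retry behaviour sits behind the made-up RetryPolicy interface, a different strategy can be swapped in without touching the aspect. As an illustration, here is a self-contained sketch of a policy with exponential backoff. ExponentialBackoffRetryPolicy and its constructor parameters are my own hypothetical names, and the interface and exception are redeclared as nested types only so that the example compiles on its own.

```java
import java.util.concurrent.Callable;

public class BackoffRetrySketch {
    /** Redeclared here only to keep the sketch self-contained. */
    static class ThrottlingException extends RuntimeException {
        ThrottlingException(String message) { super(message); }
    }

    /** Same shape as the RetryPolicy interface above. */
    interface RetryPolicy<V> {
        V apply(Callable<V> callable) throws Exception;
    }

    /** Hypothetical policy: doubles the wait time after every throttled attempt. */
    static class ExponentialBackoffRetryPolicy<V> implements RetryPolicy<V> {
        private final int maxTries;
        private final long initialWaitMillis;

        ExponentialBackoffRetryPolicy(int maxTries, long initialWaitMillis) {
            if (maxTries < 1) {
                throw new IllegalArgumentException("maxTries must be at least 1");
            }
            this.maxTries = maxTries;
            this.initialWaitMillis = initialWaitMillis;
        }

        @Override
        public V apply(Callable<V> callable) throws Exception {
            long waitMillis = initialWaitMillis;
            for (int i = 1; ; i++) {
                try {
                    return callable.call();
                } catch (ThrottlingException e) {
                    if (i == maxTries) {
                        throw e; // give up without sleeping after the final attempt
                    }
                    Thread.sleep(waitMillis);
                    waitMillis *= 2;
                }
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        RetryPolicy<String> policy = new ExponentialBackoffRetryPolicy<>(5, 10);
        // The callable is throttled twice and succeeds on the third call.
        String result = policy.apply(() -> {
            if (++calls[0] < 3) {
                throw new ThrottlingException("call throttled");
            }
            return "Doing something";
        });
        System.out.println(result + " after " + calls[0] + " calls");
    }
}
```

Unlike DefaultRetryPolicy, this variant does not sleep after the last failed attempt, so a permanently throttled call (TPS = 0) fails after maxTries attempts without one extra wait.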